Skip to content

Commit

Permalink
refactor: move connection_pool to TypeScript
Browse files Browse the repository at this point in the history
  • Loading branch information
luin committed Jul 18, 2018
1 parent 1397247 commit 3c7519c
Show file tree
Hide file tree
Showing 9 changed files with 227 additions and 197 deletions.
8 changes: 7 additions & 1 deletion lib/ScanStream.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import {Readable, ReadableOptions} from 'stream'

/**
* Options for ScanStream
*
* @export
* @interface IScanStreamOptions
* @extends {ReadableOptions}
*/
export interface IScanStreamOptions extends ReadableOptions {
key?: string
match?: string
Expand Down
43 changes: 43 additions & 0 deletions lib/SubscriptionSet.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
type AddSet = 'subscribe' | 'psubscribe'
type DelSet = 'unsubscribe' | 'punsubscribe'

/**
* Tiny class to simplify dealing with subscription set
*
* @export
* @class SubscriptionSet
*/
export default class SubscriptionSet {
private set: {[key: string]: {[channel: string]: boolean}} = {
subscribe: {},
psubscribe: {}
}

add (set: AddSet, channel: string) {
this.set[mapSet(set)][channel] = true
}

del (set: DelSet, channel: string) {
delete this.set[mapSet(set)][channel]
}

channels (set: AddSet | DelSet): string[] {
return Object.keys(this.set[mapSet(set)])
}

isEmpty (): boolean {
return this.channels('subscribe').length === 0 &&
this.channels('psubscribe').length === 0
}
}


function mapSet (set: AddSet | DelSet): AddSet {
if (set === 'unsubscribe') {
return 'subscribe'
}
if (set === 'punsubscribe') {
return 'psubscribe'
}
return set
}
154 changes: 154 additions & 0 deletions lib/cluster/ConnectionPool.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import {parseURL} from '../utils'
import {EventEmitter} from 'events'
import {noop, defaults} from '../utils/lodash'

const Redis = require('../redis')
const debug = require('../utils/debug')('ioredis:cluster:connectionPool')

type NODE_TYPE = 'all' | 'master' | 'slave'

interface IRedisOptions {
[key: string]: any
}

interface IRedisOptionsWithKey extends IRedisOptions {
key: string
}

export default class ConnectionPool extends EventEmitter {
// master + slave = all
private nodes: {[key in NODE_TYPE]: {[key: string]: any}} = {
all: {},
master: {},
slave: {}
}

This comment has been minimized.

Copy link
@AVVS

AVVS Jul 18, 2018

Collaborator

maybe use Map here as we use delete, which makes obj go to dictionary mode? With Maps it should be better

This comment has been minimized.

Copy link
@AVVS

AVVS Jul 18, 2018

Collaborator

And then later can do .size() instead of Object.keys().length and so on


private specifiedOptions: {[key: string]: any} = {}

constructor (private redisOptions) {
super()
}

/**
* Find or create a connection to the node
*
* @param {IRedisOptions} node
* @param {boolean} [readOnly=false]
* @returns {*}
* @memberof ConnectionPool
*/
public findOrCreate (node: IRedisOptions, readOnly: boolean = false): any {
setKey(node)
readOnly = Boolean(readOnly)

if (this.specifiedOptions[node.key]) {
Object.assign(node, this.specifiedOptions[node.key])
} else {
this.specifiedOptions[node.key] = node
}

let redis
if (this.nodes.all[node.key]) {
redis = this.nodes.all[node.key]
if (redis.options.readOnly !== readOnly) {
redis.options.readOnly = readOnly
debug('Change role of %s to %s', node.key, readOnly ? 'slave' : 'master')
redis[readOnly ? 'readonly' : 'readwrite']().catch(noop)
if (readOnly) {
delete this.nodes.master[node.key]
this.nodes.slave[node.key] = redis
} else {
delete this.nodes.slave[node.key]
this.nodes.master[node.key] = redis
}
}
} else {
debug('Connecting to %s as %s', node.key, readOnly ? 'slave' : 'master')
redis = new Redis(defaults({
// Never try to reconnect when a node is lose,
// instead, waiting for a `MOVED` error and
// fetch the slots again.
retryStrategy: null,
// Offline queue should be enabled so that
// we don't need to wait for the `ready` event
// before sending commands to the node.
enableOfflineQueue: true,
readOnly: readOnly
}, node, this.redisOptions, { lazyConnect: true }))
this.nodes.all[node.key] = redis
this.nodes[readOnly ? 'slave' : 'master'][node.key] = redis

redis.once('end', () => {
delete this.nodes.all[node.key]
delete this.nodes.master[node.key]
delete this.nodes.slave[node.key]
this.emit('-node', redis)
if (!Object.keys(this.nodes.all).length) {
this.emit('drain')
}
})

this.emit('+node', redis)

redis.on('error', function (error) {
this.emit('nodeError', error)
})
}

return redis
}

/**
* Reset the pool with a set of nodes.
* The old node will be removed.
*
* @param {(Array<string | number | object>)} nodes
* @memberof ConnectionPool
*/
public reset (nodes: Array<string | number | object>): void {
const newNodes = {}
nodes.forEach((node) => {
const options: {port?: number | string, db?: number, key?: string} = {}
if (typeof node === 'object') {
defaults(options, node)
} else if (typeof node === 'string') {
defaults(options, parseURL(node))
} else if (typeof node === 'number') {
options.port = node
} else {
throw new Error('Invalid argument ' + node)
}
if (typeof options.port === 'string') {
options.port = parseInt(options.port, 10)
}
delete options.db

setKey(options)
newNodes[options.key] = options
}, this)

Object.keys(this.nodes.all).forEach((key) => {
if (!newNodes[key]) {
debug('Disconnect %s because the node does not hold any slot', key)
this.nodes.all[key].disconnect()
}
})
Object.keys(newNodes).forEach((key) => {
const node = newNodes[key]
this.findOrCreate(node, node.readOnly)
})
}
}

/**
* Set key property
*
* @private
*/
function setKey(node: IRedisOptions): IRedisOptionsWithKey {
node = node || {}
node.port = node.port || 6379
node.host = node.host || '127.0.0.1'
node.key = node.key || node.host + ':' + node.port
return <IRedisOptionsWithKey>node
}
20 changes: 20 additions & 0 deletions lib/cluster/DelayQueue.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,35 @@
import * as Deque from 'denque'
const debug = require('../utils/debug')('ioredis:delayqueue')

/**
* Options for DelayQueue
*
* @export
* @interface IDelayQueueOptions
*/
export interface IDelayQueueOptions {
callback?: Function
timeout: number
}

/**
* Queue that runs items after specified duration
*
* @export
* @class DelayQueue
*/
export default class DelayQueue {
private queues: {[key: string]: Deque | null} = {}
private timeouts: {[key: string]: NodeJS.Timer} = {}

/**
* Add a new item to the queue
*
* @param {string} bucket bucket name
* @param {Function} item function that will run later
* @param {IDelayQueueOptions} options
* @memberof DelayQueue
*/
public push (bucket: string, item: Function, options: IDelayQueueOptions): void {
const callback = options.callback || process.nextTick
if (!this.queues[bucket]) {
Expand Down
151 changes: 0 additions & 151 deletions lib/cluster/connection_pool.js

This file was deleted.

0 comments on commit 3c7519c

Please sign in to comment.