diff --git a/lib/DataHandler.ts b/lib/DataHandler.ts index d9030517..fb635944 100644 --- a/lib/DataHandler.ts +++ b/lib/DataHandler.ts @@ -80,6 +80,11 @@ export default class DataHandler { args: item.command.args, }; + if (item.command.name == "ssubscribe" && err.message.includes("MOVED")) { + this.redis.emit("moved"); + return; + } + this.redis.handleReconnection(err, item); } diff --git a/lib/cluster/ClusterSubscriber.ts b/lib/cluster/ClusterSubscriber.ts index 5c728560..89b17508 100644 --- a/lib/cluster/ClusterSubscriber.ts +++ b/lib/cluster/ClusterSubscriber.ts @@ -160,6 +160,10 @@ export default class ClusterSubscriber { // Ignore the errors since they're handled in the connection pool. this.subscriber.on("error", noop); + this.subscriber.on("moved", () => { + this.emitter.emit("forceRefresh"); + }); + // The node we lost connection to may not come back up in a // reasonable amount of time (e.g. a slave that's taken down // for maintainence), we could potentially miss many published diff --git a/lib/cluster/ClusterSubscriberGroup.ts b/lib/cluster/ClusterSubscriberGroup.ts index b55b0482..be85d05f 100644 --- a/lib/cluster/ClusterSubscriberGroup.ts +++ b/lib/cluster/ClusterSubscriberGroup.ts @@ -31,7 +31,7 @@ export default class ClusterSubscriberGroup { * * @param cluster */ - constructor(private cluster: Cluster) { + constructor(private cluster: Cluster, refreshSlotsCacheCallback: () => void) { cluster.on("+node", (redis) => { this._addSubscriber(redis); @@ -44,6 +44,10 @@ export default class ClusterSubscriberGroup { cluster.on("refresh", () => { this._refreshSlots(cluster); }); + + cluster.on("forceRefresh", () => { + refreshSlotsCacheCallback(); + }); } diff --git a/lib/cluster/index.ts b/lib/cluster/index.ts index a046c3b6..b598a161 100644 --- a/lib/cluster/index.ts +++ b/lib/cluster/index.ts @@ -126,7 +126,7 @@ class Cluster extends Commander { this.options = defaults({}, options, DEFAULT_CLUSTER_OPTIONS, this.options); if (this.options.shardedSubscribers == true) - this.shardedSubscribers = new ClusterSubscriberGroup(this); + this.shardedSubscribers = new ClusterSubscriberGroup(this, this.refreshSlotsCache.bind(this)); if ( this.options.redisOptions &&