Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@ jobs:
with:
redis-version: latest

- run: npm run cluster:setup

- run: npm install
- run: npm run lint
- run: npm run build
- run: npm run test:tsd
- run: npm run test:cov || npm run test:cov || npm run test:cov
- run: npm run test:cluster
3 changes: 3 additions & 0 deletions .github/workflows/test_with_cov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@ jobs:
with:
redis-version: latest

- run: npm run cluster:setup

- run: npm install
- run: npm run lint
- run: npm run build
- run: npm run test:tsd
- run: npm run test:cov || npm run test:cov || npm run test:cov
- run: npm run test:cluster
- name: Coveralls
if: matrix.node == '18.x'
uses: coverallsapp/github-action@master
Expand Down
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
"built/"
],
"scripts": {
"cluster:setup": "docker compose -f test/cluster/docker-compose.cluster.yml up -d --wait",
"cluster:teardown": "docker compose -f test/cluster/docker-compose.cluster.yml down --volumes --remove-orphans",
"test:tsd": "npm run build && tsd",
"test:js": "TS_NODE_TRANSPILE_ONLY=true NODE_ENV=test mocha \"test/helpers/*.ts\" \"test/unit/**/*.ts\" \"test/functional/**/*.ts\"",
"test:cov": "nyc npm run test:js",
"test:js:cluster": "TS_NODE_TRANSPILE_ONLY=true NODE_ENV=test mocha \"test/cluster/**/*.ts\"",
"test:cluster": "TS_NODE_TRANSPILE_ONLY=true NODE_ENV=test mocha \"test/cluster/**/*.ts\"",
"test": "npm run test:js && npm run test:tsd",
"lint": "eslint --ext .js,.ts ./lib",
"docs": "npx typedoc --logLevel Error --excludeExternals --excludeProtected --excludePrivate --readme none lib/index.ts",
Expand Down
6 changes: 3 additions & 3 deletions test/cluster/basic.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import { expect } from "chai";
import Redis, { Cluster } from "../../lib";


const masters = [30000, 30001, 30002];
const replicas = [30003, 30004, 30005];
const masters = [3000, 3001, 3002];
const replicas = [3003, 3004, 3005];

async function cleanup() {
for (const port of masters) {
const redis = new Redis(port);
await redis.flushall();
await redis.script("FLUSH");
await redis.quit();
}
// Wait for replication
await new Promise((resolve) => setTimeout(resolve, 500));
Expand Down
76 changes: 46 additions & 30 deletions test/cluster/cluster_subscriber_group.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import {expect} from "chai";
import Redis, {Cluster} from "../../lib";
import redis from "../../lib";

const host = "127.0.0.1";
const masters = [30000, 30001, 30002];
const masters = [3000, 3001, 3002];
const port: number = masters[0]

/**
Expand All @@ -22,33 +21,44 @@ describe("cluster:ClusterSubscriberGroup", () => {

const cluster: Cluster = new Cluster([{host: host, port: port}], {shardedSubscribers: true});

//Subscribe to the three channels
cluster.ssubscribe("channel:1:{1}", "channel:2:{1}", "channel:3:{1}" ).then( ( count: number ) => {
console.log("Subscribed to 3 channels.");
expect(count).to.equal(3);
});

//Publish a message to one of the channels
cluster.spublish("channel:2:{1}", "This is a test message to channel 2.").then((value: number) => {
console.log("Published a message to channel:2:{1} and expect one subscriber.");
expect(value).to.be.eql(1);
});
// Subscribe to the three channels
const subscribeCount = await cluster.ssubscribe(
"channel:1:{1}",
"channel:2:{1}",
"channel:3:{1}"
);
expect(subscribeCount, "Should be subscribed to 3 channels.").to.equal(3);

// Publish a message to one of the channels
const publishValue = await cluster.spublish(
"channel:2:{1}",
"This is a test message to channel 2."
);
expect(
publishValue,
"Should have published a message to channel:2:{1} and expect one subscriber."
).to.be.eql(1);

await sleep(500);

//Unsubscribe from one of the channels
cluster.sunsubscribe("channel:2:{1}").then( ( count: number ) => {
console.log("Unsubscribed from channel:2:{1}.");
expect(count).to.equal(2);
});
// Unsubscribe from one of the channels
const unsubscribeCount = await cluster.sunsubscribe("channel:2:{1}");
expect(unsubscribeCount, "Should be subscribed from 2 channels.").to.equal(
2
);

await sleep(500);

//Publish a message to the channel from which we unsubscribed
cluster.spublish("channel:2:{1}", "This is a test message to channel 2.").then((value: number) => {
console.log("Published a second message to channel:2:{1} and expect to have nobody listening.");
expect(value).to.be.eql(0);
});
// Publish a message to the channel from which we unsubscribed
const value = await cluster.spublish(
"channel:2:{1}",
"This is a test message to channel 2."
);

expect(
value,
"Published a second message to channel:2:{1} and expect to have nobody listening."
).to.be.eql(0);

await sleep(1000);
await cluster.disconnect();
Expand Down Expand Up @@ -167,22 +177,22 @@ describe("cluster:ClusterSubscriberGroup", () => {
const channel = "channel:test:3";

//Used as control connections for orchestrating the slot migration
const source: Redis = new Redis({host: host, port: 30000});
const target: Redis = new Redis({host: host, port: 30001});
const source: Redis = new Redis({host: host, port: 3000});
const target: Redis = new Redis({host: host, port: 3001});

//Initialize the publisher cluster connections and verify that the slot is on node 1
const publisher: Cluster = new Cluster([{host: host, port: port}]);

publisher.on("ready", () => {
expect(publisher.slots[slot][0]).eql("127.0.0.1:30000");
expect(publisher.slots[slot][0]).eql("127.0.0.1:3000");
});


//Initialize the subscriber cluster connections and verify that the slot is on node 1
const subscriber: Cluster = new Cluster([{host: host, port: port}], {shardedSubscribers: true});

subscriber.on("ready", () => {
expect(subscriber.slots[slot][0]).eql("127.0.0.1:30000")
expect(subscriber.slots[slot][0]).eql("127.0.0.1:3000")
});

//The subscription callback. We should receive both messages
Expand Down Expand Up @@ -227,14 +237,20 @@ describe("cluster:ClusterSubscriberGroup", () => {
//TODO: What if there is no traffic on the cluster connection?
status = await subscriber.set("match_slot{" + channel + "}", "channel 3");
expect(status).to.eql("OK");
expect(subscriber.slots[slot][0]).eql("127.0.0.1:30001");
expect(subscriber.slots[slot][0]).eql("127.0.0.1:3001");

//Wait a bit to let the subscriber resubscribe to previous channels
await sleep(1000);

const numSubscribers = await publisher.spublish(channel, "This is a test message #2 to slot "
+ slot + " on channel " + channel + ".");
expect(publisher.slots[slot][0]).eql("127.0.0.1:30001");
expect(publisher.slots[slot][0]).eql("127.0.0.1:3001");
expect(numSubscribers).to.eql(1);

await sleep(1000);
subscriber.disconnect();
publisher.disconnect();
source.disconnect();
target.disconnect();
});
});
});
19 changes: 19 additions & 0 deletions test/cluster/docker-compose.cluster.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
services:
redis-cluster:
image: redislabs/client-libs-test:${REDIS_VERSION:-8.2.1-pre}
environment:
REDIS_CLUSTER: "yes"
NODES: "6"
REPLICAS: "1"
ports:
- 3000:3000
- 3001:3001
- 3002:3002
- 3003:3003
- 3004:3004
- 3005:3005
healthcheck:
test: ["CMD-SHELL", "redis-cli -p 3000 cluster info | grep -q 'cluster_state:ok'"]
interval: 1s
timeout: 10s
retries: 5
29 changes: 29 additions & 0 deletions test/functional/spub_ssub.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import * as sinon from "sinon";
import Redis from "../../lib/Redis";
import { expect } from "chai";

Expand Down Expand Up @@ -143,4 +144,32 @@ describe("spub/ssub", function () {
redis.disconnect(true);
});
});

// This ensures we don't get CROSSSLOT exceptions
it("should call ssubscribe individually for each channel during auto-resubscription", async () => {
const subscriber = new Redis({ autoResubscribe: true });

await subscriber.ping();

subscriber.ssubscribe("shard1");
subscriber.ssubscribe("shard2");
subscriber.ssubscribe("shard3");

const stub = sinon.stub(Redis.prototype, "ssubscribe");

subscriber.disconnect({ reconnect: true });

await new Promise((resolve) => {
subscriber.once("ready", resolve);
});

await subscriber.ping();

expect(stub.getCall(0).args).to.deep.equal(["shard1"]);
expect(stub.getCall(1).args).to.deep.equal(["shard2"]);
expect(stub.getCall(2).args).to.deep.equal(["shard3"]);

stub.restore();
subscriber.disconnect();
});
});
Loading