diff --git a/redis_test.go b/redis_test.go index 0df59187..f505e81d 100644 --- a/redis_test.go +++ b/redis_test.go @@ -628,6 +628,83 @@ func testPubSub(t *testing.T, client Client) { } } +func testPubSubSharded(t *testing.T, client Client) { + msgs := 5000 + mmap := make(map[string]struct{}) + for i := 0; i < msgs; i++ { + mmap[strconv.Itoa(i)] = struct{}{} + } + t.Logf("testing pubsub with %v messages\n", msgs) + jobs, wait := parallel(10) + + ctx := context.Background() + + messages := make(chan string, 10) + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + err := client.Receive(ctx, client.B().Ssubscribe().Channel("ch1").Build(), func(msg PubSubMessage) { + messages <- msg.Message + }) + if err != nil { + t.Errorf("unexpected subscribe response %v", err) + } + wg.Done() + }() + + go func() { + time.Sleep(time.Second) + for i := 0; i < msgs; i++ { + msg := strconv.Itoa(i) + ch := "ch1" + jobs <- func() { + if err := client.Do(context.Background(), client.B().Spublish().Channel(ch).Message(msg).Build()).Error(); err != nil { + t.Errorf("unexpected publish response %v", err) + } + } + } + wait() + }() + + for message := range messages { + delete(mmap, message) + if len(mmap) == 0 { + close(messages) + } + } + + for _, resp := range client.DoMulti(context.Background(), + client.B().Sunsubscribe().Channel("ch1").Build()) { + if err := resp.Error(); err != nil { + t.Fatal(err) + } + } + wg.Wait() + + t.Logf("testing pubsub hooks with 500 messages\n") + + for i := 0; i < 500; i++ { + cc, cancel := client.Dedicate() + msg := strconv.Itoa(i) + ch := cc.SetPubSubHooks(PubSubHooks{ + OnMessage: func(m PubSubMessage) { + cc.SetPubSubHooks(PubSubHooks{}) + }, + }) + if err := cc.Do(context.Background(), client.B().Ssubscribe().Channel("ch2").Build()).Error(); err != nil { + t.Fatal(err) + } + if err := client.Do(context.Background(), client.B().Spublish().Channel("ch2").Message(msg).Build()).Error(); err != nil { + t.Fatal(err) + } + if err := <-ch; err != nil { + t.Fatal(err) + } + cancel() + } +} + func testLua(t *testing.T, client Client) { script := NewLuaScript("return {KEYS[1],ARGV[1]}") @@ -679,7 +756,7 @@ func TestSingleClientIntegration(t *testing.T) { t.Fatal(err) } - run(t, client, testSETGETCSC, testMultiSETGETCSC, testMultiSETGETCSCHelpers, testMultiExec, testBlockingZPOP, testBlockingXREAD, testPubSub, testLua) + run(t, client, testSETGETCSC, testMultiSETGETCSC, testMultiSETGETCSCHelpers, testMultiExec, testBlockingZPOP, testBlockingXREAD, testPubSub, testPubSubSharded, testLua) run(t, client, testFlush) client.Close() @@ -702,7 +779,7 @@ func TestSentinelClientIntegration(t *testing.T) { t.Fatal(err) } - run(t, client, testSETGETCSC, testMultiSETGETCSC, testMultiSETGETCSCHelpers, testMultiExec, testBlockingZPOP, testBlockingXREAD, testPubSub, testLua) + run(t, client, testSETGETCSC, testMultiSETGETCSC, testMultiSETGETCSCHelpers, testMultiExec, testBlockingZPOP, testBlockingXREAD, testPubSub, testPubSubSharded, testLua) run(t, client, testFlush) client.Close() @@ -722,7 +799,7 @@ func TestClusterClientIntegration(t *testing.T) { if err != nil { t.Fatal(err) } - run(t, client, testSETGETCSC, testMultiSETGETCSC, testMultiSETGETCSCHelpers, testMultiExec, testBlockingZPOP, testBlockingXREAD, testPubSub, testLua) + run(t, client, testSETGETCSC, testMultiSETGETCSC, testMultiSETGETCSCHelpers, testMultiExec, testBlockingZPOP, testBlockingXREAD, testPubSub, testPubSubSharded, testLua) client.Close() }