Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add psubscribe/punsubscribe to PubSub api #691

Closed
wants to merge 3 commits into from

Conversation

cwe-dixa
Copy link
Contributor

Hello :-)

This PR adds support for PSUBSCRIBE/PUNSUBSCRIBE commands to the PubSub api. They function in roughly the same way as channel pubsub works today.

Be aware that pattern subscriptions to keyspace events require that
keyspace event notifications are enabled. In clusters, it is also
necessary to subscribe to keyspace notifications on each individual
redis node, since keyspace events are not broadcast across the cluster.
See https://redis.io/docs/manual/keyspace-notifications/#events-in-a-cluster.

Be aware that pattern subscriptions to keyspace events require that
keyspace event notifications are enabled. In clusters, it is also
necessary to subscribe to keyspace notifications on each individual
redis node, since keyspace events are not broadcast across the cluster.
See https://redis.io/docs/manual/keyspace-notifications/#events-in-a-cluster.
@gvolpe
Copy link
Member

gvolpe commented May 11, 2022

@cwe-dixa thank you! I will have a look when I get a chance, just came back from holidays.

@cwe-dixa
Copy link
Contributor Author

Hope it was relaxing :-)

I'm not sure what to make of the error. It works locally for me and in our fork https://github.com/dixahq/redis4cats/actions/runs/2311908800. Given that it is a timeout error and not an assertion error, I think it must be timing out during either resource acquisition or release, but I'm not sure at what step.
Have you seen errors related to acquiring or releasing pubsub streams before?

Copy link
Member

@gvolpe gvolpe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code looks legit 👍🏽

I'm not that familiar with the pub-sub API anyway, but my only concern is the timeout-related issues in the tests.

After a re-run, the test passed, but we had a few flaky tests in the past, thus timeouts in tests are to be avoided whenever possible.

_ <- Resource.eval(commands.set(key, ""))
_ <- Resource.eval(commands.expire(key, 1 seconds))
} yield ref.get
resources.use(get => get.iterateUntil(_.isDefined).timeout(2 seconds)).map { result =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Timeouts in tests usually end up becoming flaky. If there's a better way to test this, that would be preferable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been trying to come up with a better way of testing it, but I can't come up with one. I have worked around it with a sleep for now.

We need to start pulling prior to publishing the message, otherwise we won't get the published message. So I start the pulling of the stream on another fiber and I could wait until resource acquisition/stream initialization on the fiber is done prior to proceeding. But that does not guarantee that we are actually subscribed. In principle, the subscription is not active until redis responds with subscribe. Today, we do not wait until redis responds, so there is an interval between the stream having started and the subscription actually being active. This is what the sleep solves in the tests.

We could change the stream implementation so it blocks the resource acquisition in PubSubInternals.channel and PubSubInternals.pattern until the subscribe event is received. I would be happy to implement this (I think this behavior makes more sense). But it is a change in behavior from how redis4cats work today.

What are your thoughts on this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cwe-dixa I had a look at the code and it seems we could know when we are subscribed from both psubscribed and subscribed in defaultListener. I think the best idea would be to add the ability to fetch this information (also unsubscribed and punsubscribed) all the way to PubSubCommands, which could be internally powered by a Deferred[F, Unit] or so. E.g.

private val gate: Deferred[F, Unit] = ???

/* semantically blocks until subscribed */
def subscribed(channel: RedisChannel[K]): F[Unit] = gate.get

Or we could also expose it as def isSubscribed(channel: RedisChannel[K]): F[Boolean], but I think the former would have a better UX.

In my experience, using either sleep or timeout in tests becomes flakyness issues sooner or later, and we already have an unsolved one related to streams, so I'd prefer to avoid more maintenance burden.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the long delay :-(

We could add a subscribed method as suggested to PubSubCommands, but that does require quite a bit of care to handle correctly... And I'm starting to think that maybe we should just not handle it.

If we do add a subscribed method to PubSubCommands we need to consider what happens if we start two subscriptions to the same channel. Should PubSubCommands defer to the first or the second one? What if one of them is cancelled?
Having subscribed seems to imply also having a psubscribed, but patterns have a hierarchy of sorts, unlike channels. Does psubscribed(Pattern("f*")) imply psubscribed(Pattern("foo*"))?

Also, for newcomers, it might not be obvious that something like

for {
  stream <- pubsub.subscribe(RedisChannel("foo"))
  _ <- pubsub.subscribed(RedisChannel("foo"))
  // do something with stream
} yield ()

will block forever, since the Stream won't emit the subscribe command until it is pulled, hence .subscribed(RedisChannel("foo")) will never complete.

For the above, I'm starting to favor simply adding some retries to the tests, which I have done. Please let me know what you think :-)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, even with the retries, the test hangs sometimes, especially on CI.
It seems that there is some race condition in the listener. I can't explain why, but what I observe is:

  1. Test runs fine and gets to releasing resource
  2. Test tries to release resources (including the subscription)
  3. sub listener is removed (see PubSubInternals#pattern)
  4. dispatcher is closed/stopped (see PubSubInternals#pattern)
  5. Listener from 3 receives one more event and tries to dispatch it
  6. Dispatch call blocks forever (<-- I assume this prevent the connection cleanup as well)
  7. Test times out

I think something weird is happening with the dispatcher. I have tried dispatching on a closed dispatcher and that just throws an error. But we are not getting an error in these tests. Instead, it just block indefinitely.

Using either unsafeRunAndForget or unsafeRunTimed in the listener makes the test pass, but I'm not sure if we should be using either of those. Especially since I don't fully understand what the issue is.
I'm hoping that you do 🤞

listener.joinWith(IO.raiseError(new IllegalStateException("Fiber should not be cancelled")))
)
} yield firstEvent.headOption
resources.use(IO.pure).map { result =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't this be just resources.use { result => ... }? Same in the other scenario.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It certainly can. Refactored

@kyri-petrou
Copy link
Collaborator

I have to admit I'm not to familiar with the pubsub api, nonetheless the code looks really good as @gvolpe already pointed out.

One thing though, maybe we should extend the examples on the site to include the new functionality? Otherwise it might go unappreciated 😅

@cwe-dixa cwe-dixa requested a review from gvolpe June 2, 2022 05:56
@gvolpe
Copy link
Member

gvolpe commented Jun 9, 2022

@cwe-dixa just letting you know that I saw this but it demands quite a lot of my time, which is currently really scarce. I'll get back to you as soon as I can, sorry :|

@gvolpe
Copy link
Member

gvolpe commented Jun 21, 2022

@cwe-dixa sorry for the long delay. Can you try applying this patch? I'm basically fixing the non-determinism in the two new tests using a Deferred, didn't touch anything else.

diff --git a/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/internals/PubSubState.scala b/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/internals/PubSubState.scala
index 4cfa55f4..c2a3d02b 100644
--- a/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/internals/PubSubState.scala
+++ b/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/internals/PubSubState.scala
@@ -15,8 +15,10 @@
  */
 
 package dev.profunktor.redis4cats.pubsub.internals
-import fs2.concurrent.Topic
+
 import dev.profunktor.redis4cats.data.RedisPatternEvent
+import fs2.concurrent.Topic
+
 final case class PubSubState[F[_], K, V](
     channels: Map[K, Topic[F, Option[V]]],
     patterns: Map[K, Topic[F, Option[RedisPatternEvent[K, V]]]]
diff --git a/modules/tests/src/test/scala/dev/profunktor/redis4cats/TestScenarios.scala b/modules/tests/src/test/scala/dev/profunktor/redis4cats/TestScenarios.scala
index a1a7df10..f89cc689 100644
--- a/modules/tests/src/test/scala/dev/profunktor/redis4cats/TestScenarios.scala
+++ b/modules/tests/src/test/scala/dev/profunktor/redis4cats/TestScenarios.scala
@@ -589,19 +589,23 @@ trait TestScenarios { self: FunSuite =>
       commands <- Redis[IO].fromClient(client, RedisCodec.Utf8)
       sub <- PubSub.mkSubscriberConnection[IO, String, String](client, RedisCodec.Utf8)
       stream <- Resource.pure(sub.psubscribe(RedisPattern(pattern)))
-      listener <- Resource.eval(stream.head.compile.toList.start)
-      _ <- (for {
-            _ <- commands.setEx(key, "", 1.second)
-            _ <- IO.sleep(2.seconds)
-          } yield ()).foreverM.background
-      firstEvent <- Resource.eval(
-                     listener.joinWith(IO.raiseError(new IllegalStateException("Fiber should not be cancelled")))
-                   )
-    } yield firstEvent.headOption
+      gate <- Resource.eval(IO.deferred[RedisPatternEvent[String, String]])
+      i = Stream.eval(gate.get.as(true))
+      s1 = stream
+        .evalMap(gate.complete(_).void)
+        .interruptWhen(i)
+      s2 = Stream
+        .eval(commands.setEx(key, "", 1.second))
+        .meteredStartImmediately(2.seconds)
+        .interruptWhen(i)
+      _ <- Resource.eval(Stream(s1, s2).parJoin(2).compile.drain)
+      fe <- Resource.eval(gate.get)
+    } yield fe
+
     resources.use { result =>
       IO(
         assert(
-          result == Some(RedisPatternEvent(pattern, "__keyevent@0__:expired", key)),
+          result == RedisPatternEvent(pattern, "__keyevent@0__:expired", key),
           s"Unexpected result $result"
         )
       )
@@ -614,28 +618,32 @@ trait TestScenarios { self: FunSuite =>
     val pattern = "f*"
     val channel = "foo"
     val message = "somemessage"
+
     val resources = for {
       pubsub <- PubSub.mkPubSubConnection[IO, String, String](client, RedisCodec.Utf8)
       stream <- Resource.pure(pubsub.psubscribe(RedisPattern(pattern)))
-      listener <- Resource.eval(stream.head.compile.toList.start)
-      _ <- Stream
-            .awakeEvery[IO](100.milli)
-            .map(_ => message)
-            .through(pubsub.publish(RedisChannel(channel)))
-            .compile
-            .drain
-            .background
-      firstEvent <- Resource.eval(
-                     listener.joinWith(IO.raiseError(new IllegalStateException("Fiber should not be cancelled")))
-                   )
-    } yield firstEvent.headOption
+      gate <- Resource.eval(IO.deferred[RedisPatternEvent[String, String]])
+      i = Stream.eval(gate.get.as(true))
+      s1 = stream
+        .evalMap(gate.complete(_).void)
+        .interruptWhen(i)
+      s2 = Stream
+        .awakeEvery[IO](100.milli)
+        .as(message)
+        .through(pubsub.publish(RedisChannel(channel)))
+        .interruptWhen(i)
+      _ <- Resource.eval(Stream(s1, s2).parJoin(2).compile.drain)
+      fe <- Resource.eval(gate.get)
+    } yield fe
+
     resources.use { result =>
       IO(
         assert(
-          result == Some(RedisPatternEvent(pattern, channel, message)),
+          result == RedisPatternEvent(pattern, channel, message),
           s"Unexpected result $result"
         )
       )
     }
   }
+
 }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants