Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add psubscribe/punsubscribe to PubSub api #691

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ services:
- "6379:6379"
environment:
- DEBUG=false
command: redis-server --notify-keyspace-events KEA

ReplicaNode:
restart: always
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import javax.crypto.spec.SecretKeySpec
object data {

final case class RedisChannel[K](underlying: K) extends AnyVal
final case class RedisPattern[K](underlying: K) extends AnyVal
final case class RedisPatternEvent[K, V](pattern: K, channel: K, data: V)

final case class RedisCodec[K, V](underlying: JRedisCodec[K, V]) extends AnyVal
final case class NodeId(value: String) extends AnyVal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import dev.profunktor.redis4cats.data._
import dev.profunktor.redis4cats.effect._
import dev.profunktor.redis4cats.pubsub.internals.{ LivePubSubCommands, Publisher, Subscriber }
import fs2.Stream
import fs2.concurrent.Topic
import dev.profunktor.redis4cats.pubsub.internals.PubSubState
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection

object PubSub {
Expand Down Expand Up @@ -58,7 +58,7 @@ object PubSub {
val (acquire, release) = acquireAndRelease[F, K, V](client, codec)
// One exclusive connection for subscriptions and another connection for publishing / stats
for {
state <- Resource.eval(Ref.of[F, Map[K, Topic[F, Option[V]]]](Map.empty))
state <- Resource.eval(Ref.of[F, PubSubState[F, K, V]](PubSubState(Map.empty, Map.empty)))
sConn <- Resource.make(acquire)(release)
pConn <- Resource.make(acquire)(release)
} yield new LivePubSubCommands[F, K, V](state, sConn, pConn)
Expand Down Expand Up @@ -88,7 +88,7 @@ object PubSub {
): Resource[F, SubscribeCommands[Stream[F, *], K, V]] = {
val (acquire, release) = acquireAndRelease[F, K, V](client, codec)
for {
state <- Resource.eval(Ref.of[F, Map[K, Topic[F, Option[V]]]](Map.empty))
state <- Resource.eval(Ref.of[F, PubSubState[F, K, V]](PubSubState(Map.empty, Map.empty)))
conn <- Resource.make(acquire)(release)
} yield new Subscriber(state, conn)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ trait PublishCommands[F[_], K, V] extends PubSubStats[F, K] {
trait SubscribeCommands[F[_], K, V] {
def subscribe(channel: RedisChannel[K]): F[V]
def unsubscribe(channel: RedisChannel[K]): F[Unit]
def psubscribe(channel: RedisPattern[K]): F[RedisPatternEvent[K, V]]
def punsubscribe(channel: RedisPattern[K]): F[Unit]
}

trait PubSubCommands[F[_], K, V] extends PublishCommands[F, K, V] with SubscribeCommands[F, K, V]
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ package internals
import cats.effect.kernel._
import cats.syntax.all._
import dev.profunktor.redis4cats.data.RedisChannel
import dev.profunktor.redis4cats.data.RedisPattern
import dev.profunktor.redis4cats.data.RedisPatternEvent
import dev.profunktor.redis4cats.pubsub.data.Subscription
import dev.profunktor.redis4cats.effect.{ FutureLift, Log }
import fs2.Stream
Expand All @@ -42,9 +44,17 @@ private[pubsub] class LivePubSubCommands[F[_]: Async: Log, K, V](
override def unsubscribe(channel: RedisChannel[K]): Stream[F, Unit] =
subCommands.unsubscribe(channel)

override def psubscribe(pattern: RedisPattern[K]): Stream[F, RedisPatternEvent[K, V]] =
subCommands.psubscribe(pattern)

override def punsubscribe(pattern: RedisPattern[K]): Stream[F, Unit] =
subCommands.punsubscribe(pattern)

override def publish(channel: RedisChannel[K]): Stream[F, V] => Stream[F, Unit] =
_.flatMap { message =>
Stream.resource(Resource.eval(state.get) >>= PubSubInternals[F, K, V](state, subConnection).apply(channel)) >>
Stream.resource(
Resource.eval(state.get) >>= PubSubInternals.channel[F, K, V](state, subConnection).apply(channel)
) >>
Stream.eval(FutureLift[F].lift(pubConnection.async().publish(channel.underlying, message)).void)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,49 +20,82 @@ import cats.effect.kernel.{ Async, Ref, Resource, Sync }
import cats.effect.std.Dispatcher
import cats.syntax.all._
import dev.profunktor.redis4cats.data.RedisChannel
import dev.profunktor.redis4cats.data.RedisPattern
import dev.profunktor.redis4cats.data.RedisPatternEvent
import dev.profunktor.redis4cats.effect.Log
import fs2.concurrent.Topic
import io.lettuce.core.pubsub.{ RedisPubSubListener, StatefulRedisPubSubConnection }
import io.lettuce.core.pubsub.RedisPubSubAdapter

object PubSubInternals {

private[redis4cats] def defaultListener[F[_]: Async, K, V](
private[redis4cats] def channelListener[F[_]: Async, K, V](
channel: RedisChannel[K],
topic: Topic[F, Option[V]],
dispatcher: Dispatcher[F]
): RedisPubSubListener[K, V] =
new RedisPubSubListener[K, V] {
new RedisPubSubAdapter[K, V] {
override def message(ch: K, msg: V): Unit =
if (ch == channel.underlying) {
dispatcher.unsafeRunSync(topic.publish1(Option(msg)).void)
}
override def message(pattern: K, channel: K, message: V): Unit = this.message(channel, message)
override def psubscribed(pattern: K, count: Long): Unit = ()
override def subscribed(channel: K, count: Long): Unit = ()
override def unsubscribed(channel: K, count: Long): Unit = ()
override def punsubscribed(pattern: K, count: Long): Unit = ()
}
private[redis4cats] def patternListener[F[_]: Async, K, V](
redisPattern: RedisPattern[K],
topic: Topic[F, Option[RedisPatternEvent[K, V]]],
dispatcher: Dispatcher[F]
): RedisPubSubListener[K, V] =
new RedisPubSubAdapter[K, V] {
override def message(pattern: K, channel: K, message: V): Unit =
if (pattern == redisPattern.underlying) {
dispatcher.unsafeRunSync(topic.publish1(Option(RedisPatternEvent(pattern, channel, message))).void)
}
}

private[redis4cats] def apply[F[_]: Async: Log, K, V](
private[redis4cats] def channel[F[_]: Async: Log, K, V](
state: Ref[F, PubSubState[F, K, V]],
subConnection: StatefulRedisPubSubConnection[K, V]
): GetOrCreateTopicListener[F, K, V] = { channel => st =>
st.get(channel.underlying)
st.channels
.get(channel.underlying)
.fold {
for {
dispatcher <- Dispatcher[F]
topic <- Resource.eval(Topic[F, Option[V]])
_ <- Resource.eval(Log[F].info(s"Creating listener for channel: $channel"))
listener = defaultListener(channel, topic, dispatcher)
listener = channelListener(channel, topic, dispatcher)
_ <- Resource.make {
Sync[F].delay(subConnection.addListener(listener)) *>
state.update(_.updated(channel.underlying, topic))
state.update(s => s.copy(channels = s.channels.updated(channel.underlying, topic)))
} { _ =>
Sync[F].delay(subConnection.removeListener(listener)) *>
state.update(_ - channel.underlying)
state.update(s => s.copy(channels = s.channels - channel.underlying))
}
} yield topic
}(Resource.pure)
}

private[redis4cats] def pattern[F[_]: Async: Log, K, V](
state: Ref[F, PubSubState[F, K, V]],
subConnection: StatefulRedisPubSubConnection[K, V]
): GetOrCreatePatternListener[F, K, V] = { channel => st =>
st.patterns
.get(channel.underlying)
.fold {
for {
dispatcher <- Dispatcher[F]
topic <- Resource.eval(Topic[F, Option[RedisPatternEvent[K, V]]])
_ <- Resource.eval(Log[F].info(s"Creating listener for pattern: $channel"))
listener = patternListener(channel, topic, dispatcher)
_ <- Resource.make {
Sync[F].delay(subConnection.addListener(listener)) *>
state.update(s => s.copy(patterns = s.patterns.updated(channel.underlying, topic)))
} { _ =>
Sync[F].delay(subConnection.removeListener(listener)) *>
state.update(s => s.copy(patterns = s.patterns - channel.underlying))
}
} yield topic
}(Resource.pure)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2018-2021 ProfunKtor
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dev.profunktor.redis4cats.pubsub.internals
import fs2.concurrent.Topic
import dev.profunktor.redis4cats.data.RedisPatternEvent
final case class PubSubState[F[_], K, V](
channels: Map[K, Topic[F, Option[V]]],
patterns: Map[K, Topic[F, Option[RedisPatternEvent[K, V]]]]
)
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import cats.effect.kernel._
import cats.effect.kernel.implicits._
import cats.syntax.all._
import dev.profunktor.redis4cats.data.RedisChannel
import dev.profunktor.redis4cats.data.RedisPattern
import dev.profunktor.redis4cats.data.RedisPatternEvent
import dev.profunktor.redis4cats.effect.{ FutureLift, Log }
import fs2.Stream
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection
Expand All @@ -34,7 +36,7 @@ private[pubsub] class Subscriber[F[_]: Async: FutureLift: Log, K, V](

override def subscribe(channel: RedisChannel[K]): Stream[F, V] =
Stream
.resource(Resource.eval(state.get) >>= PubSubInternals[F, K, V](state, subConnection).apply(channel))
.resource(Resource.eval(state.get) >>= PubSubInternals.channel[F, K, V](state, subConnection).apply(channel))
.evalTap(_ => FutureLift[F].lift(subConnection.async().subscribe(channel.underlying)))
.flatMap(_.subscribe(500).unNone)

Expand All @@ -44,9 +46,27 @@ private[pubsub] class Subscriber[F[_]: Async: FutureLift: Log, K, V](
.lift(subConnection.async().unsubscribe(channel.underlying))
.void
.guarantee(state.get.flatMap { st =>
st.get(channel.underlying).fold(Applicative[F].unit)(_.publish1(none[V]).void) *> state.update(
_ - channel.underlying
)
st.channels.get(channel.underlying).fold(Applicative[F].unit)(_.publish1(none[V]).void) *> state
.update(s => s.copy(channels = s.channels - channel.underlying))
})
}

override def psubscribe(pattern: RedisPattern[K]): Stream[F, RedisPatternEvent[K, V]] =
Stream
.resource(Resource.eval(state.get) >>= PubSubInternals.pattern[F, K, V](state, subConnection).apply(pattern))
.evalTap(_ => FutureLift[F].lift(subConnection.async().psubscribe(pattern.underlying)))
.flatMap(_.subscribe(500).unNone)

override def punsubscribe(pattern: RedisPattern[K]): Stream[F, Unit] =
Stream.eval {
FutureLift[F]
.lift(subConnection.async().punsubscribe(pattern.underlying))
.void
.guarantee(state.get.flatMap { st =>
st.patterns
.get(pattern.underlying)
.fold(Applicative[F].unit)(_.publish1(none[RedisPatternEvent[K, V]]).void) *> state
.update(s => s.copy(patterns = s.patterns - pattern.underlying))
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@ package dev.profunktor.redis4cats.pubsub
import cats.effect.kernel.Resource
import dev.profunktor.redis4cats.data.RedisChannel
import fs2.concurrent.Topic
import dev.profunktor.redis4cats.data.RedisPattern
import dev.profunktor.redis4cats.data.RedisPatternEvent

package object internals {
private[pubsub] type PubSubState[F[_], K, V] = Map[K, Topic[F, Option[V]]]
private[pubsub] type GetOrCreateTopicListener[F[_], K, V] =
RedisChannel[K] => PubSubState[F, K, V] => Resource[F, Topic[F, Option[V]]]

private[pubsub] type GetOrCreatePatternListener[F[_], K, V] =
RedisPattern[K] => PubSubState[F, K, V] => Resource[F, Topic[F, Option[RedisPatternEvent[K, V]]]]
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ abstract class Redis4CatsFunSuite(isCluster: Boolean) extends IOSuite {
def withRedis[A](f: RedisCommands[IO, String, String] => IO[A]): Future[Unit] =
withAbstractRedis[A, String, String](f)(stringCodec)

def withRedisClient[A](f: RedisClient => IO[A]): Future[Unit] =
RedisClient[IO].from("redis://localhost").use(f).as(assert(true)).unsafeToFuture()

def withRedisStream[A](f: Streaming[fs2.Stream[IO, *], String, String] => IO[A]): Future[Unit] =
(for {
client <- fs2.Stream.resource(RedisClient[IO].from("redis://localhost"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ class RedisSpec extends Redis4CatsFunSuite(false) with TestScenarios {

test("hyperloglog api")(withRedis(hyperloglogScenario))

test("pattern key sub")(withRedisClient(keyPatternSubScenario))

test("pattern channel sub")(withRedisClient(channelPatternSubScenario))

}

object LongCodec extends JRedisCodec[String, Long] with ToByteBufEncoder[String, Long] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ import io.lettuce.core.{ GeoArgs, ZAggregateArgs }
import munit.FunSuite

import scala.concurrent.duration._
import dev.profunktor.redis4cats.connection.RedisClient
import dev.profunktor.redis4cats.pubsub.PubSub
import dev.profunktor.redis4cats.data.{ RedisChannel, RedisCodec, RedisPattern, RedisPatternEvent }

trait TestScenarios { self: FunSuite =>

Expand Down Expand Up @@ -575,4 +578,49 @@ trait TestScenarios { self: FunSuite =>
_ <- IO(assert(c4 > 0, "merged hyperloglog should think it has more than 0 items in"))
} yield ()
}

def keyPatternSubScenario(client: RedisClient): IO[Unit] = {
import dev.profunktor.redis4cats.effect.Log.NoOp._
import scala.language.postfixOps

val pattern = "__keyevent*__:*"
val key = "somekey"
val resources = for {
ref <- Resource.eval(Ref.of[IO, Option[RedisPatternEvent[String, String]]](None))
commands <- Redis[IO].fromClient(client, RedisCodec.Utf8)
sub <- PubSub.mkSubscriberConnection[IO, String, String](client, RedisCodec.Utf8)
stream <- Resource.pure(sub.psubscribe(RedisPattern(pattern)))
gvolpe marked this conversation as resolved.
Show resolved Hide resolved
_ <- stream.foreach(output => ref.set(Some(output))).compile.drain.background
_ <- Resource.eval(commands.set(key, ""))
_ <- Resource.eval(commands.expire(key, 1 seconds))
} yield ref.get
resources.use(get => get.iterateUntil(_.isDefined).timeout(2 seconds)).map { result =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Timeouts in tests usually end up becoming flaky. If there's a better way to test this, that would be preferable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been trying to come up with a better way of testing it, but I can't come up with one. I have worked around it with a sleep for now.

We need to start pulling prior to publishing the message, otherwise we won't get the published message. So I start the pulling of the stream on another fiber and I could wait until resource acquisition/stream initialization on the fiber is done prior to proceeding. But that does not guarantee that we are actually subscribed. In principle, the subscription is not active until redis responds with subscribe. Today, we do not wait until redis responds, so there is an interval between the stream having started and the subscription actually being active. This is what the sleep solves in the tests.

We could change the stream implementation so it blocks the resource acquisition in PubSubInternals.channel and PubSubInternals.pattern until the subscribe event is received. I would be happy to implement this (I think this behavior makes more sense). But it is a change in behavior from how redis4cats work today.

What are your thoughts on this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cwe-dixa I had a look at the code and it seems we could know when we are subscribed from both psubscribed and subscribed in defaultListener. I think the best idea would be to add the ability to fetch this information (also unsubscribed and punsubscribed) all the way to PubSubCommands, which could be internally powered by a Deferred[F, Unit] or so. E.g.

private val gate: Deferred[F, Unit] = ???

/* semantically blocks until subscribed */
def subscribed(channel: RedisChannel[K]): F[Unit] = gate.get

Or we could also expose it as def isSubscribed(channel: RedisChannel[K]): F[Boolean], but I think the former would have a better UX.

In my experience, using either sleep or timeout in tests becomes flakyness issues sooner or later, and we already have an unsolved one related to streams, so I'd prefer to avoid more maintenance burden.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the long delay :-(

We could add a subscribed method as suggested to PubSubCommands, but that does require quite a bit of care to handle correctly... And I'm starting to think that maybe we should just not handle it.

If we do add a subscribed method to PubSubCommands we need to consider what happens if we start two subscriptions to the same channel. Should PubSubCommands defer to the first or the second one? What if one of them is cancelled?
Having subscribed seems to imply also having a psubscribed, but patterns have a hierarchy of sorts, unlike channels. Does psubscribed(Pattern("f*")) imply psubscribed(Pattern("foo*"))?

Also, for newcomers, it might not be obvious that something like

for {
  stream <- pubsub.subscribe(RedisChannel("foo"))
  _ <- pubsub.subscribed(RedisChannel("foo"))
  // do something with stream
} yield ()

will block forever, since the Stream won't emit the subscribe command until it is pulled, hence .subscribed(RedisChannel("foo")) will never complete.

For the above, I'm starting to favor simply adding some retries to the tests, which I have done. Please let me know what you think :-)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, even with the retries, the test hangs sometimes, especially on CI.
It seems that there is some race condition in the listener. I can't explain why, but what I observe is:

  1. Test runs fine and gets to releasing resource
  2. Test tries to release resources (including the subscription)
  3. sub listener is removed (see PubSubInternals#pattern)
  4. dispatcher is closed/stopped (see PubSubInternals#pattern)
  5. Listener from 3 receives one more event and tries to dispatch it
  6. Dispatch call blocks forever (<-- I assume this prevent the connection cleanup as well)
  7. Test times out

I think something weird is happening with the dispatcher. I have tried dispatching on a closed dispatcher and that just throws an error. But we are not getting an error in these tests. Instead, it just block indefinitely.

Using either unsafeRunAndForget or unsafeRunTimed in the listener makes the test pass, but I'm not sure if we should be using either of those. Especially since I don't fully understand what the issue is.
I'm hoping that you do 🤞

assert(
result == Some(RedisPatternEvent(pattern, "__keyevent@0__:expired", key)),
s"Unexpected result $result"
)
}
}

def channelPatternSubScenario(client: RedisClient): IO[Unit] = {
import dev.profunktor.redis4cats.effect.Log.NoOp._
import scala.language.postfixOps

val pattern = "f*"
val channel = "foo"
val message = "somemessage"
val resources = for {
ref <- Resource.eval(Ref.of[IO, Option[RedisPatternEvent[String, String]]](None))
pubsub <- PubSub.mkPubSubConnection[IO, String, String](client, RedisCodec.Utf8)
stream <- Resource.pure(pubsub.psubscribe(RedisPattern(pattern)))
_ <- stream.foreach(output => ref.set(Some(output))).compile.drain.background
_ <- Resource.eval(pubsub.publish(RedisChannel(channel))(fs2.Stream(message)).compile.drain)
} yield ref.get
resources.use(get => get.iterateUntil(_.isDefined).timeout(2 seconds)).map { result =>
gvolpe marked this conversation as resolved.
Show resolved Hide resolved
assert(
result == Some(RedisPatternEvent(pattern, channel, message)),
s"Unexpected result $result"
)
}
}
}