Skip to content

Commit

Permalink
Convenience operators for publishing and subscribing to typed pub sub…
Browse files Browse the repository at this point in the history
… from streams akka#31037
  • Loading branch information
johanandren committed Jan 7, 2022
1 parent 4fb7bd9 commit 35a3d4e
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 0 deletions.
@@ -0,0 +1,51 @@
/*
* Copyright (C) 2009-2021 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.typed.javadsl

import akka.NotUsed
import akka.actor.typed.ActorRef
import akka.actor.typed.pubsub.Topic
import akka.annotation.ApiMayChange
import akka.stream.OverflowStrategy
import akka.stream.javadsl.Sink
import akka.stream.javadsl.Source

/**
* Sources and sinks to integrate [[akka.actor.typed.pubsub.Topic]] with streams allowing for local or distributed
* publishing and subscribing of elements through a stream.
*/
object PubSub {
/**
* Create a source that will subscribe to a topic and stream messages published to the topic. Can be materialized
* multiple times, each materialized stream will contain messages published after it was started.
*
* Note that it is not possible to propagate the backpressure from the running stream to the pub sub topic,
* if the stream is backpressuring published messages are buffered up to a limit and if the limit is hit
* the configurable `OverflowStrategy` decides what happens. It is not possible to use the `Backpressure`
* strategy.
*
* @param topicActor The actor ref for an `akka.actor.typed.pubsub.Topic` actor representing a specific topic.
* @param bufferSize The maximum number of messages to buffer if the stream applies backpressure
* @param overflowStrategy Strategy to use once the buffer is full.
* @tparam T The type of the published messages
*/
@ApiMayChange
def source[T](topicActor: ActorRef[Topic.Command[T]], bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, NotUsed] =
akka.stream.typed.scaladsl.PubSub.source(topicActor, bufferSize, overflowStrategy).asJava

/**
* Create a sink that will publish each message to the given topic. Note that there is no backpressure
* from the topic, so care must be taken to not publish messages at a higher rate than that can be handled
* by subscribers. If the topic does not have any subscribers when a message is published the message is
* sent to dead letters.
*
* @param topicActor The actor ref for an `akka.actor.typed.pubsub.Topic` actor representing a specific topic.
* @tparam T the type of the messages that can be published
*/
@ApiMayChange
def sink[T](topicActor: ActorRef[Topic.Command[T]]): Sink[T, NotUsed] =
akka.stream.typed.scaladsl.PubSub.sink[T](topicActor).asJava

}
@@ -0,0 +1,61 @@
/*
* Copyright (C) 2009-2021 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.typed.scaladsl

import akka.NotUsed
import akka.actor.typed.ActorRef
import akka.actor.typed.pubsub.Topic
import akka.annotation.ApiMayChange
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source

/**
* Sources and sinks to integrate [[akka.actor.typed.pubsub.Topic]] with streams allowing for local or distributed
* publishing and subscribing of elements through a stream.
*/
object PubSub {

/**
* Create a source that will subscribe to a topic and stream messages published to the topic. Can be materialized
* multiple times, each materialized stream will contain messages published after it was started.
*
* Note that it is not possible to propagate the backpressure from the running stream to the pub sub topic,
* if the stream is backpressuring published messages are buffered up to a limit and if the limit is hit
* the configurable `OverflowStrategy` decides what happens. It is not possible to use the `Backpressure`
* strategy.
*
* @param topicActor The actor ref for an `akka.actor.typed.pubsub.Topic` actor representing a specific topic.
* @param bufferSize The maximum number of messages to buffer if the stream applies backpressure
* @param overflowStrategy Strategy to use once the buffer is full.
* @tparam T The type of the published messages
*/
@ApiMayChange
def source[T](topicActor: ActorRef[Topic.Command[T]], bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, NotUsed] =
ActorSource.actorRef[T](
PartialFunction.empty,
PartialFunction.empty,
bufferSize,
overflowStrategy).mapMaterializedValue { ref =>
topicActor ! Topic.Subscribe(ref)
NotUsed
}

/**
* Create a sink that will publish each message to the given topic. Note that there is no backpressure
* from the topic, so care must be taken to not publish messages at a higher rate than that can be handled
* by subscribers. If the topic does not have any subscribers when a message is published the message is
* sent to dead letters.
*
* @param topicActor The actor ref for an `akka.actor.typed.pubsub.Topic` actor representing a specific topic.
* @tparam T the type of the messages that can be published
*/
@ApiMayChange
def sink[T](topicActor: ActorRef[Topic.Command[T]]): Sink[T, NotUsed] = {
Sink.foreach[T] { message =>
topicActor ! Topic.Publish(message)
}.mapMaterializedValue(_ => NotUsed)
}
}
@@ -0,0 +1,64 @@
/*
* Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
*/

/*
* Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
*/package akka.stream.typed.scaladsl

import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.internal.pubsub.TopicImpl
import akka.actor.typed.pubsub.Topic
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source
import akka.stream.testkit.scaladsl.TestSink
import org.scalatest.wordspec.AnyWordSpecLike

class PubSubSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike {

implicit def sys = testKit.system.classicSystem

"PubSub.source" should {

"emit messages from the topic" in {
val topic = testKit.spawn(Topic[String]("my-topic-1"))

val source = PubSub.source(topic, 100, OverflowStrategy.fail)
val sourceProbe = source.runWith(TestSink.probe)
sourceProbe.ensureSubscription()

// wait until subscription has been seen
val probe = testKit.createTestProbe[TopicImpl.TopicStats]()
probe.awaitAssert {
topic ! TopicImpl.GetTopicStats(probe.ref)
probe.expectMessageType[TopicImpl.TopicStats].localSubscriberCount should === (1)
}

topic ! Topic.Publish("published")
sourceProbe.requestNext("published")
sourceProbe.cancel()
}

}

"PubSub.sink" should {
"publish messages" in {
val topic = testKit.spawn(Topic[String]("my-topic-2"))

val subscriberProbe = testKit.createTestProbe[String]()
topic ! Topic.Subscribe(subscriberProbe.ref)

// wait until subscription has been seen
val probe = testKit.createTestProbe[TopicImpl.TopicStats]()
probe.awaitAssert {
topic ! TopicImpl.GetTopicStats(probe.ref)
probe.expectMessageType[TopicImpl.TopicStats].localSubscriberCount should === (1)
}

Source.single("published").runWith(PubSub.sink(topic))

subscriberProbe.expectMessage("published")
}
}

}

0 comments on commit 35a3d4e

Please sign in to comment.