-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request akka#16498 from drewhk/wip-15239-stream-cookbook-d…
…rewhk Stream cookbook
- Loading branch information
Showing
19 changed files
with
1,645 additions
and
0 deletions.
There are no files selected for viewing
94 changes: 94 additions & 0 deletions
94
akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeByteStrings.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
package docs.stream.cookbook | ||
|
||
import akka.stream.scaladsl.{ Flow, Sink, Source } | ||
import akka.util.ByteString | ||
|
||
import scala.concurrent.Await | ||
import scala.concurrent.duration._ | ||
|
||
class RecipeByteStrings extends RecipeSpec { | ||
|
||
"Recipes for bytestring streams" must { | ||
|
||
"have a working chunker" in { | ||
val rawBytes = Source(List(ByteString(1, 2), ByteString(3), ByteString(4, 5, 6), ByteString(7, 8, 9))) | ||
val ChunkLimit = 2 | ||
|
||
//#bytestring-chunker | ||
import akka.stream.stage._ | ||
|
||
class Chunker(val chunkSize: Int) extends PushPullStage[ByteString, ByteString] { | ||
private var buffer = ByteString.empty | ||
|
||
override def onPush(elem: ByteString, ctx: Context[ByteString]): Directive = { | ||
buffer ++= elem | ||
emitChunkOrPull(ctx) | ||
} | ||
|
||
override def onPull(ctx: Context[ByteString]): Directive = emitChunkOrPull(ctx) | ||
|
||
private def emitChunkOrPull(ctx: Context[ByteString]): Directive = { | ||
if (buffer.isEmpty) ctx.pull() | ||
else { | ||
val (emit, nextBuffer) = buffer.splitAt(chunkSize) | ||
buffer = nextBuffer | ||
ctx.push(emit) | ||
} | ||
} | ||
|
||
} | ||
|
||
val chunksStream = rawBytes.transform(() => new Chunker(ChunkLimit)) | ||
//#bytestring-chunker | ||
|
||
val chunksFuture = chunksStream.grouped(10).runWith(Sink.head) | ||
|
||
val chunks = Await.result(chunksFuture, 3.seconds) | ||
|
||
chunks.forall(_.size <= 2) should be(true) | ||
chunks.fold(ByteString())(_ ++ _) should be(ByteString(1, 2, 3, 4, 5, 6, 7, 8, 9)) | ||
} | ||
|
||
"have a working bytes limiter" in { | ||
val SizeLimit = 9 | ||
|
||
//#bytes-limiter | ||
import akka.stream.stage._ | ||
class ByteLimiter(val maximumBytes: Long) extends PushStage[ByteString, ByteString] { | ||
private var count = 0 | ||
|
||
override def onPush(chunk: ByteString, ctx: Context[ByteString]): Directive = { | ||
count += chunk.size | ||
if (count > maximumBytes) ctx.fail(new IllegalStateException("Too much bytes")) | ||
else ctx.push(chunk) | ||
} | ||
} | ||
|
||
val limiter = Flow[ByteString].transform(() => new ByteLimiter(SizeLimit)) | ||
//#bytes-limiter | ||
|
||
val bytes1 = Source(List(ByteString(1, 2), ByteString(3), ByteString(4, 5, 6), ByteString(7, 8, 9))) | ||
val bytes2 = Source(List(ByteString(1, 2), ByteString(3), ByteString(4, 5, 6), ByteString(7, 8, 9, 10))) | ||
|
||
Await.result(bytes1.via(limiter).grouped(10).runWith(Sink.head), 3.seconds) | ||
.fold(ByteString())(_ ++ _) should be(ByteString(1, 2, 3, 4, 5, 6, 7, 8, 9)) | ||
|
||
an[IllegalStateException] must be thrownBy { | ||
Await.result(bytes2.via(limiter).grouped(10).runWith(Sink.head), 3.seconds) | ||
} | ||
} | ||
|
||
"demonstrate compacting" in { | ||
|
||
val data = Source(List(ByteString(1, 2), ByteString(3), ByteString(4, 5, 6), ByteString(7, 8, 9))) | ||
|
||
//#compacting-bytestrings | ||
val compacted: Source[ByteString] = data.map(_.compact) | ||
//#compacting-bytestrings | ||
|
||
Await.result(compacted.grouped(10).runWith(Sink.head), 3.seconds).forall(_.isCompact) should be(true) | ||
} | ||
|
||
} | ||
|
||
} |
91 changes: 91 additions & 0 deletions
91
akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeCollectingMetrics.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
package docs.stream.cookbook | ||
|
||
import akka.stream.{ MaterializerSettings, FlowMaterializer } | ||
import akka.stream.scaladsl._ | ||
import akka.stream.testkit.StreamTestKit | ||
import akka.stream.testkit.StreamTestKit.{ SubscriberProbe, PublisherProbe } | ||
|
||
import scala.collection.immutable | ||
import scala.concurrent.Await | ||
import scala.concurrent.duration._ | ||
|
||
class RecipeCollectingMetrics extends RecipeSpec { | ||
import HoldOps._ | ||
implicit val m2 = FlowMaterializer(MaterializerSettings(system).withInputBuffer(1, 1)) | ||
|
||
"Recipe for periodically collecting metrics" must { | ||
|
||
"work" in { | ||
// type Tick = Unit | ||
// | ||
// val loadPub = PublisherProbe[Int]() | ||
// val tickPub = PublisherProbe[Tick]() | ||
// val reportTicks = Source(tickPub) | ||
// val loadUpdates = Source(loadPub) | ||
// val futureSink = Sink.head[immutable.Seq[String]] | ||
// val sink = Flow[String].grouped(10).to(futureSink) | ||
// | ||
// //#periodic-metrics-collection | ||
// val currentLoad = loadUpdates.transform(() => new HoldWithWait) | ||
// | ||
// val graph = FlowGraph { implicit builder => | ||
// import FlowGraphImplicits._ | ||
// val collector = ZipWith[Int, Tick, String]( | ||
// (load: Int, tick: Tick) => s"current load is $load") | ||
// | ||
// currentLoad ~> collector.left | ||
// reportTicks ~> collector.right | ||
// | ||
// collector.out ~> sink | ||
// } | ||
// //#periodic-metrics-collection | ||
// | ||
// val reports = graph.run().get(futureSink) | ||
// val manualLoad = new StreamTestKit.AutoPublisher(loadPub) | ||
// val manualTick = new StreamTestKit.AutoPublisher(tickPub) | ||
// | ||
// // Prefetch elimination | ||
// manualTick.sendNext(()) | ||
// | ||
// manualLoad.sendNext(53) | ||
// manualLoad.sendNext(61) | ||
// manualTick.sendNext(()) | ||
// | ||
// manualLoad.sendNext(44) | ||
// manualLoad.sendNext(54) | ||
// manualLoad.sendNext(78) | ||
// Thread.sleep(500) | ||
// | ||
// manualTick.sendNext(()) | ||
// | ||
// manualTick.sendComplete() | ||
// | ||
// Await.result(reports, 3.seconds) should be(List("current load is 53", "current load is 61", "current load is 78")) | ||
|
||
// Periodically collect values of metrics expressed as stream of updates | ||
// --------------------------------------------------------------------- | ||
// | ||
// **Situation:** Given performance counters expressed as a stream of updates we want to gather a periodic report of these. | ||
// We do not want to backpressure the counter updates but always take the last value instead. Whenever we don't have a new counter | ||
// value we want to repeat the last value. | ||
// | ||
// This recipe uses the :class:`HoldWithWait` recipe introduced previously. We use this element to gather updates from | ||
// the counter stream and store the final value, and also repeat this final value if no update is received between | ||
// metrics collection rounds. | ||
// | ||
// To finish the recipe, we simply use :class:`ZipWith` to trigger reading the latest value from the ``currentLoad`` | ||
// stream whenever a new ``Tick`` arrives on the stream of ticks, ``reportTicks``. | ||
// | ||
// .. includecode:: code/docs/stream/cookbook/RecipeCollectingMetrics.scala#periodic-metrics-collection | ||
// | ||
// .. warning:: | ||
// In order for this recipe to work the buffer size for the :class:`ZipWith` must be set to 1. The reason for this is | ||
// explained in the "Buffering" section of the documentation. | ||
|
||
// FIXME: This recipe does only work with buffer size of 0, which is only available if graph fusing is implemented | ||
pending | ||
} | ||
|
||
} | ||
|
||
} |
62 changes: 62 additions & 0 deletions
62
akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeDigest.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
package docs.stream.cookbook | ||
|
||
import java.security.MessageDigest | ||
|
||
import akka.stream.scaladsl.{ Sink, Source } | ||
import akka.util.ByteString | ||
|
||
import scala.concurrent.Await | ||
import scala.concurrent.duration._ | ||
|
||
class RecipeDigest extends RecipeSpec { | ||
|
||
"Recipe for calculating digest" must { | ||
|
||
"work" in { | ||
|
||
val data = Source(List( | ||
ByteString("abcdbcdecdef"), | ||
ByteString("defgefghfghighijhijkijkljklmklmnlmnomnopnopq"))) | ||
|
||
//#calculating-digest | ||
import akka.stream.stage._ | ||
def digestCalculator(algorithm: String) = new PushPullStage[ByteString, ByteString] { | ||
val digest = MessageDigest.getInstance(algorithm) | ||
|
||
override def onPush(chunk: ByteString, ctx: Context[ByteString]): Directive = { | ||
digest.update(chunk.toArray) | ||
ctx.pull() | ||
} | ||
|
||
override def onPull(ctx: Context[ByteString]): Directive = { | ||
if (ctx.isFinishing) ctx.pushAndFinish(ByteString(digest.digest())) | ||
else ctx.pull() | ||
} | ||
|
||
override def onUpstreamFinish(ctx: Context[ByteString]): TerminationDirective = { | ||
// If the stream is finished, we need to emit the last element in the onPull block. | ||
// It is not allowed to directly emit elements from a termination block | ||
// (onUpstreamFinish or onUpstreamFailure) | ||
ctx.absorbTermination() | ||
} | ||
} | ||
|
||
val digest: Source[ByteString] = data.transform(() => digestCalculator("SHA-256")) | ||
//#calculating-digest | ||
|
||
Await.result(digest.runWith(Sink.head), 3.seconds) should be( | ||
ByteString( | ||
0x24, 0x8d, 0x6a, 0x61, | ||
0xd2, 0x06, 0x38, 0xb8, | ||
0xe5, 0xc0, 0x26, 0x93, | ||
0x0c, 0x3e, 0x60, 0x39, | ||
0xa3, 0x3c, 0xe4, 0x59, | ||
0x64, 0xff, 0x21, 0x67, | ||
0xf6, 0xec, 0xed, 0xd4, | ||
0x19, 0xdb, 0x06, 0xc1)) | ||
|
||
} | ||
|
||
} | ||
|
||
} |
58 changes: 58 additions & 0 deletions
58
akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeDroppyBroadcast.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
package docs.stream.cookbook | ||
|
||
import akka.stream.OverflowStrategy | ||
import akka.stream.scaladsl._ | ||
import akka.stream.testkit.StreamTestKit.SubscriberProbe | ||
|
||
import scala.collection.immutable | ||
import scala.concurrent.Await | ||
import scala.concurrent.duration._ | ||
|
||
class RecipeDroppyBroadcast extends RecipeSpec { | ||
|
||
"Recipe for a droppy broadcast" must { | ||
"work" in { | ||
val myElements = Source(immutable.Iterable.tabulate(100)(_ + 1)) | ||
|
||
val sub1 = SubscriberProbe[Int]() | ||
val sub2 = SubscriberProbe[Int]() | ||
val mySink1 = Sink(sub1) | ||
val mySink2 = Sink(sub2) | ||
val futureSink = Sink.head[Seq[Int]] | ||
val mySink3 = Flow[Int].grouped(200).to(futureSink) | ||
|
||
//#droppy-bcast | ||
// Makes a sink drop elements if too slow | ||
def droppySink[T](sink: Sink[T], bufferSize: Int): Sink[T] = { | ||
Flow[T].buffer(bufferSize, OverflowStrategy.dropHead).to(sink) | ||
} | ||
|
||
import FlowGraphImplicits._ | ||
val graph = FlowGraph { implicit builder => | ||
val bcast = Broadcast[Int] | ||
|
||
myElements ~> bcast | ||
|
||
bcast ~> droppySink(mySink1, 10) | ||
bcast ~> droppySink(mySink2, 10) | ||
bcast ~> droppySink(mySink3, 10) | ||
} | ||
//#droppy-bcast | ||
|
||
Await.result(graph.run().get(futureSink), 3.seconds).sum should be(5050) | ||
|
||
sub1.expectSubscription().request(10) | ||
sub2.expectSubscription().request(10) | ||
|
||
for (i <- 91 to 100) { | ||
sub1.expectNext(i) | ||
sub2.expectNext(i) | ||
} | ||
|
||
sub1.expectComplete() | ||
sub2.expectComplete() | ||
|
||
} | ||
} | ||
|
||
} |
28 changes: 28 additions & 0 deletions
28
akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeFlattenSeq.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
package docs.stream.cookbook | ||
|
||
import akka.stream.scaladsl.{ Sink, Source } | ||
|
||
import scala.collection.immutable | ||
import scala.concurrent.Await | ||
import scala.concurrent.duration._ | ||
|
||
class RecipeFlattenSeq extends RecipeSpec { | ||
|
||
"Recipe for flatteing a stream of seqs" must { | ||
|
||
"work" in { | ||
|
||
val someDataSource = Source(List(List("1"), List("2"), List("3", "4", "5"), List("6", "7"))) | ||
|
||
//#flattening-seqs | ||
val myData: Source[List[Message]] = someDataSource | ||
val flattened: Source[Message] = myData.mapConcat(identity) | ||
//#flattening-seqs | ||
|
||
Await.result(flattened.grouped(8).runWith(Sink.head), 3.seconds) should be(List("1", "2", "3", "4", "5", "6", "7")) | ||
|
||
} | ||
|
||
} | ||
|
||
} |
Oops, something went wrong.