From 8a7a2cbc165e5c956cdd436db023355b5c0fe873 Mon Sep 17 00:00:00 2001 From: niqdev Date: Tue, 14 Nov 2017 20:59:49 +0000 Subject: [PATCH] pitchme: update presentation --- PITCHME.md | 39 ++++++++++++++++++++------------------- PITCHME.yaml | 2 +- 2 files changed, 21 insertions(+), 20 deletions(-) diff --git a/PITCHME.md b/PITCHME.md index 0562044..098d292 100644 --- a/PITCHME.md +++ b/PITCHME.md @@ -49,7 +49,7 @@ Initial approach +++ -``` +```scala abstract class Migration(...) { val pageSize = 1000 def rowFunction: com.google.common.base.Function @@ -72,7 +72,7 @@ abstract class Migration(...) { +++ -``` +```scala class Job(...) extends Migration(...) { override def rowFunction = new RowFunction } @@ -111,7 +111,7 @@ A better approach Main -``` +```scala class Job(...) extends BaseMigration(...) { implicit val actorSystem: ActorSystem = ??? implicit val materializer: ActorMaterializer = ??? @@ -135,7 +135,7 @@ class Job(...) extends BaseMigration(...) { Actor -``` +```scala class EntityActor(monitorActor: ActorRef, ...)(implicit ...) extends Actor with MigrationStream { override def receive: Receive = { @@ -160,7 +160,7 @@ class EntityActor(monitorActor: ActorRef, ...)(implicit ...) Package -``` +```scala package object stream { type AstyanaxRow = Row[String, String] type LeftMetadata = (Event, String) @@ -227,7 +227,7 @@ class CassandraSource[K, C](...)(implicit ...) Flow -``` +```scala trait MigrationStream { def convertNewEntity(oldEntity: OldEntity): Try[NewEntity] = Try(NewEntityConverter.convert(oldEntity)) @@ -251,35 +251,35 @@ trait MigrationStream { +++ -+++ - Monitor (part 1) -``` +```scala package object stream { sealed trait FlowControl case class Throttle(sleepMillis: Long) extends FlowControl case object Continue extends FlowControl } trait MonitorStream { - def controlDynamicFlow: Flow[FlowControl, FlowControl, NotUsed] = + def controlDynamicFlow: Flow[FlowControl,FlowControl,NotUsed] = Flow[FlowControl] flatMapConcat { case c@Continue => Source.single(c) case t@Throttle(sleepMillis) => Source.single(t) - .delay(sleepMillis, DelayOverflowStrategy.backpressure) + .delay(sleepMillis, DelayOverflowStrategy.backpressure) } } ``` -@[1, 5] ++++ + +@[1-5] @[6-7, 15] @[8-14] Monitor (part 2) -``` +```scala def monitorEventFlow[E](monitorActor: ActorRef)(implicit ...): Flow[Either[LeftMetadata, E], Either[LeftMetadata, E], _] = Flow.fromGraph { GraphDSL.create() { @@ -305,7 +305,7 @@ def monitorEventFlow[E](monitorActor: ActorRef)(implicit ...): Parallelism -``` +```scala class EntityActor(monitorActor: ActorRef, ...)(implicit ...) extends Actor with MigrationStream { override def receive: Receive = { @@ -326,22 +326,22 @@ class EntityActor(monitorActor: ActorRef, ...)(implicit ...) @[1-4, 14-15] @[5-13] -[Dispatcher](https://doc.akka.io/docs/akka/current/scala/dispatchers.html) documentation - +++ `akka-stream-testkit` -[Documentation](https://doc.akka.io/docs/akka/current/scala/stream/stream-testkit.html) ++ + +`ScalaTest` +++ Benefits - any step can fail: `Either[LeftMetadata, T]` -- simple to test with `akka-stream-testkit` +- simple to test - DRY: easy to abstract and reuse streams -- back-pressure - use `async` + custom `dispatcher` +- back-pressure - it's fun! --- @@ -351,6 +351,7 @@ Resources - [Astyanax](https://github.com/Netflix/astyanax) - [Datastax](http://docs.datastax.com/en/landing_page/doc/landing_page/docList.html) - [Akka](https://doc.akka.io/docs/akka/current/scala/index.html) +- [ScalaTest](http://www.scalatest.org) - [akka-stream-cassandra](https://github.com/niqdev/akka-stream-cassandra) --- diff --git a/PITCHME.yaml b/PITCHME.yaml index fbbb9b0..b57388d 100644 --- a/PITCHME.yaml +++ b/PITCHME.yaml @@ -1,2 +1,2 @@ -theme: night +theme: moon footnote : "C* migration with Akka Stream"