Skip to content

Commit

Permalink
pitchme: update presentation
Browse files Browse the repository at this point in the history
  • Loading branch information
niqdev committed Nov 14, 2017
1 parent 137abc3 commit 8a7a2cb
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 20 deletions.
39 changes: 20 additions & 19 deletions PITCHME.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ Initial approach

+++

```
```scala
abstract class Migration(...) {
val pageSize = 1000
def rowFunction: com.google.common.base.Function
Expand All @@ -72,7 +72,7 @@ abstract class Migration(...) {

+++

```
```scala
class Job(...) extends Migration(...) {
override def rowFunction = new RowFunction
}
Expand Down Expand Up @@ -111,7 +111,7 @@ A better approach

Main

```
```scala
class Job(...) extends BaseMigration(...) {
implicit val actorSystem: ActorSystem = ???
implicit val materializer: ActorMaterializer = ???
Expand All @@ -135,7 +135,7 @@ class Job(...) extends BaseMigration(...) {

Actor

```
```scala
class EntityActor(monitorActor: ActorRef, ...)(implicit ...)
extends Actor with MigrationStream {
override def receive: Receive = {
Expand All @@ -160,7 +160,7 @@ class EntityActor(monitorActor: ActorRef, ...)(implicit ...)

Package

```
```scala
package object stream {
type AstyanaxRow = Row[String, String]
type LeftMetadata = (Event, String)
Expand Down Expand Up @@ -227,7 +227,7 @@ class CassandraSource[K, C](...)(implicit ...)

Flow

```
```scala
trait MigrationStream {
def convertNewEntity(oldEntity: OldEntity): Try[NewEntity] =
Try(NewEntityConverter.convert(oldEntity))
Expand All @@ -251,35 +251,35 @@ trait MigrationStream {

+++

+++

Monitor (part 1)

```
```scala
package object stream {
sealed trait FlowControl
case class Throttle(sleepMillis: Long) extends FlowControl
case object Continue extends FlowControl
}
trait MonitorStream {
def controlDynamicFlow: Flow[FlowControl, FlowControl, NotUsed] =
def controlDynamicFlow: Flow[FlowControl,FlowControl,NotUsed] =
Flow[FlowControl] flatMapConcat {
case c@Continue =>
Source.single(c)
case t@Throttle(sleepMillis) =>
Source.single(t)
.delay(sleepMillis, DelayOverflowStrategy.backpressure)
.delay(sleepMillis, DelayOverflowStrategy.backpressure)
}
}
```

@[1, 5]
+++

@[1-5]
@[6-7, 15]
@[8-14]

Monitor (part 2)

```
```scala
def monitorEventFlow[E](monitorActor: ActorRef)(implicit ...):
Flow[Either[LeftMetadata, E], Either[LeftMetadata, E], _] =
Flow.fromGraph { GraphDSL.create() {
Expand All @@ -305,7 +305,7 @@ def monitorEventFlow[E](monitorActor: ActorRef)(implicit ...):

Parallelism

```
```scala
class EntityActor(monitorActor: ActorRef, ...)(implicit ...)
extends Actor with MigrationStream {
override def receive: Receive = {
Expand All @@ -326,22 +326,22 @@ class EntityActor(monitorActor: ActorRef, ...)(implicit ...)
@[1-4, 14-15]
@[5-13]

[Dispatcher](https://doc.akka.io/docs/akka/current/scala/dispatchers.html) documentation

+++

`akka-stream-testkit`

[Documentation](https://doc.akka.io/docs/akka/current/scala/stream/stream-testkit.html)
+

`ScalaTest`

+++

Benefits
- any step can fail: `Either[LeftMetadata, T]`
- simple to test with `akka-stream-testkit`
- simple to test
- DRY: easy to abstract and reuse streams
- back-pressure
- use `async` + custom `dispatcher`
- back-pressure
- it's fun!

---
Expand All @@ -351,6 +351,7 @@ Resources
- [Astyanax](https://github.com/Netflix/astyanax)
- [Datastax](http://docs.datastax.com/en/landing_page/doc/landing_page/docList.html)
- [Akka](https://doc.akka.io/docs/akka/current/scala/index.html)
- [ScalaTest](http://www.scalatest.org)
- [akka-stream-cassandra](https://github.com/niqdev/akka-stream-cassandra)

---
Expand Down
2 changes: 1 addition & 1 deletion PITCHME.yaml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
theme: night
theme: moon
footnote : "C* migration with Akka Stream"

0 comments on commit 8a7a2cb

Please sign in to comment.