CS212

GNU General Public License v3.0 licensed. Source available on github.com/zifeo/EPFL.

Spring 2015: Reactive Programming & Parallelism

[TOC]

Parallel programming in Scala

Introduction, basic constructs, analysis

  • sequential programming : one part at once
  • parallel programming : multiple parts at once
  • parallel is about performance
    • infrastructure : libraries, hardware, frameworks for parallelism
    • application : design parallel software
  • concurrent programming is a general concept : structure program into concurrent tasks that communicate both among each other and with the external environment
    • must ensure safe accesses to shared resources (locks)
    • can be used to ensure responsiveness in interactive apps (even on a single CPU core)
  • parallel programming : speed up computation through parallel hardware resources without solving all difficulties of concurrency (use frameworks instead)
    • get some part of the problem (isolation)
    • return result
  • avoid side effects (except for local performance improvements), so the general rule is : if one parallel task writes to a variable (or array entry), no other task may read or write this variable at the same time
  • def parallel[A, B](taskA: => A, taskB: => B): (A, B) = { ... }
    • running time can be the maximum of the two times (instead of their sum)
    • if no parallel resources are available, executes the tasks sequentially
    • invoking this carries a significant constant overhead (do not do it if not needed)
    • call by name
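A minimal sketch of `parallel` in use, assuming the course's `parallel(a, b)` construct is in scope and using a threshold to avoid over-splitting:

// Parallel array sum built on the course's `parallel` construct (sketch).
def sumSegment(a: Array[Int], s: Int, t: Int): Int = {
	var sum = 0; var i = s
	while (i < t) { sum += a(i); i += 1 }
	sum
}
def parSum(a: Array[Int], s: Int, t: Int, threshold: Int): Int =
	if (t - s < threshold) sumSegment(a, s, t) // below threshold : stay sequential
	else {
		val mid = s + (t - s) / 2 // overflow-safe midpoint
		val (l, r) = parallel(parSum(a, s, mid, threshold), parSum(a, mid, t, threshold))
		l + r
	}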
  • cost estimation : $S(e)$ denotes the running time of expression $e$
    • $S(\text{sumSegment}(a, p, s, t)) = c_1(t - s) + c_2$
    • $S(\text{parallel}(t_1, t_2)) = c_P + \max(S(t_1), S(t_2))$
  • span : $S(e)$ number of steps if we had unbounded parallelism
    • $S(\text{parallel}(e_1, e_2)) = c_P + \max(S(e_1), S(e_2))$
  • work : $W(e)$ number of steps $e$ would take if there was no parallelism (sequential)
    • $W(\text{parallel}(e_1, e_2)) = c_Q + W(e_1) + W(e_2)$
  • running time with $K$ threads : approximately $\max(S(e), \frac{W(e)}{K})$
    • if $K$ is constant but inputs grow, parallel programs have the same asymptotic time complexity as sequential ones
    • even with unbounded resources ($K \to \infty$), the complexity stays non-zero : it is given by $S(e)$
  • Amdahl's law : speedup = $\frac{1}{1-f+\frac{f}{K}}$ for $K$ threads and $f$ part that is parallelized (%)
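    • e.g. with $f = 90\%$ and $K = 4$ : speedup $= \frac{1}{0.1 + 0.9/4} = \frac{1}{0.325} \approx 3.1$, well below the ideal $4\times$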
  • ScalaMeter
    • val time = measure { (0 until 1000000).toArray }
    • need warmup withWarmer(new Warmer.Default) measure { ... }
      • first the program is interpreted
      • then, parts of the program are compiled into machine code
      • JVM may apply additional dynamic optimizations during the run
      • eventually, the program reaches steady-state performance
    • running config
      • Default (plain running time)
      • IgnoringGC
      • OutlierElimination
      • MemoryFootprint
      • GarbageCollectionCycles
  • memory bandwidth : a computation is memory-bound if it is simple (caching does not help a parallel sum)
  • def task(c: => A) : ForkJoinTask[A] with trait ForkJoinTask[A] { def join: A }
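As in the course, `parallel` can itself be expressed in terms of task and join (a sketch):

def parallel[A, B](taskA: => A, taskB: => B): (A, B) = {
	val right = task { taskB } // schedule taskB asynchronously
	val left = taskA // evaluate taskA on the current thread meanwhile
	(left, right.join) // join blocks until taskB's result is ready
}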

Reductions, operator associativity

  • list traversal (map) is not parallelizable
  • alternative
    • array : be careful with accesses, good memory locality, hard to construct from pieces
    • immutable trees : functional, high memory allocation overhead, bad locality
  • mid computation : avoid overflow with val mid = left + (right - left)/2
  • need a threshold to avoid too much splitting at small scale
  • inlining the higher-order function helps, but the gain is marginal
  • similar perf with trees
  • parallel reduce
    • the order of combination is nondeterministic, so an associative operation f : (A, A) => A is required (neither left nor right fold semantics)
    • associativity iff f(x, f(y, z)) = f(f(x, y), z) for all x, y and z
      • prove by induction
    • important associative operations that happen to be also commutative
      • addition, multiplication modulo a positive integer
      • addition, multiplication of BigInt's
      • union, intersection, symmetric difference of sets
      • boolean operations
      • all complements of associative operations
    • floating point is not associative
  • parallel scan (prefix sums)
    • upsweep : compute a tree of partial reductions in parallel
    • downsweep : compute the scan result in parallel from that tree (recursively) ; see the sketch below
  • combining and fusing operations
    • combine two maps into one
    • fusing basic operation in the sequential part (under threshold)
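A sketch of upsweep/downsweep on trees, following the lecture (assumes the `parallel` construct from above ; f must be associative):

sealed abstract class Tree[A]
case class Leaf[A](a: A) extends Tree[A]
case class Node[A](l: Tree[A], r: Tree[A]) extends Tree[A]

sealed abstract class TreeRes[A] { val res: A } // tree annotated with reduction results
case class LeafRes[A](override val res: A) extends TreeRes[A]
case class NodeRes[A](l: TreeRes[A], override val res: A, r: TreeRes[A]) extends TreeRes[A]

def upsweep[A](t: Tree[A], f: (A, A) => A): TreeRes[A] = t match {
	case Leaf(v) => LeafRes(v)
	case Node(l, r) =>
		val (tL, tR) = parallel(upsweep(l, f), upsweep(r, f))
		NodeRes(tL, f(tL.res, tR.res), tR)
}

def downsweep[A](t: TreeRes[A], a0: A, f: (A, A) => A): Tree[A] = t match {
	case LeafRes(a) => Leaf(f(a0, a)) // a0 is the reduction of everything to the left
	case NodeRes(l, _, r) =>
		val (tL, tR) = parallel(downsweep(l, a0, f), downsweep(r, f(a0, l.res), f))
		Node(tL, tR)
}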

Data-parallel programming

  • task-parallel programming : a form of parallelization that distributes execution processes across computing nodes
  • data-parallel programming : a form of parallelization that distributes data across computing nodes
  • must also follow the rule : check the usage of shared memory
  • parallel collection : for (idx <- (0 until image.length).par) { ... }
    • about 2x faster
  • workload : function that maps each input element to the amount of work required to process it
    • data-parallel scheduler: efficiently balance the workload across processors without any knowledge about the w(i)
  • deterministic as long as it is used functionally
  • not all operations execute in parallel (foldRight, reduceLeft, scanRight, etc.)
  • need for associativity : we say that the neutral element z and the binary operator f must form a monoid
    • f(a, f(b, c)) == f(f(a, b), c)
    • f(z, a) == f(a, z) == a
  • commutativity is not needed for fold
  • fold can only produce values of the same type as the elements
  • aggregate : def aggregate[B](z: B)(f: (B, A) => B, g: (B, B) => B): B (can change the return type)
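For example, counting vowels in parallel (a small sketch : f folds each chunk sequentially, g combines chunk results):

val text = "EPFL is in Lausanne"
val vowels = text.par.aggregate(0)(
	(count, c) => if ("aeiouAEIOU".contains(c)) count + 1 else count, // per-chunk fold
	_ + _ // combine chunk counts
)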
  • the combinators above are accessor combinators : they return a single value
  • transformer combinators : map, filter, flatMap, groupBy return new collections
  • Scala collections hierarchy
    • Traversable[T] – collection of elements with type T, with operations implemented using foreach
    • Iterable[T] – collection of elements with type T, with operations implemented using iterator
    • Seq[T] – an ordered sequence of elements with type T
    • Set[T] – a set of elements with type T (no duplicates)
    • Map[K, V] – a map of keys with type K associated with values of type V (no duplicate keys)
    • parallel equivalence : ParIterable[T], etc.
    • parallelism-agnostic collections cover both : GenIterable[T], etc.
    • parallelizable collections :
      • ParArray[T] – parallel array of objects, counterpart of Array and ArrayBuffer
      • ParRange – parallel range of integers, counterpart of Range
      • ParVector[T] – parallel vector, counterpart of Vector
      • mutable.ParHashSet[T] – counterpart of mutable.HashSet
      • mutable.ParHashMap[K, V] – counterpart of mutable.HashMap
      • immutable.ParHashSet[T] – counterpart of immutable.HashSet
      • immutable.ParHashMap[K, V] – counterpart of immutable.HashMap
      • ParTrieMap[K, V] – thread-safe parallel map with atomic snapshots, counterpart of TrieMap
      • for other collections, par creates the most similar parallel collection – e.g. a List is converted to a ParVector
  • 2 rules
    • avoid side effects without proper synchronization
    • never modify a parallel collection on which a data-parallel operation is in progress
    • exception : TrieMap allows traversing and modifying at the same time (via snapshots)
  • 4 abstractions :
    • iterators : trait Iterator[A] { def next(): A; def hasNext: Boolean }
    • splitters : trait Splitter[A] extends Iterator[A] { def split: Seq[Splitter[A]]; def remaining: Int }
      • after call to split, the original splitter is in an undefined state
      • disjoint
      • remaining is an estimate
      • split is efficient O(log n) or better
    • builders : trait Builder[A, Repr] { def +=(elem: A): Builder[A, Repr]; def result: Repr }
      • calling result leaves the Builder in an undefined state
    • combiners : trait Combiner[A, Repr] extends Builder[A, Repr] { def combine(that: Combiner[A, Repr]): Combiner[A, Repr] }
      • combine returns a new combiner that contains elements of input combiners
      • combine leaves both original Combiners in an undefined state
      • combine is efficient, O(P log n) or better (P is the number of processors)
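A sketch (not the library implementation) of a data-parallel filter built from these abstractions, assuming the task construct from earlier and a newCombiner factory:

def parFilter[A, Repr](sp: Splitter[A], p: A => Boolean,
		newCombiner: () => Combiner[A, Repr]): Combiner[A, Repr] =
	if (sp.remaining < 1000) { // small enough : filter sequentially
		val c = newCombiner()
		while (sp.hasNext) { val x = sp.next(); if (p(x)) c += x }
		c
	} else // otherwise : split, recurse in parallel, combine
		sp.split
			.map(child => task { parFilter(child, p, newCombiner) })
			.map(_.join)
			.reduce(_ combine _)

The final collection is then obtained with .result on the returned combiner.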

Parallel data structures and algorithms

  • set data structures
    • lookup, insertion, deletion : hash tables in O(1)
    • combine represents union
  • sequence data structures
    • mutable : prepend and append in O(1), insertion in O(n)
    • functional (list) : prepend in O(1), everything else O(n)
    • array lists append, random access O(1) or else O(n)
    • mutable linked list can have O(1) concatenation
    • combine represents concatenation
  • two-phase construction : when no efficient concatenation exists, use an intermediate data structure that has
    • efficient combine method
    • efficient += method
    • result method max O(n) but parallelizable
    • ArrayCombiner : class ArrayCombiner[T <: AnyRef: ClassTag](val parallelism: Int) keeps nested buffers (an ArrayBuffer[ArrayBuffer[T]]) ; see the sketch below
      • the flat array is computed from the nested buffers at the end
      • usage : xs.par.aggregate(newCombiner)(_ += _, _ combine _).result
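A sketch of the two-phase ArrayCombiner ; the course version additionally parallelizes the final copy in result:

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

class ArrayCombiner[T <: AnyRef: ClassTag](val parallelism: Int) {
	private var numElems = 0
	private val buffers = new ArrayBuffer[ArrayBuffer[T]]
	buffers += new ArrayBuffer[T]

	def +=(x: T): this.type = { // O(1) amortized : append to the last nested buffer
		buffers.last += x
		numElems += 1
		this
	}

	def combine(that: ArrayCombiner[T]): ArrayCombiner[T] = { // cheap : just link buffer lists
		buffers ++= that.buffers
		numElems += that.numElems
		this
	}

	def result: Array[T] = { // O(n), parallelizable across the nested buffers
		val array = new Array[T](numElems)
		var i = 0
		for (buf <- buffers; x <- buf) { array(i) = x; i += 1 }
		array
	}
}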
  • conc-trees
    • lists are built for sequential computations (left to right)
    • trees allow parallel computation but are not necessarily balanced...
    • hence conc-trees : sealed trait Conc[+T] { def level: Int; def size: Int; def left: Conc[T]; def right: Conc[T] }
    • invariants
      • A <> node can never contain Empty as its subtree
      • The level difference between the left and the right subtree of a <> node is always 1 or less (only an indication)
    • concatenation in $O(|h_1 - h_2|)$ where $h$ is the height
      • if the two trees differ in height by at most one : link them directly
      • if the left tree is higher than the right one
        • left tree is left-leaning : recursively concatenate its right subtree with the right tree, then relink
        • left tree is right-leaning : split off the left tree's right subtree, concatenate its remainder with the right tree, and relink the resulting parts by smallest levels
      • symmetric cases apply when the right tree is higher ; see the sketch below
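A sketch of the concatenation cases, following the lecture (Empty is omitted and a leaf carries no subtrees):

case class Single[T](x: T) extends Conc[T] {
	def level = 0; def size = 1
	def left = sys.error("leaf"); def right = sys.error("leaf")
}
case class <>[T](left: Conc[T], right: Conc[T]) extends Conc[T] {
	val level = 1 + math.max(left.level, right.level)
	val size = left.size + right.size
}

def concat[T](xs: Conc[T], ys: Conc[T]): Conc[T] = {
	val diff = ys.level - xs.level
	if (diff >= -1 && diff <= 1) <>(xs, ys) // heights within 1 : link directly
	else if (diff < -1) { // left tree higher
		if (xs.left.level >= xs.right.level) { // left-leaning : recurse on right subtree
			val nr = concat(xs.right, ys)
			<>(xs.left, nr)
		} else { // right-leaning : split the right subtree
			val nrr = concat(xs.right.right, ys)
			if (nrr.level == xs.level - 3) <>(xs.left, <>(xs.right.left, nrr))
			else <>(<>(xs.left, xs.right.left), nrr)
		}
	} else { // right tree higher : mirror of the branch above
		if (ys.right.level >= ys.left.level) {
			val nl = concat(xs, ys.left)
			<>(nl, ys.right)
		} else {
			val nll = concat(xs, ys.left.left)
			if (nll.level == ys.level - 3) <>(<>(nll, ys.left.right), ys.right)
			else <>(nll, <>(ys.left.right, ys.right))
		}
	}
}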

Data processing with Spark

Spark and futures

  • data-parallelism in distributed setting
  • spark abstraction (Apache)
  • distribution introduces important concerns
    • partial failure in a distributed computation
    • latency due to network communication
  • latency numbers
L1 cache reference ......................... 0.5 ns
Branch mispredict ............................ 5 ns
L2 cache reference ........................... 7 ns
Mutex lock/unlock ........................... 25 ns
Main memory reference ...................... 100 ns             
Compress 1K bytes with Zippy ............. 3,000 ns  =   3 µs
Send 2K bytes over 1 Gbps network ....... 20,000 ns  =  20 µs
SSD random read ........................ 150,000 ns  = 150 µs
Read 1 MB sequentially from memory ..... 250,000 ns  = 250 µs
Round trip within same datacenter ...... 500,000 ns  = 0.5 ms
Read 1 MB sequentially from SSD* ..... 1,000,000 ns  =   1 ms
Disk seek ........................... 10,000,000 ns  =  10 ms
Read 1 MB sequentially from disk .... 20,000,000 ns  =  20 ms
Send packet CA->Netherlands->CA .... 150,000,000 ns  = 150 ms
  • humanized : multiply all these durations by a billion
L1 cache reference                  0.5 s         One heart beat (0.5 s)
Branch mispredict                   5 s           Yawn
L2 cache reference                  7 s           Long yawn
Mutex lock/unlock                   25 s          Making a coffee
Main memory reference               100 s         Brushing your teeth
Compress 1K bytes with Zippy        50 min        One episode of a TV show (including ad breaks)
Send 2K bytes over 1 Gbps network   5.5 hr        From lunch to end of work day
SSD random read                     1.7 days      A normal weekend
Read 1 MB sequentially from memory  2.9 days      A long weekend
Round trip within same datacenter   5.8 days      A medium vacation
Read 1 MB sequentially from SSD    11.6 days      Waiting for almost 2 weeks for a delivery
Disk seek                           16.5 weeks    A semester in university
Read 1 MB sequentially from disk    7.8 months    Almost producing a new human being
The above 2 together                1 year
Send packet CA->Netherlands->CA     4.8 years     Average time it takes to complete a bachelor's degree
  • distributed data-parallel
    • Shared memory case : Data partitioned in memory and operated upon in parallel
    • Distributed case : Data partitioned between machines, network in between, operated upon in parallel
  • RDDs : Resilient Distributed Datasets
    • like an immutable sequential or parallel scala collection
    • combinators : map, flatMap, filter, reduce, fold, aggregate but return RDD instead
    • creation : sc.textFile("hdfs://...") or sc.parallelize(largeList)
    • transformers (new collection as result) are transformations : lazy, the RDD is not immediately computed
    • accessors (value as result) are actions : eager, the result is immediately computed
    • important RDD transformations
      • sample : sample a fraction of the data
      • union (duplicates remain)
      • intersection (duplicates removed)
      • distinct
      • coalesce : decrease the number of partitions in the RDD to given number (after filtering)
      • repartition : reshuffle the data randomly
      • collect : return all elements as an array
      • count
      • foreach : for side effects
      • saveAsTextFile : write elements of the dataset as a text file
  • in the cluster
    • transformations are deferred until an action is requested : no intermediate RDDs (the chain of operations is optimized)
    • coordination done by the driver program of Spark
    • recomputed each time an action is called : retain an intermediate collection by calling .persist()
      • cached in memory or on disk
      • MEMORY_ONLY vs MEMORY_ONLY_SER (serialized data) vs MEMORY_AND_DISK vs MEMORY_AND_DISK_SER vs DISK_ONLY
  • pair RDD
    • groupByKey
    • reduceByKey
    • join (inner) : combined pairs whose keys are present in both input
    • mapValues
    • leftOuterJoin/rightOuterJoin : combined pairs whose key is present in either
    • countByKey (action)
    • careful with shuffles (latency)
    • creation : simple map
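A minimal word-count sketch tying these pieces together (assumes a SparkContext sc):

val lines = sc.textFile("hdfs://...") // lazy : transformation
val counts = lines
	.flatMap(line => line.split(" ")) // words
	.map(word => (word, 1)) // pair RDD
	.reduceByKey(_ + _) // reduces locally before shuffling
counts.persist() // cache if reused by several actions
val distinctWords = counts.count() // eager : the action triggers computation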

Spark in-depth

  • no foldLeft : a sequential fold makes no sense when distributed
  • shuffling : reduceByKey is better than grouping then reducing, as it reduces locally first
    • rule of thumb : a shuffle can occur when the resulting RDD depends on other elements from the same RDD or another RDD
    • can be reduced by join or smart partitioning
    • use method toDebugString to check whether it was shuffled (ShuffledRDD)
  • partitioning
    • partitions never span multiple machines
    • one machine can hold multiple partitions (by default as many as the total number of cores on executor nodes)
    • hash partitioning (spread evenly)
    • range partitioning (better for specific data locality)
      • val partitioned = pairs.partitionBy(new RangePartitioner(8, pairs)).persist()
      • argument : desired number of partitions and provide a pair RDD with ordered keys which is used to sample a suitable set of sorted ranges
      • the partitioner is inherited from the parent RDD
      • automatically-set partitioners (e.g. sortByKey yields a RangePartitioner, groupByKey a HashPartitioner)
      • partitioners propagate through : cogroup, groupWith, joins, the *ByKey operations, sort, mapValues (if the parent has one) and filter (if the parent has one)
  • closures and capturing
    • serialization can be bad or impossible (requires all captured variables to be serializable)
    • closures can be too large (always pass local variables, not class-wide ones)
    • think of the JVM replacing myvar by this.myvar
  • shared variables
    • broadcast variables : allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks
      • creation : sc.broadcast(team)
      • usage : broadcastTeam.value.keys
    • accumulators : are variables that are only “added” to through an associative operation and can therefore be efficiently supported across nodes in parallel
      • use for counters
      • only numeric
      • creation : val badRecords = sc.accumulator(0)
      • usage : badRecords += 1
      • result : badRecords.value
      • in actions : each task's update is applied to each accumulator exactly once
      • in transformations : an update may be applied more than once (so should only be used for debugging)
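A small sketch combining both shared-variable kinds (assumes a SparkContext sc and an RDD records of strings ; the names are illustrative):

val lookup = Map("a" -> 1, "b" -> 2) // read-only table, shipped once per machine
val broadcastLookup = sc.broadcast(lookup)
val badRecords = sc.accumulator(0) // counter, only "added" to

records.foreach { r => // an action : updates are applied exactly once
	if (!broadcastLookup.value.contains(r)) badRecords += 1
}
println(badRecords.value) // read back on the driver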

Reactive programming

Introduction

  • reactive applications : readily responsive to a stimulus
    • event-driven : react to events
    • scalable : react to load
    • resilient : react to failures (overcome software, hardware, network and be loose coupling, state encapsulated)
    • responsive : react to users (rich, real-time interaction, careful on system design)
  • traditionally : multiple threads communicating through synchronized shared state (hard to compose)
  • nowadays : loosely coupled event handlers (asynchronously, without blocking)
  • scalability : ability to be expanded according to its usage (important to minimize shared mutable state)
    • scale up : make use of parallelism
    • scale out : make use of multiple servers (must be location transparent and resilient)
  • call-backs : problems with shared mutable state, no composition
  • composition : fundamental constructions of functional programming
    • events are first class
    • events are often represented as messages
    • handlers of events also first-class
    • complex handler can be composed from primitive ones

Functional programming notions

  • partial matches : val f: String => String = { case "ping" => "pong" }
    • MatchError on f("pong")
    • trait
trait PartialFunction[-A, +R] extends Function1[A, R] {
	def apply(x: A): R
	def isDefinedAt(x: A): Boolean
}
  • for-expression
    • for (x <- e1) yield e2 : e1.map(x => e2)
    • for (x <- e1 if f; s) yield e2 : for (x <- e1.withFilter(x => f); s) yield e2 (potentially empty)
    • for (x <- e1; y <- e2; s) yield e3 : e1.flatMap(x => for (y <- e2; s) yield e3)
    • pat <- expr : x <- expr withFilter { case pat => true; case _ => false } map { case pat => x }
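For instance, a small pipeline and its expansion (the two forms are equivalent):

val result = for {
	x <- List(1, 2, 3)
	y <- List(10, 20)
	if x + y > 12
} yield x * y

val same = List(1, 2, 3).flatMap(x =>
	List(10, 20).withFilter(y => x + y > 12).map(y => x * y))
// both : List(20, 40, 30, 60)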
  • random generator
trait Generator[+T] {
	self => // an alias for "this"
	def generate: T
	def map[S](f: T => S): Generator[S] = new Generator[S] {
		def generate = f(self.generate)
	}
	def flatMap[S](f: T => Generator[S]): Generator[S] = new Generator[S] {
		def generate = f(self.generate).generate
	}
}
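A usage sketch, as in the lecture's random-value examples:

val integers = new Generator[Int] {
	def generate = scala.util.Random.nextInt()
}
val booleans = integers.map(_ >= 0)
val pairs = integers.flatMap(x => integers.map(y => (x, y)))
pairs.generate // a random pair of Ints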
  • scalacheck : forAll { (l1: List[Int], l2: List[Int]) => l1.size + l2.size == (l1 ++ l2).size }
  • monads (bind) : is a parametric type M[T] with following rules
    • unit operation : def unit[T](x: T): M[T]
    • flatMap : trait M[T] { def flatMap[U](f: T => M[U]): M[U] }
    • List, Set, Option, Generator
    • map is flatmap with unit : m map f == m flatMap (x => unit(f(x))) == m flatMap (f andThen unit)
    • associativity : m flatMap f flatMap g == m flatMap (x => f(x) flatMap g)
    • left unit : unit(x) flatMap f == f(x)
    • right unit : m flatMap unit == m
  • Try[+T] : pass computation that can fail between threads or computers
    • Success[T](x: T) extends Try[T]
    • Failure(ex: Exception) extends Try[Nothing]
    • the left unit law fails (Try(expr) flatMap f differs from f(expr) when expr throws), but ...
    • bullet-proof principle : an expression composed from Try, map, flatMap will never throw a non-fatal exception
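A sketch of flatMap on Try along these lines, which makes the principle concrete (non-fatal exceptions become Failure values):

import scala.util.control.NonFatal

def flatMap[U](f: T => Try[U]): Try[U] = this match {
	case Success(x) => try f(x) catch { case NonFatal(ex) => Failure(ex) }
	case failure: Failure => failure
}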
  • statefulness : has a state because it depends on history
  • referential transparency : whether two vars are the same by operational equivalence (no possible test can distinguish between them)
  • observer pattern : publish/subscribe
trait Publisher {
	private var subscribers: Set[Subscriber] = Set()
	def subscribe(subscriber: Subscriber): Unit =
		subscribers += subscriber
	def unsubscribe(subscriber: Subscriber): Unit =
		subscribers -= subscriber
	def publish(): Unit =
		subscribers.foreach(_.handler(this))
}
trait Subscriber {
	def handler(pub: Publisher)
}
- forces an imperative style since handlers are Unit-typed
- many moving parts that need to be co-ordinated
- concurrency makes things more complicated
- reportedly, 1/3 of the code in Adobe's applications is devoted to event handling and 1/2 of the bugs are found there

Functional reactive programming

  • reacting to sequences of events that happen in time
  • Signal[T] : change value over time (instead of propagating updates, define new signals in terms of existing ones)
    • obtain value : need for ()
    • constructor : Signal { val pos = mousePosition(); LL <= pos && pos <= UR } (immutable)
    • variable signal : Var(3)
    • update : sig.update(5) or sig() = 5
    • avoid cyclic definition by pulling out the value to be updated
    • stackable variables
class StackableVariable[T](init: T) {
	private var values: List[T] = List(init)
	def value: T = values.head
	def withValue[R](newValue: T)(op: => R): R = {
		values = newValue :: values
		try op finally values = values.tail
	}
}
- signal
class Signal[T](expr: => T) {
	import Signal._
	private var myExpr: () => T = _
	private var myValue: T = _
	private var observers: Set[Signal[_]] = Set()
	update(expr)
	protected def update(expr: => T): Unit = {
		myExpr = () => expr
		computeValue()
	}
	protected def computeValue(): Unit = {
		val newValue = caller.withValue(this)(myExpr())
		if (myValue != newValue) {
			myValue = newValue
			val obs = observers
			observers = Set()
			obs.foreach(_.computeValue())
		}		
	}
	def apply() = {
		observers += caller.value
		assert(!caller.value.observers.contains(this), "cyclic signal definition")
		myValue
	}
}
- thread-local state (each thread having its own copy) : DynamicVariable (but can lead to performance problems and messy situations)
	- best would be an implicit parameter passing the current value
- continuous signals : done by sampling instead of propagation
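A small usage sketch of the semantics (a derived signal tracks updates automatically):

val a = Var(1)
val b = Signal(a() * 2) // b is defined in terms of a
println(b()) // 2
a() = 21
println(b()) // 42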

Futures

  • synchronous
    • one : T/Try[T]
    • many : Iterable[T]
  • asynchronous
    • one : Future[T]
    • many : Observable[T]
  • synchronous <=> asynchronous : duality
  • one <=> many : cardinality
  • honest type of return : "monad" Try
def map[S](f: T => S): Try[S] = this match {
	case Success(value) => Try(f(value))
	case failure@Failure(t) => failure
}
  • Duration
import scala.language.postfixOps
object Duration {
	def apply(length: Long, unit: TimeUnit): Duration
}
val fiveYears = 1826 days
  • type that handle latency : "monad" Future
    • need context : import scala.concurrent.ExecutionContext.Implicits.global
    • callback : def onComplete(callback: Try[T] => Unit)(implicit executor: ExecutionContext): Unit
    • need to use pattern matching in the callback : success or failure
    • recover : def recover(f: PartialFunction[Throwable,T]): Future[T]
    • recoverWith : def recoverWith(f: PartialFunction[Throwable, Future[T]]): Future[T]
    • fallbackTo : def fallbackTo(that: =>Future[T]): Future[T]
    • retry : def retry(noTimes: Int)(block: =>Future[T]): Future[T]
    • await : val c = Await.result(confirmation, 2 seconds); println(c.toText)
trait Awaitable[T] extends AnyRef {
	abstract def ready(atMost: Duration): Unit
	abstract def result(atMost: Duration): T
}
trait Future[T] extends Awaitable[T] {
	def filter(p: T=>Boolean): Future[T]
	def flatMap[S](f: T => Future[S]): Future[S]
	def map[S](f: T=>S): Future[S]
	def recoverWith(f: PartialFunction[Throwable, Future[T]]): Future[T]
}
object Future {
	def apply[T](body : =>T): Future[T]
}
def sequence[T](fts: List[Future[T]]): Future[List[T]] = {
	fts match {
		case Nil => Future(Nil)
		case (ft::fts) => ft.flatMap(t => sequence(fts).flatMap(ts => Future(t::ts)))
	}
}
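A usage sketch composing these combinators (fetch stands in for any asynchronous computation):

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

def fetch(url: String): Future[String] = Future { s"contents of $url" }
val length: Future[Int] =
	fetch("http://example.com")
		.map(_.length) // transform the eventual result
		.recover { case _: Exception => 0 } // fall back on failure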
  • async await magic
    • async : def async[T](body: =>T)(implicit context: ExecutionContext) :Future[T]
    • await : def await[T](future: Future[T]): T
    • always : async{ … await{…}…}
    • illegal use
      • await needs a directly-enclosing async : no await inside a closure nested within an async block
      • no await inside an expression passed as a by-name parameter
      • no await inside Boolean short-circuit argument
      • no return expression inside async
      • no await under try/catch
def flatMap[S](f: T => Future[S]): Future[S] = async {
	val x: T = await { this }
	await { f(x) }
}
def retry(noTimes: Int)(block: => Future[T]): Future[T] = async {
	var i = 0
	var result: Try[T] = Failure(new Exception("..."))
	while (result.isFailure && i < noTimes) {
		// withTry : assumed helper lifting Future[T] into Future[Try[T]] so failures become values
		result = await { block.withTry() }
		i += 1
	}
	result.get
}
def filter(p: T => Boolean): Future[T] = async {
	val x = await { this }
	if (!p(x)) {
		throw new NoSuchElementException()
	} else {
		x
	}
}
  • promise
trait Promise[T] {
	def future: Future[T]
	def complete(result: Try[T]): Unit
	def tryComplete(result: Try[T]): Boolean
	def success(value: T): Unit = this.complete(Success(value))
	def failure(t: Throwable): Unit = this.complete(Failure(t))
}
def filter(pred: T => Boolean): Future[T] = {
	val p = Promise[T]()
	this onComplete {
		case Failure(e) =>
			p.failure(e)
		case Success(x) =>
			if (!pred(x)) p.failure(new NoSuchElementException)
			else p.success(x)
	}
	p.future
}
  • duality : Try <=> Future by simplifying types ((Try[T] => Unit) => Unit to () => (() => Try[T]))

Observables

  • Iterable : "monad"
trait Iterable[T] {
	def iterator(): Iterator[T]
}
trait Iterator[T] {
	def hasNext: Boolean
	def next(): T
}
  • Observables : "monad" by duality reversion (()=>(()=>Try[Option[T]]) to (Try[Option[T]]=>Unit)=>Unit)
trait Observable[T] {
	def subscribe(observer: Observer[T]): Subscription
}
trait Observer[T] {
	def onNext(value: T): Unit
	def onError(error: Throwable): Unit
	def onCompleted(): Unit
}
trait Subscription {
	def unsubscribe(): Unit
	def isUnsubscribed: Boolean
}
- examples
val ticks: Observable[Long] = Observable.interval(1 seconds)
val evens: Observable[Long] = ticks.filter(_%2==0)
val bufs: Observable[Seq[Long]] = evens.slidingBuffer(2,1)
val s = bufs.subscribe(println(_))
s.unsubscribe()
- RX operators : marble diagrams
	- map
	- flatMap : a later inner stream may interleave with an earlier one
	- flatten : respects arrival time (interleaves)
	- merge : if one stream fails, everything stops
	- concat : like flatten but respecting order
	- groupBy : splits into separate streams by key
def flatMap[S](f: T => Observable[S]): Observable[S] = { map(f).merge }
  • cold observable : emits when it finds it convenient
    • each subscriber has its own private source
    • subscription causes side effect
  • hot observable : emits from the moment it is created
    • same source shared by all subscribers
    • no side effect
  • subscriptions
    • CompositeSubscription : collection of subscriptions
    • MultipleAssignmentSubscription : swap the underlying subscription
    • SerialSubscription : the old subscription is unsubscribed when swapped
    • unsubscribe is idempotent : same effect whether called once or multiple times
trait Subscription {
	def unsubscribe(): Unit
	def isUnsubscribed: Boolean
}
object Subscription {
	def apply(unsubscribe: =>Unit): Subscription
}
  • RX streams
    • error
    • startWith
    • filter
    • map
    • reduce
    • create : def create[T](s: Observer[T] => Subscription): Observable[T] (call onNext/onError/onCompleted on the observer)
object Observable {
	def create[T](subscribe: Observer[T] => Subscription): Observable[T]
	def apply[T](f: Subscriber[T] => Unit): Observable[T]
}
def never(): Observable[Nothing] =
	Observable[Nothing](subscriber=> {})
def empty(): Observable[Nothing] =
	Observable[Nothing](subscriber=> {
		subscriber.onCompleted()
	})
def +:[U >: T](elem: U): Observable[U] = {
	Observable.create[U](observer => {
		observer.onNext(elem); this.subscribe(observer)
	})
}
def filter(p: T => Boolean): Observable[T] = {
	Observable.create[T](observer => {
		this.subscribe(
			(t: T) => if(p(t)) observer.onNext(t),
			(e: Throwable) => observer.onError(e),
			() => observer.onCompleted()
		)
	})
}
def map[S](f: T=>S): Observable[S] = {
	Observable.create[S](observer => {
		subscribe (
			(t: T) => observer.onNext(f(t)),
			(e: Throwable) => observer.onError(e),
			() => observer.onCompleted()
		)
	})
}
  • Subject[T] : the Promise equivalent for many values (stops on error)
    • can be subscribed to and unsubscribed from at will
    • PublishSubject : subscribers see only values emitted after subscribing
    • ReplaySubject : replays and caches all values
    • AsyncSubject : caches the final value
    • BehaviorSubject : caches the latest value
object Observable {
	def apply[T](f: Future[T]): Observable[T] = {
		val subject = AsyncSubject[T]()
		f.onComplete {
			case Failure(e) => { subject.onError(e) }
			case Success(t) => {
				subject.onNext(t)
				subject.onCompleted()
			}
		}
		subject
	}
}
  • observable notification : case class enclosing message (materialize method)
  • Scheduler
trait ExecutionContext {
	def execute(runnable: Runnable): Unit
}
trait Scheduler {
	def now: Long
	def createWorker: Worker
}
trait Worker extends Subscription {
	def schedule (action: => Unit): Subscription
	def schedule(delay: Duration)(action: => Unit): Subscription
	def schedulePeriodically(initialDelay: Duration, period: Duration)(action: => Unit): Subscription
	def scheduleRec (action: => Unit): Subscription
	def now: Long
}
  • observable contract
    • auto-unsubscribe : when onCompleted or onError

Actors

  • scala lock : obj.synchronized { ... }
    • blocks the calling thread
    • introduces deadlock risk
  • actor model : represents objects and their interactions, built upon the laws of physics
  • an actor
    • is an object with identity
    • has a behavior
    • only interacts using asynchronous message passing
    • send message
    • create actors
    • designate the behavior for the next message
type Receive = PartialFunction[Any, Unit]
trait Actor {
	implicit val self: ActorRef
	implicit val context: ActorContext
	def receive: Receive
	
}
trait ActorContext {
	def become(behavior: Receive, discardOld: Boolean = true): Unit
	def unbecome(): Unit
	def stop(a: ActorRef): Unit
	def actorOf(p: Props, name: String): ActorRef
}
abstract class ActorRef {
	def !(msg: Any)(implicit sender: ActorRef = Actor.noSender): Unit
	def tell ...
	
}
  • new actor : context.actorOf(Props[Counter], "counter")
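A Counter actor along these lines (matching the Props[Counter] example above), where state lives in the behavior designated with become:

class Counter extends Actor {
	def counter(n: Int): Receive = {
		case "incr" => context.become(counter(n + 1)) // the next message sees n + 1
		case "get" => sender ! n
	}
	def receive = counter(0)
}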
  • message processing semantics
    • local execution, no notion of global synchronization
    • fully concurrent
    • actor itself is single-threaded
    • processing one message is the atomic unit
    • enqueues message
    • delivery guarantees : unreliable by default
      • at-most-once : sending once delivers 0 or 1 time
      • at-least-once : resending until acknowledged delivers 1 or more times
      • exactly-once : processing only the first reception delivers exactly 1 time
      • messages between a given sender-receiver pair are not reordered
    • reliable messaging
      • all messages can be persisted
      • can include unique ID
      • retry delivery
  • an actor's messages (and its Props) should be defined in its companion object
  • logging : extending ActorLogging and use log.debug()
  • data structures should be immutable, though they may be stored in a var
  • receive timeout : context.setReceiveTimeout(10 seconds)
  • scheduler : to send message to itself (otherwise break encapsulation)
    • scheduleOnce : delay, target
  • testing actors : expectMsg("happy")

Supervisors

  • failure : containment and automatic response needed
    • terminating or restarting : Restart, Stop, Escalate
    • decision taken by supervisor (the creator)
    • supervisorStrategy
      • OneForOneStrategy : deals with each failing child separately (partial function from error to reaction)
      • AllForOneStrategy : applies the decision to all children ; both allow a finite number of restarts within a time window
    • events
      • preStart
      • preRestart
      • postRestart
      • postStop
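A sketch of declaring such a strategy in a supervising actor (standard Akka API ; the error-to-reaction mapping is illustrative):

import akka.actor.{Actor, OneForOneStrategy}
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._

class Manager extends Actor {
	override val supervisorStrategy =
		OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
			case _: ArithmeticException => Resume // keep the child's state
			case _: Exception => Restart // replace the child with a fresh instance
		}
	def receive = { case _ => }
}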
  • DeathWatch : once an actor has stopped, no more messages will come from it
  • lifecycle monitoring : def watch(target: ActorRef): ActorRef ; the watcher receives Terminated(target) (with existenceConfirmed = false if it is not sure the target ever existed) until unwatch
  • eventstream
    • subscribe(sub, topic)
    • unsubscribe(sub)
    • publish(event)
  • persistence : ability to recover state at (re)start
    • in-place updates : constant time, space depends on the volume of records
    • persist changes in append-only fashion : history can be replayed, immutable
    • persisting can be asynchronous or synchronous (no confirmation until the write is persisted) : tradeoff between throughput and consistency

Distributed actors