Permalink
Browse files

after presentation

  • Loading branch information...
1 parent 63ae304 commit 37c3b9ed810d6967468e41ec9d3ecf858c9d6fd5 @rkuhn committed Mar 21, 2015
Showing with 56 additions and 0 deletions.
  1. +56 −0 src/main/scala/com/typesafe/TheDemo.scala
@@ -25,4 +25,60 @@ object TheDemo extends App {
implicit val timeout = Timeout(3.seconds)
import sys.dispatcher
+ val numbers = Source(List(1, 2, 3))
+ val strings = Source(List("a", "b", "c"))
+
+ val composite = Source() { implicit b =>
+ val zip = b.add(Zip[Int, String]())
+
+ numbers ~> zip.in0
+ strings ~> zip.in1
+
+ zip.out
+ }
+
+ val fast = Source(() => Iterator from 0)
+
+ val single = Flow[Int].withAttributes(OperationAttributes.inputBuffer(1, 1))
+ val f = Flow[ByteString].mapAsync(x => after(1.second, sys.scheduler)(Future.successful(x)))
+ // .via(single)
+ // .runForeach(println)
+
+ import Protocols._
+
+ val codec = BidiFlow() { implicit b =>
+ val top = b.add(Flow[Message].map(toBytes))
+ val bottom = b.add(Flow[ByteString].map(fromBytes))
+
+ BidiShape(top, bottom)
+ }
+
+ val protocol = codec atop framing
+
+ val addr = new InetSocketAddress("localhost", 0)
+ val server = StreamTcp().bind(addr).to(Sink.foreach { conn =>
+ conn.flow.join(protocol.reversed).join(Flow[Message]
+ .collect {
+ case Ping(id) => Pong(id)
+ }).run()
+ }).run()
+ val myaddr = Await.result(server, 1.second)
+
+ val client = StreamTcp().outgoingConnection(myaddr.localAddress)
+ val stack = protocol join client
+
+ Source(0 to 10).map(Ping).via(stack).runForeach(println)
+
+ val route =
+ pathPrefix("demo") {
+ getFromBrowseableDirectory("/Users/rkuhn/comp/demo/http")
+ } ~
+ path("upload") {
+ extractRequest { req =>
+ req.entity.dataBytes.via(f).to(Sink.ignore).run()
+ complete(StatusCodes.OK)
+ }
+ }
+
+ Http().bind("localhost", 8080).runForeach(conn => conn.flow.join(route).run())
}

0 comments on commit 37c3b9e

Please sign in to comment.