Skip to content

Commit

Permalink
added method to map materialized values
Browse files Browse the repository at this point in the history
  • Loading branch information
Endre Sándor Varga committed Feb 20, 2015
1 parent d32ef09 commit e4346ea
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 16 deletions.
17 changes: 17 additions & 0 deletions akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ private[akka] object StreamLayout {
sealed trait MaterializedValueNode
case class Combine(f: (Any, Any) Any, dep1: MaterializedValueNode, dep2: MaterializedValueNode) extends MaterializedValueNode
case class Atomic(module: Module) extends MaterializedValueNode
case class Transform(f: Any Any, dep: MaterializedValueNode) extends MaterializedValueNode
case object Ignore extends MaterializedValueNode

case class Mapping(module: Module, inPorts: Map[InPort, InPort], outPorts: Map[OutPort, OutPort])
Expand Down Expand Up @@ -53,6 +54,21 @@ private[akka] object StreamLayout {
attributes)
}

def transformMaterializedValue(f: Any Any): Module = {
CompositeModule(
subModules = this.subModules,
inPorts,
outPorts,
downstreams,
upstreams,
Transform(f, this.materializedValueComputation),
carbonCopy = () {
val copy = this.carbonCopy()
copy.copy(module = copy.module.transformMaterializedValue(f))
},
attributes)
}

def grow(that: Module): Module = grow(that, Keep.left)

def grow[A, B, C](that: Module, f: (A, B) C): Module = {
Expand Down Expand Up @@ -208,6 +224,7 @@ abstract class MaterializerSession(val topLevel: StreamLayout.Module) {
private def resolveMaterialized(matNode: MaterializedValueNode, materializedValues: collection.Map[Module, Any]): Any = matNode match {
case Atomic(m) materializedValues(m)
case Combine(f, d1, d2) f(resolveMaterialized(d1, materializedValues), resolveMaterialized(d2, materializedValues))
case Transform(f, d) f(resolveMaterialized(d, materializedValues))
case Ignore ()
}

Expand Down
24 changes: 15 additions & 9 deletions akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ final class Flow[-In, +Out, +Mat](m: StreamLayout.Module, val inlet: Graphs.InPo
inlet)
}

def mapMaterialized[Mat2](f: Mat Mat2): Repr[Out, Mat2] = {
val sourceCopy = module.carbonCopy()
new Flow(
sourceCopy.module.transformMaterializedValue(f.asInstanceOf[Any Any]),
sourceCopy.inPorts(inlet).asInstanceOf[Graphs.InPort[In]],
sourceCopy.outPorts(outlet).asInstanceOf[Graphs.OutPort[Out]])
}

/**
* Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableFlow]]
*/
Expand All @@ -85,8 +93,8 @@ final class Flow[-In, +Out, +Mat](m: StreamLayout.Module, val inlet: Graphs.InPo
.connect(flowCopy.outlet, this.inlet))
}

def join[Mat2](flow: Flow[Out, In, Mat2]): RunnableFlow[Unit] = {
join(flow, (_: Mat, _: Mat2) ())
def join[Mat2](flow: Flow[Out, In, Mat2]): RunnableFlow[Mat2] = {
join(flow, (_: Mat, m2: Mat2) m2)
}

// FIXME: Materialized value is not combined!
Expand Down Expand Up @@ -114,8 +122,7 @@ final class Flow[-In, +Out, +Mat](m: StreamLayout.Module, val inlet: Graphs.InPo
new Flow[In, U, Mat2](module.grow(op, (m: Mat, m2: Mat2) m2).connect(outlet, op.inPort), inlet, op.outPort.asInstanceOf[Graphs.OutPort[U]])
}

/** INTERNAL API */
override private[scaladsl] def withAttributes(attr: OperationAttributes): Repr[Out, Mat] = {
override def withAttributes(attr: OperationAttributes): Repr[Out, Mat] = {
val newModule = module.withAttributes(attr)
new Flow(newModule, newModule.inPorts.head.asInstanceOf[Graphs.InPort[In]], newModule.outPorts.head.asInstanceOf[Graphs.OutPort[Out]])
}
Expand All @@ -129,7 +136,7 @@ final class Flow[-In, +Out, +Mat](m: StreamLayout.Module, val inlet: Graphs.InPo
source.via(this, (sm: Mat1, fm: Mat) sm).to(sink, (sourcem: Mat1, sinkm: Mat2) (sourcem, sinkm)).run()
}

def section[I <: In, O, O2 >: Out, Mat2, Mat3](attributes: OperationAttributes, combine: (Mat, Mat2) Mat3)(section: Flow[O2, O2, Unit] Flow[O2, O, Mat2]): Flow[I, O, Mat3] = {
def section[O, O2 >: Out, Mat2, Mat3](attributes: OperationAttributes, combine: (Mat, Mat2) Mat3)(section: Flow[O2, O2, Unit] Flow[O2, O, Mat2]): Flow[In, O, Mat3] = {
val subFlow = section(Flow[O2]).withAttributes(attributes).carbonCopy()
new Flow(
module
Expand All @@ -142,8 +149,8 @@ final class Flow[-In, +Out, +Mat](m: StreamLayout.Module, val inlet: Graphs.InPo
/**
* Applies given [[OperationAttributes]] to a given section.
*/
def section[I <: In, O, O2 >: Out, Mat2](attributes: OperationAttributes)(section: Flow[O2, O2, Unit] Flow[O2, O, Mat2]): Flow[I, O, Mat2] = {
this.section[I, O, O2, Mat2, Mat2](attributes, (parentm: Mat, subm: Mat2) subm)(section)
def section[O, O2 >: Out, Mat2](attributes: OperationAttributes)(section: Flow[O2, O2, Unit] Flow[O2, O, Mat2]): Flow[In, O, Mat2] = {
this.section[O, O2, Mat2, Mat2](attributes, (parentm: Mat, subm: Mat2) subm)(section)
}

}
Expand Down Expand Up @@ -478,8 +485,7 @@ trait FlowOps[+Out, +Mat] {
private[akka] def timerTransform[U](mkStage: () TimerTransformer[Out, U]): Repr[U, Mat] =
andThen(TimerTransform(mkStage.asInstanceOf[() TimerTransformer[Any, Any]]))

/** INTERNAL API */
private[scaladsl] def withAttributes(attr: OperationAttributes): Repr[Out, Mat]
def withAttributes(attr: OperationAttributes): Repr[Out, Mat]

/** INTERNAL API */
private[scaladsl] def andThen[U](op: StageModule): Repr[U, Mat]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ object FlowGraph extends FlowGraphApply {

def outlet: OutPort[Out] = port.asInstanceOf[OutPort[Out]]

override private[scaladsl] def withAttributes(attr: OperationAttributes): Repr[Out, Mat] =
override def withAttributes(attr: OperationAttributes): Repr[Out, Mat] =
throw new UnsupportedOperationException("Cannot set attributes on chained ops from a junction output port")

override private[scaladsl] def andThen[U](op: StageModule): Repr[U, Mat] = {
Expand Down
7 changes: 7 additions & 0 deletions akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ final class Sink[-In, Mat](m: StreamLayout.Module, val inlet: Graphs.InPort[In])
def runWith[Mat2](source: Source[In, Mat2])(implicit materializer: FlowMaterializer): Mat =
source.to(this).run()

def mapMaterialized[Mat2](f: Mat Mat2): Sink[In, Mat2] = {
val sinkCopy = module.carbonCopy()
new Sink(
sinkCopy.module.transformMaterializedValue(f.asInstanceOf[Any Any]),
sinkCopy.inPorts(inlet).asInstanceOf[Graphs.InPort[In]])
}

def withAttributes(attr: OperationAttributes): Sink[In, Mat] = {
val newModule = module.withAttributes(attr)
new Sink(newModule, newModule.inPorts.head.asInstanceOf[Graphs.InPort[In]])
Expand Down
23 changes: 18 additions & 5 deletions akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ final class Source[+Out, +Mat](m: StreamLayout.Module, val outlet: Graphs.OutPor
.connect(outlet, sinkCopy.inlet))
}

def mapMaterialized[Mat2](f: Mat Mat2): Repr[Out, Mat2] = {
val sourceCopy = module.carbonCopy()
new Source(
sourceCopy.module.transformMaterializedValue(f.asInstanceOf[Any Any]),
sourceCopy.outPorts(outlet).asInstanceOf[Graphs.OutPort[Out]])
}

/** INTERNAL API */
override private[scaladsl] def andThen[U](op: StageModule): Repr[U, Mat] = {
// No need to copy here, op is a fresh instance
Expand Down Expand Up @@ -107,7 +114,7 @@ final class Source[+Out, +Mat](m: StreamLayout.Module, val outlet: Graphs.OutPor
* emitted by that source is emitted after the last element of this
* source.
*/
def concat[Out2 >: Out, Mat2](second: Source[Out2, Mat2]): Source[Out2, (Mat, Mat2)] = Source.concat(this, second)
def concat[Out2 >: Out](second: Source[Out2, _]): Source[Out2, Unit] = Source.concat(this, second)

/**
* Concatenates a second source so that the first element
Expand All @@ -116,7 +123,7 @@ final class Source[+Out, +Mat](m: StreamLayout.Module, val outlet: Graphs.OutPor
*
* This is a shorthand for [[concat]]
*/
def ++[Out2 >: Out, Mat2](second: Source[Out2, Mat2]): Source[Out2, (Mat, Mat2)] = concat(second)
def ++[Out2 >: Out](second: Source[Out2, _]): Source[Out2, Unit] = concat(second)

/**
* Applies given [[OperationAttributes]] to a given section.
Expand All @@ -134,8 +141,7 @@ final class Source[+Out, +Mat](m: StreamLayout.Module, val outlet: Graphs.OutPor
this.section[O, O2, Mat2, Mat2](attributes, (parentm: Mat, subm: Mat2) subm)(section)
}

/** INTERNAL API */
override private[scaladsl] def withAttributes(attr: OperationAttributes): Repr[Out, Mat] = {
override def withAttributes(attr: OperationAttributes): Repr[Out, Mat] = {
val newModule = module.withAttributes(attr)
new Source(newModule, newModule.outPorts.head.asInstanceOf[Graphs.OutPort[Out]])
}
Expand Down Expand Up @@ -273,7 +279,14 @@ object Source extends SourceApply {
* emitted by the second source is emitted after the last element of the first
* source.
*/
def concat[T, Mat1, Mat2](source1: Source[T, Mat1], source2: Source[T, Mat2]): Source[T, (Mat1, Mat2)] = ??? // FIXME
def concat[T](source1: Source[T, _], source2: Source[T, _]): Source[T, Unit] = {
source1.via(Flow() { implicit builder
import FlowGraph.Implicits._
val concat = Concat[T]()
source2 ~> concat.second
(concat.first, concat.out)
})
}

/**
* Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ object StreamTcp extends ExtensionId[StreamTcp] with ExtensionIdProvider {
* Convenience shortcut for: `flow.join(handler).run()`.
*/
def handleWith[Mat](handler: Flow[ByteString, ByteString, Mat])(implicit materializer: FlowMaterializer): Mat =
flow.join(handler, (_: Unit, m: Mat) m).run()
flow.join(handler).run()

}

Expand Down

0 comments on commit e4346ea

Please sign in to comment.