Skip to content

Commit

Permalink
Merge pull request akka#14 from rkuhn/wip-modules-javadsl-∂π
Browse files Browse the repository at this point in the history
Wip modules javadsl ∂π
  • Loading branch information
drewhk committed Feb 20, 2015
2 parents e4346ea + b2a7279 commit b819289
Show file tree
Hide file tree
Showing 76 changed files with 2,108 additions and 3,198 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package akka.stream.testkit

import akka.stream.scaladsl.FlowGraph.FlowGraphBuilder
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.{ FlowMaterializer, MaterializerSettings, Inlet, Outlet }
import akka.stream.scaladsl._
import org.reactivestreams.Publisher
import scala.collection.immutable
Expand All @@ -18,18 +17,18 @@ abstract class TwoStreamsSetup extends AkkaSpec {

type Outputs

abstract class Fixture(b: FlowGraphBuilder) {
def left: Graphs.InPort[Int]
def right: Graphs.InPort[Int]
def out: Graphs.OutPort[Outputs]
abstract class Fixture(b: Graph.Builder) {
def left: Inlet[Int]
def right: Inlet[Int]
def out: Outlet[Outputs]
}

def fixture(b: FlowGraphBuilder): Fixture
def fixture(b: Graph.Builder): Fixture

def setup(p1: Publisher[Int], p2: Publisher[Int]) = {
val subscriber = StreamTestKit.SubscriberProbe[Outputs]()
FlowGraph() { implicit b
import FlowGraph.Implicits._
Graph.closed() { implicit b
import Graph.Implicits._
val f = fixture(b)

Source(p1) ~> f.left
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,17 @@ import org.scalatest.WordSpec

class DslConsistencySpec extends WordSpec with Matchers {

val sFlowClass = classOf[akka.stream.scaladsl.Flow[_, _]]
val jFlowClass = classOf[akka.stream.javadsl.Flow[_, _]]
val sFlowClass = classOf[akka.stream.scaladsl.Flow[_, _, _]]
val jFlowClass = classOf[akka.stream.javadsl.Flow[_, _, _]]

val sSourceClass = classOf[akka.stream.scaladsl.Source[_]]
val jSourceClass = classOf[akka.stream.javadsl.Source[_]]
val sSourceClass = classOf[akka.stream.scaladsl.Source[_, _]]
val jSourceClass = classOf[akka.stream.javadsl.Source[_, _]]

val sSinkClass = classOf[akka.stream.scaladsl.Sink[_]]
val jSinkClass = classOf[akka.stream.javadsl.Sink[_]]
val sSinkClass = classOf[akka.stream.scaladsl.Sink[_, _]]
val jSinkClass = classOf[akka.stream.javadsl.Sink[_, _]]

val sKeyClass = classOf[akka.stream.scaladsl.Key[_]]
val jKeyClass = classOf[akka.stream.javadsl.Key[_]]

val sMaterializedMapClass = classOf[akka.stream.scaladsl.MaterializedMap]
val jMaterializedMapClass = classOf[akka.stream.javadsl.MaterializedMap]

val jFlowGraphClass = classOf[akka.stream.javadsl.FlowGraph]
val sFlowGraphClass = classOf[akka.stream.scaladsl.FlowGraph]

val jPartialFlowGraphClass = classOf[akka.stream.javadsl.PartialFlowGraph]
val sPartialFlowGraphClass = classOf[akka.stream.scaladsl.PartialFlowGraph]
val jRunnableFlowClass = classOf[akka.stream.javadsl.RunnableFlow[_]]
val sRunnableFlowClass = classOf[akka.stream.scaladsl.RunnableFlow[_]]

val ignore =
Set("equals", "hashCode", "notify", "notifyAll", "wait", "toString", "getClass") ++
Expand All @@ -46,9 +37,8 @@ class DslConsistencySpec extends WordSpec with Matchers {
jSourceClass -> Set("timerTransform"),
jSinkClass -> Set(),

sFlowGraphClass -> Set("builder"),
jFlowGraphClass Set("graph", "cyclesAllowed"),
jPartialFlowGraphClass Set("graph", "cyclesAllowed", "disconnectedAllowed"))
sRunnableFlowClass -> Set("builder"),
jRunnableFlowClass Set("graph", "cyclesAllowed"))

def materializing(m: Method): Boolean = m.getParameterTypes.contains(classOf[FlowMaterializer])

Expand All @@ -63,10 +53,7 @@ class DslConsistencySpec extends WordSpec with Matchers {
("Source" -> List(sSourceClass, jSourceClass)) ::
("Flow" -> List(sFlowClass, jFlowClass)) ::
("Sink" -> List(sSinkClass, jSinkClass)) ::
("Key" -> List(sKeyClass, jKeyClass)) ::
("MaterializedMap" -> List(sMaterializedMapClass, jMaterializedMapClass)) ::
("FlowGraph" -> List(sFlowGraphClass, jFlowGraphClass)) ::
("PartialFlowGraph" -> List(sPartialFlowGraphClass, jPartialFlowGraphClass)) ::
("RunanbleFlow" -> List(sRunnableFlowClass, jRunnableFlowClass)) ::
Nil foreach {
case (element, classes)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,30 +35,21 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers {
(classOf[scala.Function1[_, _]], classOf[akka.stream.javadsl.japi.Function[_, _]]) ::
(classOf[scala.Function1[_, _]], classOf[akka.stream.javadsl.japi.Creator[_]]) ::
(classOf[scala.Function2[_, _, _]], classOf[akka.stream.javadsl.japi.Function2[_, _, _]]) ::
(classOf[akka.stream.scaladsl.Source[_]], classOf[akka.stream.javadsl.Source[_]]) ::
(classOf[akka.stream.scaladsl.KeyedSource[_, _]], classOf[akka.stream.javadsl.KeyedSource[_, _]]) ::
(classOf[akka.stream.scaladsl.Sink[_]], classOf[akka.stream.javadsl.Sink[_]]) ::
(classOf[akka.stream.scaladsl.KeyedSink[_, _]], classOf[akka.stream.javadsl.KeyedSink[_, _]]) ::
(classOf[akka.stream.scaladsl.Flow[_, _]], classOf[akka.stream.javadsl.Flow[_, _]]) ::
(classOf[akka.stream.scaladsl.FlowGraph], classOf[akka.stream.javadsl.FlowGraph]) ::
(classOf[akka.stream.scaladsl.PartialFlowGraph], classOf[akka.stream.javadsl.PartialFlowGraph]) ::
(classOf[akka.stream.scaladsl.Source[_, _]], classOf[akka.stream.javadsl.Source[_, _]]) ::
(classOf[akka.stream.scaladsl.Sink[_, _]], classOf[akka.stream.javadsl.Sink[_, _]]) ::
(classOf[akka.stream.scaladsl.Flow[_, _, _]], classOf[akka.stream.javadsl.Flow[_, _, _]]) ::
(classOf[akka.stream.scaladsl.RunnableFlow[_]], classOf[akka.stream.javadsl.RunnableFlow[_]]) ::
Nil
// format: ON

val sKeyedSource = classOf[scaladsl.KeyedSource[_, _]]
val jKeyedSource = classOf[javadsl.KeyedSource[_, _]]
val sSource = classOf[scaladsl.Source[_, _]]
val jSource = classOf[javadsl.Source[_, _]]

val sKeyedSink = classOf[scaladsl.KeyedSink[_, _]]
val jKeyedSink = classOf[javadsl.KeyedSink[_, _]]
val sSink = classOf[scaladsl.Sink[_, _]]
val jSink = classOf[javadsl.Sink[_, _]]

val sSource = classOf[scaladsl.Source[_]]
val jSource = classOf[javadsl.Source[_]]

val sSink = classOf[scaladsl.Sink[_]]
val jSink = classOf[javadsl.Sink[_]]

val sFlow = classOf[scaladsl.Flow[_, _]]
val jFlow = classOf[javadsl.Flow[_, _]]
val sFlow = classOf[scaladsl.Flow[_, _, _]]
val jFlow = classOf[javadsl.Flow[_, _, _]]

"Java DSL" must provide {
"Source" which {
Expand Down Expand Up @@ -96,7 +87,7 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers {
if (m.getDeclaringClass == akka.stream.scaladsl.Source.getClass
&& m.getName == "apply"
&& m.getParameterTypes.length == 1
&& m.getParameterTypes()(0) == classOf[scala.Function1[akka.stream.scaladsl.FlowGraphBuilder, akka.stream.scaladsl.UndefinedSink[_]]])
&& m.getParameterTypes()(0) == classOf[scala.Function1[_, _]])
false // conflict between two Source.apply(Function1)
else
true
Expand Down Expand Up @@ -182,10 +173,8 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers {
* If scaladsl is not a keyed type, javadsl shouldn't be as well.
*/
def returnTypeMatch(s: Class[_], j: Class[_]): Boolean =
(sKeyedSink.isAssignableFrom(s) && jKeyedSink.isAssignableFrom(j)) ||
(sKeyedSource.isAssignableFrom(s) && jKeyedSource.isAssignableFrom(j)) ||
(sSource.isAssignableFrom(s) && jSource.isAssignableFrom(j) && !jKeyedSource.isAssignableFrom(j)) ||
(sSink.isAssignableFrom(s) && jSink.isAssignableFrom(j) && !jKeyedSink.isAssignableFrom(j)) ||
(sSource.isAssignableFrom(s) && jSource.isAssignableFrom(j)) ||
(sSink.isAssignableFrom(s) && jSink.isAssignableFrom(j)) ||
(sFlow.isAssignableFrom(s) && jFlow.isAssignableFrom(j))

def typeMatch(scalaParams: Array[Class[_]], javaParams: Array[Class[_]]): Boolean =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,8 @@ package akka.stream.actor
import akka.actor.ActorRef
import akka.actor.PoisonPill
import akka.actor.Props
import akka.stream.scaladsl.Broadcast
import akka.stream.scaladsl.FlowGraph
import akka.stream.scaladsl.Merge
import akka.stream.scaladsl._
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.StreamTestKit
import akka.testkit.EventFilter
Expand Down Expand Up @@ -290,12 +286,12 @@ class ActorPublisherSpec extends AkkaSpec with ImplicitSender {
val sink1 = Sink(ActorSubscriber[String](system.actorOf(receiverProps(probe1.ref))))
val sink2 = Sink[String](receiverProps(probe2.ref))

val senderRef2 = FlowGraph(Source[Int](senderProps)) { implicit b
val senderRef2 = Graph.closed(Source[Int](senderProps)) { implicit b
source2
import FlowGraph.Implicits._
import Graph.Implicits._

val merge = Merge[Int](2)
val bcast = Broadcast[String](2)
val merge = b.add(Merge[Int](2))
val bcast = b.add(Broadcast[String](2))

source1 ~> merge.in(0)
source2.outlet ~> merge.in(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ package akka.stream.impl
import akka.stream.scaladsl._
import akka.stream.testkit.AkkaSpec
import org.reactivestreams.{ Subscription, Subscriber, Publisher }
import akka.stream._

class StreamLayoutSpec extends AkkaSpec {
import StreamLayout._

def testAtomic(inPortCount: Int, outPortCount: Int): Module = new Module {
override val inPorts: Set[InPort] = List.fill(inPortCount)(new InPort).toSet
override val outPorts: Set[OutPort] = List.fill(outPortCount)(new OutPort).toSet
override val shape = AmorphousShape(List.fill(inPortCount)(new Inlet("")), List.fill(outPortCount)(new Outlet("")))

override def subModules: Set[Module] = Set.empty
override def downstreams: Map[OutPort, InPort] = Map.empty
Expand Down

0 comments on commit b819289

Please sign in to comment.