Skip to content

Commit

Permalink
=str akka#16787 java cookbook
Browse files Browse the repository at this point in the history
  • Loading branch information
ktoso committed Feb 10, 2015
1 parent 8ce8f1d commit 7a0beda
Show file tree
Hide file tree
Showing 10 changed files with 884 additions and 31 deletions.
409 changes: 409 additions & 0 deletions akka-docs-dev/rst/java/stream-cookbook.rst

Large diffs are not rendered by default.

Expand Up @@ -121,16 +121,8 @@ void deliverBuf() {
@Test
public void demonstrateActorPublisherUsage() {
new JavaTestKit(system) {
class MockSystem {
class Println {
public <T> void println(T s) {
getTestActor().tell(s, ActorRef.noSender());
}
}

public final Println out = new Println();
}
private final MockSystem System = new MockSystem();
final SilenceSystemOut.System System = SilenceSystemOut.get();

{
//#actor-publisher-usage
Expand Down
Expand Up @@ -491,17 +491,7 @@ public void illustrateOrderingAndParallelismOfMapAsync() throws Exception {
final TestProbe probe = new TestProbe(system);
final EmailServer emailServer = new EmailServer(probe.ref());

class MockSystem {
class Println {
public <T> void println(T s) {
if (s.toString().startsWith("after:"))
probe.ref().tell(s, ActorRef.noSender());
}
}

public final Println out = new Println();
}
private final MockSystem System = new MockSystem();
final SilenceSystemOut.System System = SilenceSystemOut.get();

{
//#sometimes-slow-mapAsync
Expand Down Expand Up @@ -537,17 +527,7 @@ public void illustrateOrderingAndParallelismOfMapAsyncUnordered() throws Excepti
final TestProbe probe = new TestProbe(system);
final EmailServer emailServer = new EmailServer(probe.ref());

class MockSystem {
class Println {
public <T> void println(T s) {
if (s.toString().startsWith("after:"))
probe.ref().tell(s, ActorRef.noSender());
}
}

public final Println out = new Println();
}
private final MockSystem System = new MockSystem();
final SilenceSystemOut.System System = SilenceSystemOut.get();

{
//#sometimes-slow-mapAsyncUnordered
Expand Down
@@ -0,0 +1,43 @@
package docs.stream;

import akka.actor.ActorRef;

/**
* Acts as if `System.out.println()` yet swallows all messages.
* Useful for putting printlines in examples yet without poluting the build with them.
*/
public class SilenceSystemOut {

private SilenceSystemOut() {}

public static System get() {
return new System(new System.Println() {
@Override
public <T> void println(T s) {
}
});
}

public static System get(ActorRef probe) {
return new System(new System.Println() {
@Override
public <T> void println(T s) {
probe.tell(s, ActorRef.noSender());
}
});
}

public static class System {
public final Println out;

public System(Println out) {
this.out = out;
}

public static interface Println {
public <T> void println(T s);
}

}

}
@@ -0,0 +1,75 @@
/**
* Copyright (C) 2015 Typesafe <http://typesafe.com/>
*/
package docs.stream.cookbook;

import akka.actor.ActorSystem;
import akka.stream.ActorFlowMaterializer;
import akka.stream.FlowMaterializer;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.*;
import akka.testkit.JavaTestKit;
import docs.stream.SilenceSystemOut;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

public class RecipeDroppyBroadcast extends RecipeTest {
static ActorSystem system;

@BeforeClass
public static void setup() {
system = ActorSystem.create("RecipeLoggingElements");
}

@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
}

final FlowMaterializer mat = ActorFlowMaterializer.create(system);

@Test
public void work() throws Exception {
new JavaTestKit(system) {
final SilenceSystemOut.System System = SilenceSystemOut.get(getTestActor());

//#droppy-bcast
// Makes a sink drop elements if too slow
public <T> Sink<T> droppySink(Sink<T> sink, int bufferSize) {
return Flow.<T>create().buffer(bufferSize, OverflowStrategy.dropHead()).to(sink);
}
//#droppy-bcast

{

final List<Integer> nums = new ArrayList<>();
for (int i = 0; i < 100; i++) {
nums.add(i + 1);
}

final Sink<Integer> mySink1 = Sink.ignore();
final Sink<Integer> mySink2= Sink.ignore();
final Sink<Integer> mySink3 = Sink.ignore();

final Source<Integer> myData = Source.from(nums);

//#droppy-bcast
final Broadcast<Integer> bcast = Broadcast.create();

final FlowGraph g = new FlowGraphBuilder()
.addEdge(myData, bcast)
.addEdge(bcast, droppySink(mySink1, 10))
.addEdge(bcast, droppySink(mySink2, 10))
.addEdge(bcast, droppySink(mySink3, 10))
.build();
//#droppy-bcast
}
};
}

}
@@ -0,0 +1,64 @@
/**
* Copyright (C) 2015 Typesafe <http://typesafe.com/>
*/
package docs.stream.cookbook;

import akka.actor.ActorSystem;
import akka.stream.ActorFlowMaterializer;
import akka.stream.FlowMaterializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.testkit.JavaTestKit;
import docs.stream.SilenceSystemOut;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;

public class RecipeFlattenList extends RecipeTest {
static ActorSystem system;

@BeforeClass
public static void setup() {
system = ActorSystem.create("RecipeLoggingElements");
}

@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
}

final FlowMaterializer mat = ActorFlowMaterializer.create(system);

@Test
public void workWithPrintln() throws Exception {
new JavaTestKit(system) {
final SilenceSystemOut.System System = SilenceSystemOut.get(getTestActor());

{
final Source<String> myData = Source.from(Arrays.asList("1", "2", "3"));
final int MAX_ALLOWED_SIZE = 100;

//#draining-to-list
final Future<List<String>> strings =
myData.grouped(MAX_ALLOWED_SIZE).runWith(Sink.head(), mat);
//#draining-to-list

List<String> got = Await.result(strings, new FiniteDuration(1, TimeUnit.SECONDS));
assertEquals(got.get(0), "1");
assertEquals(got.get(1), "2");
assertEquals(got.get(2), "3");
}
};
}

}
@@ -0,0 +1,94 @@
/**
* Copyright (C) 2015 Typesafe <http://typesafe.com/>
*/
package docs.stream.cookbook;

import akka.actor.ActorSystem;
import akka.stream.ActorFlowMaterializer;
import akka.stream.FlowMaterializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.stage.Context;
import akka.stream.stage.Directive;
import akka.stream.stage.PushStage;
import akka.stream.stage.TerminationDirective;
import akka.testkit.JavaTestKit;
import docs.stream.SilenceSystemOut;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.Arrays;

public class RecipeLoggingElements extends RecipeTest {
static ActorSystem system;

@BeforeClass
public static void setup() {
system = ActorSystem.create("RecipeLoggingElements");
}

@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
}

final FlowMaterializer mat = ActorFlowMaterializer.create(system);

@Test
public void workWithPrintln() throws Exception {
new JavaTestKit(system) {
final SilenceSystemOut.System System = SilenceSystemOut.get(getTestActor());

{
final Source<String> mySource = Source.from(Arrays.asList("1", "2", "3"));

//#println-debug
mySource.map(elem -> {
System.out.println(elem);
return elem;
})
//#println-debug
.runWith(Sink.ignore(), mat);
}
};
}

@Test
public void workWithPushStage() throws Exception {
new JavaTestKit(system) {
final SilenceSystemOut.System System = SilenceSystemOut.get(getTestActor());

{
final Source<String> mySource = Source.from(Arrays.asList("1", "2", "3"));

//#loggingadapter
final PushStage<String, String> LoggingStage = new PushStage<String, String>() {
@Override
public Directive onPush(String elem, Context<String> ctx) {
java.lang.System.out.println("Element flowing thought: " + elem);
return ctx.push(elem);
}

@Override
public TerminationDirective onUpstreamFinish(Context<String> ctx) {
java.lang.System.out.println("Upstream finished.");
return super.onUpstreamFinish(ctx);
}

@Override
public TerminationDirective onUpstreamFailure(Throwable cause, Context<String> ctx) {
java.lang.System.out.println("Upstream failed: " + cause.getMessage());
return super.onUpstreamFailure(cause, ctx);
}
};

mySource.transform(() -> LoggingStage)
//#loggingadapter
.runWith(Sink.ignore(), mat);
}
};
}

}

0 comments on commit 7a0beda

Please sign in to comment.