Skip to content

Commit

Permalink
sample app for IBMStreams#2494
Browse files Browse the repository at this point in the history
  • Loading branch information
markheger committed Jul 6, 2020
1 parent 3c04815 commit 2bc7fe5
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 34 deletions.
4 changes: 4 additions & 0 deletions samples/java/functional/build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@
<java classname="simple.HelloWorld" classpathref="run.classpath" fork="yes"/>
</target>

<target name="run.temperatureSensor.bundle">
<java classname="simple.TemperatureSensor" classpathref="run.classpath" fork="yes"/>
</target>

<target name="run.temperatureSensor">
<java classname="simple.TemperatureSensor" classpathref="run.classpath" fork="yes">
<arg value="STANDALONE"/>
Expand Down
49 changes: 15 additions & 34 deletions samples/java/functional/src/simple/TemperatureSensor.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,32 @@
import com.ibm.streamsx.topology.TWindow;

/**
* Sample Hello World topology application. This Java application builds a
* simple topology that prints Hello World to standard output. <BR>
* Sample topology application. This Java application builds a
* simple topology that demonstrates the usage of submission time parameter for time based window. <BR>
* The application implements the typical pattern of code that declares a
* topology followed by submission of the topology to a Streams context
* {@code com.ibm.streamsx.topology.context.StreamsContext}.
* <BR>
* This demonstrates the mechanics of declaring a topology and executing it.
* This demonstrates the mechanics of declaring a topology with submission time parameter and executing it.
* <P>
* This may be executed from the {@code samples/java/functional} directory as:
* <UL>
* <LI>{@code ant run.helloworld} - Using Apache Ant, this will run in embedded
* <LI>{@code ant run.temperaturesensor} - Using Apache Ant, this will run in standalone
* mode.</LI>
* <LI>{@code ant run.temperaturesensor.bundle} - Using Apache Ant, this will create the bundle only.</LI>
* <LI>
* {@code java -jar funcsamples.jar:../com.ibm.streamsx.topology/lib/com.ibm.streamsx.topology.jar:$STREAMS_INSTALL/lib/com.ibm.streams.operator.samples.jar
* simple.HelloWorld [CONTEXT_TYPE]
* simple.TemperatureSensor [CONTEXT_TYPE]
* } - Run directly from the command line.
* </LI>
* If no arguments are provided then the topology is executed in embedded mode,
* within this JVM.
* If no arguments are provided then the topology is executed in bundle mode.
* <BR>
* <i>CONTEXT_TYPE</i> is one of:
* <UL>
* <LI>{@code DISTRIBUTED} - Run as an IBM Streams distributed
* application.</LI>
* <LI>{@code STANDALONE} - Run as an IBM Streams standalone
* application.</LI>
* <LI>{@code EMBEDDED} - Run embedded within this JVM.</LI>
* <LI>{@code BUNDLE} - Create an IBM Streams application bundle.</LI>
* <LI>{@code TOOLKIT} - Create an IBM Streams application toolkit.</LI>
* </UL>
Expand All @@ -55,9 +54,10 @@
public class TemperatureSensor {

/**
* Sample Hello World topology application.
* Sample topology application.
* This Java application builds a simple topology
* that prints Hello World to standard output.
* that demonstrates the usage of submission time parameter for time based window
* and finally prints the aggregates values to standard output.
* <BR>
* The application implements the typical pattern
* of code that declares a topology followed by
Expand All @@ -78,29 +78,16 @@ public static void main(String[] args) throws Exception {
Random random = new Random();

@SuppressWarnings("unchecked")
TStream<Double> readingsSource = topology.endlessSource(new Supplier<Double>(){
TStream<Double> readings = topology.endlessSource(new Supplier<Double>(){
@Override
public Double get() {
return random.nextGaussian();
}

});

Supplier<Integer> width = topology.createSubmissionParameter("width", Integer.class);
TStream<Double> parallelReadings = readingsSource.parallel(width);

TStream<Double> parallelRegion = parallelReadings.transform(new Function<Double, Double>(){
@Override
public Double apply(Double temp) {
return temp;
}
});

TStream<Double> readings = parallelRegion.endParallel();


Supplier<Long> time = topology.createSubmissionParameter("time", Long.class);
TWindow<Double, ?> lastNSeconds = readings.last(time, TimeUnit.SECONDS);
Supplier<Integer> time = topology.createSubmissionParameter("time", 5);
TWindow<Double, ?> lastNSeconds = readings.lastSeconds(time);

TStream<Double> maxTemp = lastNSeconds.aggregate(new Function<List<Double>, Double>(){
@Override
Expand All @@ -115,15 +102,9 @@ public Double apply(List<Double> temps) {

});
maxTemp.print();
/*
* Now execute the topology by submitting to a StreamsContext.
* If no argument is provided then the topology is executed
* within this JVM (StreamsContext.Type.EMBEDDED).
* Otherwise the first and only argument is taken as the
* String representation of the
*/

if (args.length == 0)
StreamsContextFactory.getEmbedded().submit(topology).get();
StreamsContextFactory.getStreamsContext("BUNDLE").submit(topology).get();
else
StreamsContextFactory.getStreamsContext(args[0]).submit(topology)
.get();
Expand Down

0 comments on commit 2bc7fe5

Please sign in to comment.