Skip to content

Commit

Permalink
test cases added, IBMStreams#2494
Browse files Browse the repository at this point in the history
  • Loading branch information
markheger committed Jul 7, 2020
1 parent 0571b5a commit 79c20a8
Showing 1 changed file with 74 additions and 3 deletions.
77 changes: 74 additions & 3 deletions test/java/src/com/ibm/streamsx/topology/test/api/WindowTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,43 @@ public void testBasicCount() throws Exception {
assertNotNull(window);
assertWindow(f, window);
}

@Test
public void testSubmissionParamCount() throws Exception {
final Topology f = newTopology("CountWindowSubmissionParam");
TStream<String> source = f.strings("a", "b", "c");
Supplier<Integer> count = f.createSubmissionParameter("count", 10);
TWindow<String,?> window = source.last(count);
assertNotNull(window);
assertWindow(f, window);
}

@Test(expected=IllegalArgumentException.class)
public void testZeroCountWindowDefaultParam() throws Exception {
final Topology f = newTopology("testZeroCountWindowDefaultParam");
TStream<String> source = f.strings("a", "b", "c");
Supplier<Integer> count = f.createSubmissionParameter("count", 0);
source.last(count);
}

@Test(expected=IllegalArgumentException.class)
public void testZeroTimeWindowDefaultParam() throws Exception {
final Topology f = newTopology("testZeroTimeWindowDefaultParam");
TStream<String> source = f.strings("a", "b", "c");
Supplier<Integer> time = f.createSubmissionParameter("time", 0);
source.lastSeconds(time);
}

@Test
public void testSubmissionParamTime() throws Exception {
final Topology f = newTopology("TimeWindowSubmissionParam");
TStream<String> source = f.strings("a", "b", "c");
Supplier<Integer> time = f.createSubmissionParameter("time", 10);
TWindow<String,?> window = source.lastSeconds(time);
assertNotNull(window);
assertWindow(f, window);
}

@Test
public void testBasicTime() throws Exception {
final Topology f = newTopology("TimeWindow");
Expand Down Expand Up @@ -105,6 +141,18 @@ public void testCountAggregate() throws Exception {

completeAndValidate(aggregate, 10, "1", "3", "6", "9", "12", "15", "18");
}

@Test
public void testCountAggregateStv() throws Exception {
assumeTrue(!isEmbedded());
final Topology f = newTopology("CountAggregateStv");
TStream<Number> source = f.numbers(1, 2, 3, 4, 5, 6, 7);
Supplier<Integer> count = f.createSubmissionParameter("count", 3);
TWindow<Number,?> window = source.last(count);
TStream<Integer> aggregate = window.aggregate(new SumInt());

completeAndValidate(aggregate, 10, "1", "3", "6", "9", "12", "15", "18");
}

@Test
public void testKeyedAggregate() throws Exception {
Expand Down Expand Up @@ -289,15 +337,38 @@ public boolean test(String output) {

}



/**
* Test a periodic aggregation with submission parameter.
*/
@Test
public void testPeriodicAggregateLastSecondsStv() throws Exception {
assumeTrue(!isEmbedded());
final Topology t = newTopology();
TStream<String> source = t.periodicSource(new PeriodicStrings(), 100, TimeUnit.MILLISECONDS);

Supplier<Integer> time = t.createSubmissionParameter("time", 3);
TStream<JSONObject> aggregate = source.lastSeconds(time).aggregate(
new AggregateStrings(), 1, TimeUnit.SECONDS);
TStream<String> strings = JSONStreams.serialize(aggregate);

Tester tester = t.getTester();

Condition<String> checker = tester.stringTupleTester(strings, new PeriodicAggregateTester());

// 10 tuples per second, aggregate every second, so 15 seconds is around 15 tuples.
Condition<Long> ending = tester.atLeastTupleCount(strings, 15);
complete(tester, ending, 30, TimeUnit.SECONDS);

assertTrue(ending.valid());
assertTrue(checker.valid());
}

/**
* Test a periodic aggregation.
*/
@Test
public void testPeriodicAggregateLastSeconds() throws Exception {
assumeTrue(!isEmbedded());
final Topology t = newTopology();
TStream<String> source = t.periodicSource(new PeriodicStrings(), 100, TimeUnit.MILLISECONDS);

Expand Down

0 comments on commit 79c20a8

Please sign in to comment.