Skip to content

Commit

Permalink
Issue 6677: Adding integration test to cover timestampAggregationTime…
Browse files Browse the repository at this point in the history
…out (#6678)

Adding an integration test to test the use case of timed out writers not getting considered for subsequent watermarks.

Signed-off-by: SrishT <Srishti.Thakkar@emc.com>

Signed-off-by: Tom Kaitchuck <tkaitchuck@users.noreply.github.com>

Co-authored-by: Tom Kaitchuck <tkaitchuck@users.noreply.github.com>
  • Loading branch information
SrishT and tkaitchuck committed Apr 15, 2022
1 parent db14f5a commit 213576b
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 8 deletions.
Expand Up @@ -112,7 +112,7 @@ public interface EventStreamWriter<Type> extends AutoCloseable {
* {@link EventWriterConfigBuilder#automaticallyNoteTime(boolean)} to true when creating a
* writer.
*
* @param timestamp a timestamp that represents the current location in the stream.
* @param timestamp a timestamp (in milliseconds) that represents the current location in the stream.
*/
void noteTime(long timestamp);

Expand Down
Expand Up @@ -61,16 +61,22 @@ public class StreamConfiguration implements Serializable {
private final RetentionPolicy retentionPolicy;

/**
* The duration after the last call to {@link EventStreamWriter#noteTime(long)} which the
* timestamp should be considered valid before it is forgotten. Meaning that after this long of
* not calling {@link EventStreamWriter#noteTime(long)} the writer will be forgotten.
* If there are no known writers, readers that call {@link EventStreamReader#getCurrentTimeWindow(Stream)}
* The duration after the last call to {@link EventStreamWriter#noteTime(long)} until which the
* writer would be considered for computing {@link EventStreamReader#getCurrentTimeWindow(Stream)}
* Meaning that after this long of not calling {@link EventStreamWriter#noteTime(long)}
* a writer's previously reported time would be ignored for computing the time window.
*
* However, after the timestampAggregationTimeout elapses the same writer may resume noting time
* at any time.
*
* If no writer have noted time within the timestampAggregationTimeout, readers that call
* {@link EventStreamReader#getCurrentTimeWindow(Stream)}
* will receive a `null` when they are at the corresponding position in the stream.
*
* @param timestampAggregationTimeout The duration after the last call to {@link EventStreamWriter#noteTime(long)}
* which the timestamp should be considered valid before it is forgotten.
* @return The duration after the last call to {@link EventStreamWriter#noteTime(long)} which the timestamp should
* be considered valid before it is forgotten.
* until which the writer would be considered active.
* @return The duration after the last call to {@link EventStreamWriter#noteTime(long)}
* to continue to consider the provided time.
*/
private final long timestampAggregationTimeout;

Expand Down
Expand Up @@ -461,6 +461,95 @@ public void progressingWatermarkWithWriterTimeouts() throws Exception {
Watermark nullMark = watermarks.poll(10, TimeUnit.SECONDS);
assertNull(nullMark);
}

@Test(timeout = 60000)
public void progressingWatermarkWithTimestampAggregationTimeout() throws Exception {
String scope = "Timeout";
String streamName = "progressingWatermarkWithTimestampAggregationTimeout";
int numSegments = 1;

ClientConfig clientConfig = ClientConfig.builder().controllerURI(PRAVEGA.getControllerURI()).build();

@Cleanup
StreamManager streamManager = StreamManager.create(clientConfig);
assertNotNull(streamManager);

streamManager.createScope(scope);

streamManager.createStream(scope, streamName, StreamConfiguration.builder()
.timestampAggregationTimeout(100L)
.scalingPolicy(ScalingPolicy.fixed(numSegments))
.build());
@Cleanup
EventStreamClientFactory clientFactory = EventStreamClientFactory.withScope(scope, clientConfig);
@Cleanup
SynchronizerClientFactory syncClientFactory = SynchronizerClientFactory.withScope(scope, clientConfig);

String markStream = NameUtils.getMarkStreamForStream(streamName);
@Cleanup
RevisionedStreamClient<Watermark> watermarkReader = syncClientFactory.createRevisionedStreamClient(markStream,
new WatermarkSerializer(),
SynchronizerConfig.builder().build());

LinkedBlockingQueue<Watermark> watermarks = new LinkedBlockingQueue<>();
AtomicBoolean stopFlag = new AtomicBoolean(false);
fetchWatermarks(watermarkReader, watermarks, stopFlag);

// create two writers and write two sevent and call note time for each writer.
@Cleanup
EventStreamWriter<String> writer0 = clientFactory.createEventWriter(streamName,
new JavaSerializer<>(),
EventWriterConfig.builder().build());
writer0.writeEvent("0").get();
writer0.noteTime(50L);

@Cleanup
EventStreamWriter<String> writer1 = clientFactory.createEventWriter(streamName,
new JavaSerializer<>(),
EventWriterConfig.builder().build());
writer1.writeEvent("1").get();
writer1.noteTime(100L);

@Cleanup
EventStreamWriter<String> writer2 = clientFactory.createEventWriter(streamName,
new JavaSerializer<>(),
EventWriterConfig.builder().build());
writer2.writeEvent("2").get();
writer2.noteTime(102L);

// writer0, writer1 and writer2 should result in the first watermark with following time:
// 1: 50L-102L
// then writer0 should time out and be discarded. The remaining two writers should continue to be active
// which should result in the second watermark with the time:
// 2: 100L-102L
// then writer1 should timeout and be discarded. But writer2 should continue to be active as its time
// is higher than first watermark. This should result in a third watermark to be emitted.
AssertExtensions.assertEventuallyEquals(true, () -> watermarks.size() == 2, 100000);

Watermark watermark1 = watermarks.poll();
Watermark watermark2 = watermarks.poll();
Watermark watermark3 = watermarks.poll(10, TimeUnit.SECONDS);

assertEquals(50L, watermark1.getLowerTimeBound());
assertEquals(102L, watermark1.getUpperTimeBound());

assertEquals(100L, watermark2.getLowerTimeBound());
assertEquals(102L, watermark2.getUpperTimeBound());

assertEquals(102L, watermark3.getLowerTimeBound());
assertEquals(102L, watermark3.getUpperTimeBound());

// stream cut should be same
assertTrue(watermark2.getStreamCut().entrySet().stream().allMatch(x -> watermark1.getStreamCut().get(x.getKey()).equals(x.getValue())));

// bring back writer1 and post an event with note time smaller than current watermark
writer1.writeEvent("3").get();
writer1.noteTime(90L);

// no watermark should be emitted.
Watermark nullMark = watermarks.poll(10, TimeUnit.SECONDS);
assertNull(nullMark);
}

private void scale(Controller controller, Stream streamObj, StreamConfiguration configuration) {
// perform several scales
Expand Down

0 comments on commit 213576b

Please sign in to comment.