This repository has been archived by the owner on Mar 11, 2024. It is now read-only.

Random message ordering #64

Closed
etishka opened this issue Sep 20, 2017 · 1 comment

Comments

etishka commented Sep 20, 2017

Hi,

I have a RichCoFlatMapFunction whose flatMap1 and flatMap2 methods each handle one kind of message.

Even when I specify the order of the original messages in a stream, the order in which they reach the RichCoFlatMapFunction is arbitrary.

I create the stream like this:
DataStream<Message> stream = createTestStreamWith(message1).emit(message2).close();

The assertion:
assertStream(job.processStream(stream), matcher);

processStream looks like this:

DataStream<IN1> stream1 = ...
DataStream<IN2> stream2 = ...

return stream1.connect(stream2).keyBy("key1", "key2")
        .flatMap(new MyRichCoFlatMapFunction());

Is there a way to define a strict order for messages?
Currently they can arrive at MyRichCoFlatMapFunction as "message1,message2" or "message2,message1", triggering flatMap1 and flatMap2 in arbitrary order, which is not good for unit testing.

Contributor

lofifnc commented Sep 21, 2017

Hi,

This is an interesting case. The philosophy of flinkspector has always been to simulate an environment as close to an actual cluster as possible, which is why, for example, tests run at a parallelism of 2 by default. This approach is of course debatable, but the idea is to provide something close to lightweight integration tests. In our experience, this lets you find certain errors that would otherwise go undiscovered until you run your program on an actual cluster.

In your case you have two streams, and you would like to define an order of messages across them that is preserved until they arrive at a certain operator. With the current design of the framework, which just runs a local cluster and injects messages, I don't have enough control to achieve this in a satisfying way.

If you just want to unit test your RichCoFlatMapFunction, I would suggest using a test harness:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java

This has also been our approach: write tests for individual operators, and then test your operators in conjunction using flinkspector.
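To illustrate why testing the function on its own gives a strict order, here is a minimal sketch in plain Java. It does not use Flink: `TwoInputFlatMap`, `JoinByKey`, and `OrderedCoFlatMapDemo` are hypothetical stand-ins that only mirror the two-callback shape of a co-flat-map function. The point is that when the test itself calls `flatMap1` and `flatMap2`, the interleaving is whatever the test writes down. A real operator test harness would additionally manage keyed state and the operator lifecycle, which this sketch ignores.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in mirroring the two-callback shape of a co-flat-map function.
interface TwoInputFlatMap<IN1, IN2, OUT> {
    void flatMap1(IN1 value, Consumer<OUT> out);
    void flatMap2(IN2 value, Consumer<OUT> out);
}

// Hypothetical function under test: remembers a name per key (input 1)
// and joins it with amounts arriving later on input 2.
class JoinByKey implements TwoInputFlatMap<String[], String[], String> {
    private final Map<String, String> namesByKey = new HashMap<>();

    @Override
    public void flatMap1(String[] keyAndName, Consumer<String> out) {
        namesByKey.put(keyAndName[0], keyAndName[1]);
    }

    @Override
    public void flatMap2(String[] keyAndAmount, Consumer<String> out) {
        String name = namesByKey.get(keyAndAmount[0]);
        out.accept(name == null
                ? "unmatched:" + keyAndAmount[0]
                : name + "=" + keyAndAmount[1]);
    }
}

class OrderedCoFlatMapDemo {
    // Drives both inputs in an explicit, test-defined order and collects the output.
    static List<String> run() {
        JoinByKey fn = new JoinByKey();
        List<String> out = new ArrayList<>();
        fn.flatMap2(new String[] {"k1", "5"}, out::add);     // input 2 first: no name stored yet
        fn.flatMap1(new String[] {"k1", "alice"}, out::add); // input 1: stores the name
        fn.flatMap2(new String[] {"k1", "7"}, out::add);     // input 2 again: now joins
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the call order is fixed in the test, both the "message2 before message1" and the "message1 before message2" cases can each be covered by their own deterministic test.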

I hope this helps.

lofifnc closed this as completed Oct 16, 2017