Skip to content
This repository has been archived by the owner on Mar 11, 2024. It is now read-only.

Configurable TimeCharacteristic for test environment #54

Closed
debelyoo opened this issue Aug 14, 2017 · 5 comments
Closed

Configurable TimeCharacteristic for test environment #54

debelyoo opened this issue Aug 14, 2017 · 5 comments

Comments

@debelyoo
Copy link

I'm trying to test some CEP with flink-spector, but I can't make it work.
My test is shown below. This test passes although it should not (the expected record should not match the actual one). I'm wondering if the CEP processing is applied correctly...
The code is in Scala, but I'm using the Java DataStream classes.

@Test
  def testWithFlinkspectorAndJavaStreams(): Unit = {
    val event1 = Event("xxx", "", "message")
    val event2 = Event("xxx", "", "web portal")
    val event3 = Event("xxx", "", "message")
    val event4 = Event("xxx", "", "message")
    val event5 = Event("xxx", "", "call")

    val input = createTestStreamWith(event1).emit(event2).emit(event3).emit(event4).emit(event5).close()

    val pattern = Pattern.begin("event0").subtype(classOf[Event]).where(new EventActionCondition("message")).next("event1").where(new EventActionCondition("call"))

    val patternStream = CEP.pattern(input, pattern)
    val alertStream: DataStream[String] = patternStream.select(new AlertPatternSelectFunction())

    val output: ExpectedRecords[String] = ExpectedRecords.create[String]("Got one !")
    assertStream(alertStream, output)
  }

With the util classes:

class EventActionCondition(action: String) extends IterativeCondition[Event] {
  override def filter(value: Event, ctx: IterativeCondition.Context[Event]): Boolean = {
    value.action == action
  }
}
class AlertPatternSelectFunction extends PatternSelectFunction[Event, String] {
  override def select(pattern: util.Map[String, util.List[Event]]): String = {
    "got it !"
  }
}
@lofifnc
Copy link
Contributor

lofifnc commented Aug 15, 2017

I will have a detailed look at this later. I must confess I've never tested Flinkspector with CEP because I assumed It's producing the same results as the rest of the transformations.

@debelyoo
Copy link
Author

debelyoo commented Aug 15, 2017

Ok, I found what was the problem. My events do not have a timestamp and the stream time characteristic in the test environment is set to EventTime.
This is set in the initialize() method, and can not be changed.
@org.junit.Before public void initialize() throws Exception { testEnv = DataStreamTestEnvironment.createTestEnvironment(1); testEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); }

Could you make this setting configurable ?

@debelyoo debelyoo reopened this Aug 15, 2017
@debelyoo debelyoo changed the title CEP testing is not working Configurable TimeCharacteristic for test environment Aug 15, 2017
@lofifnc
Copy link
Contributor

lofifnc commented Aug 15, 2017

Aaah, this is something which only happens when using CEP. The common non windowing transformations don't care if you have event time enabled and don't provide timestamps. Instead of changing the stream time charateristic you can instruct Flinkspector to provide timestamps.

Change this line:

val input = createTestStreamWith(event1).emit(event2).emit(event3).emit(event4).emit(event5).close()

to

val input = createTimedTestStreamWith(event1).emit(event2).emit(event3).emit(event4).emit(event5).close()

and Flinkspector will provide timestamps.

This works for me and the test fails as expected. But I'm not familiar with cep so I'm not sure if this solution is sufficient.

@debelyoo
Copy link
Author

Yes, it's working fine. Thanks !

@lofifnc
Copy link
Contributor

lofifnc commented Aug 16, 2017

This issue has been bundled into #57

@lofifnc lofifnc closed this as completed Aug 16, 2017
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants