Messages are mistakenly correlated because the prefix is equal #1802

Open
Zelldon opened this Issue Dec 18, 2018 · 0 comments

Zelldon commented Dec 18, 2018

Say we have a catch event that waits for a message with name 'msg' and correlation key 'foobar'.
If a message is published with message name 'msg' and correlation key 'foo' (a prefix of 'foobar'), the broker tries to correlate it with that subscription and an exception is thrown:

07:14:26.642 [zb-stream-processor] [0.0.0.0:25601-zb-actors-1] ERROR io.zeebe.logstreams - Stream processor 'zb-stream-processor' failed to process event. It stop processing further events.
java.lang.IllegalStateException: Expected to find subscription with key 7089072632783437824, but no subscription found
	at io.zeebe.broker.subscription.message.state.MessageSubscriptionState.lambda$visitSubscriptions$0(MessageSubscriptionState.java:216) ~[classes/:?]
	at io.zeebe.logstreams.rocksdb.ZbRocksDb.forEachPrefixed(ZbRocksDb.java:277) ~[classes/:?]
	at io.zeebe.broker.subscription.message.state.MessageSubscriptionState.visitSubscriptions(MessageSubscriptionState.java:206) ~[classes/:?]
	at io.zeebe.broker.subscription.message.processor.PublishMessageProcessor.handleNewMessage(PublishMessageProcessor.java:98) ~[classes/:?]
	at io.zeebe.broker.subscription.message.processor.PublishMessageProcessor.processRecord(PublishMessageProcessor.java:82) ~[classes/:?]
	at io.zeebe.broker.logstreams.processor.TypedRecordProcessor.processRecord(TypedRecordProcessor.java:56) ~[classes/:?]
	at io.zeebe.broker.logstreams.processor.TypedStreamProcessor$DelegatingEventProcessor.processEvent(TypedStreamProcessor.java:177) ~[classes/:?]
	at io.zeebe.logstreams.processor.StreamProcessorController.processEvent(StreamProcessorController.java:297) ~[classes/:?]
	at io.zeebe.logstreams.processor.StreamProcessorController.readNextEvent(StreamProcessorController.java:280) ~[classes/:?]
	at io.zeebe.util.sched.ActorJob.invoke(ActorJob.java:90) [classes/:?]
	at io.zeebe.util.sched.ActorJob.execute(ActorJob.java:53) [classes/:?]
	at io.zeebe.util.sched.ActorTask.execute(ActorTask.java:187) [classes/:?]
	at io.zeebe.util.sched.ActorThread.executeCurrentTask(ActorThread.java:154) [classes/:?]
	at io.zeebe.util.sched.ActorThread.doWork(ActorThread.java:135) [classes/:?]
	at io.zeebe.util.sched.ActorThread.run(ActorThread.java:112) [classes/:?]
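
The trace passes through `ZbRocksDb.forEachPrefixed`, so the lookup appears to be a prefix scan over composite keys. The sketch below is not Zeebe code; it only illustrates how such a scan can produce a false match when fields are concatenated without a boundary. The key layouts, the `#1` suffix standing in for a subscription key, and all class and method names are assumptions made for illustration. A `TreeSet` stands in for the sorted key space.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class PrefixScanSketch {

  // Assumed naive layout: fields are concatenated with no boundary marker.
  static String naiveKey(String messageName, String correlationKey) {
    return messageName + correlationKey;
  }

  // Assumed alternative layout: each field is preceded by its length,
  // so one correlation key can never be a byte prefix of a longer one.
  static String lengthPrefixedKey(String messageName, String correlationKey) {
    return messageName.length() + ":" + messageName
        + correlationKey.length() + ":" + correlationKey;
  }

  // Emulates a prefix scan: seek to the prefix in sorted order,
  // then collect keys until one no longer starts with the prefix.
  static List<String> scan(TreeSet<String> keys, String prefix) {
    final List<String> hits = new ArrayList<>();
    for (String key : keys.tailSet(prefix, true)) {
      if (!key.startsWith(prefix)) {
        break;
      }
      hits.add(key);
    }
    return hits;
  }

  public static void main(String[] args) {
    // A subscription for ('msg', 'foobar'); "#1" is a placeholder subscription key.
    final TreeSet<String> naive = new TreeSet<>();
    naive.add(naiveKey("msg", "foobar") + "#1");

    final TreeSet<String> prefixed = new TreeSet<>();
    prefixed.add(lengthPrefixedKey("msg", "foobar") + "#1");

    // Publishing ('msg', 'foo') scans with the prefix built from those two fields.
    System.out.println("naive hits: " + scan(naive, naiveKey("msg", "foo")));
    System.out.println("length-prefixed hits: "
        + scan(prefixed, lengthPrefixedKey("msg", "foo")));
  }
}
```

With the naive layout the scan for 'msg' + 'foo' returns the 'foobar' subscription; with the length-prefixed layout it returns nothing. Whether the actual fix should use length prefixes, a terminator byte, or an explicit equality check after the scan is left open here.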

Reproducible via this test:

  @Test
  public void shouldCloseWorkflowInstanceSubscription() {
    // given
    testClient.deploy(WORKFLOW_WITH_TIMER_AND_MESSAGE);
    testClient.createWorkflowInstance(PROCESS_ID, asMsgPack("key", "foobar"));

    assertThat(
            RecordingExporter.workflowInstanceSubscriptionRecords(
                    WorkflowInstanceSubscriptionIntent.OPENED)
                .limit(1)
                .exists())
        .isTrue();

    // when
    final Record<WorkflowInstanceRecordValue> gatewayEvent =
        RecordingExporter.workflowInstanceRecords(WorkflowInstanceIntent.GATEWAY_ACTIVATED)
            .getFirst();

    testClient.publishMessage("msg", "foo");

    // then
    final Record<WorkflowInstanceSubscriptionRecordValue> triggeredEvent =
        RecordingExporter.workflowInstanceSubscriptionRecords(
                WorkflowInstanceSubscriptionIntent.CORRELATED)
            .getFirst();
    Assertions.assertThat(triggeredEvent.getValue())
        .hasElementInstanceKey(gatewayEvent.getKey())
        .hasMessageName("msg");

    assertThat(
            RecordingExporter.workflowInstanceRecords(WorkflowInstanceIntent.SEQUENCE_FLOW_TAKEN)
                .withElementId("to-end2")
                .exists())
        .isTrue();

    assertThat(
            RecordingExporter.workflowInstanceRecords(WorkflowInstanceIntent.ELEMENT_COMPLETED)
                .withElementId(PROCESS_ID)
                .exists())
        .isTrue();
  }