-
Notifications
You must be signed in to change notification settings - Fork 22
Description
🐛 Current behavior
Using the java client, once subscribed to the $all stream and having set a stream prefix in filter, the listener's onEvent handler is invoked for an event which belongs to a stream whose name does not start with the given prefix.
🔍 Steps to reproduce
The following test fails after 10 seconds awaiting for the last await instruction:
@Test
fun `should filter subscription to $all by stream name prefix`() {
val onEventInvoked = AtomicInteger(0)
val listener = object : SubscriptionListener() {
override fun onEvent(subscription: Subscription, event: ResolvedEvent) {
logger.info { "onEvent: ${event.originalEvent}" }
onEventInvoked.incrementAndGet()
}
}
val streamNamePrefix = "my-prefixed-stream"
val options = SubscribeToAllOptions.get()
.filter(
SubscriptionFilter.newBuilder()
.addStreamNamePrefix(streamNamePrefix)
.build()
)
kurrentDBClient.subscribeToAll(listener, options).join()
val eventDataList = arrayListOf<EventData>()
val expected = 5
for (i in 1..expected)
eventDataList.add(EventData.builderAsJson("eventType", "{\"value\":$i}".toByteArray()).build())
kurrentDBClient.appendToStream(
"$streamNamePrefix-1",
eventDataList.iterator()
).join()
await until { expected == onEventInvoked.get() }
// Add an event from a stream that does not start with the prefix set in the subscription filter
kurrentDBClient.appendToStream(
"not-$streamNamePrefix",
EventData.builderAsJson("type", "{}".toByteArray()).build()
).join()
await until { expected == onEventInvoked.get() }
}Logs produced by the listener's onEvent callback:
2026-02-23T22:38:24.089-0500 [grpc-default-executor-1] INFO c.m.e.event.core.KurrentDBClientTest [] - onEvent: RecordedEvent{streamId='my-prefixed-stream-1', revision=0, eventId=2717c172-a982-4c02-9de2-de267f2d4946, eventType='eventType', eventData=[123, 34, 118, 97, 108, 117, 101, 34, 58, 49, 125], userMetadata=[], created=2026-02-24T03:38:24.011195700Z, position=1475/1475, contentType='application/json'}
2026-02-23T22:38:24.089-0500 [grpc-default-executor-1] INFO c.m.e.event.core.KurrentDBClientTest [] - onEvent: RecordedEvent{streamId='my-prefixed-stream-1', revision=1, eventId=03b1642f-401f-44ce-a9de-a43e1898950e, eventType='eventType', eventData=[123, 34, 118, 97, 108, 117, 101, 34, 58, 50, 125], userMetadata=[], created=2026-02-24T03:38:24.011246200Z, position=1605/1605, contentType='application/json'}
2026-02-23T22:38:24.089-0500 [grpc-default-executor-1] INFO c.m.e.event.core.KurrentDBClientTest [] - onEvent: RecordedEvent{streamId='my-prefixed-stream-1', revision=2, eventId=44a74231-f619-4bfd-a906-7cefcb4cf9e6, eventType='eventType', eventData=[123, 34, 118, 97, 108, 117, 101, 34, 58, 51, 125], userMetadata=[], created=2026-02-24T03:38:24.011247300Z, position=1735/1735, contentType='application/json'}
2026-02-23T22:38:24.089-0500 [grpc-default-executor-1] INFO c.m.e.event.core.KurrentDBClientTest [] - onEvent: RecordedEvent{streamId='my-prefixed-stream-1', revision=3, eventId=56505947-516c-4592-ad9a-9bd601118c41, eventType='eventType', eventData=[123, 34, 118, 97, 108, 117, 101, 34, 58, 52, 125], userMetadata=[], created=2026-02-24T03:38:24.011247800Z, position=1865/1865, contentType='application/json'}
2026-02-23T22:38:24.089-0500 [grpc-default-executor-1] INFO c.m.e.event.core.KurrentDBClientTest [] - onEvent: RecordedEvent{streamId='my-prefixed-stream-1', revision=4, eventId=6f8db146-22c8-4fb7-ac31-0516298d11bd, eventType='eventType', eventData=[123, 34, 118, 97, 108, 117, 101, 34, 58, 53, 125], userMetadata=[], created=2026-02-24T03:38:24.011248300Z, position=1995/1995, contentType='application/json'}
2026-02-23T22:38:24.163-0500 [grpc-default-executor-1] INFO c.m.e.event.core.KurrentDBClientTest [] - onEvent: RecordedEvent{streamId='not-my-prefixed-stream', revision=0, eventId=00814559-bd5b-4101-8c8d-19866dea7018, eventType='type', eventData=[123, 125], userMetadata=[], created=2026-02-24T03:38:24.155231200Z, position=2125/2125, contentType='application/json'}Reproducible link
github.com
💭 Expected behavior
When setting a stream name prefix in subscription filter, the listener's onEvent callback should receive an event only when the corresponding stream has a name starting with that prefix.
It looks like the filter currently accepts the event if stream name contains the prefix.
Package version
io.kurrent:kurrentdb-client:1.1.1
KurrentDB Version
docker.kurrent.io/kurrent-lts/kurrentdb:26.0
Connection string
kurrentdb://localhost:2113?tls=false
☁️ Deployment Environment
Single-node (Docker)
Other Deployment Details
No response
Operating system
macOS 15.7.4
Code of Conduct
- I agree to follow this project's Code of Conduct