-
-
Notifications
You must be signed in to change notification settings - Fork 39
/
ESDBCommandBus.java
62 lines (52 loc) · 2.12 KB
/
ESDBCommandBus.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package io.eventdriven.distributedprocesses.core.commands;
import com.eventstore.dbclient.EventStoreDBClient;
import io.eventdriven.distributedprocesses.core.esdb.EventStore;
import io.eventdriven.distributedprocesses.core.retries.RetryPolicy;
import java.util.function.Consumer;
import java.util.function.Supplier;
import static io.eventdriven.distributedprocesses.core.esdb.subscriptions.ESDBSubscription.subscribeToStream;
import static io.eventdriven.distributedprocesses.core.serialization.EventSerializer.deserializeCommand;
/**
 * {@link CommandBus} implementation that appends scheduled commands to, and
 * subscribes for commands on, the single {@code _commands-all} stream.
 *
 * <p>Appending goes through the injected {@link EventStore} wrapped in the
 * injected {@link RetryPolicy}; subscribing goes through the injected
 * {@link EventStoreDBClient}.
 */
public class ESDBCommandBus implements CommandBus {
  /** Name of the stream that every command is appended to and read from. */
  private static final String COMMAND_STREAM_ID = "_commands-all";

  private final EventStoreDBClient eventStoreDBClient;
  private final EventStore eventStore;
  private final RetryPolicy retryPolicy;
  private final Supplier<String> currentCorrelationId;
  private final Supplier<String> currentCausationId;

  /**
   * Creates a command bus.
   *
   * @param eventStoreDBClient   client passed to {@code subscribeToStream} when subscribing
   * @param eventStore           store used to append command envelopes
   * @param retryPolicy          policy that runs the append callback
   * @param currentCorrelationId supplies the correlation id put into each command's metadata
   * @param currentCausationId   supplies the causation id put into each command's metadata
   */
  public ESDBCommandBus(
    EventStoreDBClient eventStoreDBClient,
    EventStore eventStore,
    RetryPolicy retryPolicy,
    Supplier<String> currentCorrelationId,
    Supplier<String> currentCausationId
  ) {
    this.eventStoreDBClient = eventStoreDBClient;
    this.eventStore = eventStore;
    this.retryPolicy = retryPolicy;
    this.currentCorrelationId = currentCorrelationId;
    this.currentCausationId = currentCausationId;
  }

  /**
   * Wraps the command in a {@link CommandEnvelope} carrying the current
   * correlation and causation ids and appends it to the command stream.
   *
   * @param command   the command payload to schedule
   * @param <Command> the command payload type
   * @return whatever {@code retryPolicy.run} returns for the append callback
   */
  @Override
  public <Command> EventStore.AppendResult schedule(Command command) {
    return retryPolicy.run(ack -> {
      // Both ids are read inside the callback, so they are fetched again on every invocation.
      var metadata = new CommandMetadata(currentCorrelationId.get(), currentCausationId.get());
      var appendResult = eventStore.append(COMMAND_STREAM_ID, new CommandEnvelope<>(command, metadata));

      if (appendResult instanceof EventStore.AppendResult.UnexpectedFailure) {
        // Unexpected failures are not acknowledged.
        // NOTE(review): presumably this is what makes the retry policy try again - confirm in RetryPolicy.
        return;
      }

      ack.accept(appendResult);
    });
  }

  /**
   * Subscribes to the command stream and passes every successfully
   * deserialized command envelope to each handler, in the order given.
   * Events for which {@code deserializeCommand} yields an empty result are skipped.
   *
   * @param handlers consumers invoked for each deserialized command envelope
   */
  @Override
  public void subscribe(Consumer<CommandEnvelope<Object>>... handlers) {
    subscribeToStream(eventStoreDBClient, COMMAND_STREAM_ID, (subscription, resolvedEvent) -> {
      var deserialized = deserializeCommand(resolvedEvent);

      if (!deserialized.isEmpty()) {
        var envelope = deserialized.get();
        for (var commandHandler : handlers) {
          commandHandler.accept(envelope);
        }
      }
    });
  }
}