Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JAMES-3142 eventsourcing for group unregistration #3280

Closed
wants to merge 8 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.time.Clock;

import org.apache.james.eventsourcing.Event;
import org.apache.james.eventsourcing.EventId;
import org.apache.james.eventsourcing.EventSourcingSystem;
import org.apache.james.eventsourcing.Subscriber;
import org.apache.james.eventsourcing.eventstore.EventStore;
Expand Down Expand Up @@ -52,15 +53,15 @@ public void handle(Event event) {
RegisteredGroupListenerChangeEvent changeEvent = (RegisteredGroupListenerChangeEvent) event;

Flux.fromIterable(changeEvent.getRemovedGroups())
.concatMap(this::unregister)
.concatMap(group -> unregister(group, changeEvent.eventId()))
.then()
.block();
}
}

private Publisher<Void> unregister(Group group) {
private Publisher<Void> unregister(Group group, EventId generatedForEventId) {
return unregisterer.unregister(group)
.then(notifyUnbind(group));
.then(notifyUnbind(group, generatedForEventId));
}
}

Expand All @@ -80,8 +81,8 @@ public Mono<Void> start(ImmutableSet<Group> groups) {
return Mono.from(eventSourcingSystem.dispatch(requireGroupsCommand));
}

private Mono<Void> notifyUnbind(Group group) {
MarkUnbindAsSucceededCommand command = new MarkUnbindAsSucceededCommand(group);
private Mono<Void> notifyUnbind(Group group, EventId ganaratedForEventId) {
chibenwa marked this conversation as resolved.
Show resolved Hide resolved
MarkUnbindAsSucceededCommand command = new MarkUnbindAsSucceededCommand(group, ganaratedForEventId);

return Mono.from(eventSourcingSystem.dispatch(command));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,23 @@
package org.apache.james.mailbox.events.eventsourcing;

import org.apache.james.eventsourcing.Command;
import org.apache.james.eventsourcing.EventId;
import org.apache.james.mailbox.events.Group;

public class MarkUnbindAsSucceededCommand implements Command {
private final Group succeededGroup;
private final EventId generatedForEventId;

public MarkUnbindAsSucceededCommand(Group succeededGroup) {
public MarkUnbindAsSucceededCommand(Group succeededGroup, EventId generatedForEventId) {
this.succeededGroup = succeededGroup;
this.generatedForEventId = generatedForEventId;
}

public Group getSucceededGroup() {
return succeededGroup;
}

public EventId getGeneratedForEventId() {
return generatedForEventId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,26 +58,22 @@ private State(ImmutableMap<Group, Status> groups) {
}

private State apply(RegisteredGroupListenerChangeEvent event) {
Preconditions.checkArgument(Sets.intersection(usedGroups(), event.getAddedGroups()).isEmpty(),
"Trying to add an already existing group");
Preconditions.checkArgument(Sets.difference(event.getRemovedGroups(), groups.keySet()).isEmpty(),
"Trying to remove a non existing group");
ImmutableMap<Group, Status> removedGroups = event.getRemovedGroups()
.stream()
.collect(Guavate.toImmutableMap(Functions.identity(), any -> Status.UNUSED_BUT_BINDED));
ImmutableMap<Group, Status> addedGroups = event.getAddedGroups()
.stream()
.collect(Guavate.toImmutableMap(Functions.identity(), any -> Status.USED));
ImmutableMap<Group, Status> unchangedGroups = notIn(Sets.union(event.getAddedGroups(), event.getRemovedGroups()));

return new State(ImmutableMap.<Group, Status>builder()
.putAll(event.getAddedGroups()
.stream()
.collect(Guavate.toImmutableMap(Functions.identity(), any -> Status.USED)))
.putAll(notIn(Sets.union(event.getAddedGroups(), event.getRemovedGroups())))
.putAll(event.getRemovedGroups()
.stream()
.collect(Guavate.toImmutableMap(Functions.identity(), any -> Status.UNUSED_BUT_BINDED)))
.putAll(addedGroups)
.putAll(unchangedGroups)
.putAll(removedGroups)
.build());
}

private State apply(UnbindSucceededEvent event) {
Preconditions.checkArgument(bindedGroups().contains(event.getGroup()), "unbing a non binded group," +
" or a used group");

return new State(ImmutableMap.<Group, Status>builder()
.putAll(notIn(ImmutableSet.of(event.getGroup())))
.build());
Expand Down Expand Up @@ -129,6 +125,9 @@ public List<RegisteredGroupListenerChangeEvent> handle(RequireGroupsCommand requ
}

public List<UnbindSucceededEvent> handle(MarkUnbindAsSucceededCommand command) {
Preconditions.checkArgument(state.bindedGroups().contains(command.getSucceededGroup()),
"unbing a non binded group, or a used group");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/unbing/unbind


UnbindSucceededEvent event = new UnbindSucceededEvent(history.getNextEventId(), command.getSucceededGroup());
chibenwa marked this conversation as resolved.
Show resolved Hide resolved
apply(event);
return ImmutableList.of(event);
Expand Down