Skip to content

Commit

Permalink
JAMES-3142 Avoid poluting aggregate history upon noop
Browse files Browse the repository at this point in the history
By moving the registered group detection to the emitter, and maintaining
the diff between registered and required groups throughout the aggregate
we are able to achieve retries of failed unbind without emmitting
events on the happy scenario (required groups = registered group & no
change), limiting history growth.
  • Loading branch information
chibenwa committed Apr 16, 2020
1 parent 4241513 commit c095e21
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,25 @@
import org.apache.james.eventsourcing.EventSourcingSystem;
import org.apache.james.eventsourcing.eventstore.EventStore;
import org.apache.james.mailbox.events.Group;
import org.apache.james.mailbox.events.eventsourcing.RegisteredGroupsSubscriber.RegisteredGroupsProvider;
import org.apache.james.mailbox.events.eventsourcing.RegisteredGroupsSubscriber.Unregisterer;
import org.reactivestreams.Publisher;

import com.github.steveash.guavate.Guavate;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class GroupUnregistringManager {
@FunctionalInterface
public interface RegisteredGroupsProvider {
Publisher<Group> registeredGroups();
}

private final EventSourcingSystem eventSourcingSystem;
private final EventStore eventStore;
private final RegisteredGroupsProvider registeredGroupsProvider;

public GroupUnregistringManager(EventStore eventStore,
Unregisterer unregisterer,
Expand All @@ -45,15 +53,17 @@ public GroupUnregistringManager(EventStore eventStore,
Clock clock) {

this.eventStore = eventStore;
this.registeredGroupsProvider = registeredGroupsProvider;
this.eventSourcingSystem = EventSourcingSystem.fromJava(ImmutableSet.of(new RequireGroupsCommandHandler(eventStore, clock)),
ImmutableSet.of(new RegisteredGroupsSubscriber(unregisterer, registrer, registeredGroupsProvider)),
ImmutableSet.of(new RegisteredGroupsSubscriber(unregisterer, registrer)),
this.eventStore);
}

public Mono<Void> start(ImmutableSet<Group> groups) {
RequireGroupsCommand requireGroupsCommand = new RequireGroupsCommand(groups);

return Mono.from(eventSourcingSystem.dispatch(requireGroupsCommand));
public Mono<Void> start(ImmutableSet<Group> requiredGroups) {
return Flux.from(registeredGroupsProvider.registeredGroups())
.collect(Guavate.toImmutableSet())
.map(registeredGroups -> new RequireGroupsCommand(requiredGroups, registeredGroups))
.flatMap(command -> Mono.from(eventSourcingSystem.dispatch(command)));
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,17 @@ public class RegisteredGroupListenerChangeEvent implements Event {
private final EventId eventId;
private final Hostname hostname;
private final ZonedDateTime zonedDateTime;
private final ImmutableSet<Group> requiredGroups;
private final ImmutableSet<Group> registeredGroups;

public RegisteredGroupListenerChangeEvent(EventId eventId, Hostname hostname, ZonedDateTime zonedDateTime, ImmutableSet<Group> registeredGroups) {
public RegisteredGroupListenerChangeEvent(EventId eventId, Hostname hostname, ZonedDateTime zonedDateTime, ImmutableSet<Group> requiredGroups, ImmutableSet<Group> registeredGroups) {
this.hostname = hostname;
this.zonedDateTime = zonedDateTime;
this.eventId = eventId;
this.requiredGroups = requiredGroups;
this.registeredGroups = registeredGroups;
}


@Override
public EventId eventId() {
return eventId;
Expand All @@ -59,6 +60,10 @@ public ImmutableSet<Group> getRegisteredGroups() {
return registeredGroups;
}

public ImmutableSet<Group> getRequiredGroups() {
return requiredGroups;
}

public EventId getEventId() {
return eventId;
}
Expand All @@ -79,13 +84,14 @@ public final boolean equals(Object o) {
return Objects.equals(this.eventId, that.eventId)
&& Objects.equals(this.zonedDateTime, that.zonedDateTime)
&& Objects.equals(this.hostname, that.hostname)
&& Objects.equals(this.registeredGroups, that.registeredGroups);
&& Objects.equals(this.registeredGroups, that.registeredGroups)
&& Objects.equals(this.requiredGroups, that.requiredGroups);
}
return false;
}

@Override
public final int hashCode() {
return Objects.hash(eventId, hostname, zonedDateTime, registeredGroups);
return Objects.hash(eventId, hostname, zonedDateTime, registeredGroups, requiredGroups);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,20 @@ public class RegisteredGroupsAggregate {

private static class State {
static State initial() {
return new State(ImmutableSet.of());
return new State(ImmutableSet.of(), ImmutableSet.of());
}

final ImmutableSet<Group> groups;
final ImmutableSet<Group> registeredGroups;
final ImmutableSet<Group> requiredGroups;

private State(ImmutableSet<Group> groups) {
this.groups = groups;
public State(ImmutableSet<Group> registeredGroups, ImmutableSet<Group> requiredGroups) {
this.registeredGroups = registeredGroups;
this.requiredGroups = requiredGroups;
}

private State apply(RegisteredGroupListenerChangeEvent event) {
return new State(event.getRegisteredGroups());
return new State(event.getRegisteredGroups(),
event.getRequiredGroups());
}
}

Expand Down Expand Up @@ -75,15 +78,27 @@ public List<RegisteredGroupListenerChangeEvent> handle(RequireGroupsCommand requ
}

public ImmutableSet<Group> requiredGroups() {
return state.groups;
return state.requiredGroups;
}

private List<RegisteredGroupListenerChangeEvent> detectChanges(RequireGroupsCommand requireGroupsCommand, Clock clock) {
private List<RegisteredGroupListenerChangeEvent> detectChanges(RequireGroupsCommand command, Clock clock) {
boolean historyChange = !state.requiredGroups.equals(command.getRequiredGroups()) ||
!state.registeredGroups.equals(command.getRegisteredGroups());
boolean registeredChangeNeeded = !command.getRegisteredGroups().equals(command.getRequiredGroups());

if (historyChange || registeredChangeNeeded) {
return emitEvent(command, clock);
}
return ImmutableList.of();
}

private List<RegisteredGroupListenerChangeEvent> emitEvent(RequireGroupsCommand command, Clock clock) {
ZonedDateTime now = ZonedDateTime.ofInstant(clock.instant(), clock.getZone());
RegisteredGroupListenerChangeEvent event = new RegisteredGroupListenerChangeEvent(history.getNextEventId(),
Hostname.localHost(),
now,
requireGroupsCommand.getRegisteredGroups());
command.getRequiredGroups(),
command.getRegisteredGroups());
return ImmutableList.of(event);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.james.mailbox.events.Group;
import org.reactivestreams.Publisher;

import com.google.common.collect.Sets;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand All @@ -40,48 +42,37 @@ public interface Registrer {
Publisher<Void> register(Group group);
}

@FunctionalInterface
public interface RegisteredGroupsProvider {
Publisher<Group> registeredGroups();
}

private final Unregisterer unregisterer;
private final Registrer registrer;
private final RegisteredGroupsProvider registeredGroupsProvider;

RegisteredGroupsSubscriber(Unregisterer unregisterer, Registrer registrer, RegisteredGroupsProvider registeredGroupsProvider) {
RegisteredGroupsSubscriber(Unregisterer unregisterer, Registrer registrer) {
this.unregisterer = unregisterer;
this.registrer = registrer;
this.registeredGroupsProvider = registeredGroupsProvider;
}

@Override
public void handle(Event event) {
if (event instanceof RegisteredGroupListenerChangeEvent) {
RegisteredGroupListenerChangeEvent changeEvent = (RegisteredGroupListenerChangeEvent) event;

Mono<List<Group>> registeredGroupsMono = Flux.from(registeredGroupsProvider.registeredGroups()).collectList();
Mono<List<Group>> registeredGroupsMono = Flux.fromIterable(((RegisteredGroupListenerChangeEvent) event).getRegisteredGroups()).collectList();

registeredGroupsMono.flatMapMany(registeredGroups -> Flux.concat(
unbindUnrequiredRegisteredGroups(changeEvent, registeredGroups),
bindRequiredUnregisteredGroups(changeEvent, registeredGroups)))
unbindUnrequiredRegisteredGroups(changeEvent),
bindRequiredUnregisteredGroups(changeEvent)))
.then()
.block();
}
}

private Mono<Void> unbindUnrequiredRegisteredGroups(RegisteredGroupListenerChangeEvent changeEvent,
List<Group> registeredGroups) {
return Flux.fromIterable(registeredGroups)
.filter(registeredGroup -> !changeEvent.getRegisteredGroups().contains(registeredGroup))
private Mono<Void> unbindUnrequiredRegisteredGroups(RegisteredGroupListenerChangeEvent changeEvent) {
return Flux.fromIterable(Sets.difference(changeEvent.getRegisteredGroups(), changeEvent.getRequiredGroups()))
.concatMap(unregisterer::unregister)
.then();
}

private Mono<Void> bindRequiredUnregisteredGroups(RegisteredGroupListenerChangeEvent changeEvent,
List<Group> registeredGroups) {
return Flux.fromIterable(changeEvent.getRegisteredGroups())
.filter(requiredGroup -> !registeredGroups.contains(requiredGroup))
private Mono<Void> bindRequiredUnregisteredGroups(RegisteredGroupListenerChangeEvent changeEvent) {
return Flux.fromIterable(Sets.difference(changeEvent.getRequiredGroups(), changeEvent.getRegisteredGroups()))
.concatMap(registrer::register)
.then();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,19 @@
import com.google.common.collect.ImmutableSet;

public class RequireGroupsCommand implements Command {
private final ImmutableSet<Group> requiredGroups;
private final ImmutableSet<Group> registeredGroups;

public RequireGroupsCommand(ImmutableSet<Group> registeredGroups) {
public RequireGroupsCommand(ImmutableSet<Group> requiredGroups, ImmutableSet<Group> registeredGroups) {
this.requiredGroups = requiredGroups;
this.registeredGroups = registeredGroups;
}

public ImmutableSet<Group> getRegisteredGroups() {
return registeredGroups;
}

public ImmutableSet<Group> getRequiredGroups() {
return requiredGroups;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.james.mailbox.events.eventsourcing;

import static org.apache.james.mailbox.events.eventsourcing.GroupUnregistringManager.RegisteredGroupsProvider;
import static org.assertj.core.api.Assertions.assertThat;

import java.time.Clock;
Expand Down Expand Up @@ -69,7 +70,7 @@ ImmutableList<Group> registeredGroups() {
}
}

static class TestRegisteredGroupsProvider implements RegisteredGroupsSubscriber.RegisteredGroupsProvider {
static class TestRegisteredGroupsProvider implements RegisteredGroupsProvider {
private List<Group> groups = ImmutableList.of();

void setRegisteredGroups(ImmutableList<Group> registeredGroups) {
Expand Down

0 comments on commit c095e21

Please sign in to comment.