From 90d9a956f0644df5bd85a18fd7613f333b30b1de Mon Sep 17 00:00:00 2001 From: Benoit Tellier Date: Thu, 9 Apr 2020 15:11:42 +0700 Subject: [PATCH 1/8] JAMES-3142 eventsourcing for group unregistration --- mailbox/event/event-rabbitmq/pom.xml | 13 ++ .../GroupUnregistringManager.java | 44 ++++++ .../RegisteredGroupListenerChangeEvent.java | 85 ++++++++++++ .../RegisteredGroupsAggregate.java | 94 +++++++++++++ .../events/eventsourcing/StartCommand.java | 37 ++++++ .../eventsourcing/StartCommandHandler.java | 50 +++++++ .../UnregisterRemovedGroupsSubscriber.java | 52 ++++++++ .../GroupUnregistringManagerTest.java | 125 ++++++++++++++++++ 8 files changed, 500 insertions(+) create mode 100644 mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManager.java create mode 100644 mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupListenerChangeEvent.java create mode 100644 mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupsAggregate.java create mode 100644 mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/StartCommand.java create mode 100644 mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/StartCommandHandler.java create mode 100644 mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/UnregisterRemovedGroupsSubscriber.java create mode 100644 mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManagerTest.java diff --git a/mailbox/event/event-rabbitmq/pom.xml b/mailbox/event/event-rabbitmq/pom.xml index 29fbacfd815..445413dd504 100644 --- a/mailbox/event/event-rabbitmq/pom.xml +++ b/mailbox/event/event-rabbitmq/pom.xml @@ -66,6 +66,19 @@ apache-james-mailbox-store test + + ${james.groupId} + event-sourcing-core + + + ${james.groupId} + event-sourcing-event-store-memory + test + + + ${james.groupId} + event-sourcing-event-store-api + ${james.groupId} james-server-lifecycle-api diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManager.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManager.java new file mode 100644 index 00000000000..6e7f9fc8ce6 --- /dev/null +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManager.java @@ -0,0 +1,44 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.events.eventsourcing; + +import org.apache.james.eventsourcing.EventSourcingSystem; +import org.apache.james.eventsourcing.eventstore.EventStore; +import org.apache.james.mailbox.events.Group; + +import com.google.common.collect.ImmutableSet; + +import reactor.core.publisher.Mono; + +public class GroupUnregistringManager { + private final EventSourcingSystem eventSourcingSystem; + + public GroupUnregistringManager(EventStore eventStore, UnregisterRemovedGroupsSubscriber.Unregisterer unregisterer) { + this.eventSourcingSystem = EventSourcingSystem.fromJava(ImmutableSet.of(new StartCommandHandler(eventStore)), + ImmutableSet.of(new UnregisterRemovedGroupsSubscriber(unregisterer)), + eventStore); + } + + public Mono start(ImmutableSet groups) { + StartCommand startCommand = new StartCommand(groups); + + return Mono.from(eventSourcingSystem.dispatch(startCommand)); + } +} diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupListenerChangeEvent.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupListenerChangeEvent.java new file mode 100644 index 00000000000..e0cba51d316 --- /dev/null +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupListenerChangeEvent.java @@ -0,0 +1,85 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.events.eventsourcing; + +import static org.apache.james.mailbox.events.eventsourcing.RegisteredGroupsAggregate.AGGREGATE_ID; + +import java.util.Objects; + +import org.apache.james.eventsourcing.AggregateId; +import org.apache.james.eventsourcing.Event; +import org.apache.james.eventsourcing.EventId; +import org.apache.james.mailbox.events.Group; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; + +public class RegisteredGroupListenerChangeEvent implements Event { + + private final EventId eventId; + private final ImmutableSet addedGroups; + private final ImmutableSet removedGroups; + + public RegisteredGroupListenerChangeEvent(EventId eventId, ImmutableSet addedGroups, ImmutableSet removedGroups) { + Preconditions.checkArgument(Sets.intersection(addedGroups, removedGroups).isEmpty(), + "'addedGroups' and 'removedGroups' elements needs to be distinct"); + + this.eventId = eventId; + this.addedGroups = addedGroups; + this.removedGroups = removedGroups; + } + + + @Override + public EventId eventId() { + return eventId; + } + + @Override + public AggregateId getAggregateId() { + return AGGREGATE_ID; + } + + public ImmutableSet getAddedGroups() { + return addedGroups; + } + + public ImmutableSet getRemovedGroups() { + return removedGroups; + } + + @Override + public final boolean equals(Object o) { + if (o instanceof RegisteredGroupListenerChangeEvent) { + RegisteredGroupListenerChangeEvent that = (RegisteredGroupListenerChangeEvent) o; + + return Objects.equals(this.eventId, that.eventId) + && Objects.equals(this.addedGroups, that.addedGroups) + && Objects.equals(this.removedGroups, that.removedGroups); + } + return false; + } + + @Override + public final int hashCode() { + return Objects.hash(eventId, addedGroups, removedGroups); + } +} diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupsAggregate.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupsAggregate.java new file mode 100644 index 00000000000..f0120ad286d --- /dev/null +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupsAggregate.java @@ -0,0 +1,94 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.events.eventsourcing; + +import java.util.List; + +import org.apache.james.eventsourcing.AggregateId; +import org.apache.james.eventsourcing.Event; +import org.apache.james.eventsourcing.eventstore.History; +import org.apache.james.mailbox.events.Group; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; + +public class RegisteredGroupsAggregate { + static AggregateId AGGREGATE_ID = () -> "RegisteredGroupListenerChangeEvent"; + + private static class State { + static State initial() { + return new State(ImmutableSet.of()); + } + + final ImmutableSet groups; + + private State(ImmutableSet groups) { + this.groups = groups; + } + + private State apply(RegisteredGroupListenerChangeEvent event) { + return new State(ImmutableSet.builder() + .addAll(Sets.difference(groups, event.getRemovedGroups())) + .addAll(event.getAddedGroups()) + .build()); + } + } + + public static RegisteredGroupsAggregate load(History history) { + return new RegisteredGroupsAggregate(history); + } + + private final History history; + private State state; + + private RegisteredGroupsAggregate(History history) { + this.history = history; + this.state = State.initial(); + + history.getEventsJava() + .forEach(this::apply); + } + + public List handleStart(StartCommand startCommand) { + List detectedChanges = detectChanges(startCommand); + + detectedChanges.forEach(this::apply); + + return detectedChanges; + } + + private List detectChanges(StartCommand startCommand) { + ImmutableSet addedGroups = ImmutableSet.copyOf(Sets.difference(startCommand.getRegisteredGroups(), state.groups)); + ImmutableSet removedGroups = ImmutableSet.copyOf(Sets.difference(state.groups, startCommand.getRegisteredGroups())); + + if (!addedGroups.isEmpty() || !removedGroups.isEmpty()) { + return ImmutableList.of(new RegisteredGroupListenerChangeEvent(history.getNextEventId(), addedGroups, removedGroups)); + } + return ImmutableList.of(); + } + + private void apply(Event event) { + Preconditions.checkArgument(event instanceof RegisteredGroupListenerChangeEvent); + + state = state.apply((RegisteredGroupListenerChangeEvent) event); + } +} diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/StartCommand.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/StartCommand.java new file mode 100644 index 00000000000..1a5ebbf12c2 --- /dev/null +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/StartCommand.java @@ -0,0 +1,37 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.events.eventsourcing; + +import org.apache.james.eventsourcing.Command; +import org.apache.james.mailbox.events.Group; + +import com.google.common.collect.ImmutableSet; + +public class StartCommand implements Command { + private final ImmutableSet registeredGroups; + + public StartCommand(ImmutableSet registeredGroups) { + this.registeredGroups = registeredGroups; + } + + public ImmutableSet getRegisteredGroups() { + return registeredGroups; + } +} diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/StartCommandHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/StartCommandHandler.java new file mode 100644 index 00000000000..df4d3514c56 --- /dev/null +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/StartCommandHandler.java @@ -0,0 +1,50 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.events.eventsourcing; + +import java.util.List; + +import org.apache.james.eventsourcing.CommandHandler; +import org.apache.james.eventsourcing.Event; +import org.apache.james.eventsourcing.eventstore.EventStore; +import org.reactivestreams.Publisher; + +import reactor.core.publisher.Mono; + +public class StartCommandHandler implements CommandHandler { + + private final EventStore eventStore; + + public StartCommandHandler(EventStore eventStore) { + this.eventStore = eventStore; + } + + @Override + public Class handledClass() { + return StartCommand.class; + } + + @Override + public Publisher> handle(StartCommand command) { + return Mono.from(eventStore.getEventsOfAggregate(RegisteredGroupsAggregate.AGGREGATE_ID)) + .map(RegisteredGroupsAggregate::load) + .map(aggregate -> aggregate.handleStart(command)); + } +} diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/UnregisterRemovedGroupsSubscriber.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/UnregisterRemovedGroupsSubscriber.java new file mode 100644 index 00000000000..da9ec06cb12 --- /dev/null +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/UnregisterRemovedGroupsSubscriber.java @@ -0,0 +1,52 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.events.eventsourcing; + +import org.apache.james.eventsourcing.Event; +import org.apache.james.eventsourcing.Subscriber; +import org.apache.james.mailbox.events.Group; +import org.reactivestreams.Publisher; + +import reactor.core.publisher.Flux; + +public class UnregisterRemovedGroupsSubscriber implements Subscriber { + @FunctionalInterface + public interface Unregisterer { + Publisher unregister(Group group); + } + + private final Unregisterer unregisterer; + + public UnregisterRemovedGroupsSubscriber(Unregisterer unregisterer) { + this.unregisterer = unregisterer; + } + + @Override + public void handle(Event event) { + if (event instanceof RegisteredGroupListenerChangeEvent) { + RegisteredGroupListenerChangeEvent changeEvent = (RegisteredGroupListenerChangeEvent) event; + + Flux.fromIterable(changeEvent.getRemovedGroups()) + .concatMap(unregisterer::unregister) + .then() + .block(); + } + } +} diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManagerTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManagerTest.java new file mode 100644 index 00000000000..5b8a2b655a8 --- /dev/null +++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManagerTest.java @@ -0,0 +1,125 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.events.eventsourcing; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.apache.james.eventsourcing.eventstore.memory.InMemoryEventStore; +import org.apache.james.mailbox.events.GenericGroup; +import org.apache.james.mailbox.events.Group; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.reactivestreams.Publisher; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +import reactor.core.publisher.Mono; + +class GroupUnregistringManagerTest { + private static final GenericGroup GROUP_A = new GenericGroup("a"); + private static final GenericGroup GROUP_B = new GenericGroup("b"); + + static class TestUnregisterer implements UnregisterRemovedGroupsSubscriber.Unregisterer { + private ImmutableList.Builder unregisteredGroups = ImmutableList.builder(); + + @Override + public Publisher unregister(Group group) { + return Mono.fromRunnable(() -> unregisteredGroups.add(group)); + } + + ImmutableList unregisteredGroups() { + return unregisteredGroups.build(); + } + } + + GroupUnregistringManager testee; + TestUnregisterer unregisterer; + + @BeforeEach + void setUp() { + unregisterer = new TestUnregisterer(); + testee = new GroupUnregistringManager(new InMemoryEventStore(), unregisterer); + } + + @Test + void startShouldNotUnregisterGroupsWhenNoHistory() { + testee.start(ImmutableSet.of(GROUP_A)).block(); + + assertThat(unregisterer.unregisteredGroups()) + .isEmpty(); + } + + @Test + void startShouldNotUnregisterGroupsWhenNoChanges() { + testee.start(ImmutableSet.of(GROUP_A)).block(); + testee.start(ImmutableSet.of(GROUP_A)).block(); + + assertThat(unregisterer.unregisteredGroups()) + .isEmpty(); + } + + @Test + void startShouldNotUnregisterGroupsWhenAdditions() { + testee.start(ImmutableSet.of(GROUP_A)).block(); + testee.start(ImmutableSet.of(GROUP_A, GROUP_B)).block(); + + assertThat(unregisterer.unregisteredGroups()) + .isEmpty(); + } + + @Test + void startShouldUnregisterGroupsWhenRemoval() { + testee.start(ImmutableSet.of(GROUP_A, GROUP_B)).block(); + testee.start(ImmutableSet.of(GROUP_A)).block(); + + assertThat(unregisterer.unregisteredGroups()) + .containsExactly(GROUP_B); + } + + @Test + void startShouldUnregisterGroupsWhenSwap() { + testee.start(ImmutableSet.of(GROUP_B)).block(); + testee.start(ImmutableSet.of(GROUP_A)).block(); + + assertThat(unregisterer.unregisteredGroups()) + .containsExactly(GROUP_B); + } + + @Test + void startShouldBeAbleToUnregisterPreviouslyUnregisteredGroups() { + testee.start(ImmutableSet.of(GROUP_A, GROUP_B)).block(); + testee.start(ImmutableSet.of(GROUP_A)).block(); + testee.start(ImmutableSet.of(GROUP_A, GROUP_B)).block(); + testee.start(ImmutableSet.of(GROUP_A)).block(); + + assertThat(unregisterer.unregisteredGroups()) + .containsExactly(GROUP_B, GROUP_B); + } + + @Test + void startWithNoGroupsShouldUnregisterAllPreviousGroups() { + testee.start(ImmutableSet.of(GROUP_A, GROUP_B)).block(); + testee.start(ImmutableSet.of()).block(); + + assertThat(unregisterer.unregisteredGroups()) + .containsExactly(GROUP_A, GROUP_B); + } +} \ No newline at end of file From 4358c31bb7b4360ccf1ba35a6923c7fa5172fa79 Mon Sep 17 00:00:00 2001 From: Benoit Tellier Date: Fri, 10 Apr 2020 16:47:17 +0700 Subject: [PATCH 2/8] JAMES-3142 Add timestamp & hostname to the event for diagnostic purposes --- .../GroupUnregistringManager.java | 6 +- .../events/eventsourcing/Hostname.java | 67 +++++++++++++++++++ .../RegisteredGroupListenerChangeEvent.java | 20 +++++- .../RegisteredGroupsAggregate.java | 16 +++-- .../eventsourcing/StartCommandHandler.java | 7 +- .../GroupUnregistringManagerTest.java | 4 +- 6 files changed, 109 insertions(+), 11 deletions(-) create mode 100644 mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/Hostname.java diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManager.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManager.java index 6e7f9fc8ce6..a21d6dcee0a 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManager.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManager.java @@ -19,6 +19,8 @@ package org.apache.james.mailbox.events.eventsourcing; +import java.time.Clock; + import org.apache.james.eventsourcing.EventSourcingSystem; import org.apache.james.eventsourcing.eventstore.EventStore; import org.apache.james.mailbox.events.Group; @@ -30,8 +32,8 @@ public class GroupUnregistringManager { private final EventSourcingSystem eventSourcingSystem; - public GroupUnregistringManager(EventStore eventStore, UnregisterRemovedGroupsSubscriber.Unregisterer unregisterer) { - this.eventSourcingSystem = EventSourcingSystem.fromJava(ImmutableSet.of(new StartCommandHandler(eventStore)), + public GroupUnregistringManager(EventStore eventStore, UnregisterRemovedGroupsSubscriber.Unregisterer unregisterer, Clock clock) { + this.eventSourcingSystem = EventSourcingSystem.fromJava(ImmutableSet.of(new StartCommandHandler(eventStore, clock)), ImmutableSet.of(new UnregisterRemovedGroupsSubscriber(unregisterer)), eventStore); } diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/Hostname.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/Hostname.java new file mode 100644 index 00000000000..332ef13efb5 --- /dev/null +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/Hostname.java @@ -0,0 +1,67 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.events.eventsourcing; + +import java.io.UncheckedIOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Objects; + +import org.apache.commons.lang3.StringUtils; + +import com.google.common.base.Preconditions; + +public class Hostname { + public static Hostname of(String value) { + Preconditions.checkNotNull(value, "'value' should not be null"); + Preconditions.checkArgument(StringUtils.isBlank(value), "'value' "); + + return new Hostname(value); + } + + public static Hostname localHost() { + try { + return new Hostname(InetAddress.getLocalHost().getHostName()); + } catch (UnknownHostException e) { + throw new UncheckedIOException(e); + } + } + + private final String value; + + private Hostname(String value) { + this.value = value; + } + + @Override + public final boolean equals(Object o) { + if (o instanceof Hostname) { + Hostname hostname = (Hostname) o; + + return Objects.equals(this.value, hostname.value); + } + return false; + } + + @Override + public final int hashCode() { + return Objects.hash(value); + } +} diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupListenerChangeEvent.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupListenerChangeEvent.java index e0cba51d316..2ba3cb4ddc5 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupListenerChangeEvent.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupListenerChangeEvent.java @@ -21,6 +21,7 @@ import static org.apache.james.mailbox.events.eventsourcing.RegisteredGroupsAggregate.AGGREGATE_ID; +import java.time.ZonedDateTime; import java.util.Objects; import org.apache.james.eventsourcing.AggregateId; @@ -33,12 +34,15 @@ import com.google.common.collect.Sets; public class RegisteredGroupListenerChangeEvent implements Event { - private final EventId eventId; + private final Hostname hostname; + private final ZonedDateTime zonedDateTime; private final ImmutableSet addedGroups; private final ImmutableSet removedGroups; - public RegisteredGroupListenerChangeEvent(EventId eventId, ImmutableSet addedGroups, ImmutableSet removedGroups) { + public RegisteredGroupListenerChangeEvent(EventId eventId, Hostname hostname, ZonedDateTime zonedDateTime, ImmutableSet addedGroups, ImmutableSet removedGroups) { + this.hostname = hostname; + this.zonedDateTime = zonedDateTime; Preconditions.checkArgument(Sets.intersection(addedGroups, removedGroups).isEmpty(), "'addedGroups' and 'removedGroups' elements needs to be distinct"); @@ -66,6 +70,18 @@ public ImmutableSet getRemovedGroups() { return removedGroups; } + public EventId getEventId() { + return eventId; + } + + public Hostname getHostname() { + return hostname; + } + + public ZonedDateTime getZonedDateTime() { + return zonedDateTime; + } + @Override public final boolean equals(Object o) { if (o instanceof RegisteredGroupListenerChangeEvent) { diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupsAggregate.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupsAggregate.java index f0120ad286d..9bdf4e3357e 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupsAggregate.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupsAggregate.java @@ -19,6 +19,8 @@ package org.apache.james.mailbox.events.eventsourcing; +import java.time.Clock; +import java.time.ZonedDateTime; import java.util.List; import org.apache.james.eventsourcing.AggregateId; @@ -68,20 +70,26 @@ private RegisteredGroupsAggregate(History history) { .forEach(this::apply); } - public List handleStart(StartCommand startCommand) { - List detectedChanges = detectChanges(startCommand); + public List handleStart(StartCommand startCommand, Clock clock) { + List detectedChanges = detectChanges(startCommand, clock); detectedChanges.forEach(this::apply); return detectedChanges; } - private List detectChanges(StartCommand startCommand) { + private List detectChanges(StartCommand startCommand, Clock clock) { ImmutableSet addedGroups = ImmutableSet.copyOf(Sets.difference(startCommand.getRegisteredGroups(), state.groups)); ImmutableSet removedGroups = ImmutableSet.copyOf(Sets.difference(state.groups, startCommand.getRegisteredGroups())); if (!addedGroups.isEmpty() || !removedGroups.isEmpty()) { - return ImmutableList.of(new RegisteredGroupListenerChangeEvent(history.getNextEventId(), addedGroups, removedGroups)); + ZonedDateTime now = ZonedDateTime.ofInstant(clock.instant(), clock.getZone()); + RegisteredGroupListenerChangeEvent event = new RegisteredGroupListenerChangeEvent(history.getNextEventId(), + Hostname.localHost(), + now, + addedGroups, + removedGroups); + return ImmutableList.of(event); } return ImmutableList.of(); } diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/StartCommandHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/StartCommandHandler.java index df4d3514c56..6b8d3a3a004 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/StartCommandHandler.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/StartCommandHandler.java @@ -19,6 +19,7 @@ package org.apache.james.mailbox.events.eventsourcing; +import java.time.Clock; import java.util.List; import org.apache.james.eventsourcing.CommandHandler; @@ -31,9 +32,11 @@ public class StartCommandHandler implements CommandHandler { private final EventStore eventStore; + private final Clock clock; - public StartCommandHandler(EventStore eventStore) { + public StartCommandHandler(EventStore eventStore, Clock clock) { this.eventStore = eventStore; + this.clock = clock; } @Override @@ -45,6 +48,6 @@ public Class handledClass() { public Publisher> handle(StartCommand command) { return Mono.from(eventStore.getEventsOfAggregate(RegisteredGroupsAggregate.AGGREGATE_ID)) .map(RegisteredGroupsAggregate::load) - .map(aggregate -> aggregate.handleStart(command)); + .map(aggregate -> aggregate.handleStart(command, clock)); } } diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManagerTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManagerTest.java index 5b8a2b655a8..e6f4a94a437 100644 --- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManagerTest.java +++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManagerTest.java @@ -21,6 +21,8 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.time.Clock; + import org.apache.james.eventsourcing.eventstore.memory.InMemoryEventStore; import org.apache.james.mailbox.events.GenericGroup; import org.apache.james.mailbox.events.Group; @@ -56,7 +58,7 @@ ImmutableList unregisteredGroups() { @BeforeEach void setUp() { unregisterer = new TestUnregisterer(); - testee = new GroupUnregistringManager(new InMemoryEventStore(), unregisterer); + testee = new GroupUnregistringManager(new InMemoryEventStore(), unregisterer, Clock.systemUTC()); } @Test From eb3988bd60fd3d310b3c96a339dcc53b7abcd7d1 Mon Sep 17 00:00:00 2001 From: Benoit Tellier Date: Fri, 10 Apr 2020 16:48:10 +0700 Subject: [PATCH 3/8] fixup! JAMES-3142 eventsourcing for group unregistration --- .../eventsourcing/RegisteredGroupListenerChangeEvent.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupListenerChangeEvent.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupListenerChangeEvent.java index 2ba3cb4ddc5..653ad0eaee6 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupListenerChangeEvent.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupListenerChangeEvent.java @@ -43,9 +43,6 @@ public class RegisteredGroupListenerChangeEvent implements Event { public RegisteredGroupListenerChangeEvent(EventId eventId, Hostname hostname, ZonedDateTime zonedDateTime, ImmutableSet addedGroups, ImmutableSet removedGroups) { this.hostname = hostname; this.zonedDateTime = zonedDateTime; - Preconditions.checkArgument(Sets.intersection(addedGroups, removedGroups).isEmpty(), - "'addedGroups' and 'removedGroups' elements needs to be distinct"); - this.eventId = eventId; this.addedGroups = addedGroups; this.removedGroups = removedGroups; From 1166120b654034e153095accc619af191501cceb Mon Sep 17 00:00:00 2001 From: Benoit Tellier Date: Fri, 10 Apr 2020 16:54:08 +0700 Subject: [PATCH 4/8] JAMES-3142 s/StartCommand/RequireGroupsCommand/ --- .../eventsourcing/GroupUnregistringManager.java | 6 +++--- .../eventsourcing/RegisteredGroupsAggregate.java | 10 +++++----- .../{StartCommand.java => RequireGroupsCommand.java} | 4 ++-- ...Handler.java => RequireGroupsCommandHandler.java} | 12 ++++++------ 4 files changed, 16 insertions(+), 16 deletions(-) rename mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/{StartCommand.java => RequireGroupsCommand.java} (92%) rename mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/{StartCommandHandler.java => RequireGroupsCommandHandler.java} (81%) diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManager.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManager.java index a21d6dcee0a..850686acbbb 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManager.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManager.java @@ -33,14 +33,14 @@ public class GroupUnregistringManager { private final EventSourcingSystem eventSourcingSystem; public GroupUnregistringManager(EventStore eventStore, UnregisterRemovedGroupsSubscriber.Unregisterer unregisterer, Clock clock) { - this.eventSourcingSystem = EventSourcingSystem.fromJava(ImmutableSet.of(new StartCommandHandler(eventStore, clock)), + this.eventSourcingSystem = EventSourcingSystem.fromJava(ImmutableSet.of(new RequireGroupsCommandHandler(eventStore, clock)), ImmutableSet.of(new UnregisterRemovedGroupsSubscriber(unregisterer)), eventStore); } public Mono start(ImmutableSet groups) { - StartCommand startCommand = new StartCommand(groups); + RequireGroupsCommand requireGroupsCommand = new RequireGroupsCommand(groups); - return Mono.from(eventSourcingSystem.dispatch(startCommand)); + return Mono.from(eventSourcingSystem.dispatch(requireGroupsCommand)); } } diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupsAggregate.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupsAggregate.java index 9bdf4e3357e..83a7527f9ca 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupsAggregate.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupsAggregate.java @@ -70,17 +70,17 @@ private RegisteredGroupsAggregate(History history) { .forEach(this::apply); } - public List handleStart(StartCommand startCommand, Clock clock) { - List detectedChanges = detectChanges(startCommand, clock); + public List handle(RequireGroupsCommand requireGroupsCommand, Clock clock) { + List detectedChanges = detectChanges(requireGroupsCommand, clock); detectedChanges.forEach(this::apply); return detectedChanges; } - private List detectChanges(StartCommand startCommand, Clock clock) { - ImmutableSet addedGroups = ImmutableSet.copyOf(Sets.difference(startCommand.getRegisteredGroups(), state.groups)); - ImmutableSet removedGroups = ImmutableSet.copyOf(Sets.difference(state.groups, startCommand.getRegisteredGroups())); + private List detectChanges(RequireGroupsCommand requireGroupsCommand, Clock clock) { + ImmutableSet addedGroups = ImmutableSet.copyOf(Sets.difference(requireGroupsCommand.getRegisteredGroups(), state.groups)); + ImmutableSet removedGroups = ImmutableSet.copyOf(Sets.difference(state.groups, requireGroupsCommand.getRegisteredGroups())); if (!addedGroups.isEmpty() || !removedGroups.isEmpty()) { ZonedDateTime now = ZonedDateTime.ofInstant(clock.instant(), clock.getZone()); diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/StartCommand.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RequireGroupsCommand.java similarity index 92% rename from mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/StartCommand.java rename to mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RequireGroupsCommand.java index 1a5ebbf12c2..0ac01e271b1 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/StartCommand.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RequireGroupsCommand.java @@ -24,10 +24,10 @@ import com.google.common.collect.ImmutableSet; -public class StartCommand implements Command { +public class RequireGroupsCommand implements Command { private final ImmutableSet registeredGroups; - public StartCommand(ImmutableSet registeredGroups) { + public RequireGroupsCommand(ImmutableSet registeredGroups) { this.registeredGroups = registeredGroups; } diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/StartCommandHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RequireGroupsCommandHandler.java similarity index 81% rename from mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/StartCommandHandler.java rename to mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RequireGroupsCommandHandler.java index 6b8d3a3a004..e8818625c4a 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/StartCommandHandler.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RequireGroupsCommandHandler.java @@ -29,25 +29,25 @@ import reactor.core.publisher.Mono; -public class StartCommandHandler implements CommandHandler { +public class RequireGroupsCommandHandler implements CommandHandler { private final EventStore eventStore; private final Clock clock; - public StartCommandHandler(EventStore eventStore, Clock clock) { + public RequireGroupsCommandHandler(EventStore eventStore, Clock clock) { this.eventStore = eventStore; this.clock = clock; } @Override - public Class handledClass() { - return StartCommand.class; + public Class handledClass() { + return RequireGroupsCommand.class; } @Override - public Publisher> handle(StartCommand command) { + public Publisher> handle(RequireGroupsCommand command) { return Mono.from(eventStore.getEventsOfAggregate(RegisteredGroupsAggregate.AGGREGATE_ID)) .map(RegisteredGroupsAggregate::load) - .map(aggregate -> aggregate.handleStart(command, clock)); + .map(aggregate -> aggregate.handle(command, clock)); } } From 1aabf53e70d5bfc261ecfa3f1be5269867c869cb Mon Sep 17 00:00:00 2001 From: Benoit Tellier Date: Fri, 10 Apr 2020 18:10:05 +0700 Subject: [PATCH 5/8] JAMES-3142 Eventual consistence for group unbinding --- .../GroupUnregistringManager.java | 46 +++++++++- .../MarkUnbindAsSucceededCommand.java | 35 ++++++++ .../MarkUnbindAsSucceededCommandHandler.java | 49 ++++++++++ .../RegisteredGroupsAggregate.java | 89 ++++++++++++++++--- .../eventsourcing/UnbindSucceededEvent.java | 49 ++++++++++ .../UnregisterRemovedGroupsSubscriber.java | 19 +++- .../GroupUnregistringManagerTest.java | 47 +++++++++- 7 files changed, 313 insertions(+), 21 deletions(-) create mode 100644 mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/MarkUnbindAsSucceededCommand.java create mode 100644 mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/MarkUnbindAsSucceededCommandHandler.java create mode 100644 mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/UnbindSucceededEvent.java diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManager.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManager.java index 850686acbbb..5348ad6e8ee 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManager.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManager.java @@ -21,19 +21,55 @@ import java.time.Clock; +import org.apache.james.eventsourcing.Event; import org.apache.james.eventsourcing.EventSourcingSystem; +import org.apache.james.eventsourcing.Subscriber; import org.apache.james.eventsourcing.eventstore.EventStore; import org.apache.james.mailbox.events.Group; +import org.reactivestreams.Publisher; import com.google.common.collect.ImmutableSet; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class GroupUnregistringManager { + @FunctionalInterface + public interface Unregisterer { + Mono unregister(Group group); + } + + private class UnregisterRemovedGroupsSubscriber implements Subscriber { + private final Unregisterer unregisterer; + + public UnregisterRemovedGroupsSubscriber(Unregisterer unregisterer) { + this.unregisterer = unregisterer; + } + + @Override + public void handle(Event event) { + if (event instanceof RegisteredGroupListenerChangeEvent) { + RegisteredGroupListenerChangeEvent changeEvent = (RegisteredGroupListenerChangeEvent) event; + + Flux.fromIterable(changeEvent.getRemovedGroups()) + .concatMap(this::unregister) + .then() + .block(); + } + } + + private Publisher unregister(Group group) { + return unregisterer.unregister(group) + .then(notifyUnbind(group)); + } + } + private final EventSourcingSystem eventSourcingSystem; - public GroupUnregistringManager(EventStore eventStore, UnregisterRemovedGroupsSubscriber.Unregisterer unregisterer, Clock clock) { - this.eventSourcingSystem = EventSourcingSystem.fromJava(ImmutableSet.of(new RequireGroupsCommandHandler(eventStore, clock)), + public GroupUnregistringManager(EventStore eventStore, Unregisterer unregisterer, Clock clock) { + this.eventSourcingSystem = EventSourcingSystem.fromJava(ImmutableSet.of( + new RequireGroupsCommandHandler(eventStore, clock), + new MarkUnbindAsSucceededCommandHandler(eventStore)), ImmutableSet.of(new UnregisterRemovedGroupsSubscriber(unregisterer)), eventStore); } @@ -43,4 +79,10 @@ public Mono start(ImmutableSet groups) { return Mono.from(eventSourcingSystem.dispatch(requireGroupsCommand)); } + + private Mono notifyUnbind(Group group) { + MarkUnbindAsSucceededCommand command = new MarkUnbindAsSucceededCommand(group); + + return Mono.from(eventSourcingSystem.dispatch(command)); + } } diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/MarkUnbindAsSucceededCommand.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/MarkUnbindAsSucceededCommand.java new file mode 100644 index 00000000000..2985ee636e3 --- /dev/null +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/MarkUnbindAsSucceededCommand.java @@ -0,0 +1,35 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.events.eventsourcing; + +import org.apache.james.eventsourcing.Command; +import org.apache.james.mailbox.events.Group; + +public class MarkUnbindAsSucceededCommand implements Command { + private final Group succeededGroup; + + public MarkUnbindAsSucceededCommand(Group succeededGroup) { + this.succeededGroup = succeededGroup; + } + + public Group getSucceededGroup() { + return succeededGroup; + } +} diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/MarkUnbindAsSucceededCommandHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/MarkUnbindAsSucceededCommandHandler.java new file mode 100644 index 00000000000..abb9a2f1b0b --- /dev/null +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/MarkUnbindAsSucceededCommandHandler.java @@ -0,0 +1,49 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.events.eventsourcing; + +import java.util.List; + +import org.apache.james.eventsourcing.CommandHandler; +import org.apache.james.eventsourcing.Event; +import org.apache.james.eventsourcing.eventstore.EventStore; +import org.reactivestreams.Publisher; + +import reactor.core.publisher.Mono; + +public class MarkUnbindAsSucceededCommandHandler implements CommandHandler { + private final EventStore eventStore; + + public MarkUnbindAsSucceededCommandHandler(EventStore eventStore) { + this.eventStore = eventStore; + } + + @Override + public Class handledClass() { + return MarkUnbindAsSucceededCommand.class; + } + + @Override + public Publisher> handle(MarkUnbindAsSucceededCommand command) { + return Mono.from(eventStore.getEventsOfAggregate(RegisteredGroupsAggregate.AGGREGATE_ID)) + .map(RegisteredGroupsAggregate::load) + .map(aggregate -> aggregate.handle(command)); + } +} diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupsAggregate.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupsAggregate.java index 83a7527f9ca..54279493140 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupsAggregate.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupsAggregate.java @@ -22,37 +22,89 @@ import java.time.Clock; import java.time.ZonedDateTime; import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.james.eventsourcing.AggregateId; import org.apache.james.eventsourcing.Event; import org.apache.james.eventsourcing.eventstore.History; import org.apache.james.mailbox.events.Group; +import com.github.steveash.guavate.Guavate; +import com.google.common.base.Functions; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; public class RegisteredGroupsAggregate { static AggregateId AGGREGATE_ID = () -> "RegisteredGroupListenerChangeEvent"; + enum Status { + USED, + UNUSED_BUT_BINDED + } + private static class State { static State initial() { - return new State(ImmutableSet.of()); + return new State(ImmutableMap.of()); } - final ImmutableSet groups; + final ImmutableMap groups; - private State(ImmutableSet groups) { + private State(ImmutableMap groups) { this.groups = groups; } private State apply(RegisteredGroupListenerChangeEvent event) { - return new State(ImmutableSet.builder() - .addAll(Sets.difference(groups, event.getRemovedGroups())) - .addAll(event.getAddedGroups()) + Preconditions.checkArgument(Sets.intersection(usedGroups(), event.getAddedGroups()).isEmpty(), + "Trying to add an already existing group"); + Preconditions.checkArgument(Sets.difference(event.getRemovedGroups(), groups.keySet()).isEmpty(), + "Trying to remove a non existing group"); + + return new State(ImmutableMap.builder() + .putAll(event.getAddedGroups() + .stream() + .collect(Guavate.toImmutableMap(Functions.identity(), any -> Status.USED))) + .putAll(notIn(Sets.union(event.getAddedGroups(), event.getRemovedGroups()))) + .putAll(event.getRemovedGroups() + .stream() + .collect(Guavate.toImmutableMap(Functions.identity(), any -> Status.UNUSED_BUT_BINDED))) .build()); } + + private State apply(UnbindSucceededEvent event) { + Preconditions.checkArgument(bindedGroups().contains(event.getGroup()), "unbing a non binded group," + + " or a used group"); + + return new State(ImmutableMap.builder() + .putAll(notIn(ImmutableSet.of(event.getGroup()))) + .build()); + } + + private ImmutableSet usedGroups() { + return groups.entrySet() + .stream() + .filter(group -> group.getValue() == Status.USED) + .map(Map.Entry::getKey) + .collect(Guavate.toImmutableSet()); + } + + private ImmutableSet bindedGroups() { + return groups.entrySet() + .stream() + .filter(group -> group.getValue() == Status.UNUSED_BUT_BINDED) + .map(Map.Entry::getKey) + .collect(Guavate.toImmutableSet()); + } + + private ImmutableMap notIn(Set toBeFilteredOut) { + return groups.entrySet() + .stream() + .filter(group -> !toBeFilteredOut.contains(group.getKey())) + .collect(Guavate.entriesToImmutableMap()); + } } public static RegisteredGroupsAggregate load(History history) { @@ -72,15 +124,22 @@ private RegisteredGroupsAggregate(History history) { public List handle(RequireGroupsCommand requireGroupsCommand, Clock clock) { List detectedChanges = detectChanges(requireGroupsCommand, clock); - detectedChanges.forEach(this::apply); - return detectedChanges; } + public List handle(MarkUnbindAsSucceededCommand command) { + UnbindSucceededEvent event = new UnbindSucceededEvent(history.getNextEventId(), command.getSucceededGroup()); + apply(event); + return ImmutableList.of(event); + } + private List detectChanges(RequireGroupsCommand requireGroupsCommand, Clock clock) { - ImmutableSet addedGroups = ImmutableSet.copyOf(Sets.difference(requireGroupsCommand.getRegisteredGroups(), state.groups)); - ImmutableSet removedGroups = ImmutableSet.copyOf(Sets.difference(state.groups, requireGroupsCommand.getRegisteredGroups())); + ImmutableSet addedGroups = ImmutableSet.copyOf(Sets.difference(requireGroupsCommand.getRegisteredGroups(), state.usedGroups())); + ImmutableSet removedGroups = ImmutableSet.builder() + .addAll(Sets.difference(state.usedGroups(), requireGroupsCommand.getRegisteredGroups())) + .addAll(state.bindedGroups()) + .build(); if (!addedGroups.isEmpty() || !removedGroups.isEmpty()) { ZonedDateTime now = ZonedDateTime.ofInstant(clock.instant(), clock.getZone()); @@ -95,8 +154,12 @@ private List detectChanges(RequireGroupsComm } private void apply(Event event) { - Preconditions.checkArgument(event instanceof RegisteredGroupListenerChangeEvent); - - state = state.apply((RegisteredGroupListenerChangeEvent) event); + if (event instanceof RegisteredGroupListenerChangeEvent) { + state = state.apply((RegisteredGroupListenerChangeEvent) event); + } else if (event instanceof UnbindSucceededEvent) { + state = state.apply((UnbindSucceededEvent) event); + } else { + throw new RuntimeException("Unsupported event class " + event.getClass()); + } } } diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/UnbindSucceededEvent.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/UnbindSucceededEvent.java new file mode 100644 index 00000000000..cf79f0a172b --- /dev/null +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/UnbindSucceededEvent.java @@ -0,0 +1,49 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.events.eventsourcing; + +import org.apache.james.eventsourcing.AggregateId; +import org.apache.james.eventsourcing.Event; +import org.apache.james.eventsourcing.EventId; +import org.apache.james.mailbox.events.Group; + +public class UnbindSucceededEvent implements Event { + private final EventId eventId; + private final Group group; + + public UnbindSucceededEvent(EventId eventId, Group group) { + this.eventId = eventId; + this.group = group; + } + + @Override + public EventId eventId() { + return eventId; + } + + public Group getGroup() { + return group; + } + + @Override + public AggregateId getAggregateId() { + return RegisteredGroupsAggregate.AGGREGATE_ID; + } +} diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/UnregisterRemovedGroupsSubscriber.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/UnregisterRemovedGroupsSubscriber.java index da9ec06cb12..459003967b8 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/UnregisterRemovedGroupsSubscriber.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/UnregisterRemovedGroupsSubscriber.java @@ -25,17 +25,25 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; public class UnregisterRemovedGroupsSubscriber implements Subscriber { @FunctionalInterface public interface Unregisterer { - Publisher unregister(Group group); + Mono unregister(Group group); + } + + @FunctionalInterface + public interface SuccessNotifier { + Mono notifySuccess(Group group); } private final Unregisterer unregisterer; + private final SuccessNotifier successNotifier; - public UnregisterRemovedGroupsSubscriber(Unregisterer unregisterer) { + public UnregisterRemovedGroupsSubscriber(Unregisterer unregisterer, SuccessNotifier successNotifier) { this.unregisterer = unregisterer; + this.successNotifier = successNotifier; } @Override @@ -44,9 +52,14 @@ public void handle(Event event) { RegisteredGroupListenerChangeEvent changeEvent = (RegisteredGroupListenerChangeEvent) event; Flux.fromIterable(changeEvent.getRemovedGroups()) - .concatMap(unregisterer::unregister) + .concatMap(this::unregister) .then() .block(); } } + + private Publisher unregister(Group group) { + return unregisterer.unregister(group) + .then(successNotifier.notifySuccess(group)); + } } diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManagerTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManagerTest.java index e6f4a94a437..3c193d4712b 100644 --- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManagerTest.java +++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManagerTest.java @@ -20,6 +20,7 @@ package org.apache.james.mailbox.events.eventsourcing; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; import java.time.Clock; @@ -28,7 +29,6 @@ import org.apache.james.mailbox.events.Group; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.reactivestreams.Publisher; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; @@ -39,11 +39,29 @@ class GroupUnregistringManagerTest { private static final GenericGroup GROUP_A = new GenericGroup("a"); private static final GenericGroup GROUP_B = new GenericGroup("b"); - static class TestUnregisterer implements UnregisterRemovedGroupsSubscriber.Unregisterer { + static class TestUnregisterer implements GroupUnregistringManager.Unregisterer { private ImmutableList.Builder unregisteredGroups = ImmutableList.builder(); @Override - public Publisher unregister(Group group) { + public Mono unregister(Group group) { + return Mono.fromRunnable(() -> unregisteredGroups.add(group)); + } + + ImmutableList unregisteredGroups() { + return unregisteredGroups.build(); + } + } + + static class FailOnceUnregisterer implements GroupUnregistringManager.Unregisterer { + private boolean shouldFail = true; + private ImmutableList.Builder unregisteredGroups = ImmutableList.builder(); + + @Override + public Mono unregister(Group group) { + if (shouldFail) { + shouldFail = false; + return Mono.error(new RuntimeException()); + } return Mono.fromRunnable(() -> unregisteredGroups.add(group)); } @@ -96,6 +114,29 @@ void startShouldUnregisterGroupsWhenRemoval() { .containsExactly(GROUP_B); } + @Test + void startShouldNotFailWhenUnbindDidFail() { + GroupUnregistringManager.Unregisterer unregisterer = new FailOnceUnregisterer(); + GroupUnregistringManager testee = new GroupUnregistringManager(new InMemoryEventStore(), unregisterer, Clock.systemUTC()); + testee.start(ImmutableSet.of(GROUP_A, GROUP_B)).block(); + testee.start(ImmutableSet.of(GROUP_A)).block(); + + assertThatCode(() -> testee.start(ImmutableSet.of(GROUP_A)).block()); + } + + @Test + void failedUnregisterShouldBeRetried() { + FailOnceUnregisterer unregisterer = new FailOnceUnregisterer(); + GroupUnregistringManager testee = new GroupUnregistringManager(new InMemoryEventStore(), unregisterer, Clock.systemUTC()); + testee.start(ImmutableSet.of(GROUP_A, GROUP_B)).block(); + testee.start(ImmutableSet.of(GROUP_A)).block(); + + testee.start(ImmutableSet.of(GROUP_A)).block(); + + assertThat(unregisterer.unregisteredGroups()) + .containsExactly(GROUP_B); + } + @Test void startShouldUnregisterGroupsWhenSwap() { testee.start(ImmutableSet.of(GROUP_B)).block(); From 9ad44cdbf87279be0dbe11bbd5df518b04461776 Mon Sep 17 00:00:00 2001 From: Benoit Tellier Date: Fri, 10 Apr 2020 18:39:15 +0700 Subject: [PATCH 6/8] fixup! JAMES-3142 Eventual consistence for group unbinding --- .../eventsourcing/RegisteredGroupListenerChangeEvent.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupListenerChangeEvent.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupListenerChangeEvent.java index 653ad0eaee6..cac9fe11b83 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupListenerChangeEvent.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupListenerChangeEvent.java @@ -29,9 +29,7 @@ import org.apache.james.eventsourcing.EventId; import org.apache.james.mailbox.events.Group; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Sets; public class RegisteredGroupListenerChangeEvent implements Event { private final EventId eventId; From cbc47c12147edf900cf00b3a583135333d8001c9 Mon Sep 17 00:00:00 2001 From: Benoit Tellier Date: Tue, 14 Apr 2020 18:00:37 +0700 Subject: [PATCH 7/8] fixup! fixup! JAMES-3142 Eventual consistence for group unbinding --- .../GroupUnregistringManager.java | 11 ++++---- .../MarkUnbindAsSucceededCommand.java | 9 ++++++- .../RegisteredGroupsAggregate.java | 27 +++++++++---------- 3 files changed, 27 insertions(+), 20 deletions(-) diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManager.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManager.java index 5348ad6e8ee..83ba04f53c2 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManager.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManager.java @@ -22,6 +22,7 @@ import java.time.Clock; import org.apache.james.eventsourcing.Event; +import org.apache.james.eventsourcing.EventId; import org.apache.james.eventsourcing.EventSourcingSystem; import org.apache.james.eventsourcing.Subscriber; import org.apache.james.eventsourcing.eventstore.EventStore; @@ -52,15 +53,15 @@ public void handle(Event event) { RegisteredGroupListenerChangeEvent changeEvent = (RegisteredGroupListenerChangeEvent) event; Flux.fromIterable(changeEvent.getRemovedGroups()) - .concatMap(this::unregister) + .concatMap(group -> unregister(group, changeEvent.eventId())) .then() .block(); } } - private Publisher unregister(Group group) { + private Publisher unregister(Group group, EventId generatedForEventId) { return unregisterer.unregister(group) - .then(notifyUnbind(group)); + .then(notifyUnbind(group, generatedForEventId)); } } @@ -80,8 +81,8 @@ public Mono start(ImmutableSet groups) { return Mono.from(eventSourcingSystem.dispatch(requireGroupsCommand)); } - private Mono notifyUnbind(Group group) { - MarkUnbindAsSucceededCommand command = new MarkUnbindAsSucceededCommand(group); + private Mono notifyUnbind(Group group, EventId ganaratedForEventId) { + MarkUnbindAsSucceededCommand command = new MarkUnbindAsSucceededCommand(group, ganaratedForEventId); return Mono.from(eventSourcingSystem.dispatch(command)); } diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/MarkUnbindAsSucceededCommand.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/MarkUnbindAsSucceededCommand.java index 2985ee636e3..4378c9e0051 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/MarkUnbindAsSucceededCommand.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/MarkUnbindAsSucceededCommand.java @@ -20,16 +20,23 @@ package org.apache.james.mailbox.events.eventsourcing; import org.apache.james.eventsourcing.Command; +import org.apache.james.eventsourcing.EventId; import org.apache.james.mailbox.events.Group; public class MarkUnbindAsSucceededCommand implements Command { private final Group succeededGroup; + private final EventId generatedForEventId; - public MarkUnbindAsSucceededCommand(Group succeededGroup) { + public MarkUnbindAsSucceededCommand(Group succeededGroup, EventId generatedForEventId) { this.succeededGroup = succeededGroup; + this.generatedForEventId = generatedForEventId; } public Group getSucceededGroup() { return succeededGroup; } + + public EventId getGeneratedForEventId() { + return generatedForEventId; + } } diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupsAggregate.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupsAggregate.java index 54279493140..bf9555ef19a 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupsAggregate.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupsAggregate.java @@ -58,26 +58,22 @@ private State(ImmutableMap groups) { } private State apply(RegisteredGroupListenerChangeEvent event) { - Preconditions.checkArgument(Sets.intersection(usedGroups(), event.getAddedGroups()).isEmpty(), - "Trying to add an already existing group"); - Preconditions.checkArgument(Sets.difference(event.getRemovedGroups(), groups.keySet()).isEmpty(), - "Trying to remove a non existing group"); + ImmutableMap removedGroups = event.getRemovedGroups() + .stream() + .collect(Guavate.toImmutableMap(Functions.identity(), any -> Status.UNUSED_BUT_BINDED)); + ImmutableMap addedGroups = event.getAddedGroups() + .stream() + .collect(Guavate.toImmutableMap(Functions.identity(), any -> Status.USED)); + ImmutableMap unchangedGroups = notIn(Sets.union(event.getAddedGroups(), event.getRemovedGroups())); return new State(ImmutableMap.builder() - .putAll(event.getAddedGroups() - .stream() - .collect(Guavate.toImmutableMap(Functions.identity(), any -> Status.USED))) - .putAll(notIn(Sets.union(event.getAddedGroups(), event.getRemovedGroups()))) - .putAll(event.getRemovedGroups() - .stream() - .collect(Guavate.toImmutableMap(Functions.identity(), any -> Status.UNUSED_BUT_BINDED))) + .putAll(addedGroups) + .putAll(unchangedGroups) + .putAll(removedGroups) .build()); } private State apply(UnbindSucceededEvent event) { - Preconditions.checkArgument(bindedGroups().contains(event.getGroup()), "unbing a non binded group," + - " or a used group"); - return new State(ImmutableMap.builder() .putAll(notIn(ImmutableSet.of(event.getGroup()))) .build()); @@ -129,6 +125,9 @@ public List handle(RequireGroupsCommand requ } public List handle(MarkUnbindAsSucceededCommand command) { + Preconditions.checkArgument(state.bindedGroups().contains(command.getSucceededGroup()), + "unbing a non binded group, or a used group"); + UnbindSucceededEvent event = new UnbindSucceededEvent(history.getNextEventId(), command.getSucceededGroup()); apply(event); return ImmutableList.of(event); From 7837e686a52b9617facb228b7e9b683b8f006e9f Mon Sep 17 00:00:00 2001 From: Benoit Tellier Date: Wed, 15 Apr 2020 10:22:42 +0700 Subject: [PATCH 8/8] fixup! fixup! JAMES-3142 Eventual consistence for group unbinding --- .../events/eventsourcing/GroupUnregistringManager.java | 4 ++-- .../events/eventsourcing/RegisteredGroupsAggregate.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManager.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManager.java index 83ba04f53c2..c8501ee58e3 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManager.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManager.java @@ -81,8 +81,8 @@ public Mono start(ImmutableSet groups) { return Mono.from(eventSourcingSystem.dispatch(requireGroupsCommand)); } - private Mono notifyUnbind(Group group, EventId ganaratedForEventId) { - MarkUnbindAsSucceededCommand command = new MarkUnbindAsSucceededCommand(group, ganaratedForEventId); + private Mono notifyUnbind(Group group, EventId generatedForEventId) { + MarkUnbindAsSucceededCommand command = new MarkUnbindAsSucceededCommand(group, generatedForEventId); return Mono.from(eventSourcingSystem.dispatch(command)); } diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupsAggregate.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupsAggregate.java index bf9555ef19a..60b5d5825e8 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupsAggregate.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupsAggregate.java @@ -126,7 +126,7 @@ public List handle(RequireGroupsCommand requ public List handle(MarkUnbindAsSucceededCommand command) { Preconditions.checkArgument(state.bindedGroups().contains(command.getSucceededGroup()), - "unbing a non binded group, or a used group"); + "unbinding a non binded group, or a used group"); UnbindSucceededEvent event = new UnbindSucceededEvent(history.getNextEventId(), command.getSucceededGroup()); apply(event);