diff --git a/mailbox/event/event-rabbitmq/pom.xml b/mailbox/event/event-rabbitmq/pom.xml index 29fbacfd815..445413dd504 100644 --- a/mailbox/event/event-rabbitmq/pom.xml +++ b/mailbox/event/event-rabbitmq/pom.xml @@ -66,6 +66,19 @@ apache-james-mailbox-store test + + ${james.groupId} + event-sourcing-core + + + ${james.groupId} + event-sourcing-event-store-memory + test + + + ${james.groupId} + event-sourcing-event-store-api + ${james.groupId} james-server-lifecycle-api diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManager.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManager.java new file mode 100644 index 00000000000..c8501ee58e3 --- /dev/null +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManager.java @@ -0,0 +1,89 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.events.eventsourcing; + +import java.time.Clock; + +import org.apache.james.eventsourcing.Event; +import org.apache.james.eventsourcing.EventId; +import org.apache.james.eventsourcing.EventSourcingSystem; +import org.apache.james.eventsourcing.Subscriber; +import org.apache.james.eventsourcing.eventstore.EventStore; +import org.apache.james.mailbox.events.Group; +import org.reactivestreams.Publisher; + +import com.google.common.collect.ImmutableSet; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class GroupUnregistringManager { + @FunctionalInterface + public interface Unregisterer { + Mono unregister(Group group); + } + + private class UnregisterRemovedGroupsSubscriber implements Subscriber { + private final Unregisterer unregisterer; + + public UnregisterRemovedGroupsSubscriber(Unregisterer unregisterer) { + this.unregisterer = unregisterer; + } + + @Override + public void handle(Event event) { + if (event instanceof RegisteredGroupListenerChangeEvent) { + RegisteredGroupListenerChangeEvent changeEvent = (RegisteredGroupListenerChangeEvent) event; + + Flux.fromIterable(changeEvent.getRemovedGroups()) + .concatMap(group -> unregister(group, changeEvent.eventId())) + .then() + .block(); + } + } + + private Publisher unregister(Group group, EventId generatedForEventId) { + return unregisterer.unregister(group) + .then(notifyUnbind(group, generatedForEventId)); + } + } + + private final EventSourcingSystem eventSourcingSystem; + + public GroupUnregistringManager(EventStore eventStore, Unregisterer unregisterer, Clock clock) { + this.eventSourcingSystem = EventSourcingSystem.fromJava(ImmutableSet.of( + new RequireGroupsCommandHandler(eventStore, clock), + new MarkUnbindAsSucceededCommandHandler(eventStore)), + ImmutableSet.of(new UnregisterRemovedGroupsSubscriber(unregisterer)), + eventStore); + } + + public Mono start(ImmutableSet groups) { + RequireGroupsCommand requireGroupsCommand = new RequireGroupsCommand(groups); + + return Mono.from(eventSourcingSystem.dispatch(requireGroupsCommand)); + } + + private Mono notifyUnbind(Group group, EventId generatedForEventId) { + MarkUnbindAsSucceededCommand command = new MarkUnbindAsSucceededCommand(group, generatedForEventId); + + return Mono.from(eventSourcingSystem.dispatch(command)); + } +} diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/Hostname.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/Hostname.java new file mode 100644 index 00000000000..332ef13efb5 --- /dev/null +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/Hostname.java @@ -0,0 +1,67 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.events.eventsourcing; + +import java.io.UncheckedIOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Objects; + +import org.apache.commons.lang3.StringUtils; + +import com.google.common.base.Preconditions; + +public class Hostname { + public static Hostname of(String value) { + Preconditions.checkNotNull(value, "'value' should not be null"); + Preconditions.checkArgument(StringUtils.isBlank(value), "'value' "); + + return new Hostname(value); + } + + public static Hostname localHost() { + try { + return new Hostname(InetAddress.getLocalHost().getHostName()); + } catch (UnknownHostException e) { + throw new UncheckedIOException(e); + } + } + + private final String value; + + private Hostname(String value) { + this.value = value; + } + + @Override + public final boolean equals(Object o) { + if (o instanceof Hostname) { + Hostname hostname = (Hostname) o; + + return Objects.equals(this.value, hostname.value); + } + return false; + } + + @Override + public final int hashCode() { + return Objects.hash(value); + } +} diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/MarkUnbindAsSucceededCommand.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/MarkUnbindAsSucceededCommand.java new file mode 100644 index 00000000000..4378c9e0051 --- /dev/null +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/MarkUnbindAsSucceededCommand.java @@ -0,0 +1,42 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.events.eventsourcing; + +import org.apache.james.eventsourcing.Command; +import org.apache.james.eventsourcing.EventId; +import org.apache.james.mailbox.events.Group; + +public class MarkUnbindAsSucceededCommand implements Command { + private final Group succeededGroup; + private final EventId generatedForEventId; + + public MarkUnbindAsSucceededCommand(Group succeededGroup, EventId generatedForEventId) { + this.succeededGroup = succeededGroup; + this.generatedForEventId = generatedForEventId; + } + + public Group getSucceededGroup() { + return succeededGroup; + } + + public EventId getGeneratedForEventId() { + return generatedForEventId; + } +} diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/MarkUnbindAsSucceededCommandHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/MarkUnbindAsSucceededCommandHandler.java new file mode 100644 index 00000000000..abb9a2f1b0b --- /dev/null +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/MarkUnbindAsSucceededCommandHandler.java @@ -0,0 +1,49 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.events.eventsourcing; + +import java.util.List; + +import org.apache.james.eventsourcing.CommandHandler; +import org.apache.james.eventsourcing.Event; +import org.apache.james.eventsourcing.eventstore.EventStore; +import org.reactivestreams.Publisher; + +import reactor.core.publisher.Mono; + +public class MarkUnbindAsSucceededCommandHandler implements CommandHandler { + private final EventStore eventStore; + + public MarkUnbindAsSucceededCommandHandler(EventStore eventStore) { + this.eventStore = eventStore; + } + + @Override + public Class handledClass() { + return MarkUnbindAsSucceededCommand.class; + } + + @Override + public Publisher> handle(MarkUnbindAsSucceededCommand command) { + return Mono.from(eventStore.getEventsOfAggregate(RegisteredGroupsAggregate.AGGREGATE_ID)) + .map(RegisteredGroupsAggregate::load) + .map(aggregate -> aggregate.handle(command)); + } +} diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupListenerChangeEvent.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupListenerChangeEvent.java new file mode 100644 index 00000000000..cac9fe11b83 --- /dev/null +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupListenerChangeEvent.java @@ -0,0 +1,96 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.events.eventsourcing; + +import static org.apache.james.mailbox.events.eventsourcing.RegisteredGroupsAggregate.AGGREGATE_ID; + +import java.time.ZonedDateTime; +import java.util.Objects; + +import org.apache.james.eventsourcing.AggregateId; +import org.apache.james.eventsourcing.Event; +import org.apache.james.eventsourcing.EventId; +import org.apache.james.mailbox.events.Group; + +import com.google.common.collect.ImmutableSet; + +public class RegisteredGroupListenerChangeEvent implements Event { + private final EventId eventId; + private final Hostname hostname; + private final ZonedDateTime zonedDateTime; + private final ImmutableSet addedGroups; + private final ImmutableSet removedGroups; + + public RegisteredGroupListenerChangeEvent(EventId eventId, Hostname hostname, ZonedDateTime zonedDateTime, ImmutableSet addedGroups, ImmutableSet removedGroups) { + this.hostname = hostname; + this.zonedDateTime = zonedDateTime; + this.eventId = eventId; + this.addedGroups = addedGroups; + this.removedGroups = removedGroups; + } + + + @Override + public EventId eventId() { + return eventId; + } + + @Override + public AggregateId getAggregateId() { + return AGGREGATE_ID; + } + + public ImmutableSet getAddedGroups() { + return addedGroups; + } + + public ImmutableSet getRemovedGroups() { + return removedGroups; + } + + public EventId getEventId() { + return eventId; + } + + public Hostname getHostname() { + return hostname; + } + + public ZonedDateTime getZonedDateTime() { + return zonedDateTime; + } + + @Override + public final boolean equals(Object o) { + if (o instanceof RegisteredGroupListenerChangeEvent) { + RegisteredGroupListenerChangeEvent that = (RegisteredGroupListenerChangeEvent) o; + + return Objects.equals(this.eventId, that.eventId) + && Objects.equals(this.addedGroups, that.addedGroups) + && Objects.equals(this.removedGroups, that.removedGroups); + } + return false; + } + + @Override + public final int hashCode() { + return Objects.hash(eventId, addedGroups, removedGroups); + } +} diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupsAggregate.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupsAggregate.java new file mode 100644 index 00000000000..60b5d5825e8 --- /dev/null +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RegisteredGroupsAggregate.java @@ -0,0 +1,164 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.events.eventsourcing; + +import java.time.Clock; +import java.time.ZonedDateTime; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.james.eventsourcing.AggregateId; +import org.apache.james.eventsourcing.Event; +import org.apache.james.eventsourcing.eventstore.History; +import org.apache.james.mailbox.events.Group; + +import com.github.steveash.guavate.Guavate; +import com.google.common.base.Functions; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; + +public class RegisteredGroupsAggregate { + static AggregateId AGGREGATE_ID = () -> "RegisteredGroupListenerChangeEvent"; + + enum Status { + USED, + UNUSED_BUT_BINDED + } + + private static class State { + static State initial() { + return new State(ImmutableMap.of()); + } + + final ImmutableMap groups; + + private State(ImmutableMap groups) { + this.groups = groups; + } + + private State apply(RegisteredGroupListenerChangeEvent event) { + ImmutableMap removedGroups = event.getRemovedGroups() + .stream() + .collect(Guavate.toImmutableMap(Functions.identity(), any -> Status.UNUSED_BUT_BINDED)); + ImmutableMap addedGroups = event.getAddedGroups() + .stream() + .collect(Guavate.toImmutableMap(Functions.identity(), any -> Status.USED)); + ImmutableMap unchangedGroups = notIn(Sets.union(event.getAddedGroups(), event.getRemovedGroups())); + + return new State(ImmutableMap.builder() + .putAll(addedGroups) + .putAll(unchangedGroups) + .putAll(removedGroups) + .build()); + } + + private State apply(UnbindSucceededEvent event) { + return new State(ImmutableMap.builder() + .putAll(notIn(ImmutableSet.of(event.getGroup()))) + .build()); + } + + private ImmutableSet usedGroups() { + return groups.entrySet() + .stream() + .filter(group -> group.getValue() == Status.USED) + .map(Map.Entry::getKey) + .collect(Guavate.toImmutableSet()); + } + + private ImmutableSet bindedGroups() { + return groups.entrySet() + .stream() + .filter(group -> group.getValue() == Status.UNUSED_BUT_BINDED) + .map(Map.Entry::getKey) + .collect(Guavate.toImmutableSet()); + } + + private ImmutableMap notIn(Set toBeFilteredOut) { + return groups.entrySet() + .stream() + .filter(group -> !toBeFilteredOut.contains(group.getKey())) + .collect(Guavate.entriesToImmutableMap()); + } + } + + public static RegisteredGroupsAggregate load(History history) { + return new RegisteredGroupsAggregate(history); + } + + private final History history; + private State state; + + private RegisteredGroupsAggregate(History history) { + this.history = history; + this.state = State.initial(); + + history.getEventsJava() + .forEach(this::apply); + } + + public List handle(RequireGroupsCommand requireGroupsCommand, Clock clock) { + List detectedChanges = detectChanges(requireGroupsCommand, clock); + detectedChanges.forEach(this::apply); + return detectedChanges; + } + + public List handle(MarkUnbindAsSucceededCommand command) { + Preconditions.checkArgument(state.bindedGroups().contains(command.getSucceededGroup()), + "unbinding a non binded group, or a used group"); + + UnbindSucceededEvent event = new UnbindSucceededEvent(history.getNextEventId(), command.getSucceededGroup()); + apply(event); + return ImmutableList.of(event); + } + + private List detectChanges(RequireGroupsCommand requireGroupsCommand, Clock clock) { + ImmutableSet addedGroups = ImmutableSet.copyOf(Sets.difference(requireGroupsCommand.getRegisteredGroups(), state.usedGroups())); + ImmutableSet removedGroups = ImmutableSet.builder() + .addAll(Sets.difference(state.usedGroups(), requireGroupsCommand.getRegisteredGroups())) + .addAll(state.bindedGroups()) + .build(); + + if (!addedGroups.isEmpty() || !removedGroups.isEmpty()) { + ZonedDateTime now = ZonedDateTime.ofInstant(clock.instant(), clock.getZone()); + RegisteredGroupListenerChangeEvent event = new RegisteredGroupListenerChangeEvent(history.getNextEventId(), + Hostname.localHost(), + now, + addedGroups, + removedGroups); + return ImmutableList.of(event); + } + return ImmutableList.of(); + } + + private void apply(Event event) { + if (event instanceof RegisteredGroupListenerChangeEvent) { + state = state.apply((RegisteredGroupListenerChangeEvent) event); + } else if (event instanceof UnbindSucceededEvent) { + state = state.apply((UnbindSucceededEvent) event); + } else { + throw new RuntimeException("Unsupported event class " + event.getClass()); + } + } +} diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RequireGroupsCommand.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RequireGroupsCommand.java new file mode 100644 index 00000000000..0ac01e271b1 --- /dev/null +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RequireGroupsCommand.java @@ -0,0 +1,37 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.events.eventsourcing; + +import org.apache.james.eventsourcing.Command; +import org.apache.james.mailbox.events.Group; + +import com.google.common.collect.ImmutableSet; + +public class RequireGroupsCommand implements Command { + private final ImmutableSet registeredGroups; + + public RequireGroupsCommand(ImmutableSet registeredGroups) { + this.registeredGroups = registeredGroups; + } + + public ImmutableSet getRegisteredGroups() { + return registeredGroups; + } +} diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RequireGroupsCommandHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RequireGroupsCommandHandler.java new file mode 100644 index 00000000000..e8818625c4a --- /dev/null +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/RequireGroupsCommandHandler.java @@ -0,0 +1,53 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.events.eventsourcing; + +import java.time.Clock; +import java.util.List; + +import org.apache.james.eventsourcing.CommandHandler; +import org.apache.james.eventsourcing.Event; +import org.apache.james.eventsourcing.eventstore.EventStore; +import org.reactivestreams.Publisher; + +import reactor.core.publisher.Mono; + +public class RequireGroupsCommandHandler implements CommandHandler { + + private final EventStore eventStore; + private final Clock clock; + + public RequireGroupsCommandHandler(EventStore eventStore, Clock clock) { + this.eventStore = eventStore; + this.clock = clock; + } + + @Override + public Class handledClass() { + return RequireGroupsCommand.class; + } + + @Override + public Publisher> handle(RequireGroupsCommand command) { + return Mono.from(eventStore.getEventsOfAggregate(RegisteredGroupsAggregate.AGGREGATE_ID)) + .map(RegisteredGroupsAggregate::load) + .map(aggregate -> aggregate.handle(command, clock)); + } +} diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/UnbindSucceededEvent.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/UnbindSucceededEvent.java new file mode 100644 index 00000000000..cf79f0a172b --- /dev/null +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/UnbindSucceededEvent.java @@ -0,0 +1,49 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.events.eventsourcing; + +import org.apache.james.eventsourcing.AggregateId; +import org.apache.james.eventsourcing.Event; +import org.apache.james.eventsourcing.EventId; +import org.apache.james.mailbox.events.Group; + +public class UnbindSucceededEvent implements Event { + private final EventId eventId; + private final Group group; + + public UnbindSucceededEvent(EventId eventId, Group group) { + this.eventId = eventId; + this.group = group; + } + + @Override + public EventId eventId() { + return eventId; + } + + public Group getGroup() { + return group; + } + + @Override + public AggregateId getAggregateId() { + return RegisteredGroupsAggregate.AGGREGATE_ID; + } +} diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/UnregisterRemovedGroupsSubscriber.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/UnregisterRemovedGroupsSubscriber.java new file mode 100644 index 00000000000..459003967b8 --- /dev/null +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/eventsourcing/UnregisterRemovedGroupsSubscriber.java @@ -0,0 +1,65 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.events.eventsourcing; + +import org.apache.james.eventsourcing.Event; +import org.apache.james.eventsourcing.Subscriber; +import org.apache.james.mailbox.events.Group; +import org.reactivestreams.Publisher; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class UnregisterRemovedGroupsSubscriber implements Subscriber { + @FunctionalInterface + public interface Unregisterer { + Mono unregister(Group group); + } + + @FunctionalInterface + public interface SuccessNotifier { + Mono notifySuccess(Group group); + } + + private final Unregisterer unregisterer; + private final SuccessNotifier successNotifier; + + public UnregisterRemovedGroupsSubscriber(Unregisterer unregisterer, SuccessNotifier successNotifier) { + this.unregisterer = unregisterer; + this.successNotifier = successNotifier; + } + + @Override + public void handle(Event event) { + if (event instanceof RegisteredGroupListenerChangeEvent) { + RegisteredGroupListenerChangeEvent changeEvent = (RegisteredGroupListenerChangeEvent) event; + + Flux.fromIterable(changeEvent.getRemovedGroups()) + .concatMap(this::unregister) + .then() + .block(); + } + } + + private Publisher unregister(Group group) { + return unregisterer.unregister(group) + .then(successNotifier.notifySuccess(group)); + } +} diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManagerTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManagerTest.java new file mode 100644 index 00000000000..3c193d4712b --- /dev/null +++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/eventsourcing/GroupUnregistringManagerTest.java @@ -0,0 +1,168 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.events.eventsourcing; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; + +import java.time.Clock; + +import org.apache.james.eventsourcing.eventstore.memory.InMemoryEventStore; +import org.apache.james.mailbox.events.GenericGroup; +import org.apache.james.mailbox.events.Group; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +import reactor.core.publisher.Mono; + +class GroupUnregistringManagerTest { + private static final GenericGroup GROUP_A = new GenericGroup("a"); + private static final GenericGroup GROUP_B = new GenericGroup("b"); + + static class TestUnregisterer implements GroupUnregistringManager.Unregisterer { + private ImmutableList.Builder unregisteredGroups = ImmutableList.builder(); + + @Override + public Mono unregister(Group group) { + return Mono.fromRunnable(() -> unregisteredGroups.add(group)); + } + + ImmutableList unregisteredGroups() { + return unregisteredGroups.build(); + } + } + + static class FailOnceUnregisterer implements GroupUnregistringManager.Unregisterer { + private boolean shouldFail = true; + private ImmutableList.Builder unregisteredGroups = ImmutableList.builder(); + + @Override + public Mono unregister(Group group) { + if (shouldFail) { + shouldFail = false; + return Mono.error(new RuntimeException()); + } + return Mono.fromRunnable(() -> unregisteredGroups.add(group)); + } + + ImmutableList unregisteredGroups() { + return unregisteredGroups.build(); + } + } + + GroupUnregistringManager testee; + TestUnregisterer unregisterer; + + @BeforeEach + void setUp() { + unregisterer = new TestUnregisterer(); + testee = new GroupUnregistringManager(new InMemoryEventStore(), unregisterer, Clock.systemUTC()); + } + + @Test + void startShouldNotUnregisterGroupsWhenNoHistory() { + testee.start(ImmutableSet.of(GROUP_A)).block(); + + assertThat(unregisterer.unregisteredGroups()) + .isEmpty(); + } + + @Test + void startShouldNotUnregisterGroupsWhenNoChanges() { + testee.start(ImmutableSet.of(GROUP_A)).block(); + testee.start(ImmutableSet.of(GROUP_A)).block(); + + assertThat(unregisterer.unregisteredGroups()) + .isEmpty(); + } + + @Test + void startShouldNotUnregisterGroupsWhenAdditions() { + testee.start(ImmutableSet.of(GROUP_A)).block(); + testee.start(ImmutableSet.of(GROUP_A, GROUP_B)).block(); + + assertThat(unregisterer.unregisteredGroups()) + .isEmpty(); + } + + @Test + void startShouldUnregisterGroupsWhenRemoval() { + testee.start(ImmutableSet.of(GROUP_A, GROUP_B)).block(); + testee.start(ImmutableSet.of(GROUP_A)).block(); + + assertThat(unregisterer.unregisteredGroups()) + .containsExactly(GROUP_B); + } + + @Test + void startShouldNotFailWhenUnbindDidFail() { + GroupUnregistringManager.Unregisterer unregisterer = new FailOnceUnregisterer(); + GroupUnregistringManager testee = new GroupUnregistringManager(new InMemoryEventStore(), unregisterer, Clock.systemUTC()); + testee.start(ImmutableSet.of(GROUP_A, GROUP_B)).block(); + testee.start(ImmutableSet.of(GROUP_A)).block(); + + assertThatCode(() -> testee.start(ImmutableSet.of(GROUP_A)).block()); + } + + @Test + void failedUnregisterShouldBeRetried() { + FailOnceUnregisterer unregisterer = new FailOnceUnregisterer(); + GroupUnregistringManager testee = new GroupUnregistringManager(new InMemoryEventStore(), unregisterer, Clock.systemUTC()); + testee.start(ImmutableSet.of(GROUP_A, GROUP_B)).block(); + testee.start(ImmutableSet.of(GROUP_A)).block(); + + testee.start(ImmutableSet.of(GROUP_A)).block(); + + assertThat(unregisterer.unregisteredGroups()) + .containsExactly(GROUP_B); + } + + @Test + void startShouldUnregisterGroupsWhenSwap() { + testee.start(ImmutableSet.of(GROUP_B)).block(); + testee.start(ImmutableSet.of(GROUP_A)).block(); + + assertThat(unregisterer.unregisteredGroups()) + .containsExactly(GROUP_B); + } + + @Test + void startShouldBeAbleToUnregisterPreviouslyUnregisteredGroups() { + testee.start(ImmutableSet.of(GROUP_A, GROUP_B)).block(); + testee.start(ImmutableSet.of(GROUP_A)).block(); + testee.start(ImmutableSet.of(GROUP_A, GROUP_B)).block(); + testee.start(ImmutableSet.of(GROUP_A)).block(); + + assertThat(unregisterer.unregisteredGroups()) + .containsExactly(GROUP_B, GROUP_B); + } + + @Test + void startWithNoGroupsShouldUnregisterAllPreviousGroups() { + testee.start(ImmutableSet.of(GROUP_A, GROUP_B)).block(); + testee.start(ImmutableSet.of()).block(); + + assertThat(unregisterer.unregisteredGroups()) + .containsExactly(GROUP_A, GROUP_B); + } +} \ No newline at end of file