Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JAMES-3142 Rely on event sourcing to correct current group registration #3303

Closed
wants to merge 9 commits into from
13 changes: 13 additions & 0 deletions mailbox/event/event-rabbitmq/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,19 @@
<artifactId>apache-james-mailbox-store</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>event-sourcing-core</artifactId>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>event-sourcing-event-store-memory</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>event-sourcing-event-store-api</artifactId>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>james-server-lifecycle-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.mailbox.events.eventsourcing;

import static org.apache.james.mailbox.events.eventsourcing.RegisteredGroupsSubscriber.Registrer;

import java.time.Clock;

import org.apache.james.eventsourcing.EventSourcingSystem;
import org.apache.james.eventsourcing.eventstore.EventStore;
import org.apache.james.mailbox.events.Group;
import org.apache.james.mailbox.events.eventsourcing.RegisteredGroupsSubscriber.Unregisterer;
import org.reactivestreams.Publisher;

import com.github.steveash.guavate.Guavate;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class GroupUnregistringManager {
@FunctionalInterface
public interface RegisteredGroupsProvider {
Publisher<Group> registeredGroups();
}

private final EventSourcingSystem eventSourcingSystem;
private final EventStore eventStore;
private final RegisteredGroupsProvider registeredGroupsProvider;

public GroupUnregistringManager(EventStore eventStore,
Unregisterer unregisterer,
Registrer registrer,
RegisteredGroupsProvider registeredGroupsProvider,
Clock clock) {

this.eventStore = eventStore;
this.registeredGroupsProvider = registeredGroupsProvider;
this.eventSourcingSystem = EventSourcingSystem.fromJava(ImmutableSet.of(new RequireGroupsCommandHandler(eventStore, clock)),
ImmutableSet.of(new RegisteredGroupsSubscriber(unregisterer, registrer)),
this.eventStore);
}

public Mono<Void> start(ImmutableSet<Group> requiredGroups) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you need your aggregate to build your command, that looks like a design violation.

return Flux.from(registeredGroupsProvider.registeredGroups())
.collect(Guavate.toImmutableSet())
.map(registeredGroups -> new RequireGroupsCommand(requiredGroups, registeredGroups))
.flatMap(command -> Mono.from(eventSourcingSystem.dispatch(command)));
}

@VisibleForTesting
Mono<ImmutableSet<Group>> requiredGroups() {
return Mono.from(eventStore.getEventsOfAggregate(RegisteredGroupsAggregate.AGGREGATE_ID))
.map(RegisteredGroupsAggregate::load)
.map(RegisteredGroupsAggregate::requiredGroups);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.mailbox.events.eventsourcing;

import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Objects;

import org.apache.commons.lang3.StringUtils;

import com.google.common.base.Preconditions;

public class Hostname {
public static Hostname of(String value) {
Preconditions.checkNotNull(value, "'value' should not be null");
Preconditions.checkArgument(StringUtils.isBlank(value), "'value' ");

return new Hostname(value);
}

public static Hostname localHost() {
try {
return new Hostname(InetAddress.getLocalHost().getHostName());
} catch (UnknownHostException e) {
throw new UncheckedIOException(e);
}
}

private final String value;

private Hostname(String value) {
this.value = value;
}

@Override
public final boolean equals(Object o) {
if (o instanceof Hostname) {
Hostname hostname = (Hostname) o;

return Objects.equals(this.value, hostname.value);
}
return false;
}

@Override
public final int hashCode() {
return Objects.hash(value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.mailbox.events.eventsourcing;

import static org.apache.james.mailbox.events.eventsourcing.RegisteredGroupsAggregate.AGGREGATE_ID;

import java.time.ZonedDateTime;
import java.util.Objects;

import org.apache.james.eventsourcing.AggregateId;
import org.apache.james.eventsourcing.Event;
import org.apache.james.eventsourcing.EventId;
import org.apache.james.mailbox.events.Group;

import com.google.common.collect.ImmutableSet;

public class RegisteredGroupListenerChangeEvent implements Event {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are not describing events (like groupAdded or groupRemoved) but a state, it's not event sourcing anymore.

private final EventId eventId;
private final Hostname hostname;
private final ZonedDateTime zonedDateTime;
private final ImmutableSet<Group> requiredGroups;
private final ImmutableSet<Group> registeredGroups;

public RegisteredGroupListenerChangeEvent(EventId eventId, Hostname hostname, ZonedDateTime zonedDateTime, ImmutableSet<Group> requiredGroups, ImmutableSet<Group> registeredGroups) {
this.hostname = hostname;
this.zonedDateTime = zonedDateTime;
this.eventId = eventId;
this.requiredGroups = requiredGroups;
this.registeredGroups = registeredGroups;
}

@Override
public EventId eventId() {
return eventId;
}

@Override
public AggregateId getAggregateId() {
return AGGREGATE_ID;
}

public ImmutableSet<Group> getRegisteredGroups() {
return registeredGroups;
}

public ImmutableSet<Group> getRequiredGroups() {
return requiredGroups;
}

public EventId getEventId() {
return eventId;
}

public Hostname getHostname() {
return hostname;
}

public ZonedDateTime getZonedDateTime() {
return zonedDateTime;
}

@Override
public final boolean equals(Object o) {
if (o instanceof RegisteredGroupListenerChangeEvent) {
RegisteredGroupListenerChangeEvent that = (RegisteredGroupListenerChangeEvent) o;

return Objects.equals(this.eventId, that.eventId)
&& Objects.equals(this.zonedDateTime, that.zonedDateTime)
&& Objects.equals(this.hostname, that.hostname)
&& Objects.equals(this.registeredGroups, that.registeredGroups)
&& Objects.equals(this.requiredGroups, that.requiredGroups);
}
return false;
}

@Override
public final int hashCode() {
return Objects.hash(eventId, hostname, zonedDateTime, registeredGroups, requiredGroups);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.mailbox.events.eventsourcing;

import java.time.Clock;
import java.time.ZonedDateTime;
import java.util.List;

import org.apache.james.eventsourcing.AggregateId;
import org.apache.james.eventsourcing.Event;
import org.apache.james.eventsourcing.eventstore.History;
import org.apache.james.mailbox.events.Group;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;

public class RegisteredGroupsAggregate {
static AggregateId AGGREGATE_ID = () -> "RegisteredGroupListenerChangeEvent";

private static class State {
static State initial() {
return new State(ImmutableSet.of(), ImmutableSet.of());
}

final ImmutableSet<Group> registeredGroups;
final ImmutableSet<Group> requiredGroups;

public State(ImmutableSet<Group> registeredGroups, ImmutableSet<Group> requiredGroups) {
this.registeredGroups = registeredGroups;
this.requiredGroups = requiredGroups;
}

private State apply(RegisteredGroupListenerChangeEvent event) {
return new State(event.getRegisteredGroups(),
event.getRequiredGroups());
}
}

public static RegisteredGroupsAggregate load(History history) {
return new RegisteredGroupsAggregate(history);
}

private final History history;
private State state;

private RegisteredGroupsAggregate(History history) {
this.history = history;
this.state = State.initial();

history.getEventsJava()
.forEach(this::apply);
}

public List<RegisteredGroupListenerChangeEvent> handle(RequireGroupsCommand requireGroupsCommand, Clock clock) {
List<RegisteredGroupListenerChangeEvent> detectedChanges = detectChanges(requireGroupsCommand, clock);

detectedChanges.forEach(this::apply);

return detectedChanges;
}

public ImmutableSet<Group> requiredGroups() {
return state.requiredGroups;
}

private List<RegisteredGroupListenerChangeEvent> detectChanges(RequireGroupsCommand command, Clock clock) {
boolean historyChange = !state.requiredGroups.equals(command.getRequiredGroups()) ||
!state.registeredGroups.equals(command.getRegisteredGroups());
boolean registeredChangeNeeded = !command.getRegisteredGroups().equals(command.getRequiredGroups());

if (historyChange || registeredChangeNeeded) {
return emitEvent(command, clock);
}
return ImmutableList.of();
}

private List<RegisteredGroupListenerChangeEvent> emitEvent(RequireGroupsCommand command, Clock clock) {
ZonedDateTime now = ZonedDateTime.ofInstant(clock.instant(), clock.getZone());
RegisteredGroupListenerChangeEvent event = new RegisteredGroupListenerChangeEvent(history.getNextEventId(),
Hostname.localHost(),
now,
command.getRequiredGroups(),
command.getRegisteredGroups());
return ImmutableList.of(event);
}

private void apply(Event event) {
Preconditions.checkArgument(event instanceof RegisteredGroupListenerChangeEvent);

state = state.apply((RegisteredGroupListenerChangeEvent) event);
}
}
Loading