Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JAMES-3142 eventsourcing for group unregistration #3280

Closed
wants to merge 8 commits into from
13 changes: 13 additions & 0 deletions mailbox/event/event-rabbitmq/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,19 @@
<artifactId>apache-james-mailbox-store</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>event-sourcing-core</artifactId>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>event-sourcing-event-store-memory</artifactId>

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ordering issue

<scope>test</scope>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>event-sourcing-event-store-api</artifactId>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>james-server-lifecycle-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.mailbox.events.eventsourcing;

import org.apache.james.eventsourcing.EventSourcingSystem;
import org.apache.james.eventsourcing.eventstore.EventStore;
import org.apache.james.mailbox.events.Group;

import com.google.common.collect.ImmutableSet;

import reactor.core.publisher.Mono;

public class GroupUnregistringManager {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about GroupRegistry?

private final EventSourcingSystem eventSourcingSystem;

public GroupUnregistringManager(EventStore eventStore, UnregisterRemovedGroupsSubscriber.Unregisterer unregisterer) {
this.eventSourcingSystem = EventSourcingSystem.fromJava(ImmutableSet.of(new StartCommandHandler(eventStore)),
ImmutableSet.of(new UnregisterRemovedGroupsSubscriber(unregisterer)),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's no delivery guarantee for event handlers, what will happen if we miss one event?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's no delivery guarantee for event handlers, what will happen if we miss one event?

The error scenario here is a server stop as our eventsourcing-eventbus is in memory based.

I guess in this very unlikely case a manual admin intervention to cliclk the "unbind" button would be acceptable. (I don't see how we could do better)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error scenario here is a server stop as our eventsourcing-eventbus is in memory based.

You should not try to describe all errors cases because:

  1. you'll always miss some
  2. the system this code depends on will change over time

Either we can enforce transactionality or we can't.

In this case, as we are using handlers, we are not in a transaction and we will miss an event sooner or later.

I can't think of a better solution than a handler but it means we have to figure out what happens in this failure case.

I guess in this very unlikely case a manual admin intervention to cliclk the "unbind" button would be acceptable. (I don't see how we could do better)

How he would know he need to do that? (sorry, I tried to find a solution but can't in a decent timeframe)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could have an aggregate handling 3 state for a group:

  • used
  • used but still binded
  • not used and unbinded

Then if the unRegisterSubscriber succeeds, it can fire a UnbindSucceededCommand on the aggregate.

That way we might attempt several time an unbind operation, without manual admin operation.

eventStore);
}

public Mono<Void> start(ImmutableSet<Group> groups) {
StartCommand startCommand = new StartCommand(groups);

return Mono.from(eventSourcingSystem.dispatch(startCommand));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.mailbox.events.eventsourcing;

import static org.apache.james.mailbox.events.eventsourcing.RegisteredGroupsAggregate.AGGREGATE_ID;

import java.util.Objects;

import org.apache.james.eventsourcing.AggregateId;
import org.apache.james.eventsourcing.Event;
import org.apache.james.eventsourcing.EventId;
import org.apache.james.mailbox.events.Group;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;

public class RegisteredGroupListenerChangeEvent implements Event {

private final EventId eventId;
private final ImmutableSet<Group> addedGroups;
private final ImmutableSet<Group> removedGroups;

public RegisteredGroupListenerChangeEvent(EventId eventId, ImmutableSet<Group> addedGroups, ImmutableSet<Group> removedGroups) {
chibenwa marked this conversation as resolved.
Show resolved Hide resolved
Preconditions.checkArgument(Sets.intersection(addedGroups, removedGroups).isEmpty(),
"'addedGroups' and 'removedGroups' elements needs to be distinct");

this.eventId = eventId;
this.addedGroups = addedGroups;
this.removedGroups = removedGroups;
}


@Override
public EventId eventId() {
return eventId;
}

@Override
public AggregateId getAggregateId() {
return AGGREGATE_ID;
}

public ImmutableSet<Group> getAddedGroups() {
return addedGroups;
}

public ImmutableSet<Group> getRemovedGroups() {
return removedGroups;
}

@Override
public final boolean equals(Object o) {
if (o instanceof RegisteredGroupListenerChangeEvent) {
RegisteredGroupListenerChangeEvent that = (RegisteredGroupListenerChangeEvent) o;

return Objects.equals(this.eventId, that.eventId)
&& Objects.equals(this.addedGroups, that.addedGroups)
&& Objects.equals(this.removedGroups, that.removedGroups);
}
return false;
}

@Override
public final int hashCode() {
return Objects.hash(eventId, addedGroups, removedGroups);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.mailbox.events.eventsourcing;

import java.util.List;

import org.apache.james.eventsourcing.AggregateId;
import org.apache.james.eventsourcing.Event;
import org.apache.james.eventsourcing.eventstore.History;
import org.apache.james.mailbox.events.Group;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;

public class RegisteredGroupsAggregate {
static AggregateId AGGREGATE_ID = () -> "RegisteredGroupListenerChangeEvent";

private static class State {
static State initial() {
return new State(ImmutableSet.of());
}

final ImmutableSet<Group> groups;

private State(ImmutableSet<Group> groups) {
this.groups = groups;
}

private State apply(RegisteredGroupListenerChangeEvent event) {
return new State(ImmutableSet.<Group>builder()
.addAll(Sets.difference(groups, event.getRemovedGroups()))
.addAll(event.getAddedGroups())
.build());
}
}

public static RegisteredGroupsAggregate load(History history) {
return new RegisteredGroupsAggregate(history);
}

private final History history;
private State state;

private RegisteredGroupsAggregate(History history) {
this.history = history;
this.state = State.initial();

history.getEventsJava()
.forEach(this::apply);
}

public List<RegisteredGroupListenerChangeEvent> handleStart(StartCommand startCommand) {
List<RegisteredGroupListenerChangeEvent> detectedChanges = detectChanges(startCommand);
chibenwa marked this conversation as resolved.
Show resolved Hide resolved

detectedChanges.forEach(this::apply);

return detectedChanges;
}

private List<RegisteredGroupListenerChangeEvent> detectChanges(StartCommand startCommand) {
ImmutableSet<Group> addedGroups = ImmutableSet.copyOf(Sets.difference(startCommand.getRegisteredGroups(), state.groups));
ImmutableSet<Group> removedGroups = ImmutableSet.copyOf(Sets.difference(state.groups, startCommand.getRegisteredGroups()));

if (!addedGroups.isEmpty() || !removedGroups.isEmpty()) {
return ImmutableList.of(new RegisteredGroupListenerChangeEvent(history.getNextEventId(), addedGroups, removedGroups));
}
return ImmutableList.of();
}

private void apply(Event event) {
Preconditions.checkArgument(event instanceof RegisteredGroupListenerChangeEvent);

state = state.apply((RegisteredGroupListenerChangeEvent) event);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.mailbox.events.eventsourcing;

import org.apache.james.eventsourcing.Command;
import org.apache.james.mailbox.events.Group;

import com.google.common.collect.ImmutableSet;

public class StartCommand implements Command {
chibenwa marked this conversation as resolved.
Show resolved Hide resolved
private final ImmutableSet<Group> registeredGroups;

public StartCommand(ImmutableSet<Group> registeredGroups) {
this.registeredGroups = registeredGroups;
}

public ImmutableSet<Group> getRegisteredGroups() {
return registeredGroups;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.mailbox.events.eventsourcing;

import java.util.List;

import org.apache.james.eventsourcing.CommandHandler;
import org.apache.james.eventsourcing.Event;
import org.apache.james.eventsourcing.eventstore.EventStore;
import org.reactivestreams.Publisher;

import reactor.core.publisher.Mono;

public class StartCommandHandler implements CommandHandler<StartCommand> {

private final EventStore eventStore;

public StartCommandHandler(EventStore eventStore) {
this.eventStore = eventStore;
}

@Override
public Class<StartCommand> handledClass() {
return StartCommand.class;
}

@Override
public Publisher<List<? extends Event>> handle(StartCommand command) {
return Mono.from(eventStore.getEventsOfAggregate(RegisteredGroupsAggregate.AGGREGATE_ID))
.map(RegisteredGroupsAggregate::load)
.map(aggregate -> aggregate.handleStart(command));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.mailbox.events.eventsourcing;

import org.apache.james.eventsourcing.Event;
import org.apache.james.eventsourcing.Subscriber;
import org.apache.james.mailbox.events.Group;
import org.reactivestreams.Publisher;

import reactor.core.publisher.Flux;

public class UnregisterRemovedGroupsSubscriber implements Subscriber {
@FunctionalInterface
public interface Unregisterer {
Publisher<Void> unregister(Group group);
}

private final Unregisterer unregisterer;

public UnregisterRemovedGroupsSubscriber(Unregisterer unregisterer) {
this.unregisterer = unregisterer;
}

@Override
public void handle(Event event) {
if (event instanceof RegisteredGroupListenerChangeEvent) {
RegisteredGroupListenerChangeEvent changeEvent = (RegisteredGroupListenerChangeEvent) event;

Flux.fromIterable(changeEvent.getRemovedGroups())
.concatMap(unregisterer::unregister)
.then()
.block();
}
}
}
Loading