Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JAMES-3142 eventsourcing for group unregistration #3280

Closed
wants to merge 8 commits into from
13 changes: 13 additions & 0 deletions mailbox/event/event-rabbitmq/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,19 @@
<artifactId>apache-james-mailbox-store</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>event-sourcing-core</artifactId>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>event-sourcing-event-store-memory</artifactId>

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ordering issue

<scope>test</scope>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>event-sourcing-event-store-api</artifactId>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>james-server-lifecycle-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.mailbox.events.eventsourcing;

import java.time.Clock;

import org.apache.james.eventsourcing.Event;
import org.apache.james.eventsourcing.EventSourcingSystem;
import org.apache.james.eventsourcing.Subscriber;
import org.apache.james.eventsourcing.eventstore.EventStore;
import org.apache.james.mailbox.events.Group;
import org.reactivestreams.Publisher;

import com.google.common.collect.ImmutableSet;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class GroupUnregistringManager {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about GroupRegistry?

@FunctionalInterface
public interface Unregisterer {
Mono<Void> unregister(Group group);
}

private class UnregisterRemovedGroupsSubscriber implements Subscriber {
private final Unregisterer unregisterer;

public UnregisterRemovedGroupsSubscriber(Unregisterer unregisterer) {
this.unregisterer = unregisterer;
}

@Override
public void handle(Event event) {
if (event instanceof RegisteredGroupListenerChangeEvent) {
RegisteredGroupListenerChangeEvent changeEvent = (RegisteredGroupListenerChangeEvent) event;

Flux.fromIterable(changeEvent.getRemovedGroups())
.concatMap(this::unregister)
.then()
.block();
}
}

private Publisher<Void> unregister(Group group) {
return unregisterer.unregister(group)
.then(notifyUnbind(group));
}
}

private final EventSourcingSystem eventSourcingSystem;

public GroupUnregistringManager(EventStore eventStore, Unregisterer unregisterer, Clock clock) {
this.eventSourcingSystem = EventSourcingSystem.fromJava(ImmutableSet.of(
new RequireGroupsCommandHandler(eventStore, clock),
new MarkUnbindAsSucceededCommandHandler(eventStore)),
ImmutableSet.of(new UnregisterRemovedGroupsSubscriber(unregisterer)),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's no delivery guarantee for event handlers, what will happen if we miss one event?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's no delivery guarantee for event handlers, what will happen if we miss one event?

The error scenario here is a server stop as our eventsourcing-eventbus is in memory based.

I guess in this very unlikely case a manual admin intervention to cliclk the "unbind" button would be acceptable. (I don't see how we could do better)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error scenario here is a server stop as our eventsourcing-eventbus is in memory based.

You should not try to describe all errors cases because:

  1. you'll always miss some
  2. the system this code depends on will change over time

Either we can enforce transactionality or we can't.

In this case, as we are using handlers, we are not in a transaction and we will miss an event sooner or later.

I can't think of a better solution than a handler but it means we have to figure out what happens in this failure case.

I guess in this very unlikely case a manual admin intervention to cliclk the "unbind" button would be acceptable. (I don't see how we could do better)

How he would know he need to do that? (sorry, I tried to find a solution but can't in a decent timeframe)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could have an aggregate handling 3 state for a group:

  • used
  • used but still binded
  • not used and unbinded

Then if the unRegisterSubscriber succeeds, it can fire a UnbindSucceededCommand on the aggregate.

That way we might attempt several time an unbind operation, without manual admin operation.

eventStore);
}

public Mono<Void> start(ImmutableSet<Group> groups) {
RequireGroupsCommand requireGroupsCommand = new RequireGroupsCommand(groups);

return Mono.from(eventSourcingSystem.dispatch(requireGroupsCommand));
}

private Mono<Void> notifyUnbind(Group group) {
MarkUnbindAsSucceededCommand command = new MarkUnbindAsSucceededCommand(group);

return Mono.from(eventSourcingSystem.dispatch(command));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.mailbox.events.eventsourcing;

import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Objects;

import org.apache.commons.lang3.StringUtils;

import com.google.common.base.Preconditions;

public class Hostname {
public static Hostname of(String value) {
Preconditions.checkNotNull(value, "'value' should not be null");
Preconditions.checkArgument(StringUtils.isBlank(value), "'value' ");

return new Hostname(value);
}

public static Hostname localHost() {
try {
return new Hostname(InetAddress.getLocalHost().getHostName());
} catch (UnknownHostException e) {
throw new UncheckedIOException(e);
}
}

private final String value;

private Hostname(String value) {
this.value = value;
}

@Override
public final boolean equals(Object o) {
if (o instanceof Hostname) {
Hostname hostname = (Hostname) o;

return Objects.equals(this.value, hostname.value);
}
return false;
}

@Override
public final int hashCode() {
return Objects.hash(value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.mailbox.events.eventsourcing;

import org.apache.james.eventsourcing.Command;
import org.apache.james.mailbox.events.Group;

public class MarkUnbindAsSucceededCommand implements Command {
private final Group succeededGroup;

public MarkUnbindAsSucceededCommand(Group succeededGroup) {
this.succeededGroup = succeededGroup;
}

public Group getSucceededGroup() {
return succeededGroup;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.mailbox.events.eventsourcing;

import java.util.List;

import org.apache.james.eventsourcing.CommandHandler;
import org.apache.james.eventsourcing.Event;
import org.apache.james.eventsourcing.eventstore.EventStore;
import org.reactivestreams.Publisher;

import reactor.core.publisher.Mono;

public class MarkUnbindAsSucceededCommandHandler implements CommandHandler<MarkUnbindAsSucceededCommand> {
private final EventStore eventStore;

public MarkUnbindAsSucceededCommandHandler(EventStore eventStore) {
this.eventStore = eventStore;
}

@Override
public Class<MarkUnbindAsSucceededCommand> handledClass() {
return MarkUnbindAsSucceededCommand.class;
}

@Override
public Publisher<List<? extends Event>> handle(MarkUnbindAsSucceededCommand command) {
return Mono.from(eventStore.getEventsOfAggregate(RegisteredGroupsAggregate.AGGREGATE_ID))
.map(RegisteredGroupsAggregate::load)
.map(aggregate -> aggregate.handle(command));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.mailbox.events.eventsourcing;

import static org.apache.james.mailbox.events.eventsourcing.RegisteredGroupsAggregate.AGGREGATE_ID;

import java.time.ZonedDateTime;
import java.util.Objects;

import org.apache.james.eventsourcing.AggregateId;
import org.apache.james.eventsourcing.Event;
import org.apache.james.eventsourcing.EventId;
import org.apache.james.mailbox.events.Group;

import com.google.common.collect.ImmutableSet;

public class RegisteredGroupListenerChangeEvent implements Event {
private final EventId eventId;
private final Hostname hostname;
private final ZonedDateTime zonedDateTime;
private final ImmutableSet<Group> addedGroups;
private final ImmutableSet<Group> removedGroups;

public RegisteredGroupListenerChangeEvent(EventId eventId, Hostname hostname, ZonedDateTime zonedDateTime, ImmutableSet<Group> addedGroups, ImmutableSet<Group> removedGroups) {
this.hostname = hostname;
this.zonedDateTime = zonedDateTime;
this.eventId = eventId;
this.addedGroups = addedGroups;
this.removedGroups = removedGroups;
}


@Override
public EventId eventId() {
return eventId;
}

@Override
public AggregateId getAggregateId() {
return AGGREGATE_ID;
}

public ImmutableSet<Group> getAddedGroups() {
return addedGroups;
}

public ImmutableSet<Group> getRemovedGroups() {
return removedGroups;
}

public EventId getEventId() {
return eventId;
}

public Hostname getHostname() {
return hostname;
}

public ZonedDateTime getZonedDateTime() {
return zonedDateTime;
}

@Override
public final boolean equals(Object o) {
if (o instanceof RegisteredGroupListenerChangeEvent) {
RegisteredGroupListenerChangeEvent that = (RegisteredGroupListenerChangeEvent) o;

return Objects.equals(this.eventId, that.eventId)
&& Objects.equals(this.addedGroups, that.addedGroups)
&& Objects.equals(this.removedGroups, that.removedGroups);
}
return false;
}

@Override
public final int hashCode() {
return Objects.hash(eventId, addedGroups, removedGroups);
}
}
Loading