-
Notifications
You must be signed in to change notification settings - Fork 62
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
JAMES-3142 eventsourcing for group unregistration #3280
Changes from 7 commits
90d9a95
4358c31
eb3988b
1166120
1aabf53
9ad44cd
cbc47c1
7837e68
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
/**************************************************************** | ||
* Licensed to the Apache Software Foundation (ASF) under one * | ||
* or more contributor license agreements. See the NOTICE file * | ||
* distributed with this work for additional information * | ||
* regarding copyright ownership. The ASF licenses this file * | ||
* to you under the Apache License, Version 2.0 (the * | ||
* "License"); you may not use this file except in compliance * | ||
* with the License. You may obtain a copy of the License at * | ||
* * | ||
* http://www.apache.org/licenses/LICENSE-2.0 * | ||
* * | ||
* Unless required by applicable law or agreed to in writing, * | ||
* software distributed under the License is distributed on an * | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * | ||
* KIND, either express or implied. See the License for the * | ||
* specific language governing permissions and limitations * | ||
* under the License. * | ||
****************************************************************/ | ||
|
||
package org.apache.james.mailbox.events.eventsourcing; | ||
|
||
import java.time.Clock; | ||
|
||
import org.apache.james.eventsourcing.Event; | ||
import org.apache.james.eventsourcing.EventId; | ||
import org.apache.james.eventsourcing.EventSourcingSystem; | ||
import org.apache.james.eventsourcing.Subscriber; | ||
import org.apache.james.eventsourcing.eventstore.EventStore; | ||
import org.apache.james.mailbox.events.Group; | ||
import org.reactivestreams.Publisher; | ||
|
||
import com.google.common.collect.ImmutableSet; | ||
|
||
import reactor.core.publisher.Flux; | ||
import reactor.core.publisher.Mono; | ||
|
||
public class GroupUnregistringManager { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what about |
||
@FunctionalInterface | ||
public interface Unregisterer { | ||
Mono<Void> unregister(Group group); | ||
} | ||
|
||
private class UnregisterRemovedGroupsSubscriber implements Subscriber { | ||
private final Unregisterer unregisterer; | ||
|
||
public UnregisterRemovedGroupsSubscriber(Unregisterer unregisterer) { | ||
this.unregisterer = unregisterer; | ||
} | ||
|
||
@Override | ||
public void handle(Event event) { | ||
if (event instanceof RegisteredGroupListenerChangeEvent) { | ||
RegisteredGroupListenerChangeEvent changeEvent = (RegisteredGroupListenerChangeEvent) event; | ||
|
||
Flux.fromIterable(changeEvent.getRemovedGroups()) | ||
.concatMap(group -> unregister(group, changeEvent.eventId())) | ||
.then() | ||
.block(); | ||
} | ||
} | ||
|
||
private Publisher<Void> unregister(Group group, EventId generatedForEventId) { | ||
return unregisterer.unregister(group) | ||
.then(notifyUnbind(group, generatedForEventId)); | ||
} | ||
} | ||
|
||
private final EventSourcingSystem eventSourcingSystem; | ||
|
||
public GroupUnregistringManager(EventStore eventStore, Unregisterer unregisterer, Clock clock) { | ||
this.eventSourcingSystem = EventSourcingSystem.fromJava(ImmutableSet.of( | ||
new RequireGroupsCommandHandler(eventStore, clock), | ||
new MarkUnbindAsSucceededCommandHandler(eventStore)), | ||
ImmutableSet.of(new UnregisterRemovedGroupsSubscriber(unregisterer)), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. there's no delivery guarantee for event handlers, what will happen if we miss one event? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The error scenario here is a server stop as our eventsourcing-eventbus is in memory based. I guess in this very unlikely case a manual admin intervention to cliclk the "unbind" button would be acceptable. (I don't see how we could do better) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
You should not try to describe all errors cases because:
Either we can enforce transactionality or we can't. In this case, as we are using handlers, we are not in a transaction and we will miss an event sooner or later. I can't think of a better solution than a handler but it means we have to figure out what happens in this failure case.
How he would know he need to do that? (sorry, I tried to find a solution but can't in a decent timeframe) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could have an aggregate handling 3 state for a group:
Then if the unRegisterSubscriber succeeds, it can fire a UnbindSucceededCommand on the aggregate. That way we might attempt several time an unbind operation, without manual admin operation. |
||
eventStore); | ||
} | ||
|
||
public Mono<Void> start(ImmutableSet<Group> groups) { | ||
RequireGroupsCommand requireGroupsCommand = new RequireGroupsCommand(groups); | ||
|
||
return Mono.from(eventSourcingSystem.dispatch(requireGroupsCommand)); | ||
} | ||
|
||
private Mono<Void> notifyUnbind(Group group, EventId ganaratedForEventId) { | ||
chibenwa marked this conversation as resolved.
Show resolved
Hide resolved
|
||
MarkUnbindAsSucceededCommand command = new MarkUnbindAsSucceededCommand(group, ganaratedForEventId); | ||
|
||
return Mono.from(eventSourcingSystem.dispatch(command)); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
/**************************************************************** | ||
* Licensed to the Apache Software Foundation (ASF) under one * | ||
* or more contributor license agreements. See the NOTICE file * | ||
* distributed with this work for additional information * | ||
* regarding copyright ownership. The ASF licenses this file * | ||
* to you under the Apache License, Version 2.0 (the * | ||
* "License"); you may not use this file except in compliance * | ||
* with the License. You may obtain a copy of the License at * | ||
* * | ||
* http://www.apache.org/licenses/LICENSE-2.0 * | ||
* * | ||
* Unless required by applicable law or agreed to in writing, * | ||
* software distributed under the License is distributed on an * | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * | ||
* KIND, either express or implied. See the License for the * | ||
* specific language governing permissions and limitations * | ||
* under the License. * | ||
****************************************************************/ | ||
|
||
package org.apache.james.mailbox.events.eventsourcing; | ||
|
||
import java.io.UncheckedIOException; | ||
import java.net.InetAddress; | ||
import java.net.UnknownHostException; | ||
import java.util.Objects; | ||
|
||
import org.apache.commons.lang3.StringUtils; | ||
|
||
import com.google.common.base.Preconditions; | ||
|
||
public class Hostname { | ||
public static Hostname of(String value) { | ||
Preconditions.checkNotNull(value, "'value' should not be null"); | ||
Preconditions.checkArgument(StringUtils.isBlank(value), "'value' "); | ||
|
||
return new Hostname(value); | ||
} | ||
|
||
public static Hostname localHost() { | ||
try { | ||
return new Hostname(InetAddress.getLocalHost().getHostName()); | ||
} catch (UnknownHostException e) { | ||
throw new UncheckedIOException(e); | ||
} | ||
} | ||
|
||
private final String value; | ||
|
||
private Hostname(String value) { | ||
this.value = value; | ||
} | ||
|
||
@Override | ||
public final boolean equals(Object o) { | ||
if (o instanceof Hostname) { | ||
Hostname hostname = (Hostname) o; | ||
|
||
return Objects.equals(this.value, hostname.value); | ||
} | ||
return false; | ||
} | ||
|
||
@Override | ||
public final int hashCode() { | ||
return Objects.hash(value); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
/**************************************************************** | ||
* Licensed to the Apache Software Foundation (ASF) under one * | ||
* or more contributor license agreements. See the NOTICE file * | ||
* distributed with this work for additional information * | ||
* regarding copyright ownership. The ASF licenses this file * | ||
* to you under the Apache License, Version 2.0 (the * | ||
* "License"); you may not use this file except in compliance * | ||
* with the License. You may obtain a copy of the License at * | ||
* * | ||
* http://www.apache.org/licenses/LICENSE-2.0 * | ||
* * | ||
* Unless required by applicable law or agreed to in writing, * | ||
* software distributed under the License is distributed on an * | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * | ||
* KIND, either express or implied. See the License for the * | ||
* specific language governing permissions and limitations * | ||
* under the License. * | ||
****************************************************************/ | ||
|
||
package org.apache.james.mailbox.events.eventsourcing; | ||
|
||
import org.apache.james.eventsourcing.Command; | ||
import org.apache.james.eventsourcing.EventId; | ||
import org.apache.james.mailbox.events.Group; | ||
|
||
public class MarkUnbindAsSucceededCommand implements Command { | ||
private final Group succeededGroup; | ||
private final EventId generatedForEventId; | ||
|
||
public MarkUnbindAsSucceededCommand(Group succeededGroup, EventId generatedForEventId) { | ||
this.succeededGroup = succeededGroup; | ||
this.generatedForEventId = generatedForEventId; | ||
} | ||
|
||
public Group getSucceededGroup() { | ||
return succeededGroup; | ||
} | ||
|
||
public EventId getGeneratedForEventId() { | ||
return generatedForEventId; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
/**************************************************************** | ||
* Licensed to the Apache Software Foundation (ASF) under one * | ||
* or more contributor license agreements. See the NOTICE file * | ||
* distributed with this work for additional information * | ||
* regarding copyright ownership. The ASF licenses this file * | ||
* to you under the Apache License, Version 2.0 (the * | ||
* "License"); you may not use this file except in compliance * | ||
* with the License. You may obtain a copy of the License at * | ||
* * | ||
* http://www.apache.org/licenses/LICENSE-2.0 * | ||
* * | ||
* Unless required by applicable law or agreed to in writing, * | ||
* software distributed under the License is distributed on an * | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * | ||
* KIND, either express or implied. See the License for the * | ||
* specific language governing permissions and limitations * | ||
* under the License. * | ||
****************************************************************/ | ||
|
||
package org.apache.james.mailbox.events.eventsourcing; | ||
|
||
import java.util.List; | ||
|
||
import org.apache.james.eventsourcing.CommandHandler; | ||
import org.apache.james.eventsourcing.Event; | ||
import org.apache.james.eventsourcing.eventstore.EventStore; | ||
import org.reactivestreams.Publisher; | ||
|
||
import reactor.core.publisher.Mono; | ||
|
||
public class MarkUnbindAsSucceededCommandHandler implements CommandHandler<MarkUnbindAsSucceededCommand> { | ||
private final EventStore eventStore; | ||
|
||
public MarkUnbindAsSucceededCommandHandler(EventStore eventStore) { | ||
this.eventStore = eventStore; | ||
} | ||
|
||
@Override | ||
public Class<MarkUnbindAsSucceededCommand> handledClass() { | ||
return MarkUnbindAsSucceededCommand.class; | ||
} | ||
|
||
@Override | ||
public Publisher<List<? extends Event>> handle(MarkUnbindAsSucceededCommand command) { | ||
return Mono.from(eventStore.getEventsOfAggregate(RegisteredGroupsAggregate.AGGREGATE_ID)) | ||
.map(RegisteredGroupsAggregate::load) | ||
.map(aggregate -> aggregate.handle(command)); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
/**************************************************************** | ||
* Licensed to the Apache Software Foundation (ASF) under one * | ||
* or more contributor license agreements. See the NOTICE file * | ||
* distributed with this work for additional information * | ||
* regarding copyright ownership. The ASF licenses this file * | ||
* to you under the Apache License, Version 2.0 (the * | ||
* "License"); you may not use this file except in compliance * | ||
* with the License. You may obtain a copy of the License at * | ||
* * | ||
* http://www.apache.org/licenses/LICENSE-2.0 * | ||
* * | ||
* Unless required by applicable law or agreed to in writing, * | ||
* software distributed under the License is distributed on an * | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * | ||
* KIND, either express or implied. See the License for the * | ||
* specific language governing permissions and limitations * | ||
* under the License. * | ||
****************************************************************/ | ||
|
||
package org.apache.james.mailbox.events.eventsourcing; | ||
|
||
import static org.apache.james.mailbox.events.eventsourcing.RegisteredGroupsAggregate.AGGREGATE_ID; | ||
|
||
import java.time.ZonedDateTime; | ||
import java.util.Objects; | ||
|
||
import org.apache.james.eventsourcing.AggregateId; | ||
import org.apache.james.eventsourcing.Event; | ||
import org.apache.james.eventsourcing.EventId; | ||
import org.apache.james.mailbox.events.Group; | ||
|
||
import com.google.common.collect.ImmutableSet; | ||
|
||
public class RegisteredGroupListenerChangeEvent implements Event { | ||
private final EventId eventId; | ||
private final Hostname hostname; | ||
private final ZonedDateTime zonedDateTime; | ||
private final ImmutableSet<Group> addedGroups; | ||
private final ImmutableSet<Group> removedGroups; | ||
|
||
public RegisteredGroupListenerChangeEvent(EventId eventId, Hostname hostname, ZonedDateTime zonedDateTime, ImmutableSet<Group> addedGroups, ImmutableSet<Group> removedGroups) { | ||
this.hostname = hostname; | ||
this.zonedDateTime = zonedDateTime; | ||
this.eventId = eventId; | ||
this.addedGroups = addedGroups; | ||
this.removedGroups = removedGroups; | ||
} | ||
|
||
|
||
@Override | ||
public EventId eventId() { | ||
return eventId; | ||
} | ||
|
||
@Override | ||
public AggregateId getAggregateId() { | ||
return AGGREGATE_ID; | ||
} | ||
|
||
public ImmutableSet<Group> getAddedGroups() { | ||
return addedGroups; | ||
} | ||
|
||
public ImmutableSet<Group> getRemovedGroups() { | ||
return removedGroups; | ||
} | ||
|
||
public EventId getEventId() { | ||
return eventId; | ||
} | ||
|
||
public Hostname getHostname() { | ||
return hostname; | ||
} | ||
|
||
public ZonedDateTime getZonedDateTime() { | ||
return zonedDateTime; | ||
} | ||
|
||
@Override | ||
public final boolean equals(Object o) { | ||
if (o instanceof RegisteredGroupListenerChangeEvent) { | ||
RegisteredGroupListenerChangeEvent that = (RegisteredGroupListenerChangeEvent) o; | ||
|
||
return Objects.equals(this.eventId, that.eventId) | ||
&& Objects.equals(this.addedGroups, that.addedGroups) | ||
&& Objects.equals(this.removedGroups, that.removedGroups); | ||
} | ||
return false; | ||
} | ||
|
||
@Override | ||
public final int hashCode() { | ||
return Objects.hash(eventId, addedGroups, removedGroups); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ordering issue