diff --git a/spring-geode/src/main/java/org/springframework/geode/cache/RepositoryAsyncEventListener.java b/spring-geode/src/main/java/org/springframework/geode/cache/RepositoryAsyncEventListener.java new file mode 100644 index 000000000..951f41c45 --- /dev/null +++ b/spring-geode/src/main/java/org/springframework/geode/cache/RepositoryAsyncEventListener.java @@ -0,0 +1,470 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.springframework.geode.cache; + +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; + +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.asyncqueue.AsyncEvent; +import org.apache.geode.cache.asyncqueue.AsyncEventListener; + +import org.springframework.data.gemfire.util.CollectionUtils; +import org.springframework.data.repository.CrudRepository; +import org.springframework.lang.NonNull; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; + +/** + * An Apache Geode {@link AsyncEventListener} that uses a Spring Data {@link CrudRepository} to perform + * data access operations to a backend, external data source asynchronously, triggered by cache operations. + * + * @author John Blum + * @see java.util.function.Function + * @see org.apache.geode.cache.Operation + * @see org.apache.geode.cache.asyncqueue.AsyncEvent + * @see org.apache.geode.cache.asyncqueue.AsyncEventListener + * @see org.springframework.data.repository.CrudRepository + * @since 1.4.0 + */ +public class RepositoryAsyncEventListener implements AsyncEventListener { + + protected static final AsyncEventErrorHandler DEFAULT = eventError -> false; + + private AsyncEventErrorHandler asyncEventErrorHandler = DEFAULT; + + private final CrudRepository repository; + + private final List> repositoryFunctions = new CopyOnWriteArrayList<>(); + + /** + * Constructs a new instance of {@link RepositoryAsyncEventListener} initialized with the given Spring Data + * {@link CrudRepository}. + * + * @param repository Spring Data {@link CrudRepository} used to perform data access operations to a backend, + * external data source when triggered by a cache operation; must not be {@literal null}. + * @throws IllegalArgumentException if {@link CrudRepository} is {@literal null}. + * @see org.springframework.data.repository.CrudRepository + */ + public RepositoryAsyncEventListener(@NonNull CrudRepository repository) { + + Assert.notNull(repository, "CrudRepository must not be null"); + + this.repository = repository; + + this.repositoryFunctions.addAll(Arrays.asList( + new CreateUpdateAsyncEventRepositoryFunction<>(), + new RemoveAsyncEventRepositoryFunction<>() + )); + } + + /** + * Configures an {@link AsyncEventErrorHandler} to handle errors that may occur when this listener is invoked with + * a batch of {@link AsyncEvent AsyncEvents}. + * + * Since the processing of {@link AsyncEvent AsyncEvents} is asynchronous, the {@link AsyncEventErrorHandler} gives + * users the opportunity to respond to errors for each {@link AsyncEvent} as it is is processed given this listener + * is designed to coordinate data/state changes occurring in a GemFire/Geode cache with an external data source. + * + * @param asyncEventErrorHandler {@link AsyncEventErrorHandler} used to handle errors while processing the batch of + * {@link AsyncEvent AsyncEvents}. + * @see AsyncEventErrorHandler + */ + public void setAsyncEventErrorHandler(@Nullable AsyncEventErrorHandler asyncEventErrorHandler) { + this.asyncEventErrorHandler = asyncEventErrorHandler; + } + + /** + * Gets the configured {@link AsyncEventErrorHandler} used to handle errors that may occur when this listener + * is invoked with a batch of {@link AsyncEvent AsyncEvents}. + * + * @return the configured {@link AsyncEventErrorHandler}; never {@literal null}. + * @see AsyncEventErrorHandler + */ + protected @NonNull AsyncEventErrorHandler getAsyncEventErrorHandler() { + return this.asyncEventErrorHandler != null ? this.asyncEventErrorHandler : DEFAULT; + } + + /** + * Gets a reference to the configured Spring Data {@link CrudRepository} used by this {@link AsyncEventListener} + * to perform data access operations to a backend, external data source asynchronously when triggered by a cache + * operation. + * + * @return a reference to the configured Spring Data {@link CrudRepository}. + * @see org.springframework.data.repository.CrudRepository + */ + protected @NonNull CrudRepository getRepository() { + return this.repository; + } + + /** + * Gets a {@link List} of {@link AsyncEventOperationRepositoryFunction} objects used to process + * {@link AsyncEvent AsyncEvents} passed to this listener by inspecting the {@link Operation} + * on the {@link AsyncEvent} and calling the appropriate {@link CrudRepository} method. + * + * @return a {@link List} of {@link AsyncEventOperationRepositoryFunction} objects to process + * the {@link AsyncEvent AsyncEvents}; never {@literal null}. + * @see AsyncEventOperationRepositoryFunction + */ + protected @NonNull List> getRepositoryFunctions() { + return this.repositoryFunctions; + } + + /** + * Processes each {@link AsyncEvent} in order by first determining whether the {@link AsyncEvent} can be processed + * by this listener and then invokes the appropriate Spring Data {@link CrudRepository} data access operation + * corresponding to the {@link AsyncEvent} {@link Operation}. + * + * @param events {@link List} of {@link AsyncEvent AsyncEvents} to process. + * @return a boolean value indicating whether all {@link AsyncEvent AsyncEvents} were processes successfully. + * If any {@link AsyncEvent} fails to be processed (just one), then this method will return {@literal false}. + * If any {@link AsyncEvent} cannot be handled, then this method will return {@literal false}, even if other + * {@link AsyncEvent AsyncEvents} were successfully processed. + * @see org.apache.geode.cache.asyncqueue.AsyncEvent + * @see AsyncEventOperationRepositoryFunction + * @see #getRepositoryFunctions() + * @see java.util.List + */ + @Override + @SuppressWarnings("unchecked") + public boolean processEvents(List events) { + + AtomicBoolean result = new AtomicBoolean(true); + + CollectionUtils.nullSafeList(events).stream() + .filter(Objects::nonNull) + .forEach(event -> { + + Optional> repositoryFunction = + getRepositoryFunctions().stream() + .filter(function -> function.canProcess(event)) + .findFirst(); + + boolean processed = Boolean.TRUE.equals(repositoryFunction + .map(function -> function.apply(event)) + .orElse(false)); + + result.compareAndSet(true, processed); + }); + + return result.get(); + } + + /** + * Registers a {@link AsyncEventOperationRepositoryFunction} capable of processing {@link AsyncEvent AsyncEvents} + * by {@link Operation} and invoking the appropriate Spring Data {@link CrudRepository} data access operation. + * + * {@link AsyncEventOperationRepositoryFunction AsyncEventOperationRepositoryFunctions} can be registered for + * {@link AsyncEvent} {@link Operation Operations} not currently handled by this listener. Alternatively, users can + * override existing {@link AsyncEventOperationRepositoryFunction AsyncEventOperationRepositoryFunctions} provided + * by this listener to alter the default behavior, or effectively the Spring Data {@link CrudRepository} data access + * operation invoked based on the {@link AsyncEvent} {@link Operation}. The {@code repositoryFunction} arguments are + * prepended to the {@link List} of registered {@link Function Functions} to implement the override, where the first + * {@link Function} found capable of handling the {@link AsyncEvent} {@link Operation} will be applied. + * + * @param repositoryFunction {@link AsyncEventOperationRepositoryFunction} used to process + * {@link AsyncEvent AsyncEvents} by {@link Operation} invoking the appropriate Spring Data {@link CrudRepository} + * data access operation; must not be {@literal null}. + * @return a boolean value indicating whether the registration was successful. + * @see AsyncEventOperationRepositoryFunction + * @see #getRepositoryFunctions() + */ + public boolean register(@NonNull AsyncEventOperationRepositoryFunction repositoryFunction) { + + if (repositoryFunction != null) { + getRepositoryFunctions().add(0, repositoryFunction); + return true; + } + + return false; + } + + /** + * Unregisters the given {@link AsyncEventOperationRepositoryFunction} from this listener. + * + * @param repositoryFunction {@link AsyncEventOperationRepositoryFunction} to unregister. + * @return a boolean value indicating whether the un-registration was successful. + * @see AsyncEventOperationRepositoryFunction + * @see #getRepositoryFunctions() + */ + public boolean unregister(@Nullable AsyncEventOperationRepositoryFunction repositoryFunction) { + return getRepositoryFunctions().remove(repositoryFunction); + } + + /** + * {@link AsyncEventError} is a wrapper class encapsulating the {@link AsyncEvent} along with + * the {@link Throwable error} that was thrown while processing the event. + * + * @see org.apache.geode.cache.asyncqueue.AsyncEvent + * @see java.lang.Throwable + */ + public static class AsyncEventError { + + private final AsyncEvent event; + + private final Throwable cause; + + /** + * Constructs a new instance of {@link AsyncEventError} initialized with the required {@link AsyncEvent} + * and {@link Throwable} thrown while processing the event. + * + * @param event processed {@link AsyncEvent}; must not be {@literal null}. + * @param cause {@link Throwable error} thrown while processing the event; must not be {@literal null}. + * @throws IllegalArgumentException if the {@link AsyncEvent} or the {@link Throwable} are {@literal null}. + * @see org.apache.geode.cache.asyncqueue.AsyncEvent + * @see java.lang.Throwable + */ + public AsyncEventError(@NonNull AsyncEvent event, @NonNull Throwable cause) { + + Assert.notNull(event, "AsyncEvent must not be null"); + Assert.notNull(cause, "The cause must not be null"); + + this.event = event; + this.cause = cause; + } + + /** + * Gets the {@link Throwable} thrown while processing the {@link AsyncEvent}. + * + * @return the {@link Throwable} thrown while processing the {@link AsyncEvent}. + * @see java.lang.Throwable + */ + public @NonNull Throwable getCause() { + return this.cause; + } + + /** + * Gets the {@link AsyncEvent} being processed when the {@link Throwable error} occurred. + * + * @return the {@link AsyncEvent} being processed when the {@link Throwable error} occurred. + * @see org.apache.geode.cache.asyncqueue.AsyncEvent + */ + public @NonNull AsyncEvent getEvent() { + return this.event; + } + + /** + * @inheritDoc + */ + @Override + public String toString() { + return String.format("Error [%s] thrown when processing AsyncEvent [%s]", + getCause().getMessage(), getEvent()); + } + } + + /** + * The {@link AsyncEventErrorHandler} interface is a {@link Function} and {@link FunctionalInterface} used to + * handle errors while processing {@link AsyncEvent AsyncEvents}. + * + * @see java.lang.FunctionalInterface + * @see java.util.function.Function + * @see AsyncEventError + */ + @FunctionalInterface + public interface AsyncEventErrorHandler extends Function { } + + /** + * The {@link AsyncEventOperationRepositoryFunction} interface is a {@link Function} and {@link FunctionalInterface} + * that translates the {@link AsyncEvent} {@link Operation} into a Spring Data {@link CrudRepository} method + * invocation. + * + * @param {@link Class type} of the entity tied to the event. + * @param {@link Class type} of the identifier of the entity. + * @see org.apache.geode.cache.asyncqueue.AsyncEvent + * @see java.lang.FunctionalInterface + * @see java.util.function.Function + */ + @FunctionalInterface + public interface AsyncEventOperationRepositoryFunction extends Function, Boolean> { + + /** + * Determines whether the given {@link AsyncEvent} can be processed by this {@link Function}. + * + * Implementing classes must override this method to specify which {@link AsyncEvent} + * {@link Operation Operations} they are capable of processing. + * + * @param event {@link AsyncEvent} to evaluate. + * @return a boolean value indicating whether this {@link Function} is capable of processing + * the given {@link AsyncEvent}. Default returns {@literal false}. + * @see org.apache.geode.cache.asyncqueue.AsyncEvent + */ + default boolean canProcess(@Nullable AsyncEvent event) { + return false; + } + } + + /** + * {@link AbstractAsyncEventOperationRepositoryFunction} is an abstract base class implementing the + * {@link AsyncEventOperationRepositoryFunction} interface to provided a default {@literal template} implementation + * of the {@link Function#apply(Object)} method. + * + * @param {@link Class type} of the entity tied to the event. + * @param {@link Class type} of the identifier of the entity. + * @see AsyncEventOperationRepositoryFunction + */ + protected abstract class AbstractAsyncEventOperationRepositoryFunction + implements AsyncEventOperationRepositoryFunction { + + /** + * Processes the given {@link AsyncEvent} by first determining whether the event can be processed by this + * {@link Function}, and then proceeds to extract the {@link AsyncEvent#getDeserializedValue() entity} + * associated with the event to invoke the appropriate Spring Data {@link CrudRepository} data access operation + * determined by the {@link AsyncEvent} {@link Operation}. + * + * If an {@link Throwable error} is thrown while processing the {@link AsyncEvent}, then the + * {@link AsyncEventErrorHandler} is called to handle the error and perform any necessary/required + * post-processing actions. + * + * {@link AsyncEventErrorHandler} can be implemented to retry the operation with incremental backoff, based on + * count or time, record the failure, perform resource cleanup actions, whatever is necessary and appropriate + * to the application use case. + * + * @param event {@link AsyncEvent} to process. + * @return a boolean value indicating whether the event was successfully processed. + * @throws IllegalStateException if the resolve entity is {@literal null}. + * @see org.apache.geode.cache.asyncqueue.AsyncEvent + * @see AsyncEventErrorHandler + * @see #canProcess(AsyncEvent) + * @see #doRepositoryOp(Object) + * @see #resolveEntity(AsyncEvent) + * @see #getAsyncEventErrorHandler() + */ + @Override + public Boolean apply(@Nullable AsyncEvent event) { + + try { + if (canProcess(event)) { + + T entity = resolveEntity(event); + + doRepositoryOp(entity); + + return true; + } + + return false; + } + catch (Throwable cause) { + return getAsyncEventErrorHandler().apply(new AsyncEventError(event, cause)); + } + } + + /** + * Invokes the appropriate Spring Data {@link CrudRepository} data access operation based on the + * {@link AsyncEvent} {@link Operation} as determined by {@link AsyncEvent#getOperation()}. + * + * @param {@link Class type} of the Spring Data {@link CrudRepository} data access operation return value. + * @param entity entity to process. + * @return the result of invoking the Spring Data {@link CrudRepository} data access operation. + * @see org.springframework.data.repository.CrudRepository + */ + protected abstract R doRepositoryOp(@NonNull T entity); + + /** + * Resolves the {@link AsyncEvent#getDeserializedValue() entity} associated with the {@link AsyncEvent}. + * + * @param event {@link AsyncEvent} from which to resolve the entity. + * @return the resolve entity from the {@link AsyncEvent}. + * @throws IllegalArgumentException if {@link AsyncEvent} is {@literal null}. + * @throws IllegalStateException if the resolved {@link AsyncEvent#getDeserializedValue() entity} + * is {@literal null}. + * @see org.apache.geode.cache.asyncqueue.AsyncEvent#getDeserializedValue() + * @see org.apache.geode.cache.asyncqueue.AsyncEvent + */ + protected T resolveEntity(@NonNull AsyncEvent event) { + + Assert.notNull(event, "AsyncEvent must not be null"); + + T entity = event.getDeserializedValue(); + + Assert.state(entity != null, "The entity (deserialized value) was null"); + + return entity; + } + } + + /** + * An {@link AsyncEventOperationRepositoryFunction} capable of handling {@link Operation#CREATE} + * and {@link Operation#UPDATE} {@link AsyncEvent AsyncEvents}. + * + * Invokes the {@link CrudRepository#save(Object)} data access operation. + * + * @param {@link Class Subtype} of the entity tied to the event. + * @param {@link Class type} of the identifier of the entity. + */ + public class CreateUpdateAsyncEventRepositoryFunction + extends AbstractAsyncEventOperationRepositoryFunction { + + /** + * @inheritDoc + */ + @Override + public boolean canProcess(@Nullable AsyncEvent event) { + + Operation operation = event != null ? event.getOperation() : null; + + return operation != null && (operation.isCreate() || operation.isUpdate()); + } + + /** + * @inheritDoc + */ + @Override + @SuppressWarnings("unchecked") + protected R doRepositoryOp(S entity) { + return (R) getRepository().save(entity); + } + } + + /** + * An {@link AsyncEventOperationRepositoryFunction} capable of handling {@link Operation#REMOVE} + * {@link AsyncEvent AsyncEvents}. + * + * Invokes the {@link CrudRepository#delete(Object)} data access operation. + * + * @param {@link Class Subtype} of the entity tied to the event. + * @param {@link Class type} of the identifier of the entity. + */ + public class RemoveAsyncEventRepositoryFunction + extends AbstractAsyncEventOperationRepositoryFunction { + + /** + * @inheritDoc + */ + @Override + public boolean canProcess(@Nullable AsyncEvent event) { + + Operation operation = event != null ? event.getOperation() : null; + + return Operation.REMOVE.equals(operation); + } + + /** + * @inheritDoc + */ + @Override + protected R doRepositoryOp(S entity) { + getRepository().delete(entity); + return null; + } + } +} diff --git a/spring-geode/src/test/java/org/springframework/geode/cache/RepositoryAsyncEventListenerUnitTests.java b/spring-geode/src/test/java/org/springframework/geode/cache/RepositoryAsyncEventListenerUnitTests.java new file mode 100644 index 000000000..3ef8effcf --- /dev/null +++ b/spring-geode/src/test/java/org/springframework/geode/cache/RepositoryAsyncEventListenerUnitTests.java @@ -0,0 +1,781 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.springframework.geode.cache; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import org.junit.Test; +import org.mockito.InOrder; + +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.asyncqueue.AsyncEvent; + +import org.springframework.data.repository.CrudRepository; +import org.springframework.geode.cache.RepositoryAsyncEventListener.AbstractAsyncEventOperationRepositoryFunction; +import org.springframework.geode.cache.RepositoryAsyncEventListener.AsyncEventError; +import org.springframework.geode.cache.RepositoryAsyncEventListener.AsyncEventErrorHandler; +import org.springframework.geode.cache.RepositoryAsyncEventListener.AsyncEventOperationRepositoryFunction; +import org.springframework.geode.cache.RepositoryAsyncEventListener.CreateUpdateAsyncEventRepositoryFunction; +import org.springframework.geode.cache.RepositoryAsyncEventListener.RemoveAsyncEventRepositoryFunction; + +/** + * Unit Tests for {@link RepositoryAsyncEventListener}. + * + * @author John Blum + * @see org.junit.Test + * @see org.mockito.Mockito + * @see org.apache.geode.cache.Operation + * @see org.apache.geode.cache.asyncqueue.AsyncEvent + * @see org.apache.geode.cache.asyncqueue.AsyncEventListener + * @see org.springframework.data.repository.CrudRepository + * @see org.springframework.geode.cache.RepositoryAsyncEventListener + * @since 1.4.0 + */ +@SuppressWarnings({ "rawtypes", "unchecked"}) +public class RepositoryAsyncEventListenerUnitTests { + + @Test + public void constructRepositoryAsyncEventListener() { + + CrudRepository mockRepository = mock(CrudRepository.class); + + RepositoryAsyncEventListener listener = new RepositoryAsyncEventListener<>(mockRepository); + + assertThat(listener).isNotNull(); + assertThat(listener.getRepository()).isEqualTo(mockRepository); + + verifyNoInteractions(mockRepository); + } + + @Test(expected = IllegalArgumentException.class) + public void constructRepositoryAsyncEventListenerWithNullRepositoryThrowsIllegalArgumentException() { + + try { + new RepositoryAsyncEventListener<>(null); + } + catch (IllegalArgumentException expected) { + + assertThat(expected).hasMessage("CrudRepository must not be null"); + assertThat(expected).hasNoCause(); + + throw expected; + } + } + + @Test + public void setAndGetAsyncEventErrorHandler() { + + AsyncEventErrorHandler mockAsyncEventErrorHandler = mock(AsyncEventErrorHandler.class); + + CrudRepository mockRepository = mock(CrudRepository.class); + + RepositoryAsyncEventListener listener = new RepositoryAsyncEventListener<>(mockRepository); + + assertThat(listener.getAsyncEventErrorHandler()).isEqualTo(RepositoryAsyncEventListener.DEFAULT); + + listener.setAsyncEventErrorHandler(mockAsyncEventErrorHandler); + + assertThat(listener.getAsyncEventErrorHandler()).isEqualTo(mockAsyncEventErrorHandler); + + listener.setAsyncEventErrorHandler(null); + + assertThat(listener.getAsyncEventErrorHandler()).isEqualTo(RepositoryAsyncEventListener.DEFAULT); + + verifyNoInteractions(mockAsyncEventErrorHandler, mockRepository); + } + + @Test + public void getRepositoryFunctionsHandlesCreateUpdateAndRemove() { + + CrudRepository mockRepository = mock(CrudRepository.class); + + RepositoryAsyncEventListener listener = new RepositoryAsyncEventListener<>(mockRepository); + + List> repositoryFunctions = + listener.getRepositoryFunctions(); + + assertThat(repositoryFunctions).isNotNull(); + + List> repositoryFunctionTypes = repositoryFunctions.stream() + .map(Object::getClass) + .collect(Collectors.toList()); + + assertThat(repositoryFunctionTypes).containsExactly(CreateUpdateAsyncEventRepositoryFunction.class, + RemoveAsyncEventRepositoryFunction.class); + + verifyNoInteractions(mockRepository); + } + + @Test + public void registerAndUnregisterAsyncEventOperationRepositoryFunction() { + + AsyncEventOperationRepositoryFunction mockFunction = + mock(AsyncEventOperationRepositoryFunction.class); + + CrudRepository mockRepository = mock(CrudRepository.class); + + RepositoryAsyncEventListener listener = new RepositoryAsyncEventListener<>(mockRepository); + + assertThat(listener.getRepositoryFunctions()).hasSize(2); + assertThat(listener.getRepositoryFunctions().get(0).getClass()) + .isEqualTo(CreateUpdateAsyncEventRepositoryFunction.class); + assertThat(listener.getRepositoryFunctions().get(1).getClass()) + .isEqualTo(RemoveAsyncEventRepositoryFunction.class); + assertThat(listener.register(mockFunction)).isTrue(); + assertThat(listener.register(null)).isFalse(); + assertThat(listener.getRepositoryFunctions()).hasSize(3); + assertThat(listener.getRepositoryFunctions()).contains(mockFunction); + assertThat(listener.getRepositoryFunctions().get(0)).isEqualTo(mockFunction); + assertThat(listener.getRepositoryFunctions().get(1).getClass()) + .isEqualTo(CreateUpdateAsyncEventRepositoryFunction.class); + assertThat(listener.getRepositoryFunctions().get(2).getClass()) + .isEqualTo(RemoveAsyncEventRepositoryFunction.class); + assertThat(listener.unregister(null)).isFalse(); + assertThat(listener.unregister(mockFunction)).isTrue(); + assertThat(listener.getRepositoryFunctions()).hasSize(2); + assertThat(listener.getRepositoryFunctions().get(0).getClass()) + .isEqualTo(CreateUpdateAsyncEventRepositoryFunction.class); + assertThat(listener.getRepositoryFunctions().get(1).getClass()) + .isEqualTo(RemoveAsyncEventRepositoryFunction.class); + + verifyNoInteractions(mockFunction, mockRepository); + } + + @Test + public void processEventsIsNullSafe() { + + CrudRepository mockRepository = mock(CrudRepository.class); + + RepositoryAsyncEventListener listener = new RepositoryAsyncEventListener<>(mockRepository); + + assertThat(listener).isNotNull(); + assertThat(listener.getRepository()).isEqualTo(mockRepository); + assertThat(listener.processEvents(null)).isTrue(); + + verifyNoInteractions(mockRepository); + } + + @Test + public void processEventsIsSuccessful() { + + AsyncEvent mockEventOne = mock(AsyncEvent.class, "AsyncEventOne"); + AsyncEvent mockEventTwo = mock(AsyncEvent.class, "AsyncEventTwo"); + AsyncEvent mockEventThree = mock(AsyncEvent.class, "AsyncEventThree"); + + AsyncEventOperationRepositoryFunction mockRepositoryFunctionA = + mock(AsyncEventOperationRepositoryFunction.class, "RepositoryFunctionA"); + + AsyncEventOperationRepositoryFunction mockRepositoryFunctionB = + mock(AsyncEventOperationRepositoryFunction.class, "RepositoryFunctionB"); + + AsyncEventOperationRepositoryFunction mockRepositoryFunctionC = + mock(AsyncEventOperationRepositoryFunction.class, "RepositoryFunctionC"); + + CrudRepository mockRepository = mock(CrudRepository.class); + + doAnswer(invocation -> { + + AsyncEvent event = invocation.getArgument(0); + + return mockEventOne.equals(event) || mockEventThree.equals(event); + + }).when(mockRepositoryFunctionA).canProcess(isA(AsyncEvent.class)); + + doAnswer(invocation -> { + mockRepository.save(invocation.getArgument(0)); + return true; + }).when(mockRepositoryFunctionA).apply(isA(AsyncEvent.class)); + + doAnswer(invocation -> { + + AsyncEvent event = invocation.getArgument(0); + + return mockEventTwo.equals(event) || mockEventThree.equals(event); + + }).when(mockRepositoryFunctionB).canProcess(isA(AsyncEvent.class)); + + doAnswer(invocation -> { + mockRepository.delete(invocation.getArgument(0)); + return true; + }).when(mockRepositoryFunctionB).apply(isA(AsyncEvent.class)); + + doReturn(false).when(mockRepositoryFunctionC).canProcess(any()); + + List mockEvents = Arrays.asList(mockEventOne, mockEventTwo, mockEventThree); + + RepositoryAsyncEventListener listener = spy(new RepositoryAsyncEventListener<>(mockRepository)); + + assertThat(listener).isNotNull(); + assertThat(listener.getRepository()).isEqualTo(mockRepository); + + doReturn(Arrays.asList(mockRepositoryFunctionC, mockRepositoryFunctionA, mockRepositoryFunctionB)) + .when(listener).getRepositoryFunctions(); + + assertThat(listener.processEvents(mockEvents)).isTrue(); + + InOrder order = + inOrder(mockRepository, mockRepositoryFunctionA, mockRepositoryFunctionB, mockRepositoryFunctionC); + + order.verify(mockRepositoryFunctionC, times(1)).canProcess(eq(mockEventOne)); + order.verify(mockRepositoryFunctionA, times(1)).canProcess(eq(mockEventOne)); + order.verify(mockRepositoryFunctionB, never()).canProcess(eq(mockEventOne)); + order.verify(mockRepositoryFunctionA, times(1)).apply(eq(mockEventOne)); + order.verify(mockRepository, times(1)).save(eq(mockEventOne)); + order.verify(mockRepositoryFunctionC, times(1)).canProcess(eq(mockEventTwo)); + order.verify(mockRepositoryFunctionA, times(1)).canProcess(eq(mockEventTwo)); + order.verify(mockRepositoryFunctionB, times(1)).canProcess(eq(mockEventTwo)); + order.verify(mockRepositoryFunctionB, times(1)).apply(eq(mockEventTwo)); + order.verify(mockRepository, times(1)).delete(eq(mockEventTwo)); + order.verify(mockRepositoryFunctionC, times(1)).canProcess(eq(mockEventThree)); + order.verify(mockRepositoryFunctionA, times(1)).canProcess(eq(mockEventThree)); + order.verify(mockRepositoryFunctionB, never()).canProcess(eq(mockEventThree)); + order.verify(mockRepositoryFunctionA, times(1)).apply(eq(mockEventThree)); + order.verify(mockRepository, times(1)).save(eq(mockEventThree)); + + verify(mockRepositoryFunctionA, never()).apply(eq(mockEventTwo)); + verify(mockRepositoryFunctionB, never()).apply(eq(mockEventOne)); + verify(mockRepositoryFunctionB, never()).apply(eq(mockEventThree)); + verify(mockRepositoryFunctionC, never()).apply(any()); + verify(mockRepository, never()).delete(eq(mockEventOne)); + verify(mockRepository, never()).delete(eq(mockEventThree)); + verify(mockRepository, never()).save(eq(mockEventTwo)); + + verifyNoInteractions(mockEventOne, mockEventTwo, mockEventThree); + } + + @Test + public void processEventsIsUnsuccessfulWhenASingleFunctionApplyReturnsFalse() { + + AsyncEvent mockEventOne = mock(AsyncEvent.class, "AsyncEventOne"); + AsyncEvent mockEventTwo = mock(AsyncEvent.class, "AsyncEventTwo"); + AsyncEvent mockEventThree = mock(AsyncEvent.class, "AsyncEventThree"); + + AsyncEventOperationRepositoryFunction mockRepositoryFunctionOne = + mock(AsyncEventOperationRepositoryFunction.class, "RepositoryFunctionOne"); + + AsyncEventOperationRepositoryFunction mockRepositoryFunctionTwo = + mock(AsyncEventOperationRepositoryFunction.class, "RepositoryFunctionTwo"); + + AsyncEventOperationRepositoryFunction mockRepositoryFunctionThree = + mock(AsyncEventOperationRepositoryFunction.class, "RepositoryFunctionThree"); + + doReturn(true).when(mockRepositoryFunctionOne).canProcess(eq(mockEventOne)); + doReturn(true).when(mockRepositoryFunctionOne).apply(eq(mockEventOne)); + doReturn(true).when(mockRepositoryFunctionTwo).canProcess(eq(mockEventTwo)); + doReturn(false).when(mockRepositoryFunctionTwo).apply(eq(mockEventTwo)); + doReturn(true).when(mockRepositoryFunctionThree).canProcess(eq(mockEventThree)); + doReturn(true).when(mockRepositoryFunctionThree).apply(eq(mockEventThree)); + + CrudRepository mockRepository = mock(CrudRepository.class); + + List mockEvents = Arrays.asList(mockEventOne, mockEventTwo, mockEventThree); + + RepositoryAsyncEventListener listener = spy(new RepositoryAsyncEventListener<>(mockRepository)); + + assertThat(listener).isNotNull(); + assertThat(listener.getRepository()).isEqualTo(mockRepository); + + doReturn(Arrays.asList(mockRepositoryFunctionOne, mockRepositoryFunctionTwo, mockRepositoryFunctionThree)) + .when(listener).getRepositoryFunctions(); + + assertThat(listener.processEvents(mockEvents)).isFalse(); + + InOrder order = + inOrder(mockRepository, mockRepositoryFunctionOne, mockRepositoryFunctionTwo, mockRepositoryFunctionThree); + + order.verify(mockRepositoryFunctionOne, times(1)).canProcess(eq(mockEventOne)); + order.verify(mockRepositoryFunctionTwo, never()).canProcess(eq(mockEventOne)); + order.verify(mockRepositoryFunctionThree, never()).canProcess(eq(mockEventOne)); + order.verify(mockRepositoryFunctionOne, times(1)).apply(eq(mockEventOne)); + order.verify(mockRepositoryFunctionOne, times(1)).canProcess(eq(mockEventTwo)); + order.verify(mockRepositoryFunctionTwo, times(1)).canProcess(eq(mockEventTwo)); + order.verify(mockRepositoryFunctionThree, never()).canProcess(eq(mockEventTwo)); + order.verify(mockRepositoryFunctionTwo, times(1)).apply(eq(mockEventTwo)); + order.verify(mockRepositoryFunctionOne, times(1)).canProcess(eq(mockEventThree)); + order.verify(mockRepositoryFunctionTwo, times(1)).canProcess(eq(mockEventThree)); + order.verify(mockRepositoryFunctionThree, times(1)).canProcess(eq(mockEventThree)); + order.verify(mockRepositoryFunctionThree, times(1)).apply(eq(mockEventThree)); + + verify(mockRepositoryFunctionOne, never()).apply(eq(mockEventTwo)); + verify(mockRepositoryFunctionOne, never()).apply(eq(mockEventThree)); + verify(mockRepositoryFunctionTwo, never()).apply(eq(mockEventOne)); + verify(mockRepositoryFunctionTwo, never()).apply(eq(mockEventThree)); + verify(mockRepositoryFunctionThree, never()).apply(eq(mockEventOne)); + verify(mockRepositoryFunctionThree, never()).apply(eq(mockEventTwo)); + + verifyNoInteractions(mockRepository, mockEventOne, mockEventTwo, mockEventThree); + } + + @Test + public void processEventsIsUnsuccessfulWhenNoFunctionCanProcessEvent() { + + AsyncEvent mockEvent = mock(AsyncEvent.class); + + AsyncEventOperationRepositoryFunction mockRepositoryFunction = + mock(AsyncEventOperationRepositoryFunction.class); + + doReturn(false).when(mockRepositoryFunction).canProcess(any()); + + CrudRepository mockRepository = mock(CrudRepository.class); + + RepositoryAsyncEventListener listener = spy(new RepositoryAsyncEventListener<>(mockRepository)); + + assertThat(listener).isNotNull(); + assertThat(listener.getRepository()).isEqualTo(mockRepository); + + doReturn(Collections.singletonList(mockRepositoryFunction)).when(listener).getRepositoryFunctions(); + + assertThat(listener.processEvents(Collections.singletonList(mockEvent))).isFalse(); + + verify(mockRepositoryFunction, times(1)).canProcess(eq(mockEvent)); + verify(mockRepositoryFunction, never()).apply(any()); + + verifyNoInteractions(mockEvent, mockRepository); + } + + @Test + public void processNoEventsIsSuccessful() { + + CrudRepository mockRepository = mock(CrudRepository.class); + + RepositoryAsyncEventListener listener = new RepositoryAsyncEventListener<>(mockRepository); + + assertThat(listener).isNotNull(); + assertThat(listener.getRepository()).isEqualTo(mockRepository); + assertThat(listener.processEvents(Collections.emptyList())).isTrue(); + + verifyNoInteractions(mockRepository); + } + + @Test + public void constructAsyncEventError() { + + AsyncEvent mockEvent = mock(AsyncEvent.class); + + Throwable cause = new RuntimeException("test"); + + AsyncEventError eventError = new AsyncEventError(mockEvent, cause); + + assertThat(eventError).isNotNull(); + assertThat(eventError.getCause()).isEqualTo(cause); + assertThat(eventError.getEvent()).isEqualTo(mockEvent); + + assertThat(eventError.toString()) + .isEqualTo("Error [test] thrown when processing AsyncEvent [%s]", mockEvent); + + verifyNoInteractions(mockEvent); + } + + @Test(expected = IllegalArgumentException.class) + public void constructAsyncEventErrorWithNullCauseThrowsIllegalArgumentException() { + + try { + new AsyncEventError(mock(AsyncEvent.class), null); + } + catch (IllegalArgumentException expected) { + + assertThat(expected).hasMessage("The cause must not be null"); + assertThat(expected).hasNoCause(); + + throw expected; + } + } + + @Test(expected = IllegalArgumentException.class) + public void constructAsyncEventErrorWithNullEventThrowsIllegalArgumentException() { + + try { + new AsyncEventError(null, new RuntimeException("test")); + } + catch (IllegalArgumentException expected) { + + assertThat(expected).hasMessage("AsyncEvent must not be null"); + assertThat(expected).hasNoCause(); + + throw expected; + } + } + + @Test + public void callAbstractAsyncEventOperationRepositoryFunctionApplyWhenFunctionCanProcessEvent() { + + AbstractAsyncEventOperationRepositoryFunction repositoryFunction = + mock(AbstractAsyncEventOperationRepositoryFunction.class); + + AsyncEvent mockEvent = mock(AsyncEvent.class); + + Object entity = "test"; + + doReturn(true).when(repositoryFunction).canProcess(eq(mockEvent)); + doReturn(entity).when(repositoryFunction).resolveEntity(eq(mockEvent)); + doCallRealMethod().when(repositoryFunction).apply(any()); + + assertThat(repositoryFunction.apply(mockEvent)).isTrue(); + + InOrder order = inOrder(repositoryFunction); + + order.verify(repositoryFunction, times(1)).apply(eq(mockEvent)); + order.verify(repositoryFunction, times(1)).canProcess(eq(mockEvent)); + order.verify(repositoryFunction, times(1)).resolveEntity(eq(mockEvent)); + order.verify(repositoryFunction, times(1)).doRepositoryOp(eq(entity)); + + verifyNoMoreInteractions(repositoryFunction); + verifyNoInteractions(mockEvent); + } + + @Test + public void callAbstractAsyncEventOperationRepositoryFunctionApplyWhenFunctionCannotProcessEvent() { + + AbstractAsyncEventOperationRepositoryFunction repositoryFunction = + mock(AbstractAsyncEventOperationRepositoryFunction.class); + + AsyncEvent mockEvent = mock(AsyncEvent.class); + + doReturn(false).when(repositoryFunction).canProcess(any()); + doCallRealMethod().when(repositoryFunction).apply(any()); + + assertThat(repositoryFunction.apply(mockEvent)).isFalse(); + + verify(repositoryFunction, times(1)).apply(eq(mockEvent)); + verify(repositoryFunction, times(1)).canProcess(eq(mockEvent)); + + verifyNoMoreInteractions(repositoryFunction); + verifyNoInteractions(mockEvent); + } + + @Test + public void callAbstractAsyncEventOperationRepositoryFunctionApplyWhenRepositoryOperationThrowsException() { + + CrudRepository mockRepository = mock(CrudRepository.class); + + TestRepositoryAsyncEventListener listener = spy(new TestRepositoryAsyncEventListener(mockRepository)); + + AbstractAsyncEventOperationRepositoryFunction repositoryFunction = + spy(listener.new TestAsyncEventOperationRepositoryFunction()); + + AsyncEvent mockEvent = mock(AsyncEvent.class); + + AsyncEventErrorHandler mockEventErrorHandler = mock(AsyncEventErrorHandler.class); + + Object entity = "mock"; + + doReturn(mockEventErrorHandler).when(listener).getAsyncEventErrorHandler(); + doReturn(true).when(repositoryFunction).canProcess(eq(mockEvent)); + doReturn(entity).when(repositoryFunction).resolveEntity(eq(mockEvent)); + + doAnswer(invocation -> { + + AsyncEventError eventError = invocation.getArgument(0); + + assertThat(eventError).isNotNull(); + assertThat(eventError.getCause()).isInstanceOf(UnsupportedOperationException.class); + assertThat(eventError.getCause().getMessage()).isEqualTo("Not Implemented"); + assertThat(eventError.getEvent()).isEqualTo(mockEvent); + + return false; + + }).when(mockEventErrorHandler).apply(any()); + + assertThat(repositoryFunction.apply(mockEvent)).isFalse(); + + InOrder order = inOrder(listener, mockEventErrorHandler, repositoryFunction); + + order.verify(repositoryFunction, times(1)).apply(eq(mockEvent)); + order.verify(repositoryFunction, times(1)).canProcess(eq(mockEvent)); + order.verify(repositoryFunction, times(1)).resolveEntity(eq(mockEvent)); + order.verify(repositoryFunction, times(1)).doRepositoryOp(eq(entity)); + order.verify(listener, times(1)).getAsyncEventErrorHandler(); + order.verify(mockEventErrorHandler, times(1)).apply(isA(AsyncEventError.class)); + + verifyNoMoreInteractions(repositoryFunction, listener, mockEventErrorHandler); + verifyNoInteractions(mockEvent, mockRepository); + } + + @Test + public void resolveEntityIsSuccessful() { + + AbstractAsyncEventOperationRepositoryFunction repositoryFunction = + mock(AbstractAsyncEventOperationRepositoryFunction.class); + + AsyncEvent mockEvent = mock(AsyncEvent.class); + + doCallRealMethod().when(repositoryFunction).resolveEntity(any()); + doReturn("mock").when(mockEvent).getDeserializedValue(); + + assertThat(repositoryFunction.resolveEntity(mockEvent)).isEqualTo("mock"); + + verify(mockEvent, times(1)).getDeserializedValue(); + verifyNoMoreInteractions(mockEvent); + } + + @Test(expected = IllegalArgumentException.class) + public void resolveEntityFromNullEventThrowsIllegalArgumentException() { + + AbstractAsyncEventOperationRepositoryFunction repositoryFunction = + mock(AbstractAsyncEventOperationRepositoryFunction.class); + + doCallRealMethod().when(repositoryFunction).resolveEntity(any()); + + try { + repositoryFunction.resolveEntity(null); + } + catch (IllegalArgumentException expected) { + + assertThat(expected).hasMessage("AsyncEvent must not be null"); + assertThat(expected).hasNoCause(); + + throw expected; + } + } + + @Test(expected = IllegalStateException.class) + public void resolveNulEntityThrowsIllegalStateException() { + + AbstractAsyncEventOperationRepositoryFunction repositoryFunction = + mock(AbstractAsyncEventOperationRepositoryFunction.class); + + AsyncEvent mockEvent = mock(AsyncEvent.class); + + doCallRealMethod().when(repositoryFunction).resolveEntity(any()); + doReturn(null).when(mockEvent).getDeserializedValue(); + + try { + repositoryFunction.resolveEntity(mockEvent); + } + catch (IllegalStateException expected) { + + assertThat(expected).hasMessage("The entity (deserialized value) was null"); + assertThat(expected).hasNoCause(); + + throw expected; + } + finally { + verify(mockEvent, times(1)).getDeserializedValue(); + } + } + + @Test + public void createUpdateAsyncEventRepositoryFunctionCanProcessCreateEventReturnsTrue() { + + CreateUpdateAsyncEventRepositoryFunction repositoryFunction = + mock(CreateUpdateAsyncEventRepositoryFunction.class); + + AsyncEvent mockEvent = mock(AsyncEvent.class); + + doCallRealMethod().when(repositoryFunction).canProcess(any()); + doReturn(Operation.CREATE).when(mockEvent).getOperation(); + + assertThat(repositoryFunction.canProcess(mockEvent)).isTrue(); + + verify(mockEvent, times(1)).getOperation(); + verifyNoMoreInteractions(mockEvent); + } + + @Test + public void createUpdateAsyncEventRepositoryFunctionCanProcessUpdateEventReturnsTrue() { + + CreateUpdateAsyncEventRepositoryFunction repositoryFunction = + mock(CreateUpdateAsyncEventRepositoryFunction.class); + + AsyncEvent mockEvent = mock(AsyncEvent.class); + + doCallRealMethod().when(repositoryFunction).canProcess(any()); + doReturn(Operation.UPDATE).when(mockEvent).getOperation(); + + assertThat(repositoryFunction.canProcess(mockEvent)).isTrue(); + + verify(mockEvent, times(1)).getOperation(); + verifyNoMoreInteractions(mockEvent); + } + + @Test + public void createUpdateAsyncEventRepositoryFunctionCanProcessEventWithNullOperationReturnsFalse() { + + CreateUpdateAsyncEventRepositoryFunction repositoryFunction = + mock(CreateUpdateAsyncEventRepositoryFunction.class); + + AsyncEvent mockEvent = mock(AsyncEvent.class); + + doCallRealMethod().when(repositoryFunction).canProcess(any()); + doReturn(null).when(mockEvent).getOperation(); + + assertThat(repositoryFunction.canProcess(mockEvent)).isFalse(); + + verify(mockEvent, times(1)).getOperation(); + verifyNoMoreInteractions(mockEvent); + } + + @Test + public void createUpdateAsyncEventRepositoryFunctionCanProcessNullEventReturnsFalse() { + + CreateUpdateAsyncEventRepositoryFunction repositoryFunction = + mock(CreateUpdateAsyncEventRepositoryFunction.class); + + doCallRealMethod().when(repositoryFunction).canProcess(any()); + assertThat(repositoryFunction.canProcess(null)).isFalse(); + } + + @Test + public void createUpdateAsyncEventRepositoryFunctionCanProcessRemoveEventReturnsFalse() { + + CreateUpdateAsyncEventRepositoryFunction repositoryFunction = + mock(CreateUpdateAsyncEventRepositoryFunction.class); + + AsyncEvent mockEvent = mock(AsyncEvent.class); + + doCallRealMethod().when(repositoryFunction).canProcess(any()); + doReturn(Operation.REMOVE).when(mockEvent).getOperation(); + + assertThat(repositoryFunction.canProcess(mockEvent)).isFalse(); + + verify(mockEvent, times(1)).getOperation(); + verifyNoMoreInteractions(mockEvent); + } + + @Test + public void createUpdateAsyncEventRepositoryFunctionDoRepositoryOpCallsCrudRepositorySave() { + + CrudRepository mockRepository = mock(CrudRepository.class); + + TestRepositoryAsyncEventListener listener = new TestRepositoryAsyncEventListener(mockRepository); + + CreateUpdateAsyncEventRepositoryFunction repositoryFunction = + spy(listener.new TestCreateUpdateAsyncEventOperationRepositoryFunction()); + + doReturn("TEST").when(mockRepository).save(any()); + + assertThat(repositoryFunction.doRepositoryOp("MOCK")).isEqualTo("TEST"); + + verify(mockRepository, times(1)).save(eq("MOCK")); + verifyNoMoreInteractions(mockRepository); + } + + @Test + public void removeAsyncEventRepositoryFunctionCanProcessRemoveEventReturnsTrue() { + + RemoveAsyncEventRepositoryFunction repositoryFunction = mock(RemoveAsyncEventRepositoryFunction.class); + + AsyncEvent mockEvent = mock(AsyncEvent.class); + + doCallRealMethod().when(repositoryFunction).canProcess(any()); + doReturn(Operation.REMOVE).when(mockEvent).getOperation(); + + assertThat(repositoryFunction.canProcess(mockEvent)).isTrue(); + + verify(mockEvent, times(1)).getOperation(); + } + + @Test + public void removeAsyncEventRepositoryFunctionCanProcessCreateEventReturnsFalse() { + + RemoveAsyncEventRepositoryFunction repositoryFunction = mock(RemoveAsyncEventRepositoryFunction.class); + + AsyncEvent mockEvent = mock(AsyncEvent.class); + + doCallRealMethod().when(repositoryFunction).canProcess(any()); + doReturn(Operation.CREATE).when(mockEvent).getOperation(); + + assertThat(repositoryFunction.canProcess(mockEvent)).isFalse(); + + verify(mockEvent, times(1)).getOperation(); + } + + @Test + public void removeAsyncEventRepositoryFunctionCanProcessEventWithNullOperationReturnsFalse() { + + RemoveAsyncEventRepositoryFunction repositoryFunction = mock(RemoveAsyncEventRepositoryFunction.class); + + AsyncEvent mockEvent = mock(AsyncEvent.class); + + doCallRealMethod().when(repositoryFunction).canProcess(any()); + doReturn(null).when(mockEvent).getOperation(); + + assertThat(repositoryFunction.canProcess(mockEvent)).isFalse(); + + verify(mockEvent, times(1)).getOperation(); + } + + @Test + public void removeAsyncEventRepositoryFunctionCanProcessNullEventReturnsFalse() { + + RemoveAsyncEventRepositoryFunction repositoryFunction = mock(RemoveAsyncEventRepositoryFunction.class); + + doCallRealMethod().when(repositoryFunction).canProcess(any()); + + assertThat(repositoryFunction.canProcess(null)).isFalse(); + } + + @Test + public void removeAsyncEventRepositoryFunctionDoRepositoryOpCallsCrudRepositoryDelete() { + + CrudRepository mockRepository = mock(CrudRepository.class); + + TestRepositoryAsyncEventListener listener = spy(new TestRepositoryAsyncEventListener(mockRepository)); + + doReturn(mockRepository).when(listener).getRepository(); + + RemoveAsyncEventRepositoryFunction repositoryFunction = + spy(listener.new TestRemoveAsyncEventRepositoryFunction()); + + assertThat(repositoryFunction.doRepositoryOp("MOCK")).isNull(); + + verify(mockRepository, times(1)).delete(eq("MOCK")); + verifyNoMoreInteractions(mockRepository); + } + + private static class TestRepositoryAsyncEventListener extends RepositoryAsyncEventListener { + + private TestRepositoryAsyncEventListener(CrudRepository repository) { + super(repository); + } + + private class TestAsyncEventOperationRepositoryFunction + extends AbstractAsyncEventOperationRepositoryFunction { + + @Override + protected R doRepositoryOp(T entity) { + throw new UnsupportedOperationException("Not Implemented"); + } + } + + private class TestCreateUpdateAsyncEventOperationRepositoryFunction + extends CreateUpdateAsyncEventRepositoryFunction { + + } + + private class TestRemoveAsyncEventRepositoryFunction extends RemoveAsyncEventRepositoryFunction { } + + } +}