Skip to content

Commit

Permalink
DATACASS-618 - Integrate EntityCallbacks.
Browse files Browse the repository at this point in the history
We now support EntityCallbacks via DefaultEntityCallbacks and DefaultReactiveEntityCallbacks.
  • Loading branch information
mp911de committed Jun 7, 2019
1 parent e972181 commit 9de890c
Show file tree
Hide file tree
Showing 12 changed files with 670 additions and 90 deletions.
Expand Up @@ -21,6 +21,9 @@
import java.util.function.Consumer;
import java.util.function.Function;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
Expand All @@ -41,16 +44,20 @@
import org.springframework.data.cassandra.core.cql.CqlProvider;
import org.springframework.data.cassandra.core.cql.GuavaListenableFutureAdapter;
import org.springframework.data.cassandra.core.cql.QueryOptions;
import org.springframework.data.cassandra.core.cql.WriteOptions;
import org.springframework.data.cassandra.core.cql.session.DefaultSessionFactory;
import org.springframework.data.cassandra.core.mapping.CassandraPersistentEntity;
import org.springframework.data.cassandra.core.mapping.event.AfterConvertEvent;
import org.springframework.data.cassandra.core.mapping.event.AfterDeleteEvent;
import org.springframework.data.cassandra.core.mapping.event.AfterLoadEvent;
import org.springframework.data.cassandra.core.mapping.event.AfterSaveEvent;
import org.springframework.data.cassandra.core.mapping.event.BeforeConvertCallback;
import org.springframework.data.cassandra.core.mapping.event.BeforeDeleteEvent;
import org.springframework.data.cassandra.core.mapping.event.BeforeSaveCallback;
import org.springframework.data.cassandra.core.mapping.event.BeforeSaveEvent;
import org.springframework.data.cassandra.core.query.Query;
import org.springframework.data.domain.Slice;
import org.springframework.data.mapping.callback.EntityCallbacks;
import org.springframework.data.projection.ProjectionFactory;
import org.springframework.data.projection.SpelAwareProxyProjectionFactory;
import org.springframework.lang.Nullable;
Expand Down Expand Up @@ -89,7 +96,8 @@
* @see org.springframework.data.cassandra.core.AsyncCassandraOperations
* @since 2.0
*/
public class AsyncCassandraTemplate implements AsyncCassandraOperations, ApplicationEventPublisherAware {
public class AsyncCassandraTemplate
implements AsyncCassandraOperations, ApplicationEventPublisherAware, ApplicationContextAware {

private final AsyncCqlOperations cqlOperations;

Expand All @@ -105,6 +113,8 @@ public class AsyncCassandraTemplate implements AsyncCassandraOperations, Applica

private @Nullable ApplicationEventPublisher eventPublisher;

private @Nullable EntityCallbacks entityCallbacks;

/**
* Creates an instance of {@link AsyncCassandraTemplate} initialized with the given {@link Session} and a default
* {@link MappingCassandraConverter}.
Expand Down Expand Up @@ -176,6 +186,29 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv
this.eventPublisher = applicationEventPublisher;
}

/* (non-Javadoc)
* @see org.springframework.context.ApplicationContextAware(org.springframework.context.ApplicationContext)
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

if (entityCallbacks == null) {
setEntityCallbacks(EntityCallbacks.create(applicationContext));
}

projectionFactory.setBeanFactory(applicationContext);
projectionFactory.setBeanClassLoader(applicationContext.getClassLoader());
}

/**
* Configure {@link EntityCallbacks} to pre-/post-process entities during persistence operations.
*
* @param entityCallbacks
*/
public void setEntityCallbacks(@Nullable EntityCallbacks entityCallbacks) {
this.entityCallbacks = entityCallbacks;
}

/* (non-Javadoc)
* @see org.springframework.data.cassandra.core.AsyncCassandraOperations#getAsyncCqlOperations()
*/
Expand Down Expand Up @@ -541,9 +574,14 @@ public <T> ListenableFuture<EntityWriteResult<T>> insert(T entity, InsertOptions
Assert.notNull(entity, "Entity must not be null");
Assert.notNull(options, "InsertOptions must not be null");

AdaptibleEntity<T> source = getEntityOperations().forEntity(entity, getConverter().getConversionService());
return doInsert(entity, options, getTableName(entity.getClass()));
}

private <T> ListenableFuture<EntityWriteResult<T>> doInsert(T entity, WriteOptions options, CqlIdentifier tableName) {

AdaptibleEntity<T> source = getEntityOperations().forEntity(maybeCallBeforeConvert(entity, tableName),
getConverter().getConversionService());
CassandraPersistentEntity<?> persistentEntity = getRequiredPersistentEntity(entity.getClass());
CqlIdentifier tableName = persistentEntity.getTableName();

T entityToUse = source.isVersionedEntity() ? source.initializeVersionProperty() : entity;

Expand Down Expand Up @@ -595,24 +633,26 @@ public <T> ListenableFuture<EntityWriteResult<T>> update(T entity, UpdateOptions
CassandraPersistentEntity<?> persistentEntity = getRequiredPersistentEntity(entity.getClass());
CqlIdentifier tableName = persistentEntity.getTableName();

return source.isVersionedEntity() ? doUpdateVersioned(source, options, tableName, persistentEntity)
: doUpdate(entity, options, tableName, persistentEntity);
T entityToUpdate = maybeCallBeforeConvert(entity, tableName);

return source.isVersionedEntity() ? doUpdateVersioned(entityToUpdate, options, tableName, persistentEntity)
: doUpdate(entityToUpdate, options, tableName, persistentEntity);
}

private <T> ListenableFuture<EntityWriteResult<T>> doUpdateVersioned(AdaptibleEntity<T> source, UpdateOptions options,
private <T> ListenableFuture<EntityWriteResult<T>> doUpdateVersioned(T entity, UpdateOptions options,
CqlIdentifier tableName, CassandraPersistentEntity<?> persistentEntity) {

AdaptibleEntity<T> source = getEntityOperations().forEntity(entity, getConverter().getConversionService());
Number previousVersion = source.getVersion();
T toSave = source.incrementVersion();

T entity = source.incrementVersion();
Update update = getStatementFactory().update(toSave, options, getConverter(), persistentEntity, tableName);

Update update = getStatementFactory().update(entity, options, getConverter(), persistentEntity, tableName);

return executeSave(entity, tableName, source.appendVersionCondition(update, previousVersion), result -> {
return executeSave(toSave, tableName, source.appendVersionCondition(update, previousVersion), result -> {

if (!result.wasApplied()) {
throw new OptimisticLockingFailureException(
String.format("Cannot save entity %s with version %s to table %s. Has it been modified meanwhile?", entity,
String.format("Cannot save entity %s with version %s to table %s. Has it been modified meanwhile?", toSave,
source.getVersion(), tableName));
}
});
Expand Down Expand Up @@ -726,16 +766,17 @@ private <T> ListenableFuture<EntityWriteResult<T>> executeSave(T entity, CqlIden
Consumer<WriteResult> beforeAfterSaveEvent) {

maybeEmitEvent(new BeforeSaveEvent<>(entity, tableName, statement));
T entityToSave = maybeCallBeforeSave(entity, tableName, statement);

ListenableFuture<ResultSet> result = getAsyncCqlOperations().execute(new AsyncStatementCallback(statement));

return new MappingListenableFutureAdapter<>(result, resultSet -> {

EntityWriteResult<T> writeResult = EntityWriteResult.of(resultSet, entity);
EntityWriteResult<T> writeResult = EntityWriteResult.of(resultSet, entityToSave);

beforeAfterSaveEvent.accept(writeResult);

maybeEmitEvent(new AfterSaveEvent<>(entity, tableName));
maybeEmitEvent(new AfterSaveEvent<>(entityToSave, tableName));

return writeResult;
});
Expand Down Expand Up @@ -825,6 +866,24 @@ private void maybeEmitEvent(ApplicationEvent event) {
}
}

protected <T> T maybeCallBeforeConvert(T object, CqlIdentifier tableName) {

if (null != entityCallbacks) {
return (T) entityCallbacks.callback(BeforeConvertCallback.class, object, tableName);
}

return object;
}

protected <T> T maybeCallBeforeSave(T object, CqlIdentifier tableName, Statement statement) {

if (null != entityCallbacks) {
return (T) entityCallbacks.callback(BeforeSaveCallback.class, object, tableName, statement);
}

return object;
}

static class MappingListenableFutureAdapter<T, S>
extends org.springframework.util.concurrent.ListenableFutureAdapter<T, S> {

Expand Down
Expand Up @@ -23,6 +23,9 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
Expand All @@ -49,11 +52,14 @@
import org.springframework.data.cassandra.core.mapping.event.AfterDeleteEvent;
import org.springframework.data.cassandra.core.mapping.event.AfterLoadEvent;
import org.springframework.data.cassandra.core.mapping.event.AfterSaveEvent;
import org.springframework.data.cassandra.core.mapping.event.BeforeConvertCallback;
import org.springframework.data.cassandra.core.mapping.event.BeforeDeleteEvent;
import org.springframework.data.cassandra.core.mapping.event.BeforeSaveCallback;
import org.springframework.data.cassandra.core.mapping.event.BeforeSaveEvent;
import org.springframework.data.cassandra.core.query.Columns;
import org.springframework.data.cassandra.core.query.Query;
import org.springframework.data.domain.Slice;
import org.springframework.data.mapping.callback.EntityCallbacks;
import org.springframework.data.mapping.context.MappingContext;
import org.springframework.data.projection.ProjectionFactory;
import org.springframework.data.projection.SpelAwareProxyProjectionFactory;
Expand Down Expand Up @@ -92,10 +98,12 @@
* @see org.springframework.data.cassandra.core.CassandraOperations
* @since 2.0
*/
public class CassandraTemplate implements CassandraOperations, ApplicationEventPublisherAware {
public class CassandraTemplate implements CassandraOperations, ApplicationEventPublisherAware, ApplicationContextAware {

private @Nullable ApplicationEventPublisher eventPublisher;

private @Nullable EntityCallbacks entityCallbacks;

private final CassandraConverter converter;

private final CqlOperations cqlOperations;
Expand Down Expand Up @@ -187,6 +195,29 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv
this.eventPublisher = applicationEventPublisher;
}

/* (non-Javadoc)
* @see org.springframework.context.ApplicationContextAware(org.springframework.context.ApplicationContext)
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

if (entityCallbacks == null) {
setEntityCallbacks(EntityCallbacks.create(applicationContext));
}

projectionFactory.setBeanFactory(applicationContext);
projectionFactory.setBeanClassLoader(applicationContext.getClassLoader());
}

/**
* Configure {@link EntityCallbacks} to pre-/post-process entities during persistence operations.
*
* @param entityCallbacks
*/
public void setEntityCallbacks(@Nullable EntityCallbacks entityCallbacks) {
this.entityCallbacks = entityCallbacks;
}

/* (non-Javadoc)
* @see org.springframework.data.cassandra.core.CassandraOperations#getConverter()
*/
Expand Down Expand Up @@ -603,7 +634,8 @@ public <T> EntityWriteResult<T> insert(T entity, InsertOptions options) {

<T> EntityWriteResult<T> doInsert(T entity, WriteOptions options, CqlIdentifier tableName) {

AdaptibleEntity<T> source = getEntityOperations().forEntity(entity, getConverter().getConversionService());
AdaptibleEntity<T> source = getEntityOperations().forEntity(maybeCallBeforeConvert(entity, tableName),
getConverter().getConversionService());
CassandraPersistentEntity<?> persistentEntity = getRequiredPersistentEntity(entity.getClass());

T entityToUse = source.isVersionedEntity() ? source.initializeVersionProperty() : entity;
Expand Down Expand Up @@ -653,24 +685,27 @@ public <T> EntityWriteResult<T> update(T entity, UpdateOptions options) {
CassandraPersistentEntity<?> persistentEntity = getRequiredPersistentEntity(entity.getClass());
CqlIdentifier tableName = persistentEntity.getTableName();

return source.isVersionedEntity() ? doUpdateVersioned(source, options, tableName, persistentEntity)
: doUpdate(entity, options, tableName, persistentEntity);
T entityToUpdate = maybeCallBeforeConvert(entity, tableName);

return source.isVersionedEntity() ? doUpdateVersioned(entityToUpdate, options, tableName, persistentEntity)
: doUpdate(entityToUpdate, options, tableName, persistentEntity);
}

private <T> EntityWriteResult<T> doUpdateVersioned(AdaptibleEntity<T> source, UpdateOptions options,
CqlIdentifier tableName, CassandraPersistentEntity<?> persistentEntity) {
private <T> EntityWriteResult<T> doUpdateVersioned(T entity, UpdateOptions options, CqlIdentifier tableName,
CassandraPersistentEntity<?> persistentEntity) {

Number previousVersion = source.getVersion();
AdaptibleEntity<T> source = getEntityOperations().forEntity(entity, getConverter().getConversionService());

T entity = source.incrementVersion();
Number previousVersion = source.getVersion();
T toSave = source.incrementVersion();

Update update = getStatementFactory().update(entity, options, getConverter(), persistentEntity, tableName);
Update update = getStatementFactory().update(toSave, options, getConverter(), persistentEntity, tableName);

return executeSave(entity, tableName, source.appendVersionCondition(update, previousVersion), result -> {
return executeSave(toSave, tableName, source.appendVersionCondition(update, previousVersion), result -> {

if (!result.wasApplied()) {
throw new OptimisticLockingFailureException(
String.format("Cannot save entity %s with version %s to table %s. Has it been modified meanwhile?", entity,
String.format("Cannot save entity %s with version %s to table %s. Has it been modified meanwhile?", toSave,
source.getVersion(), tableName));
}
});
Expand Down Expand Up @@ -818,13 +853,14 @@ private <T> EntityWriteResult<T> executeSave(T entity, CqlIdentifier tableName,
Consumer<WriteResult> resultConsumer) {

maybeEmitEvent(new BeforeSaveEvent<>(entity, tableName, statement));
T entityToSave = maybeCallBeforeSave(entity, tableName, statement);

WriteResult result = getCqlOperations().execute(new StatementCallback(statement));
resultConsumer.accept(result);

maybeEmitEvent(new AfterSaveEvent<>(entity, tableName));
maybeEmitEvent(new AfterSaveEvent<>(entityToSave, tableName));

return EntityWriteResult.of(result, entity);
return EntityWriteResult.of(result, entityToSave);
}

private WriteResult executeDelete(Object entity, CqlIdentifier tableName, Statement statement,
Expand Down Expand Up @@ -903,6 +939,24 @@ private void maybeEmitEvent(ApplicationEvent event) {
}
}

protected <T> T maybeCallBeforeConvert(T object, CqlIdentifier tableName) {

if (null != entityCallbacks) {
return (T) entityCallbacks.callback(BeforeConvertCallback.class, object, tableName);
}

return object;
}

protected <T> T maybeCallBeforeSave(T object, CqlIdentifier tableName, Statement statement) {

if (null != entityCallbacks) {
return (T) entityCallbacks.callback(BeforeSaveCallback.class, object, tableName, statement);
}

return object;
}

@Value
static class StatementCallback implements SessionCallback<WriteResult>, CqlProvider {

Expand Down

0 comments on commit 9de890c

Please sign in to comment.