From 841f9546a94f115d64ed013d7ad1ae1f9488d988 Mon Sep 17 00:00:00 2001 From: Hleb Albau Date: Mon, 14 May 2018 22:30:24 +0300 Subject: [PATCH] DATACASS-529 - Allow slice queries using reactive repositories. --- .../data/cassandra/core/QueryUtils.java | 23 ++++++++++ .../core/ReactiveCassandraOperations.java | 15 +++++++ .../core/ReactiveCassandraTemplate.java | 45 ++++++++++++++++++- .../query/AbstractReactiveCassandraQuery.java | 25 +++++++---- .../ReactiveCassandraQueryExecution.java | 32 +++++++++++++ ...veCassandraRepositoryIntegrationTests.java | 12 +++++ 6 files changed, 142 insertions(+), 10 deletions(-) diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/QueryUtils.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/QueryUtils.java index ab5029c97..0ff883c82 100644 --- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/QueryUtils.java +++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/QueryUtils.java @@ -17,7 +17,9 @@ import java.util.ArrayList; import java.util.List; +import java.util.function.Function; +import org.springframework.data.cassandra.ReactiveResultSet; import org.springframework.data.cassandra.core.cql.QueryOptions; import org.springframework.data.cassandra.core.cql.QueryOptionsUtil; import org.springframework.data.cassandra.core.cql.RowMapper; @@ -29,9 +31,11 @@ import org.springframework.data.domain.Slice; import org.springframework.data.domain.SliceImpl; import org.springframework.util.Assert; +import reactor.core.publisher.Mono; import com.datastax.driver.core.PagingState; import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; import com.datastax.driver.core.querybuilder.Delete; import com.datastax.driver.core.querybuilder.Delete.Where; import com.datastax.driver.core.querybuilder.Insert; @@ -165,4 +169,23 @@ static Slice readSlice(ResultSet resultSet, RowMapper mapper, int page return new SliceImpl<>(result, pageRequest, pagingState != null); } + + /** + * Read a {@link Mono} of data from the {@link ReactiveResultSet} for a {@link Pageable}. + * + * @param resultSet must not be {@literal null}. + * @param fetchSize must not be {@literal null}. + * @param rowMapper must not be {@literal null}. + * @return the resulting {@link Slice}. + */ + static Mono> readSlice(ReactiveResultSet resultSet, Integer fetchSize, Function rowMapper) { + + return resultSet.rows().take(fetchSize).map(rowMapper).collectList() + .map(entities -> { + PagingState pagingState = resultSet.getExecutionInfo().getPagingState(); + PageRequest request = PageRequest.of(0, fetchSize); + CassandraPageRequest pageRequest = CassandraPageRequest.of(request, pagingState); + return new SliceImpl<>(entities, pageRequest, pagingState != null); + }); + } } diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveCassandraOperations.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveCassandraOperations.java index 61bf28e0f..fa0706ee8 100644 --- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveCassandraOperations.java +++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveCassandraOperations.java @@ -25,6 +25,8 @@ import org.springframework.data.cassandra.core.cql.WriteOptions; import org.springframework.data.cassandra.core.query.Query; import org.springframework.data.cassandra.core.query.Update; +import org.springframework.data.domain.Slice; + import com.datastax.driver.core.Statement; @@ -97,6 +99,19 @@ public interface ReactiveCassandraOperations { */ Flux select(Statement statement, Class entityClass) throws DataAccessException; + /** + * Execute a {@code SELECT} query with paging and convert the result set to a {@link Slice} of entities. + * + * A sliced query translates the effective {@link Statement#getFetchSize() fetch size} to the page size. + * + * @param statement the CQL statement, must not be {@literal null}. + * @param entityClass The entity type must not be {@literal null}. + * @return the result object returned by the action or {@link Mono#empty()} + * @throws DataAccessException if there is any problem executing the query. + * @since 2.0 + */ + Mono> slice(Statement statement, Class entityClass) throws DataAccessException; + /** * Execute a {@code SELECT} query and convert the resulting item to an entity. * diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveCassandraTemplate.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveCassandraTemplate.java index d9c10db44..e7410aa55 100644 --- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveCassandraTemplate.java +++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveCassandraTemplate.java @@ -15,6 +15,8 @@ */ package org.springframework.data.cassandra.core; +import java.util.function.Function; + import lombok.NonNull; import lombok.Value; @@ -30,6 +32,7 @@ import org.springframework.data.cassandra.core.convert.MappingCassandraConverter; import org.springframework.data.cassandra.core.convert.QueryMapper; import org.springframework.data.cassandra.core.convert.UpdateMapper; +import org.springframework.data.cassandra.core.cql.CassandraAccessor; import org.springframework.data.cassandra.core.cql.CqlIdentifier; import org.springframework.data.cassandra.core.cql.CqlProvider; import org.springframework.data.cassandra.core.cql.QueryOptions; @@ -41,6 +44,7 @@ import org.springframework.data.cassandra.core.mapping.CassandraPersistentEntity; import org.springframework.data.cassandra.core.mapping.CassandraPersistentProperty; import org.springframework.data.cassandra.core.query.Query; +import org.springframework.data.domain.Slice; import org.springframework.data.mapping.context.MappingContext; import org.springframework.util.Assert; import org.springframework.util.ClassUtils; @@ -48,6 +52,7 @@ import com.datastax.driver.core.Session; import com.datastax.driver.core.SimpleStatement; import com.datastax.driver.core.Statement; +import com.datastax.driver.core.Row; import com.datastax.driver.core.exceptions.DriverException; import com.datastax.driver.core.querybuilder.Delete; import com.datastax.driver.core.querybuilder.Insert; @@ -244,6 +249,23 @@ public Flux select(Statement cql, Class entityClass) { return getReactiveCqlOperations().query(cql, (row, rowNum) -> getConverter().read(entityClass, row)); } + /* (non-Javadoc) + * @see org.springframework.data.cassandra.core.CassandraOperations#slice(com.datastax.driver.core.Statement, java.lang.Class) + */ + @Override + public Mono> slice(Statement statement, Class entityClass) { + + Assert.notNull(statement, "Statement must not be null"); + Assert.notNull(entityClass, "Entity type must not be null"); + + Mono resultSetMono = getReactiveCqlOperations().queryForResultSet(statement); + Mono effectiveFetchSizeMono = getEffectiveFetchSize(statement); + Function rowMapper = (row) -> getConverter().read(entityClass, row); + + return Mono.zip(resultSetMono, effectiveFetchSizeMono) + .flatMap(tuple -> QueryUtils.readSlice(tuple.getT1(), tuple.getT2(), rowMapper)); + } + /* (non-Javadoc) * @see org.springframework.data.cassandra.core.ReactiveCassandraOperations#selectOne(com.datastax.driver.core.Statement, java.lang.Class) */ @@ -462,8 +484,27 @@ public Mono truncate(Class entityClass) { return getReactiveCqlOperations().execute(truncate).then(); } - @Value - static class StatementCallback implements ReactiveSessionCallback, CqlProvider { + @SuppressWarnings("ConstantConditions") + private Mono getEffectiveFetchSize(Statement statement) { + + if (statement.getFetchSize() > 0) { + return Mono.just(statement.getFetchSize()); + } + + if (getReactiveCqlOperations() instanceof CassandraAccessor) { + CassandraAccessor accessor = (CassandraAccessor) getReactiveCqlOperations(); + if (accessor.getFetchSize() != -1) { + return Mono.just(accessor.getFetchSize()); + } + } + + return getReactiveCqlOperations().execute((ReactiveSessionCallback) session -> + Mono.fromSupplier(() -> session.getCluster().getConfiguration().getQueryOptions().getFetchSize()) + ).single(); + } + + @Value + static class StatementCallback implements ReactiveSessionCallback, CqlProvider { @NonNull Statement statement; diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/repository/query/AbstractReactiveCassandraQuery.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/repository/query/AbstractReactiveCassandraQuery.java index 88a98bea5..d4337bc40 100644 --- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/repository/query/AbstractReactiveCassandraQuery.java +++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/repository/query/AbstractReactiveCassandraQuery.java @@ -15,6 +15,7 @@ */ package org.springframework.data.cassandra.repository.query; +import org.springframework.data.cassandra.repository.query.ReactiveCassandraQueryExecution.SlicedExecution; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -102,8 +103,9 @@ private Object executeNow(Object[] parameters) { ResultProcessor resultProcessor = getQueryMethod().getResultProcessor() .withDynamicProjection(convertingParameterAccessor); - ReactiveCassandraQueryExecution queryExecution = getExecution(new ResultProcessingConverter(resultProcessor, - getReactiveCassandraOperations().getConverter().getMappingContext(), getEntityInstantiators())); + ReactiveCassandraQueryExecution queryExecution = getExecution(parameterAccessor, + new ResultProcessingConverter(resultProcessor, + getReactiveCassandraOperations().getConverter().getMappingContext(), getEntityInstantiators())); Class resultType = resolveResultType(resultProcessor); @@ -127,17 +129,24 @@ private Class resolveResultType(ResultProcessor resultProcessor) { /** * Returns the execution instance to use. - * + * @param parameterAccessor must not be {@literal null}. * @param resultProcessing must not be {@literal null}. @return */ - private ReactiveCassandraQueryExecution getExecution(Converter resultProcessing) { - return new ResultProcessingExecution(getExecutionToWrap(), resultProcessing); + private ReactiveCassandraQueryExecution getExecution(CassandraParameterAccessor parameterAccessor, + Converter resultProcessing) { + return new ResultProcessingExecution(getExecutionToWrap(parameterAccessor), resultProcessing); } /* (non-Javadoc) */ - private ReactiveCassandraQueryExecution getExecutionToWrap() { - return (getQueryMethod().isCollectionQuery() ? new CollectionExecution(getReactiveCassandraOperations()) - : new SingleEntityExecution(getReactiveCassandraOperations(), isLimiting())); + private ReactiveCassandraQueryExecution getExecutionToWrap(CassandraParameterAccessor parameterAccessor) { + + if (getQueryMethod().isSliceQuery()) { + return new SlicedExecution(getReactiveCassandraOperations(), parameterAccessor.getPageable()); + } else if (getQueryMethod().isCollectionQuery()) { + return new CollectionExecution(getReactiveCassandraOperations()); + } else { + return new SingleEntityExecution(getReactiveCassandraOperations(), isLimiting()); + } } /** diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/repository/query/ReactiveCassandraQueryExecution.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/repository/query/ReactiveCassandraQueryExecution.java index da9b4bfd0..73484c7a8 100644 --- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/repository/query/ReactiveCassandraQueryExecution.java +++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/repository/query/ReactiveCassandraQueryExecution.java @@ -23,7 +23,10 @@ import org.springframework.data.cassandra.core.ReactiveCassandraOperations; import org.springframework.data.cassandra.core.mapping.CassandraPersistentEntity; import org.springframework.data.cassandra.core.mapping.CassandraPersistentProperty; +import org.springframework.data.cassandra.core.query.CassandraPageRequest; import org.springframework.data.convert.EntityInstantiators; +import org.springframework.data.domain.Pageable; +import org.springframework.data.domain.Slice; import org.springframework.data.mapping.context.MappingContext; import org.springframework.data.repository.query.ResultProcessor; import org.springframework.data.repository.query.ReturnedType; @@ -42,6 +45,35 @@ interface ReactiveCassandraQueryExecution { Object execute(Statement statement, Class type); + /** + * {@link CassandraQueryExecution} for a {@link Slice}. + * + * @author Hleb Albau + */ + @RequiredArgsConstructor + final class SlicedExecution implements ReactiveCassandraQueryExecution { + + private final @NonNull ReactiveCassandraOperations operations; + private final @NonNull Pageable pageable; + + /* (non-Javadoc) + * @see org.springframework.data.cassandra.repository.query.CassandraQueryExecution#execute(java.lang.String, java.lang.Class) + */ + @Override + public Object execute(Statement statement, Class type) { + + CassandraPageRequest.validatePageable(pageable); + + Statement statementToUse = statement.setFetchSize(pageable.getPageSize()); + + if (pageable instanceof CassandraPageRequest) { + statementToUse = statementToUse.setPagingState(((CassandraPageRequest) pageable).getPagingState()); + } + + return operations.slice(statementToUse, type); + } + } + /** * {@link ReactiveCassandraQueryExecution} for collection returning queries. * diff --git a/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/repository/ReactiveCassandraRepositoryIntegrationTests.java b/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/repository/ReactiveCassandraRepositoryIntegrationTests.java index d6ed54173..e20266f94 100644 --- a/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/repository/ReactiveCassandraRepositoryIntegrationTests.java +++ b/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/repository/ReactiveCassandraRepositoryIntegrationTests.java @@ -35,6 +35,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.dao.IncorrectResultSizeDataAccessException; import org.springframework.data.cassandra.core.ReactiveCassandraOperations; +import org.springframework.data.cassandra.core.query.CassandraPageRequest; import org.springframework.data.cassandra.domain.Group; import org.springframework.data.cassandra.domain.GroupKey; import org.springframework.data.cassandra.domain.User; @@ -42,6 +43,8 @@ import org.springframework.data.cassandra.repository.support.ReactiveCassandraRepositoryFactory; import org.springframework.data.cassandra.repository.support.SimpleReactiveCassandraRepository; import org.springframework.data.cassandra.test.util.AbstractKeyspaceCreatingIntegrationTest; +import org.springframework.data.domain.Pageable; +import org.springframework.data.domain.Slice; import org.springframework.data.domain.Sort; import org.springframework.data.domain.Sort.Direction; import org.springframework.data.repository.query.DefaultEvaluationContextProvider; @@ -128,6 +131,13 @@ public void shouldFindByLastName() { StepVerifier.create(repository.findByLastname(dave.getLastname())).expectNextCount(2).verifyComplete(); } + @Test //DATACASS-529 + public void shouldFindSliceByLastName() { + StepVerifier.create(repository.findByLastname(carter.getLastname(), CassandraPageRequest.first(1))) + .expectNextMatches(users -> users.getSize() == 1 && users.hasNext()) + .verifyComplete(); + } + @Test // DATACASS-525 public void findOneWithManyResultsShouldFail() { StepVerifier.create(repository.findOneByLastname(dave.getLastname())) @@ -185,6 +195,8 @@ interface UserRepository extends ReactiveCassandraRepository { Flux findByLastname(String lastname); + Mono> findByLastname(String firstname, Pageable pageable); + Mono findFirstByLastname(String lastname); Mono findOneByLastname(String lastname);