Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

import org.springframework.data.cassandra.ReactiveResultSet;
import org.springframework.data.cassandra.core.cql.QueryOptions;
import org.springframework.data.cassandra.core.cql.QueryOptionsUtil;
import org.springframework.data.cassandra.core.cql.RowMapper;
Expand All @@ -29,9 +31,11 @@
import org.springframework.data.domain.Slice;
import org.springframework.data.domain.SliceImpl;
import org.springframework.util.Assert;
import reactor.core.publisher.Mono;

import com.datastax.driver.core.PagingState;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.querybuilder.Delete;
import com.datastax.driver.core.querybuilder.Delete.Where;
import com.datastax.driver.core.querybuilder.Insert;
Expand Down Expand Up @@ -165,4 +169,23 @@ static <T> Slice<T> readSlice(ResultSet resultSet, RowMapper<T> mapper, int page

return new SliceImpl<>(result, pageRequest, pagingState != null);
}

/**
* Read a {@link Mono<Slice>} of data from the {@link ReactiveResultSet} for a {@link Pageable}.
*
* @param resultSet must not be {@literal null}.
* @param fetchSize must not be {@literal null}.
* @param rowMapper must not be {@literal null}.
* @return the resulting {@link Slice}.
*/
static <T> Mono<Slice<T>> readSlice(ReactiveResultSet resultSet, Integer fetchSize, Function<Row, T> rowMapper) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't introduce dependencies to reactive types here as we want the module to run without reactive dependencies as well.

We also need to be careful about fetching/reading progress. If the fetch size differs from the actual number of rows returned for a page, we might request the next page without intending to do so. I think we need another method on ReactiveResultSet that gives us a Flux without transparent next page fetching.


return resultSet.rows().take(fetchSize).map(rowMapper).collectList()
.map(entities -> {
PagingState pagingState = resultSet.getExecutionInfo().getPagingState();
PageRequest request = PageRequest.of(0, fetchSize);
CassandraPageRequest pageRequest = CassandraPageRequest.of(request, pagingState);
return new SliceImpl<>(entities, pageRequest, pagingState != null);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.springframework.data.cassandra.core.cql.WriteOptions;
import org.springframework.data.cassandra.core.query.Query;
import org.springframework.data.cassandra.core.query.Update;
import org.springframework.data.domain.Slice;


import com.datastax.driver.core.Statement;

Expand Down Expand Up @@ -97,6 +99,19 @@ public interface ReactiveCassandraOperations {
*/
<T> Flux<T> select(Statement statement, Class<T> entityClass) throws DataAccessException;

/**
 * Execute a {@code SELECT} query with paging and convert the result set to a {@link Slice} of entities.
 * <p>
 * A sliced query translates the effective {@link Statement#getFetchSize() fetch size} to the page size.
 *
 * @param statement the CQL statement, must not be {@literal null}.
 * @param entityClass the entity type, must not be {@literal null}.
 * @return a {@link Mono} emitting the resulting {@link Slice}.
 *         NOTE(review): for a query without rows this should emit an empty {@link Slice} rather than
 *         completing via {@link Mono#empty()} — the query result is the container for rows, not the
 *         rows themselves. Confirm the implementation guarantees this.
 * @throws DataAccessException if there is any problem executing the query.
 * @since 2.0
 */
<T> Mono<Slice<T>> slice(Statement statement, Class<T> entityClass) throws DataAccessException;

/**
* Execute a {@code SELECT} query and convert the resulting item to an entity.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package org.springframework.data.cassandra.core;

import java.util.function.Function;

import lombok.NonNull;
import lombok.Value;

Expand All @@ -30,6 +32,7 @@
import org.springframework.data.cassandra.core.convert.MappingCassandraConverter;
import org.springframework.data.cassandra.core.convert.QueryMapper;
import org.springframework.data.cassandra.core.convert.UpdateMapper;
import org.springframework.data.cassandra.core.cql.CassandraAccessor;
import org.springframework.data.cassandra.core.cql.CqlIdentifier;
import org.springframework.data.cassandra.core.cql.CqlProvider;
import org.springframework.data.cassandra.core.cql.QueryOptions;
Expand All @@ -41,13 +44,15 @@
import org.springframework.data.cassandra.core.mapping.CassandraPersistentEntity;
import org.springframework.data.cassandra.core.mapping.CassandraPersistentProperty;
import org.springframework.data.cassandra.core.query.Query;
import org.springframework.data.domain.Slice;
import org.springframework.data.mapping.context.MappingContext;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.exceptions.DriverException;
import com.datastax.driver.core.querybuilder.Delete;
import com.datastax.driver.core.querybuilder.Insert;
Expand Down Expand Up @@ -244,6 +249,23 @@ public <T> Flux<T> select(Statement cql, Class<T> entityClass) {
return getReactiveCqlOperations().query(cql, (row, rowNum) -> getConverter().read(entityClass, row));
}

/* (non-Javadoc)
 * @see org.springframework.data.cassandra.core.ReactiveCassandraOperations#slice(com.datastax.driver.core.Statement, java.lang.Class)
 */
@Override
public <T> Mono<Slice<T>> slice(Statement statement, Class<T> entityClass) {

	Assert.notNull(statement, "Statement must not be null");
	Assert.notNull(entityClass, "Entity type must not be null");

	Mono<ReactiveResultSet> resultSetMono = getReactiveCqlOperations().queryForResultSet(statement);
	Mono<Integer> effectiveFetchSizeMono = getEffectiveFetchSize(statement);
	Function<Row, T> rowMapper = row -> getConverter().read(entityClass, row);

	// Result set and effective fetch size are resolved independently; combine both before
	// materializing the slice from the first page of rows.
	return Mono.zip(resultSetMono, effectiveFetchSizeMono)
			.flatMap(tuple -> QueryUtils.readSlice(tuple.getT1(), tuple.getT2(), rowMapper));
}

/* (non-Javadoc)
* @see org.springframework.data.cassandra.core.ReactiveCassandraOperations#selectOne(com.datastax.driver.core.Statement, java.lang.Class)
*/
Expand Down Expand Up @@ -462,8 +484,27 @@ public Mono<Void> truncate(Class<?> entityClass) {
return getReactiveCqlOperations().execute(truncate).then();
}

@Value
static class StatementCallback implements ReactiveSessionCallback<WriteResult>, CqlProvider {
@SuppressWarnings("ConstantConditions")
private Mono<Integer> getEffectiveFetchSize(Statement statement) {

	// A fetch size configured on the statement itself always wins.
	int statementFetchSize = statement.getFetchSize();
	if (statementFetchSize > 0) {
		return Mono.just(statementFetchSize);
	}

	// Next preference: a fetch size configured on the CQL operations, when backed by a CassandraAccessor.
	if (getReactiveCqlOperations() instanceof CassandraAccessor
			&& ((CassandraAccessor) getReactiveCqlOperations()).getFetchSize() != -1) {
		return Mono.just(((CassandraAccessor) getReactiveCqlOperations()).getFetchSize());
	}

	// Fall back to the cluster-wide default fetch size, resolved lazily against the session.
	return getReactiveCqlOperations()
			.execute((ReactiveSessionCallback<Integer>) session -> Mono
					.fromSupplier(() -> session.getCluster().getConfiguration().getQueryOptions().getFetchSize()))
			.single();
}

@Value
static class StatementCallback implements ReactiveSessionCallback<WriteResult>, CqlProvider {

@NonNull Statement statement;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.springframework.data.cassandra.repository.query;

import org.springframework.data.cassandra.repository.query.ReactiveCassandraQueryExecution.SlicedExecution;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -102,8 +103,9 @@ private Object executeNow(Object[] parameters) {
ResultProcessor resultProcessor = getQueryMethod().getResultProcessor()
.withDynamicProjection(convertingParameterAccessor);

ReactiveCassandraQueryExecution queryExecution = getExecution(new ResultProcessingConverter(resultProcessor,
getReactiveCassandraOperations().getConverter().getMappingContext(), getEntityInstantiators()));
ReactiveCassandraQueryExecution queryExecution = getExecution(parameterAccessor,
new ResultProcessingConverter(resultProcessor,
getReactiveCassandraOperations().getConverter().getMappingContext(), getEntityInstantiators()));

Class<?> resultType = resolveResultType(resultProcessor);

Expand All @@ -127,17 +129,24 @@ private Class<?> resolveResultType(ResultProcessor resultProcessor) {

/**
* Returns the execution instance to use.
*
* @param parameterAccessor must not be {@literal null}.
* @param resultProcessing must not be {@literal null}. @return
*/
private ReactiveCassandraQueryExecution getExecution(Converter<Object, Object> resultProcessing) {
return new ResultProcessingExecution(getExecutionToWrap(), resultProcessing);
private ReactiveCassandraQueryExecution getExecution(CassandraParameterAccessor parameterAccessor,
		Converter<Object, Object> resultProcessing) {

	// Wrap the raw execution so its result is post-processed (projection, conversion).
	ReactiveCassandraQueryExecution executionToWrap = getExecutionToWrap(parameterAccessor);

	return new ResultProcessingExecution(executionToWrap, resultProcessing);
}

/* (non-Javadoc) */
private ReactiveCassandraQueryExecution getExecutionToWrap() {
return (getQueryMethod().isCollectionQuery() ? new CollectionExecution(getReactiveCassandraOperations())
: new SingleEntityExecution(getReactiveCassandraOperations(), isLimiting()));
private ReactiveCassandraQueryExecution getExecutionToWrap(CassandraParameterAccessor parameterAccessor) {

	// Slice queries take precedence over plain collection queries, which in turn take
	// precedence over single-entity reads.
	if (getQueryMethod().isSliceQuery()) {
		return new SlicedExecution(getReactiveCassandraOperations(), parameterAccessor.getPageable());
	}

	if (getQueryMethod().isCollectionQuery()) {
		return new CollectionExecution(getReactiveCassandraOperations());
	}

	return new SingleEntityExecution(getReactiveCassandraOperations(), isLimiting());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
import org.springframework.data.cassandra.core.ReactiveCassandraOperations;
import org.springframework.data.cassandra.core.mapping.CassandraPersistentEntity;
import org.springframework.data.cassandra.core.mapping.CassandraPersistentProperty;
import org.springframework.data.cassandra.core.query.CassandraPageRequest;
import org.springframework.data.convert.EntityInstantiators;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.data.mapping.context.MappingContext;
import org.springframework.data.repository.query.ResultProcessor;
import org.springframework.data.repository.query.ReturnedType;
Expand All @@ -42,6 +45,35 @@ interface ReactiveCassandraQueryExecution {

Object execute(Statement statement, Class<?> type);

/**
 * {@link ReactiveCassandraQueryExecution} for a {@link Slice}-returning query. Translates the page
 * size into the statement's fetch size and propagates a paging state carried by a
 * {@link CassandraPageRequest}.
 *
 * @author Hleb Albau
 */
@RequiredArgsConstructor
final class SlicedExecution implements ReactiveCassandraQueryExecution {

	private final @NonNull ReactiveCassandraOperations operations;
	private final @NonNull Pageable pageable;

	/* (non-Javadoc)
	 * @see org.springframework.data.cassandra.repository.query.ReactiveCassandraQueryExecution#execute(com.datastax.driver.core.Statement, java.lang.Class)
	 */
	@Override
	public Object execute(Statement statement, Class<?> type) {

		CassandraPageRequest.validatePageable(pageable);

		// The requested page size drives the driver's fetch size, so one driver page maps to one Slice.
		Statement statementToUse = statement.setFetchSize(pageable.getPageSize());

		if (pageable instanceof CassandraPageRequest) {
			// Resume from the paging state of the previous slice, if present.
			statementToUse = statementToUse.setPagingState(((CassandraPageRequest) pageable).getPagingState());
		}

		return operations.slice(statementToUse, type);
	}
}

/**
* {@link ReactiveCassandraQueryExecution} for collection returning queries.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,16 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.dao.IncorrectResultSizeDataAccessException;
import org.springframework.data.cassandra.core.ReactiveCassandraOperations;
import org.springframework.data.cassandra.core.query.CassandraPageRequest;
import org.springframework.data.cassandra.domain.Group;
import org.springframework.data.cassandra.domain.GroupKey;
import org.springframework.data.cassandra.domain.User;
import org.springframework.data.cassandra.repository.support.IntegrationTestConfig;
import org.springframework.data.cassandra.repository.support.ReactiveCassandraRepositoryFactory;
import org.springframework.data.cassandra.repository.support.SimpleReactiveCassandraRepository;
import org.springframework.data.cassandra.test.util.AbstractKeyspaceCreatingIntegrationTest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.data.domain.Sort;
import org.springframework.data.domain.Sort.Direction;
import org.springframework.data.repository.query.DefaultEvaluationContextProvider;
Expand Down Expand Up @@ -128,6 +131,13 @@ public void shouldFindByLastName() {
StepVerifier.create(repository.findByLastname(dave.getLastname())).expectNextCount(2).verifyComplete();
}

@Test // DATACASS-529
public void shouldFindSliceByLastName() {

	Mono<Slice<User>> firstSlice = repository.findByLastname(carter.getLastname(), CassandraPageRequest.first(1));

	StepVerifier.create(firstSlice) //
			.expectNextMatches(slice -> slice.getSize() == 1 && slice.hasNext()) //
			.verifyComplete();
}

@Test // DATACASS-525
public void findOneWithManyResultsShouldFail() {
StepVerifier.create(repository.findOneByLastname(dave.getLastname()))
Expand Down Expand Up @@ -185,6 +195,8 @@ interface UserRepository extends ReactiveCassandraRepository<User, String> {

Flux<User> findByLastname(String lastname);

Mono<Slice<User>> findByLastname(String lastname, Pageable pageable);

Mono<User> findFirstByLastname(String lastname);

Mono<User> findOneByLastname(String lastname);
Expand Down