Skip to content

Commit

Permalink
Improve CassandraTemplate creation from Session.
Browse files Browse the repository at this point in the history
CassandraTemplate and its reactive and asynchronous variants created with a bare Session now picks up UserTypeResolver and CodecRegistry configured at the Session level.

ReactiveSession now also exposes the logged keyspace and metadata.

Closes #1133
  • Loading branch information
mp911de committed May 19, 2021
1 parent 7ca512a commit 2b964d8
Show file tree
Hide file tree
Showing 11 changed files with 148 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,17 @@

import java.io.Closeable;
import java.util.Map;
import java.util.Optional;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.metadata.Metadata;
import com.datastax.oss.driver.api.core.metadata.Node;

/**
* A session holds connections to a Cassandra cluster, allowing it to be queried. {@link ReactiveSession} executes
Expand All @@ -49,6 +54,40 @@
*/
public interface ReactiveSession extends Closeable {

/**
* Returns a snapshot of the Cassandra cluster's topology and schema metadata.
* <p/>
* In order to provide atomic updates, this method returns an immutable object: the node list, token map, and schema
* contained in a given instance will always be consistent with each other (but note that {@link Node} itself is not
* immutable: some of its properties will be updated dynamically, in particular {@link Node#getState()}).
* <p/>
* As a consequence of the above, you should call this method each time you need a fresh view of the metadata. <b>Do
* not</b> call it once and store the result, because it is a frozen snapshot that will become stale over time.
* <p>
* If a metadata refresh triggers events (such as node added/removed, or schema events), then the new version of the
* metadata is guaranteed to be visible by the time you receive these events.
* <p>
*
* @return never {@code null}, but may be empty if metadata has been disabled in the configuration.
* @since 3.1.10
*/
Metadata getMetadata();

/**
* The keyspace that this session is currently connected to, or {@link Optional#empty()} if this session is not
* connected to any keyspace.
* <p/>
* There are two ways that this can be set: before initializing the session (either with the {@code session-keyspace}
* option in the configuration, or with {@link CqlSessionBuilder#withKeyspace(CqlIdentifier)}); or at runtime, if the
* client issues a request that changes the keyspace (such as a CQL {@code USE} query). Note that this second method
* is inherently unsafe, since other requests expecting the old keyspace might be executing concurrently. Therefore it
* is highly discouraged, aside from trivial cases (such as a cqlsh-style program where requests are never
* concurrent).
*
* @since 3.1.10
*/
Optional<CqlIdentifier> getKeyspace();

/**
* Whether this Session instance has been closed.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,15 @@ public abstract class AbstractCassandraConfiguration extends AbstractSessionConf
@Bean
public CassandraConverter cassandraConverter() {

CqlSession cqlSession = getRequiredSession();

UserTypeResolver userTypeResolver =
new SimpleUserTypeResolver(getRequiredSession(), CqlIdentifier.fromCql(getKeyspaceName()));
new SimpleUserTypeResolver(cqlSession, CqlIdentifier.fromCql(getKeyspaceName()));

MappingCassandraConverter converter =
new MappingCassandraConverter(requireBeanOfType(CassandraMappingContext.class));

converter.setCodecRegistry(getRequiredSession().getContext().getCodecRegistry());
converter.setCodecRegistry(cqlSession.getContext().getCodecRegistry());
converter.setUserTypeResolver(userTypeResolver);
converter.setCustomConversions(requireBeanOfType(CassandraCustomConversions.class));

Expand All @@ -92,8 +94,10 @@ public CassandraConverter cassandraConverter() {
@Bean
public CassandraMappingContext cassandraMapping() throws ClassNotFoundException {

CqlSession cqlSession = getRequiredSession();

UserTypeResolver userTypeResolver =
new SimpleUserTypeResolver(getRequiredSession(), CqlIdentifier.fromCql(getKeyspaceName()));
new SimpleUserTypeResolver(cqlSession, CqlIdentifier.fromCql(getKeyspaceName()));

CassandraMappingContext mappingContext =
new CassandraMappingContext(userTypeResolver, SimpleTupleTypeFactory.DEFAULT);
Expand All @@ -102,7 +106,7 @@ public CassandraMappingContext cassandraMapping() throws ClassNotFoundException

getBeanClassLoader().ifPresent(mappingContext::setBeanClassLoader);

mappingContext.setCodecRegistry(getRequiredSession().getContext().getCodecRegistry());
mappingContext.setCodecRegistry(cqlSession.getContext().getCodecRegistry());
mappingContext.setCustomConversions(customConversions);
mappingContext.setInitialEntitySet(getInitialEntitySet());
mappingContext.setSimpleTypeHolder(customConversions.getSimpleTypeHolder());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.springframework.data.cassandra.core.cql.util.CassandraFutureAdapter;
import org.springframework.data.cassandra.core.cql.util.StatementBuilder;
import org.springframework.data.cassandra.core.mapping.CassandraPersistentEntity;
import org.springframework.data.cassandra.core.mapping.SimpleUserTypeResolver;
import org.springframework.data.cassandra.core.mapping.event.AfterConvertEvent;
import org.springframework.data.cassandra.core.mapping.event.AfterDeleteEvent;
import org.springframework.data.cassandra.core.mapping.event.AfterLoadEvent;
Expand Down Expand Up @@ -125,7 +126,7 @@ public class AsyncCassandraTemplate
* @see Session
*/
public AsyncCassandraTemplate(CqlSession session) {
this(session, newConverter());
this(session, newConverter(session));
}

/**
Expand Down Expand Up @@ -874,9 +875,11 @@ private Class<?> resolveTypeToRead(Class<?> entityType, Class<?> targetType) {
return targetType.isInterface() || targetType.isAssignableFrom(entityType) ? entityType : targetType;
}

private static MappingCassandraConverter newConverter() {
private static MappingCassandraConverter newConverter(CqlSession session) {

MappingCassandraConverter converter = new MappingCassandraConverter();
converter.setUserTypeResolver(new SimpleUserTypeResolver(session));
converter.setCodecRegistry(session.getContext().getCodecRegistry());

converter.afterPropertiesSet();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.springframework.data.cassandra.core.cql.util.StatementBuilder;
import org.springframework.data.cassandra.core.mapping.CassandraPersistentEntity;
import org.springframework.data.cassandra.core.mapping.CassandraPersistentProperty;
import org.springframework.data.cassandra.core.mapping.SimpleUserTypeResolver;
import org.springframework.data.cassandra.core.mapping.event.AfterConvertEvent;
import org.springframework.data.cassandra.core.mapping.event.AfterDeleteEvent;
import org.springframework.data.cassandra.core.mapping.event.AfterLoadEvent;
Expand Down Expand Up @@ -124,7 +125,7 @@ public class CassandraTemplate implements CassandraOperations, ApplicationEventP
* @see Session
*/
public CassandraTemplate(CqlSession session) {
this(session, newConverter());
this(session, newConverter(session));
}

/**
Expand Down Expand Up @@ -923,9 +924,11 @@ private Class<?> resolveTypeToRead(Class<?> entityType, Class<?> targetType) {
return targetType.isInterface() || targetType.isAssignableFrom(entityType) ? entityType : targetType;
}

private static MappingCassandraConverter newConverter() {
private static MappingCassandraConverter newConverter(CqlSession session) {

MappingCassandraConverter converter = new MappingCassandraConverter();
converter.setUserTypeResolver(new SimpleUserTypeResolver(session));
converter.setCodecRegistry(session.getContext().getCodecRegistry());

converter.afterPropertiesSet();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.springframework.data.cassandra.core.cql.session.DefaultReactiveSessionFactory;
import org.springframework.data.cassandra.core.cql.util.StatementBuilder;
import org.springframework.data.cassandra.core.mapping.CassandraPersistentEntity;
import org.springframework.data.cassandra.core.mapping.SimpleUserTypeResolver;
import org.springframework.data.cassandra.core.mapping.event.AfterConvertEvent;
import org.springframework.data.cassandra.core.mapping.event.AfterDeleteEvent;
import org.springframework.data.cassandra.core.mapping.event.AfterLoadEvent;
Expand Down Expand Up @@ -128,7 +129,7 @@ public class ReactiveCassandraTemplate
* @see Session
*/
public ReactiveCassandraTemplate(ReactiveSession session) {
this(session, newConverter());
this(session, newConverter(session));
}

/**
Expand Down Expand Up @@ -879,9 +880,12 @@ private Class<?> resolveTypeToRead(Class<?> entityType, Class<?> targetType) {
return targetType.isInterface() || targetType.isAssignableFrom(entityType) ? entityType : targetType;
}

private static MappingCassandraConverter newConverter() {
private static MappingCassandraConverter newConverter(ReactiveSession session) {

MappingCassandraConverter converter = new MappingCassandraConverter();
converter.setUserTypeResolver(new SimpleUserTypeResolver(session::getMetadata,
session.getKeyspace().orElse(CqlIdentifier.fromCql("system"))));
converter.setCodecRegistry(session.getContext().getCodecRegistry());

converter.afterPropertiesSet();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public class MappingCassandraConverter extends AbstractCassandraConverter

private CodecRegistry codecRegistry;

private UserTypeResolver userTypeResolver;
private @Nullable UserTypeResolver userTypeResolver;

private @Nullable ClassLoader beanClassLoader;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionStage;

import org.slf4j.Logger;
Expand All @@ -32,6 +33,7 @@
import org.springframework.data.cassandra.ReactiveSession;
import org.springframework.util.Assert;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
Expand All @@ -44,6 +46,7 @@
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.metadata.Metadata;

/**
* Default implementation of a {@link ReactiveSession}. This implementation bridges asynchronous {@link CqlSession}
Expand Down Expand Up @@ -87,6 +90,22 @@ public DefaultBridgedReactiveSession(CqlSession session) {
this.session = session;
}

/* (non-Javadoc)
* @see org.springframework.data.cassandra.ReactiveSession#getMetadata()
*/
@Override
public Metadata getMetadata() {
return this.session.getMetadata();
}

/* (non-Javadoc)
* @see org.springframework.data.cassandra.ReactiveSession#getKeyspace()
*/
@Override
public Optional<CqlIdentifier> getKeyspace() {
return this.session.getKeyspace();
}

/* (non-Javadoc)
* @see org.springframework.data.cassandra.ReactiveSession#isClosed()
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package org.springframework.data.cassandra.core.mapping;

import java.util.function.Supplier;

import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

Expand All @@ -32,7 +34,7 @@
*/
public class SimpleUserTypeResolver implements UserTypeResolver {

private final CqlSession session;
private final Supplier<Metadata> metadataSupplier;

private final CqlIdentifier keyspaceName;

Expand All @@ -46,7 +48,7 @@ public SimpleUserTypeResolver(CqlSession session) {

Assert.notNull(session, "Session must not be null");

this.session = session;
this.metadataSupplier = session::getMetadata;
this.keyspaceName = session.getKeyspace().orElse(CqlIdentifier.fromCql("system"));
}

Expand All @@ -62,7 +64,23 @@ public SimpleUserTypeResolver(CqlSession session, CqlIdentifier keyspaceName) {
Assert.notNull(session, "Session must not be null");
Assert.notNull(keyspaceName, "Keyspace must not be null");

this.session = session;
this.metadataSupplier = session::getMetadata;
this.keyspaceName = keyspaceName;
}

/**
* Create a new {@link SimpleUserTypeResolver}.
*
* @param metadataSupplier must not be {@literal null}.
* @param keyspaceName must not be {@literal null}.
* @since 3.1.10
*/
public SimpleUserTypeResolver(Supplier<Metadata> metadataSupplier, CqlIdentifier keyspaceName) {

Assert.notNull(metadataSupplier, "Metadata supplier must not be null");
Assert.notNull(keyspaceName, "Keyspace must not be null");

this.metadataSupplier = metadataSupplier;
this.keyspaceName = keyspaceName;
}

Expand All @@ -72,7 +90,7 @@ public SimpleUserTypeResolver(CqlSession session, CqlIdentifier keyspaceName) {
@Nullable
@Override
public UserDefinedType resolveType(CqlIdentifier typeName) {
return session.getMetadata().getKeyspace(keyspaceName) //
return metadataSupplier.get().getKeyspace(keyspaceName) //
.flatMap(it -> it.getUserDefinedType(typeName)) //
.orElse(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,16 @@
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.NoNodeAvailableException;
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.ColumnDefinition;
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
import com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry;

/**
* Unit tests for {@link AsyncCassandraTemplate}.
Expand All @@ -66,9 +69,11 @@
*/
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
public class AsyncCassandraTemplateUnitTests {
class AsyncCassandraTemplateUnitTests {

@Mock CqlSession session;
CodecRegistry codecRegistry = new DefaultCodecRegistry("foo");
@Mock DriverContext driverContext;
@Mock AsyncResultSet resultSet;
@Mock Row row;
@Mock ColumnDefinition columnDefinition;
Expand All @@ -85,7 +90,8 @@ public class AsyncCassandraTemplateUnitTests {
@BeforeEach
void setUp() {

template = new AsyncCassandraTemplate(session);
when(driverContext.getCodecRegistry()).thenReturn(codecRegistry);
when(session.getContext()).thenReturn(driverContext);

when(session.executeAsync(any(Statement.class))).thenReturn(new TestResultSetFuture(resultSet));
when(row.getColumnDefinitions()).thenReturn(columnDefinitions);
Expand All @@ -106,9 +112,16 @@ void setUp() {
return entity;
});

template = new AsyncCassandraTemplate(session);
template.setEntityCallbacks(callbacks);
}

@Test // gh-1133
void shouldConfigureConverterFromSession() {
assertThat(template.getConverter().getCodecRegistry()).isEqualTo(session.getContext().getCodecRegistry());
assertThat(template.getConverter()).extracting("userTypeResolver").isNotNull();
}

@Test // DATACASS-292
void selectUsingCqlShouldReturnMappedResults() {

Expand Down
Loading

0 comments on commit 2b964d8

Please sign in to comment.