Skip to content

Commit

Permalink
Merge branch 'DATACASS-11' into DATACASS-32
Browse files Browse the repository at this point in the history
  • Loading branch information
prowave committed Nov 17, 2013
2 parents e99d9f8 + 963be47 commit 74a37c5
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,16 @@ public MappingContext<? extends CassandraPersistentEntity<?>, CassandraPersisten
}

/**
 * Return the {@link CassandraConverter} instance used to convert Rows to Objects and Objects to
 * BuiltStatements (Insert/Update).
 *
 * @return the {@link CassandraConverter} for this configuration, configured with the bean {@link ClassLoader}
 */
@Bean
public CassandraConverter converter() {
	// Propagate the bean ClassLoader so the converter resolves entity classes through the
	// ClassLoader of the application context rather than the defining ClassLoader.
	MappingCassandraConverter converter = new MappingCassandraConverter(mappingContext());
	converter.setBeanClassLoader(beanClassLoader);
	return converter;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@
import org.springframework.data.cassandra.mapping.CassandraPersistentProperty;
import org.springframework.data.convert.EntityConverter;

import com.datastax.driver.core.Row;

/**
* Central Cassandra specific converter interface from Object to Row.
*
* @author Alex Shvid
*/
public interface CassandraConverter extends
EntityConverter<CassandraPersistentEntity<?>, CassandraPersistentProperty, Object, Row> {
EntityConverter<CassandraPersistentEntity<?>, CassandraPersistentProperty, Object, Object> {

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.convert.support.DefaultConversionService;
Expand All @@ -34,16 +35,21 @@
import org.springframework.data.mapping.model.SpELContext;
import org.springframework.data.util.ClassTypeInformation;
import org.springframework.data.util.TypeInformation;
import org.springframework.util.ClassUtils;

import com.datastax.driver.core.Row;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Update;

/**
* {@link CassandraConverter} that uses a {@link MappingContext} to do sophisticated mapping of domain objects to
* {@link Row}.
*
* @author Alex Shvid
*/
public class MappingCassandraConverter extends AbstractCassandraConverter implements ApplicationContextAware {
public class MappingCassandraConverter extends AbstractCassandraConverter implements ApplicationContextAware,
BeanClassLoaderAware {

protected static final Logger log = LoggerFactory.getLogger(MappingCassandraConverter.class);

Expand All @@ -52,6 +58,8 @@ public class MappingCassandraConverter extends AbstractCassandraConverter implem
private SpELContext spELContext;
private boolean useFieldAccessOnly = true;

private ClassLoader beanClassLoader;

/**
* Creates a new {@link MappingCassandraConverter} given the new {@link MappingContext}.
*
Expand All @@ -65,9 +73,11 @@ public MappingCassandraConverter(
}

@SuppressWarnings("unchecked")
public <R> R read(Class<R> clazz, Row row) {
public <R> R readRow(Class<R> clazz, Row row) {

Class<R> beanClassLoaderClass = transformClassToBeanClassLoaderClass(clazz);

TypeInformation<? extends R> type = ClassTypeInformation.from(clazz);
TypeInformation<? extends R> type = ClassTypeInformation.from(beanClassLoaderClass);
// TypeInformation<? extends R> typeToUse = typeMapper.readType(row, type);
TypeInformation<? extends R> typeToUse = type;
Class<? extends R> rawType = typeToUse.getType();
Expand All @@ -82,7 +92,7 @@ public <R> R read(Class<R> clazz, Row row) {
throw new MappingException("No mapping metadata found for " + rawType.getName());
}

return read(persistentEntity, row);
return readRowInternal(persistentEntity, row);
}

/*
Expand All @@ -102,7 +112,7 @@ public void setApplicationContext(ApplicationContext applicationContext) throws
this.spELContext = new SpELContext(this.spELContext, applicationContext);
}

private <S extends Object> S read(final CassandraPersistentEntity<S> entity, final Row row) {
private <S extends Object> S readRowInternal(final CassandraPersistentEntity<S> entity, final Row row) {

final DefaultSpELExpressionEvaluator evaluator = new DefaultSpELExpressionEvaluator(row, spELContext);

Expand Down Expand Up @@ -144,14 +154,97 @@ public void setUseFieldAccessOnly(boolean useFieldAccessOnly) {
* @see org.springframework.data.convert.EntityWriter#write(java.lang.Object, java.lang.Object)
*/
@Override
public void write(Object source, Row sink) {

/*
* There is no concept of passing a Row into Cassandra for Writing.
* This must be done with Query
*
* See the CQLUtils.
*/
public <R> R read(Class<R> type, Object row) {
if (row instanceof Row) {
return readRow(type, (Row) row);
}
throw new MappingException("Unknown row object " + row.getClass().getName());
}

/* (non-Javadoc)
 * @see org.springframework.data.convert.EntityWriter#write(java.lang.Object, java.lang.Object)
 */
@Override
public void write(Object obj, Object builtStatement) {

	// Nothing to write for a null source object.
	if (obj == null) {
		return;
	}

	// Resolve the entity class through the bean ClassLoader before looking up its mapping metadata.
	Class<?> beanClassLoaderClass = transformClassToBeanClassLoaderClass(obj.getClass());
	CassandraPersistentEntity<?> entity = mappingContext.getPersistentEntity(beanClassLoaderClass);

	if (entity == null) {
		throw new MappingException("No mapping metadata found for " + obj.getClass());
	}

	// Dispatch on the concrete statement type. Check null explicitly: calling
	// builtStatement.getClass() on a null argument would throw an NPE instead of a MappingException.
	if (builtStatement instanceof Insert) {
		writeInsertInternal(obj, (Insert) builtStatement, entity);
	} else if (builtStatement instanceof Update) {
		writeUpdateInternal(obj, (Update) builtStatement, entity);
	} else if (builtStatement == null) {
		throw new MappingException("builtStatement must not be null");
	} else {
		throw new MappingException("Unknown builtStatement " + builtStatement.getClass().getName());
	}
}

/**
 * Adds a {@code value(...)} clause to the given {@link Insert} for every non-null property of the
 * object to save, reading property values through a {@link BeanWrapper}.
 */
private void writeInsertInternal(final Object objectToSave, final Insert insert, CassandraPersistentEntity<?> entity) {

	final BeanWrapper<CassandraPersistentEntity<Object>, Object> beanWrapper = BeanWrapper.create(objectToSave,
			conversionService);

	// Walk all persistent properties, appending one value per populated property.
	entity.doWithProperties(new PropertyHandler<CassandraPersistentProperty>() {
		public void doWithPersistentProperty(CassandraPersistentProperty prop) {

			Object value = beanWrapper.getProperty(prop, prop.getType(), useFieldAccessOnly);

			// Null properties are simply skipped — no clause is emitted for them.
			if (value == null) {
				return;
			}

			insert.value(prop.getColumnName(), value);
		}
	});

}

/**
 * Populates the given {@link Update} from the object to save: id properties become {@code WHERE}
 * clauses, all other non-null properties become {@code SET} assignments.
 */
private void writeUpdateInternal(final Object objectToSave, final Update update, CassandraPersistentEntity<?> entity) {

	final BeanWrapper<CassandraPersistentEntity<Object>, Object> beanWrapper = BeanWrapper.create(objectToSave,
			conversionService);

	// Walk all persistent properties and route each populated one into the statement.
	entity.doWithProperties(new PropertyHandler<CassandraPersistentProperty>() {
		public void doWithPersistentProperty(CassandraPersistentProperty prop) {

			Object value = beanWrapper.getProperty(prop, prop.getType(), useFieldAccessOnly);

			// Null properties are simply skipped — no clause is emitted for them.
			if (value == null) {
				return;
			}

			if (prop.isIdProperty()) {
				update.where(QueryBuilder.eq(prop.getColumnName(), value));
			} else {
				update.with(QueryBuilder.set(prop.getColumnName(), value));
			}
		}
	});

}

/**
 * Re-resolves the given class through the configured bean {@link ClassLoader} so that entity types
 * are the ones visible to the application context. Falls back to the class as given when it cannot
 * be resolved (or fails to link) in that ClassLoader.
 */
@SuppressWarnings("unchecked")
private <T> Class<T> transformClassToBeanClassLoaderClass(Class<T> entity) {
	Class<T> resolved;
	try {
		resolved = (Class<T>) ClassUtils.forName(entity.getName(), beanClassLoader);
	} catch (ClassNotFoundException e) {
		// Not visible in the bean ClassLoader — keep the original class.
		resolved = entity;
	} catch (LinkageError e) {
		// Resolvable but fails to link — keep the original class.
		resolved = entity;
	}
	return resolved;
}

/*
 * (non-Javadoc)
 * @see org.springframework.beans.factory.BeanClassLoaderAware#setBeanClassLoader(java.lang.ClassLoader)
 */
// Stores the container-provided ClassLoader; used by transformClassToBeanClassLoaderClass
// to resolve entity classes against the application context's ClassLoader.
@Override
public void setBeanClassLoader(ClassLoader classLoader) {
	this.beanClassLoader = classLoader;

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ public class CassandraTemplate implements CassandraOperations, BeanClassLoaderAw
*/
private static class ReadRowCallback<T> implements RowCallback<T> {

private final EntityReader<? super T, Row> reader;
private final EntityReader<? super T, Object> reader;
private final Class<T> type;

public ReadRowCallback(EntityReader<? super T, Row> reader, Class<T> type) {
public ReadRowCallback(EntityReader<? super T, Object> reader, Class<T> type) {
Assert.notNull(reader);
Assert.notNull(type);
this.reader = reader;
Expand Down Expand Up @@ -1030,13 +1030,10 @@ protected <T> List<T> doBatchInsert(final String tableName, final List<T> entiti

Assert.notEmpty(entities);

CassandraPersistentEntity<?> CPEntity = getEntity(entities.get(0));

Assert.notNull(CPEntity);

try {

final Batch b = CqlUtils.toInsertBatchQuery(keyspace.getKeyspace(), tableName, entities, CPEntity, optionsByName);
final Batch b = CqlUtils.toInsertBatchQuery(keyspace.getKeyspace(), tableName, entities, optionsByName,
cassandraConverter);
log.info(b.getQueryString());

return execute(new SessionCallback<List<T>>() {
Expand Down Expand Up @@ -1075,13 +1072,10 @@ protected <T> List<T> doBatchUpdate(final String tableName, final List<T> entiti

Assert.notEmpty(entities);

CassandraPersistentEntity<?> CPEntity = getEntity(entities.get(0));

Assert.notNull(CPEntity);

try {

final Batch b = CqlUtils.toUpdateBatchQuery(keyspace.getKeyspace(), tableName, entities, CPEntity, optionsByName);
final Batch b = CqlUtils.toUpdateBatchQuery(keyspace.getKeyspace(), tableName, entities, optionsByName,
cassandraConverter);
log.info(b.toString());

return execute(new SessionCallback<List<T>>() {
Expand Down Expand Up @@ -1155,13 +1149,10 @@ public Object doInSession(Session s) throws DataAccessException {
protected <T> T doInsert(final String tableName, final T entity, final Map<String, Object> optionsByName,
final boolean insertAsychronously) {

CassandraPersistentEntity<?> CPEntity = getEntity(entity);

Assert.notNull(CPEntity);

try {

final Query q = CqlUtils.toInsertQuery(keyspace.getKeyspace(), tableName, entity, CPEntity, optionsByName);
final Query q = CqlUtils.toInsertQuery(keyspace.getKeyspace(), tableName, entity, optionsByName,
cassandraConverter);
log.info(q.toString());
if (q.getConsistencyLevel() != null) {
log.info(q.getConsistencyLevel().name());
Expand Down Expand Up @@ -1205,13 +1196,10 @@ public T doInSession(Session s) throws DataAccessException {
protected <T> T doUpdate(final String tableName, final T entity, final Map<String, Object> optionsByName,
final boolean updateAsychronously) {

CassandraPersistentEntity<?> CPEntity = getEntity(entity);

Assert.notNull(CPEntity);

try {

final Query q = CqlUtils.toUpdateQuery(keyspace.getKeyspace(), tableName, entity, CPEntity, optionsByName);
final Query q = CqlUtils.toUpdateQuery(keyspace.getKeyspace(), tableName, entity, optionsByName,
cassandraConverter);
log.info(q.toString());

return execute(new SessionCallback<T>() {
Expand Down
Loading

0 comments on commit 74a37c5

Please sign in to comment.