Skip to content

Commit

Permalink
Added new tests for non-dependent collections and removal of removed…
Browse files Browse the repository at this point in the history
… entries. Need to also test for maps.
  • Loading branch information
tnine committed Feb 16, 2011
1 parent 69424e5 commit 0a9c4dc
Show file tree
Hide file tree
Showing 15 changed files with 556 additions and 92 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<properties>
<datanucleus.version>2.1.1</datanucleus.version>
<datanucleus.plugin.version>2.1.0-release</datanucleus.plugin.version>
<cassandra.version>0.7.0</cassandra.version>
<cassandra.version>0.7.1</cassandra.version>
<thrift.version>0.5</thrift.version>
<pelops.version>1.0-RC1-0.7.0-st</pelops.version>
<slf4j.version>1.6.1</slf4j.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,12 +259,12 @@ public Object fetchObjectField(int fieldNumber) {

// get our list of Strings

ReadCollection columnFetcher = new ReadCollection(
ReadCollection columnFetcher = new ReadCollection(selector,
byteContext, columnFamily, rowKey, columnName,
context, elementClass);

// TODO use context.getFetchPlan().getFetchSize()
columnFetcher.fetchColumns(100, null, selector);
columnFetcher.fetchColumns(100, null);

for (Object key : columnFetcher) {

Expand Down Expand Up @@ -328,10 +328,10 @@ public Object fetchObjectField(int fieldNumber) {
}

// TODO use context.getFetchPlan().getFetchSize()
ReadMap mapReader = new ReadMap(byteContext, columnFamily,
ReadMap mapReader = new ReadMap(selector, byteContext, columnFamily,
rowKey, columnName, storedKeyClass,
storedValueClass);
mapReader.fetchColumns(100, null, selector);
mapReader.fetchColumns(100, null);

for (CassEntry entry : mapReader) {

Expand Down Expand Up @@ -379,10 +379,10 @@ public Object fetchObjectField(int fieldNumber) {

} else if (fieldMetaData.getType().isArray()) {

ReadMap mapReader = new ReadMap(byteContext, columnFamily,
ReadMap mapReader = new ReadMap(selector, byteContext, columnFamily,
rowKey, columnName, Integer.class,
byteContext.getKeyClass(context, metaData));
mapReader.fetchColumns(100, null, selector);
mapReader.fetchColumns(100, null);

int columns = mapReader.getColumnCount();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.datanucleus.store.fieldmanager.AbstractFieldManager;
import org.scale7.cassandra.pelops.Bytes;
import org.scale7.cassandra.pelops.Mutator;
import org.scale7.cassandra.pelops.Selector;

import com.spidertracks.datanucleus.collection.WriteCollection;
import com.spidertracks.datanucleus.collection.WriteMap;
Expand All @@ -48,6 +49,7 @@
public class CassandraInsertFieldManager extends AbstractFieldManager {

private ExecutionContext context;
private Selector selector;
private Mutator mutator;
private AbstractClassMetaData metaData;
private ObjectProvider objectProvider;
Expand All @@ -59,10 +61,11 @@ public class CassandraInsertFieldManager extends AbstractFieldManager {
* @param columns
* @param metaData
*/
public CassandraInsertFieldManager(Mutator mutator, ObjectProvider op,
String columnFamily, Bytes key) {
public CassandraInsertFieldManager(Selector selector, Mutator mutator,
ObjectProvider op, String columnFamily, Bytes key) {
super();

this.selector = selector;
this.mutator = mutator;
this.objectProvider = op;
this.metaData = op.getClassMetaData();
Expand Down Expand Up @@ -183,19 +186,6 @@ public void storeObjectField(int fieldNumber, Object value) {

Bytes columnName = getColumnName(metaData, fieldNumber);

// delete operation
if (value == null) {
// TODO TN we need a way to update secondary indexing if this
// field was deleted.
// how can we get the previous value? Only loading the current
// value
// then removing will work
this.mutator.deleteColumn(columnFamily, key, columnName);

return;

}

ClassLoaderResolver clr = context.getClassLoaderResolver();
AbstractMemberMetaData fieldMetaData = metaData
.getMetaDataForManagedMemberAtAbsolutePosition(fieldNumber);
Expand All @@ -217,6 +207,12 @@ public void storeObjectField(int fieldNumber, Object value) {

}

// delete operation
if (value == null) {
this.mutator.deleteColumn(columnFamily, key, columnName);
return;
}

Object persisted = context.persistObjectInternal(value,
objectProvider, fieldNumber, StateManager.PC);

Expand All @@ -239,12 +235,18 @@ public void storeObjectField(int fieldNumber, Object value) {

if (fieldMetaData.hasCollection()) {

WriteCollection collectionWriter = new WriteCollection(
selector, byteContext, columnFamily, key,
columnName);

if (value == null) {
collectionWriter.removeAllColumns(mutator);
return;
}

Object persisted = null;
Object objectPk = null;

WriteCollection collectionWriter = new WriteCollection(
byteContext, columnFamily, key, columnName);

for (Object element : (Collection<?>) value) {
// persist the object
persisted = context.persistObjectInternal(element,
Expand All @@ -256,13 +258,24 @@ public void storeObjectField(int fieldNumber, Object value) {
collectionWriter.writeRelationship(mutator, objectPk);
}

// TODO remove this when SCO is working
collectionWriter.removeRemaining(mutator);

objectProvider.wrapSCOField(fieldNumber, value, true, true,
true);

return;

} else if (fieldMetaData.hasMap()) {

WriteMap mapWriter = new WriteMap(byteContext,
WriteMap mapWriter = new WriteMap(selector, byteContext,
columnFamily, key, columnName);

if (value == null) {
mapWriter.removeAllColumns(mutator);
return;
}

ApiAdapter adapter = context.getApiAdapter();

Map<?, ?> map = ((Map<?, ?>) value);
Expand Down Expand Up @@ -313,18 +326,29 @@ public void storeObjectField(int fieldNumber, Object value) {
mapWriter.writeRelationship(mutator, serializedKey,
serializedValue);

// TODO remove this when SCO is working
mapWriter.removeRemaining(mutator);

}

objectProvider.wrapSCOField(fieldNumber, value, true, true,
true);

return;

} else if (fieldMetaData.hasArray()) {

Object persisted = null;
Object objectPk = null;

WriteMap mapWriter = new WriteMap(byteContext,
WriteMap mapWriter = new WriteMap(selector, byteContext,
columnFamily, key, columnName);

if (value == null) {
mapWriter.removeAllColumns(mutator);
return;
}

for (int i = 0; i < Array.getLength(value); i++) {

// persist the object
Expand All @@ -337,8 +361,19 @@ public void storeObjectField(int fieldNumber, Object value) {

mapWriter.writeRelationship(mutator, i, objectPk);
}

// TODO remove this when SCO is working
mapWriter.removeRemaining(mutator);
}

objectProvider.wrapSCOField(fieldNumber, value, true, true,
true);

return;
}

if (value == null) {
this.mutator.deleteColumn(columnFamily, key, columnName);
return;
}

Expand All @@ -355,13 +390,13 @@ public void storeObjectField(int fieldNumber, Object value) {
@Override
public void storeStringField(int fieldNumber, String value) {
try {
if(value == null)
{
mutator.deleteColumn(columnFamily, key, getColumnName(metaData, fieldNumber));

if (value == null) {
mutator.deleteColumn(columnFamily, key,
getColumnName(metaData, fieldNumber));
return;
}

mutator.writeColumn(columnFamily, key, mutator.newColumn(
getColumnName(metaData, fieldNumber),
byteContext.getBytes(value)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,6 @@ public void fetchObject(ObjectProvider op, int[] fieldNumbers) {
// exception b/c the object doesn't exist
pksearched(metaData, fieldNumbers);

// do nothing if we didn't fail the check above. Since cassandra has
// no concept of transactions the DN transaction is attempting
// to load and flush rows that no longer exist.
return;

}

Expand Down Expand Up @@ -272,13 +268,15 @@ public void updateObject(ObjectProvider op, int[] fieldNumbers) {

// signal a write is about to start
Mutator mutator = this.batchManager.beginWrite(ec).getMutator();
Selector selector = Pelops.createSelector(manager.getPoolName());


Bytes key = byteContext.getRowKey(op);
String columnFamily = getColumnFamily(metaData);

// Write our all our primary object data
CassandraInsertFieldManager manager = new CassandraInsertFieldManager(
mutator, op, columnFamily, key);
selector, mutator, op, columnFamily, key);

op.provideFields(metaData.getAllMemberPositions(), manager);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,21 @@
public class Consistency {

private static final ThreadLocal<ConsistencyLevel> level = new ThreadLocal<ConsistencyLevel>();

private static ConsistencyLevel defaultLevel;


{
defaultLevel = ConsistencyLevel.ONE;
}

/**
* Set the default level if it hasn't been set
* @param c
*/
public static void setDefault(ConsistencyLevel c){
defaultLevel = c;
}
/**
* Set the consistency level for this thread
*
Expand All @@ -56,7 +70,7 @@ public static ConsistencyLevel get() {
ConsistencyLevel l = level.get();

if (l == null) {
return ConsistencyLevel.ONE;
return defaultLevel;
}

return l;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,16 @@
***********************************************************************/
package com.spidertracks.datanucleus.collection;

import java.nio.ByteBuffer;
import java.util.List;

import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.scale7.cassandra.pelops.Bytes;
import org.scale7.cassandra.pelops.Selector;

import com.spidertracks.datanucleus.client.Consistency;
import com.spidertracks.datanucleus.convert.ByteConverterContext;

/**
Expand All @@ -30,11 +38,10 @@
* @author Todd Nine
*
*/
public class ExternalEntity{
public class ExternalEntity {


protected static final byte DELIM_MIN = 0;
protected static final byte DELIM_MAX = 1;
protected static final byte DELIM_MAX = 1;

protected ByteConverterContext context;

Expand All @@ -44,21 +51,22 @@ public class ExternalEntity{

protected Bytes rowKey;


public ExternalEntity() {

}
protected Selector selector;

/**
*
* @param context The Byte converter context
* @param ownerColumnFamily The owning column family
* @param rowKey The row key
* @param ownerColumn The bytes of the column
* @param context
* The Byte converter context
* @param ownerColumnFamily
* The owning column family
* @param rowKey
* The row key
* @param ownerColumn
* The bytes of the column
*/
public ExternalEntity(ByteConverterContext context,
public ExternalEntity(Selector selector, ByteConverterContext context,
String ownerColumnFamily, Bytes rowKey, Bytes ownerColumn) {
super();
this.selector = selector;
this.context = context;
this.ownerColumnFamily = ownerColumnFamily;
this.ownerColumn = ownerColumn;
Expand All @@ -67,9 +75,4 @@ public ExternalEntity(ByteConverterContext context,








}
Loading

0 comments on commit 0a9c4dc

Please sign in to comment.