Permalink
Browse files

(Pig) fix CassandraStorage to use correct comparator in Super ColumnF…

…amily case

patch by Pavel Yaskevich; reviewed by Brandon Williams for CASSANDRA-3251
  • Loading branch information...
1 parent 9ecaa2a commit dbd8ced107fad720c416d7e4919cb33884378a02 @xedin xedin committed Jan 31, 2012
Showing with 33 additions and 15 deletions.
  1. +2 −1 CHANGES.txt
  2. +31 −14 contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
View
@@ -3,7 +3,8 @@
* use correct list of replicas for LOCAL_QUORUM reads when read repair
is disabled (CASSANDRA-3696)
* block on flush before compacting hints (may prevent OOM) (CASSANDRA-3733)
-
+ * (Pig) fix CassandraStorage to use correct comparator in Super ColumnFamily
+ case (CASSANDRA-3251)
0.8.9
* avoid logging (harmless) exception when GC takes < 1ms (CASSANDRA-3656)
@@ -128,7 +128,7 @@ public Tuple getNext() throws IOException
tuple.set(0, new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet())
{
- columns.add(columnToTuple(entry.getKey(), entry.getValue(), cfDef));
+ columns.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
tuple.set(1, new DefaultDataBag(columns));
@@ -140,29 +140,31 @@ public Tuple getNext() throws IOException
}
}
- private Tuple columnToTuple(ByteBuffer name, IColumn col, CfDef cfDef) throws IOException
+ private Tuple columnToTuple(IColumn col, CfDef cfDef, AbstractType comparator) throws IOException
{
Tuple pair = TupleFactory.getInstance().newTuple(2);
List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
- setTupleValue(pair, 0, marshallers.get(0).compose(name));
+ setTupleValue(pair, 0, comparator.compose(col.name()));
if (col instanceof Column)
{
// standard
- if (validators.get(name) == null)
+ if (validators.get(col.name()) == null)
setTupleValue(pair, 1, marshallers.get(1).compose(col.value()));
else
- setTupleValue(pair, 1, validators.get(name).compose(col.value()));
+ setTupleValue(pair, 1, validators.get(col.name()).compose(col.value()));
return pair;
}
+ else
+ {
+ // super
+ ArrayList<Tuple> subcols = new ArrayList<Tuple>();
+ for (IColumn subcol : col.getSubColumns())
+ subcols.add(columnToTuple(subcol, cfDef, parseType(cfDef.getSubcomparator_type())));
- // super
- ArrayList<Tuple> subcols = new ArrayList<Tuple>();
- for (IColumn subcol : col.getSubColumns())
- subcols.add(columnToTuple(subcol.name(), subcol, cfDef));
-
- pair.set(1, new DefaultDataBag(subcols));
+ pair.set(1, new DefaultDataBag(subcols));
+ }
return pair;
}
@@ -188,12 +190,14 @@ private CfDef getCfDef(String signature)
private List<AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException
{
ArrayList<AbstractType> marshallers = new ArrayList<AbstractType>();
- AbstractType comparator = null;
- AbstractType default_validator = null;
- AbstractType key_validator = null;
+ AbstractType comparator;
+ AbstractType subcomparator;
+ AbstractType default_validator;
+ AbstractType key_validator;
try
{
comparator = TypeParser.parse(cfDef.getComparator_type());
+ subcomparator = TypeParser.parse(cfDef.getSubcomparator_type());
default_validator = TypeParser.parse(cfDef.getDefault_validation_class());
key_validator = TypeParser.parse(cfDef.getKey_validation_class());
}
@@ -205,6 +209,7 @@ private CfDef getCfDef(String signature)
marshallers.add(comparator);
marshallers.add(default_validator);
marshallers.add(key_validator);
+ marshallers.add(subcomparator);
return marshallers;
}
@@ -230,6 +235,18 @@ private CfDef getCfDef(String signature)
return validators;
}
+ private AbstractType parseType(String type) throws IOException
+ {
+ try
+ {
+ return TypeParser.parse(type);
+ }
+ catch (ConfigurationException e)
+ {
+ throw new IOException(e);
+ }
+ }
+
@Override
public InputFormat getInputFormat()
{

0 comments on commit dbd8ced

Please sign in to comment.