Permalink
Browse files

hll wip

  • Loading branch information...
Tadas Vilkeliskis
Tadas Vilkeliskis committed Dec 28, 2013
1 parent 9f3a7f8 commit 529ddf67bf2b0a9f65c0aa1fc4b10fd8118a49c9
@@ -49,7 +49,8 @@
UUID (UUIDType.instance),
VARCHAR (UTF8Type.instance),
VARINT (IntegerType.instance),
TIMEUUID (TimeUUIDType.instance);
TIMEUUID (TimeUUIDType.instance),
HYPERLOGLOG (HyperLogLog2Type.instance);
private final AbstractType<?> type;
@@ -181,6 +182,16 @@ public static Collection set(CQL3Type t) throws InvalidRequestException
return new Collection(SetType.getInstance(t.getType()));
}
public static Collection hyperloglog(CQL3Type t, Term.Raw precision) throws InvalidRequestException
{
if (t.isCollection())
throw new InvalidRequestException("set type cannot contain another collection");
if (t.isCounter())
throw new InvalidRequestException("counters are not allowed inside a collection");
return new Collection(HyperLogLogType.getInstance(t.getType()));
}
public boolean isCollection()
{
return true;
@@ -226,6 +237,8 @@ public String toString()
return "list<" + ((ListType)type).elements.asCQL3Type() + ">";
case SET:
return "set<" + ((SetType)type).elements.asCQL3Type() + ">";
case HYPERLOGLOG:
return "hyperloglog<" + ((HyperLogLogType)type).elements.asCQL3Type() + ">";
case MAP:
MapType mt = (MapType)type;
return "map<" + mt.keys.asCQL3Type() + ", " + mt.values.asCQL3Type() + ">";
@@ -825,6 +825,12 @@ set_or_map[Term.Raw t] returns [Term.Raw value]
{ $value = new Sets.Literal(s); }
;
hyperloglog_literal[Term.Raw t] returns [Term.Raw value]
: { List<Term.Raw> s = new ArrayList<Term.Raw>(); s.add(t); }
( ',' tn=term { s.add(tn); } )*
{ $value = new HyperLogLogs.Literal(s); }
;
collection_literal returns [Term.Raw value]
: '[' { List<Term.Raw> l = new ArrayList<Term.Raw>(); }
( t1=term { l.add(t1); } ( ',' tn=term { l.add(tn); } )* )?
@@ -833,6 +839,8 @@ collection_literal returns [Term.Raw value]
// Note that we have an ambiguity between maps and set for "{}". So we force it to a set literal,
// and deal with it later based on the type of the column (SetLiteral.java).
| '{' '}' { $value = new Sets.Literal(Collections.<Term.Raw>emptyList()); }
| '[[' ']]' { $value = new HyperLogLogs.Literal(Collections.<Term.Raw>emptyList()); }
| '[[' t=term v=hyperloglog_literal[t] { $value = v; } ']]'
;
usertype_literal returns [UserTypes.Literal ut]
@@ -986,6 +994,9 @@ native_type returns [CQL3Type t]
;
collection_type returns [CQL3Type pt]
@init {
Term.Raw precision = null;
}
: K_MAP '<' t1=comparatorType ',' t2=comparatorType '>'
{ try {
// if we can't parse either t1 or t2, antlr will "recover" and we may have t1 or t2 null.
@@ -996,6 +1007,8 @@ collection_type returns [CQL3Type pt]
{ try { if (t != null) $pt = CQL3Type.Collection.list(t); } catch (InvalidRequestException e) { addRecognitionError(e.getMessage()); } }
| K_SET '<' t=comparatorType '>'
{ try { if (t != null) $pt = CQL3Type.Collection.set(t); } catch (InvalidRequestException e) { addRecognitionError(e.getMessage()); } }
| K_HYPERLOGLOG '<' t1=comparatorType ',' t3=intValue { precision=t3; } '>'
{ try { if (t1 != null && t3 != null) $pt = CQL3Type.Collection.hyperloglog(t1, precision); } catch (InvalidRequestException e) { addRecognitionError(e.getMessage()); } }
;
username
@@ -1145,6 +1158,7 @@ K_MAP: M A P;
K_LIST: L I S T;
K_NAN: N A N;
K_INFINITY: I N F I N I T Y;
K_HYPERLOGLOG: H Y P E R L O G L O G;
K_TRIGGER: T R I G G E R;
@@ -0,0 +1,271 @@
package org.apache.cassandra.cql3;
import com.google.common.base.Joiner;
import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.HyperLogLog;
import org.apache.cassandra.cql3.Lists;
import org.apache.cassandra.utils.Pair;
import java.nio.ByteBuffer;
import java.util.*;
/**
* Static helper methods and classes for hyperloglogs.
*/
public abstract class HyperLogLogs
{
private HyperLogLogs() {}
public static ColumnSpecification valueSpecOf(ColumnSpecification column)
{
return new ColumnSpecification(column.ksName, column.cfName, new ColumnIdentifier("value(" + column.name + ")", true), ((HyperLogLogType)column.type).elements);
}
public static class Literal implements Term.Raw
{
private final List<Term.Raw> elements;
public Literal(List<Term.Raw> elements)
{
this.elements = elements;
}
public Term prepare(ColumnSpecification receiver) throws InvalidRequestException
{
validateAssignableTo(receiver);
ColumnSpecification valueSpec = HyperLogLogs.valueSpecOf(receiver);
Set<Term> values = new HashSet<Term>(elements.size());
boolean allTerminal = true;
for (Term.Raw rt : elements)
{
Term t = rt.prepare(valueSpec);
if (t.containsBindMarker())
throw new InvalidRequestException(String.format("Invalid set literal for %s: bind variables are not supported inside collection literals", receiver));
if (t instanceof Term.NonTerminal)
allTerminal = false;
values.add(t);
}
DelayedValue value = new DelayedValue(((HyperLogLogType)receiver.type).elements, values);
return allTerminal ? value.bind(Collections.<ByteBuffer>emptyList()) : value;
}
private void validateAssignableTo(ColumnSpecification receiver) throws InvalidRequestException
{
ColumnSpecification valueSpec = HyperLogLogs.valueSpecOf(receiver);
for (Term.Raw rt : elements)
{
if (!rt.isAssignableTo(valueSpec))
throw new InvalidRequestException(String.format("Invalid hyperloglog literal for %s: value %s is not of type %s", receiver, rt, valueSpec.type.asCQL3Type()));
}
}
public boolean isAssignableTo(ColumnSpecification receiver)
{
try
{
validateAssignableTo(receiver);
return true;
}
catch (InvalidRequestException e)
{
return false;
}
}
@Override
public String toString()
{
return "hyperloglog{" + Joiner.on(", ").join(elements) + "}";
}
}
public static class Value extends Term.Terminal
{
public final Set<ByteBuffer> elements;
public Value(Set<ByteBuffer> elements)
{
this.elements = elements;
}
public static Value fromSerialized(ByteBuffer value, HyperLogLogType type) throws InvalidRequestException
{
// when pulling data from disk(?)
Set<ByteBuffer> elements = new LinkedHashSet<ByteBuffer>();
elements.add(ByteBuffer.wrap(new String("fromSerialized").getBytes()));
return new Value(elements);
/*
try
{
// Collections have this small hack that validate cannot be called on a serialized object,
// but compose does the validation (so we're fine).
Set<?> s = (Set<?>)type.compose(value);
Set<ByteBuffer> elements = new LinkedHashSet<ByteBuffer>(s.size());
for (Object element : s)
elements.add(type.elements.decompose(element));
return new Value(elements);
}
catch (MarshalException e)
{
throw new InvalidRequestException(e.getMessage());
}
*/
}
public ByteBuffer get()
{
// Serialized HLL value, the actual bitmap.
return ByteBuffer.wrap(new String("packed").getBytes());
//return CollectionType.pack(new ArrayList<ByteBuffer>(elements), elements.size());
}
}
public static class DelayedValue extends Term.NonTerminal
{
private final Comparator<ByteBuffer> comparator;
private final Set<Term> elements;
public DelayedValue(Comparator<ByteBuffer> comparator, Set<Term> elements)
{
this.comparator = comparator;
this.elements = elements;
}
public boolean containsBindMarker()
{
// False since we don't support them in collection
return false;
}
public void collectMarkerSpecification(VariableSpecifications boundNames)
{
}
public Value bind(List<ByteBuffer> values) throws InvalidRequestException
{
Set<ByteBuffer> buffers = new TreeSet<ByteBuffer>(comparator);
for (Term t : elements)
{
ByteBuffer bytes = t.bindAndGet(values);
if (bytes == null)
throw new InvalidRequestException("null is not supported inside collections");
// We don't support value > 64K because the serialization format encode the length as an unsigned short.
if (bytes.remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
throw new InvalidRequestException(String.format("Set value is too long. Set values are limited to %d bytes but %d bytes value provided",
FBUtilities.MAX_UNSIGNED_SHORT,
bytes.remaining()));
buffers.add(bytes);
}
return new Value(buffers);
}
}
public static class Setter extends Operation
{
public Setter(ColumnIdentifier column, Term t)
{
super(column, t);
}
public void execute(ByteBuffer rowKey, ColumnFamily cf, ColumnNameBuilder prefix, UpdateParameters params) throws InvalidRequestException
{
// delete + add
ColumnNameBuilder column = prefix.add(columnName);
cf.addAtom(params.makeTombstoneForOverwrite(column.build(), column.buildAsEndOfRange()));
Adder.doAdd(rowKey, columnName, t, cf, column, params);
}
}
public static class Adder extends Operation
{
@Override
public boolean requiresRead() {
return true;
}
public Adder(ColumnIdentifier column, Term t)
{
super(column, t);
}
public void execute(ByteBuffer rowKey, ColumnFamily cf, ColumnNameBuilder prefix, UpdateParameters params) throws InvalidRequestException
{
doAdd(rowKey, columnName, t, cf, prefix.add(columnName), params);
}
static void doAdd(ByteBuffer rowKey, ColumnIdentifier columnName, Term t, ColumnFamily cf, ColumnNameBuilder columnNameBuilder, UpdateParameters params) throws InvalidRequestException
{
Term.Terminal value = t.bind(params.variables);
if (value == null)
return;
HyperLogLog<String> hll = new HyperLogLog<String>(5);
Set<ByteBuffer> toAdd = ((Sets.Value)value).elements;
for (ByteBuffer bb : toAdd) {
hll.add(null);
}
ByteBuffer cellName = columnNameBuilder.copy().add(ByteBuffer.wrap(new String("hll_bitmap").getBytes())).build();
cf.addColumn(params.makeColumn(
cellName,
hll.getBitmap()));
return;
/*
ByteBuffer ddd = value.get();
// We need to grab existing data from disk
// add new items and write it out
int componenetCount = columnNameBuilder.componentCount();
int columnCount = cf.getColumnCount();
for (ByteBuffer name : cf.getColumnNames()) {
//Column col = cf.getColumn(name);
System.out.println("sss");
}
List<Pair<ByteBuffer, Column>> existingList = params.getPrefetchedList(rowKey, columnName);
HyperLogLog<String> hll;
if (existingList.size() == 2) {
// there should always be two items in the list
// hll precision and hll_bitmap
} else {
assert existingList.size() == 0;
}
assert value instanceof Sets.Value : value;
ByteBuffer cellName = columnNameBuilder.copy().add(ByteBuffer.wrap(new String("precision").getBytes())).build();
cf.addColumn(params.makeColumn(
cellName,
ByteBuffer.wrap(new String("precision_value").getBytes())));
cellName = columnNameBuilder.copy().add(ByteBuffer.wrap(new String("hll_bitmap").getBytes())).build();
cf.addColumn(params.makeColumn(
cellName,
ByteBuffer.wrap(new String("HLL_BITMAP_DATA").getBytes())));
/*
Set<ByteBuffer> toAdd = ((HyperLogLogs.Value)value).elements;
for (ByteBuffer bb : toAdd)
{
ByteBuffer cellName = columnName.copy().add(bb).build();
cf.addColumn(params.makeColumn(cellName, ByteBufferUtil.EMPTY_BYTE_BUFFER));
}
*/
}
}
}
@@ -174,6 +174,8 @@ public Operation prepare(ColumnDefinition receiver) throws InvalidRequestExcepti
return new Sets.Setter(receiver.name, v);
case MAP:
return new Maps.Setter(receiver.name, v);
case HYPERLOGLOG:
return new HyperLogLogs.Setter(receiver.name, v);
}
throw new AssertionError();
}
@@ -262,6 +264,8 @@ public Operation prepare(ColumnDefinition receiver) throws InvalidRequestExcepti
return new Lists.Appender(receiver.name, v);
case SET:
return new Sets.Adder(receiver.name, v);
case HYPERLOGLOG:
return new HyperLogLogs.Adder(receiver.name, v);
case MAP:
return new Maps.Putter(receiver.name, v);
}
@@ -305,6 +309,8 @@ public Operation prepare(ColumnDefinition receiver) throws InvalidRequestExcepti
return new Lists.Discarder(receiver.name, v);
case SET:
return new Sets.Discarder(receiver.name, v);
case HYPERLOGLOG:
throw new InvalidRequestException(String.format("Invalid operation (%s) for hyperloglog column %s", toString(receiver), receiver));
case MAP:
throw new InvalidRequestException(String.format("Invalid operation (%s) for map column %s", toString(receiver), receiver));
}
@@ -402,6 +408,8 @@ public Operation prepare(ColumnSpecification receiver) throws InvalidRequestExce
case SET:
Term elt = element.prepare(Sets.valueSpecOf(receiver));
return new Sets.Discarder(id, elt);
case HYPERLOGLOG:
throw new InvalidRequestException(String.format("Invalid deletion operation for hyperloglog collection column %s", receiver));
case MAP:
Term key = element.prepare(Maps.keySpecOf(receiver));
return new Maps.DiscarderByKey(id, key);
Oops, something went wrong.

0 comments on commit 529ddf6

Please sign in to comment.