Permalink
Browse files

Sink works; uses hector for serializer/deserializer (needs extracting somehow).

Test fails: need to improve the test — currently just improvised from cascading.hbase :)
  • Loading branch information...
1 parent 4b15b2e commit 3a63280cfb86b38b697e11264b83c33d2f971717 @pingles committed Oct 24, 2011
View
@@ -62,6 +62,10 @@
<orderEntry type="library" name="Maven: net.java.dev.jets3t:jets3t:0.6.1" level="project" />
<orderEntry type="library" name="Maven: hsqldb:hsqldb:1.8.0.7" level="project" />
<orderEntry type="library" name="Maven: org.eclipse.jdt:core:3.1.1" level="project" />
+ <orderEntry type="library" name="Maven: me.prettyprint:hector-core:0.8.0-3" level="project" />
+ <orderEntry type="library" name="Maven: commons-pool:commons-pool:1.5.3" level="project" />
+ <orderEntry type="library" name="Maven: com.github.stephenc.eaio-uuid:uuid:3.2.0" level="project" />
+ <orderEntry type="library" name="Maven: com.ecyrd.speed4j:speed4j:0.9" level="project" />
</component>
</module>
View
@@ -259,6 +259,28 @@
<root url="jar://$MAVEN_REPOSITORY$/cascading/cascading-core/1.2.4/cascading-core-1.2.4-sources.jar!/" />
</SOURCES>
</library>
+ <library name="Maven: com.ecyrd.speed4j:speed4j:0.9">
+ <CLASSES>
+ <root url="jar://$MAVEN_REPOSITORY$/com/ecyrd/speed4j/speed4j/0.9/speed4j-0.9.jar!/" />
+ </CLASSES>
+ <JAVADOC>
+ <root url="jar://$MAVEN_REPOSITORY$/com/ecyrd/speed4j/speed4j/0.9/speed4j-0.9-javadoc.jar!/" />
+ </JAVADOC>
+ <SOURCES>
+ <root url="jar://$MAVEN_REPOSITORY$/com/ecyrd/speed4j/speed4j/0.9/speed4j-0.9-sources.jar!/" />
+ </SOURCES>
+ </library>
+ <library name="Maven: com.github.stephenc.eaio-uuid:uuid:3.2.0">
+ <CLASSES>
+ <root url="jar://$MAVEN_REPOSITORY$/com/github/stephenc/eaio-uuid/uuid/3.2.0/uuid-3.2.0.jar!/" />
+ </CLASSES>
+ <JAVADOC>
+ <root url="jar://$MAVEN_REPOSITORY$/com/github/stephenc/eaio-uuid/uuid/3.2.0/uuid-3.2.0-javadoc.jar!/" />
+ </JAVADOC>
+ <SOURCES>
+ <root url="jar://$MAVEN_REPOSITORY$/com/github/stephenc/eaio-uuid/uuid/3.2.0/uuid-3.2.0-sources.jar!/" />
+ </SOURCES>
+ </library>
<library name="Maven: com.github.stephenc.high-scale-lib:high-scale-lib:1.1.2">
<CLASSES>
<root url="jar://$MAVEN_REPOSITORY$/com/github/stephenc/high-scale-lib/high-scale-lib/1.1.2/high-scale-lib-1.1.2.jar!/" />
@@ -402,6 +424,17 @@
<root url="jar://$MAVEN_REPOSITORY$/commons-net/commons-net/1.4.1/commons-net-1.4.1-sources.jar!/" />
</SOURCES>
</library>
+ <library name="Maven: commons-pool:commons-pool:1.5.3">
+ <CLASSES>
+ <root url="jar://$MAVEN_REPOSITORY$/commons-pool/commons-pool/1.5.3/commons-pool-1.5.3.jar!/" />
+ </CLASSES>
+ <JAVADOC>
+ <root url="jar://$MAVEN_REPOSITORY$/commons-pool/commons-pool/1.5.3/commons-pool-1.5.3-javadoc.jar!/" />
+ </JAVADOC>
+ <SOURCES>
+ <root url="jar://$MAVEN_REPOSITORY$/commons-pool/commons-pool/1.5.3/commons-pool-1.5.3-sources.jar!/" />
+ </SOURCES>
+ </library>
<library name="Maven: hsqldb:hsqldb:1.8.0.7">
<CLASSES>
<root url="jar://$MAVEN_REPOSITORY$/hsqldb/hsqldb/1.8.0.7/hsqldb-1.8.0.7.jar!/" />
@@ -479,6 +512,17 @@
<root url="jar://$MAVEN_REPOSITORY$/log4j/log4j/1.2.16/log4j-1.2.16-sources.jar!/" />
</SOURCES>
</library>
+ <library name="Maven: me.prettyprint:hector-core:0.8.0-3">
+ <CLASSES>
+ <root url="jar://$MAVEN_REPOSITORY$/me/prettyprint/hector-core/0.8.0-3/hector-core-0.8.0-3.jar!/" />
+ </CLASSES>
+ <JAVADOC>
+ <root url="jar://$MAVEN_REPOSITORY$/me/prettyprint/hector-core/0.8.0-3/hector-core-0.8.0-3-javadoc.jar!/" />
+ </JAVADOC>
+ <SOURCES>
+ <root url="jar://$MAVEN_REPOSITORY$/me/prettyprint/hector-core/0.8.0-3/hector-core-0.8.0-3-sources.jar!/" />
+ </SOURCES>
+ </library>
<library name="Maven: net.java.dev.jets3t:jets3t:0.6.1">
<CLASSES>
<root url="jar://$MAVEN_REPOSITORY$/net/java/dev/jets3t/jets3t/0.6.1/jets3t-0.6.1.jar!/" />
View
@@ -28,6 +28,11 @@
<artifactId>hadoop-core</artifactId>
<version>0.20.2-cdh3u2</version>
</dependency>
+ <dependency>
+ <groupId>me.prettyprint</groupId>
+ <artifactId>hector-core</artifactId>
+ <version>0.8.0-3</version>
+ </dependency>
</dependencies>
<repositories>
<repository>
@@ -5,24 +5,37 @@
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
+import me.prettyprint.cassandra.serializers.TypeInferringSerializer;
import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
+import org.apache.cassandra.thrift.Column;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.Mutation;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
public class CassandraScheme extends Scheme {
private static final Logger LOGGER = LoggerFactory.getLogger(CassandraScheme.class);
+ private final Fields keyField;
+ private final Fields[] columnFields;
/**
 * Creates a {@link Scheme} for dealing with a regular Column Family.
 *
 * @param keyField     the single field whose value becomes the row key
 *                     (only the first position of the selected tuple is used)
 * @param columnFields selectors naming the fields written as columns; each
 *                     field name becomes a column name, its value the column value
 */
public CassandraScheme(Fields keyField, Fields[] columnFields) {
  this.keyField = keyField;
  this.columnFields = columnFields;
}
@Override
@@ -31,6 +44,8 @@ public void sourceInit(Tap tap, JobConf jobConf) throws IOException {
@Override
public void sinkInit(Tap tap, JobConf jobConf) throws IOException {
  // ColumnFamilyOutputFormat consumes ByteBuffer row keys and (lists of)
  // thrift Mutation values, so the job's output types must match.
  jobConf.setOutputKeyClass(ByteBuffer.class);
  jobConf.setOutputValueClass(Mutation.class);
  jobConf.setOutputFormat(ColumnFamilyOutputFormat.class);
}
@@ -41,5 +56,38 @@ public Tuple source(Object o, Object o1) {
@Override
public void sink(TupleEntry tupleEntry, OutputCollector outputCollector) throws IOException {
+ Tuple key = tupleEntry.selectTuple(keyField);
+ TypeInferringSerializer serializer = TypeInferringSerializer.get();
+ byte[] keyBytes = serializer.toBytes(key.get(0));
+
+ for (int i = 0; i < columnFields.length; i++) {
+ Fields selector = columnFields[i];
+ TupleEntry values = tupleEntry.selectEntry(selector);
+
+ for (int j = 0; j < values.getFields().size(); j++) {
+ Fields fields = values.getFields();
+ Tuple tuple = values.getTuple();
+
+ Object name = fields.get(j);
+ Object value = tuple.get(j);
+
+ Mutation mutation = createColumnPutMutation(ByteBuffer.wrap(serializer.toBytes(name)), ByteBuffer.wrap(serializer.toBytes(value)));
+ outputCollector.collect(ByteBuffer.wrap(keyBytes), Collections.singletonList(mutation));
+ }
+ }
+ }
+
+ private Mutation createColumnPutMutation(ByteBuffer name, ByteBuffer value) {
+ Column column = new Column(name);
+ column.setName(name);
+ column.setValue(value);
+ column.setTimestamp(System.currentTimeMillis());
+
+ Mutation m = new Mutation();
+ ColumnOrSuperColumn columnOrSuperColumn = new ColumnOrSuperColumn();
+ columnOrSuperColumn.setColumn(column);
+ m.setColumn_or_supercolumn(columnOrSuperColumn);
+
+ return m;
}
}
@@ -34,18 +34,13 @@ public CassandraTap(String initialAddress, Integer rpcPort, String keyspace, Str
this.keyspace = keyspace;
}
- private URI getUri() {
- try {
- return new URI(SCHEME, keyspace, columnFamilyName, null);
- } catch (URISyntaxException e) {
- return null;
- }
- }
-
@Override
public void sinkInit(JobConf conf) throws IOException {
+ LOGGER.info("Created Cassandra tap {}", getPath());
LOGGER.info("Sinking to column family: {}", columnFamilyName);
+
super.sinkInit(conf);
+
ConfigHelper.setOutputColumnFamily(conf, keyspace, columnFamilyName);
ConfigHelper.setPartitioner(conf, "org.apache.cassandra.dht.RandomPartitioner");
ConfigHelper.setInitialAddress(conf, this.initialAddress);
@@ -60,7 +55,7 @@ public void sourceInit(JobConf conf) throws IOException {
@Override
public Path getPath() {
- return new Path(getUri().toString());
+ return new Path(String.format("cassandra://%s:%d/%s/%s", initialAddress, rpcPort, keyspace, columnFamilyName));
}
@Override
@@ -2,10 +2,15 @@
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.CfDef;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.ColumnPath;
+import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.KsDef;
import org.apache.cassandra.thrift.NotFoundException;
import org.apache.cassandra.thrift.SchemaDisagreementException;
+import org.apache.cassandra.thrift.TimedOutException;
+import org.apache.cassandra.thrift.UnavailableException;
import org.apache.thrift.TException;
import org.apache.cassandra.thrift.TBinaryProtocol;
import org.apache.thrift.transport.TFramedTransport;
@@ -14,6 +19,8 @@
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -23,6 +30,7 @@
private Cassandra.Client client;
private final TTransport transport;
+ private String keyspaceName;
public CassandraClient(String rpcHost, Integer rpcPort) {
TSocket socket = new TSocket(rpcHost, rpcPort);
@@ -32,6 +40,11 @@ public CassandraClient(String rpcHost, Integer rpcPort) {
LOGGER.info("CassandraClient connecting to {}:{}", rpcHost, rpcPort);
}
/**
 * Creates a client bound to a specific keyspace; {@link #open()} will issue
 * set_keyspace for it once the transport is connected.
 */
public CassandraClient(String rpcHost, Integer rpcPort, String keyspaceName) {
  this(rpcHost, rpcPort);
  this.keyspaceName = keyspaceName;
}
+
public String createKeyspace(String keyspaceName) throws TException, SchemaDisagreementException, InvalidRequestException {
List<CfDef> columnFamilyDefs = new ArrayList<CfDef>();
@@ -51,6 +64,16 @@ public String createKeyspace(String keyspaceName) throws TException, SchemaDisag
public void open() throws InterruptedException, TTransportException {
transport.open();
+ if (keyspaceName != null) {
+ try {
+ client.send_set_keyspace(keyspaceName);
+ client.recv_set_keyspace();
+ } catch (TException e) {
+ throw new RuntimeException(e);
+ } catch (InvalidRequestException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
public void close() {
@@ -95,4 +118,12 @@ public String createColumnFamily(String keyspace, String name) throws TException
String s = client.recv_system_add_column_family();
return s;
}
+
/**
 * Fetches the value of a single column for the given row key, reading at
 * ConsistencyLevel.ONE.
 *
 * NOTE(review): assumes the result is a regular column — a super-column
 * result would leave {@code columnOrSuperColumn.column} null and NPE on the
 * return line; confirm this client is only used against regular CFs.
 */
public byte[] getValue(String columnFamilyName, ByteBuffer keyBytes, ByteBuffer nameBytes) throws TException, TimedOutException, NotFoundException, InvalidRequestException, UnavailableException {
  ColumnPath cp = new ColumnPath(columnFamilyName);
  cp.setColumn(nameBytes);

  ColumnOrSuperColumn columnOrSuperColumn = client.get(keyBytes, cp, ConsistencyLevel.ONE);
  return columnOrSuperColumn.column.getValue();
}
}
@@ -9,8 +9,14 @@
import cascading.tap.Lfs;
import cascading.tap.Tap;
import cascading.tuple.Fields;
+import me.prettyprint.cassandra.serializers.TypeInferringSerializer;
import org.apache.cassandra.service.EmbeddedCassandraService;
-import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.NotFoundException;
+import org.apache.cassandra.thrift.TimedOutException;
+import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.thrift.TException;
+import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -19,9 +25,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
+import static org.junit.Assert.assertEquals;
+
@RunWith(JUnit4.class)
public class CassandraSchemeTest {
private static final Logger LOGGER = LoggerFactory.getLogger(CassandraSchemeTest.class);
@@ -31,6 +40,7 @@
private final String keyspaceName = "TestKeyspace";
private final String columnFamilyName = "TestColumnFamily";
transient private static Map<Object, Object> properties = new HashMap<Object, Object>();
+ private CassandraClient client;
@BeforeClass
public static void startCassandra() {
@@ -43,25 +53,48 @@ public static void startCassandra() {
}
@Before
public void beforeTest() throws Exception {
  // Make sure the target keyspace and column family exist before each test.
  CassandraTestUtil.ensureKeyspace(keyspaceName);
  CassandraTestUtil.ensureColumnFamily(keyspaceName, columnFamilyName);
  // Dedicated verification client bound to the test keyspace; closed in afterTest().
  client = new CassandraClient(getRpcHost(), getRpcPort(), keyspaceName);
  client.open();
}
+
+ @After
+ public void afterTest() {
+ client.close();
}
@Test
public void testInsertIntoCassandra() throws TException, TimedOutException, NotFoundException, InvalidRequestException, UnavailableException {
  // Read "num lower upper" lines from a local file and split into fields.
  Tap source = new Lfs(new TextLine(), inputFile);
  Pipe parsePipe = new Each("insert", new Fields("line"), new RegexSplitter(new Fields("num", "lower", "upper"), " "));

  // "num" becomes the row key; "lower" and "upper" become columns.
  Fields keyFields = new Fields("num");
  Fields[] valueFields = new Fields[] {new Fields("lower"), new Fields("upper")};

  CassandraScheme scheme = new CassandraScheme(keyFields, valueFields);
  Tap sink = new CassandraTap(getRpcHost(), getRpcPort(), keyspaceName, columnFamilyName, scheme);

  Flow parseFlow = new FlowConnector(properties).connect(source, sink, parsePipe);
  parseFlow.complete();

  // Verify via a direct thrift read that row "1" got both columns.
  assertEquals("a", bytesToString(client.getValue(columnFamilyName, toBytes("1"), toBytes("lower"))));
  assertEquals("A", bytesToString(client.getValue(columnFamilyName, toBytes("1"), toBytes("upper"))));
}
+
+ private String bytesToString(byte[] bytes) {
+ return new String(bytes);
}
// Serializes a value the same way CassandraScheme does on the write path,
// so lookups use byte-identical row keys and column names.
private ByteBuffer toBytes(Object obj) {
  return TypeInferringSerializer.get().toByteBuffer(obj);
}
+
// Shorthand for the embedded Cassandra's RPC host.
private String getRpcHost() {
  return CassandraTestUtil.getRpcHost();
}
+
// Shorthand for the embedded Cassandra's RPC port.
private Integer getRpcPort() {
  return CassandraTestUtil.getRpcPort();
}
}

0 comments on commit 3a63280

Please sign in to comment.