Skip to content
Browse files

merge HBaseFull with original. Added a flag to indicate using fully q…

…ualified constructor and handle differences in appropriate places.
  • Loading branch information...
1 parent 2676d48 commit d84c53375c51b955971adc3eacc8aa4483b49095 @drujensen drujensen committed Feb 20, 2009
View
223 src/java/cascading/hbase/HBaseFullScheme.java
@@ -1,223 +0,0 @@
-/*
- * Copyright (c) 2009 Concurrent, Inc.
- *
- * This work has been released into the public domain
- * by the copyright holder. This applies worldwide.
- *
- * In case this is not legally possible:
- * The copyright holder grants any entity the right
- * to use this work for any purpose, without any
- * conditions, unless such conditions are required by law.
- */
-
-package cascading.hbase;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-
-import cascading.scheme.Scheme;
-import cascading.tuple.Fields;
-import cascading.tuple.Tuple;
-import cascading.tuple.TupleEntry;
-import cascading.tap.Tap;
-import cascading.util.Util;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.mapred.TableOutputFormat;
-import org.apache.hadoop.hbase.mapred.TableInputFormat;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.io.BatchUpdate;
-import org.apache.hadoop.hbase.io.RowResult;
-import org.apache.hadoop.hbase.io.Cell;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The HBaseScheme class is a {@link Scheme} subclass. It is used in conjunction with the {@HBaseTap} to
- * allow for the reading and writing of data to and from a HBase cluster.
- *
- * @see HBaseTap
- */
-public class HBaseFullScheme extends Scheme
-{
- /** Field LOG */
- private static final Logger LOG = LoggerFactory.getLogger(HBaseFullScheme.class);
-
- /** Field keyFields */
- private Fields keyField;
-
- /** Field valueFields */
- private String[] columnNames;
- private Fields[] columnFields;
- private byte[][] fields = null;
-
-
- /**
- * Constructor HBaseScheme creates a new HBaseScheme instance.
- *
- * @param keyField
- * of type String
- * @param columnFields
- * of type Fields
- */
- public HBaseFullScheme(Fields keyField, Fields columnFields)
- {
- this(keyField, Fields.fields( columnFields ));
- }
- /**
- * Constructor HBaseScheme creates a new HBaseScheme instance.
- *
- * @param keyField
- * of type Field
- * @param columnFields
- * of type Field[]
- */
- public HBaseFullScheme(Fields keyField, Fields[] columnFields)
- {
- this.keyField = keyField;
- this.columnFields = columnFields;
-
- this.columnNames = new String[columnFields.length];
- for (int i = 0; i < columnFields.length; i++)
- {
- this.columnNames[i] = (String) columnFields[i].get(0);
- }
-
- validate();
-
- setSourceSink(this.keyField, this.columnFields);
-
- }
-
- private void validate()
- {
- if( keyField.size() != 1 )
- throw new IllegalArgumentException( "may only have one key field, found: " + keyField.print() );
- }
-
- private void setSourceSink(Fields keyFields, Fields[] columnFields)
- {
- Fields allFields = Fields.join(keyFields, Fields.join(columnFields)); // prepend
- // keys
-
- setSourceFields(allFields);
- setSinkFields(allFields);
- }
-
- /**
- * Method getFamilyNames returns the familyNames of this HBaseScheme object.
- *
- * @return the familyNames (type String[]) of this HBaseScheme object.
- */
- public String[] getFamilyNames()
- {
- HashSet<String> familyNames = new HashSet<String>();
- for (String columnName : columnNames)
- {
- int pos = columnName.indexOf(":");
- if (pos > 0)
- {
- familyNames.add(columnName.substring(0, pos + 1));
- } else
- {
- familyNames.add(columnName + ":");
- }
- }
- return familyNames.toArray(new String[0]);
- }
-
- public Tuple source(Object key, Object value)
- {
- Tuple result = new Tuple();
-
- ImmutableBytesWritable keyWritable = (ImmutableBytesWritable) key;
- RowResult row = (RowResult) value;
-
- result.add(Bytes.toString(keyWritable.get()));
-
- for (byte[] bytes : getFields(this.columnNames))
- {
- Cell cell = row.get(bytes);
- result.add(cell != null ? Bytes.toString(cell.getValue()) : "");
- }
-
- return result;
- }
-
- private byte[][] getFields(String[] columnNames)
- {
- if (fields == null)
- fields = new byte[columnNames.length][];
-
- for (int i = 0; i < columnNames.length; i++)
- fields[i] = Bytes.toBytes(hbaseColumn(columnNames[i]));
-
- return fields;
- }
-
- public void sink(TupleEntry tupleEntry, OutputCollector outputCollector) throws IOException
- {
- Tuple key = tupleEntry.selectTuple(keyField);
-
- byte[] keyBytes = Bytes.toBytes(key.getString(0));
- BatchUpdate batchUpdate = new BatchUpdate(keyBytes);
-
- for (int i = 0; i < columnFields.length; i++)
- {
- Fields fieldSelector = columnFields[i];
- TupleEntry values = tupleEntry.selectEntry(fieldSelector);
-
- for (int j = 0; j < values.getFields().size(); j++)
- {
- Fields fields = values.getFields();
- Tuple tuple = values.getTuple();
- batchUpdate.put(hbaseColumn(fields.get(j).toString()), Bytes.toBytes(tuple.getString(j)));
- }
- }
-
- outputCollector.collect(null, batchUpdate);
- }
-
- public void sinkInit(Tap tap, JobConf conf) throws IOException
- {
- conf.setOutputFormat(TableOutputFormat.class);
-
- conf.setOutputKeyClass(ImmutableBytesWritable.class);
- conf.setOutputValueClass(BatchUpdate.class);
- }
-
- public void sourceInit(Tap tap, JobConf conf) throws IOException
- {
- conf.setInputFormat(TableInputFormat.class);
-
- String columns = getColumns();
- LOG.debug("sourcing from columns: {}", columns);
-
- conf.set(TableInputFormat.COLUMN_LIST, columns);
- }
-
- private String getColumns()
- {
- String columns = new String();
- for (String column : columnNames)
- {
- columns += hbaseColumn(column) + " ";
- }
- return columns;
- }
-
- private String hbaseColumn(String columnName)
- {
- if (columnName.indexOf(":") > 0)
- {
- return columnName;
- }
- return columnName + ":";
- }
-
-}
View
191 src/java/cascading/hbase/HBaseFullTap.java
@@ -1,191 +0,0 @@
-/*
- * Copyright (c) 2009 Concurrent, Inc.
- *
- * This work has been released into the public domain
- * by the copyright holder. This applies worldwide.
- *
- * In case this is not legally possible:
- * The copyright holder grants any entity the right
- * to use this work for any purpose, without any
- * conditions, unless such conditions are required by law.
- */
-
-package cascading.hbase;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-
-import cascading.tap.Tap;
-import cascading.tap.TapException;
-import cascading.tap.SinkMode;
-import cascading.tap.hadoop.TapIterator;
-import cascading.tap.hadoop.TapCollector;
-import cascading.tuple.TupleEntryIterator;
-import cascading.tuple.TupleEntryCollector;
-import cascading.flow.Flow;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.hbase.mapred.TableOutputFormat;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The HBaseTap class is a {@link Tap} subclass. It is used in conjunction with the {@HBaseFullScheme}
- * to allow for the reading and writing of data to and from a HBase cluster.
- *
- */
-public class HBaseFullTap extends Tap
-{
- /** Field LOG */
- private static final Logger LOG = LoggerFactory.getLogger(HBaseFullTap.class);
-
- /** Field SCHEME */
- public static final String SCHEME = "hbase";
-
- /** Field hBaseAdmin */
- private transient HBaseAdmin hBaseAdmin;
-
- private String tableName;
-
- /**
- * Constructor HBaseTap creates a new HBaseTap instance.
- *
- * @param tableName
- * of type String
- * @param HBaseFullScheme
- * of type HBaseFullScheme
- */
- public HBaseFullTap(String tableName, HBaseFullScheme HBaseFullScheme)
- {
- super(HBaseFullScheme, SinkMode.APPEND);
- this.tableName = tableName;
- }
-
- /**
- * Constructor HBaseTap creates a new HBaseTap instance.
- *
- * @param tableName
- * of type String
- * @param HBaseFullScheme
- * of type HBaseFullScheme
- * @param sinkMode
- * of type SinkMode
- */
- public HBaseFullTap(String tableName, HBaseFullScheme HBaseFullScheme, SinkMode sinkMode)
- {
- super(HBaseFullScheme, sinkMode);
- this.tableName = tableName;
- }
-
- private URI getURI()
- {
- try
- {
- return new URI(SCHEME, tableName, null);
- } catch (URISyntaxException exception)
- {
- throw new TapException("unable to create uri", exception);
- }
- }
-
- public Path getPath()
- {
- return new Path(getURI().toString());
- }
-
- public TupleEntryIterator openForRead(JobConf conf) throws IOException
- {
- return new TupleEntryIterator(getSourceFields(), new TapIterator(this, conf));
- }
-
- public TupleEntryCollector openForWrite(JobConf conf) throws IOException
- {
- return new TapCollector(this, conf);
- }
-
- private HBaseAdmin getHBaseAdmin() throws MasterNotRunningException
- {
- if (hBaseAdmin == null)
- hBaseAdmin = new HBaseAdmin(new HBaseConfiguration());
-
- return hBaseAdmin;
- }
-
- public boolean makeDirs(JobConf conf) throws IOException
- {
- HBaseAdmin hBaseAdmin = getHBaseAdmin();
-
- if (hBaseAdmin.tableExists(tableName))
- return true;
-
- LOG.debug("creating hbase table: {}", tableName);
-
- HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
-
- String[] familyNames = ((HBaseFullScheme) getScheme()).getFamilyNames();
-
- for (String familyName : familyNames)
- tableDescriptor.addFamily(new HColumnDescriptor(familyName));
-
- hBaseAdmin.createTable(tableDescriptor);
-
- return true;
- }
-
- public boolean deletePath(JobConf conf) throws IOException
- {
- // eventually keep table meta-data to source table create
- HBaseAdmin hBaseAdmin = getHBaseAdmin();
-
- if (!hBaseAdmin.tableExists(tableName))
- return true;
-
- LOG.debug("deleting hbase table: {}", tableName);
-
- hBaseAdmin.disableTable(tableName);
- hBaseAdmin.deleteTable(tableName);
-
- return true;
- }
-
- public boolean pathExists(JobConf conf) throws IOException
- {
- return getHBaseAdmin().tableExists(tableName);
- }
-
- public long getPathModified(JobConf conf) throws IOException
- {
- return System.currentTimeMillis(); // currently unable to find last mod time on a table
- }
-
- @Override
- public void sinkInit(JobConf conf) throws IOException
- {
- LOG.debug("sinking to table: {}", tableName);
-
- // do not delete if initialized from within a task
- if (isReplace() && conf.get("mapred.task.partition") == null)
- deletePath(conf);
-
- makeDirs(conf);
-
- conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
- super.sinkInit(conf);
- }
-
- @Override
- public void sourceInit(JobConf conf) throws IOException
- {
- LOG.debug("sourcing from table: {}", tableName);
-
- FileInputFormat.addInputPaths(conf, tableName);
- super.sourceInit(conf);
- }
-}
View
307 src/java/cascading/hbase/HBaseScheme.java
@@ -13,6 +13,9 @@
package cascading.hbase;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import cascading.scheme.Scheme;
import cascading.tuple.Fields;
@@ -22,8 +25,10 @@
import cascading.util.Util;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.hbase.mapred.TableInputFormat;
+import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.RowResult;
@@ -33,181 +38,257 @@
import org.slf4j.LoggerFactory;
/**
- * The HBaseScheme class is a {@link Scheme} subclass. It is used in conjunction with the {@HBaseTap} to allow for the
- * reading and writing of data to and from a HBase cluster.
- *
+ * The HBaseScheme class is a {@link Scheme} subclass. It is used in conjunction with the {@HBaseTap} to
+ * allow for the reading and writing of data to and from a HBase cluster.
+ *
* @see HBaseTap
*/
public class HBaseScheme extends Scheme
- {
- /** Field LOG */
- private static final Logger LOG = LoggerFactory.getLogger( HBaseScheme.class );
-
- /** Field keyFields */
- private Fields keyFields;
- /** Field familyNames */
- private String[] familyNames;
- /** Field valueFields */
- private Fields[] valueFields;
-
- /** Field fields */
- private transient byte[][] fields;
-
- /**
- * Constructor HBaseScheme creates a new HBaseScheme instance.
- *
- * @param keyFields of type Fields
- * @param familyName of type String
- * @param valueFields of type Fields
- */
- public HBaseScheme( Fields keyFields, String familyName, Fields valueFields )
+{
+ /** Field LOG */
+ private static final Logger LOG = LoggerFactory.getLogger(HBaseScheme.class);
+
+ /** Field keyFields */
+ private Fields keyField;
+ /** String columnNames */
+ private String[] columnNames;
+ /** Field valueFields */
+ private Fields[] valueFields;
+ /** Field fields */
+ private transient byte[][] fields;
+
+ private boolean isFullyQualified = false;
+
+ /**
+ * Constructor HBaseScheme creates a new HBaseScheme instance.
+ *
+ * @param keyFields
+ * of type Fields
+ * @param familyName
+ * of type String
+ * @param valueFields
+ * of type Fields
+ */
+ public HBaseScheme(Fields keyFields, String familyName, Fields valueFields)
{
- this( keyFields, new String[]{familyName}, Fields.fields( valueFields ) );
+ this(keyFields, new String[] { familyName }, Fields.fields(valueFields));
}
- /**
- * Constructor HBaseScheme creates a new HBaseScheme instance.
- *
- * @param keyFields of type Fields
- * @param familyNames of type String[]
- * @param valueFields of type Fields[]
- */
- public HBaseScheme( Fields keyFields, String[] familyNames, Fields[] valueFields )
+ /**
+ * Constructor HBaseScheme creates a new HBaseScheme instance.
+ *
+ * @param keyFields
+ * of type Fields
+ * @param familyNames
+ * of type String[]
+ * @param valueFields
+ * of type Fields[]
+ */
+ public HBaseScheme(Fields keyFields, String[] familyNames, Fields[] valueFields)
{
- this.keyFields = keyFields;
- this.familyNames = familyNames;
- this.valueFields = valueFields;
+ this.keyField = keyFields;
+ //The column Names only holds the family Names.
+ this.columnNames = familyNames;
+ this.valueFields = valueFields;
- setSourceSink( this.keyFields, this.valueFields );
+ setSourceSink(this.keyField, this.valueFields);
- validate();
+ validate();
}
- private void validate()
+ /**
+ * Constructor HBaseScheme creates a new HBaseScheme instance using fully qualified column names
+ *
+ * @param keyField
+ * of type String
+ * @param valueFields
+ * of type Fields
+ */
+ public HBaseScheme(Fields keyField, Fields valueFields)
{
- if( keyFields.size() != 1 )
- throw new IllegalArgumentException( "may only have one key field, found: " + keyFields.print() );
+ this(keyField, Fields.fields(valueFields));
}
- private void setSourceSink( Fields keyFields, Fields[] valueFields )
+ /**
+ * Constructor HBaseScheme creates a new HBaseScheme instance using fully qualified column names
+ *
+ * @param keyField
+ * of type Field
+ * @param valueFields
+ * of type Field[]
+ */
+ public HBaseScheme(Fields keyField, Fields[] valueFields)
{
- Fields allFields = Fields.join( keyFields, Fields.join( valueFields ) ); // prepend keys
+ //Set a flag that this is using fully qualified names
+ this.isFullyQualified = true;
+ this.keyField = keyField;
+ this.valueFields = valueFields;
+
+ //The column names include the familyName:columnName
+ this.columnNames = new String[valueFields.length];
+ for (int i = 0; i < valueFields.length; i++)
+ {
+ this.columnNames[i] = (String) valueFields[i].get(0);
+ }
+
+ validate();
+
+ setSourceSink(this.keyField, this.valueFields);
- setSourceFields( allFields );
- setSinkFields( allFields );
}
- /**
- * Method getFamilyNames returns the familyNames of this HBaseScheme object.
- *
- * @return the familyNames (type String[]) of this HBaseScheme object.
- */
- public String[] getFamilyNames()
+ private void validate()
{
- return familyNames;
+ if (keyField.size() != 1)
+ throw new IllegalArgumentException("may only have one key field, found: " + keyField.print());
}
- private byte[][] getFieldsBytes()
+ private void setSourceSink(Fields keyFields, Fields[] columnFields)
{
- if( fields == null )
- fields = makeBytes( familyNames, valueFields );
+ Fields allFields = Fields.join(keyFields, Fields.join(columnFields)); // prepend
- return fields;
+ setSourceFields(allFields);
+ setSinkFields(allFields);
+ }
+
+ /**
+ * Method getFamilyNames returns the set of familyNames of this HBaseScheme object.
+ *
+ * @return the familyNames (type String[]) of this HBaseScheme object.
+ */
+ public String[] getFamilyNames()
+ {
+ HashSet<String> familyNames = new HashSet<String>();
+ for (String columnName : columnNames)
+ {
+ if (isFullyQualified)
+ {
+ int pos = columnName.indexOf(":");
+ familyNames.add(hbaseColumn(pos>0?columnName.substring(0, pos):columnName));
+ } else
+ {
+ familyNames.add(hbaseColumn(columnName));
+ }
+ }
+ return familyNames.toArray(new String[0]);
}
- public Tuple source( Object key, Object value )
+ public Tuple source(Object key, Object value)
{
- Tuple result = new Tuple();
+ Tuple result = new Tuple();
- ImmutableBytesWritable keyWritable = (ImmutableBytesWritable) key;
- RowResult row = (RowResult) value;
+ ImmutableBytesWritable keyWritable = (ImmutableBytesWritable) key;
+ RowResult row = (RowResult) value;
- result.add( Bytes.toString( keyWritable.get() ) );
+ result.add(Bytes.toString(keyWritable.get()));
- for( byte[] bytes : getFieldsBytes() )
- {
- Cell cell = row.get( bytes );
- result.add( Bytes.toString( cell.getValue() ) );
- }
+ for (byte[] bytes : getFieldsBytes())
+ {
+ Cell cell = row.get(bytes);
+ result.add(cell != null ? Bytes.toString(cell.getValue()) : "");
+ }
- return result;
+ return result;
}
- public void sink( TupleEntry tupleEntry, OutputCollector outputCollector ) throws IOException
+ private byte[][] getFieldsBytes()
{
- Tuple key = tupleEntry.selectTuple( keyFields );
+ if( fields == null )
+ fields = makeBytes( this.columnNames, this.valueFields );
+
+ return fields;
+ }
- byte[] keyBytes = Bytes.toBytes( key.getString( 0 ) );
- BatchUpdate batchUpdate = new BatchUpdate( keyBytes );
+ public void sink(TupleEntry tupleEntry, OutputCollector outputCollector) throws IOException
+ {
+ Tuple key = tupleEntry.selectTuple(keyField);
- for( int i = 0; i < valueFields.length; i++ )
- {
- Fields fieldSelector = valueFields[ i ];
- TupleEntry values = tupleEntry.selectEntry( fieldSelector );
+ byte[] keyBytes = Bytes.toBytes(key.getString(0));
+ BatchUpdate batchUpdate = new BatchUpdate(keyBytes);
- for( int j = 0; j < values.getFields().size(); j++ )
+ for (int i = 0; i < valueFields.length; i++)
{
- Fields fields = values.getFields();
- Tuple tuple = values.getTuple();
- batchUpdate.put( familyNames[ i ] + ":" + fields.get( j ), Bytes.toBytes( tuple.getString( j ) ) );
+ Fields fieldSelector = valueFields[i];
+ TupleEntry values = tupleEntry.selectEntry(fieldSelector);
+
+ for (int j = 0; j < values.getFields().size(); j++)
+ {
+ Fields fields = values.getFields();
+ Tuple tuple = values.getTuple();
+ if (isFullyQualified)
+ batchUpdate.put(hbaseColumn(fields.get(j).toString()), Bytes.toBytes(tuple.getString(j)));
+ else
+ batchUpdate.put(hbaseColumn(columnNames[i]) + fields.get(j).toString(), Bytes.toBytes(tuple.getString(j)));
+
+ }
}
- }
- outputCollector.collect( null, batchUpdate );
+ outputCollector.collect(null, batchUpdate);
}
- public void sinkInit( Tap tap, JobConf conf ) throws IOException
+ public void sinkInit(Tap tap, JobConf conf) throws IOException
{
- conf.setOutputFormat( TableOutputFormat.class );
-
- conf.setOutputKeyClass( ImmutableBytesWritable.class );
- conf.setOutputValueClass( BatchUpdate.class );
+ conf.setOutputFormat(TableOutputFormat.class);
+
+ conf.setOutputKeyClass(ImmutableBytesWritable.class);
+ conf.setOutputValueClass(BatchUpdate.class);
}
- public void sourceInit( Tap tap, JobConf conf ) throws IOException
+ public void sourceInit(Tap tap, JobConf conf) throws IOException
{
- conf.setInputFormat( TableInputFormat.class );
+ conf.setInputFormat(TableInputFormat.class);
- String columns = getColumns();
- LOG.debug( "sourcing from columns: {}", columns );
+ String columns = getColumns();
+ LOG.debug("sourcing from columns: {}", columns);
- conf.set( TableInputFormat.COLUMN_LIST, columns );
+ conf.set(TableInputFormat.COLUMN_LIST, columns);
}
- private String getColumns()
+ private String getColumns()
{
- return Util.join( columns( familyNames, valueFields ), " " );
+ return Util.join(columns(this.columnNames, this.valueFields), " ");
}
- private String[] columns( String[] familyNames, Fields[] fieldsArray )
+ private String[] columns(String[] familyNames, Fields[] fieldsArray)
{
- int size = 0;
+ int size = 0;
- for( Fields fields : fieldsArray )
- size += fields.size();
+ for (Fields fields : fieldsArray)
+ size += fields.size();
- String[] columns = new String[size];
+ String[] columns = new String[size];
- for( int i = 0; i < fieldsArray.length; i++ )
- {
- Fields fields = fieldsArray[ i ];
+ for (int i = 0; i < fieldsArray.length; i++)
+ {
+ Fields fields = fieldsArray[i];
- for( int j = 0; j < fields.size(); j++ )
- columns[ i + j ] = familyNames[ i ] + ":" + (String) fields.get( j );
- }
+ for (int j = 0; j < fields.size(); j++)
+ if (isFullyQualified)
+ columns[i + j] = hbaseColumn((String) fields.get(j));
+ else
+ columns[i + j] = hbaseColumn(familyNames[i]) + (String) fields.get(j);
+ }
- return columns;
+ return columns;
}
- private byte[][] makeBytes( String[] familyNames, Fields[] fieldsArray )
+ private byte[][] makeBytes( String[] familyNames, Fields[] fieldsArray )
{
- String[] columns = columns( familyNames, fieldsArray );
- byte[][] bytes = new byte[columns.length][];
+ String[] columns = columns( familyNames, fieldsArray );
+ byte[][] bytes = new byte[columns.length][];
+
+ for( int i = 0; i < columns.length; i++ )
+ bytes[ i ] = Bytes.toBytes( columns[ i ] );
+
+ return bytes;
+ }
- for( int i = 0; i < columns.length; i++ )
- bytes[ i ] = Bytes.toBytes( columns[ i ] );
+ private String hbaseColumn(String column)
+ {
+ if (column.indexOf(":") < 0)
+ return column + ":";
+ return column;
- return bytes;
}
- }
+}
View
191 src/java/cascading/hbase/HBaseTap.java
@@ -37,154 +37,155 @@
import org.slf4j.LoggerFactory;
/**
- * The HBaseTap class is a {@link Tap} subclass. It is used in conjunction with the {@HBaseScheme} to allow for the
- * reading and writing of data to and from a HBase cluster.
- *
+ * The HBaseTap class is a {@link Tap} subclass. It is used in conjunction with the {@HBaseFullScheme}
+ * to allow for the reading and writing of data to and from a HBase cluster.
+ *
*/
public class HBaseTap extends Tap
- {
- /** Field LOG */
- private static final Logger LOG = LoggerFactory.getLogger( HBaseTap.class );
-
- /** Field SCHEME */
- public static final String SCHEME = "hbase";
-
- /** Field tableName */
- private String tableName;
-
- /** Field hBaseAdmin */
- private transient HBaseAdmin hBaseAdmin;
-
- /**
- * Constructor HBaseTap creates a new HBaseTap instance.
- *
- * @param tableName of type String
- * @param hBaseScheme of type HBaseScheme
- */
- public HBaseTap( String tableName, HBaseScheme hBaseScheme )
+{
+ /** Field LOG */
+ private static final Logger LOG = LoggerFactory.getLogger(HBaseTap.class);
+
+ /** Field SCHEME */
+ public static final String SCHEME = "hbase";
+
+ /** Field hBaseAdmin */
+ private transient HBaseAdmin hBaseAdmin;
+
+ private String tableName;
+
+ /**
+ * Constructor HBaseTap creates a new HBaseTap instance.
+ *
+ * @param tableName
+ * of type String
+ * @param HBaseFullScheme
+ * of type HBaseFullScheme
+ */
+ public HBaseTap(String tableName, HBaseScheme HBaseFullScheme)
{
- super( hBaseScheme, SinkMode.APPEND );
-
- this.tableName = tableName;
+ super(HBaseFullScheme, SinkMode.APPEND);
+ this.tableName = tableName;
}
- /**
- * Constructor HBaseTap creates a new HBaseTap instance.
- *
- * @param tableName of type String
- * @param hBaseScheme of type HBaseScheme
- * @param sinkMode of type SinkMode
- */
- public HBaseTap( String tableName, HBaseScheme hBaseScheme, SinkMode sinkMode )
+ /**
+ * Constructor HBaseTap creates a new HBaseTap instance.
+ *
+ * @param tableName
+ * of type String
+ * @param HBaseFullScheme
+ * of type HBaseFullScheme
+ * @param sinkMode
+ * of type SinkMode
+ */
+ public HBaseTap(String tableName, HBaseScheme HBaseFullScheme, SinkMode sinkMode)
{
- super( hBaseScheme, sinkMode );
-
- this.tableName = tableName;
+ super(HBaseFullScheme, sinkMode);
+ this.tableName = tableName;
}
- private URI getURI()
+ private URI getURI()
{
- try
- {
- return new URI( SCHEME, tableName, null );
- }
- catch( URISyntaxException exception )
- {
- throw new TapException( "unable to create uri", exception );
- }
+ try
+ {
+ return new URI(SCHEME, tableName, null);
+ } catch (URISyntaxException exception)
+ {
+ throw new TapException("unable to create uri", exception);
+ }
}
- public Path getPath()
+ public Path getPath()
{
- return new Path( getURI().toString() );
+ return new Path(getURI().toString());
}
- public TupleEntryIterator openForRead( JobConf conf ) throws IOException
+ public TupleEntryIterator openForRead(JobConf conf) throws IOException
{
- return new TupleEntryIterator( getSourceFields(), new TapIterator( this, conf ) );
+ return new TupleEntryIterator(getSourceFields(), new TapIterator(this, conf));
}
- public TupleEntryCollector openForWrite( JobConf conf ) throws IOException
+ public TupleEntryCollector openForWrite(JobConf conf) throws IOException
{
- return new TapCollector( this, conf );
+ return new TapCollector(this, conf);
}
- private HBaseAdmin getHBaseAdmin() throws MasterNotRunningException
+ private HBaseAdmin getHBaseAdmin() throws MasterNotRunningException
{
- if( hBaseAdmin == null )
- hBaseAdmin = new HBaseAdmin( new HBaseConfiguration() );
+ if (hBaseAdmin == null)
+ hBaseAdmin = new HBaseAdmin(new HBaseConfiguration());
- return hBaseAdmin;
+ return hBaseAdmin;
}
- public boolean makeDirs( JobConf conf ) throws IOException
+ public boolean makeDirs(JobConf conf) throws IOException
{
- HBaseAdmin hBaseAdmin = getHBaseAdmin();
+ HBaseAdmin hBaseAdmin = getHBaseAdmin();
- if( hBaseAdmin.tableExists( tableName ) )
- return true;
+ if (hBaseAdmin.tableExists(tableName))
+ return true;
- LOG.debug( "creating hbase table: {}", tableName );
+ LOG.debug("creating hbase table: {}", tableName);
- HTableDescriptor tableDescriptor = new HTableDescriptor( tableName );
+ HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
- String[] familyNames = ( (HBaseScheme) getScheme() ).getFamilyNames();
+ String[] familyNames = ((HBaseScheme) getScheme()).getFamilyNames();
- for( String familyName : familyNames )
- tableDescriptor.addFamily( new HColumnDescriptor( familyName + ":" ) );
+ for (String familyName : familyNames)
+ tableDescriptor.addFamily(new HColumnDescriptor(familyName));
- hBaseAdmin.createTable( tableDescriptor );
+ hBaseAdmin.createTable(tableDescriptor);
- return true;
+ return true;
}
- public boolean deletePath( JobConf conf ) throws IOException
+ public boolean deletePath(JobConf conf) throws IOException
{
- // eventually keep table meta-data to source table create
- HBaseAdmin hBaseAdmin = getHBaseAdmin();
+ // eventually keep table meta-data to source table create
+ HBaseAdmin hBaseAdmin = getHBaseAdmin();
- if( !hBaseAdmin.tableExists( tableName ) )
- return true;
+ if (!hBaseAdmin.tableExists(tableName))
+ return true;
- LOG.debug( "deleting hbase table: {}", tableName );
+ LOG.debug("deleting hbase table: {}", tableName);
- hBaseAdmin.disableTable( tableName );
- hBaseAdmin.deleteTable( tableName );
+ hBaseAdmin.disableTable(tableName);
+ hBaseAdmin.deleteTable(tableName);
- return true;
+ return true;
}
- public boolean pathExists( JobConf conf ) throws IOException
+ public boolean pathExists(JobConf conf) throws IOException
{
- return getHBaseAdmin().tableExists( tableName );
+ return getHBaseAdmin().tableExists(tableName);
}
- public long getPathModified( JobConf conf ) throws IOException
+ public long getPathModified(JobConf conf) throws IOException
{
- return System.currentTimeMillis(); // currently unable to find last mod time on a table
+ return System.currentTimeMillis(); // currently unable to find last mod time on a table
}
- @Override
- public void sinkInit( JobConf conf ) throws IOException
+ @Override
+ public void sinkInit(JobConf conf) throws IOException
{
- LOG.debug( "sinking to table: {}", tableName );
+ LOG.debug("sinking to table: {}", tableName);
- // do not delete if initialized from within a task
- if( isReplace() && conf.get( "mapred.task.partition" ) == null )
- deletePath( conf );
+ // do not delete if initialized from within a task
+ if (isReplace() && conf.get("mapred.task.partition") == null)
+ deletePath(conf);
- makeDirs( conf );
+ makeDirs(conf);
- conf.set( TableOutputFormat.OUTPUT_TABLE, tableName );
- super.sinkInit( conf );
+ conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
+ super.sinkInit(conf);
}
- @Override
- public void sourceInit( JobConf conf ) throws IOException
+ @Override
+ public void sourceInit(JobConf conf) throws IOException
{
- LOG.debug( "sourcing from table: {}", tableName );
+ LOG.debug("sourcing from table: {}", tableName);
- FileInputFormat.addInputPaths( conf, tableName );
- super.sourceInit( conf );
+ FileInputFormat.addInputPaths(conf, tableName);
+ super.sourceInit(conf);
}
- }
+}
View
133 src/test/cascading/hbase/HBaseFullTest.java
@@ -1,133 +0,0 @@
-/*
- * Copyright (c) 2009 Concurrent, Inc.
- *
- * This work has been released into the public domain
- * by the copyright holder. This applies worldwide.
- *
- * In case this is not legally possible:
- * The copyright holder grants any entity the right
- * to use this work for any purpose, without any
- * conditions, unless such conditions are required by law.
- */
-
-package cascading.hbase;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import junit.framework.TestCase;
-
-import org.apache.hadoop.hbase.HBaseClusterTestCase;
-import org.junit.Test;
-
-import cascading.flow.Flow;
-import cascading.flow.FlowConnector;
-import cascading.operation.Aggregator;
-import cascading.operation.Identity;
-import cascading.operation.aggregator.Count;
-import cascading.operation.regex.RegexSplitter;
-import cascading.pipe.Each;
-import cascading.pipe.Every;
-import cascading.pipe.GroupBy;
-import cascading.pipe.Pipe;
-import cascading.scheme.Scheme;
-import cascading.scheme.TextLine;
-import cascading.tap.Hfs;
-import cascading.tap.Lfs;
-import cascading.tap.SinkMode;
-import cascading.tap.Tap;
-import cascading.tuple.Fields;
-import cascading.tuple.TupleEntryIterator;
-
-/**
- *
- */
-public class HBaseFullTest extends HBaseClusterTestCase
-{
- transient private static Properties properties = new Properties();
-
- String inputFile = "src/test/data/small.txt";
-
- public HBaseFullTest()
- {
- super( 1, false );
- }
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
-
- }
-
- @Test
- public void testHBaseMultiFamily() throws IOException
- {
- // create flow to read from local file and insert into HBase
- Tap source = new Lfs(new TextLine(), inputFile);
-
- Pipe parsePipe = new Each("insert", new Fields("line"), new RegexSplitter(new Fields("num", "content:lower",
- "content:upper"), " "));
-
- Fields keyField = new Fields("num");
- Fields columnFields = new Fields( "content:lower","content:upper");
- Tap hBaseTap = new HBaseFullTap("multitable", new HBaseFullScheme(keyField, columnFields), SinkMode.REPLACE);
-
- Flow parseFlow = new FlowConnector(properties).connect(source, hBaseTap, parsePipe);
-
- parseFlow.complete();
-
- verifySink(parseFlow, 5);
-
- // create flow to read from hbase and save to local file
- Tap sink = new Hfs(new TextLine(), "multifamily", SinkMode.REPLACE);
-
- Pipe copyPipe = new Each("read", new Identity());
-
- Flow copyFlow = new FlowConnector(properties).connect(hBaseTap, sink, copyPipe);
-
- copyFlow.complete();
-
- verifySink(copyFlow, 5);
- }
-
- private void verifySink(Flow flow, int expects) throws IOException
- {
- int count = 0;
-
- TupleEntryIterator iterator = flow.openSink();
-
- while (iterator.hasNext())
- {
- count++;
- System.out.println("iterator.next() = " + iterator.next());
- }
-
- iterator.close();
-
- assertEquals("wrong number of values", expects, count);
- }
-
- @Test
- public void testGroupByCount() throws IOException
- {
-
- Fields keyField = new Fields("num");
- Fields columnFields = new Fields( "content:lower","content:upper");
- Tap source = new HBaseFullTap("multitable", new HBaseFullScheme(keyField, columnFields), SinkMode.REPLACE);
-
- Scheme sinkScheme = new TextLine(new Fields("content:lower", "count"));
- Tap sink = new Hfs(sinkScheme, "lowercount", true);
-
- Pipe assembly = new Pipe("lowercount");
- assembly = new GroupBy(assembly, new Fields("content:lower"));
- Aggregator count = new Count(new Fields("count"));
- assembly = new Every(assembly, count);
-
- Flow copyFlow = new FlowConnector(properties).connect(source, sink, assembly);
-
- copyFlow.complete();
-
- }
-
-}
View
9 src/test/cascading/hbase/HBaseTest.java
@@ -49,7 +49,7 @@
public HBaseTest()
{
- super( 1, false );
+ super( 1, true );
}
@Override
@@ -95,13 +95,12 @@ public void testHBaseMultiFamilyCascade() throws IOException
// create flow to read from local file and insert into HBase
Tap source = new Lfs( new TextLine(), inputFile );
- Pipe parsePipe = new Each( "insert", new Fields( "line" ), new RegexSplitter( new Fields( "ignore", "lower", "upper" ), " " ) );
+ Pipe parsePipe = new Each( "insert", new Fields( "line" ), new RegexSplitter( new Fields( "ignore", "left:lower", "right:upper" ), " " ) );
parsePipe = new Each( parsePipe, new ExpressionFunction( new Fields( "num" ), "(int) (Math.random() * Integer.MAX_VALUE)" ), Fields.ALL );
Fields keyFields = new Fields( "num" );
- String[] familyNames = {"left", "right"};
- Fields[] valueFields = new Fields[]{new Fields( "lower" ), new Fields( "upper" )};
- Tap hBaseTap = new HBaseTap( "multitable", new HBaseScheme( keyFields, familyNames, valueFields ) );
+ Fields[] valueFields = new Fields[]{new Fields( "left:lower" ), new Fields( "right:upper" )};
+ Tap hBaseTap = new HBaseTap( "multitable", new HBaseScheme( keyFields, valueFields ) );
Flow parseFlow = new FlowConnector( properties ).connect( source, hBaseTap, parsePipe );

0 comments on commit d84c533

Please sign in to comment.
Something went wrong with that request. Please try again.