using family names as value fields
drujensen committed Feb 13, 2009
1 parent b5154e0 commit 1c8ed07
Showing 3 changed files with 464 additions and 0 deletions.
177 changes: 177 additions & 0 deletions src/java/cascading/hbase/HBaseFullScheme.java
@@ -0,0 +1,177 @@
/*
* Copyright (c) 2009 Concurrent, Inc.
*
* This work has been released into the public domain
* by the copyright holder. This applies worldwide.
*
* In case this is not legally possible:
* The copyright holder grants any entity the right
* to use this work for any purpose, without any
* conditions, unless such conditions are required by law.
*/

package cascading.hbase;

import java.io.IOException;

import cascading.scheme.Scheme;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tap.Tap;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.hbase.mapred.TableInputFormat;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The HBaseFullScheme class is a {@link Scheme} subclass. It is used in conjunction
* with the {@link HBaseFullTap} to allow for the reading and writing of data
* to and from an HBase cluster.
*
* @see HBaseFullTap
*/
public class HBaseFullScheme extends Scheme {
/** Field LOG */
private static final Logger LOG = LoggerFactory
.getLogger(HBaseFullScheme.class);

/** Fields keyName and keyFields */
private String keyName;
private Fields keyFields;

/** Fields familyNames and valueFields */
private String[] familyNames;
private Fields[] valueFields;

/**
* Constructor HBaseFullScheme creates a new HBaseFullScheme instance.
*
* @param keyName
* of type String
* @param familyName
* of type String
*/
public HBaseFullScheme(String keyName, String familyName) {
this(keyName, new String[] { familyName });
}

/**
* Constructor HBaseFullScheme creates a new HBaseFullScheme instance.
*
* @param keyName
* of type String
* @param familyNames
* of type String[]
*/
public HBaseFullScheme(String keyName, String[] familyNames) {
this.keyName = keyName;
this.keyFields = new Fields(keyName);
this.familyNames = familyNames;

this.valueFields = new Fields[familyNames.length];
for (int i = 0; i < familyNames.length; i++) {
this.valueFields[i] = new Fields(familyNames[i]);
}

setSourceSink(this.keyFields, this.valueFields);

}

private void setSourceSink(Fields keyFields, Fields[] valueFields) {
Fields allFields = Fields.join(keyFields, Fields.join(valueFields)); // prepend the key to the value fields

setSourceFields(allFields);
setSinkFields(allFields);
}

public String getKeyName() {
return keyName;
}

/**
* Method getFamilyNames returns the familyNames of this HBaseFullScheme object.
*
* @return the familyNames (type String[]) of this HBaseFullScheme object.
*/
public String[] getFamilyNames() {
return familyNames;
}

public Tuple source(Object key, Object value) {
Tuple result = new Tuple();

ImmutableBytesWritable keyWritable = (ImmutableBytesWritable) key;
RowResult row = (RowResult) value;

result.add(Bytes.toString(keyWritable.get()));

// each returned column cell becomes one tuple value, converted to a String
for (byte[] bytes : row.keySet()) {
Cell cell = row.get(bytes);
result.add(Bytes.toString(cell.getValue()));
}

return result;
}

public void sink(TupleEntry tupleEntry, OutputCollector outputCollector)
throws IOException {
Tuple key = tupleEntry.selectTuple(keyFields);

byte[] keyBytes = Bytes.toBytes(key.getString(0));
BatchUpdate batchUpdate = new BatchUpdate(keyBytes);

// each value field is written to its own column family, as column "family:" (empty qualifier)
for (int i = 0; i < valueFields.length; i++) {
Fields fieldSelector = valueFields[i];
TupleEntry values = tupleEntry.selectEntry(fieldSelector);

for (int j = 0; j < values.getFields().size(); j++) {
Fields fields = values.getFields();
Tuple tuple = values.getTuple();
batchUpdate.put(hbaseColumn(fields.get(j).toString()), Bytes.toBytes(tuple.getString(j)));
}
}

outputCollector.collect(null, batchUpdate);
}

public void sinkInit(Tap tap, JobConf conf) throws IOException {
conf.setOutputFormat(TableOutputFormat.class);

conf.setOutputKeyClass(ImmutableBytesWritable.class);
conf.setOutputValueClass(BatchUpdate.class);
}

public void sourceInit(Tap tap, JobConf conf) throws IOException {
conf.setInputFormat(TableInputFormat.class);

String columns = getColumns();
LOG.debug("sourcing from columns: {}", columns);

conf.set(TableInputFormat.COLUMN_LIST, columns);
}

private String getColumns() {
// TableInputFormat expects a space-separated list of columns, e.g. "family1: family2:"
StringBuilder columns = new StringBuilder();
for (String family : familyNames) {
columns.append(hbaseColumn(family)).append(" ");
}
return columns.toString();
}

private String hbaseColumn(String field) {
return field + ":";
}

}
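
The scheme above takes its value fields straight from the column family names, which is the point of this commit. A minimal usage sketch from the caller's side (the key field and family names below are invented for illustration, not part of this commit):

// hedged sketch: one value field per column family
String keyName = "num"; // hypothetical key field
String[] families = new String[] { "left", "right" }; // hypothetical families

HBaseFullScheme scheme = new HBaseFullScheme( keyName, families );

// the source and sink fields are now "num", "left", "right"; on sink,
// the "left" and "right" values are stored as columns "left:" and "right:"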
187 changes: 187 additions & 0 deletions src/java/cascading/hbase/HBaseFullTap.java
@@ -0,0 +1,187 @@
/*
* Copyright (c) 2009 Concurrent, Inc.
*
* This work has been released into the public domain
* by the copyright holder. This applies worldwide.
*
* In case this is not legally possible:
* The copyright holder grants any entity the right
* to use this work for any purpose, without any
* conditions, unless such conditions are required by law.
*/

package cascading.hbase;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tap.SinkMode;
import cascading.tap.hadoop.TapIterator;
import cascading.tap.hadoop.TapCollector;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.TupleEntryCollector;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The HBaseFullTap class is a {@link Tap} subclass. It is used in conjunction with the {@link HBaseFullScheme} to allow
* for the reading and writing of data to and from an HBase cluster.
*/
public class HBaseFullTap extends Tap
{
/** Field LOG */
private static final Logger LOG = LoggerFactory.getLogger( HBaseFullTap.class );

/** Field SCHEME */
public static final String SCHEME = "hbase";

/** Field hBaseAdmin */
private transient HBaseAdmin hBaseAdmin;

private String tableName;

/**
* Constructor HBaseFullTap creates a new HBaseFullTap instance.
*
* @param tableName of type String
* @param scheme of type HBaseFullScheme
*/
public HBaseFullTap( String tableName, HBaseFullScheme scheme )
{
super( scheme, SinkMode.APPEND );
this.tableName = tableName;
}

/**
* Constructor HBaseFullTap creates a new HBaseFullTap instance.
*
* @param tableName of type String
* @param scheme of type HBaseFullScheme
* @param sinkMode of type SinkMode
*/
public HBaseFullTap( String tableName, HBaseFullScheme scheme, SinkMode sinkMode )
{
super( scheme, sinkMode );
this.tableName = tableName;
}

private URI getURI()
{
try
{
return new URI( SCHEME, tableName, null );
}
catch( URISyntaxException exception )
{
throw new TapException( "unable to create uri", exception );
}
}

public Path getPath()
{
return new Path( getURI().toString() );
}

public TupleEntryIterator openForRead( JobConf conf ) throws IOException
{
return new TupleEntryIterator( getSourceFields(), new TapIterator( this, conf ) );
}

public TupleEntryCollector openForWrite( JobConf conf ) throws IOException
{
return new TapCollector( this, conf );
}

private HBaseAdmin getHBaseAdmin() throws MasterNotRunningException
{
if( hBaseAdmin == null )
hBaseAdmin = new HBaseAdmin( new HBaseConfiguration() );

return hBaseAdmin;
}

public boolean makeDirs( JobConf conf ) throws IOException
{
HBaseAdmin hBaseAdmin = getHBaseAdmin();

if( hBaseAdmin.tableExists( tableName ) )
return true;

LOG.debug( "creating hbase table: {}", tableName );

HTableDescriptor tableDescriptor = new HTableDescriptor( tableName );

String[] familyNames = ( (HBaseFullScheme) getScheme() ).getFamilyNames();

for( String familyName : familyNames )
tableDescriptor.addFamily( new HColumnDescriptor( familyName + ":" ) ); // 0.19-era HColumnDescriptor expects the trailing ':' delimiter

hBaseAdmin.createTable( tableDescriptor );

return true;
}

public boolean deletePath( JobConf conf ) throws IOException
{
// TODO: eventually keep table meta-data in order to re-create the table
HBaseAdmin hBaseAdmin = getHBaseAdmin();

if( !hBaseAdmin.tableExists( tableName ) )
return true;

LOG.debug( "deleting hbase table: {}", tableName );

hBaseAdmin.disableTable( tableName );
hBaseAdmin.deleteTable( tableName );

return true;
}

public boolean pathExists( JobConf conf ) throws IOException
{
return getHBaseAdmin().tableExists( tableName );
}

public long getPathModified( JobConf conf ) throws IOException
{
return System.currentTimeMillis(); // currently unable to find last mod time on a table
}

@Override
public void sinkInit( JobConf conf ) throws IOException
{
LOG.debug( "sinking to table: {}", tableName );

// do not delete if initialized from within a task
if( isReplace() && conf.get( "mapred.task.partition" ) == null )
deletePath( conf );

makeDirs( conf );

conf.set( TableOutputFormat.OUTPUT_TABLE, tableName );
super.sinkInit( conf );
}

@Override
public void sourceInit( JobConf conf ) throws IOException
{
LOG.debug( "sourcing from table: {}", tableName );

// the old mapred TableInputFormat takes the table name from the job's input path
FileInputFormat.addInputPaths( conf, tableName );
super.sourceInit( conf );
}
}
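
To see both classes together, here is a hedged sketch of wiring the tap and scheme into a Cascading 1.x flow. FlowConnector and Pipe are the standard Cascading APIs of the period; the class, table, and pipe names are invented for illustration and are not part of this commit:

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.hbase.HBaseFullScheme;
import cascading.hbase.HBaseFullTap;
import cascading.pipe.Pipe;
import cascading.tap.SinkMode;
import cascading.tap.Tap;

public class CopyTable
{
public static void main( String[] args )
{
// copy every row of one table into another, keyed on "key", one family
HBaseFullScheme scheme = new HBaseFullScheme( "key", new String[] { "family" } );

Tap source = new HBaseFullTap( "source-table", scheme );
Tap sink = new HBaseFullTap( "sink-table", scheme, SinkMode.APPEND );

Flow flow = new FlowConnector().connect( source, sink, new Pipe( "copy" ) );
flow.complete();
}
}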
