Commit 1e84cb4
Have connector do ORM mapping based on different object types.
Change-Id: I0dcec18292b658cc1f2d3a9d6fb17ce07956e75b
Mike Wiederhold authored and ingenthr committed Feb 22, 2012
1 parent 482de54 commit 1e84cb4
Showing 9 changed files with 104 additions and 45 deletions.
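The substance of the change: CouchbaseRecordReader now keys each record by its Couchbase document key (a Text) instead of a LongWritable message counter, and it fills the generated Sqoop record through setField("Key", ...) and setField("Value", ...) rather than hand-packing a byte buffer for readFields(). A minimal, self-contained sketch of that field-mapping contract follows; the record class and sample data are hypothetical stand-ins, not part of this commit.

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical stand-in for a generated Sqoop record; real classes come from
    // Sqoop's ORM code generator and implement SqoopRecord. Only the setField
    // contract that CouchbaseRecordReader relies on is mirrored here.
    public class OrmMappingSketch {

      static class ExampleRecord {
        private final Map<String, Object> fields = new HashMap<String, Object>();

        void setField(String name, Object value) {
          fields.put(name, value);
        }

        @Override
        public String toString() {
          return fields.get("Key") + "," + fields.get("Value");
        }
      }

      public static void main(String[] args) {
        // Mirrors what nextKeyValue() now does for each TAP message it reads:
        ExampleRecord record = new ExampleRecord();
        record.setField("Key", "user::1001");            // hypothetical document key
        record.setField("Value", "{\"name\":\"Mike\"}"); // deserialized document body
        System.out.println(record);                      // prints: user::1001,{"name":"Mike"}
      }
    }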
2 changes: 1 addition & 1 deletion build.xml
@@ -20,7 +20,7 @@

<!-- Directory variables -->
<property name="name" value="couchbase-hadoop-plugin"/>
<property name="version" value="1.0"/>
<property name="version" value="1.1-dp"/>
<property name="lib.dir" value="${basedir}/lib" />
<property name="ivy.dir" location="${basedir}/ivy" />
<property name="build.dir" value="${basedir}/build" />
2 changes: 1 addition & 1 deletion etc/couchbase-manager.xml
@@ -12,4 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

com.couchbase.sqoop.manager.CouchbaseFactory=lib/couchsqoop-plugin-1.0.jar
com.couchbase.sqoop.manager.CouchbaseFactory=lib/couchbase-hadoop-plugin-1.1-dp.jar
2 changes: 1 addition & 1 deletion etc/install.sh
@@ -27,7 +27,7 @@ function check_file_exists {

f_config=$PWD/couchbase-config.xml
f_manager=$PWD/couchbase-manager.xml
f_plugin=$PWD/couchbase-hadoop-plugin-1.0.jar
f_plugin=$PWD/couchbase-hadoop-plugin-1.1-dp.jar

if [ $# -ne 1 ]; then
echo "usage: ./install.sh path_to_sqoop_home"
1 change: 1 addition & 0 deletions src/java/com/couchbase/sqoop/manager/CouchbaseFactory.java
@@ -36,6 +36,7 @@ public class CouchbaseFactory extends ManagerFactory {
public static final Log LOG = LogFactory.getLog(
DefaultManagerFactory.class.getName());

@Override
public ConnManager accept(JobData data) {
SqoopOptions options = data.getSqoopOptions();

1 change: 1 addition & 0 deletions src/java/com/couchbase/sqoop/manager/CouchbaseManager.java
@@ -148,6 +148,7 @@ public void importTable(ImportJobContext context) throws IOException,
/**
* Export data stored in HDFS into Membase/Couchbase.
*/
@Override
public void exportTable(ExportJobContext context) throws IOException,
ExportException {
context.setConnManager(this);
50 changes: 50 additions & 0 deletions src/java/com/couchbase/sqoop/mapreduce/CouchbaseImportMapper.java
@@ -0,0 +1,50 @@
package com.couchbase.sqoop.mapreduce;

import com.cloudera.sqoop.lib.LargeObjectLoader;
import com.cloudera.sqoop.lib.SqoopRecord;
import com.cloudera.sqoop.mapreduce.AutoProgressMapper;

import java.io.IOException;
import java.sql.SQLException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CouchbaseImportMapper
extends AutoProgressMapper<Text, SqoopRecord, Text, NullWritable> {

private Text outkey;
private LargeObjectLoader lobLoader;

public CouchbaseImportMapper() {
outkey = new Text();
}

@Override
protected void setup(Context context)
throws IOException, InterruptedException {
this.lobLoader = new LargeObjectLoader(context.getConfiguration(),
FileOutputFormat.getWorkOutputPath(context));
}

@Override
public void map(Text key, SqoopRecord val, Context context)
throws IOException, InterruptedException {
try {
val.loadLargeObjects(lobLoader);
} catch (SQLException sqlE) {
throw new IOException(sqlE);
}

outkey.set(val.toString());
context.write(outkey, NullWritable.get());
}

@Override
protected void cleanup(Context context) throws IOException {
if (null != lobLoader) {
lobLoader.close();
}
}
}
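The new CouchbaseImportMapper writes each record's string form (val.toString()) as the output key with a NullWritable value, so with a text output format an import should produce one rendered record per line. It is wired in through the new CouchbaseConfiguration.setMapperClass() hook that CouchbaseInputFormat.setConf() calls further down in this commit.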
src/java/com/couchbase/sqoop/mapreduce/db/CouchbaseConfiguration.java
@@ -20,6 +20,7 @@
import com.cloudera.sqoop.mapreduce.db.DBInputFormat.NullDBWritable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

/**
@@ -84,4 +85,9 @@ public String getInputTableName() {
public void setOutputTableName(String tableName) {
conf.set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, tableName);
}

public void setMapperClass(Class<? extends Mapper> mapperClass)
throws IllegalStateException {
conf.setClass("mapreduce.map.class", mapperClass, Mapper.class);
}
}
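The new setMapperClass() method stores the mapper under the standard "mapreduce.map.class" configuration key, which is how CouchbaseInputFormat swaps in CouchbaseImportMapper when it receives its configuration (see setConf() below). A minimal sketch of that mechanism, assuming Hadoop is on the classpath; the stand-in mapper class is hypothetical.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Mapper;

    public class MapperRegistrationSketch {

      // Stand-in mapper so the sketch compiles without the plugin on the classpath.
      public static class ExampleMapper
          extends Mapper<Object, Object, Object, Object> {
      }

      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Equivalent of CouchbaseConfiguration.setMapperClass(ExampleMapper.class):
        conf.setClass("mapreduce.map.class", ExampleMapper.class, Mapper.class);
        System.out.println(conf.get("mapreduce.map.class")); // prints the mapper's class name
      }
    }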
src/java/com/couchbase/sqoop/mapreduce/db/CouchbaseInputFormat.java
@@ -17,6 +17,8 @@
package com.couchbase.sqoop.mapreduce.db;

import com.cloudera.sqoop.config.ConfigurationHelper;
import com.couchbase.client.CouchbaseClient;
import com.couchbase.sqoop.mapreduce.CouchbaseImportMapper;

import com.couchbase.client.CouchbaseClient;

@@ -29,6 +31,7 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
@@ -44,7 +47,7 @@
*
*/
public class CouchbaseInputFormat<T extends DBWritable> extends
InputFormat<LongWritable, T> implements Configurable {
InputFormat<Text, T> implements Configurable {

private String tableName;

@@ -53,6 +56,7 @@ public class CouchbaseInputFormat<T extends DBWritable> extends
@Override
public void setConf(Configuration conf) {
dbConf = new CouchbaseConfiguration(conf);
dbConf.setMapperClass(CouchbaseImportMapper.class);
tableName = dbConf.getInputTableName();
}

@@ -67,13 +71,13 @@ public CouchbaseConfiguration getDBConf() {

@Override
/** {@inheritDoc} */
public RecordReader<LongWritable, T> createRecordReader(InputSplit split,
public RecordReader<Text, T> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException {
return createRecordReader(split, context.getConfiguration());
}

@SuppressWarnings("unchecked")
public RecordReader<LongWritable, T> createRecordReader(InputSplit split,
public RecordReader<Text, T> createRecordReader(InputSplit split,
Configuration conf)
throws IOException, InterruptedException {
Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
src/java/com/couchbase/sqoop/mapreduce/db/CouchbaseRecordReader.java
@@ -16,11 +16,16 @@

package com.couchbase.sqoop.mapreduce.db;

import com.cloudera.sqoop.lib.SqoopRecord;

import com.couchbase.client.TapClient;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
@@ -29,15 +34,17 @@

import javax.naming.ConfigurationException;


import net.spy.memcached.tapmessage.MessageBuilder;
import net.spy.memcached.tapmessage.ResponseMessage;
import net.spy.memcached.tapmessage.TapStream;
import net.spy.memcached.CachedData;
import net.spy.memcached.transcoders.SerializingTranscoder;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
@@ -51,7 +58,7 @@
* key and DBWritables as value.
*/
public class CouchbaseRecordReader<T extends DBWritable>
extends RecordReader<LongWritable, T> {
extends RecordReader<Text, T> {

private static final Log LOG =
LogFactory.getLog(CouchbaseRecordReader.class);
@@ -60,7 +67,7 @@ public class CouchbaseRecordReader<T extends DBWritable>

private Configuration conf;

private LongWritable key = null;
private Text key = null;

private T value = null;

@@ -83,8 +90,7 @@ public CouchbaseRecordReader(Class<T> inputClass, CouchbaseInputSplit split,
String user = dbConf.getUsername();
String pass = dbConf.getPassword();
String url = dbConf.getUrlProperty();
this.client = new TapClient(Arrays.asList(new URI(url)), user,
pass);
this.client = new TapClient(Arrays.asList(new URI(url)), user, pass);
} catch (URISyntaxException e) {
LOG.error("Bad URI Syntax: " + e.getMessage());
client.shutdown();
@@ -97,7 +103,7 @@ public void close() throws IOException {
}

@Override
public LongWritable getCurrentKey() throws IOException,
public Text getCurrentKey() throws IOException,
InterruptedException {
LOG.info("Key: " + key);
return key;
@@ -173,50 +179,41 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
ResponseMessage message;
while ((message = client.getNextMessage()) == null) {
if (!client.hasMoreMessages()) {
LOG.error("No More Messages\n");
return false;
}
}

byte[] mkey = null;
byte[] mvalue = null;
ByteBuffer buf;
int bufLen = 4;

mkey = message.getKey().getBytes();
bufLen += mkey.length;

mvalue = message.getValue();
bufLen += mvalue.length;
buf = ByteBuffer.allocate(bufLen);

if (key == null) {
key = new LongWritable();
}
if (value == null) {
/* Will create a new value based on the generated ORM mapper. */
value = ReflectionUtils.newInstance(inputClass, conf);
}

key.set(client.getMessagesRead());
if (mkey != null) {
buf.put((byte)0);
buf.put((byte)mkey.length);
for (int i = 0; i < mkey.length; i++) {
buf.put(mkey[i]);
}
String recordKey = message.getKey();
if (recordKey == null) {
((SqoopRecord)value).setField("Key", null);
LOG.fatal("Received record with no key. Attempting to continue."
+ " ResponseMessage received:\n" + message);
} else {
((SqoopRecord)value).setField("Key", recordKey);
}

if (mvalue != null) {
buf.put((byte)0);
buf.put((byte)mvalue.length);
for (int i = 0; i < mvalue.length; i++) {
buf.put(mvalue[i]);
}
}
((SqoopRecord)value).setField("Value", (deserialize(message)).toString());


ByteArrayInputStream in = new ByteArrayInputStream(buf.array());
DataInputStream dataIn = new DataInputStream(in);
((Writable)value).readFields(dataIn);
dataIn.close();
return true;
}

/**
* Attempt to get the object represented by the given serialized bytes.
*/
private Object deserialize(ResponseMessage message) {
SerializingTranscoder tc = new SerializingTranscoder();
CachedData d = new CachedData(message.getItemFlags(), message.getValue(),
CachedData.MAX_SIZE);
Object rv = null;
rv = tc.decode(d);
return rv;
}

}
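The new deserialize() helper hands each TAP message's item flags and raw bytes to spymemcached's SerializingTranscoder, and the decoded object's toString() is what lands in the record's "Value" field above. A minimal sketch of that decoding path, assuming spymemcached is on the classpath; the flags and bytes are hypothetical stand-ins for ResponseMessage.getItemFlags() and getValue().

    import net.spy.memcached.CachedData;
    import net.spy.memcached.transcoders.SerializingTranscoder;

    public class DeserializeSketch {
      public static void main(String[] args) {
        SerializingTranscoder tc = new SerializingTranscoder();
        byte[] raw = "hello from couchbase".getBytes(); // hypothetical stored bytes
        int flags = 0;                                  // 0 = plain string for this transcoder
        Object decoded = tc.decode(new CachedData(flags, raw, CachedData.MAX_SIZE));
        System.out.println(decoded);                    // prints: hello from couchbase
      }
    }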
