DOCS-26. Update importer to use KijiBulkImporter.
Mark other uses of org.kiji.schema.mapreduce as
explicitly @deprecated and refer to appropriate
KijiMR components.

Signed-off-by: Lee Sheng <lsheng@wibidata.com>
Aaron Kimball authored and Lee Sheng committed Feb 22, 2013
1 parent c269bce commit 398be8e
Showing 7 changed files with 75 additions and 98 deletions.
3 changes: 3 additions & 0 deletions RELEASE_NOTES.txt
@@ -3,6 +3,9 @@


Version 1.0.0-rc4
* DOCS-26. Update bulk importing to use KijiMR; deprecate MapReduce
examples. Check out the "Kiji Music" example project for a
complete KijiMR example application.
* SCHEMA-133. Update usage to match the API from KijiSchema 1.0.0-rc4
  (use KijiDataRequestBuilders instead of mutable KijiDataRequests).

7 changes: 7 additions & 0 deletions pom.xml
@@ -39,6 +39,7 @@
<kiji-schema.version>1.0.0-rc4-SNAPSHOT</kiji-schema.version>
<kiji-schema-shell.version>${kiji-schema.version}</kiji-schema-shell.version>
<kiji-cdh4-platform.version>${kiji-schema.version}</kiji-cdh4-platform.version>
<kiji-mapreduce.version>1.0.0-rc4-SNAPSHOT</kiji-mapreduce.version>
</properties>

<build>
@@ -112,6 +113,12 @@
<version>${kiji-schema.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.kiji.mapreduce</groupId>
<artifactId>kiji-mapreduce</artifactId>
<version>${kiji-mapreduce.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
src/main/java/org/kiji/examples/phonebook/AddressFieldExtractor.java
@@ -50,7 +50,15 @@

/**
* Extracts fields from the address column into individual columns in the derived column family.
*
* @deprecated using "Raw" MapReduce over Kiji tables is no longer the preferred
* mechanism for iterating over rows of a Kiji table. To write a function that
* processes and updates a table in a row-by-row fashion, you should extend {@link
* org.kiji.mapreduce.produce.KijiProducer}. You should use {@link
* org.kiji.mapreduce.produce.KijiProduceJobBuilder} for constructing such MapReduce
* jobs.
*/
@Deprecated
public class AddressFieldExtractor extends Configured implements Tool {
/** Name of the table to read for phonebook entries. */
public static final String TABLE_NAME = "phonebook";
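The deprecation note above points at KijiProducer for this kind of row-by-row update. A minimal producer along those lines might look like the following sketch; the class name, the "derived" output family, and the ProducerContext.put usage are illustrative assumptions based on the phonebook schema, not code from this commit.

import java.io.IOException;

import org.kiji.mapreduce.produce.KijiProducer;
import org.kiji.mapreduce.produce.ProducerContext;
import org.kiji.schema.KijiDataRequest;
import org.kiji.schema.KijiRowData;

/** Hypothetical KijiProducer that splits the address column into derived fields. */
public class AddressFieldsProducer extends KijiProducer {
  /** {@inheritDoc} */
  @Override
  public KijiDataRequest getDataRequest() {
    // Ask for the most recent address cell of each row.
    return KijiDataRequest.create(Fields.INFO_FAMILY, Fields.ADDRESS);
  }

  /** {@inheritDoc} */
  @Override
  public String getOutputColumn() {
    // Write results into qualifiers of the "derived" map-type family.
    return "derived";
  }

  /** {@inheritDoc} */
  @Override
  public void produce(KijiRowData input, ProducerContext context) throws IOException {
    final Address addr = input.getMostRecentValue(Fields.INFO_FAMILY, Fields.ADDRESS);
    // One derived column per address field.
    context.put("addr1", addr.getAddr1());
    context.put("city", addr.getCity());
    context.put("state", addr.getState());
    context.put("zip", addr.getZip());
  }
}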
src/main/java/org/kiji/examples/phonebook/DeleteEntriesByState.java
@@ -53,6 +53,14 @@

/**
* Deletes all entries from the phonebook table that have an address from a particular US state.
*
* @deprecated using "Raw" MapReduce is no longer the preferred mechanism to iterate
* over rows of a Kiji table. You should instead use the KijiMR library. To write a
* function that processes and updates a table in a row-by-row fashion, you should
* extend {@link org.kiji.mapreduce.produce.KijiProducer}. You should use {@link
* org.kiji.mapreduce.produce.KijiProduceJobBuilder} for constructing such MapReduce
* jobs. You will still need to explicitly open a KijiTableWriter to call {@link
* KijiTableWriter#deleteRow}.
*/
public class DeleteEntriesByState extends Configured implements Tool {
/** Name of the table to read for phonebook entries. */
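Inside such a producer, the row deletion this class performs would still go through an explicitly opened KijiTableWriter, as the note says. A fragment of what produce() might look like follows; mWriter and mState are hypothetical fields, opened in setup() and closed in cleanup() just as the old mapper managed its writer.

/** Hypothetical produce() body: delete rows whose address matches a target state. */
@Override
public void produce(KijiRowData input, ProducerContext context) throws IOException {
  final Address addr = input.getMostRecentValue(Fields.INFO_FAMILY, Fields.ADDRESS);
  // Avro string fields are CharSequences, so compare via toString().
  if (mState.equals(addr.getState().toString())) {
    mWriter.deleteRow(input.getEntityId());
  }
}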
src/main/java/org/kiji/examples/phonebook/IncrementTalkTime.java
@@ -54,6 +54,14 @@
* Reads an input file that lists the number of minutes of talk time
* per person. The talk time is incremented in the person's record in
* the phone book table.
*
* @deprecated using "Raw" MapReduce jobs that interact with Kiji tables is no longer
* the preferred mechanism. The <tt>org.kiji.schema.mapreduce</tt> classes are
* deprecated and may be removed in a future version of KijiSchema. You should instead
* use the KijiMR library, extend the {@link org.kiji.mapreduce.KijiMapper} class and
* use the {@link org.kiji.mapreduce.KijiMapReduceJobBuilder} class to configure such
* jobs, rather than use classes like {@link DistributedCacheJars} and {@link
* KijiConfKeys}.
*/
public class IncrementTalkTime extends Configured implements Tool {
private static final Logger LOG = LoggerFactory.getLogger(IncrementTalkTime.class);
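For the job wiring itself, the hand-rolled configuration this class shares with PhonebookImporter's old code (distributed cache jars, KijiConfKeys) collapses into a builder call under KijiMR. The sketch below mirrors the KijiBulkImportJobBuilder usage added to PhonebookImporter in this commit; TalkTimeMapper and the exact KijiMapReduceJobBuilder method names are assumptions for illustration only.

// Hypothetical replacement wiring; TalkTimeMapper would extend org.kiji.mapreduce.KijiMapper.
final MapReduceJob job = KijiMapReduceJobBuilder.create()
    .withConf(getConf())
    .withInput(new TextMapReduceJobInput(new Path(args[0])))
    .withMapper(TalkTimeMapper.class)
    .withOutput(new DirectKijiTableMapReduceJobOutput(tableUri))
    .build();
return job.run() ? 0 : 1;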
123 changes: 35 additions & 88 deletions src/main/java/org/kiji/examples/phonebook/PhonebookImporter.java
@@ -19,43 +19,33 @@

package org.kiji.examples.phonebook;

import java.io.File;
import java.io.IOException;

import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.JsonDecoder;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.GenericTableMapReduceUtil;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.kiji.mapreduce.KijiTableContext;
import org.kiji.mapreduce.MapReduceJob;
import org.kiji.mapreduce.bulkimport.KijiBulkImportJobBuilder;
import org.kiji.mapreduce.bulkimport.KijiBulkImporter;
import org.kiji.mapreduce.input.TextMapReduceJobInput;
import org.kiji.mapreduce.output.DirectKijiTableMapReduceJobOutput;
import org.kiji.schema.EntityId;
import org.kiji.schema.Kiji;
import org.kiji.schema.KijiTable;
import org.kiji.schema.KijiTableWriter;
import org.kiji.schema.KijiURI;
import org.kiji.schema.KijiURIException;
import org.kiji.schema.mapreduce.DistributedCacheJars;
import org.kiji.schema.mapreduce.KijiConfKeys;
import org.kiji.schema.util.ResourceUtils;

/**
* Shell for the PhonebookImportMapper class. This class manages
* the command line arguments and job setup for the mapper class.
* the command line arguments and job setup for the bulk importer.
*/
public class PhonebookImporter extends Configured implements Tool {
/** Name of the table to insert phonebook entries into. */
@@ -65,34 +55,15 @@ public class PhonebookImporter extends Configured implements Tool {
* Map task that will parse user records from a text file and insert the records
* into the phonebook table.
*/
public static class PhonebookImportMapper
extends Mapper<LongWritable, Text, NullWritable, NullWritable> {
private static final Logger LOG = LoggerFactory.getLogger(PhonebookImportMapper.class);
public static class PhonebookBulkImporter
extends KijiBulkImporter<LongWritable, Text> {

private Kiji mKiji;
private KijiTable mTable;
private KijiTableWriter mWriter;
private static final Logger LOG = LoggerFactory.getLogger(PhonebookBulkImporter.class);

/** {@inheritDoc} */
@Override
protected void setup(Context hadoopContext) throws IOException, InterruptedException {
super.setup(hadoopContext);
final Configuration conf = hadoopContext.getConfiguration();
KijiURI tableURI;
try {
tableURI = KijiURI.newBuilder(conf.get(KijiConfKeys.OUTPUT_KIJI_TABLE_URI)).build();
} catch (KijiURIException kue) {
throw new IOException(kue);
}
mKiji = Kiji.Factory.open(tableURI, conf);
mTable = mKiji.openTable(TABLE_NAME);
mWriter = mTable.openTableWriter();
}

/** {@inheritDoc} */
@Override
public void map(LongWritable byteOffset, Text line, Context hadoopContext)
throws IOException, InterruptedException {
public void produce(LongWritable byteOffset, Text line, KijiTableContext context)
throws IOException {
// Each line of the text file has the form:
//
// firstname|lastname|email|telephone|addressJson
@@ -122,53 +93,33 @@ public void map(LongWritable byteOffset, Text line, Context hadoopContext)
final Address streetAddr = datumReader.read(null, decoder);

// Create a row ID with the first and last name.
final EntityId user = mTable.getEntityId(firstName + "," + lastName);
final EntityId user = context.getEntityId(firstName + "," + lastName);

// Write the fields to appropriate table columns in the row.
// The column names are specified as constants in the Fields.java class.
mWriter.put(user, Fields.INFO_FAMILY, Fields.FIRST_NAME, firstName);
mWriter.put(user, Fields.INFO_FAMILY, Fields.LAST_NAME, lastName);
mWriter.put(user, Fields.INFO_FAMILY, Fields.EMAIL, email);
mWriter.put(user, Fields.INFO_FAMILY, Fields.TELEPHONE, telephone);
mWriter.put(user, Fields.INFO_FAMILY, Fields.ADDRESS, streetAddr);
}

/** {@inheritDoc} */
@Override
protected void cleanup(Context hadoopContext) throws IOException, InterruptedException {
ResourceUtils.closeOrLog(mWriter);
ResourceUtils.closeOrLog(mTable);
ResourceUtils.releaseOrLog(mKiji);
super.cleanup(hadoopContext);
context.put(user, Fields.INFO_FAMILY, Fields.FIRST_NAME, firstName);
context.put(user, Fields.INFO_FAMILY, Fields.LAST_NAME, lastName);
context.put(user, Fields.INFO_FAMILY, Fields.EMAIL, email);
context.put(user, Fields.INFO_FAMILY, Fields.TELEPHONE, telephone);
context.put(user, Fields.INFO_FAMILY, Fields.ADDRESS, streetAddr);
}
}

/**
* Configure the MapReduce job to run the import.
*
* @param job the MapReduce Job object to configure.
* @param inputPath the Path to the input data.
* @param tableUri the URI to the destination table for the import.
* @return a MapReduceJob that's ready to run.
* @throws IOException if there's an error interacting with the job or the Kiji URI.
*/
void configureJob(Job job, Path inputPath) throws IOException {
// Read from a text file.
job.setInputFormatClass(TextInputFormat.class);
FileInputFormat.setInputPaths(job, inputPath);

// Run the mapper that will import entries from the input file.
job.setMapperClass(PhonebookImportMapper.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(NullWritable.class);

// Use no reducer (this is a map-only job).
job.setNumReduceTasks(0);
// Since table writers do not emit any key-value pairs, we set the output format to Null.
job.setOutputFormatClass(NullOutputFormat.class);

// Direct the job output to the phonebook table.
final KijiURI tableURI =
KijiURI.newBuilder(String.format("kiji://.env/default/%s", TABLE_NAME)).build();
job.getConfiguration().set(KijiConfKeys.OUTPUT_KIJI_TABLE_URI, tableURI.toString());
MapReduceJob configureJob(Path inputPath, KijiURI tableUri) throws IOException {
return KijiBulkImportJobBuilder.create()
.withConf(getConf())
.withInput(new TextMapReduceJobInput(inputPath))
.withOutput(new DirectKijiTableMapReduceJobOutput(tableUri))
.withBulkImporter(PhonebookBulkImporter.class)
.build();
}

/**
Expand All @@ -183,21 +134,17 @@ public int run(String[] args) throws Exception {
// Load HBase configuration before connecting to Kiji.
setConf(HBaseConfiguration.addHbaseResources(getConf()));

// Configure a map-only job that imports phonebook entries from a file into the table.
final Job job = new Job(getConf(), "PhonebookImporter");
configureJob(job, new Path(args[0]));
// Direct the job output to the phonebook table. Due to the small size of this
// data set, we can write directly to the table rather than use HFileMapReduceJobOutput.
final KijiURI tableUri =
KijiURI.newBuilder(String.format("kiji://.env/default/%s", TABLE_NAME)).build();

// Tell Hadoop where the java dependencies are located, so they
// can be shipped to the cluster during execution.
job.setJarByClass(PhonebookImporter.class);
GenericTableMapReduceUtil.addAllDependencyJars(job);
DistributedCacheJars.addJarsToDistributedCache(
job, new File(System.getenv("KIJI_HOME"), "lib"));
job.setUserClassesTakesPrecedence(true);
// Configure a map-only job that imports phonebook entries from a file into the table.
final MapReduceJob job = configureJob(new Path(args[0]), tableUri);

// Run the job.
final boolean isSuccessful = job.waitForCompletion(true);

final boolean isSuccessful = job.run();
return isSuccessful ? 0 : 1;
}

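The new run() comment mentions HFileMapReduceJobOutput as the alternative for larger data sets. Swapping it in would only change the output clause of configureJob; this variant is a sketch, and the two-argument constructor (table URI plus a temporary HDFS path for the generated HFiles) and the separate bulk-load step afterward are assumptions about the KijiMR API, not part of this commit.

// Hypothetical variant of configureJob() for large imports: write HFiles to
// hfilePath for later bulk-loading rather than writing to the live table.
MapReduceJob configureHFileJob(Path inputPath, Path hfilePath, KijiURI tableUri)
    throws IOException {
  return KijiBulkImportJobBuilder.create()
      .withConf(getConf())
      .withInput(new TextMapReduceJobInput(inputPath))
      .withOutput(new HFileMapReduceJobOutput(tableUri, hfilePath))
      .withBulkImporter(PhonebookBulkImporter.class)
      .build();
}

This would need import org.kiji.mapreduce.output.HFileMapReduceJobOutput; alongside the DirectKijiTableMapReduceJobOutput import above.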
@@ -36,11 +36,11 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import org.kiji.mapreduce.MapReduceJob;
import org.kiji.schema.Kiji;
import org.kiji.schema.KijiDataRequest;
import org.kiji.schema.KijiRowData;
@@ -49,7 +49,6 @@
import org.kiji.schema.KijiTableReader;
import org.kiji.schema.KijiURI;
import org.kiji.schema.layout.KijiTableLayout;
import org.kiji.schema.mapreduce.KijiConfKeys;
import org.kiji.schema.testutil.AbstractKijiIntegrationTest;
import org.kiji.schema.util.Resources;

@@ -123,16 +122,13 @@ public void testPhonebookImporter() throws Exception {
PhonebookImporter importer = new PhonebookImporter();
importer.setConf(mConf);

Job job = new Job(mConf);

importer.configureJob(job, mInputPath);

// Override the output table uri in the job with one associated with this
// test-specific Kiji instance.
// Configure a MapReduce job that uses our test-specific HBase instance as well as
// the one-off filename for the input data.
final KijiURI tableURI = KijiURI.newBuilder(getKijiURI()).withTableName("phonebook").build();
job.getConfiguration().set(KijiConfKeys.OUTPUT_KIJI_TABLE_URI, tableURI.toString());

final boolean jobSuccess = job.waitForCompletion(true);
MapReduceJob job = importer.configureJob(mInputPath, tableURI);

final boolean jobSuccess = job.run();
assertTrue("Importer exited with non-zero status", jobSuccess);

checkOutputTable();
