Fix dependencies for modules.

1 parent da47378 commit f339c8630cf009366114168b4479dcb0af749f65 @larsgeorge committed Sep 21, 2011
Showing 4 changed files with 75 additions and 54 deletions.
  1. +0 −5 ch06/pom.xml
  2. +11 −18 ch07/src/main/java/mapreduce/ParseJson2.java
  3. +59 −31 ch07/src/main/java/mapreduce/ParseJsonMulti.java
  4. +5 −0 pom.xml
ch06/pom.xml
@@ -138,11 +138,6 @@
<version>2.3.0</version>
</dependency>
<dependency>
- <groupId>com.google.collections</groupId>
- <artifactId>google-collections</artifactId>
- <version>1.0</version>
- </dependency>
- <dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.6.1</version>
ch07/src/main/java/mapreduce/ParseJson2.java
@@ -1,6 +1,6 @@
package mapreduce;
-// cc ParseJson MapReduce job that parses the raw data into separate columns.
+// cc ParseJson2 MapReduce job that parses the raw data into separate columns (map phase only).
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
@@ -35,14 +35,13 @@
private static final Log LOG = LogFactory.getLog(ParseJson2.class);
- public static final String NAME = "ParseJson";
+ public static final String NAME = "ParseJson2";
public enum Counters { ROWS, COLS, ERROR, VALID }
/**
* Implements the <code>Mapper</code> that reads the data and extracts the
* required information.
*/
- // vv ParseJson
static class ParseMapper
extends TableMapper<ImmutableBytesWritable, Writable> {
@@ -56,7 +55,6 @@ protected void setup(Context context)
context.getConfiguration().get("conf.columnfamily"));
}
- // ^^ ParseJson
/**
* Maps the input.
*
@@ -65,7 +63,6 @@ protected void setup(Context context)
* @param context The task context.
* @throws java.io.IOException When mapping the input fails.
*/
- // vv ParseJson
@Override
public void map(ImmutableBytesWritable row, Result columns, Context context)
throws IOException {
@@ -79,7 +76,7 @@ public void map(ImmutableBytesWritable row, Result columns, Context context)
JSONObject json = (JSONObject) parser.parse(value);
for (Object key : json.keySet()) {
Object val = json.get(key);
- put.add(columnFamily, Bytes.toBytes(key.toString()), // co ParseJson-1-Put Store the top-level JSON keys as columns, with their value set as the column value.
+ put.add(columnFamily, Bytes.toBytes(key.toString()),
Bytes.toBytes(val.toString()));
}
}
@@ -92,7 +89,6 @@ public void map(ImmutableBytesWritable row, Result columns, Context context)
context.getCounter(Counters.ERROR).increment(1);
}
}
- // ^^ ParseJson
/*
{
"updated": "Mon, 14 Sep 2009 17:09:02 +0000",
@@ -121,10 +117,8 @@ public void map(ImmutableBytesWritable row, Result columns, Context context)
e104984ea5f37cf8ae70451a619c9ac0#outernationalist"
}
*/
- // vv ParseJson
}
- // ^^ ParseJson
/**
* Parse the command line parameters.
*
@@ -173,10 +167,7 @@ private static CommandLine parseArgs(String[] args) throws ParseException {
* @param args The command line parameters.
* @throws Exception When running the job fails.
*/
- // vv ParseJson
public static void main(String[] args) throws Exception {
- /*...*/
- // ^^ ParseJson
Configuration conf = HBaseConfiguration.create();
String[] otherArgs =
new GenericOptionsParser(conf, args).getRemainingArgs();
@@ -188,30 +179,32 @@ public static void main(String[] args) throws Exception {
String output = cmd.getOptionValue("o");
String column = cmd.getOptionValue("c");
- // vv ParseJson
Scan scan = new Scan();
if (column != null) {
byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column));
if (colkey.length > 1) {
scan.addColumn(colkey[0], colkey[1]);
- conf.set("conf.columnfamily", Bytes.toStringBinary(colkey[0])); // co ParseJson-2-Conf Store the column family in the configuration for later use in the mapper.
+ conf.set("conf.columnfamily", Bytes.toStringBinary(colkey[0])); // co ParseJson2-2-Conf Store the column family in the configuration for later use in the mapper.
conf.set("conf.columnqualifier", Bytes.toStringBinary(colkey[1]));
} else {
scan.addFamily(colkey[0]);
conf.set("conf.columnfamily", Bytes.toStringBinary(colkey[0]));
}
}
- Job job = new Job(conf, "Parse data in " + input + ", write to " + output);
+ // vv ParseJson2
+ /*...*/
+ Job job = new Job(conf, "Parse data in " + input + ", write to " + output +
+ "(map only)");
job.setJarByClass(ParseJson2.class);
TableMapReduceUtil.initTableMapperJob(input, scan, ParseMapper.class,
- // co ParseJson-3-SetMap Setup map phase details using the utility method.
ImmutableBytesWritable.class, Put.class, job);
TableMapReduceUtil.initTableReducerJob(output,
- // co ParseJson-4-SetReduce Configure an identity reducer to store the parsed data.
IdentityTableReducer.class, job);
+ /*[*/job.setNumReduceTasks(0);/*]*/
+ /*...*/
+ // ^^ ParseJson2
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
- // ^^ ParseJson
}
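
The net effect of these ParseJson2 changes is a map-only job: initTableReducerJob() still wires up TableOutputFormat for the output table, but setNumReduceTasks(0) removes the shuffle and reduce phases, so the Put instances emitted by the mapper are written directly and the IdentityTableReducer never runs. A minimal standalone sketch of that wiring, assuming the ParseMapper defined above and placeholder table names "testtable"/"outputtable":

    package mapreduce;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.mapreduce.Job;

    public class MapOnlyJobSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf, "map-only parse sketch");
        job.setJarByClass(MapOnlyJobSketch.class);
        // ParseMapper is the mapper from ParseJson2 above; both table
        // names are placeholders.
        TableMapReduceUtil.initTableMapperJob("testtable", new Scan(),
            ParseJson2.ParseMapper.class,
            ImmutableBytesWritable.class, Put.class, job);
        // Still configures TableOutputFormat against the output table...
        TableMapReduceUtil.initTableReducerJob("outputtable",
            IdentityTableReducer.class, job);
        // ...but with zero reduce tasks the mapper output goes straight
        // to the table and the identity reducer is skipped.
        job.setNumReduceTasks(0);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }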
ch07/src/main/java/mapreduce/ParseJsonMulti.java
@@ -1,6 +1,6 @@
package mapreduce;
-// cc ParseJson MapReduce job that parses the raw data into separate columns.
+// cc ParseJsonMulti MapReduce job that parses the raw data into separate tables.
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
@@ -13,16 +13,17 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -35,28 +36,43 @@
private static final Log LOG = LogFactory.getLog(ParseJsonMulti.class);
- public static final String NAME = "ParseJson";
+ public static final String NAME = "ParseJsonMulti";
public enum Counters { ROWS, COLS, ERROR, VALID }
/**
* Implements the <code>Mapper</code> that reads the data and extracts the
* required information.
*/
- // vv ParseJson
+ // vv ParseJsonMulti
static class ParseMapper
extends TableMapper<ImmutableBytesWritable, Writable> {
+ private HTable infoTable = null;
+ private HTable linkTable = null;
private JSONParser parser = new JSONParser();
private byte[] columnFamily = null;
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
+ infoTable = new HTable(context.getConfiguration(),
+ context.getConfiguration().get("conf.infotable")); // co ParseJsonMulti-1-Setup Create and configure both target tables in the setup() method.
+ infoTable.setAutoFlush(false);
+ linkTable = new HTable(context.getConfiguration(),
+ context.getConfiguration().get("conf.linktable"));
+ linkTable.setAutoFlush(false);
columnFamily = Bytes.toBytes(
context.getConfiguration().get("conf.columnfamily"));
}
- // ^^ ParseJson
+ @Override
+ protected void cleanup(Context context)
+ throws IOException, InterruptedException {
+ infoTable.flushCommits();
+ linkTable.flushCommits(); // co ParseJsonMulti-2-Cleanup Flush all pending commits when the task is complete.
+ }
+
+ // ^^ ParseJsonMulti
/**
* Maps the input.
*
@@ -65,25 +81,32 @@ protected void setup(Context context)
* @param context The task context.
* @throws java.io.IOException When mapping the input fails.
*/
- // vv ParseJson
+ // vv ParseJsonMulti
@Override
public void map(ImmutableBytesWritable row, Result columns, Context context)
throws IOException {
context.getCounter(Counters.ROWS).increment(1);
String value = null;
try {
- Put put = new Put(row.get());
+ Put infoPut = new Put(row.get());
+ Put linkPut = new Put(row.get());
for (KeyValue kv : columns.list()) {
context.getCounter(Counters.COLS).increment(1);
value = Bytes.toStringBinary(kv.getValue());
JSONObject json = (JSONObject) parser.parse(value);
for (Object key : json.keySet()) {
Object val = json.get(key);
- put.add(columnFamily, Bytes.toBytes(key.toString()), // co ParseJson-1-Put Store the top-level JSON keys as columns, with their value set as the column value.
- Bytes.toBytes(val.toString()));
+ if ("link".equals(key)) {
+ linkPut.add(columnFamily, Bytes.toBytes(key.toString()),
+ Bytes.toBytes(val.toString()));
+ } else {
+ infoPut.add(columnFamily, Bytes.toBytes(key.toString()),
+ Bytes.toBytes(val.toString()));
+ }
}
}
- context.write(row, put);
+ infoTable.put(infoPut); // co ParseJsonMulti-3-TwoTbls Save parsed values into two separate tables.
+ linkTable.put(linkPut);
context.getCounter(Counters.VALID).increment(1);
} catch (Exception e) {
e.printStackTrace();
@@ -92,7 +115,7 @@ public void map(ImmutableBytesWritable row, Result columns, Context context)
context.getCounter(Counters.ERROR).increment(1);
}
}
- // ^^ ParseJson
+ // ^^ ParseJsonMulti
/*
{
"updated": "Mon, 14 Sep 2009 17:09:02 +0000",
@@ -121,10 +144,10 @@ public void map(ImmutableBytesWritable row, Result columns, Context context)
e104984ea5f37cf8ae70451a619c9ac0#outernationalist"
}
*/
- // vv ParseJson
+ // vv ParseJsonMulti
}
- // ^^ ParseJson
+ // ^^ ParseJsonMulti
/**
* Parse the command line parameters.
*
@@ -139,15 +162,20 @@ private static CommandLine parseArgs(String[] args) throws ParseException {
o.setArgName("input-table-name");
o.setRequired(true);
options.addOption(o);
- o = new Option("o", "output", true,
- "table to write to (must exist)");
- o.setArgName("output-table-name");
- o.setRequired(true);
- options.addOption(o);
o = new Option("c", "column", true,
"column to read data from (must exist)");
o.setArgName("family:qualifier");
options.addOption(o);
+ o = new Option("o", "infotbl", true,
+ "info table to write to (must exist)");
+ o.setArgName("info-table-name");
+ o.setRequired(true);
+ options.addOption(o);
+ o = new Option("l", "linktbl", true,
+ "link table to write to (must exist)");
+ o.setArgName("link-table-name");
+ o.setRequired(true);
+ options.addOption(o);
options.addOption("d", "debug", false, "switch on DEBUG log level");
CommandLineParser parser = new PosixParser();
CommandLine cmd = null;
@@ -173,10 +201,10 @@ private static CommandLine parseArgs(String[] args) throws ParseException {
* @param args The command line parameters.
* @throws Exception When running the job fails.
*/
- // vv ParseJson
+ // vv ParseJsonMulti
public static void main(String[] args) throws Exception {
/*...*/
- // ^^ ParseJson
+ // ^^ ParseJsonMulti
Configuration conf = HBaseConfiguration.create();
String[] otherArgs =
new GenericOptionsParser(conf, args).getRemainingArgs();
@@ -185,33 +213,33 @@ public static void main(String[] args) throws Exception {
if (cmd.hasOption("d")) conf.set("conf.debug", "true");
// get details
String input = cmd.getOptionValue("i");
- String output = cmd.getOptionValue("o");
String column = cmd.getOptionValue("c");
-
- // vv ParseJson
+ // vv ParseJsonMulti
+ conf.set("conf.infotable", cmd.getOptionValue("o")); // co ParseJsonMulti-4-Conf Store table names in configuration for later use in the mapper.
+ conf.set("conf.linktable", cmd.getOptionValue("l"));
+ /*...*/
+ // ^^ ParseJsonMulti
Scan scan = new Scan();
if (column != null) {
byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column));
if (colkey.length > 1) {
scan.addColumn(colkey[0], colkey[1]);
- conf.set("conf.columnfamily", Bytes.toStringBinary(colkey[0])); // co ParseJson-2-Conf Store the column family in the configuration for later use in the mapper.
+ conf.set("conf.columnfamily", Bytes.toStringBinary(colkey[0]));
conf.set("conf.columnqualifier", Bytes.toStringBinary(colkey[1]));
} else {
scan.addFamily(colkey[0]);
conf.set("conf.columnfamily", Bytes.toStringBinary(colkey[0]));
}
}
-
- Job job = new Job(conf, "Parse data in " + input + ", write to " + output);
+ // vv ParseJsonMulti
+ Job job = new Job(conf, "Parse data in " + input + ", into two tables");
job.setJarByClass(ParseJsonMulti.class);
TableMapReduceUtil.initTableMapperJob(input, scan, ParseMapper.class,
- // co ParseJson-3-SetMap Setup map phase details using the utility method.
ImmutableBytesWritable.class, Put.class, job);
- TableMapReduceUtil.initTableReducerJob(output,
- // co ParseJson-4-SetReduce Configure an identity reducer to store the parsed data.
- IdentityTableReducer.class, job);
+ job.setOutputFormatClass(NullOutputFormat.class); // co ParseJsonMulti-5-Null Set the output format to be ignored by the framework.
+ job.setNumReduceTasks(0);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
- // ^^ ParseJson
+ // ^^ ParseJsonMulti
}
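
The rewritten ParseJsonMulti no longer goes through the table output format at all: the mapper holds one HTable per target table, disables auto-flush so puts are buffered client-side, and flushes both buffers in cleanup(); NullOutputFormat is set because the framework's own output channel goes unused. A standalone sketch of that buffered write path, with an assumed table name and made-up row contents:

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class BufferedWriteSketch {
      public static void main(String[] args) throws Exception {
        // "testtable" and the row/column values are placeholders.
        HTable table = new HTable(HBaseConfiguration.create(), "testtable");
        table.setAutoFlush(false); // queue puts in the client-side write buffer
        Put put = new Put(Bytes.toBytes("row1"));
        put.add(Bytes.toBytes("data"), Bytes.toBytes("json"),
            Bytes.toBytes("{\"updated\":\"...\"}"));
        table.put(put); // buffered only, not sent to the servers yet
        // ... further puts accumulate in the buffer ...
        table.flushCommits(); // push the buffered puts out; the mapper above
                              // does this in cleanup() before the task ends
        table.close();
      }
    }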
pom.xml
@@ -75,6 +75,11 @@
<dependencies>
<dependency>
+ <groupId>com.google.collections</groupId>
+ <artifactId>google-collections</artifactId>
+ <version>1.0</version>
+ </dependency>
+ <dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.1.1</version>
