Skip to content

Commit

Permalink
throwing ExecExceptions instead of returning null
Browse files Browse the repository at this point in the history
  • Loading branch information
mmay committed Jul 6, 2011
1 parent fd913e4 commit 83c4ba8
Showing 1 changed file with 69 additions and 64 deletions.
133 changes: 69 additions & 64 deletions JsonLoader.java
Expand Up @@ -24,6 +24,8 @@
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.pig.LoadFunc;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigFileInputFormat;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
Expand All @@ -43,7 +45,8 @@
import java.util.Map;

/**
* Reads and parses JSON from a json file.
* Parses JSON data from a JSON file one line at a time. Each line is expected
* to contain a separate JSON record.
*
* Usage:
* e.g. JSON that looks like:
Expand All @@ -60,6 +63,9 @@
* }
* }}
* --------------------------------------------------------------
* **The above json record is expanded for readability. This entire record should be
* condensed to one line in your json file.
*
* register the jar containing this class (e.g. piggybank.jar)
* a = load '/tmp/jsontest' using org.pig.piggybank.storage.JsonLoader() as (json:map[]);
* b = foreach a generate flatten(json#'menu') as menu;
Expand All @@ -69,61 +75,60 @@
*
*/
public class JsonLoader extends LoadFunc {
private static final TupleFactory tupleFactory = TupleFactory.getInstance();
private ObjectMapper mapper;
private LineRecordReader in = null;
private static final TupleFactory tupleFactory = TupleFactory.getInstance();
private ObjectMapper mapper;
private LineRecordReader in = null;

public JsonLoader() {
public JsonLoader() {
super();
mapper = new ObjectMapper();
}
}

@SuppressWarnings("unchecked")
@Override
public InputFormat getInputFormat() throws IOException {
return new PigTextInputFormat();
}
@SuppressWarnings("unchecked")
@Override
public InputFormat getInputFormat() throws IOException {
return new PigTextInputFormat();
}

@Override
public Tuple getNext() throws IOException {
boolean notDone = in.nextKeyValue();
if (!notDone) {
return null;
}
Text val = in.getCurrentValue();
if (val == null) {
return null;
}
@Override
public Tuple getNext() throws IOException {
boolean notDone = in.nextKeyValue();
if (!notDone) {
return null;
}
Text val = in.getCurrentValue();
if (val == null) {
return null;
}
String line = val.toString();
if (line.length() > 0) {
Tuple t = parseStringToTuple(line);
if (line.length() > 0) {
Tuple t = parseStringToTuple(line);
if (t != null) {
return t;
}
}
return t;
}
}
return null;
}
}

protected Tuple parseStringToTuple(String line) {
protected Tuple parseStringToTuple(String line) throws IOException {
try {
Map<String, Object> values = new HashMap<String, Object>();
JsonNode node = mapper.readTree(line);
System.out.println(mapper.readTree(line));
flatten_value(node, values);
return tupleFactory.newTuple(values);
} catch (NumberFormatException e) {
e.getMessage().concat("Very big number exceeds the scale of long: " + line);
return null;
int errCode = 6018;
String errMsg = "Error while reading input - Very big number exceeds the scale of long: " + line;
throw new ExecException(errMsg, errCode, PigException.REMOTE_ENVIRONMENT, e);
} catch (JsonParseException e) {
e.getMessage().concat("Could not json-decode string: " + line);
return null;
} catch (IOException e) {
e.getMessage();
return null;
int errCode = 6018;
String errMsg = "Error while reading input - Could not json-decode string: " + line;
throw new ExecException(errMsg, errCode, PigException.REMOTE_ENVIRONMENT, e);
}
}

private void flatten_value(JsonNode node, Map<String, Object> values) {
private void flatten_value(JsonNode node, Map<String, Object> values) {
Iterator<String> keys = node.getFieldNames();
Iterator<JsonNode> nodes = node.getElements();
while (keys.hasNext()) {
Expand All @@ -146,35 +151,35 @@ private void flatten_value(JsonNode node, Map<String, Object> values) {
values.put(key, value != null ? value.toString().replaceAll("[\"]", "") : null);
}
}
}
}

private void flatten_array(JsonNode value, DataBag bag) {
if(value.isArray()) {
ArrayNode array = (ArrayNode)value;
DataBag b = DefaultBagFactory.getInstance().newDefaultBag();
for(JsonNode innervalue :array) {
flatten_array(innervalue, b);
}
bag.addAll(b);
} else if (value.isObject()){
Map<String, Object> values2 = new HashMap<String, Object>();
flatten_value((ObjectNode)value, values2);
bag.add(tupleFactory.newTuple(values2));
} else {
if(value !=null) {
bag.add( tupleFactory.newTuple(value));
}
}
}
private void flatten_array(JsonNode value, DataBag bag) {
if(value.isArray()) {
ArrayNode array = (ArrayNode)value;
DataBag b = DefaultBagFactory.getInstance().newDefaultBag();
for(JsonNode innervalue :array) {
flatten_array(innervalue, b);
}
bag.addAll(b);
} else if (value.isObject()){
Map<String, Object> values2 = new HashMap<String, Object>();
flatten_value((ObjectNode)value, values2);
bag.add(tupleFactory.newTuple(values2));
} else {
if(value !=null) {
bag.add( tupleFactory.newTuple(value));
}
}
}

@SuppressWarnings("unchecked")
@Override
public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
in = (LineRecordReader) reader;
}
@SuppressWarnings("unchecked")
@Override
public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
in = (LineRecordReader) reader;
}

@Override
public void setLocation(String location, Job job) throws IOException {
PigFileInputFormat.setInputPaths(job, location);
}
@Override
public void setLocation(String location, Job job) throws IOException {
PigFileInputFormat.setInputPaths(job, location);
}
}

0 comments on commit 83c4ba8

Please sign in to comment.