Commit f5d51eb
Allow projections to be pushed down to MongoDB from Pig (HADOOP-167).
Luke Lovett committed Oct 22, 2015
1 parent 0dcf8ad commit f5d51eb
Showing 4 changed files with 227 additions and 18 deletions.
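In effect, when a Pig script references only some fields, MongoLoader now asks MongoDB to return just those fields instead of whole documents. A minimal sketch of the projection document this produces, built with the same legacy-driver helpers the new tests use (the field names a and m.x are borrowed from testPushProjection below; this is an illustration, not code from the commit):

import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DBObject;
import com.mongodb.util.JSON;

public class ProjectionSketch {
    public static void main(final String[] args) {
        // Fields the Pig script touches: "a" and the subfield "m.x".
        // MongoDB returns _id by default, so it must be excluded explicitly.
        DBObject projection = new BasicDBObjectBuilder()
                .add("a", true)
                .add("m.x", true)
                .add("_id", false)
                .get();
        // This serialized form is what ends up under MongoConfigUtil.INPUT_FIELDS.
        System.out.println(JSON.serialize(projection));
        // prints roughly: { "a" : true , "m.x" : true , "_id" : false }
    }
}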
167 changes: 150 additions & 17 deletions pig/src/main/java/com/mongodb/hadoop/pig/MongoLoader.java
@@ -2,48 +2,63 @@

import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.util.MongoConfigUtil;
import com.mongodb.util.JSON;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.util.StringUtils;
import org.apache.pig.Expression;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
import org.apache.pig.LoadPushDown;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
import org.bson.BSONObject;
import org.bson.BasicBSONObject;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

-public class MongoLoader extends LoadFunc implements LoadMetadata {
+public class MongoLoader extends LoadFunc
+        implements LoadMetadata, LoadPushDown {
private static final Log LOG = LogFactory.getLog(MongoStorage.class);
private static TupleFactory tupleFactory = TupleFactory.getInstance();
// Pig specific settings
private ResourceSchema schema = null;
private RecordReader in = null;
private final MongoInputFormat inputFormat = new MongoInputFormat();
private ResourceFieldSchema[] fields;
private HashMap<String, ResourceFieldSchema> schemaMapping;
private List<String> projectedFields;
private String idAlias = null;

-    @Override
-    public void setUDFContextSignature(final String signature) {
-    }
+    private String signature;

public MongoLoader() {
LOG.info("Initializing MongoLoader in dynamic schema mode.");
schema = null;
fields = null;
}

-    public ResourceFieldSchema[] getFields() {
-        return fields;
+    public MongoLoader(final String userSchema) {
+        this(userSchema, null);
     }

public MongoLoader(final String userSchema, final String idAlias) {
@@ -56,14 +71,35 @@ public MongoLoader(final String userSchema, final String idAlias) {
}
}

-    public MongoLoader(final String userSchema) {
-        this(userSchema, null);
+    @Override
+    public void setUDFContextSignature(final String signature) {
+        this.signature = signature;
     }

private Properties getUDFProperties() {
return UDFContext.getUDFContext()
.getUDFProperties(getClass(), new String[]{signature});
}

public ResourceFieldSchema[] getFields() {
return fields;
}

private BasicBSONObject getProjection() {
String projectionStr =
getUDFProperties().getProperty(MongoConfigUtil.INPUT_FIELDS);
return (BasicBSONObject) JSON.parse(projectionStr);
}

@Override
    public void setLocation(final String location, final Job job) throws IOException {
-        MongoConfigUtil.setInputURI(job.getConfiguration(), location);
+        Configuration conf = job.getConfiguration();
+        MongoConfigUtil.setInputURI(conf, location);
+        String inputFieldsStr =
+            getUDFProperties().getProperty(MongoConfigUtil.INPUT_FIELDS);
+        if (inputFieldsStr != null) {
+            conf.set(MongoConfigUtil.INPUT_FIELDS, inputFieldsStr);
+        }
}
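The projection travels in two hops: pushProjection runs on the Pig front end and stores the serialized projection in the UDFContext under this loader's signature, and setLocation runs at job-setup time and copies it into the Hadoop configuration, where MongoInputFormat picks it up. A standalone sketch of that property round trip, assuming a hypothetical signature string "sig" (in a real job, Pig supplies the signature through setUDFContextSignature):

import com.mongodb.hadoop.util.MongoConfigUtil;
import org.apache.pig.impl.util.UDFContext;

import java.util.Properties;

public class UdfContextRoundTrip {
    public static void main(final String[] args) {
        // Front end: store the projection JSON, keyed by class + signature.
        Properties props = UDFContext.getUDFContext().getUDFProperties(
                UdfContextRoundTrip.class, new String[]{"sig"});
        props.setProperty(MongoConfigUtil.INPUT_FIELDS,
                "{\"a\": true, \"_id\": false}");

        // Back end: the same class + signature yields the same Properties,
        // so the stored projection can be copied into the job configuration.
        String stored = UDFContext.getUDFContext().getUDFProperties(
                UdfContextRoundTrip.class, new String[]{"sig"})
                .getProperty(MongoConfigUtil.INPUT_FIELDS);
        System.out.println(stored);
    }
}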

@Override
@@ -77,6 +113,29 @@ public void prepareToRead(final RecordReader reader, final PigSplit split) throws IOException {
if (in == null) {
throw new IOException("Invalid Record Reader");
}

BasicBSONObject projection = getProjection();
if (fields != null && projection != null) {
schemaMapping =
new HashMap<String, ResourceFieldSchema>(fields.length);
projectedFields = new ArrayList<String>();
Set<String> visitedKeys = new HashSet<String>();
// Prepare mapping of field name -> ResourceFieldSchema.
for (ResourceFieldSchema fieldSchema : fields) {
schemaMapping.put(fieldSchema.getName(), fieldSchema);
}
// Prepare list of projected fields.
for (Map.Entry<String, Object> entry : projection.entrySet()) {
boolean include = (Boolean) entry.getValue();
// Add the name of the outer-level field if this is a nested
// field. Pig will take care of pulling out the inner field.
String key = StringUtils.split(entry.getKey(), '.')[0];
if (include && !visitedKeys.contains(key)) {
projectedFields.add(key);
visitedKeys.add(key);
}
}
}
}
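For concreteness, a sketch of what this block computes, assuming the projection {"i": true, "d.s": true, "d.k": true, "_id": false} (the shape exercised by the new PigTest below). Only the outer-level names survive, because Pig extracts subfields from the loaded map itself; plain String.split stands in here for Hadoop's StringUtils.split:

import org.bson.BasicBSONObject;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ProjectedFieldsSketch {
    public static void main(final String[] args) {
        BasicBSONObject projection = new BasicBSONObject();
        projection.put("i", true);
        projection.put("d.s", true);
        projection.put("d.k", true);
        projection.put("_id", false);

        List<String> projectedFields = new ArrayList<String>();
        Set<String> visitedKeys = new HashSet<String>();
        for (Map.Entry<String, Object> entry : projection.entrySet()) {
            // Skip excluded fields such as "_id": false.
            if (!(Boolean) entry.getValue()) {
                continue;
            }
            // Keep only the outer-level field name ("d.s" -> "d").
            String key = entry.getKey().split("\\.")[0];
            if (visitedKeys.add(key)) {
                projectedFields.add(key);
            }
        }
        System.out.println(projectedFields); // prints [i, d]
    }
}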

@Override
@@ -95,16 +154,39 @@ public Tuple getNext() throws IOException {
if (fields == null) {
// dynamic schema mode - just output a tuple with a single element,
// which is a map storing the keys/values in the document
// Since there is no schema, no projection can be made, and
// there's no need to worry about retrieving projected fields.
t = tupleFactory.newTuple(1);
t.set(0, BSONLoader.convertBSONtoPigType(val));
} else {
-            t = tupleFactory.newTuple(fields.length);
-            for (int i = 0; i < fields.length; i++) {
-                String fieldTemp = fields[i].getName();
-                if (idAlias != null && idAlias.equals(fieldTemp)) {
-                    fieldTemp = "_id";
+            // A schema was provided. Try to retrieve the projection.
+            int tupleSize;
+            if (projectedFields != null) {
+                tupleSize = projectedFields.size();
+            } else {
+                tupleSize = fields.length;
            }

+            t = tupleFactory.newTuple(tupleSize);
+            for (int i = 0; i < t.size(); i++) {
+                String fieldTemp;
+                ResourceFieldSchema fieldSchema;
+                if (null == projectedFields) {
+                    fieldTemp = fields[i].getName();
+                    fieldSchema = fields[i];
+                    if (idAlias != null && idAlias.equals(fieldTemp)) {
+                        fieldTemp = "_id";
+                    }
+                } else {
+                    fieldTemp = projectedFields.get(i);
+                    // Use id alias in order to retrieve type info.
+                    if (idAlias != null && "_id".equals(fieldTemp)) {
+                        fieldSchema = schemaMapping.get(idAlias);
+                    } else {
+                        fieldSchema = schemaMapping.get(fieldTemp);
+                    }
+                }
-            t.set(i, BSONLoader.readField(val.get(fieldTemp), fields[i]));
+                t.set(i, BSONLoader.readField(val.get(fieldTemp), fieldSchema));
}
}
return t;
@@ -145,4 +227,55 @@ public String[] getPartitionKeys(final String location, final Job job) throws IOException {
public void setPartitionFilter(final Expression partitionFilter) throws IOException {
}

@Override
public List<OperatorSet> getFeatures() {
// PROJECTION is all that exists in the OperatorSet enum.
return Collections.singletonList(OperatorSet.PROJECTION);
}

@Override
public RequiredFieldResponse pushProjection(
final RequiredFieldList requiredFieldList)
throws FrontendException {
// Cannot project any fields if there is no schema.
// Currently, Pig won't even attempt projection without a schema
// anyway, but more work will be needed if a future version supports
// this.
if (null == schema) {
return new RequiredFieldResponse(false);
}

BSONObject projection = new BasicBSONObject();
boolean needId = false;
for (RequiredField field : requiredFieldList.getFields()) {
String fieldName = field.getAlias();
if (idAlias != null && idAlias.equals(fieldName)) {
fieldName = "_id";
needId = true;
}
List<RequiredField> subFields = field.getSubFields();
if (subFields != null && !subFields.isEmpty()) {
// Pig is limited to populating at most one subfield level deep.
for (RequiredField subField : subFields) {
projection.put(fieldName + "." + subField.getAlias(), true);
}
} else {
projection.put(fieldName, true);
}
}
// Turn off _id unless asked for.
if (!needId) {
projection.put("_id", false);
}

LOG.debug("projection: " + projection);

// Store projection to be retrieved later and stored into the job
// configuration.
getUDFProperties().setProperty(
MongoConfigUtil.INPUT_FIELDS, JSON.serialize(projection));

// Return a response indicating that we can honor the projection.
return new RequiredFieldResponse(true);
}
}
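A compact sketch of the _id bookkeeping in pushProjection, assuming a loader constructed with idAlias "id": the Pig-side alias is rewritten to MongoDB's _id key, and the explicit "_id": false suppression is added only when no requested field maps to _id:

import com.mongodb.BasicDBObject;
import org.bson.BSONObject;

import java.util.Arrays;
import java.util.List;

public class IdAliasSketch {
    public static void main(final String[] args) {
        String idAlias = "id"; // second argument to the MongoLoader constructor
        List<String> requestedAliases = Arrays.asList("id", "i");

        BSONObject projection = new BasicDBObject();
        boolean needId = false;
        for (String fieldName : requestedAliases) {
            if (idAlias.equals(fieldName)) {
                fieldName = "_id"; // translate the alias for MongoDB
                needId = true;
            }
            projection.put(fieldName, true);
        }
        if (!needId) {
            projection.put("_id", false); // off unless asked for
        }
        System.out.println(projection); // prints roughly { "_id" : true , "i" : true }
    }
}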
42 changes: 41 additions & 1 deletion pig/src/test/java/com/mongodb/hadoop/pig/MongoLoaderTest.java
@@ -2,25 +2,36 @@

import com.mongodb.BasicDBList;
import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.hadoop.input.MongoRecordReader;
import com.mongodb.hadoop.util.MongoConfigUtil;
import com.mongodb.util.JSON;
import org.apache.pig.LoadPushDown;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.UDFContext;
import org.bson.types.Binary;
import org.joda.time.DateTime;
import org.junit.Test;

import java.io.IOException;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@@ -311,4 +322,33 @@ public void testMapWithTuple() throws Exception {
assertEquals("t21 value", t2.get(0));
*/
}

@Test
public void testPushProjection() throws FrontendException {
String userSchema = "a:int, m:[]";
MongoLoader ml = new MongoLoader(userSchema);
ml.setUDFContextSignature("signature");

LoadPushDown.RequiredField aField =
new LoadPushDown.RequiredField("a", 0, null, DataType.INTEGER);
List<LoadPushDown.RequiredField> mSubFields =
Collections.singletonList(
new LoadPushDown.RequiredField(
"x", 0, null, DataType.INTEGER));
LoadPushDown.RequiredField mField =
new LoadPushDown.RequiredField("m", 1, mSubFields, DataType.MAP);
LoadPushDown.RequiredFieldList requiredFields =
new LoadPushDown.RequiredFieldList(Arrays.asList(aField, mField));

LoadPushDown.RequiredFieldResponse response =
ml.pushProjection(requiredFields);
assertTrue(response.getRequiredFieldResponse());

Properties props = UDFContext.getUDFContext().getUDFProperties(
MongoLoader.class, new String[]{"signature"});
assertEquals(
new BasicDBObjectBuilder()
.add("a", true).add("m.x", true).add("_id", false).get(),
JSON.parse(props.getProperty(MongoConfigUtil.INPUT_FIELDS)));
}
}
26 changes: 26 additions & 0 deletions pig/src/test/java/com/mongodb/hadoop/pig/PigTest.java
@@ -1,6 +1,8 @@
package com.mongodb.hadoop.pig;

import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DBCollection;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.ListIndexesIterable;
@@ -109,6 +111,30 @@ public void testPigUUID() throws IOException, ParseException {
test.assertOutput(new String[]{"(" + uuid.toString() + ")"});
}

@Test
public void testPigProjection() throws IOException, ParseException {
DBCollection collection = mongoClient
.getDB("mongo_hadoop").getCollection("projection_test");
String[] expected = new String[100];
for (int i = 0; i < expected.length; ++i) {
String letter = String.valueOf((char) ('a' + (i % 26)));
// {"_id": ObjectId(...), "i": <int>,
// "d": {"s": <string>, "j": <int>, "k": <int>}}
collection.insert(
new BasicDBObjectBuilder()
.add("i", i).push("d")
.add("s", letter)
.add("j", i + 1)
.add("k", i % 5).pop().get());
expected[i] = "(" + i + "," + letter + "," + i % 5 + ")";
}

org.apache.pig.pigunit.PigTest test =
new org.apache.pig.pigunit.PigTest(
getClass().getResource("/pig/projection.pig").getPath());
test.assertOutput(expected);
}

@Test
public void testPigBSONOutput() throws IOException, ParseException {
runMongoUpdateStorageTest(
10 changes: 10 additions & 0 deletions pig/src/test/resources/pig/projection.pig
@@ -0,0 +1,10 @@
data =
LOAD 'mongodb://localhost:27017/mongo_hadoop.projection_test'
USING com.mongodb.hadoop.pig.MongoLoader('id:chararray,i:int,d:[]', 'id');

-- Pig only pushes projections with subfields when the outer field is a map (d).
projected =
FOREACH data
GENERATE $1 AS age, d#'s' AS name, d#'k' AS ssn;

STORE projected INTO 'test_results';
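Once the projection is pushed down, the load in this script behaves roughly like the following query against the test collection (a sketch using the legacy mongo-java-driver API; the real query built by MongoInputFormat also carries split bounds):

import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.util.JSON;

public class EquivalentQuerySketch {
    public static void main(final String[] args) {
        MongoClient client = new MongoClient("localhost", 27017);
        DBCollection collection = client.getDB("mongo_hadoop")
                .getCollection("projection_test");
        // The projection derived from the FOREACH above: i, d.s, d.k.
        DBObject fields = (DBObject) JSON.parse(
                "{\"i\": true, \"d.s\": true, \"d.k\": true, \"_id\": false}");
        DBCursor cursor = collection.find(new BasicDBObject(), fields);
        while (cursor.hasNext()) {
            // e.g. { "i" : 0 , "d" : { "s" : "a" , "k" : 0 } }
            System.out.println(cursor.next());
        }
        client.close();
    }
}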
