Skip to content
This repository has been archived by the owner on Jan 29, 2022. It is now read-only.

Commit

Permalink
Copy only specific table properties in MongoStorageHandler (HADOOP-176).
Browse files Browse the repository at this point in the history
  • Loading branch information
Luke Lovett committed Jan 29, 2015
1 parent 9c8625e commit 9010025
Showing 1 changed file with 29 additions and 4 deletions.
Expand Up @@ -27,6 +27,7 @@
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.OutputFormat;
Expand All @@ -36,6 +37,7 @@
import java.util.Properties;

import static java.lang.String.format;
import static com.mongodb.hadoop.hive.BSONSerDe.MONGO_COLS;

/**
* Used to sync documents in some MongoDB collection with
Expand Down Expand Up @@ -133,12 +135,35 @@ public void configureOutputJobProperties(final TableDesc tableDesc, final Map<St
* Helper function to copy properties
*/
private void copyJobProperties(final Properties from, final Map<String, String> to) {
for (Entry<Object, Object> e : from.entrySet()) {
to.put((String) e.getKey(), (String) e.getValue());
// Copy Hive-specific properties used directly by
// HiveMongoInputFormat, BSONSerDe.
if (from.containsKey(serdeConstants.LIST_COLUMNS)) {
to.put(serdeConstants.LIST_COLUMNS,
(String)from.get(serdeConstants.LIST_COLUMNS));
}
if (from.containsKey(serdeConstants.LIST_COLUMN_TYPES)) {
to.put(serdeConstants.LIST_COLUMN_TYPES,
(String)from.get(serdeConstants.LIST_COLUMN_TYPES));
}
if (from.containsKey(MONGO_COLS)) {
to.put(MONGO_COLS, (String)from.get(MONGO_COLS));
}
if (from.containsKey(TABLE_LOCATION)) {
to.put(TABLE_LOCATION, (String)from.get(TABLE_LOCATION));
}

// Copy general connector properties, such as ones defined in
// MongoConfigUtil. These are all prefixed with "mongo.".
for (Entry<Object, Object> entry : from.entrySet()) {
String key = (String)entry.getKey();
if (key.startsWith("mongo.")) {
to.put(key, (String)from.get(key));
}
}

if (to.containsKey(MONGO_URI)) {
String mongoURIStr = to.get(MONGO_URI);
// Update the keys for MONGO_URI per MongoConfigUtil.
if (from.containsKey(MONGO_URI)) {
String mongoURIStr = (String)from.get(MONGO_URI);
to.put(MongoConfigUtil.INPUT_URI, mongoURIStr);
to.put(MongoConfigUtil.OUTPUT_URI, mongoURIStr);
}
Expand Down

0 comments on commit 9010025

Please sign in to comment.