Skip to content

Commit

Permalink
Support avro.schema.url being an HDFS location
Browse files Browse the repository at this point in the history
Fixes #11548
  • Loading branch information
findepi committed Oct 9, 2018
1 parent d3f1a6a commit 2c131f9
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 11 deletions.
Expand Up @@ -23,6 +23,7 @@
import com.facebook.presto.spi.type.TypeManager; import com.facebook.presto.spi.type.TypeManager;
import io.airlift.slice.Slice; import io.airlift.slice.Slice;
import io.airlift.slice.Slices; import io.airlift.slice.Slices;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.Deserializer;
Expand Down Expand Up @@ -114,6 +115,7 @@ class GenericHiveRecordCursor<K, V extends Writable>
private boolean closed; private boolean closed;


public GenericHiveRecordCursor( public GenericHiveRecordCursor(
Configuration configuration,
Path path, Path path,
RecordReader<K, V> recordReader, RecordReader<K, V> recordReader,
long totalBytes, long totalBytes,
Expand All @@ -136,7 +138,7 @@ public GenericHiveRecordCursor(
this.value = recordReader.createValue(); this.value = recordReader.createValue();
this.hiveStorageTimeZone = hiveStorageTimeZone; this.hiveStorageTimeZone = hiveStorageTimeZone;


this.deserializer = getDeserializer(splitSchema); this.deserializer = getDeserializer(configuration, splitSchema);
this.rowInspector = getTableObjectInspector(deserializer); this.rowInspector = getTableObjectInspector(deserializer);


int size = columns.size(); int size = columns.size();
Expand Down
Expand Up @@ -71,6 +71,7 @@ public Optional<RecordCursor> createRecordCursor(
() -> HiveUtil.createRecordReader(configuration, path, start, length, schema, columns)); () -> HiveUtil.createRecordReader(configuration, path, start, length, schema, columns));


return Optional.of(new GenericHiveRecordCursor<>( return Optional.of(new GenericHiveRecordCursor<>(
configuration,
path, path,
genericRecordReader(recordReader), genericRecordReader(recordReader),
length, length,
Expand Down
10 changes: 6 additions & 4 deletions presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java
Expand Up @@ -98,6 +98,7 @@
import static com.facebook.presto.hive.HiveErrorCode.HIVE_SERDE_NOT_FOUND; import static com.facebook.presto.hive.HiveErrorCode.HIVE_SERDE_NOT_FOUND;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT; import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT;
import static com.facebook.presto.hive.HivePartitionKey.HIVE_DEFAULT_DYNAMIC_PARTITION; import static com.facebook.presto.hive.HivePartitionKey.HIVE_DEFAULT_DYNAMIC_PARTITION;
import static com.facebook.presto.hive.util.ConfigurationUtils.copy;
import static com.facebook.presto.hive.util.ConfigurationUtils.toJobConf; import static com.facebook.presto.hive.util.ConfigurationUtils.toJobConf;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.type.BigintType.BIGINT; import static com.facebook.presto.spi.type.BigintType.BIGINT;
Expand Down Expand Up @@ -342,12 +343,12 @@ public static String getDeserializerClassName(Properties schema)
} }


@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
public static Deserializer getDeserializer(Properties schema) public static Deserializer getDeserializer(Configuration configuration, Properties schema)
{ {
String name = getDeserializerClassName(schema); String name = getDeserializerClassName(schema);


Deserializer deserializer = createDeserializer(getDeserializerClass(name)); Deserializer deserializer = createDeserializer(getDeserializerClass(name));
initializeDeserializer(deserializer, schema); initializeDeserializer(configuration, deserializer, schema);
return deserializer; return deserializer;
} }


Expand Down Expand Up @@ -382,10 +383,11 @@ private static Deserializer createDeserializer(Class<? extends Deserializer> cla
} }


@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
private static void initializeDeserializer(Deserializer deserializer, Properties schema) private static void initializeDeserializer(Configuration configuration, Deserializer deserializer, Properties schema)
{ {
try { try {
deserializer.initialize(new Configuration(false), schema); configuration = copy(configuration); // Some SerDes (e.g. Avro) modify passed configuration
deserializer.initialize(configuration, schema);
} }
catch (SerDeException e) { catch (SerDeException e) {
throw new RuntimeException("error initializing deserializer: " + deserializer.getClass().getName()); throw new RuntimeException("error initializing deserializer: " + deserializer.getClass().getName());
Expand Down
Expand Up @@ -37,9 +37,14 @@ private ConfigurationUtils() {}


public static Configuration getInitialConfiguration() public static Configuration getInitialConfiguration()
{ {
Configuration configuration = new Configuration(false); return copy(INITIAL_CONFIGURATION);
copy(INITIAL_CONFIGURATION, configuration); }
return configuration;
public static Configuration copy(Configuration configuration)
{
Configuration copy = new Configuration(false);
copy(configuration, copy);
return copy;
} }


public static void copy(Configuration from, Configuration to) public static void copy(Configuration from, Configuration to)
Expand Down
Expand Up @@ -13,6 +13,7 @@
*/ */
package com.facebook.presto.hive; package com.facebook.presto.hive;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer; import org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer;
Expand Down Expand Up @@ -58,7 +59,7 @@ public void testGetThriftDeserializer()
schema.setProperty(SERIALIZATION_CLASS, IntString.class.getName()); schema.setProperty(SERIALIZATION_CLASS, IntString.class.getName());
schema.setProperty(SERIALIZATION_FORMAT, TBinaryProtocol.class.getName()); schema.setProperty(SERIALIZATION_FORMAT, TBinaryProtocol.class.getName());


assertInstanceOf(getDeserializer(schema), ThriftDeserializer.class); assertInstanceOf(getDeserializer(new Configuration(false), schema), ThriftDeserializer.class);
} }


@Test @Test
Expand Down
Expand Up @@ -8,6 +8,7 @@
connector.name=hive-hadoop2 connector.name=hive-hadoop2
hive.metastore.uri=thrift://hadoop-master:9083 hive.metastore.uri=thrift://hadoop-master:9083
hive.metastore.thrift.client.socks-proxy=hadoop-master:1180 hive.metastore.thrift.client.socks-proxy=hadoop-master:1180
hive.config.resources=/docker/volumes/conf/presto/etc/hive-default-fs-site.xml
hive.allow-add-column=true hive.allow-add-column=true
hive.allow-drop-column=true hive.allow-drop-column=true
hive.allow-rename-column=true hive.allow-rename-column=true
Expand Down
6 changes: 6 additions & 0 deletions presto-product-tests/conf/presto/etc/hive-default-fs-site.xml
@@ -0,0 +1,6 @@
<configuration>
<!-- Makes HDFS on hadoop-master the default filesystem for the Hive connector,
     so scheme-less avro.schema.url values (plain paths or hdfs:/// URIs) resolve
     against HDFS in product tests.
     NOTE(review): fs.default.name is the legacy key (fs.defaultFS is its successor);
     presumably kept for compatibility with the Hadoop version under test — confirm. -->
<property>
<name>fs.default.name</name>
<value>hdfs://hadoop-master:9000</value>
</property>
</configuration>
Expand Up @@ -60,8 +60,8 @@ public Object[][] avroSchemaLocations()
return new Object[][] { return new Object[][] {
{"file:///docker/volumes/presto-product-tests/avro/original_schema.avsc"}, // mounted in hadoop and presto containers {"file:///docker/volumes/presto-product-tests/avro/original_schema.avsc"}, // mounted in hadoop and presto containers
{"hdfs://hadoop-master:9000/user/hive/warehouse/TestAvroSchemaUrl/schemas/original_schema.avsc"}, {"hdfs://hadoop-master:9000/user/hive/warehouse/TestAvroSchemaUrl/schemas/original_schema.avsc"},
// TODO {"hdfs:///user/hive/warehouse/TestAvroSchemaUrl/schemas/original_schema.avsc"}, {"hdfs:///user/hive/warehouse/TestAvroSchemaUrl/schemas/original_schema.avsc"},
// TODO {"/user/hive/warehouse/TestAvroSchemaUrl/schemas/original_schema.avsc"}, // `avro.schema.url` can actually be path on HDFS (not URL) {"/user/hive/warehouse/TestAvroSchemaUrl/schemas/original_schema.avsc"}, // `avro.schema.url` can actually be path on HDFS (not URL)
}; };
} }


Expand Down

0 comments on commit 2c131f9

Please sign in to comment.