Merge everything that we have on 3.0.0-palantir.29 to 2.5.x (#564)
This merge keeps `release/2.5.x` up to date with everything we have on `3.0.0-palantir.29` before we merge from upstream into master.
vinooganesh authored and bulldozer-bot[bot] committed May 29, 2019
1 parent e05c028 commit 4b4bad6
Showing 7 changed files with 56 additions and 24 deletions.
40 changes: 20 additions & 20 deletions dev/deps/spark-deps-hadoop-palantir
@@ -63,26 +63,26 @@ gson-2.2.4.jar
guava-14.0.1.jar
guice-3.0.jar
guice-servlet-3.0.jar
- hadoop-annotations-2.9.2-palantir.5.jar
- hadoop-auth-2.9.2-palantir.5.jar
- hadoop-aws-2.9.2-palantir.5.jar
- hadoop-azure-2.9.2-palantir.5.jar
- hadoop-azure-datalake-2.9.2-palantir.5.jar
- hadoop-client-2.9.2-palantir.5.jar
- hadoop-common-2.9.2-palantir.5.jar
- hadoop-hdfs-client-2.9.2-palantir.5.jar
- hadoop-mapreduce-client-app-2.9.2-palantir.5.jar
- hadoop-mapreduce-client-common-2.9.2-palantir.5.jar
- hadoop-mapreduce-client-core-2.9.2-palantir.5.jar
- hadoop-mapreduce-client-jobclient-2.9.2-palantir.5.jar
- hadoop-mapreduce-client-shuffle-2.9.2-palantir.5.jar
- hadoop-openstack-2.9.2-palantir.5.jar
- hadoop-yarn-api-2.9.2-palantir.5.jar
- hadoop-yarn-client-2.9.2-palantir.5.jar
- hadoop-yarn-common-2.9.2-palantir.5.jar
- hadoop-yarn-registry-2.9.2-palantir.5.jar
- hadoop-yarn-server-common-2.9.2-palantir.5.jar
- hadoop-yarn-server-web-proxy-2.9.2-palantir.5.jar
+ hadoop-annotations-2.9.2-palantir.6.jar
+ hadoop-auth-2.9.2-palantir.6.jar
+ hadoop-aws-2.9.2-palantir.6.jar
+ hadoop-azure-2.9.2-palantir.6.jar
+ hadoop-azure-datalake-2.9.2-palantir.6.jar
+ hadoop-client-2.9.2-palantir.6.jar
+ hadoop-common-2.9.2-palantir.6.jar
+ hadoop-hdfs-client-2.9.2-palantir.6.jar
+ hadoop-mapreduce-client-app-2.9.2-palantir.6.jar
+ hadoop-mapreduce-client-common-2.9.2-palantir.6.jar
+ hadoop-mapreduce-client-core-2.9.2-palantir.6.jar
+ hadoop-mapreduce-client-jobclient-2.9.2-palantir.6.jar
+ hadoop-mapreduce-client-shuffle-2.9.2-palantir.6.jar
+ hadoop-openstack-2.9.2-palantir.6.jar
+ hadoop-yarn-api-2.9.2-palantir.6.jar
+ hadoop-yarn-client-2.9.2-palantir.6.jar
+ hadoop-yarn-common-2.9.2-palantir.6.jar
+ hadoop-yarn-registry-2.9.2-palantir.6.jar
+ hadoop-yarn-server-common-2.9.2-palantir.6.jar
+ hadoop-yarn-server-web-proxy-2.9.2-palantir.6.jar
hibernate-validator-5.2.4.Final.jar
hk2-api-2.5.0-b32.jar
hk2-locator-2.5.0-b32.jar
2 changes: 1 addition & 1 deletion dists/hadoop-palantir-bom/pom.xml
@@ -34,7 +34,7 @@
<sbt.project.name>spark-dist-hadoop-palantir-bom</sbt.project.name>
<build.testJarPhase>none</build.testJarPhase>
<build.copyDependenciesPhase>none</build.copyDependenciesPhase>
- <hadoop.version>2.9.2-palantir.5</hadoop.version>
+ <hadoop.version>2.9.2-palantir.6</hadoop.version>
<curator.version>2.7.1</curator.version>
</properties>

@@ -225,6 +225,7 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
case (UNION, _) =>
val allTypes = avroType.getTypes.asScala
val nonNullTypes = allTypes.filter(_.getType != NULL)
+ val nonNullAvroType = Schema.createUnion(nonNullTypes.asJava)
if (nonNullTypes.nonEmpty) {
if (nonNullTypes.length == 1) {
newWriter(nonNullTypes.head, catalystType, path)
@@ -253,7 +254,7 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
(updater, ordinal, value) => {
val row = new SpecificInternalRow(st)
val fieldUpdater = new RowUpdater(row)
- val i = GenericData.get().resolveUnion(avroType, value)
+ val i = GenericData.get().resolveUnion(nonNullAvroType, value)
fieldWriters(i)(fieldUpdater, i, value)
updater.set(ordinal, row)
}
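The two hunks above narrow the union to its non-null branches and resolve incoming values against that narrowed schema, so the branch index lines up with the `fieldWriters` built from `nonNullTypes`. A minimal sketch of the index mismatch this avoids (not part of the commit; it assumes only Avro's `GenericData.resolveUnion`, with an illustrative schema and value):

import scala.collection.JavaConverters._
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData

// A union with more than one non-null branch: ["null", "int", "string"].
val union = Schema.createUnion(
  List(Schema.create(Schema.Type.NULL),
    Schema.create(Schema.Type.INT),
    Schema.create(Schema.Type.STRING)).asJava)

// Drop the null branch, mirroring nonNullTypes in the deserializer.
val nonNullUnion = Schema.createUnion(
  union.getTypes.asScala.filter(_.getType != Schema.Type.NULL).asJava)

// A string value resolves to branch 2 of the full union but branch 1 of the
// narrowed one; fieldWriters is built only from the non-null branches, so the
// narrowed schema gives the index that actually matches it.
GenericData.get().resolveUnion(union, "Alice")        // 2
GenericData.get().resolveUnion(nonNullUnion, "Alice") // 1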
@@ -247,6 +247,32 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
}
}

test("SPARK-27858 Union type: More than one non-null type") {
withTempDir { dir =>
val complexNullUnionType = Schema.createUnion(
List(Schema.create(Type.INT), Schema.create(Type.NULL), Schema.create(Type.STRING)).asJava)
val fields = Seq(
new Field("field1", complexNullUnionType, "doc", null.asInstanceOf[AnyVal])).asJava
val schema = Schema.createRecord("name", "docs", "namespace", false)
schema.setFields(fields)
val datumWriter = new GenericDatumWriter[GenericRecord](schema)
val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
dataFileWriter.create(schema, new File(s"$dir.avro"))
val avroRec = new GenericData.Record(schema)
avroRec.put("field1", 42)
dataFileWriter.append(avroRec)
val avroRec2 = new GenericData.Record(schema)
avroRec2.put("field1", "Alice")
dataFileWriter.append(avroRec2)
dataFileWriter.flush()
dataFileWriter.close()

val df = spark.read.format("avro").load(s"$dir.avro")
assert(df.schema === StructType.fromDDL("field1 struct<member0: int, member1: string>"))
assert(df.collect().toSet == Set(Row(Row(42, null)), Row(Row(null, "Alice"))))
}
}

test("Complex Union Type") {
withTempPath { dir =>
val fixedSchema = Schema.createFixed("fixed_name", "doc", "namespace", 4)
2 changes: 1 addition & 1 deletion pom.xml
@@ -127,7 +127,7 @@
<sbt.project.name>spark</sbt.project.name>
<slf4j.version>1.7.25</slf4j.version>
<log4j.version>1.2.17</log4j.version>
- <hadoop.version>2.9.2-palantir.5</hadoop.version>
+ <hadoop.version>2.9.2-palantir.6</hadoop.version>
<protobuf.version>2.5.0</protobuf.version>
<yarn.version>${hadoop.version}</yarn.version>
<zookeeper.version>3.4.7</zookeeper.version>
@@ -498,7 +498,7 @@ object SQLConf {
val PARQUET_PARTITION_PRUNING_ENABLED = buildConf("spark.sql.parquet.enablePartitionPruning")
.doc("Enables driver-side partition pruning for Parquet.")
.booleanConf
- .createWithDefault(true)
+ .createWithDefault(false)

val PARQUET_VECTORIZED_READER_BATCH_SIZE = buildConf("spark.sql.parquet.columnarReaderBatchSize")
.doc("The number of rows to include in a parquet vectorized reader batch. The number should " +
@@ -886,6 +886,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
// openCostInBytes to disable file merging.
test("SPARK-17059: Allow FileFormat to specify partition pruning strategy") {
withSQLConf(ParquetOutputFormat.JOB_SUMMARY_LEVEL -> "ALL",
+ SQLConf.PARQUET_PARTITION_PRUNING_ENABLED.key -> "true",
SQLConf.FILES_OPEN_COST_IN_BYTES.key -> (128 * 1024 * 1024).toString) {
withTempPath { path =>
spark.sparkContext.parallelize(Seq(1, 2, 3), 3)
@@ -902,6 +903,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext

test("Do not filter out parquet file when missing in _metadata file") {
withSQLConf(ParquetOutputFormat.JOB_SUMMARY_LEVEL -> "ALL",
+ SQLConf.PARQUET_PARTITION_PRUNING_ENABLED.key -> "true",
SQLConf.FILES_OPEN_COST_IN_BYTES.key -> (128 * 1024 * 1024).toString) {
withTempPath { path =>
spark.sparkContext.parallelize(Seq(1, 2, 3), 3)
@@ -918,6 +920,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext

test("Only read _metadata file once for a given root path") {
withSQLConf(ParquetOutputFormat.JOB_SUMMARY_LEVEL -> "ALL",
+ SQLConf.PARQUET_PARTITION_PRUNING_ENABLED.key -> "true",
"fs.count.impl" -> classOf[CountingFileSystem].getName,
"fs.count.impl.disable.cache" -> "true") {
withTempPath { path =>
@@ -974,6 +977,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext

test("Ensure timestamps are filterable") {
withSQLConf(ParquetOutputFormat.JOB_SUMMARY_LEVEL -> "ALL",
+ SQLConf.PARQUET_PARTITION_PRUNING_ENABLED.key -> "true",
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key -> "false") {
withTempPath { path =>
val ts = new Timestamp(System.currentTimeMillis())
@@ -994,6 +998,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext

test("Ensure dates are filterable") {
withSQLConf(ParquetOutputFormat.JOB_SUMMARY_LEVEL -> "ALL",
+ SQLConf.PARQUET_PARTITION_PRUNING_ENABLED.key -> "true",
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key -> "false") {
withTempPath { path =>
val date = new Date(2016, 1, 1)
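Because the default is now `false`, each of these suites enables `PARQUET_PARTITION_PRUNING_ENABLED` explicitly so the pruning code paths stay exercised. A minimal sketch of the pattern (not part of the commit; it assumes a suite that already mixes in `SQLTestUtils` and `SharedSQLContext`, and the test body is illustrative):

test("illustrative pruning-dependent check") {
  withSQLConf(SQLConf.PARQUET_PARTITION_PRUNING_ENABLED.key -> "true") {
    withTempPath { path =>
      // Write a small Parquet dataset and read it back with pruning enabled.
      spark.range(10).write.parquet(path.getAbsolutePath)
      assert(spark.read.parquet(path.getAbsolutePath).count() === 10)
    }
  }
}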
