[SPARK-2736] PySpark converter and example script for reading Avro files
JIRA: https://issues.apache.org/jira/browse/SPARK-2736

This patch includes:
1. An Avro converter that converts Avro data types to Python. It handles all three Avro data mappings (Generic, Specific, and Reflect).
2. An example Python script for reading Avro files using AvroKeyInputFormat and the converter (a minimal usage sketch follows this list).
3. A fix for a class-loading issue: Class.forName calls are replaced with Utils.classForName, which resolves classes via the context or Spark class loader.
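
For context, the converter plugs into PySpark through the keyConverter argument of SparkContext.newAPIHadoopFile. The sketch below is distilled from the example script added in this commit; the data file path is a placeholder, and it assumes the examples jar containing AvroWrapperToJavaConverter is on the driver class path:

from pyspark import SparkContext

sc = SparkContext(appName="AvroKeyInputFormat")
# Each element is a (key, value) pair: the key holds the converted Avro record
# (a dict on the Python side) and the value is null.
avro_rdd = sc.newAPIHadoopFile(
    "examples/src/main/resources/users.avro",  # placeholder data file
    "org.apache.avro.mapreduce.AvroKeyInputFormat",
    "org.apache.avro.mapred.AvroKey",
    "org.apache.hadoop.io.NullWritable",
    keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter")
for record in avro_rdd.map(lambda x: x[0]).collect():
    print record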

cc @MLnick @JoshRosen @mateiz

Author: Kan Zhang <kzhang@apache.org>

Closes apache#1916 from kanzhang/SPARK-2736 and squashes the following commits:

02443f8 [Kan Zhang] [SPARK-2736] Adding .avsc files to .rat-excludes
f74e9a9 [Kan Zhang] [SPARK-2736] nit: clazz -> className
82cc505 [Kan Zhang] [SPARK-2736] Update data sample
0be7761 [Kan Zhang] [SPARK-2736] Example pyspark script and data files
c8e5881 [Kan Zhang] [SPARK-2736] Trying to work with all 3 Avro data models
2271a5b [Kan Zhang] [SPARK-2736] Using the right class loader to find Avro classes
536876b [Kan Zhang] [SPARK-2736] Adding Avro to Java converter
kanzhang authored and conviva-zz committed Sep 4, 2014
1 parent 9b41f6b commit 6c5cb4c
Showing 8 changed files with 231 additions and 13 deletions.
1 change: 1 addition & 0 deletions .rat-excludes
@@ -25,6 +25,7 @@ log4j-defaults.properties
bootstrap-tooltip.js
jquery-1.11.1.min.js
sorttable.js
+ .*avsc
.*txt
.*json
.*data
@@ -19,6 +19,7 @@ package org.apache.spark.api.python

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
+ import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SerializableWritable, SparkException}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io._
@@ -42,7 +43,7 @@ private[python] object Converter extends Logging {
defaultConverter: Converter[Any, Any]): Converter[Any, Any] = {
converterClass.map { cc =>
Try {
- val c = Class.forName(cc).newInstance().asInstanceOf[Converter[Any, Any]]
+ val c = Utils.classForName(cc).newInstance().asInstanceOf[Converter[Any, Any]]
logInfo(s"Loaded converter: $cc")
c
} match {
24 changes: 12 additions & 12 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -372,8 +372,8 @@ private[spark] object PythonRDD extends Logging {
batchSize: Int) = {
val keyClass = Option(keyClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
val valueClass = Option(valueClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
- val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
- val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
+ val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
+ val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
val confBroadcasted = sc.sc.broadcast(new SerializableWritable(sc.hadoopConfiguration()))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
@@ -440,9 +440,9 @@ private[spark] object PythonRDD extends Logging {
keyClass: String,
valueClass: String,
conf: Configuration) = {
- val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
- val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
- val fc = Class.forName(inputFormatClass).asInstanceOf[Class[F]]
+ val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
+ val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
+ val fc = Utils.classForName(inputFormatClass).asInstanceOf[Class[F]]
if (path.isDefined) {
sc.sc.newAPIHadoopFile[K, V, F](path.get, fc, kc, vc, conf)
} else {
@@ -509,9 +509,9 @@ private[spark] object PythonRDD extends Logging {
keyClass: String,
valueClass: String,
conf: Configuration) = {
- val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
- val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
- val fc = Class.forName(inputFormatClass).asInstanceOf[Class[F]]
+ val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
+ val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
+ val fc = Utils.classForName(inputFormatClass).asInstanceOf[Class[F]]
if (path.isDefined) {
sc.sc.hadoopFile(path.get, fc, kc, vc)
} else {
@@ -558,7 +558,7 @@ private[spark] object PythonRDD extends Logging {
for {
k <- Option(keyClass)
v <- Option(valueClass)
- } yield (Class.forName(k), Class.forName(v))
+ } yield (Utils.classForName(k), Utils.classForName(v))
}

private def getKeyValueConverters(keyConverterClass: String, valueConverterClass: String,
@@ -621,10 +621,10 @@ private[spark] object PythonRDD extends Logging {
val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse(
inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass))
val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
- val codec = Option(compressionCodecClass).map(Class.forName(_).asInstanceOf[Class[C]])
+ val codec = Option(compressionCodecClass).map(Utils.classForName(_).asInstanceOf[Class[C]])
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new JavaToWritableConverter)
- val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]]
+ val fc = Utils.classForName(outputFormatClass).asInstanceOf[Class[F]]
converted.saveAsHadoopFile(path, kc, vc, fc, new JobConf(mergedConf), codec=codec)
}

@@ -653,7 +653,7 @@ private[spark] object PythonRDD extends Logging {
val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new JavaToWritableConverter)
- val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]]
+ val fc = Utils.classForName(outputFormatClass).asInstanceOf[Class[F]]
converted.saveAsNewAPIHadoopFile(path, kc, vc, fc, mergedConf)
}

3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -146,6 +146,9 @@ private[spark] object Utils extends Logging {
Try { Class.forName(clazz, false, getContextOrSparkClassLoader) }.isSuccess
}

+ /** Preferred alternative to Class.forName(className); loads the class via the context or Spark class loader. */
+ def classForName(className: String) = Class.forName(className, true, getContextOrSparkClassLoader)

/**
* Primitive often used when writing {@link java.nio.ByteBuffer} to {@link java.io.DataOutput}.
*/
75 changes: 75 additions & 0 deletions examples/src/main/python/avro_inputformat.py
@@ -0,0 +1,75 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import sys

from pyspark import SparkContext

"""
Read data file users.avro in local Spark distro:
$ cd $SPARK_HOME
$ ./bin/spark-submit --driver-class-path /path/to/example/jar ./examples/src/main/python/avro_inputformat.py \
> examples/src/main/resources/users.avro
{u'favorite_color': None, u'name': u'Alyssa', u'favorite_numbers': [3, 9, 15, 20]}
{u'favorite_color': u'red', u'name': u'Ben', u'favorite_numbers': []}
To read name and favorite_color fields only, specify the following reader schema:
$ cat examples/src/main/resources/user.avsc
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
$ ./bin/spark-submit --driver-class-path /path/to/example/jar ./examples/src/main/python/avro_inputformat.py \
> examples/src/main/resources/users.avro examples/src/main/resources/user.avsc
{u'favorite_color': None, u'name': u'Alyssa'}
{u'favorite_color': u'red', u'name': u'Ben'}
"""
if __name__ == "__main__":
    if len(sys.argv) != 2 and len(sys.argv) != 3:
        print >> sys.stderr, """
        Usage: avro_inputformat <data_file> [reader_schema_file]
        Run with example jar:
        ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/avro_inputformat.py <data_file> [reader_schema_file]
        Assumes you have Avro data stored in <data_file>. Reader schema can be optionally specified in [reader_schema_file].
        """
        exit(-1)

    path = sys.argv[1]
    sc = SparkContext(appName="AvroKeyInputFormat")

    conf = None
    if len(sys.argv) == 3:
        schema_rdd = sc.textFile(sys.argv[2], 1).collect()
        conf = {"avro.schema.input.key" : reduce(lambda x, y: x+y, schema_rdd)}

    avro_rdd = sc.newAPIHadoopFile(path,
        "org.apache.avro.mapreduce.AvroKeyInputFormat",
        "org.apache.avro.mapred.AvroKey",
        "org.apache.hadoop.io.NullWritable",
        keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter",
        conf=conf)
    output = avro_rdd.map(lambda x: x[0]).collect()
    for k in output:
        print k
8 changes: 8 additions & 0 deletions examples/src/main/resources/user.avsc
@@ -0,0 +1,8 @@
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
Binary file added examples/src/main/resources/users.avro
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.examples.pythonconverters

import java.util.{Collection => JCollection, Map => JMap}

import scala.collection.JavaConversions._

import org.apache.avro.generic.{GenericFixed, IndexedRecord}
import org.apache.avro.mapred.AvroWrapper
import org.apache.avro.Schema
import org.apache.avro.Schema.Type._

import org.apache.spark.api.python.Converter
import org.apache.spark.SparkException


/**
* Implementation of [[org.apache.spark.api.python.Converter]] that converts
* an Avro Record wrapped in an AvroKey (or AvroValue) to a Java Map. It tries
* to work with all 3 Avro data mappings (Generic, Specific and Reflect).
*/
class AvroWrapperToJavaConverter extends Converter[Any, Any] {
  override def convert(obj: Any): Any = {
    if (obj == null) {
      return null
    }
    obj.asInstanceOf[AvroWrapper[_]].datum() match {
      case null => null
      case record: IndexedRecord => unpackRecord(record)
      case other => throw new SparkException(
        s"Unsupported top-level Avro data type ${other.getClass.getName}")
    }
  }

  def unpackRecord(obj: Any): JMap[String, Any] = {
    val map = new java.util.HashMap[String, Any]
    obj match {
      case record: IndexedRecord =>
        record.getSchema.getFields.zipWithIndex.foreach { case (f, i) =>
          map.put(f.name, fromAvro(record.get(i), f.schema))
        }
      case other => throw new SparkException(
        s"Unsupported RECORD type ${other.getClass.getName}")
    }
    map
  }

  def unpackMap(obj: Any, schema: Schema): JMap[String, Any] = {
    obj.asInstanceOf[JMap[_, _]].map { case (key, value) =>
      (key.toString, fromAvro(value, schema.getValueType))
    }
  }

  def unpackFixed(obj: Any, schema: Schema): Array[Byte] = {
    unpackBytes(obj.asInstanceOf[GenericFixed].bytes())
  }

  def unpackBytes(obj: Any): Array[Byte] = {
    val bytes: Array[Byte] = obj match {
      case buf: java.nio.ByteBuffer => buf.array()
      case arr: Array[Byte] => arr
      case other => throw new SparkException(
        s"Unknown BYTES type ${other.getClass.getName}")
    }
    val bytearray = new Array[Byte](bytes.length)
    System.arraycopy(bytes, 0, bytearray, 0, bytes.length)
    bytearray
  }

  def unpackArray(obj: Any, schema: Schema): JCollection[Any] = obj match {
    case c: JCollection[_] =>
      c.map(fromAvro(_, schema.getElementType))
    case arr: Array[_] if arr.getClass.getComponentType.isPrimitive =>
      arr.toSeq
    case arr: Array[_] =>
      arr.map(fromAvro(_, schema.getElementType)).toSeq
    case other => throw new SparkException(
      s"Unknown ARRAY type ${other.getClass.getName}")
  }

  def unpackUnion(obj: Any, schema: Schema): Any = {
    schema.getTypes.toList match {
      case List(s) => fromAvro(obj, s)
      case List(n, s) if n.getType == NULL => fromAvro(obj, s)
      case List(s, n) if n.getType == NULL => fromAvro(obj, s)
      case _ => throw new SparkException(
        "Unions may only consist of a concrete type and null")
    }
  }

  def fromAvro(obj: Any, schema: Schema): Any = {
    if (obj == null) {
      return null
    }
    schema.getType match {
      case UNION => unpackUnion(obj, schema)
      case ARRAY => unpackArray(obj, schema)
      case FIXED => unpackFixed(obj, schema)
      case MAP => unpackMap(obj, schema)
      case BYTES => unpackBytes(obj)
      case RECORD => unpackRecord(obj)
      case STRING => obj.toString
      case ENUM => obj.toString
      case NULL => obj
      case BOOLEAN => obj
      case DOUBLE => obj
      case FLOAT => obj
      case INT => obj
      case LONG => obj
      case other => throw new SparkException(
        s"Unknown Avro schema type ${other.getName}")
    }
  }
}
