Skip to content

Commit

Permalink
[SPARK-6505] [SQL] Remove the reflection call in HiveFunctionWrapper
Browse files Browse the repository at this point in the history
According to liancheng's comment in https://issues.apache.org/jira/browse/SPARK-6505, this patch removes the reflection call in HiveFunctionWrapper, and implements the functions named "deserializeObjectByKryo" and "serializeObjectByKryo" following the functions with the same name in
org.apache.hadoop.hive.ql.exec.Utilities.java

Author: baishuo <vc_java@hotmail.com>

Closes apache#5660 from baishuo/SPARK-6505-20150423 and squashes the following commits:

ae61ec4 [baishuo] modify code style
78d9fa3 [baishuo] modify code style
0b522a7 [baishuo] modify code style
a5ff9c7 [baishuo] Remove the reflection call in HiveFunctionWrapper
  • Loading branch information
baishuo authored and liancheng committed Apr 27, 2015
1 parent d188b8b commit 82bb7fd
Showing 1 changed file with 22 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@ package org.apache.spark.sql.hive

import java.rmi.server.UID
import java.util.{Properties, ArrayList => JArrayList}
import java.io.{OutputStream, InputStream}

import scala.collection.JavaConversions._
import scala.language.implicitConversions
import scala.reflect.ClassTag

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.common.StatsSetupConst
Expand All @@ -46,6 +50,7 @@ import org.apache.hadoop.{io => hadoopIo}

import org.apache.spark.Logging
import org.apache.spark.sql.types.{Decimal, DecimalType, UTF8String}
import org.apache.spark.util.Utils._

/**
* This class provides the UDF creation and also the UDF instance serialization and
Expand All @@ -61,39 +66,34 @@ private[hive] case class HiveFunctionWrapper(var functionClassName: String)
// for Serialization
def this() = this(null)

import org.apache.spark.util.Utils._

/**
 * Deserializes an object of type `T` from `in` using the given Kryo instance.
 *
 * Re-implemented after the method of the same name in
 * `org.apache.hadoop.hive.ql.exec.Utilities`, so we no longer need a reflective
 * `getDeclaredMethod`/`setAccessible` call into Hive internals (SPARK-6505).
 *
 * @param kryo  the Kryo instance to read with (typically Hive's thread-local one)
 * @param in    stream containing the Kryo-serialized bytes
 * @param clazz runtime class of the serialized object
 * @return the deserialized instance cast to `T`
 */
def deserializeObjectByKryo[T: ClassTag](
    kryo: Kryo,
    in: InputStream,
    clazz: Class[_]): T = {
  val input = new Input(in)
  try {
    // Kryo materializes an instance of `clazz`; the caller asserts it is a `T`.
    kryo.readObject(input, clazz).asInstanceOf[T]
  } finally {
    // Release the Kryo buffer even if deserialization throws.
    input.close()
  }
}

/**
 * Serializes `plan` to `out` using the given Kryo instance.
 *
 * Re-implemented after the method of the same name in
 * `org.apache.hadoop.hive.ql.exec.Utilities`, replacing the previous
 * reflective invocation (SPARK-6505).
 *
 * @param kryo the Kryo instance to write with (typically Hive's thread-local one)
 * @param plan the object to serialize
 * @param out  destination stream for the serialized bytes
 */
def serializeObjectByKryo(
    kryo: Kryo,
    plan: Object,
    out: OutputStream): Unit = {
  val output: Output = new Output(out)
  try {
    kryo.writeObject(output, plan)
  } finally {
    // Flush and release the Kryo buffer even if serialization throws.
    output.close()
  }
}

/**
 * Deserializes a UDF instance previously written by `serializePlan`, using
 * Hive's shared thread-local Kryo instance from `Utilities`.
 *
 * @param is    stream containing the serialized UDF
 * @param clazz runtime class of the serialized UDF
 * @return the deserialized UDF cast to `UDFType`
 */
def deserializePlan[UDFType](is: java.io.InputStream, clazz: Class[_]): UDFType = {
  deserializeObjectByKryo(Utilities.runtimeSerializationKryo.get(), is, clazz)
    .asInstanceOf[UDFType]
}

/**
 * Serializes a UDF instance to `out` using Hive's shared thread-local Kryo
 * instance from `Utilities`; readable back via `deserializePlan`.
 *
 * @param function the UDF instance to serialize
 * @param out      destination stream for the serialized bytes
 */
def serializePlan(function: AnyRef, out: java.io.OutputStream): Unit = {
  serializeObjectByKryo(Utilities.runtimeSerializationKryo.get(), function, out)
}

private var instance: AnyRef = null
Expand Down

0 comments on commit 82bb7fd

Please sign in to comment.