From 07d545694690af2da3f3c661aa812f7b55f245d1 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Sun, 21 Jun 2015 22:45:26 +0800
Subject: [PATCH] clean closure

---
 .../spark/sql/execution/basicOperators.scala  |  3 +-
 .../hive/execution/InsertIntoHiveTable.scala  | 31 ++++++++++---------
 2 files changed, 17 insertions(+), 17 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index eeb2b2a92b1db..b2671f5340bd4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -180,8 +180,7 @@ case class TakeOrderedAndProject(
 
   // TODO: Terminal split should be implemented differently from non-terminal split.
   // TODO: Pick num splits based on |limit|.
-  protected override def doExecute(): RDD[InternalRow] =
-    sparkContext.makeRDD(collectData(), 1)
+  protected override def doExecute(): RDD[InternalRow] = sparkContext.makeRDD(collectData(), 1)
 
   override def outputOrdering: Seq[SortOrder] = sortOrder
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 05f425f2b65f3..9d76fc870d320 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -48,16 +48,16 @@ case class InsertIntoHiveTable(
     overwrite: Boolean,
     ifNotExists: Boolean) extends UnaryNode with HiveInspectors {
 
-  @transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext]
-  @transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
-  @transient private lazy val hiveContext = new Context(sc.hiveconf)
-  @transient private lazy val catalog = sc.catalog
+  val sc: HiveContext = sqlContext.asInstanceOf[HiveContext]
+  lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
+  private lazy val hiveContext = new Context(sc.hiveconf)
+  private lazy val catalog = sc.catalog
 
-  private def newSerializer(tableDesc: TableDesc): Serializer = {
+  private val newSerializer = (tableDesc: TableDesc) => {
     val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
     serializer.initialize(null, tableDesc.getProperties)
     serializer
-  }
+  }: Serializer
 
   def output: Seq[Attribute] = child.output
 
@@ -79,13 +79,10 @@ case class InsertIntoHiveTable(
       SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName, conf.value))
     log.debug("Saving as hadoop file of type " + valueClass.getSimpleName)
 
-    writerContainer.driverSideSetup()
-    sc.sparkContext.runJob(rdd, writeToFile _)
-    writerContainer.commitJob()
-
+    val newSer = newSerializer
+    val schema = table.schema
     // Note that this function is executed on executor side
-    def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = {
-      val serializer = newSerializer(fileSinkConf.getTableInfo)
+    val writeToFile = (context: TaskContext, iterator: Iterator[InternalRow]) => {
       val standardOI = ObjectInspectorUtils
         .getStandardObjectInspector(
           fileSinkConf.getTableInfo.getDeserializer.getObjectInspector,
@@ -106,12 +103,16 @@ case class InsertIntoHiveTable(
         }
 
         writerContainer
-          .getLocalFileWriter(row, table.schema)
-          .write(serializer.serialize(outputData, standardOI))
+          .getLocalFileWriter(row, schema)
+          .write(newSer(fileSinkConf.getTableInfo).serialize(outputData, standardOI))
       }
 
       writerContainer.close()
-    }
+    }: Unit
+
+    writerContainer.driverSideSetup()
+    sc.sparkContext.runJob(rdd, sc.sparkContext.clean(writeToFile))
+    writerContainer.commitJob()
   }
 
   /**
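
The diff above applies a general Spark pattern: copy the values a task needs (here the serializer factory and table.schema) into local vals, define the task body as a function value, and let closure cleaning verify that nothing drags the whole operator (and its HiveContext) into the serialized closure. Below is a minimal sketch of that pattern outside this patch; ExampleOperator, writeOut, and writePartition are illustrative names, not Spark APIs, and the sketch relies on runJob's built-in closure cleaning because SparkContext#clean is package-private.

import org.apache.spark.{SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Deliberately not Serializable: if a task closure captured `this`,
// shipping it to executors would fail with NotSerializableException.
class ExampleOperator(sc: SparkContext, schema: Seq[String]) {

  def writeOut(rdd: RDD[String]): Unit = {
    // Copy the one field the tasks need into a local val. Referring to
    // `this.schema` inside the closure below would capture `this` instead.
    val localSchema = schema

    // A function value whose body touches only its parameters and local vals.
    val writePartition = (context: TaskContext, iter: Iterator[String]) => {
      iter.foreach { row =>
        // Executor-side work; only `localSchema` travels with the closure.
        println(s"[partition ${context.partitionId()}] ${localSchema.mkString(",")} -> $row")
      }
    }

    // runJob cleans and serializability-checks the closure before shipping it;
    // since writePartition references nothing on `this`, the operator stays on the driver.
    sc.runJob(rdd, writePartition)
  }
}

The patch itself additionally routes writeToFile through sc.sparkContext.clean(...) before runJob, which makes the cleaning explicit at the call site rather than leaving it implicit in runJob.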