Commit

clean closure
cloud-fan committed Jun 24, 2015
1 parent 20821ec commit 07d5456
Showing 2 changed files with 17 additions and 17 deletions.

@@ -180,8 +180,7 @@ case class TakeOrderedAndProject(
 
   // TODO: Terminal split should be implemented differently from non-terminal split.
   // TODO: Pick num splits based on |limit|.
-  protected override def doExecute(): RDD[InternalRow] =
-    sparkContext.makeRDD(collectData(), 1)
+  protected override def doExecute(): RDD[InternalRow] = sparkContext.makeRDD(collectData(), 1)
 
   override def outputOrdering: Seq[SortOrder] = sortOrder
 }

@@ -48,16 +48,16 @@ case class InsertIntoHiveTable(
     overwrite: Boolean,
     ifNotExists: Boolean) extends UnaryNode with HiveInspectors {
 
-  @transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext]
-  @transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
-  @transient private lazy val hiveContext = new Context(sc.hiveconf)
-  @transient private lazy val catalog = sc.catalog
+  val sc: HiveContext = sqlContext.asInstanceOf[HiveContext]
+  lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
+  private lazy val hiveContext = new Context(sc.hiveconf)
+  private lazy val catalog = sc.catalog
 
-  private def newSerializer(tableDesc: TableDesc): Serializer = {
+  private val newSerializer = (tableDesc: TableDesc) => {
     val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
     serializer.initialize(null, tableDesc.getProperties)
     serializer
-  }
+  }: Serializer
 
   def output: Seq[Attribute] = child.output
 

@@ -79,13 +79,10 @@
       SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName, conf.value))
     log.debug("Saving as hadoop file of type " + valueClass.getSimpleName)
 
-    writerContainer.driverSideSetup()
-    sc.sparkContext.runJob(rdd, writeToFile _)
-    writerContainer.commitJob()
-
+    val newSer = newSerializer
+    val schema = table.schema
     // Note that this function is executed on executor side
-    def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = {
-      val serializer = newSerializer(fileSinkConf.getTableInfo)
+    val writeToFile = (context: TaskContext, iterator: Iterator[InternalRow]) => {
       val standardOI = ObjectInspectorUtils
         .getStandardObjectInspector(
           fileSinkConf.getTableInfo.getDeserializer.getObjectInspector,

@@ -106,12 +103,16 @@
         }
 
         writerContainer
-          .getLocalFileWriter(row, table.schema)
-          .write(serializer.serialize(outputData, standardOI))
+          .getLocalFileWriter(row, schema)
+          .write(newSer(fileSinkConf.getTableInfo).serialize(outputData, standardOI))
       }
 
       writerContainer.close()
-    }
+    }: Unit
+
+    writerContainer.driverSideSetup()
+    sc.sparkContext.runJob(rdd, sc.sparkContext.clean(writeToFile))
+    writerContainer.commitJob()
   }
 
   /**
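
Why the commit is called "clean closure": in the old code, writeToFile was a def nested in the enclosing method, so the eta-expanded closure writeToFile _ handed to runJob captured the whole InsertIntoHiveTable operator (this), which is presumably why fields such as sc, outputClass, hiveContext and catalog carried @transient. The new code first copies what the task needs into locals (newSer, schema), defines writeToFile as a function value that captures only those locals, and passes it through sc.sparkContext.clean(...) before runJob; with this no longer captured, the @transient markers are dropped. The sketch below is a minimal, self-contained illustration of that capture difference in plain Scala. Operator and BigNonSerializableState are hypothetical stand-ins, not Spark code, and plain Java serialization stands in for the check Spark performs when it ships a task closure.

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-ins: Operator plays the role of InsertIntoHiveTable,
// BigNonSerializableState the role of driver-only state such as the HiveContext.
class BigNonSerializableState // deliberately not Serializable

class Operator(val schema: Seq[String]) {
  val state = new BigNonSerializableState

  // Old pattern: a method. The eta-expansion writeToFile _ closes over `this`,
  // so shipping the closure also tries to ship `state` and fails.
  def writeToFile(rows: Iterator[String]): Unit =
    rows.foreach(r => println(s"$schema <- $r"))

  // New pattern: copy what the task needs into a local, then build a function
  // value that captures only that local, never `this`.
  def cleanedWriteToFile: Iterator[String] => Unit = {
    val localSchema = schema
    rows => rows.foreach(r => println(s"$localSchema <- $r"))
  }
}

object ClosureDemo {
  // Rough equivalent of the serializability check Spark applies to task closures.
  private def serializable(f: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(f)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val op = new Operator(Seq("a", "b"))
    println(serializable(op.writeToFile _))      // false: drags in op and its state
    println(serializable(op.cleanedWriteToFile)) // true: captures only the schema copy
  }
}

In the commit itself, sc.sparkContext.clean(writeToFile) goes a step further than this check: Spark's ClosureCleaner also nulls out enclosing-scope references the closure does not actually use and fails fast on the driver if the closure still cannot be serialized, rather than failing later inside a task. That helper is Spark-internal (private[spark]), which is why the sketch falls back on plain Java serialization.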

0 comments on commit 07d5456
