Skip to content

Commit

Permalink
Added custom partitions as a new feature.
Browse files Browse the repository at this point in the history
  • Loading branch information
mvrpl committed Mar 26, 2017
1 parent 218914a commit 1abac7a
Showing 1 changed file with 22 additions and 5 deletions.
27 changes: 22 additions & 5 deletions src/main/scala/utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,6 @@ class Spark extends Logs {
hiveContext.setConf("hive.exec.dynamic.partition", "true")
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

/** Creates a DataFrame from `rows` with the given `schema` by delegating to `hiveContext.createDataFrame`.
  *
  * @param rows   the row data to wrap
  * @param schema the schema passed along with the rows
  * @return the DataFrame produced by `hiveContext.createDataFrame`
  */
def makeDF(rows: RDD[Row], schema: StructType): DataFrame =
  hiveContext.createDataFrame(rows, schema)

def writePartitionsDF(dataFrame: DataFrame, partitions: Seq[String], targetTable: String, saveModeDF: SaveMode): Boolean = {
if (sparkVer >= 2.0) {
val reOrder = dataFrame.columns.filter(!partitions.contains(_)) ++ partitions
Expand Down Expand Up @@ -140,6 +135,28 @@ class Spark extends Logs {
dataFrame.write.partitionBy(partitionName).format("orc").mode(saveModeDF).saveAsTable(targetTable)
}
}
} else if (configs.TARGET.CUSTOMPARTITION.isDefined) {
val partitions = configs.TARGET.CUSTOMPARTITION.map(_.map(_.toStr).toSeq).toSeq
var dataDF: DataFrame = dataFrame
partitions.foreach(values => {
if (values.size != 2) {
error("The custom partitions need 2 values every item: [NAME, VALUE]")
System.exit(1)
}
if (values(1).startsWith("D{") && values(1).endsWith("}")) {
val formatD = values(1).stripPrefix("D{").stripSuffix("}")
val format = new java.text.SimpleDateFormat(formatD)
dataDF = dataDF.withColumn(values(0), functions.lit(format.format(new java.util.Date())))
} else {
dataDF = dataDF.withColumn(values(0), functions.lit(values(1)))
}
})
if (sparkVer >= 2.0) {
dataDF.write.format("orc").mode(saveModeDF).insertInto(targetTable)
} else if (sparkVer >= 1.4) {
val partitionsDF = partitions.map(_(0))
writePartitionsDF(dataDF, partitionsDF, targetTable, saveModeDF)
}
} else {
if (sparkVer >= 2.0) {
dataFrame.write.format("orc").mode(saveModeDF).insertInto(targetTable)
Expand Down

0 comments on commit 1abac7a

Please sign in to comment.