Commit: reformat
seddonm1 committed Sep 17, 2020
1 parent f3d16c5 commit 16bccfe
Showing 1 changed file with 76 additions and 76 deletions: src/main/scala/ai/tripl/arc/load/DeltaLakeMergeLoad.scala
@@ -282,102 +282,102 @@ object DeltaLakeMergeLoadStage {
    }

    try {
      val sourceDF = stage.partitionBy match {
        case Nil => nonNullDF
        case partitionBy => {
          // create a column array for repartitioning
          val partitionCols = partitionBy.map(col => nonNullDF(col))
          nonNullDF.repartition(partitionCols:_*)
        }
      }

      // this is pushed through to the delta layer
      stage.numPartitions.foreach { numPartitions => spark.conf.set("arc.delta.partitions", numPartitions) }

      // build the operation
      try {
        var deltaMergeOperation: DeltaMergeBuilder = DeltaTable.forPath(stage.outputURI.toString).as("target")
          .merge(
            sourceDF.as("source"),
            stage.condition)

        // match
        deltaMergeOperation = if (stage.whenMatchedDeleteFirst) {
          val deltaMergeOperationWithDelete = whenMatchedDeleteCondition(deltaMergeOperation)
          whenMatchedUpdateCondition(deltaMergeOperationWithDelete)
        } else {
          val deltaMergeOperationWithUpdate = whenMatchedUpdateCondition(deltaMergeOperation)
          whenMatchedDeleteCondition(deltaMergeOperationWithUpdate)
        }

        // insert where source rows don't exist in the target dataset
        for (whenNotMatchedByTargetInsert <- stage.whenNotMatchedByTargetInsert) {
          (whenNotMatchedByTargetInsert.condition, whenNotMatchedByTargetInsert.values) match {
            case (Some(condition), Some(values)) => deltaMergeOperation = deltaMergeOperation.whenNotMatchedByTarget(condition).insertExpr(values)
            case (Some(condition), None) => deltaMergeOperation = deltaMergeOperation.whenNotMatchedByTarget(condition).insertAll
            case (None, Some(values)) => deltaMergeOperation = deltaMergeOperation.whenNotMatchedByTarget.insertExpr(values)
            case (None, None) => deltaMergeOperation = deltaMergeOperation.whenNotMatchedByTarget.insertAll
          }
        }

        // delete where target rows don't exist in the source dataset
        for (whenNotMatchedBySourceDelete <- stage.whenNotMatchedBySourceDelete) {
          whenNotMatchedBySourceDelete.condition match {
            case Some(condition) => deltaMergeOperation = deltaMergeOperation.whenNotMatchedBySource(condition).delete
            case None => deltaMergeOperation = deltaMergeOperation.whenNotMatchedBySource.delete
          }
        }

        // execute
        deltaMergeOperation.execute()
      } catch {
        case e: Exception if (e.getMessage.contains("is not a Delta table")) => {
          if (stage.createTableIfNotExists) {
            stage.partitionBy match {
              case Nil => {
                stage.numPartitions match {
                  case Some(n) => nonNullDF.repartition(n).write.format("delta").save(stage.outputURI.toString)
                  case None => nonNullDF.write.format("delta").save(stage.outputURI.toString)
                }
              }
              case partitionBy => {
                // create a column array for repartitioning
                val partitionCols = partitionBy.map(col => nonNullDF(col))
                stage.numPartitions match {
                  case Some(n) => nonNullDF.repartition(n, partitionCols:_*).write.format("delta").partitionBy(partitionBy:_*).save(stage.outputURI.toString)
                  case None => nonNullDF.repartition(partitionCols:_*).write.format("delta").partitionBy(partitionBy:_*).save(stage.outputURI.toString)
                }
              }
            }
          } else {
            throw new Exception(s"""'${stage.outputURI}' is not a Delta table and 'createTableIfNotExists' is false so cannot complete this operation.""")
          }
        }
        case e: Exception => throw e
      }

      // symlink generation to support presto reading the output
      // (writes manifest files under _symlink_format_manifest/ at the table path)
      if (stage.generateSymlinkManifest) {
        val deltaTable = DeltaTable.forPath(stage.outputURI.toString)
        deltaTable.generate("symlink_format_manifest")
      }

      // version logging
      val deltaLog = DeltaLog.forTable(spark, new Path(stage.outputURI.toString))
      val commitInfos = deltaLog.history.getHistory(Some(1))
      val commitInfo = commitInfos(0)
      val commitMap = new java.util.HashMap[String, Object]()
      commitMap.put("version", java.lang.Long.valueOf(commitInfo.getVersion))
      commitMap.put("timestamp", Instant.ofEpochMilli(commitInfo.getTimestamp).atZone(ZoneId.systemDefault).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME))
      commitInfo.operationMetrics.foreach { operationMetrics => commitMap.put("operationMetrics", operationMetrics.map { case (k, v) => (k, Try(v.toInt).getOrElse(v)) }.asJava) }
      stage.stageDetail.put("commit", commitMap)

    } catch {
      case e: Exception => throw new Exception(e) with DetailException {
        override val detail = stage.stageDetail
      }
    } finally {
      spark.conf.unset("arc.delta.partitions")
    }

    Option(df)
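For context, the chained calls above follow Delta Lake's DeltaMergeBuilder pattern; the whenNotMatchedByTarget and whenNotMatchedBySource builders appear to come from Arc's bundled Delta build, since stock Delta of this era exposes only whenMatched and whenNotMatched. A minimal sketch of the same pattern against the stock Delta Lake Scala API, written for spark-shell with delta-core on the classpath; the paths, column names, and join condition are hypothetical:

// Minimal sketch of the DeltaMergeBuilder pattern (stock Delta Lake Scala API).
// Assumes spark-shell, so `spark` is predefined; paths and the join key are hypothetical.
import io.delta.tables.DeltaTable

val updates = spark.read.format("parquet").load("/tmp/updates") // hypothetical source dataset

DeltaTable.forPath(spark, "/tmp/target").as("target")           // hypothetical Delta table path
  .merge(updates.as("source"), "source.id = target.id")         // hypothetical join condition
  .whenMatched().updateAll()                                    // analogous to the whenMatchedUpdateCondition helper
  .whenNotMatched().insertAll()                                 // analogous to whenNotMatchedByTarget.insertAll
  .execute()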
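Likewise, the version-logging block reads the most recent commit through Delta's internal DeltaLog API. A rough public-API equivalent is DeltaTable.history, which returns the same commit metadata (version, timestamp, operationMetrics) as a DataFrame; again a spark-shell sketch with a hypothetical path:

// Sketch: fetch the latest commit's version, timestamp and operation metrics
// via the public DeltaTable.history API instead of the internal DeltaLog.
import io.delta.tables.DeltaTable

val lastCommit = DeltaTable.forPath(spark, "/tmp/target") // hypothetical path
  .history(1)                                             // limit to the most recent commit
  .select("version", "timestamp", "operationMetrics")

lastCommit.show(false)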
