Merge branch 'release/2.3.0'
seddonm1 committed Sep 11, 2020
2 parents 2f87893 + be8b5e3 commit b7c0a79
Showing 4 changed files with 131 additions and 19 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,11 @@
# Change Log

## 2.3.0

- add `canReturnLastCommit` support for `relativeVersion`, which returns the oldest available version when `relativeVersion` refers to a version older than the oldest retained commit.
- add `shufflePartitions` to `DeltaLakeMergeLoad`, which is used to distribute (`repartition`) data to workers and may need to be increased if many files are present.
- add `schemaURI` and `schemaView` to `DeltaLakeExtract` to allow a schema to be provided when the `inputURI` does not exist, so an empty dataset can be returned (see the sketch below).
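
A minimal sketch of the new `DeltaLakeExtract` options in use, mirroring the test suite later in this commit. It assumes the implicit `SparkSession`, `Logger` and `ARCContext` that the tests set up; the table path, stage name and output view are hypothetical, and the `schemaURI`/`schemaView` job-config keys resolve to the `schema` parameter shown here.

```scala
// Sketch only: assumes implicit SparkSession, Logger and ARCContext are in scope
// (as in DeltaLakeExtractSuite below). The path /tmp/delta/events is hypothetical.
import ai.tripl.arc.extract
import ai.tripl.arc.util.ArcSchema

// a schema to fall back on if the Delta table does not exist yet
val parsedSchema = ArcSchema.parseArcSchema("""
|[{
|  "name": "version",
|  "type": "integer",
|  "trim": true,
|  "nullable": true,
|  "nullableValues": ["", "null"]
|}]""".stripMargin)

val events = extract.DeltaLakeExtractStage.execute(
  extract.DeltaLakeExtractStage(
    plugin=new extract.DeltaLakeExtract,
    id=None,
    name="extract events",
    description=None,
    input="/tmp/delta/events",   // job config key: inputURI
    outputView="events",
    authentication=None,
    params=Map.empty,
    persist=false,
    numPartitions=None,
    partitionBy=Nil,
    // job config keys: options.relativeVersion, options.canReturnLastCommit
    timeTravel=Some(extract.TimeTravel(Some(-1), None, Some(true), None)),
    // job config keys: schemaURI or schemaView
    schema=Right(parsedSchema.right.get)
  )
).get
```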

## 2.2.0

- add `createTableIfNotExists` option to `DeltaLakeMergeLoad` to allow creation of the initial table if missing. Default `false`.
57 changes: 45 additions & 12 deletions src/main/scala/ai/tripl/arc/extract/DeltaLakeExtract.scala
@@ -57,7 +57,7 @@ class DeltaLakeExtract extends PipelineStagePlugin with JupyterCompleter {
import ai.tripl.arc.config.ConfigUtils._
implicit val c = config

val expectedKeys = "type" :: "id" :: "name" :: "description" :: "environments" :: "inputURI" :: "outputView" :: "numPartitions" :: "partitionBy" :: "persist" :: "options" :: "authentication" :: "params" :: Nil
val expectedKeys = "type" :: "id" :: "name" :: "description" :: "environments" :: "inputURI" :: "outputView" :: "numPartitions" :: "partitionBy" :: "persist" :: "options" :: "authentication" :: "params" :: "schemaURI" :: "schemaView" :: Nil
val id = getOptionalValue[String]("id")
val name = getValue[String]("name")
val description = getOptionalValue[String]("description")
@@ -69,11 +69,14 @@ class DeltaLakeExtract extends PipelineStagePlugin with JupyterCompleter {
val timestampAsOf = getOptionalValue[String]("timestampAsOf")
val timeTravel = readTimeTravel("options", c)
val authentication = readAuthentication("authentication")
val extractColumns = if(c.hasPath("schemaURI")) getValue[String]("schemaURI") |> parseURI("schemaURI") _ |> textContentForURI("schemaURI", authentication) |> getExtractColumns("schemaURI") _ else Right(List.empty)
val schemaView = if(c.hasPath("schemaView")) getValue[String]("schemaView") else Right("")
val params = readMap("params", c)
val invalidKeys = checkValidKeys(c)(expectedKeys)

(id, name, description, parsedGlob, outputView, authentication, persist, numPartitions, partitionBy, invalidKeys, timeTravel) match {
case (Right(id), Right(name), Right(description), Right(parsedGlob), Right(outputView), Right(authentication), Right(persist), Right(numPartitions), Right(partitionBy), Right(invalidKeys), Right(timeTravel)) =>
(id, name, description, parsedGlob, outputView, authentication, persist, numPartitions, partitionBy, invalidKeys, timeTravel, extractColumns, schemaView) match {
case (Right(id), Right(name), Right(description), Right(parsedGlob), Right(outputView), Right(authentication), Right(persist), Right(numPartitions), Right(partitionBy), Right(invalidKeys), Right(timeTravel), Right(extractColumns), Right(schemaView)) =>
val schema = if(c.hasPath("schemaView")) Left(schemaView) else Right(extractColumns)

val stage = DeltaLakeExtractStage(
plugin=this,
@@ -87,7 +90,8 @@ class DeltaLakeExtract extends PipelineStagePlugin with JupyterCompleter {
persist=persist,
numPartitions=numPartitions,
partitionBy=partitionBy,
timeTravel=timeTravel
timeTravel=timeTravel,
schema=schema
)

stage.stageDetail.put("input", parsedGlob)
@@ -105,7 +109,7 @@ class DeltaLakeExtract extends PipelineStagePlugin with JupyterCompleter {

Right(stage)
case _ =>
val allErrors: Errors = List(id, name, description, parsedGlob, outputView, authentication, persist, numPartitions, partitionBy, invalidKeys, timeTravel).collect{ case Left(errs) => errs }.flatten
val allErrors: Errors = List(id, name, description, parsedGlob, outputView, authentication, persist, numPartitions, partitionBy, invalidKeys, timeTravel, extractColumns, schemaView).collect{ case Left(errs) => errs }.flatten
val stageName = stringOrDefault(name, "unnamed stage")
val err = StageError(index, stageName, c.origin.lineNumber, allErrors)
Left(err :: Nil)
@@ -121,12 +125,20 @@ class DeltaLakeExtract extends PipelineStagePlugin with JupyterCompleter {
if (c.hasPath(path)) {
try {
val config = c.getConfig(path)
val expectedKeys = "relativeVersion" :: "timestampAsOf" :: "canReturnLastCommit" :: "versionAsOf" :: Nil
val expectedKeys = "relativeVersion" :: "timestampAsOf" :: "versionAsOf" :: "canReturnLastCommit" :: Nil
val invalidKeys = checkValidKeys(config)(expectedKeys)
(invalidKeys) match {
case Right(_) => {
(config.hasPath("relativeVersion"), config.hasPath("timestampAsOf"), config.hasPath("versionAsOf"), config.hasPath("canReturnLastCommit")) match {
case (true, false, false, _) => {
case (true, false, false, true) => {
val relativeVersion = config.getInt("relativeVersion")
if (relativeVersion > 0) {
throw new Exception(s"relativeVersion must be less than or equal to zero.")
} else {
Right(Some(TimeTravel(Option(relativeVersion), None, Option(config.getBoolean("canReturnLastCommit")), None)))
}
}
case (true, false, false, false) => {
val relativeVersion = config.getInt("relativeVersion")
if (relativeVersion > 0) {
throw new Exception(s"relativeVersion must be less than or equal to zero.")
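
For reference, a hedged sketch of the `options` block that this parser reads, built with the same Typesafe Config calls (`getConfig`, `getInt`, `getBoolean`); the option values and val names are illustrative only.

```scala
import com.typesafe.config.ConfigFactory

// an illustrative `options` block; relativeVersion must be zero or negative
val c = ConfigFactory.parseString(
  """|options {
     |  relativeVersion = -1
     |  canReturnLastCommit = true
     |}""".stripMargin)

val options = c.getConfig("options")
val relativeVersion = options.getInt("relativeVersion")             // -1
val canReturnLastCommit = options.getBoolean("canReturnLastCommit") // true
assert(relativeVersion <= 0, "relativeVersion must be less than or equal to zero")
```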
@@ -171,7 +183,8 @@ case class DeltaLakeExtractStage(
persist: Boolean,
numPartitions: Option[Int],
partitionBy: List[String],
timeTravel: Option[TimeTravel]
timeTravel: Option[TimeTravel],
schema: Either[String, List[ExtractColumn]]
) extends ExtractPipelineStage {

override def execute()(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext): Option[DataFrame] = {
@@ -183,6 +196,15 @@ object DeltaLakeExtractStage {

def execute(stage: DeltaLakeExtractStage)(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext): Option[DataFrame] = {

// try to get the schema
val optionSchema = try {
ExtractUtils.getSchema(stage.schema)(spark, logger)
} catch {
case e: Exception => throw new Exception(e) with DetailException {
override val detail = stage.stageDetail
}
}

CloudUtils.setHadoopConfiguration(stage.authentication)

val df = try {
@@ -197,20 +219,25 @@ object DeltaLakeExtractStage {
var calculatedVersionAsOf: Option[Long] = None

// determine the read options
for (timeTravel <- stage.timeTravel) {
stage.timeTravel.foreach { timeTravel =>

timeTravel.timestampAsOf.foreach { optionsMap.put("timestampAsOf", _) }
timeTravel.canReturnLastCommit.foreach { canReturnLastCommit => optionsMap.put("canReturnLastCommit", canReturnLastCommit.toString) }
timeTravel.versionAsOf.foreach { versionAsOf => optionsMap.put("versionAsOf", versionAsOf.toString) }

// determine whether to time travel to a specific version or a calculated version
for (relativeVersion <- timeTravel.relativeVersion) {
timeTravel.relativeVersion.foreach { relativeVersion =>
val versions = commitInfos.map { version => version.getVersion }
val minVersion = versions.reduceLeft(_ min _)
val maxVersion = versions.reduceLeft(_ max _)
val maxOffset = maxVersion - minVersion
if (relativeVersion < (maxOffset * -1)) {
throw new Exception(s"Cannot time travel Delta table to version ${relativeVersion}. Available versions: [-${maxOffset} ... 0].")
val canReturnLastCommit = timeTravel.canReturnLastCommit.getOrElse(false)
if (relativeVersion < (maxOffset * -1) && !canReturnLastCommit) {
throw new Exception(s"Cannot time travel Delta table to version ${relativeVersion}. Available versions: [-${maxOffset} ... 0].")
} else if (relativeVersion < (maxOffset * -1) && canReturnLastCommit) {
val calculatedVersion = minVersion
calculatedVersionAsOf = Option(calculatedVersion)
optionsMap.put("versionAsOf", calculatedVersion.toString)
} else {
val calculatedVersion = maxVersion + relativeVersion
calculatedVersionAsOf = Option(calculatedVersion)
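
The version arithmetic above is easier to follow with concrete numbers. A self-contained sketch of the same resolution logic follows; the `resolveVersion` helper and the versions 0, 1, 2 are hypothetical, introduced only for illustration.

```scala
// Sketch of the resolution logic above for a table whose commit history holds
// versions 0, 1 and 2 (minVersion = 0, maxVersion = 2, maxOffset = 2).
def resolveVersion(versions: Seq[Long], relativeVersion: Int, canReturnLastCommit: Boolean): Long = {
  val minVersion = versions.min
  val maxVersion = versions.max
  val maxOffset = maxVersion - minVersion
  if (relativeVersion < (maxOffset * -1) && !canReturnLastCommit) {
    throw new Exception(s"Cannot time travel Delta table to version ${relativeVersion}. Available versions: [-${maxOffset} ... 0].")
  } else if (relativeVersion < (maxOffset * -1) && canReturnLastCommit) {
    minVersion                     // clamp to the oldest retained commit
  } else {
    maxVersion + relativeVersion   // e.g. relativeVersion = -2 resolves to version 0
  }
}

assert(resolveVersion(Seq(0L, 1L, 2L), -2, canReturnLastCommit = false) == 0L)
assert(resolveVersion(Seq(0L, 1L, 2L), -3, canReturnLastCommit = true) == 0L)
```

With `canReturnLastCommit` unset or false, the same out-of-range request fails with the error message exercised by the test suite below.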
@@ -258,9 +285,15 @@ object DeltaLakeExtractStage {
commitInfo.operationMetrics.foreach { operationMetrics => commitMap.put("operationMetrics", operationMetrics.map { case (k, v) => (k, Try(v.toInt).getOrElse(v)) }.asJava) }
stage.stageDetail.put("commit", commitMap)


df
}
} catch {
case e: Exception if (e.getMessage.contains("No such file or directory") && e.getMessage.contains("_delta_log")) =>
optionSchema match {
case Some(schema) => spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
case None => throw new Exception(EmptySchemaExtractError(Some(stage.input)).getMessage)
}
case e: Exception => throw new Exception(e) with DetailException {
override val detail = stage.stageDetail
}
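
A minimal sketch of the fallback behaviour above, assuming an active `SparkSession` named `spark`; the single `version` column mirrors the schema used in the test suite.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// When the target path holds no _delta_log but a schema was provided,
// an empty DataFrame with that schema is returned instead of an error.
val providedSchema = StructType(StructField("version", IntegerType, nullable = true) :: Nil)
val emptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], providedSchema)
assert(emptyDF.count == 0)
```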
85 changes: 79 additions & 6 deletions src/test/scala/ai/tripl/arc/extract/DeltaLakeExtractSuite.scala
@@ -137,11 +137,64 @@ class DeltaLakeExtractSuite extends FunSuite with BeforeAndAfter {
val testUUID = UUID.randomUUID.toString
val output = s"${FileUtils.getTempDirectoryPath}/${testUUID}"

val thrown0 = intercept[Exception with DetailException] {
val dataset = extract.DeltaLakeExtractStage.execute(
extract.DeltaLakeExtractStage(
plugin=new extract.DeltaLakeExtract,
id=None,
name=outputView,
description=None,
input=output,
outputView=outputView,
authentication=None,
params=Map.empty,
persist=false,
numPartitions=None,
partitionBy=Nil,
timeTravel=Some(extract.TimeTravel(Some(-3), None, None, None)),
schema=Right(Nil),
)
).get
}
assert(thrown0.getMessage.contains("does not contain any fields and no schema has been provided to create an empty dataframe"))

// test a schema
val schema = ai.tripl.arc.util.ArcSchema.parseArcSchema("""
|[{
| "name": "version",
| "type": "integer",
| "trim": true,
| "nullable": true,
| "nullableValues": [
| "",
| "null"
| ]
|}]""".stripMargin)

val dataset0 = extract.DeltaLakeExtractStage.execute(
extract.DeltaLakeExtractStage(
plugin=new extract.DeltaLakeExtract,
id=None,
name=outputView,
description=None,
input=output,
outputView=outputView,
authentication=None,
params=Map.empty,
persist=false,
numPartitions=None,
partitionBy=Nil,
timeTravel=Some(extract.TimeTravel(Some(-3), None, None, None)),
schema=Right(schema.right.get),
)
).get
assert(dataset0.count == 0)

Seq((2)).toDF("version").write.format("delta").mode("overwrite").save(output)
Seq((1)).toDF("version").write.format("delta").mode("overwrite").save(output)
Seq((0)).toDF("version").write.format("delta").mode("overwrite").save(output)

val thrown0 = intercept[Exception with DetailException] {
val thrown1 = intercept[Exception with DetailException] {
val dataset = extract.DeltaLakeExtractStage.execute(
extract.DeltaLakeExtractStage(
plugin=new extract.DeltaLakeExtract,
@@ -155,13 +208,14 @@ class DeltaLakeExtractSuite extends FunSuite with BeforeAndAfter {
persist=false,
numPartitions=None,
partitionBy=Nil,
timeTravel=Some(extract.TimeTravel(Some(-3), None, None, None))
timeTravel=Some(extract.TimeTravel(Some(-3), None, None, None)),
schema=Right(Nil),
)
).get
}
assert(thrown0.getMessage.contains("Cannot time travel Delta table to version -3. Available versions: [-2 ... 0]."))
assert(thrown1.getMessage.contains("Cannot time travel Delta table to version -3. Available versions: [-2 ... 0]."))

val dataset = extract.DeltaLakeExtractStage.execute(
val dataset1 = extract.DeltaLakeExtractStage.execute(
extract.DeltaLakeExtractStage(
plugin=new extract.DeltaLakeExtract,
id=None,
@@ -174,11 +228,30 @@ class DeltaLakeExtractSuite extends FunSuite with BeforeAndAfter {
persist=false,
numPartitions=None,
partitionBy=Nil,
timeTravel=Some(extract.TimeTravel(Some(-2), None, None, None))
timeTravel=Some(extract.TimeTravel(Some(-3), None, Some(true), None)),
schema=Right(Nil),
)
).get
assert(dataset1.first.getInt(0) == 2)

assert(dataset.first.getInt(0) == 2)
val dataset2 = extract.DeltaLakeExtractStage.execute(
extract.DeltaLakeExtractStage(
plugin=new extract.DeltaLakeExtract,
id=None,
name=outputView,
description=None,
input=output,
outputView=outputView,
authentication=None,
params=Map.empty,
persist=false,
numPartitions=None,
partitionBy=Nil,
timeTravel=Some(extract.TimeTravel(Some(-2), None, None, None)),
schema=Right(Nil),
)
).get
assert(dataset2.first.getInt(0) == 2)
}

test("DeltaLakeExtract: bad option key") {
2 changes: 1 addition & 1 deletion version.sbt
@@ -1 +1 @@
version := "2.2.0"
version := "2.3.0"
