Commit
Merge branch 'feature/2.3.0' into develop
seddonm1 committed Sep 10, 2020
2 parents 4af8f53 + 14a18fa commit 3327e67
Showing 4 changed files with 150 additions and 49 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,10 @@
# Change Log

+## 2.3.0
+
+- add `canReturnLastCommit` support for `relativeVersion`, which returns the oldest available version when `relativeVersion` requests a version older than the oldest commit (see the usage sketch after this excerpt).
+- add `shufflePartitions` to `DeltaLakeMergeLoad`, which is used to distribute (`repartition`) data to workers and may need to be increased when many files are present.
+
## 2.2.0

- add `createTableIfNotExists` option to `DeltaLakeMergeLoad` to allow creation of the initial table if it is missing. Default `false`.
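
The new `relativeVersion` + `canReturnLastCommit` behaviour can be exercised directly against the stage API, as the updated test suite below does. A minimal usage sketch, assuming the implicit `SparkSession`, logger, and `ARCContext` that the test harness provides are in scope; `output` and `outputView` are stand-ins for a real Delta table path and view name, and the stage name is hypothetical:

```scala
// Read the Delta table three versions back. Because canReturnLastCommit is
// true, a request older than the oldest commit falls back to the oldest
// available version instead of failing.
val df = extract.DeltaLakeExtractStage.execute(
  extract.DeltaLakeExtractStage(
    plugin=new extract.DeltaLakeExtract,
    id=None,
    name="timeTravelExample",  // hypothetical stage name
    description=None,
    input=output,              // path to an existing Delta table
    outputView=outputView,
    authentication=None,
    params=Map.empty,
    persist=false,
    numPartitions=None,
    partitionBy=Nil,
    timeTravel=Some(extract.TimeTravel(Some(-3), None, Some(true), None)),
    schema=Right(Nil),
  )
).get
```
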
132 changes: 88 additions & 44 deletions src/main/scala/ai/tripl/arc/extract/DeltaLakeExtract.scala
@@ -57,7 +57,7 @@ class DeltaLakeExtract extends PipelineStagePlugin with JupyterCompleter {
import ai.tripl.arc.config.ConfigUtils._
implicit val c = config

val expectedKeys = "type" :: "id" :: "name" :: "description" :: "environments" :: "inputURI" :: "outputView" :: "numPartitions" :: "partitionBy" :: "persist" :: "options" :: "authentication" :: "params" :: Nil
val expectedKeys = "type" :: "id" :: "name" :: "description" :: "environments" :: "inputURI" :: "outputView" :: "numPartitions" :: "partitionBy" :: "persist" :: "options" :: "authentication" :: "params" :: "schemaURI" :: "schemaView" :: Nil
val id = getOptionalValue[String]("id")
val name = getValue[String]("name")
val description = getOptionalValue[String]("description")
@@ -69,11 +69,14 @@ class DeltaLakeExtract extends PipelineStagePlugin with JupyterCompleter {
val timestampAsOf = getOptionalValue[String]("timestampAsOf")
val timeTravel = readTimeTravel("options", c)
val authentication = readAuthentication("authentication")
+    val extractColumns = if(c.hasPath("schemaURI")) getValue[String]("schemaURI") |> parseURI("schemaURI") _ |> textContentForURI("schemaURI", authentication) |> getExtractColumns("schemaURI") _ else Right(List.empty)
+    val schemaView = if(c.hasPath("schemaView")) getValue[String]("schemaView") else Right("")
val params = readMap("params", c)
val invalidKeys = checkValidKeys(c)(expectedKeys)

-    (id, name, description, parsedGlob, outputView, authentication, persist, numPartitions, partitionBy, invalidKeys, timeTravel) match {
-      case (Right(id), Right(name), Right(description), Right(parsedGlob), Right(outputView), Right(authentication), Right(persist), Right(numPartitions), Right(partitionBy), Right(invalidKeys), Right(timeTravel)) =>
+    (id, name, description, parsedGlob, outputView, authentication, persist, numPartitions, partitionBy, invalidKeys, timeTravel, extractColumns, schemaView) match {
+      case (Right(id), Right(name), Right(description), Right(parsedGlob), Right(outputView), Right(authentication), Right(persist), Right(numPartitions), Right(partitionBy), Right(invalidKeys), Right(timeTravel), Right(extractColumns), Right(schemaView)) =>
+        val schema = if(c.hasPath("schemaView")) Left(schemaView) else Right(extractColumns)

val stage = DeltaLakeExtractStage(
plugin=this,
@@ -87,7 +90,8 @@ class DeltaLakeExtract extends PipelineStagePlugin with JupyterCompleter {
persist=persist,
numPartitions=numPartitions,
partitionBy=partitionBy,
-          timeTravel=timeTravel
+          timeTravel=timeTravel,
+          schema=schema
)

stage.stageDetail.put("input", parsedGlob)
@@ -105,7 +109,7 @@ class DeltaLakeExtract extends PipelineStagePlugin with JupyterCompleter {

Right(stage)
case _ =>
-        val allErrors: Errors = List(id, name, description, parsedGlob, outputView, authentication, persist, numPartitions, partitionBy, invalidKeys, timeTravel).collect{ case Left(errs) => errs }.flatten
+        val allErrors: Errors = List(id, name, description, parsedGlob, outputView, authentication, persist, numPartitions, partitionBy, invalidKeys, timeTravel, extractColumns, schemaView).collect{ case Left(errs) => errs }.flatten
val stageName = stringOrDefault(name, "unnamed stage")
val err = StageError(index, stageName, c.origin.lineNumber, allErrors)
Left(err :: Nil)
@@ -121,12 +125,20 @@ class DeltaLakeExtract extends PipelineStagePlugin with JupyterCompleter {
if (c.hasPath(path)) {
try {
val config = c.getConfig(path)
val expectedKeys = "relativeVersion" :: "timestampAsOf" :: "canReturnLastCommit" :: "versionAsOf" :: Nil
val expectedKeys = "relativeVersion" :: "timestampAsOf" :: "versionAsOf" :: "canReturnLastCommit" :: Nil
val invalidKeys = checkValidKeys(config)(expectedKeys)
(invalidKeys) match {
case Right(_) => {
(config.hasPath("relativeVersion"), config.hasPath("timestampAsOf"), config.hasPath("versionAsOf"), config.hasPath("canReturnLastCommit")) match {
-              case (true, false, false, _) => {
+              case (true, false, false, true) => {
+                val relativeVersion = config.getInt("relativeVersion")
+                if (relativeVersion > 0) {
+                  throw new Exception(s"relativeVersion must be less than or equal to zero.")
+                } else {
+                  Right(Some(TimeTravel(Option(relativeVersion), None, Option(config.getBoolean("canReturnLastCommit")), None)))
+                }
+              }
+              case (true, false, false, false) => {
val relativeVersion = config.getInt("relativeVersion")
if (relativeVersion > 0) {
throw new Exception(s"relativeVersion must be less than or equal to zero.")
@@ -171,7 +183,8 @@ case class DeltaLakeExtractStage(
persist: Boolean,
numPartitions: Option[Int],
partitionBy: List[String],
-    timeTravel: Option[TimeTravel]
+    timeTravel: Option[TimeTravel],
+    schema: Either[String, List[ExtractColumn]]
) extends ExtractPipelineStage {

override def execute()(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext): Option[DataFrame] = {
@@ -183,6 +196,15 @@ object DeltaLakeExtractStage {

def execute(stage: DeltaLakeExtractStage)(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext): Option[DataFrame] = {

+    // try to get the schema
+    val optionSchema = try {
+      ExtractUtils.getSchema(stage.schema)(spark, logger)
+    } catch {
+      case e: Exception => throw new Exception(e) with DetailException {
+        override val detail = stage.stageDetail
+      }
+    }
+
CloudUtils.setHadoopConfiguration(stage.authentication)

val df = try {
@@ -197,20 +219,31 @@ object DeltaLakeExtractStage {
var calculatedVersionAsOf: Option[Long] = None

// determine the read options
-        for (timeTravel <- stage.timeTravel) {
+        stage.timeTravel.foreach { timeTravel =>

timeTravel.timestampAsOf.foreach { optionsMap.put("timestampAsOf", _) }
timeTravel.canReturnLastCommit.foreach { canReturnLastCommit => optionsMap.put("canReturnLastCommit", canReturnLastCommit.toString) }
timeTravel.versionAsOf.foreach { versionAsOf => optionsMap.put("versionAsOf", versionAsOf.toString) }

// determine whether to time travel to a specific version or a calculated version
-          for (relativeVersion <- timeTravel.relativeVersion) {
+          timeTravel.relativeVersion.foreach { relativeVersion =>
val versions = commitInfos.map { version => version.getVersion }
val minVersion = versions.reduceLeft(_ min _)
val maxVersion = versions.reduceLeft(_ max _)
val maxOffset = maxVersion - minVersion
-            if (relativeVersion < (maxOffset * -1)) {
-              throw new Exception(s"Cannot time travel Delta table to version ${relativeVersion}. Available versions: [-${maxOffset} ... 0].")
+            val canReturnLastCommit = timeTravel.canReturnLastCommit.getOrElse(false)
+            if (relativeVersion < (maxOffset * -1) && !canReturnLastCommit) {
+              if (optionSchema.isEmpty) {
+                throw new Exception(s"Cannot time travel Delta table to version ${relativeVersion}. Available versions: [-${maxOffset} ... 0].")
+              } else {
+                val calculatedVersion = maxVersion + relativeVersion
+                calculatedVersionAsOf = Option(calculatedVersion)
+                optionsMap.put("versionAsOf", calculatedVersion.toString)
+              }
+            } else if (relativeVersion < (maxOffset * -1) && canReturnLastCommit) {
+              val calculatedVersion = minVersion
+              calculatedVersionAsOf = Option(calculatedVersion)
+              optionsMap.put("versionAsOf", calculatedVersion.toString)
} else {
val calculatedVersion = maxVersion + relativeVersion
calculatedVersionAsOf = Option(calculatedVersion)
@@ -220,43 +253,54 @@ object DeltaLakeExtractStage {
}

// read the data
-        val df = spark.read.format("delta").options(optionsMap).load(stage.input)
-
-        // version logging
-        // this is useful for timestampAsOf as DeltaLake will extract the last timestamp EARLIER than the given timestamp
-        // so the value passed in by the user is not necessarily aligned with an actual version
-        val commitInfo = stage.timeTravel match {
-          case Some(timeTravel) => {
-            val tt = (calculatedVersionAsOf, timeTravel.versionAsOf, timeTravel.timestampAsOf, timeTravel.canReturnLastCommit) match {
-              case (Some(calculatedVersionAsOf), None, _, _) => {
-                DeltaTimeTravelSpec(None, None, Some(calculatedVersionAsOf), None)
-              }
-              case (None, Some(versionAsOf), _, _) => {
-                DeltaTimeTravelSpec(None, None, Some(versionAsOf), None)
-              }
-              case (None, None, Some(timestampAsOf), None) => {
-                DeltaTimeTravelSpec(Some(Literal(timestampAsOf)), None, None, None)
-              }
-              case (None, None, Some(timestampAsOf), Some(canReturnLastCommit)) => {
-                DeltaTimeTravelSpec(Some(Literal(timestampAsOf)), Some(canReturnLastCommit), None, None)
-              }
-              case _ => {
-                throw new Exception("invalid state, please raise an issue.")
-              }
-            }
-            val (version, _) = DeltaTableUtils.resolveTimeTravelVersion(spark.sessionState.conf, deltaLog, tt)
-            commitInfos.filter { commit =>
-              commit.getVersion == version
-            }(0)
-          }
-          case None => commitInfos.sortBy(_.version).reverse(0)
-        }
-
-        val commitMap = new java.util.HashMap[String, Object]()
-        commitMap.put("version", java.lang.Long.valueOf(commitInfo.getVersion))
-        commitMap.put("timestamp", Instant.ofEpochMilli(commitInfo.getTimestamp).atZone(ZoneId.systemDefault).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME))
-        commitInfo.operationMetrics.foreach { operationMetrics => commitMap.put("operationMetrics", operationMetrics.map { case (k, v) => (k, Try(v.toInt).getOrElse(v)) }.asJava) }
-        stage.stageDetail.put("commit", commitMap)
+        val (df, empty) = try {
+          (spark.read.format("delta").options(optionsMap).load(stage.input), false)
+        } catch {
+          case e: Exception if (e.getMessage.contains("Cannot time travel Delta table to version")) =>
+            optionSchema match {
+              case Some(schema) => (spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema), true)
+              case None => throw e
+            }
+        }
+
+        // if the version was available then log commitInfo
+        if (!empty) {
+          // version logging
+          // this is useful for timestampAsOf as DeltaLake will extract the last timestamp EARLIER than the given timestamp
+          // so the value passed in by the user is not necessarily aligned with an actual version
+          val commitInfo = stage.timeTravel match {
+            case Some(timeTravel) => {
+              val tt = (calculatedVersionAsOf, timeTravel.versionAsOf, timeTravel.timestampAsOf, timeTravel.canReturnLastCommit) match {
+                case (Some(calculatedVersionAsOf), None, _, _) => {
+                  DeltaTimeTravelSpec(None, None, Some(calculatedVersionAsOf), None)
+                }
+                case (None, Some(versionAsOf), _, _) => {
+                  DeltaTimeTravelSpec(None, None, Some(versionAsOf), None)
+                }
+                case (None, None, Some(timestampAsOf), None) => {
+                  DeltaTimeTravelSpec(Some(Literal(timestampAsOf)), None, None, None)
+                }
+                case (None, None, Some(timestampAsOf), Some(canReturnLastCommit)) => {
+                  DeltaTimeTravelSpec(Some(Literal(timestampAsOf)), Some(canReturnLastCommit), None, None)
+                }
+                case _ => {
+                  throw new Exception("invalid state, please raise an issue.")
+                }
+              }
+              val (version, _) = DeltaTableUtils.resolveTimeTravelVersion(spark.sessionState.conf, deltaLog, tt)
+              commitInfos.filter { commit =>
+                commit.getVersion == version
+              }(0)
+            }
+            case None => commitInfos.sortBy(_.version).reverse(0)
+          }
+
+          val commitMap = new java.util.HashMap[String, Object]()
+          commitMap.put("version", java.lang.Long.valueOf(commitInfo.getVersion))
+          commitMap.put("timestamp", Instant.ofEpochMilli(commitInfo.getTimestamp).atZone(ZoneId.systemDefault).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME))
+          commitInfo.operationMetrics.foreach { operationMetrics => commitMap.put("operationMetrics", operationMetrics.map { case (k, v) => (k, Try(v.toInt).getOrElse(v)) }.asJava) }
+          stage.stageDetail.put("commit", commitMap)
+        }

df
}
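
The heart of the change above is how a negative `relativeVersion` is resolved against the available commit history. A standalone sketch of that branch logic (a hypothetical helper, not part of the plugin; names mirror the variables in `DeltaLakeExtractStage.execute`):

```scala
// Resolve a relative version (0 = latest, -1 = previous, ...) against the
// available Delta commit versions:
// - in range: maxVersion + relativeVersion
// - out of range with canReturnLastCommit: clamp to the oldest version
// - out of range otherwise: None (the caller throws, or returns an empty
//   DataFrame when a schema was supplied)
def resolveRelativeVersion(versions: Seq[Long], relativeVersion: Long, canReturnLastCommit: Boolean): Option[Long] = {
  val minVersion = versions.min
  val maxVersion = versions.max
  val maxOffset = maxVersion - minVersion
  if (relativeVersion < maxOffset * -1) {
    if (canReturnLastCommit) Some(minVersion) else None
  } else {
    Some(maxVersion + relativeVersion)
  }
}

// With versions 0, 1, 2 (available offsets [-2 ... 0]), as in the tests below:
// resolveRelativeVersion(Seq(0L, 1L, 2L), -3, canReturnLastCommit = true)  == Some(0)
// resolveRelativeVersion(Seq(0L, 1L, 2L), -3, canReturnLastCommit = false) == None
// resolveRelativeVersion(Seq(0L, 1L, 2L), -2, canReturnLastCommit = false) == Some(0)
```
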
60 changes: 56 additions & 4 deletions src/test/scala/ai/tripl/arc/extract/DeltaLakeExtractSuite.scala
@@ -155,13 +155,27 @@ class DeltaLakeExtractSuite extends FunSuite with BeforeAndAfter {
persist=false,
numPartitions=None,
partitionBy=Nil,
-        timeTravel=Some(extract.TimeTravel(Some(-3), None, None, None))
+        timeTravel=Some(extract.TimeTravel(Some(-3), None, None, None)),
+        schema=Right(Nil),
)
).get
}
assert(thrown0.getMessage.contains("Cannot time travel Delta table to version -3. Available versions: [-2 ... 0]."))

-    val dataset = extract.DeltaLakeExtractStage.execute(
+    // test a schema
+    val schema = ai.tripl.arc.util.ArcSchema.parseArcSchema("""
+    |[{
+    |  "name": "version",
+    |  "type": "integer",
+    |  "trim": true,
+    |  "nullable": true,
+    |  "nullableValues": [
+    |    "",
+    |    "null"
+    |  ]
+    |}]""".stripMargin)
+
+    val dataset0 = extract.DeltaLakeExtractStage.execute(
extract.DeltaLakeExtractStage(
plugin=new extract.DeltaLakeExtract,
id=None,
@@ -174,11 +188,49 @@ class DeltaLakeExtractSuite extends FunSuite with BeforeAndAfter {
persist=false,
numPartitions=None,
partitionBy=Nil,
-        timeTravel=Some(extract.TimeTravel(Some(-2), None, None, None))
+        timeTravel=Some(extract.TimeTravel(Some(-100), None, None, None)),
+        schema=Right(schema.right.get),
)
).get
+    assert(dataset0.count == 0)

-    assert(dataset.first.getInt(0) == 2)
+    val dataset1 = extract.DeltaLakeExtractStage.execute(
+      extract.DeltaLakeExtractStage(
+        plugin=new extract.DeltaLakeExtract,
+        id=None,
+        name=outputView,
+        description=None,
+        input=output,
+        outputView=outputView,
+        authentication=None,
+        params=Map.empty,
+        persist=false,
+        numPartitions=None,
+        partitionBy=Nil,
+        timeTravel=Some(extract.TimeTravel(Some(-3), None, Some(true), None)),
+        schema=Right(Nil),
+      )
+    ).get
+    assert(dataset1.first.getInt(0) == 2)
+
+    val dataset2 = extract.DeltaLakeExtractStage.execute(
+      extract.DeltaLakeExtractStage(
+        plugin=new extract.DeltaLakeExtract,
+        id=None,
+        name=outputView,
+        description=None,
+        input=output,
+        outputView=outputView,
+        authentication=None,
+        params=Map.empty,
+        persist=false,
+        numPartitions=None,
+        partitionBy=Nil,
+        timeTravel=Some(extract.TimeTravel(Some(-2), None, None, None)),
+        schema=Right(Nil),
+      )
+    ).get
+    assert(dataset2.first.getInt(0) == 2)
}

test("DeltaLakeExtract: bad option key") {
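
The `dataset0` case above (asserting `count == 0`) exercises the new fallback in `DeltaLakeExtractStage`: when the requested version cannot be time-travelled to but a schema was supplied, the stage returns an empty `DataFrame` with that schema instead of raising. A minimal sketch of the same pattern, assuming a resolved Spark `StructType` is already at hand:

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.StructType

// Attempt the (possibly time-travelled) Delta read; on Delta's
// "Cannot time travel" error, fall back to an empty DataFrame built from
// the supplied schema, returning a flag that marks the result as empty.
def readOrEmpty(spark: SparkSession, path: String, options: Map[String, String], optionSchema: Option[StructType]): (DataFrame, Boolean) = {
  try {
    (spark.read.format("delta").options(options).load(path), false)
  } catch {
    case e: Exception if e.getMessage.contains("Cannot time travel Delta table to version") =>
      optionSchema match {
        case Some(schema) => (spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema), true)
        case None => throw e
      }
  }
}
```
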
2 changes: 1 addition & 1 deletion version.sbt
@@ -1 +1 @@
version := "2.2.0"
version := "2.3.0"
