added empty-write flag in FileWriter
qwshen committed Jun 5, 2022
1 parent b0448bc commit 33a62e9
Showing 44 changed files with 179 additions and 92 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -117,7 +117,7 @@ The following explains the definition of each section in a pipeline:
- load-events
- transform user-train
```
In the above setting, the metrics of the two actions (load-events, transform user-train) will be collected and written to ${metrics_uri}. The default metrics include schema in DDL format, row-count, estimate-size and execute-time of the views. Custom metrics can be added by following this [guide](docs/custom-actor.md) _Please note: collecting metrics may impact the overall performance._
In the above setting, the metrics of the two actions (load-events, transform user-train) will be collected and written to ${metrics_uri}. The default metrics include schema in DDL format, row-count, estimate-size and execute-time of the views. Custom metrics can be added by following this [guide](docs/custom-actor.md). _Please note: collecting metrics may impact the overall performance._

- Staging - for situations where the results of one or more actions need to be checked for troubleshooting or data verification. This can be achieved by adding the following section in the definition of a pipeline:
```yaml
@@ -127,7 +127,7 @@ The following explains the definition of each section in a pipeline:
- load-events
- transform-user-train
```
In the above setting, the output of the two actions (load-events, transform-user-train) will be staged at ${staging_uri}. More actions for staging incur more impact on the performance. Therefore they should be used primarily in dev environments.
In the above setting, the output of the two actions (load-events, transform-user-train) will be staged at ${staging_uri}. More actions for staging incur more impact on the performance. Therefore, they should be used primarily in dev environments.
<br />

Pipeline examples:
4 changes: 2 additions & 2 deletions docs/custom-actor.md
@@ -58,7 +58,7 @@ Make sure the **super.init(properties, config)** is called at the beginning of t

- Implement the data read/write/transformation logic:
```scala
def run(ctx: ExecutionContext)(implicit session: SparkSession): Option[DataFrame] = {
def run(ctx: JobContext)(implicit session: SparkSession): Option[DataFrame] = {
//custom implementation here
}
```
@@ -67,7 +67,7 @@ The following code retrieves an existing view by name:
@PropertyKey("view", true)
private var _view: Option[String] = None

def run(ctx: ExecutionContext)(implicit session: SparkSession): Option[DataFrame] = for {
def run(ctx: JobContext)(implicit session: SparkSession): Option[DataFrame] = for {
//...
df <- this._view.flatMap(name => ctx.getView(name))
//...
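
A minimal custom actor against the renamed `JobContext` could look like the sketch below. The actor name, the `condition` property and the filtering logic are made up for illustration; `Actor`, `JobContext`, `@PropertyKey` and `ctx.getView` are the pieces shown in this diff.

```scala
import com.qwshen.common.PropertyKey
import com.qwshen.etl.common.{Actor, JobContext}
import org.apache.spark.sql.{DataFrame, SparkSession}

// Illustrative custom actor: looks up a configured view and filters it.
class MyFilterActor extends Actor {
  @PropertyKey("view", true)
  private var _view: Option[String] = None

  @PropertyKey("condition", true)   // made-up property for this sketch
  private var _condition: Option[String] = None

  def run(ctx: JobContext)(implicit session: SparkSession): Option[DataFrame] = for {
    name <- this._view
    df <- ctx.getView(name)
    condition <- this._condition
  } yield df.filter(condition)
}
```
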
4 changes: 4 additions & 0 deletions docs/file-writer.md
@@ -3,6 +3,7 @@ The FileWriter is for writing a data-frame to files in a local or hdfs file syst
- The supported formats are csv, json, avro & parquet.
- The write mode can only be overwrite or append.
- The partition-by is optional. If provided, it must be the names of one or more columns separated by commas.
- The empty-write is optional; it controls whether an empty view is still written out. The value must be yes/no or enabled/disabled. Default: yes.

Actor Class: `com.qwshen.etl.sink.FileWriter`

@@ -13,6 +14,7 @@ The definition of the FileWriter:
actor:
type: file-writer
properties:
emptyWrite: "no"
format: csv
options:
header: true
@@ -29,6 +31,7 @@ The definition of the FileWriter:
"actor": {
"type": "file-writer",
"properties": {
"emptyWrite": "yes",
"format": "csv",
"options": {
"header": true,
@@ -47,6 +50,7 @@ The definition of the FileWriter:
```xml
<actor type="file-writer">
<properties>
<emptyWrite>disabled</emptyWrite>
<format>csv</format>
<options>
<header>true</header>
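
Reading the three property notations together, the new flag's behaviour comes down to a single check before saving. A sketch of that decision (the helper name is made up; the logic mirrors the `FileWriter` change later in this commit):

```scala
import org.apache.spark.sql.DataFrame

// Illustrative helper, not framework code: with emptyWrite = no/disabled an
// empty view is skipped; any other value (or no value at all) writes as before.
def shouldWrite(emptyWrite: Option[String], df: DataFrame): Boolean =
  if (emptyWrite.exists(v => v.equalsIgnoreCase("no") || v.equalsIgnoreCase("disabled")))
    df.count() > 0
  else
    true
```
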
4 changes: 3 additions & 1 deletion src/main/scala/com/qwshen/etl/Launcher.scala
@@ -1,11 +1,13 @@
package com.qwshen.etl

import com.qwshen.etl.common.PipelineContext
import com.qwshen.etl.configuration.ArgumentParser
import com.qwshen.etl.pipeline.PipelineRunner
import com.qwshen.etl.pipeline.builder.PipelineFactory
import com.typesafe.config.Config
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.fs.FileSystem

import scala.collection.JavaConverters._
import scala.util.Try

@@ -38,7 +40,7 @@ class Launcher {
for {
pipeline <- PipelineFactory.fromFile(arguments.pipelineFile)
} {
new PipelineRunner(new ApplicationContext()).run(pipeline)
new PipelineRunner(new PipelineContext()).run(pipeline)
}
}
finally {
4 changes: 2 additions & 2 deletions src/main/scala/com/qwshen/etl/common/Actor.scala
@@ -30,13 +30,13 @@ abstract class Actor extends PropertyComponent with Serializable {
* @param session - the spark-session
* @return
*/
def run(ctx: ExecutionContext)(implicit session: SparkSession): Option[DataFrame]
def run(ctx: JobContext)(implicit session: SparkSession): Option[DataFrame]

/**
* Collect metrics of current actor
* @param df
* @param session
* @return
*/
def collectMetrics(df: DataFrame)(implicit session: SparkSession): Seq[(String, String)] = Nil
def collectMetrics(df: DataFrame): Seq[(String, String)] = Nil
}
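
With the implicit `SparkSession` removed from `collectMetrics`, a custom override only needs the DataFrame itself. A hedged sketch (the actor and the metric key are made up; `SqlActor` further down shows the real override in this commit):

```scala
import com.qwshen.etl.common.{Actor, JobContext}
import org.apache.spark.sql.{DataFrame, SparkSession}

// Illustrative actor that contributes one custom metric under the new signature.
class ColumnCountActor extends Actor {
  def run(ctx: JobContext)(implicit session: SparkSession): Option[DataFrame] = None

  // Reports the number of columns of the acted-on DataFrame.
  override def collectMetrics(df: DataFrame): Seq[(String, String)] =
    Seq(("column-count", df.schema.fields.length.toString))
}
```
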
2 changes: 1 addition & 1 deletion src/main/scala/com/qwshen/etl/common/DeltaWriteActor.scala
@@ -31,7 +31,7 @@ private[etl] abstract class DeltaWriteActor[T] extends Actor { self: T =>
/**
* Run the file-reader
*/
def run(ctx: ExecutionContext)(implicit session: SparkSession): Option[DataFrame] = for {
def run(ctx: JobContext)(implicit session: SparkSession): Option[DataFrame] = for {
df <- this._view.flatMap(name => ctx.getView(name))
} yield Try(write(df)) match {
case Success(_) => df
2 changes: 1 addition & 1 deletion src/main/scala/com/qwshen/etl/common/HBaseWriteActor.scala
@@ -34,7 +34,7 @@ private[etl] abstract class HBaseWriteActor[T] extends HBaseActor[T] { self: T =
* @param session - the spark-session
* @return
*/
def run(ctx: ExecutionContext)(implicit session: SparkSession): Option[DataFrame] = for {
def run(ctx: JobContext)(implicit session: SparkSession): Option[DataFrame] = for {
table <- this._table
df <- this._view.flatMap(name => ctx.getView(name))
} yield Try {
13 changes: 11 additions & 2 deletions .../qwshen/etl/common/ExecutionContext.scala → ...la/com/qwshen/etl/common/JobContext.scala
100755 → 100644
@@ -1,7 +1,6 @@
package com.qwshen.etl.common

import com.qwshen.common.logging.Loggable
import com.qwshen.etl.ApplicationContext
import com.qwshen.etl.pipeline.definition.View
import com.typesafe.config.Config
import org.apache.spark.sql.{DataFrame, SparkSession}
@@ -12,7 +11,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
* @param session
* @param appCtx
*/
class ExecutionContext private[etl](val appCtx: ApplicationContext, val config: Option[Config] = None)(implicit session: SparkSession) extends Loggable {
class JobContext private[etl](val appCtx: PipelineContext, val config: Option[Config] = None)(implicit session: SparkSession) extends Loggable {
//the _container for holding any object between actions
private val _container: scala.collection.mutable.Map[String, Any] = scala.collection.mutable.Map.empty[String, Any]

@@ -60,4 +59,14 @@ class ExecutionContext private[etl](val appCtx: ApplicationContext, val config:
None
}
def getView(view: View): Option[DataFrame] = getView(if (view.global) s"${appCtx.global_db}.${view.name}" else view.name)

//metrics collection hint
private var _metricsRequired: Boolean = false
/**
* Check this flag to see if metrics collection is required
* @return
*/
def metricsRequired = this._metricsRequired
//this method is only called from pipeline-runner
private[etl] def metricsRequired_= (newVal: Boolean) = this._metricsRequired = newVal
}
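
The new `metricsRequired` hint lets an actor prepare for metrics collection only when a metrics sink is actually configured. A sketch of how an actor might consult it (the view name `events` and the caching decision are made up):

```scala
import com.qwshen.etl.common.{Actor, JobContext}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel

// Illustrative actor: cache its output only when the pipeline-runner has
// signalled that metrics will be collected for this action.
class DistinctEventsActor extends Actor {
  def run(ctx: JobContext)(implicit session: SparkSession): Option[DataFrame] = for {
    df <- ctx.getView("events")   // "events" is a made-up view name
  } yield {
    val result = df.distinct()
    if (ctx.metricsRequired) result.persist(StorageLevel.MEMORY_AND_DISK)
    result
  }
}
```
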
2 changes: 1 addition & 1 deletion src/main/scala/com/qwshen/etl/common/KafkaReadActor.scala
@@ -29,7 +29,7 @@ private[etl] abstract class KafkaReadActor[T] extends KafkaActor[T] { self: T =
* @param session - the spark-session
* @return
*/
def run(ctx: ExecutionContext)(implicit session: SparkSession): Option[DataFrame] = for {
def run(ctx: JobContext)(implicit session: SparkSession): Option[DataFrame] = for {
df <- this.load(session)
} yield Try {
//calculate all columns except key & value
2 changes: 1 addition & 1 deletion src/main/scala/com/qwshen/etl/common/KafkaWriteActor.scala
@@ -30,7 +30,7 @@ private[etl] abstract class KafkaWriteActor[T] extends KafkaActor[T] { self: T =
/**
* Run the file-reader
*/
def run(ctx: ExecutionContext)(implicit session: SparkSession): Option[DataFrame] = for {
def run(ctx: JobContext)(implicit session: SparkSession): Option[DataFrame] = for {
df <- this._view.flatMap(name => ctx.getView(name))
} yield Try {
def getBySchema(schema: Schema, dstField: String)(x: DataFrame): DataFrame = {
26 changes: 23 additions & 3 deletions ...a/com/qwshen/etl/ApplicationContext.scala → ...m/qwshen/etl/common/PipelineContext.scala
100755 → 100644
@@ -1,14 +1,18 @@
package com.qwshen.etl
package com.qwshen.etl.common

import com.qwshen.common.logging.Loggable

import java.nio.file.Paths
import java.text.SimpleDateFormat
import java.util.Calendar
import com.qwshen.common.logging.Loggable

/**
* The application-context describes the common and environment variables at the application level
*/
final class ApplicationContext() extends Loggable {
final class PipelineContext() extends Loggable {
//the _container for holding any object between jobs
private val _container: scala.collection.mutable.Map[String, Any] = scala.collection.mutable.Map.empty[String, Any]

/**
* The name of the global database in spark-sql
*/
@@ -111,4 +115,20 @@ final class ApplicationContext() extends Loggable {
* @return
*/
def ioBatchSize: Int = 1600

/**
* Add an object into the _container
*
* @param key
* @param obj
* @return
*/
def addObject(key: String, obj: Any): Unit = this._container.put(key, obj)

/**
* Get an object by name from the _container
* @param key
* @return
*/
def getObject(key: String): Option[Any] = this._container.get(key)
}
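
The container added to `PipelineContext` gives jobs a place to hand values to one another across the pipeline. A REPL-style sketch of the two new methods (key and payload are made up):

```scala
import com.qwshen.etl.common.PipelineContext

val appCtx = new PipelineContext()

// One job stores a value for later jobs.
appCtx.addObject("last-watermark", "2022-06-05T00:00:00")

// A later job reads it back; getObject returns Option[Any], so narrow the type as needed.
val watermark: Option[String] = appCtx.getObject("last-watermark").collect { case s: String => s }
```
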
2 changes: 1 addition & 1 deletion src/main/scala/com/qwshen/etl/common/SparkConfActor.scala
@@ -19,7 +19,7 @@ class SparkConfActor extends Actor {
* @param session
* @return
*/
def run(ctx: ExecutionContext)(implicit session: SparkSession): Option[DataFrame] = {
def run(ctx: JobContext)(implicit session: SparkSession): Option[DataFrame] = {
this._settings.foreach { case (k, v) => session.conf.set(k, v) };
this._hadoopSettings.foreach { case(k, v) => session.sparkContext.hadoopConfiguration.set(k, v) }
None
4 changes: 2 additions & 2 deletions src/main/scala/com/qwshen/etl/common/SqlActor.scala
@@ -38,7 +38,7 @@ private[etl] class SqlBase[T] extends Actor with VariableResolver { self: T =>
* @param session - the spark-session
* @return
*/
def run(ctx: ExecutionContext)(implicit session: SparkSession): Option[DataFrame] = for {
def run(ctx: JobContext)(implicit session: SparkSession): Option[DataFrame] = for {
stmt <- this._sqlStmt
} yield {
//log the sql statement in debug mode
@@ -59,7 +59,7 @@ private[etl] class SqlBase[T] extends Actor with VariableResolver { self: T =>
* @param session
* @return
*/
override def collectMetrics(df: DataFrame)(implicit session: SparkSession): Seq[(String, String)] = this._sqlStmt.map(stmt => Seq(("sql-stmt", stmt))).getOrElse(Nil)
override def collectMetrics(df: DataFrame): Seq[(String, String)] = this._sqlStmt.map(stmt => Seq(("sql-stmt", stmt))).getOrElse(Nil)

/**
* Initialize the actor with the properties & config
20 changes: 13 additions & 7 deletions src/main/scala/com/qwshen/etl/pipeline/PipelineRunner.scala
@@ -1,19 +1,19 @@
package com.qwshen.etl.pipeline

import com.qwshen.common.logging.Loggable
import com.qwshen.etl.ApplicationContext
import com.qwshen.etl.common.ExecutionContext
import com.qwshen.etl.common.{JobContext, PipelineContext}
import com.qwshen.etl.pipeline.definition._
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.fs.{FileSystem, Path}

import java.io.PrintWriter
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}

final class PipelineRunner(appCtx: ApplicationContext) extends Loggable {
final class PipelineRunner(appCtx: PipelineContext) extends Loggable {
//to describe the content of metric entry
private case class MetricEntry(jobName: String, actionName: String, key: String, value: String)

@@ -34,7 +34,7 @@ final class PipelineRunner(appCtx: ApplicationContext) extends Loggable {
val curSession: SparkSession = if (pipeline.singleSparkSession) session else session.newSession()
try {
//create execution context
val ctx: ExecutionContext = new ExecutionContext(appCtx, pipeline.config)(curSession)
val ctx: JobContext = new JobContext(appCtx, pipeline.config)(curSession)

//register UDFs if any
registerUDFs(pipeline.udfRegistrations)(curSession)
@@ -54,6 +54,8 @@ final class PipelineRunner(appCtx: ApplicationContext) extends Loggable {
}
//check if all referenced view exist
ensureViewsExist(scanReferencedViews(action))(curSession)
//check if metrics collection is required, so to give the hint to the actor before running
ctx.metricsRequired = pipeline.metricsLogging.exists(ml => ml.loggingActions.exists(a => a.equalsIgnoreCase(action.name)))
//execute
action.actor.run(ctx)(curSession) collect { case r: DataFrame => r } foreach (df => {
promoteView(df, action, pipeline.globalViewAsLocal, viewsReferenced)
@@ -140,15 +142,19 @@ final class PipelineRunner(appCtx: ApplicationContext) extends Loggable {
ml <- metricsLogging
_ <- ml.loggingActions.find(a => a.equalsIgnoreCase(action.name)) if !df.isStreaming
} yield {
val customMetrics = action.actor.collectMetrics(df)
.map(x => MetricEntry(jobName, action.name, x._1, x._2.replace("\"", "\\\"").replaceAll("[\r|\n| ]+", " ").replace("\r", "").replace("\n", " ")))

if (!(df.storageLevel.useMemory || df.storageLevel.useDisk || df.storageLevel.useOffHeap)) {
df.persist(StorageLevel.MEMORY_AND_DISK)
}
Seq(
val systemMetrics = Seq(
MetricEntry(jobName, action.name, "ddl-schema", df.schema.toDDL),
MetricEntry(jobName, action.name, "row-count", df.count.toString),
MetricEntry(jobName, action.name, "estimate-size", String.format("%s bytes", df.sparkSession.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats.sizeInBytes.toString())),
MetricEntry(jobName, action.name, "execute-time", LocalDateTime.now.toString)
) ++ action.actor.collectMetrics(df).map(x => MetricEntry(jobName, action.name, x._1, x._2.replace("\"", "\\\"").replaceAll("[\r\n|\r|\n| ]+", " ").replace("\r", "").replace("\n", " ")))
)
systemMetrics ++ customMetrics
}
} match {
case Success(r) => r.getOrElse(Nil)
4 changes: 2 additions & 2 deletions src/main/scala/com/qwshen/etl/sink/FileStreamWriter.scala
@@ -1,7 +1,7 @@
package com.qwshen.etl.sink

import com.qwshen.common.PropertyKey
import com.qwshen.etl.common.{ExecutionContext, FileWriteActor}
import com.qwshen.etl.common.{JobContext, FileWriteActor}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.util.{Failure, Success, Try}
@@ -32,7 +32,7 @@ class FileStreamWriter extends FileWriteActor[FileStreamWriter] {
* @param session - the spark-session
* @return
*/
override def run(ctx: ExecutionContext)(implicit session: SparkSession): Option[DataFrame] = for {
override def run(ctx: JobContext)(implicit session: SparkSession): Option[DataFrame] = for {
fmt <- this._format
mode <- this._outputMode
uri <- this._fileUri
26 changes: 19 additions & 7 deletions src/main/scala/com/qwshen/etl/sink/FileWriter.scala
@@ -1,8 +1,9 @@
package com.qwshen.etl.sink

import com.qwshen.common.PropertyKey
import com.qwshen.etl.common.{ExecutionContext, FileWriteActor}
import com.qwshen.etl.common.{FileWriteActor, JobContext}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel
import scala.util.{Failure, Success, Try}

/**
@@ -12,6 +13,8 @@ class FileWriter extends FileWriteActor[FileWriter] {
//the mode for the writing
@PropertyKey("mode", true)
protected var _mode: Option[String] = None
@PropertyKey("emptyWrite", false)
protected var _emptyWrite: Option[String] = None

/**
* Run the file-writer
@@ -20,17 +23,26 @@ class FileWriter extends FileWriteActor[FileWriter] {
* @param session - the spark-session
* @return
*/
def run(ctx: ExecutionContext)(implicit session: SparkSession): Option[DataFrame] = for {
def run(ctx: JobContext)(implicit session: SparkSession): Option[DataFrame] = for {
fmt <- this._format
mode <- this._mode
uri <- this._fileUri
df <- this._view.flatMap(name => ctx.getView(name))
} yield Try {
val initWriter = this._options.foldLeft(df.write.format(fmt))((s, o) => s.option(o._1, o._2))
//with partitionBy
val partitionWriter = this._partitionBy.foldLeft(initWriter)((w, cs) => w.partitionBy(cs.split(",").map(_.trim): _*))
//write
partitionWriter.mode(mode).save(uri)
val goWrite = if (this._emptyWrite.exists(ew => ew.equalsIgnoreCase("no") || ew.equalsIgnoreCase("disabled"))) {
if (!(df.storageLevel.useMemory || df.storageLevel.useDisk || df.storageLevel.useOffHeap)) {
df.persist(StorageLevel.MEMORY_AND_DISK)
}
df.count > 0
} else true

if (goWrite) {
val initWriter = this._options.foldLeft(df.write.format(fmt))((s, o) => s.option(o._1, o._2))
//with partitionBy
val partitionWriter = this._partitionBy.foldLeft(initWriter)((w, cs) => w.partitionBy(cs.split(",").map(_.trim): _*))
//write
partitionWriter.mode(mode).save(uri)
}
} match {
case Success(_) => df
case Failure(ex) => throw new RuntimeException(s"Cannot write data to the target - $uri.", ex)
4 changes: 2 additions & 2 deletions src/main/scala/com/qwshen/etl/sink/IcebergStreamWriter.scala
@@ -1,7 +1,7 @@
package com.qwshen.etl.sink

import com.qwshen.common.PropertyKey
import com.qwshen.etl.common.{ExecutionContext, IcebergActor}
import com.qwshen.etl.common.{JobContext, IcebergActor}
import com.typesafe.config.Config
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, SparkSession}
@@ -48,7 +48,7 @@ class IcebergStreamWriter extends IcebergActor[IcebergStreamWriter] {
* @param session - the spark-session
* @return
*/
def run(ctx: ExecutionContext)(implicit session: SparkSession): Option[DataFrame] = for {
def run(ctx: JobContext)(implicit session: SparkSession): Option[DataFrame] = for {
table <- this._table
mode <- this._outputMode
df <- this._view.flatMap(name => ctx.getView(name))