Skip to content

Commit

Permalink
Merge pull request #6 from vbounyasit/feature/TDD_and_refacto
Browse files Browse the repository at this point in the history
Feature/tdd and refacto
  • Loading branch information
vbounyasit committed Sep 26, 2020
2 parents 590e25e + 4124fad commit a02a61d
Show file tree
Hide file tree
Showing 23 changed files with 194 additions and 305 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -12,4 +12,5 @@ example/
jobs_params.conf
sources.conf
application.conf
my_job.conf
jobs/
2 changes: 1 addition & 1 deletion build.sbt
@@ -1,6 +1,6 @@
name := "DataFlow"

version := "1.0.6"
version := "1.0.7"

scalaVersion := "2.11.12"

Expand Down
104 changes: 56 additions & 48 deletions src/main/scala/com/vbounyasit/bigdata/ETL.scala
Expand Up @@ -19,20 +19,20 @@

package com.vbounyasit.bigdata

import com.vbounyasit.bigdata.ETL.{ExecutionData, JobExecutionParameters, JobParameters, TableMetadata}
import com.vbounyasit.bigdata.ETL._
import com.vbounyasit.bigdata.args.ArgumentsConfiguration
import com.vbounyasit.bigdata.config.ConfigurationsLoader
import com.vbounyasit.bigdata.config.data.JobsConfig.{JobConf, JobSource}
import com.vbounyasit.bigdata.config.data.SourcesConfig.SourcesConf
import com.vbounyasit.bigdata.exceptions.ErrorHandler
import com.vbounyasit.bigdata.config.{ConfigurationsLoader, CustomConfig}
import com.vbounyasit.bigdata.transform.ExecutionPlan
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
* A trait defining the functions for an ETL operation.
* TODO get rid of those two generic types in ETL
*/
trait ETL[U, V] {
trait ETL {

protected def parseApplicationParameters(args: Array[String]): ParsedParameters[_, _]

/**
* Loads a set of parameters needed for the ETL Operation
Expand All @@ -42,7 +42,10 @@ trait ETL[U, V] {
* @param args The list of arguments to parse
* @return An ExecutionData object containing all the required parameters
*/
protected def loadExecutionData(args: Array[String]): ExecutionData[_, _]
protected def loadExecutionData(configuration: ConfigurationsLoader,
tablesToCompute: Seq[TableMetadata],
environment: String,
args: Array[String]): ExecutionData

/**
* Extracts data from a provided sources configuration
Expand Down Expand Up @@ -79,58 +82,60 @@ trait ETL[U, V] {
* Saves the resulting dataFrame to disk
*
* @param dataFrame The resulting DataFrame
* @param outputTable The output database and table
* @param optionalJobParameters An OptionalJobParameters object containing any custom
* argument/application files we defined through our application.
* @param outputTable The output database and table
* @param inputParameters A ParametersPair object containing the global input parameters.
*/
def load(dataFrame: DataFrame,
def load[GlobalConfig, GlobalArgument, Config, Argument](dataFrame: DataFrame,
outputTable: TableMetadata,
optionalJobParameters: JobParameters[U, V]): Unit
inputParameters: InputParameters[GlobalConfig, GlobalArgument, Config, Argument]): Unit


/**
* The main method containing the logic for running our ETL job
*
* @param executionData The ExecutionData object that will be used
*/
def runETL[GlobalConfig, GlobalArgument, Config, Argument, ConfigInput, ArgumentInput](executionData: ExecutionData[GlobalConfig, GlobalArgument]): Unit = {
def runETL[GlobalConfig, GlobalArgument, Config, Argument, ConfigInput, ArgumentInput](parsedParameters: ParsedParameters[GlobalConfig, GlobalArgument],
executionData: ExecutionData): Unit = {
implicit val spark: SparkSession = executionData.spark
type JobExecParams = JobExecutionParameters[GlobalConfig, GlobalArgument, Config, Argument, ConfigInput, ArgumentInput]

val globalParameters: JobParameters[GlobalConfig, GlobalArgument] = JobParameters(executionData.applicationConf, executionData.applicationArguments)
val globalParameters: ParametersPair[GlobalConfig, GlobalArgument] = ParametersPair(parsedParameters.applicationConf, parsedParameters.applicationArguments)

executionData.jobExecutionParameters
.foreach(jobParametersExistential => {
val jobParameters = jobParametersExistential.asInstanceOf[JobExecutionParameters[GlobalConfig, GlobalArgument, Config, Argument, ConfigInput, ArgumentInput]]
val jobExecutionParameters: JobExecParams = jobParametersExistential.asInstanceOf[JobExecParams]
val jobParameters = ParametersPair(
jobExecutionParameters.jobParameters.applicationConfig.map(_.asInstanceOf[ConfigInput]),
jobExecutionParameters.jobParameters.arguments.map(_.asInstanceOf[ArgumentInput])
)
val inputParameters = InputParameters(
globalParameters,
jobParameters
)
//extract
val sources: Sources = extract(
jobParameters.outputTable.table,
jobParameters.jobConf.sources,
executionData.configurations.sourcesConf,
jobExecutionParameters.outputTable.table,
jobExecutionParameters.jobConf.sources,
parsedParameters.configurations.sourcesConf,
executionData.environment)

//transform
val resultDataFrame = transform(
jobParameters.outputTable.table,
jobExecutionParameters.outputTable.table,
sources,
jobParameters.executionFunction(
JobParameters(
globalParameters.applicationConfig,
globalParameters.arguments
),
JobParameters(
jobParameters.jobParameters.applicationConfig.map(_.asInstanceOf[ConfigInput]),
jobParameters.jobParameters.arguments.map(_.asInstanceOf[ArgumentInput])
)
jobExecutionParameters.executionFunction(
inputParameters
),
Some(jobParameters.jobConf.outputMetadata.outputColumns),
Some(jobParameters.jobConf.outputMetadata.dateColumn)
Some(jobExecutionParameters.jobConf.outputMetadata.outputColumns),
Some(jobExecutionParameters.jobConf.outputMetadata.dateColumn)
)

//load
load(
resultDataFrame,
jobParameters.outputTable,
globalParameters.asInstanceOf[JobParameters[U, V]]
jobExecutionParameters.outputTable,
inputParameters
)
})
}
Expand All @@ -143,35 +148,38 @@ object ETL {

type ExecutionConfig = ExecutionConfigs[_, _, _, _]

type EmptyJobParameters = JobParameters[Nothing, Nothing]

type JobParametersPair[GlobalConfig, GlobalArgument, Config, Argument] = (JobParameters[GlobalConfig, GlobalArgument], JobParameters[Config, Argument])
type EmptyJobParameters = ParametersPair[Nothing, Nothing]

case class ParsedParameters[GlobalConfig, GlobalArgument](configurations: ConfigurationsLoader,
applicationConf: Option[GlobalConfig],
applicationArguments: Option[GlobalArgument])

case class ExecutionData[GlobalConfig, GlobalArgument](configurations: ConfigurationsLoader,
applicationConf: Option[GlobalConfig],
applicationArguments: Option[GlobalArgument],
jobExecutionParameters: Seq[JobExecutionParameters[_, _, _, _, _, _]],
spark: SparkSession,
environment: String)
case class ExecutionData(jobExecutionParameters: Seq[JobExecutionParameters[_, _, _, _, _, _]],
spark: SparkSession,
environment: String)

case class JobExecutionParameters[GlobalConfigInput, GlobalArgumentInput, Config, Argument, ConfigInput, ArgumentInput](jobConf: JobConf,
outputTable: TableMetadata,
jobParameters: JobParameters[Config, Argument],
executionFunction: JobParametersPair[GlobalConfigInput, GlobalArgumentInput, ConfigInput, ArgumentInput] => ExecutionPlan)
case class JobExecutionParameters[GlobalConfig, GlobalArgument, Config, Argument, ConfigInput, ArgumentInput](jobConf: JobConf,
outputTable: TableMetadata,
jobParameters: ParametersPair[Config, Argument],
executionFunction: InputParameters[GlobalConfig, GlobalArgument, ConfigInput, ArgumentInput] => ExecutionPlan)

case class TableMetadata(database: String, table: String)
case class JobParameters[Config, Argument](applicationConfig: Option[Config],
arguments: Option[Argument])

case class ParametersPair[Config, Argument](applicationConfig: Option[Config],
arguments: Option[Argument])

case class ExecutionConfigs[GlobalConfig, GlobalArgument, Config, Argument](executionFunction: JobParametersPair[GlobalConfig, GlobalArgument, Config, Argument] => ExecutionPlan,
additionalConfig: Option[Either[ErrorHandler, Config]] = None,
case class InputParameters[GlobalConfig, GlobalArgument, Config, Argument](globalParameter: ParametersPair[GlobalConfig, GlobalArgument],
jobSpecificParameter: ParametersPair[Config, Argument])


case class ExecutionConfigs[GlobalConfig, GlobalArgument, Config, Argument](executionFunction: InputParameters[GlobalConfig, GlobalArgument, Config, Argument] => ExecutionPlan,
additionalConfig: Option[CustomConfig[Config]] = None,
additionalArguments: Option[ArgumentsConfiguration[Argument]] = None)


object ExecutionConfigs {
def apply[GlobalConfig, GlobalArgument, Config, Argument](executionPlan: ExecutionPlan): ExecutionConfigs[GlobalConfig, GlobalArgument, Config, Argument] =
ExecutionConfigs(_ => executionPlan)
}

}

0 comments on commit a02a61d

Please sign in to comment.