Commit 311bd56
added global application conf and arguments parsing for parameters that are common to all jobs, todo: add conf files for specific jobs
vbounyasit committed Feb 21, 2020
1 parent 040b87e commit 311bd56
Showing 6 changed files with 91 additions and 44 deletions.
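Taken together, the changes below mean that each job's execution function now receives two OptionalJobParameters values, one global (parsed once and shared by every job) and one job-specific, packed into the new OptionalParameters tuple alias. A minimal sketch of what that looks like on the job author's side; GlobalConf, GlobalArgs, JobConfig, JobArgs and myExecutionPlan are invented placeholders, not names from this repository:

import com.vbounyasit.bigdata.ETL.OptionalParameters
import com.vbounyasit.bigdata.transform.ExecutionPlan

// Placeholder types, purely for illustration
case class GlobalConf(defaultDatabase: String)
case class GlobalArgs(runDate: String)
case class JobConfig(inputTable: String)
case class JobArgs(sampleRatio: Double)

def myExecutionPlan(database: String): ExecutionPlan = ??? // a real job would build its plan here

val executionFunction: OptionalParameters[GlobalConf, GlobalArgs, JobConfig, JobArgs] => ExecutionPlan = {
  // OptionalParameters is a pair: (global parameters, job-specific parameters)
  case (globalParams, jobParams) =>
    val database = globalParams.applicationConfig.map(_.defaultDatabase).getOrElse("default")
    myExecutionPlan(database)
}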
2 changes: 1 addition & 1 deletion build.sbt
@@ -1,6 +1,6 @@
name := "DataFlow"

version := "1.0.1-SNAPSHOT"
version := "1.0.1"

scalaVersion := "2.11.12"

39 changes: 25 additions & 14 deletions src/main/scala/com/vbounyasit/bigdata/ETL.scala
@@ -40,7 +40,7 @@ trait ETL[U, V] {
* @param args The list of arguments to parse
* @return An ExecutionData object containing all the required parameters
*/
-protected def loadExecutionData(args: Array[String]): ExecutionData
+protected def loadExecutionData(args: Array[String]): ExecutionData[_, _]

/**
* Extracts data from a provided sources configuration
@@ -93,12 +93,15 @@ trait ETL[U, V] {
*
* @param executionData The ExecutionData object that will be used
*/
-def runETL[Config, Argument, ConfigInput, ArgumentInput](executionData: ExecutionData): Unit = {
+def runETL[GlobalConfig, GlobalArgument, Config, Argument, ConfigInput, ArgumentInput](executionData: ExecutionData[GlobalConfig, GlobalArgument]): Unit = {
implicit val spark: SparkSession = executionData.spark

+val globalApplicationConf: Option[GlobalConfig] = executionData.applicationConf
+val globalArguments: Option[GlobalArgument] = executionData.applicationArguments

executionData.jobFullExecutionParameters
.foreach(jobParametersExistential => {
-val jobParameters = jobParametersExistential.asInstanceOf[JobFullExecutionParameters[Config, Argument, ConfigInput, ArgumentInput]]
+val jobParameters = jobParametersExistential.asInstanceOf[JobFullExecutionParameters[GlobalConfig, GlobalArgument, Config, Argument, ConfigInput, ArgumentInput]]
//extract
val sources: Sources = extract(
jobParameters.outputTable.table,
@@ -112,6 +115,10 @@ trait ETL[U, V] {
sources,
//relying on runtime for this cast
jobParameters.executionFunction(
+OptionalJobParameters(
+  globalApplicationConf,
+  globalArguments
+),
OptionalJobParameters(
jobParameters.optionalJobParameters.applicationConfig.map(_.asInstanceOf[ConfigInput]),
jobParameters.optionalJobParameters.arguments.map(_.asInstanceOf[ArgumentInput])
@@ -136,30 +143,34 @@
*/
object ETL {

-type ExecutionConfig = ExecutionParameters[_, _]
+type ExecutionConfig = ExecutionParameters[_, _, _, _]

type EmptyOptionalParameters = OptionalJobParameters[Nothing, Nothing]

-case class ExecutionData(configurations: ConfigurationsLoader,
-                         jobFullExecutionParameters: Seq[JobFullExecutionParameters[_, _, _, _]],
-                         spark: SparkSession,
-                         environment: String)
+type OptionalParameters[GlobalConfig, GlobalArgument, Config, Argument] = (OptionalJobParameters[GlobalConfig, GlobalArgument], OptionalJobParameters[Config, Argument])
+
+case class ExecutionData[GlobalConfig, GlobalArgument](configurations: ConfigurationsLoader,
+                                                       applicationConf: Option[GlobalConfig],
+                                                       applicationArguments: Option[GlobalArgument],
+                                                       jobFullExecutionParameters: Seq[JobFullExecutionParameters[_, _, _, _, _, _]],
+                                                       spark: SparkSession,
+                                                       environment: String)

-case class JobFullExecutionParameters[Config, Argument, ConfigInput, ArgumentInput](jobConf: JobConf,
-                                      outputTable: TableMetadata,
-                                      optionalJobParameters: OptionalJobParameters[Config, Argument],
-                                      executionFunction: OptionalJobParameters[ConfigInput, ArgumentInput] => ExecutionPlan)
+case class JobFullExecutionParameters[GlobalConfigInput, GlobalArgumentInput, Config, Argument, ConfigInput, ArgumentInput](jobConf: JobConf,
+                                      outputTable: TableMetadata,
+                                      optionalJobParameters: OptionalJobParameters[Config, Argument],
+                                      executionFunction: OptionalParameters[GlobalConfigInput, GlobalArgumentInput, ConfigInput, ArgumentInput] => ExecutionPlan)

case class OptionalJobParameters[Config, Argument](applicationConfig: Option[Config],
arguments: Option[Argument])

-case class ExecutionParameters[Config, Argument](executionFunction: OptionalJobParameters[Config, Argument] => ExecutionPlan,
+case class ExecutionParameters[GlobalConfig, GlobalArgument, Config, Argument](executionFunction: OptionalParameters[GlobalConfig, GlobalArgument, Config, Argument] => ExecutionPlan,
additionalArguments: Option[ArgumentsConfiguration[Argument]] = None)

case class TableMetadata(database: String, table: String)

object ExecutionParameters {
-def apply[Config, Argument](executionPlan: ExecutionPlan): ExecutionParameters[Config, Argument] =
+def apply[GlobalConfig, GlobalArgument, Config, Argument](executionPlan: ExecutionPlan): ExecutionParameters[GlobalConfig, GlobalArgument, Config, Argument] =
ExecutionParameters(_ => executionPlan)
}

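A job that consumes neither global nor job-level parameters can keep using the single-argument ExecutionParameters.apply above, with the four type parameters collapsing to Nothing. A small sketch, where somePlan stands in for a real plan:

import com.vbounyasit.bigdata.ETL.ExecutionParameters
import com.vbounyasit.bigdata.transform.ExecutionPlan

val somePlan: ExecutionPlan = ??? // placeholder for a real execution plan

// No optional parameters are consumed, so all four type parameters are Nothing
val parameters = ExecutionParameters[Nothing, Nothing, Nothing, Nothing](somePlan)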
45 changes: 33 additions & 12 deletions src/main/scala/com/vbounyasit/bigdata/SparkApplication.scala
@@ -31,7 +31,7 @@ import com.vbounyasit.bigdata.exceptions.ExceptionHandler._
import com.vbounyasit.bigdata.providers.{LoggerProvider, SparkSessionProvider}
import com.vbounyasit.bigdata.transform.ExecutionPlan
import com.vbounyasit.bigdata.utils.MonadUtils._
-import com.vbounyasit.bigdata.utils.{CollectionsUtils, DateUtils}
+import com.vbounyasit.bigdata.utils.{CollectionsUtils, DateUtils, MonadUtils}
import org.apache.spark.sql.functions.{lit, _}
import org.apache.spark.sql.{DataFrame, SparkSession}

@@ -61,14 +61,35 @@ abstract class SparkApplication[U, V] extends SparkSessionProvider with ETL[U, V]
* @param args The list of arguments to parse
* @return An ExecutionData object containing all the required parameters
*/
-protected def loadExecutionData(args: Array[String]): ExecutionData = {
+protected def loadExecutionData(args: Array[String]): ExecutionData[_, _] = {

case class TableMetadataSeq(tables: Seq[TableMetadata])

case class JobsConfMap(configs: Map[String, JobConf])

case class ExecutionParametersMap(parameters: Map[String, ExecutionConfig])

+/**
+ * Optional Application config
+ * TODO : Currently, a single config file defined in ConfigDefinition will be used for every job's configuration. Might want to define one file per job.
+ */
+val parsedApplicationConfiguration: Option[_] = configDefinition.applicationConf.map(conf =>
+  handleEither(loadConfig(conf.configFileName, conf.pureConfigLoaded))
+)

+/**
+ * Parsing of the global application arguments
+ */
+val parsedApplicationArguments: Option[_] = {
+  configDefinition.applicationArguments.map(argsConf => {
+    val argumentParser = argsConf.argumentParser
+    argumentParser.parseArguments(
+      "",
+      args
+    )
+  })
+}

handleEither(for {
/**
* Loading configuration files
Expand All @@ -90,7 +111,7 @@ abstract class SparkApplication[U, V] extends SparkSessionProvider with ETL[U, V
* Getting the list of jobs to compute
*/
tablesToCompute <- {
-val output: Option[TableMetadataSeq] = configDefinition.resultingOutputTables match {
+val output: Option[TableMetadataSeq] = configDefinition.resultingOutputTables(parsedApplicationConfiguration, parsedApplicationArguments) match {
case None => (parsedBaseArgument.table, parsedBaseArgument.database) match {
case ("N/A", _) | (_, "N/A") => None
case (database, table) => Some(TableMetadataSeq(Seq(TableMetadata(database, table))))
@@ -119,13 +140,7 @@ abstract class SparkApplication[U, V] extends SparkSessionProvider with ETL[U, V]
}
} yield {
val spark: SparkSession = getSparkSession(loadedConfigurations.sparkParamsConf)
-/**
- * Optional Application config
- * TODO : Currently, a single config file defined in ConfigDefinition will be used for every job's configuration. Might want to define one file per job.
- */
-val customConfiguration: Option[_] = configDefinition.applicationConf.map(conf =>
-  handleEither(loadConfig(conf.configFileName, conf.pureConfigLoaded))
-)


/**
* Merging with the list of output tables
@@ -150,19 +165,25 @@ abstract class SparkApplication[U, V] extends SparkSessionProvider with ETL[U, V]
*/
.map {
case ((jobConf, tableMetadata), executionConfig) =>
-val parsedCustomArguments: Option[_] = executionConfig.additionalArguments.map(argsConf => {
+val parsedJobArguments: Option[_] = executionConfig.additionalArguments.map(argsConf => {
val argumentParser = argsConf.argumentParser
handleEither(argumentParser.parseArguments(
loadedConfigurations.sparkParamsConf.appName,
args
))
})
-JobFullExecutionParameters(jobConf, tableMetadata, OptionalJobParameters(customConfiguration, parsedCustomArguments), executionConfig.executionFunction)
+JobFullExecutionParameters(
+  jobConf,
+  tableMetadata,
+  OptionalJobParameters(parsedApplicationConfiguration, parsedJobArguments),
+  executionConfig.executionFunction)
}.toSeq
logger.info("Successfully loaded parameters from configuration files")

ExecutionData(
loadedConfigurations,
+parsedApplicationConfiguration,
+parsedApplicationArguments,
jobFullExecutionParameters,
spark,
parsedBaseArgument.env
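The ArgumentsConfiguration and ArgumentParser types are not shown in this diff; the only contract visible above is argumentParser.parseArguments(appName, args), whose result handleEither unwraps at the job level. Purely as an illustration of that contract, with invented names that are not the library's actual API, a global-arguments parser could look like:

// Hypothetical sketch honouring a parseArguments(appName, args) => Either contract;
// GlobalArguments and GlobalArgumentParser are invented for illustration.
case class GlobalArguments(env: String = "dev", runDate: String = "")

object GlobalArgumentParser {
  def parseArguments(appName: String, args: Array[String]): Either[String, GlobalArguments] =
    args.grouped(2).foldLeft[Either[String, GlobalArguments]](Right(GlobalArguments())) {
      case (Right(acc), Array("--env", value))      => Right(acc.copy(env = value))
      case (Right(acc), Array("--run-date", value)) => Right(acc.copy(runDate = value))
      case (Right(_), Array(flag, _*))              => Left(s"Unknown argument: $flag")
      case (left, _)                                => left // keep the first error
    }
}

Under these assumptions, GlobalArgumentParser.parseArguments("MyApp", Array("--env", "prod")) would yield Right(GlobalArguments("prod", "")).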
@@ -22,6 +22,7 @@ package com.vbounyasit.bigdata.config
import com.typesafe.config.{Config, ConfigFactory}
import com.vbounyasit.bigdata.ApplicationConf
import com.vbounyasit.bigdata.ETL.TableMetadata
+import com.vbounyasit.bigdata.args.ArgumentsConfiguration

/**
* Everything related to configuration files loading is handled here.
@@ -30,14 +31,6 @@ trait ConfigDefinition {

implicit def toOptionalOutputTables(tables: Seq[(String, String)]): Option[Seq[(String, String)]] = Some(tables)

-/**
- * An optional configuration file related to our application.
- *
- * Note: On the Application side, you can fill this parameter using the
- * loadConfig function from pureconfig
- */
-val applicationConf: ApplicationConf[_] = None

/**
* The spark parameters that will be used on the remote cluster we submit our job on.
*/
@@ -58,10 +51,24 @@
*/
val jobsConf: Config

+/**
+ * An optional configuration file related to our application.
+ *
+ * Note: On the Application side, you can fill this parameter using the
+ * loadConfig function from pureconfig
+ */
+val applicationConf: ApplicationConf[_] = None
+
+/**
+ * The argument parameters that will be parsed for every job launched
+ */
+val applicationArguments: Option[ArgumentsConfiguration[_]] = None

/**
* The output tables and jobs to run
*/
-val resultingOutputTables: Option[Seq[TableMetadata]] = None
+def resultingOutputTables[GlobalConfig, GlobalArgument](applicationConf: Option[GlobalConfig],
+                                                        applicationArguments: Option[GlobalArgument]): Option[Seq[TableMetadata]] = None


}
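With the new signature, the list of output tables can be derived from the parsed global configuration rather than being a constant. A hedged sketch, assuming an invented GlobalConf type; keeping the example a trait leaves ConfigDefinition's other abstract members unimplemented:

import com.vbounyasit.bigdata.ETL.TableMetadata
import com.vbounyasit.bigdata.config.ConfigDefinition

// Invented global configuration type, for illustration only
case class GlobalConf(outputDatabase: String, enabledJobs: Seq[String])

trait ExampleConfigDefinition extends ConfigDefinition {
  override def resultingOutputTables[GlobalConfig, GlobalArgument](applicationConf: Option[GlobalConfig],
                                                                   applicationArguments: Option[GlobalArgument]): Option[Seq[TableMetadata]] =
    applicationConf.collect {
      // runtime type check, since the hook is generic over GlobalConfig
      case conf: GlobalConf => conf.enabledJobs.map(job => TableMetadata(conf.outputDatabase, job))
    }
}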
@@ -91,8 +91,8 @@ object ExceptionHandler {
}

case class NoOutputTablesSpecified() extends ExceptionHandler {
override protected val exceptionType: String = "No output tables specified"
override protected val constructedMessage: String = "Please specify output tables either in argument command line or the ConfigDefinition class"
override protected val exceptionType: String = "No output tables found"
override protected val constructedMessage: String = "Please specify output tables in argument command line or the ConfigDefinition class"
}

case class MergingMapKeyNotFound(key: String) extends ExceptionHandler {
@@ -53,16 +53,22 @@ trait JobsTestGenerator extends TestComponents {
*/
val dataFrameWriter: DataFrameWriter

-/**
- * An optional custom argument object we want to use in our tests.
- */
-val defaultCustomArgument: Option[_] = None

/**
* An optional application conf file we want to use in our tests.
*/
val defaultApplicationConf: Option[_] = None

+/**
+ * An optional application argument object we want to use in our tests.
+ */
+val defaultApplicationArguments: Option[_] = None
+
+/**
+ * An optional job argument object we want to use in our tests.
+ */
+val defaultJobArguments: Option[_] = None

/**
* Executing the tests.
*/
@@ -77,7 +83,9 @@ trait JobsTestGenerator extends TestComponents {
logger.info("Hive environment successfully setup")
}

-val optionalParameters: OptionalJobParameters[Any, Any] = OptionalJobParameters(defaultApplicationConf, defaultCustomArgument)
+//todo change application conf to job conf
+val optionalApplicationParameters: OptionalJobParameters[Any, Any] = OptionalJobParameters(defaultApplicationConf, defaultApplicationArguments)
+val optionalJobParameters: OptionalJobParameters[Any, Any] = OptionalJobParameters(defaultApplicationConf, defaultJobArguments)

sparkApplication.executionPlans.foreach {
case (jobName, ExecutionParameters(executionFunction, _)) => {
@@ -148,7 +156,7 @@ trait JobsTestGenerator extends TestComponents {
val resultDataFrame = sparkApplication.transform(
jobName,
sources,
-executionFunction(optionalParameters),
+executionFunction(optionalApplicationParameters, optionalJobParameters),
Some(jobConf.outputMetadata.outputColumns),
None
)
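The old defaultCustomArgument has been split into application-level and job-level defaults. A hedged sketch of a test suite supplying them; GlobalConf and JobArgs are invented placeholder types, the import for JobsTestGenerator is omitted because its package is not shown in this diff, and keeping the example a trait leaves the suite's other abstract members (such as dataFrameWriter) unimplemented:

// Invented placeholder types, for illustration only
case class GlobalConf(defaultDatabase: String)
case class JobArgs(runDate: String)

trait ExampleJobsTests extends JobsTestGenerator {
  override val defaultApplicationConf: Option[_] = Some(GlobalConf("test_db"))
  override val defaultApplicationArguments: Option[_] = None
  override val defaultJobArguments: Option[_] = Some(JobArgs("2020-02-21"))
}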
