Commit d13d68a

Fixed an existential type error by converting the jobFullExecutionParameters field in ExecutionData to an existential type, and casting back in runETL to the type provided by the end user.
vbounyasit committed Feb 20, 2020
1 parent 773d3c3 commit d13d68a
Showing 3 changed files with 13 additions and 12 deletions.
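For background: per the commit message, the fix makes the jobFullExecutionParameters field existential so that a single ExecutionData can carry jobs whose Config and Argument types differ. A minimal standalone sketch of that idea, using a hypothetical Params class rather than the project's own:

  // Hypothetical stand-in for JobFullExecutionParameters, for illustration only.
  case class Params[C, A](config: C, args: A)

  // A Seq fixed to one set of type arguments admits only that shape, so jobs
  // with different Config/Argument types cannot share it.
  // Wildcards (existential types) lift that restriction:
  val mixed: Seq[Params[_, _]] = Seq(Params("conf", 1), Params(42, "args"))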
17 changes: 9 additions & 8 deletions src/main/scala/com/vbounyasit/bigdata/ETL.scala
@@ -19,7 +19,7 @@

package com.vbounyasit.bigdata

-import com.vbounyasit.bigdata.ETL.{ExecutionData, OptionalJobParameters}
+import com.vbounyasit.bigdata.ETL.{ExecutionData, JobFullExecutionParameters, OptionalJobParameters}
import com.vbounyasit.bigdata.args.ArgumentsConfiguration
import com.vbounyasit.bigdata.args.base.OutputArguments
import com.vbounyasit.bigdata.config.ConfigurationsLoader
@@ -41,7 +41,7 @@ trait ETL[U, V] {
* @param args The list of arguments to parse
* @return An ExecutionData object containing all the required parameters
*/
-  def loadExecutionData(args: Array[String]): ExecutionData[_, _, _, _]
+  protected def loadExecutionData(args: Array[String]): ExecutionData

/**
* Extracts data from a provided sources configuration
@@ -94,11 +94,12 @@ trait ETL[U, V] {
*
* @param executionData The ExecutionData object that will be used
*/
-  def runETL[Config, Argument, ConfigInput, ArgumentInput](executionData: ExecutionData[Config, Argument, ConfigInput, ArgumentInput]): Unit = {
+  def runETL[Config, Argument, ConfigInput, ArgumentInput](executionData: ExecutionData): Unit = {
implicit val spark: SparkSession = executionData.spark

executionData.jobFullExecutionParameters
-      .foreach(jobParameters => {
+      .foreach(jobParametersExistential => {
+        val jobParameters = jobParametersExistential.asInstanceOf[JobFullExecutionParameters[Config, Argument, ConfigInput, ArgumentInput]]
//extract
val sources: Sources = extract(
jobParameters.outputTable.table,
@@ -140,10 +141,10 @@ object ETL {

type EmptyOptionalParameters = OptionalJobParameters[Nothing, Nothing]

-  case class ExecutionData[Config, Argument, ConfigInput, ArgumentInput](configurations: ConfigurationsLoader,
-                                                                         jobFullExecutionParameters: Seq[JobFullExecutionParameters[Config, Argument, ConfigInput, ArgumentInput]],
-                                                                         spark: SparkSession,
-                                                                         environment: String)
+  case class ExecutionData(configurations: ConfigurationsLoader,
+                           jobFullExecutionParameters: Seq[JobFullExecutionParameters[_, _, _, _]],
+                           spark: SparkSession,
+                           environment: String)

case class JobFullExecutionParameters[Config, Argument, ConfigInput, ArgumentInput](jobConf: JobConf,
outputTable: TableMetadata,
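The cast in the runETL hunk above is the other half of this approach: inside runETL, each wildcarded element is cast back to the type arguments the end user supplied. A sketch of that pattern, reusing the hypothetical Params from the sketch above:

  def run[C, A](jobs: Seq[Params[_, _]]): Unit =
    jobs.foreach { existential =>
      // Type arguments are erased at runtime, so this cast is unchecked;
      // the caller of run[C, A] is trusted to have built each element
      // with exactly those types.
      val params = existential.asInstanceOf[Params[C, A]]
      println(params.config)
    }

Because of erasure the asInstanceOf itself always succeeds on a Params value; a mismatch only surfaces later, as a ClassCastException where config or args is first used at the wrong type.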
2 changes: 1 addition & 1 deletion src/main/scala/com/vbounyasit/bigdata/SparkApplication.scala
@@ -60,7 +60,7 @@ abstract class SparkApplication[U, V] extends SparkSessionProvider with ETL[U, V
* @param args The list of arguments to parse
* @return An ExecutionData object containing all the required parameters
*/
-  protected def loadExecutionData(args: Array[String]): ExecutionData[_, _, _, _] = {
+  protected def loadExecutionData(args: Array[String]): ExecutionData = {

case class TableMetadataSeq(tables: Seq[TableMetadata])

6 changes: 3 additions & 3 deletions src/main/scala/com/vbounyasit/bigdata/utils/MonadUtils.scala
@@ -33,10 +33,10 @@ object MonadUtils {
}

def getMapSubList[K, V, Error <: ExceptionHandler](keySubSeq: List[K], dataMap: Map[K, V], errorNotFound: ExceptionWithMessage[Error])(implicit keyConverter: K => String): Either[Error, Map[K, V]] = {
-    def toEither[U](option: Option[(K, U)]): Either[Error, (K, U)] = Either.cond(option.isDefined, option.get, errorNotFound(keyConverter(option.get._1)))
+    def toEither[U](key: K, option: Option[(K, U)]): Either[Error, (K, U)] = Either.cond(option.isDefined, option.get, errorNotFound(keyConverter(key)))
keySubSeq
-      .map(key => dataMap.get(key).map(key -> _))
-      .map(toEither)
+      .map(key => (key, dataMap.get(key).map(key -> _)))
+      .map(e => toEither(e._1, e._2))
.sequence.map(_.toMap)
}
}
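Worth noting about the MonadUtils hunk: Either.cond evaluates its left (error) argument only when the test is false, which here is exactly when option is None, so the old toEither dereferenced option.get on an empty Option and threw NoSuchElementException instead of returning Left. Threading the key through separately keeps the error branch safe. A standalone sketch of the difference, simplified to a String error instead of the project's ExceptionHandler:

  import scala.util.Try

  // Before: the error branch itself dereferences the missing value.
  def before[K, U](option: Option[(K, U)]): Either[String, (K, U)] =
    Either.cond(option.isDefined, option.get, s"not found: ${option.get._1}")

  // After: the key travels alongside the Option, so no unsafe .get
  // remains on the error path.
  def after[K, U](key: K, option: Option[(K, U)]): Either[String, (K, U)] =
    Either.cond(option.isDefined, option.get, s"not found: $key")

  println(Try(before(None)))       // Failure(java.util.NoSuchElementException: None.get)
  println(after("missing", None))  // Left(not found: missing)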
