In [None]:
// Defining pipeline parameters.
// Each value defaults to the empty string here; presumably the Synapse pipeline
// overrides them when it invokes the notebook — TODO confirm this is the parameters cell.
val ADFRunID = ""          // copied into adfRunID by the next cell
val loadType = ""          // copied into consumptionType by the next cell
val processingLayer = ""   // copied into LayerProcessing by the next cell
// NOTE(review): a val named `System` hides java.lang.System for code that resolves
// the bare name after this cell — verify nothing later relies on System.*.
val System = ""            // copied into SRC_System by the next cell

In [None]:
%%spark
// Reading Synapse pipeline parameter values into the names used by the rest of the notebook.
val adfRunID = ADFRunID               // not referenced again in the cells visible here
val consumptionType = loadType        // compared with "full_clean" / "delta_clean" / "" when choosing the load
val LayerProcessing = processingLayer // compared with "gold" / "raw"; also a filter value in the metadata query
val SRC_System = System               // filter value in the metadata query

In [None]:
%%spark
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Await
import scala.concurrent.duration._
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.collection.mutable.ArrayBuffer

In [None]:
%%spark
// Load the active entities for this run from metadata.entitymetadata and flatten them
// into a single string: the columns of a row are joined with ';' and rows with '#'.
// The gold layer selects entities that carry a SourceQuery; every other layer filters
// on ConsumptionType and returns NULL in the SourceQuery position.
// NOTE(review): a column value containing ';' or '#' (a SourceQuery could) would shift
// the fields when this string is split again — confirm the metadata never holds them.
var metadata : String = {
  val entityQuery =
    if (LayerProcessing.toLowerCase() == "gold") {
      s"""SELECT 
                        TargetTableName, TargetFilePath, SRC_EntityName, PrimaryKey, TargetBaseFilePath, SourceQuery, cast(isDeltaTableRequired as Int)
                        FROM metadata.entitymetadata
                        WHERE isActive = 1
                        AND LayerProcessing = '${LayerProcessing}'
                        AND SRC_System = '${SRC_System}'
                        AND SourceQuery IS NOT NULL
                        """
    } else {
      s"""
                        SELECT TargetTableName, TargetFilePath, SRC_EntityName, PrimaryKey, TargetBaseFilePath, NULL AS SourceQuery, cast(isDeltaTableRequired as Int)
                        FROM metadata.entitymetadata
                        WHERE ConsumptionType = '${consumptionType}'
                        AND isActive = 1
                        AND LayerProcessing = '${LayerProcessing}'
                        AND SRC_System = '${SRC_System}'
                        """
    }

  // One ';'-joined string per row, brought to the driver and joined with '#'.
  val serializedRows = spark.sql(entityQuery).map(_.mkString(";")).collect()
  serializedRows.mkString("#")
}

In [None]:
%%spark
/**
  Purpose: Split the serialized entity metadata into one 7-field tuple per entity.
  Params : entities<String> - rows separated by '#', the fields of a row separated by ';'
  Return : ArrayBuffer of 7-tuples holding the first seven fields of every row, in order;
           empty when the input is null or blank (no entities to process)
  Throws : Exception (with the original error as cause) when a row has fewer than seven fields
*/
def parseEntities (entities: String) : ArrayBuffer[(String, String , String, String, String, String, String)] = {
  // "".split("#") yields one empty row, which used to fail the seven-field lookup below.
  // An empty metadata result simply means there is nothing to load.
  if (Option(entities).forall(_.trim.isEmpty)) {
    new ArrayBuffer[(String, String , String, String, String, String, String)]()
  } else {
    try {
      val parsedRows = entities.split("#").map { line =>
        // limit -1 keeps trailing empty fields; the default split drops them and would
        // make a row whose last column is empty look one field short.
        val lineSplit = line.split(";", -1)
        (lineSplit(0), lineSplit(1), lineSplit(2), lineSplit(3), lineSplit(4), lineSplit(5), lineSplit(6))
      }
      ArrayBuffer(parsedRows: _*)
    } catch {
      case e: Exception =>
        // Chain the original error so the failing row/index stays visible in the stack trace.
        throw new Exception("Unable to parse input parameter for sourceTables : " + entities, e)
    }
  }
}

In [None]:
%%spark
// Turn the serialized metadata string into one tuple per entity; the load cells
// further down map over this collection.
val allEntities = parseEntities(metadata)

# Functions to merge full raw data into gold tables

In [None]:
%%spark
/**
  Purpose: Overwrite the gold Delta location of one entity with its full raw parquet
           extract and register the gold table if it is not registered yet.
  Params : goldTableName<String>      - table name used in CREATE TABLE IF NOT EXISTS
           targetFolder<String>       - raw folder, relative to targetBaseFilePath
           srcEntityName<String>      - sub-folder of the raw folder that holds the parquet data
           targetBaseFilePath<String> - root path prepended to the raw and gold locations
  Return : Future[Boolean] - completes with true; fails with an Exception that carries the
           original error as its cause
*/
def mergeFullRawToGold(goldTableName: String, targetFolder: String, srcEntityName: String, targetBaseFilePath: String): Future[Boolean] = Future {

  val rawFilePath = s"$targetBaseFilePath/$targetFolder/$srcEntityName"
  // The gold location is the raw folder with "raw/" swapped for "gold/" and "/full" removed.
  val goldTableWritePath = targetBaseFilePath + "/" + targetFolder.replace("raw/","gold/").replace("/full","")

  try {
    spark.read.parquet(rawFilePath)
      .write.format("delta").mode("overwrite").save(goldTableWritePath)

    spark.sql("""CREATE TABLE IF NOT EXISTS """ + goldTableName + """ 
                USING DELTA LOCATION '""" + goldTableWritePath + """'
              """)
    true
  } catch {
    case e: Exception =>
      // Pass the original exception as the cause so its stack trace is not lost.
      throw new Exception("Failed to merge raw data for entity: " + goldTableName + " : with exception: " + e.toString(), e)
  }
}

# Functions to merge delta raw data into gold tables

In [None]:
%%spark
/**
  Purpose: To generate the merge condition based on the primary key
  Params : goldTbl<String>            - name that qualifies the target side of each comparison
           stagingTbl<String>         - name that qualifies the source side of each comparison
           pKeysList<Array[String]>   - key column names; surrounding whitespace is trimmed
                                        and blank entries are ignored
  Return : mergeCondition<String>     - " ON g.k1 = s.k1  AND g.k2 = s.k2 " style clause
  Throws : IllegalArgumentException when no non-blank key column is supplied
*/
def gererateMergeCondition(goldTbl: String, stagingTbl: String, pKeysList: Array[String]): String = {
  val keyColumns = pKeysList.map(_.trim).filter(_.nonEmpty)
  require(keyColumns.nonEmpty, "At least one primary key column is required to build the merge condition")

  // Join by position rather than by comparing each key with the last one: the value
  // comparison dropped the " AND " after any earlier key that equalled the last key.
  keyColumns
    .map(p => goldTbl + "." + p + " = " + stagingTbl + "." + p + " ")
    .mkString(" ON ", " AND ", "")
}

/**
  Purpose: Merge the delta raw parquet extract of one entity into its gold Delta table,
           or create the gold table from the extract when it does not exist yet.
  Params : goldTableName<String>        - gold table to merge into / create
           targetFolder<String>         - raw delta folder, relative to targetBaseFilePath
           primaryKey<String>           - comma-separated key column(s) for the merge condition
           targetBaseFilePath<String>   - root path prepended to the raw and gold locations
           isDeltaTableRequired<String> - "1" to keep only the latest row per incidentid before
                                          merging and to also write <goldTableName>_delta
  Return : Future[Boolean] - completes with true; fails with an Exception that carries the
           original error as its cause
*/
def mergeDeltaRawToGold(goldTableName: String, targetFolder: String, primaryKey: String, targetBaseFilePath: String, isDeltaTableRequired: String): Future[Boolean] = Future {
  try {
    val deltaViewSuffix = goldTableName.split('.').last
    val baseFilePath = targetBaseFilePath
    val rawFolderPath = baseFilePath + "/" + targetFolder
    val pKeysList = primaryKey.split(",")
    val deltaTableRequired = isDeltaTableRequired.equals("1")

    val rawDf = spark.read.parquet(rawFolderPath)

    // deltaDf is the extract conformed to the gold schema when the table exists,
    // otherwise the extract as read.
    val deltaDf =
      if (spark.catalog.tableExists(goldTableName)) {
        val targetTblSchema = spark.sql("select * from " + goldTableName + " limit 1").columns.toList
        val sourceSchema = rawDf.columns.toList

        // Find target columns that are really absent from the extract. Comparing only the
        // column counts missed mismatches of equal size, and a case-sensitive lookup could
        // null-fill (and so overwrite) a column that differs from the target only by case.
        val caseSensitive = spark.conf.get("spark.sql.caseSensitive", "false").toBoolean
        def nameKey(column: String): String = if (caseSensitive) column else column.toLowerCase
        val sourceKeys = sourceSchema.map(nameKey).toSet
        val missingInSource = targetTblSchema.filterNot(attr => sourceKeys.contains(nameKey(attr)))

        if (missingInSource.nonEmpty || targetTblSchema.length != sourceSchema.length) {
          println("Schema mismatch found for " + goldTableName + ". Handling within the process....")
        }

        // Add the absent columns as nulls, then project exactly the target columns in target order.
        val conformedDf = missingInSource
          .foldLeft(rawDf)((df, attr) => df.withColumn(attr, lit(null)))
          .selectExpr(targetTblSchema:_*)

        val deltaViewName = "deltaView_" + deltaViewSuffix
        conformedDf.createOrReplaceTempView(deltaViewName)

        println("Gold Table Refreshed!!!!")

        if (deltaTableRequired) {
          // Keep only the most recently modified row per incidentid so the merge source
          // holds at most one row per key.
          // NOTE(review): incidentid / modifiedon are hard-coded — presumably only the
          // incident entity sets isDeltaTableRequired = 1; confirm.
          val deltadataDf = spark.sql(s"""with cte as (
        select *, row_number() over(partition by incidentid order by modifiedon desc) rn 
        from $deltaViewName
        )
        select *
        from cte
        where rn =1""")

          deltadataDf.createOrReplaceTempView(deltaViewName)
        }

        val mergeCondition = gererateMergeCondition(goldTableName, deltaViewName, pKeysList)
        val mergeQuery = s"""MERGE INTO $goldTableName  
                          USING $deltaViewName 
                          $mergeCondition 
                          WHEN MATCHED 
                            THEN UPDATE SET *
                          WHEN NOT MATCHED 
                            THEN INSERT *
                        """
        println(mergeQuery)
        spark.sql(mergeQuery)
        println(goldTableName + " is merged with delta data")
        conformedDf
      } else {
        // First load: write the extract to the gold location ("raw/" -> "gold/", "/delta"
        // removed) and register the table on top of it.
        val goldTableWritePath = baseFilePath + "/" + targetFolder.replace("raw/","gold/").replace("/delta","")
        rawDf.write.format("delta").
                    mode("overwrite").
                    save(goldTableWritePath)
        spark.sql("""CREATE TABLE """ + goldTableName + """  
                  USING DELTA 
                  LOCATION '""" + goldTableWritePath + """'
                """)
        println(goldTableName + " is loaded with delta data")
        rawDf
      }

    if (deltaTableRequired) {
      // The view is still registered under its historical name for anything else in the
      // session that queries it, but the snapshot below no longer reads it: entities run
      // concurrently in Futures and would replace each other's view of the same name.
      deltaDf.createOrReplaceTempView("vw_incident_delta")
      val dataDf = deltaDf
        .selectExpr("*", "row_number() over(partition by incidentid order by modifiedon desc) as rn")
        .where("rn = 1")

      print("delta table is needed for "+ goldTableName)
      dataDf.write.mode("overwrite").saveAsTable(goldTableName+"_delta")
    }
    true
  } catch {
    case e: Exception =>
      // Pass the original exception as the cause so its stack trace is not lost.
      throw new Exception("Failed to merge raw data for entity: " + goldTableName + " : with exception: " + e.toString(), e)
  }
}


# Functions to merge the latest data into derived gold tables

In [None]:
%%spark
/**
  Purpose: Run the entity's source query and overwrite its gold Delta location with the
           result, registering the gold table if it is not registered yet.
  Params : goldTableName<String>      - table name used in CREATE TABLE IF NOT EXISTS
           targetFolder<String>       - gold folder, relative to targetBaseFilePath
           targetBaseFilePath<String> - root path prepended to the gold location
           sourceQuery<String>        - Spark SQL text whose result becomes the table content
  Return : Future[Boolean] - completes with true; fails with an Exception that carries the
           original error as its cause
*/
def mergeInDerivedGold(goldTableName: String, targetFolder: String, targetBaseFilePath: String, sourceQuery: String): Future[Boolean] = Future {
    val goldTableWritePath = s"$targetBaseFilePath/$targetFolder"

    try {
      spark.sql(sourceQuery)
        .write.format("delta").mode("overwrite").save(goldTableWritePath)
      spark.sql("""CREATE TABLE IF NOT EXISTS """ + goldTableName + """ 
                USING DELTA LOCATION '""" + goldTableWritePath + """'
                """)
      true
    } catch {
      case e: Exception =>
        // Pass the original exception as the cause so its stack trace is not lost.
        throw new Exception("Failed to merge data for entity: " + goldTableName + " : with exception: " + e.toString(), e)
    }
}

In [None]:
%%spark
// Choose the load routine from the (load type, processing layer) pair, start one Future
// per entity and block until all of them finish (or the first failure is rethrown).
// Tuple positions follow the metadata query: (table, folder, entity, primary key,
// base path, source query, delta-table flag).
(consumptionType.toLowerCase(), LayerProcessing.toLowerCase()) match {
  case ("full_clean", "raw") =>
    val createFullLoadTasks = Future.sequence(allEntities.map {
      case (table, folder, entity, _, basePath, _, _) => mergeFullRawToGold(table, folder, entity, basePath)
    })
    val viewFullLoadResults = Await.result(createFullLoadTasks, 999.minutes)

  case ("delta_clean", "raw") =>
    val createDeltaLoadTasks = Future.sequence(allEntities.map {
      case (table, folder, _, primaryKey, basePath, _, deltaFlag) =>
        mergeDeltaRawToGold(table, folder, primaryKey, basePath, deltaFlag)
    })
    val viewDeltaLoadResults = Await.result(createDeltaLoadTasks, 999.minutes)

  case ("", "gold") =>
    val createDerivedgoldLoadTasks = Future.sequence(allEntities.map {
      case (table, folder, _, _, basePath, sourceQuery, _) => mergeInDerivedGold(table, folder, basePath, sourceQuery)
    })
    val viewDerivedgoldLoadResults = Await.result(createDerivedgoldLoadTasks, 999.minutes)

  case _ =>
    // Previously this case did nothing at all, so a mistyped parameter looked like a
    // successful run. Still a no-op, but now visible in the cell output.
    println(s"No load executed: unsupported combination loadType='$consumptionType', processingLayer='$LayerProcessing'")
}

In [None]:
%%spark
// End the notebook run, passing "Notebook completed" as the exit value.
// NOTE(review): presumably the calling pipeline reads this value — confirm.
mssparkutils.notebook.exit("Notebook completed")