In [None]:
%scala

/**
* Databricks notebooks, DBR 14.0, 
* Spark version => 3.5.0
* Scala version => 2.12
*/

/**
* Load necessary artifacts
*/

import java.io.FileNotFoundException
import org.apache.logging.log4j.LogManager
import org.apache.spark.sql.functions.{col, concat, concat_ws, countDistinct, lit, month, to_timestamp, year}

/**
* Enable Spark 3.x AQE (Adaptive Query Execution) settings [if not enabled by default]
*/

// Master switch: per the Spark tuning guide, the three features below only take
// effect while adaptive execution itself is enabled.
spark.conf.set("spark.sql.adaptive.enabled","true")
// The documented key is "skewJoin"; the previous "skewedJoin" spelling is not a
// Spark setting, so skew-join handling was never actually being configured.
spark.conf.set("spark.sql.adaptive.skewJoin.enabled","true")
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled","true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled","true")

/**
* Input/output locations used by this notebook
* TODO - Inject the properties using ConfigReader
*/

// CSV inputs (both are read with a header row)
val inputMemerEligibility: String = "dbfs:/FileStore/member_eligibility.csv"
val inputMemberEligibilityDetails: String = "dbfs:/FileStore/member_months.csv"

// JSON output directories
val outputJSONPath1: String = "dbfs:/FileStore/output001"
val outputJSONPath2: String = "dbfs:/FileStore/output002"

/**
* Logger used by the statements below, registered under the name "MemberCalculation"
*/

private val LOGGER: org.apache.logging.log4j.Logger =
    LogManager.getLogger("MemberCalculation")

/**
* Member-month pipeline:
*   1. read the two CSV inputs,
*   2. write the distinct month count per member (with the member's full name)
*      as JSON partitioned by MemberId,
*   3. write the distinct month count per member and year as JSON.
* Failures are logged with their stack trace and not rethrown.
*/
try {

    // Read the input files from the file store; the first line of each CSV is the header
    val memberEligibility = spark.read.option("header","true")
        .csv(inputMemerEligibility)

    // Rows with a null in any column are discarded once, up front, since both
    // aggregations need that. Only this frame is cached because it is the only
    // one scanned by more than one write.
    val memberMonths = spark.read.option("header","true")
        .csv(inputMemberEligibilityDetails)
        .na.drop()
        .cache()

    try {

        /**
        * Calculate the total number of member months per member
        * Input ==> Two static input files
        * Output ==> JSON format, partitioned by MemberId (high cardinality)
        */

        // countDistinct already ignores duplicate rows, so no separate distinct pass is needed.
        // concat_ws skips null parts, so a member without a middle name still gets a
        // FullName (plain concat would turn the whole value into null).
        val monthCalculation = memberMonths
            .groupBy("member_id")
            .agg(countDistinct("eligibility_member_month").alias("MonthCount"))
            .join(memberEligibility,"member_id")
            .withColumn("FullName", concat_ws(" ", col("first_name"), col("middle_name"), col("last_name")))
            .withColumnRenamed("member_id","MemberId")
            .drop("first_name","middle_name","last_name")

        LOGGER.debug("### Member agg done.. writing as json partition file...")

        monthCalculation.write.partitionBy("MemberId").json(outputJSONPath1)

        /**
        * TODO Write as a separate module
        */

        /**
        * Calculate the total number of member months per member per year
        * Input ==> The member-months input file
        * Output ==> JSON format
        */

        // Parsed once and reused for both the year and the month extraction
        val eligibilityMonth = to_timestamp(col("eligibility_member_month"),"yyyyMM")

        val monthCalculationPerYear = memberMonths
            .withColumn("Year", year(eligibilityMonth))
            .withColumn("month", month(eligibilityMonth))
            .groupBy("member_id","Year")
            .agg(countDistinct("month").alias("MonthCount"))
            .withColumnRenamed("member_id","MemberId")

        LOGGER.debug("### Member agg per year done.. writing as json file...")

        monthCalculationPerYear.write.json(outputJSONPath2)

        LOGGER.debug("### All done !!!")

    }
    finally {
        // Release the cached blocks whether or not the writes succeeded
        memberMonths.unpersist()
    }

}
catch {
    // NOTE(review): Spark usually reports a missing input path as an AnalysisException,
    // which would land in the generic handler below - confirm whether a dedicated case is wanted.
    case ex: FileNotFoundException => { //TODO write a handler function
        // Passing the throwable as the last argument makes log4j record the stack trace
        LOGGER.error(s"### File not found, please check the path : $ex", ex)
    }
    case unknown: Exception => {
        LOGGER.error(s"### Unknown exception, please trace.. : $unknown", unknown)
    }
}
