# Use Case
This notebook covers the following use case:
1. Read the NYC taxi trip data in Parquet format. The list of Parquet file URLs comes from https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page
2. For each URL (file), perform some transformations and store the result in Delta format.
3. Compute the average distance, average cost per mile and average cost from the Delta table using incremental load.
4. Store the values computed in step 3 in Delta format in the KPI output folder.
5. Repeat steps 2 to 4 for each month.
6. Create a Delta table on the Delta format output folder (auto refresh).
7. The KPI output folder will have multiple versions of the trip KPIs: average distance, average cost per mile and average cost.
8. Use Delta time travel to present the KPI output in a graphical format.
8. Use Delta Time Travel to present KPI output in a graphical format

## Provide the required configuration for Delta Lake
- If the cluster already has Delta Lake enabled, the following configuration cell is not required.
- Delta Lake/Spark compatibility matrix: https://docs.delta.io/latest/releases.html. Change the Delta Lake version to match your Spark version.

In [None]:
%%configure -f
{ "conf": {"spark.jars.packages": "io.delta:delta-core_2.12:1.0.1,net.andreinc:mockneat:0.4.8",
           "spark.sql.extensions":"io.delta.sql.DeltaSparkSessionExtension",
           "spark.sql.catalog.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog"
          }
}
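Outside an environment that supports the `%%configure` magic (for example, a standalone Scala application), the same two Delta settings can be passed to the session builder. This is a minimal sketch that assumes the `delta-core_2.12` package matching your Spark version is already on the classpath (for example via `spark-submit --packages`) and uses a local master for experimentation. The application name is hypothetical.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: io.delta:delta-core_2.12 (matching your Spark version) is already on the
// classpath, e.g. via spark-submit --packages; these settings only register Delta with the session.
val spark: SparkSession = SparkSession.builder()
  .appName("nyc-taxi-delta")   // hypothetical application name
  .master("local[*]")          // assumption: local run for experimentation
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()
```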

## List of data files 
- These file URLs are from https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page

In [None]:
import java.io.File
import java.net.URL
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs._

// DataFile records each downloaded file so that its Parquet data can later be read from HDFS
case class DataFile(name:String, downloadURL:String, hdfsPath:String)

// get Hadoop file system
val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)

val fileUrls= List(
"https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-01.parquet",
"https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-02.parquet",
"https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-03.parquet",
"https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-04.parquet"/*,,
"https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-05.parquet",
"https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-06.parquet",
"https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-07.parquet",
"https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-08.parquet",
"https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-09.parquet",
"https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-10.parquet"*/
)

// Download each file to the driver's local /tmp, then move it into HDFS /tmp
// so that every executor can read it
val listOfDataFile = fileUrls.map(url => {
    val fileName = url.split("/").last
    val localSaveFilePath = s"/tmp/${fileName}"
    val hdfsSaveFilePath = s"/tmp/${fileName}"
    FileUtils.copyURLToFile(new URL(url), new File(localSaveFilePath))
    // FileSystem.copyFromLocalFile(delSrc, overwrite, src, dst):
    // delete the local copy and overwrite any existing HDFS copy
    fs.copyFromLocalFile(true, true, new Path(localSaveFilePath), new Path(hdfsSaveFilePath))
    DataFile(fileName, url, hdfsSaveFilePath)
})
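Instead of maintaining the URL list by hand, the monthly URLs could be generated from the prefix and naming pattern used above. This sketch assumes every month follows the `fhvhv_tripdata_YYYY-MM.parquet` pattern on the same CloudFront host; `fhvhvUrls` and `fileNameOf` are hypothetical helpers.

```scala
// Assumption: every month uses the same host, path and file-name pattern as the list above.
val tripDataBaseUrl = "https://d37ci6vzurychx.cloudfront.net/trip-data"

// Hypothetical helper: one FHVHV URL per month, with the month zero-padded to two digits.
def fhvhvUrls(year: Int, months: Range): List[String] =
  months.toList.map(m => f"$tripDataBaseUrl/fhvhv_tripdata_$year%d-$m%02d.parquet")

// Hypothetical helper: the file name is the last path segment of the URL.
def fileNameOf(url: String): String = url.split("/").last

val firstFourMonths = fhvhvUrls(2022, 1 to 4)
firstFourMonths.foreach(println)
```

Calling `fhvhvUrls(2022, 1 to 10)` would reproduce the full list, including the commented-out months.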

## Create Output Directories
These are the locations where the Delta format output is written. Change the `transformDeltaOutputPath` and `avgDeltaOutputKPIPath` variables if required:
- avgDeltaOutputKPIPath - stores the average KPIs in Delta format
- transformDeltaOutputPath - stores the transformed output in Delta format

In [None]:
import org.apache.hadoop.fs._
// Create the directory (and any missing parents) if it does not exist yet
def createDirectory(dataSourcePath: String): Unit = {
    val fs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    val path = new Path(dataSourcePath)
    if (!fs.exists(path)) {
        fs.mkdirs(path)
    }
}
val transformDeltaOutputPath = "/nyctaxideltadata/transform"
val avgDeltaOutputKPIPath = "/nyctaxideltadata/avgkpi"
createDirectory(transformDeltaOutputPath)
createDirectory(avgDeltaOutputKPIPath)

## Create Delta Format Data From Parquet Format

- Input data comes from listOfDataFile (data downloaded from https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page)
- To demonstrate time travel and versioning, we load the files one at a time
- Perform the transformation and compute the following business KPIs on each incremental load:
    - The average distance
    - The average cost per mile
    - The average cost
- Save the transformed and KPI data in Delta format
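Because an average can be rebuilt from a sum and a count, the KPIs for an incremental load could also be maintained by carrying per-metric running sums and counts from one month to the next, instead of re-reading all earlier months through a growing union as the next cell does. This is a plain-Scala sketch of that alternative, not what the notebook does. `RunningAvg` and `KpiState` are hypothetical names, and values are left unrounded rather than passed through `round(..., 2)`.

```scala
// Hypothetical running state for one KPI: an average is rebuilt from its sum and count.
final case class RunningAvg(sum: Double, count: Long) {
  def merge(other: RunningAvg): RunningAvg = RunningAvg(sum + other.sum, count + other.count)
  def value: Option[Double] = if (count == 0) None else Some(sum / count)
}

// Hypothetical state carried from one monthly load to the next
// (average distance, average cost per mile, average cost).
final case class KpiState(dist: RunningAvg, costPerMile: RunningAvg, cost: RunningAvg) {
  def merge(o: KpiState): KpiState =
    KpiState(dist.merge(o.dist), costPerMile.merge(o.costPerMile), cost.merge(o.cost))
}

object KpiState {
  val empty: KpiState = KpiState(RunningAvg(0.0, 0L), RunningAvg(0.0, 0L), RunningAvg(0.0, 0L))

  // Build one batch's state from (tripMiles, totalAmount) pairs. Zero-mile trips are left out
  // of cost per mile, mirroring Spark SQL (non-ANSI mode), where x / 0 is NULL and avg() skips NULLs.
  def fromTrips(trips: Seq[(Double, Double)]): KpiState =
    trips.foldLeft(empty) { case (state, (miles, amount)) =>
      val perMile = if (miles != 0.0) RunningAvg(amount / miles, 1L) else RunningAvg(0.0, 0L)
      state.merge(KpiState(RunningAvg(miles, 1L), perMile, RunningAvg(amount, 1L)))
    }
}

// Usage: merge January's state with February's instead of re-reading January's data.
val jan = KpiState.fromTrips(Seq((2.0, 10.0), (4.0, 30.0)))
val feb = KpiState.fromTrips(Seq((0.0, 5.0), (6.0, 18.0)))
val runningState = jan.merge(feb)
println(runningState.dist.value) // Some(3.0)
```

Merging per-month states gives the same result as aggregating all trips at once, which is what makes this a true incremental computation.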

In [None]:
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.DataFrame
import spark.implicits._ // enables the $"column" syntax

// UDF to compute the total amount paid by the customer.
// With primitive Double parameters, Spark returns NULL if any input is NULL.
val totalCustPaid = udf((basePassengerFare: Double, tolls: Double, bcf: Double, salesTax: Double,
                         congSurcharge: Double, airportFee: Double, tips: Double) => {
    basePassengerFare + tolls + bcf + salesTax + congSurcharge + airportFee + tips
})

// Read one input file, add the total_amount column, compute the KPIs over this file
// plus all previously loaded data, and write both outputs in Delta format.
// Returns the accumulated DataFrame so the next load can include it in its KPI computation.
def readTransformWriteDelta(fileName: String, oldData: Option[DataFrame], format: String = "parquet"): DataFrame = {
    val df = spark.read.format(format).load(fileName)
    val dfNewLoad = df.withColumn("total_amount", totalCustPaid($"base_passenger_fare", $"tolls", $"bcf", $"sales_tax", $"congestion_surcharge", $"airport_fee", $"tips"))
    // union with the previously loaded data to compute the KPIs over all months so far
    val dfFullLoad = oldData match {
        case Some(odf) => dfNewLoad.union(odf)
        case None => dfNewLoad
    }
    dfFullLoad.createOrReplaceTempView("tempFullLoadCompute")
    // trips with trip_miles = 0 yield NULL cost per mile, which avg() ignores
    val dfKpiCompute = spark.sql("SELECT round(avg(trip_miles),2) AS avgDist, round(avg(total_amount/trip_miles),2) AS avgCostPerMile, round(avg(total_amount),2) AS avgCost FROM tempFullLoadCompute")
    // overwrite the transformed table with this month's data only; each load becomes a new Delta version
    dfNewLoad.write.mode("overwrite").format("delta").save(transformDeltaOutputPath)
    // overwrite the KPI table with the KPIs computed so far; each load becomes a new Delta version
    dfKpiCompute.write.mode("overwrite").format("delta").save(avgDeltaOutputKPIPath)
    // return the accumulated DataFrame for the next load
    dfFullLoad
}

// Load the data files one by one, passing the accumulated DataFrame to the next load
def loadData(dataFile: List[DataFile], oldDF: Option[DataFrame]): Boolean = {
    if (dataFile.isEmpty) {
        true
    } else {
        val nextDataFile = dataFile.head
        val newFullDF = readTransformWriteDelta(nextDataFile.hdfsPath, oldDF)
        loadData(dataFile.tail, Some(newFullDF))
    }
}
loadData(listOfDataFile, None)
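The `total_amount` UDF could also be expressed with built-in column arithmetic, which Spark's Catalyst optimizer can analyze, whereas a Scala UDF is opaque to it. This sketch makes one assumption that differs from the notebook: a NULL fee component is treated as 0.0 via `coalesce`, while the UDF above returns NULL for the whole row. `paidColumns`, `totalAmountExpr` and `withTotalAmount` are hypothetical names.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{coalesce, col, lit}

// The same input columns that totalCustPaid sums.
val paidColumns = Seq("base_passenger_fare", "tolls", "bcf", "sales_tax",
  "congestion_surcharge", "airport_fee", "tips")

// Assumption: a missing (NULL) component counts as 0.0 instead of nulling the whole total.
val totalAmountExpr: Column = paidColumns.map(c => coalesce(col(c), lit(0.0))).reduce(_ + _)

// Hypothetical drop-in for the withColumn call in readTransformWriteDelta.
def withTotalAmount(df: DataFrame): DataFrame = df.withColumn("total_amount", totalAmountExpr)
```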

## Read the Delta Format Output Using DeltaTable
- Read the transformed data
- Read the KPI data

In [None]:
import io.delta.tables._
val dtTransformed: io.delta.tables.DeltaTable = DeltaTable.forPath(transformDeltaOutputPath)
val dtAvgKpi: io.delta.tables.DeltaTable = DeltaTable.forPath(avgDeltaOutputKPIPath)
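If the load cell has not been run, `DeltaTable.forPath` fails because the path is not yet a Delta table. A small guard built on `DeltaTable.isDeltaTable` can report a clearer error; `requireDeltaTable` is a hypothetical helper name.

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

// Hypothetical helper: fail fast with a readable message before opening the table.
def requireDeltaTable(spark: SparkSession, path: String): DeltaTable = {
  require(DeltaTable.isDeltaTable(spark, path), s"$path is not a Delta table; run the load cell first")
  DeltaTable.forPath(spark, path)
}

// Usage in this notebook: val dtAvgKpi = requireDeltaTable(spark, avgDeltaOutputKPIPath)
```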

## Print Schema
Print the Delta table schemas for the transformed and average KPI data

In [None]:
// transformed data schema
dtTransformed.toDF.printSchema
// Average KPI Data Schema
dtAvgKpi.toDF.printSchema

## Display the Last Computed KPI from the Delta Table

In [None]:
dtAvgKpi.toDF.show(false)

## Display Computed KPI History

In [None]:
dtAvgKpi.history().show(false)
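`history()` returns many columns per commit. To focus on what each load did, the output can be narrowed to a few of the standard history columns. This is a sketch; `compactHistory` is a hypothetical helper.

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.DataFrame

// Hypothetical helper: one row per commit, keeping only the columns that explain each load.
// version, timestamp, operation and operationParameters are standard DeltaTable.history() columns.
def compactHistory(table: DeltaTable): DataFrame =
  table.history()
    .select("version", "timestamp", "operation", "operationParameters")
    .orderBy("version")

// Usage in this notebook: compactHistory(dtAvgKpi).show(false)
```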

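## Display KPI Data After Each Data Load
Each load created a new version of the KPI table. Joining the KPI data files with the `_delta_log` entries that added them shows the KPI values recorded at each version.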

In [None]:
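import org.apache.spark.sql.functions._
// Map each KPI data file to the commit version that added it (parsed from the full _delta_log file name)
val kpiAvgLogDF = spark.read.json(s"${avgDeltaOutputKPIPath}/_delta_log/*.json")
val kpiAvgLogDetailDF = kpiAvgLogDF.select(col("add")("path").alias("file_path")).withColumn("version", regexp_extract(input_file_name(), "(\\d+)\\.json$", 1).cast("long")).filter("file_path is not NULL")
// Read every parquet data file of the KPI table, including files from older versions, and keep only the file name
val kpiParquetTableDF = spark.read.parquet(s"$avgDeltaOutputKPIPath/*.parquet").withColumn("input_file", substring_index(input_file_name(), "/", -1))
kpiParquetTableDF.join(kpiAvgLogDetailDF, kpiParquetTableDF("input_file") === kpiAvgLogDetailDF("file_path"), "inner").select("avgDist", "avgCostPerMile", "avgCost", "version").orderBy("version").show(false)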
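Delta time travel offers a more direct route to the same per-version KPIs: read each version with the `versionAsOf` reader option and tag the rows with the version number. This sketch assumes the data files of older versions have not been removed by `VACUUM`; `snapshotsByVersion` is a hypothetical helper. The resulting DataFrame is also a convenient input for charting the KPI trend (step 8 of the use case).

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit

// Hypothetical helper: read every version listed in the table history with time travel
// and stack the snapshots, adding a "version" column to each row.
// Assumption: no VACUUM has deleted the files that older versions reference.
def snapshotsByVersion(spark: SparkSession, path: String): DataFrame = {
  val versions = DeltaTable.forPath(spark, path).history()
    .select("version").collect().map(_.getLong(0)).sorted
  versions
    .map(v => spark.read.format("delta").option("versionAsOf", v).load(path).withColumn("version", lit(v)))
    .reduce(_ unionByName _)
}

// Usage in this notebook:
// snapshotsByVersion(spark, avgDeltaOutputKPIPath).orderBy("version").show(false)
```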

## Delta Log for transformed data
Query the JSON transaction files in `_delta_log` to see which version added which data file.

In [None]:
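import org.apache.spark.sql.functions._
val dfTxLog = spark.read.json(s"${transformDeltaOutputPath}/_delta_log/*.json")
// parse the full version number from the commit file name (e.g. 00000000000000000012.json -> 12)
dfTxLog.select(col("add")("path").alias("file_path")).withColumn("version", regexp_extract(input_file_name(), "(\\d+)\\.json$", 1).cast("long")).filter("file_path is not NULL").orderBy("version").show(false)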
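Delta writes each commit as a zero-padded 20-digit `<version>.json` file in `_delta_log`. Extracting a single character with `substring(input_file_name(), -6, 1)` therefore yields only the last digit of the version, so version 12 would appear as 2. This plain-Scala sketch parses the full number from a log file path; `commitVersion` is a hypothetical helper.

```scala
// Matches a Delta commit file such as .../_delta_log/00000000000000000012.json
// (checkpoint files like 00000000000000000010.checkpoint.parquet do not match).
val CommitFile = """(?:.*/)?(\d{20})\.json""".r

// Hypothetical helper: the full commit version, or None if the path is not a commit file.
def commitVersion(path: String): Option[Long] = path match {
  case CommitFile(digits) => Some(digits.toLong)
  case _                  => None
}
```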