In [0]:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}
import spark.implicits._

In [1]:
// Obtain (or reuse) the Spark session used by every cell in this notebook
val spark = SparkSession
  .builder()
  .appName("tlcTripRecordETL")
  .getOrCreate()

In [2]:
/**
  * Reads one raw TLC trip-record Parquet file.
  *
  * @param spark session used to perform the read
  * @param path  file location WITHOUT the ".parquet" suffix; the suffix is appended here
  * @return the contents of the file as a DataFrame
  */
def loadRawTLCDataParquet(spark: SparkSession, path: String): DataFrame = {
    val parquetFile = path + ".parquet"
    spark.read.parquet(parquetFile)
}
 
  
/**
  * Writes the cleaned data as Parquet, replacing anything already stored at the target.
  *
  * @param resultDF        data to persist
  * @param cleanOutputPath destination path of the Parquet output
  */
def saveCleanData(resultDF: DataFrame, cleanOutputPath: String): Unit = {
    val overwritingWriter = resultDF.write.mode(SaveMode.Overwrite)
    overwritingWriter.parquet(cleanOutputPath)
}
  
  
/**
  * Reads previously saved Parquet data using the notebook-level `spark` session.
  *
  * @param path location of the Parquet data
  * @return the stored data as a DataFrame
  */
def loadCleanData(path: String): DataFrame = {
    val reader = spark.read
    reader.parquet(path)
}
  

/**
  * Computes the "mean minus/plus three standard deviations" interval of a column.
  *
  * @param df         data to aggregate
  * @param columnName name of the numeric column
  * @return a pair (lowerBound, upperBound) = (mean - 3 * stddev, mean + 3 * stddev)
  */
def getNumericColumnLowerAndUpperBound(df: DataFrame, columnName: String) = {
    // One aggregation pass; position 0 holds the standard deviation, position 1 the mean
    val stats = df
        .agg(stddev(columnName).alias("stddev"), mean(columnName).alias("mean"))
        .head()
    val (sigma, mu) = (stats.getDouble(0), stats.getDouble(1))
    val threeSigma = 3 * sigma
    (mu - threeSigma, mu + threeSigma)
}


/**
  * Cleans one file of raw Yellow Taxi trip records and reduces it to the
  * pick-up / drop-off location pair of every trip that passes validation.
  *
  * @param rawDF raw trip records; must also carry a "file_path" column whose value
  *              contains the expected "yyyy-MM" of the file
  * @return a single-partition DataFrame with Long columns "pulocationID" and "dolocationID"
  */
def cleanRawDataYellow(rawDF: DataFrame): DataFrame = {

    // A record is usable only if all of these columns are populated
    val requiredColumns = Seq("PUlocationID", "DOlocationID", "total_amount", "fare_amount", "payment_type", "RatecodeID")
    val completeDF = rawDF.filter(requiredColumns.map(name => col(name).isNotNull).reduce(_ && _))

    // Normalise column names
    val renamedDF = completeDF
        .withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime")
        .withColumnRenamed("tpep_pickup_datetime", "pickup_datetime")
        .withColumnRenamed("PUlocationID", "pulocationID")
        .withColumnRenamed("DOlocationID", "dolocationID")

    // Accepted values / ranges for the validation rules below
    val acceptedPaymentTypes = Seq(1, 2, 3)
    val acceptedRateCodes = Seq(1, 5, 6)
    val (amountFloor, amountCeiling) = getNumericColumnLowerAndUpperBound(rawDF, "total_amount")
    val yearMonthRegex = """(\d{4}-\d{2})"""

    // "year_month" comes from the file name, "data_year_month" from the pick-up timestamp
    val yearMonthFromFile = regexp_extract(col("file_path"), yearMonthRegex, 1)
    val yearMonthFromPickup = concat_ws("-", year(col("pickup_datetime")), format_string("%02d", month(col("pickup_datetime"))))
    val taggedDF = renamedDF
        .withColumn("year_month", yearMonthFromFile)
        .withColumn("data_year_month", yearMonthFromPickup)

    // Keep only the rows that satisfy every rule, applied in this order
    val rules = Seq(
        col("passenger_count") =!= 0,
        col("pulocationID") =!= col("dolocationID"),
        col("data_year_month") === col("year_month"),
        col("payment_type").isin(acceptedPaymentTypes: _*),
        col("RatecodeID").isin(acceptedRateCodes: _*),
        col("fare_amount") =!= 0,
        col("total_amount").between(amountFloor, amountCeiling)
    )
    val validDF = rules.foldLeft(taggedDF)((df, rule) => df.filter(rule))

    // Cast both location IDs to Long and keep only those two columns
    val castDF = Seq("pulocationID", "dolocationID").foldLeft(validDF) {
        (df, name) => df.withColumn(name, col(name).cast(LongType))
    }
    castDF.select("pulocationID", "dolocationID").coalesce(1)
}
  
 
/**
  * Runs the Yellow Taxi ETL: loads every monthly raw file from October 2020 through
  * September 2023, cleans each one, unions the results, saves them and shows a sample.
  *
  * @param spark           session used to read the raw files
  * @param path            root directory of the raw files, laid out as <path>/<year>/<month>/
  * @param cleanOutputPath destination of the merged, cleaned Parquet output
  */
def ETLYellowTaxiTripDataset(spark: SparkSession, path: String, cleanOutputPath:String): Unit = {
    val allMonths = Seq("01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", "12")

    // Months covered per year: Oct-Dec 2020, all of 2021 and 2022, Jan-Sep 2023
    val monthsByYear = Seq(
        "2020" -> allMonths.drop(9),
        "2021" -> allMonths,
        "2022" -> allMonths,
        "2023" -> allMonths.take(9)
    )

    val filePaths = for {
        (year, months) <- monthsByYear
        m <- months
    } yield s"$path/$year/$m/yellow_tripdata_$year-$m"

    // Empty seed with the output schema, so the fold below has a starting DataFrame
    val outputSchema = StructType(Seq(
        StructField("pulocationID", LongType, nullable = true),
        StructField("dolocationID", LongType, nullable = true)
    ))
    val emptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], outputSchema)

    val mergedDF = filePaths.foldLeft(emptyDF) { (accDF, filePath) =>
        // Tag each record with its source path; the cleaner extracts the expected month from it
        val taggedDF = loadRawTLCDataParquet(spark, filePath).withColumn("file_path", lit(filePath))
        accDF.union(cleanRawDataYellow(taggedDF))
    }

    saveCleanData(mergedDF, cleanOutputPath)
    loadCleanData(cleanOutputPath).show()
}

In [3]:
// Run the Yellow Taxi ETL: raw monthly files in, one merged clean dataset out
val sourcePath = "/user/cg4177_nyu_edu/project/data/source/tlc/yellow"
val cleanOutputPath = "/user/cg4177_nyu_edu/project/data/clean/tlc/yellow/merged_yellow_cleaned_data.parquet"
ETLYellowTaxiTripDataset(spark, path = sourcePath, cleanOutputPath = cleanOutputPath)

In [4]:
/**
  * Cleans one file of raw Green Taxi trip records and reduces it to the
  * pick-up / drop-off location pair of every trip that passes validation.
  *
  * Validation keeps a row only if the required columns are non-null, the passenger
  * count and fare are non-zero, pick-up and drop-off zones differ, the pick-up month
  * matches the month in the source file name, payment type and rate code are in the
  * accepted sets, and the total amount lies within mean +/- 3 standard deviations.
  *
  * @param rawDF raw trip records; must also carry a "file_path" column whose value
  *              contains the expected "yyyy-MM" of the file
  * @return a single-partition DataFrame with Long columns "pulocationID" and "dolocationID"
  */
def cleanRawDataGreen(rawDF: DataFrame): DataFrame = {

    // Keep non-null entries in these columns
    val nonNullDF = rawDF.filter(
        col("PUlocationID").isNotNull &&
        col("DOlocationID").isNotNull &&
        col("total_amount").isNotNull &&
        col("fare_amount").isNotNull &&
        col("payment_type").isNotNull &&
        col("RatecodeID").isNotNull)

    // Rename columns. Pick-up must map to pick-up and drop-off to drop-off: the
    // month check below relies on "pickup_datetime" being the real pick-up time.
    val newColumnNames = Map(
      "lpep_dropoff_datetime" -> "dropoff_datetime",
      "lpep_pickup_datetime" -> "pickup_datetime",
      "PUlocationID" -> "pulocationID",
      "DOlocationID" -> "dolocationID"
    )

    val renamedDF = newColumnNames.foldLeft(nonNullDF) {
        case (accDF, (oldName, newName)) => accDF.withColumnRenamed(oldName, newName)
    }

    // Accepted values / ranges for the validation rules below
    val validPaymentTypes = Seq(1, 2, 3)
    val validRateCodeIds = Seq(1, 5, 6)
    val (totalAmountLower, totalAmountUpper) = getNumericColumnLowerAndUpperBound(rawDF, "total_amount")
    val pattern = """(\d{4}-\d{2})""".r

    // "year_month" comes from the file name, "data_year_month" from the pick-up timestamp
    val transformedDF = renamedDF
        .withColumn("year_month", regexp_extract(col("file_path"), pattern.toString, 1))
        .withColumn("data_year_month", concat_ws("-", year(col("pickup_datetime")), format_string("%02d", month(col("pickup_datetime")))))

    // Keep only the rows that satisfy every rule
    val filteredDF = transformedDF
        .filter(col("passenger_count") =!= 0)
        .filter(col("pulocationID") =!= col("dolocationID"))
        .filter(col("data_year_month") === col("year_month"))
        .filter(col("payment_type").isin(validPaymentTypes: _*))
        .filter(col("RatecodeID").isin(validRateCodeIds: _*))
        .filter(col("fare_amount") =!= 0)
        .filter(col("total_amount").between(totalAmountLower, totalAmountUpper))

    // Cast both location IDs to Long and keep only those two columns
    val castDf = filteredDF
        .withColumn("pulocationID", col("pulocationID").cast(LongType))
        .withColumn("dolocationID", col("dolocationID").cast(LongType))
    castDf.select("pulocationID", "dolocationID").coalesce(1)
}
  
 
/**
  * Runs the Green Taxi ETL: loads every monthly raw file from October 2020 through
  * September 2023, cleans each one, unions the results, saves them and shows a sample.
  *
  * @param spark           session used to read the raw files
  * @param path            root directory of the raw files, laid out as <path>/<year>/<month>/
  * @param cleanOutputPath destination of the merged, cleaned Parquet output
  */
def ETLGreenTaxiTripDataset(spark: SparkSession, path: String, cleanOutputPath:String): Unit = {
    val allMonths = Seq("01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", "12")

    // Months covered per year: Oct-Dec 2020, all of 2021 and 2022, Jan-Sep 2023
    val monthsByYear = Seq(
        "2020" -> allMonths.drop(9),
        "2021" -> allMonths,
        "2022" -> allMonths,
        "2023" -> allMonths.take(9)
    )

    val filePaths = for {
        (year, months) <- monthsByYear
        m <- months
    } yield s"$path/$year/$m/green_tripdata_$year-$m"

    // Empty seed with the output schema, so the fold below has a starting DataFrame
    val outputSchema = StructType(Seq(
        StructField("pulocationID", LongType, nullable = true),
        StructField("dolocationID", LongType, nullable = true)
    ))
    val emptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], outputSchema)

    val mergedDF = filePaths.foldLeft(emptyDF) { (accDF, filePath) =>
        // Tag each record with its source path; the cleaner extracts the expected month from it
        val taggedDF = loadRawTLCDataParquet(spark, filePath).withColumn("file_path", lit(filePath))
        accDF.union(cleanRawDataGreen(taggedDF))
    }

    saveCleanData(mergedDF, cleanOutputPath)
    loadCleanData(cleanOutputPath).show()
}

In [5]:
// Run the Green Taxi ETL: raw monthly files in, one merged clean dataset out
val sourcePath = "/user/cg4177_nyu_edu/project/data/source/tlc/green"
val cleanOutputPath = "/user/cg4177_nyu_edu/project/data/clean/tlc/green/merged_green_cleaned_data.parquet"
ETLGreenTaxiTripDataset(spark, path = sourcePath, cleanOutputPath = cleanOutputPath)

### Data Profiling for TLC Yellow and Green Taxi Trip Record Data

Three options:
* Summary of cleaned data
* Distinct Values
* Frequency Distribution and Value Counts

Questions explored:
* Do Green Taxis pick up from Lower Manhattan and Midtown?
* What are the top-5 pick-up locations for Yellow and Green Taxis?
* What's the most frequent route for Yellow and Green Taxis?

In [7]:
// Load the merged, cleaned outputs of the two ETL runs for profiling
val tlcYellowCleanDF: DataFrame =
    loadCleanData("/user/cg4177_nyu_edu/project/data/clean/tlc/yellow/merged_yellow_cleaned_data.parquet")
val tlcGreenCleanDF: DataFrame =
    loadCleanData("/user/cg4177_nyu_edu/project/data/clean/tlc/green/merged_green_cleaned_data.parquet")

In [8]:
// Display the summary() statistics of the cleaned Yellow Taxi data
z.show(tlcYellowCleanDF.summary())

In [9]:
// Display the summary() statistics of the cleaned Green Taxi data
z.show(tlcGreenCleanDF.summary())

### Distinct Counts

In [11]:
// Distinct pick-up and drop-off location IDs present in each cleaned dataset

/** Collects the distinct values of `column` to the driver, sorted ascending. */
def distinctSortedValues(df: DataFrame, column: String): Array[Any] =
    df.select(column).distinct().orderBy(column).collect().map(_.get(0))

val tlcYellowpulocationIDs: Array[Any] = distinctSortedValues(tlcYellowCleanDF, "pulocationID")
val tlcYellowdolocationIDs: Array[Any] = distinctSortedValues(tlcYellowCleanDF, "dolocationID")

val tlcGreenpulocationIDs: Array[Any] = distinctSortedValues(tlcGreenCleanDF, "pulocationID")
val tlcGreendolocationIDs: Array[Any] = distinctSortedValues(tlcGreenCleanDF, "dolocationID")

In [12]:
// Green taxis are allowed to operate above W 110th St / E 96th St in Manhattan
val manhattanAreas = Array(4,24,12,13,41,45,42,43,48,50,68,79,74,75,87,88,90,125,100,103,103,103,107,113,114,116,120,127,128,151,140,137,141,142,152,143,144,148,153,158,161,162,163,164,170,166,186,194,202,209,211,224,229,230,231,239,232,233,234,236,237,238,263,243,244,246,249,261,262)
val greenTaxiManhattanValidAreas = Array(166, 41, 74, 75, 152, 42, 116, 244, 120, 243, 128, 127, 153)
// Manhattan areas that are not in the valid list for green taxis
val greenTaxiExcludedAreas = manhattanAreas.filterNot(area => greenTaxiManhattanValidAreas.contains(area))

// Check whether any observed pick-up or drop-off location falls in the excluded areas
val excludedPickups = tlcGreenpulocationIDs.intersect(greenTaxiExcludedAreas)
val excludedDropoffs = tlcGreendolocationIDs.intersect(greenTaxiExcludedAreas)
val greenTaxiViolatedAreas = (excludedPickups ++ excludedDropoffs).toSet

val greenTaxiVerdict =
    if (greenTaxiViolatedAreas.isEmpty) "Green Taxis operate in designated areas"
    else "Green taxis violate pick-up and drop-off in these Manhattan areas: " + greenTaxiViolatedAreas.mkString(", ")
println(greenTaxiVerdict)

We observe that Green Taxis have recorded pick-ups or drop-offs in Manhattan areas below W 110th St/E 96th St, which violates their rules of operation.

The dataset contains pick-ups and drop-offs from locations across all five boroughs. The location ID numbers represent areas (taxi zones) within the boroughs. For details, refer to:
* https://www.nyc.gov/assets/tlc/images/content/pages/about/taxi_zone_map_manhattan.jpg


### Frequency Distribution and Value Counts

In [16]:
// Top-5 pick-up locations for Yellow Taxis and Green Taxis

val columnName = "pulocationID"

/**
  * Ranks the values of `column` by number of trips and keeps the `n` most frequent,
  * together with each value's share of all rows as a percentage rounded to 2 decimals.
  */
def topValuesByTrips(df: DataFrame, column: String, n: Int): DataFrame = {
    val totalRows = df.count()
    val tripCount = count(column)
    df.groupBy(column)
        .agg(
            tripCount.alias("trips"),
            round((tripCount / totalRows) * 100.0, 2).alias("percentage")
        )
        .orderBy(col("trips").desc)
        .limit(n)
}

val top5pulocationIDYellow = topValuesByTrips(tlcYellowCleanDF, columnName, 5)

val top5pulocationIDGreen = topValuesByTrips(tlcGreenCleanDF, columnName, 5)

In [17]:
// Print a heading, then the five most frequent Yellow Taxi pick-up locations
println("Top-5 Pick-Up locations for Yellow Taxis: ")
top5pulocationIDYellow.show()

In [18]:
// Print a heading, then the five most frequent Green Taxi pick-up locations
println("Top-5 Pick-Up locations for Green Taxis: ")
top5pulocationIDGreen.show()

* Roughly 20% of Yellow Taxi pick-ups are from Midtown and the Upper East Side, next to Central Park in Manhattan.
* About 50% of Green Taxi pick-ups are from the Upper East Side above Central Park and from above W 110th St in Manhattan. The next most popular pick-up location is in Queens.

In [20]:
// Most popular Yellow and Green Taxi trip route
val column1 = "pulocationID"
val column2 = "dolocationID"

/**
  * Finds the single most frequent route in `df`, with its trip count and its
  * share of all rows as a percentage.
  */
def mostFrequentRoute(df: DataFrame, from: String, to: String): DataFrame = {
    // Put the smaller ID first so that A -> B and B -> A count as the same route
    val routeKey = when(col(from) < col(to), array(from, to)).otherwise(array(to, from))
    val totalTrips = df.count()
    df.withColumn("trip_route", routeKey)
        .groupBy("trip_route")
        .agg(
            count("*").alias("counts"),
            (count("*") / totalTrips * 100).alias("percentage")
        )
        .orderBy(col("counts").desc)
        .limit(1)
}

val topRouteYellowTaxi = mostFrequentRoute(tlcYellowCleanDF, column1, column2)

val topRouteGreenTaxi = mostFrequentRoute(tlcGreenCleanDF, column1, column2)

In [21]:
// Show the most frequent route for each taxi type, Yellow first
topRouteYellowTaxi.show()
topRouteGreenTaxi.show()

The most popular trip route for both taxi types is within the Upper East Side of Manhattan.