# Big Data project A.Y. 2024-2025 - First Job

## Members

- Giovanni Antonioni
- Luca Rubboli - 0001083742

In [22]:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("First job")
  .getOrCreate()

val sc = spark.sparkContext

import org.apache.spark.sql.SparkSession
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@5b7bc469
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@352dc96a


### Define useful parameters

- Dataset location
- Iterator (defined like this to overcome different names for same columns in dataset)

In [23]:
val decimals: Int = 4
val datasetDir = "dataset"
val outputDir = "output/firstJobOutputOpt"
val yellowDatasetDir = s"$datasetDir/yellow_cab"
val greenDatasetDir = s"$datasetDir/green_cab"
val fhvDatasetDir = s"$datasetDir/fhv_cab"
val fhvhvDatasetDir = s"$datasetDir/fhvhv_cab"
val datasetDirMap: Map[String, String] = Map("yellow" -> yellowDatasetDir, "green" -> greenDatasetDir, "fhv" -> fhvDatasetDir, "fhvhv" -> fhvhvDatasetDir)
val datasetIterator: Map[String, (String, String)] = Map(
  "yellow" -> ("tpep_dropoff_datetime", "tpep_pickup_datetime"),
  "green" -> ("lpep_dropoff_datetime", "lpep_pickup_datetime"),
  // ("fhv", "tpep_dropoff_datetime", "tpep_pickup_datetime"),
  // ("fhvhv", "tpep_dropoff_datetime", "tpep_pickup_datetime"),
)

decimals: Int = 4
datasetDir: String = dataset
outputDir: String = output/firstJobOutputOpt
yellowDatasetDir: String = dataset/yellow_cab
greenDatasetDir: String = dataset/green_cab
fhvDatasetDir: String = dataset/fhv_cab
fhvhvDatasetDir: String = dataset/fhvhv_cab
datasetDirMap: Map[String,String] = Map(yellow -> dataset/yellow_cab, green -> dataset/green_cab, fhv -> dataset/fhv_cab, fhvhv -> dataset/fhvhv_cab)
datasetIterator: Map[String,(String, String)] = Map(yellow -> (tpep_dropoff_datetime,tpep_pickup_datetime), green -> (lpep_dropoff_datetime,lpep_pickup_datetime))


## Define Columns for analysis
- Columns names
- Time zones for overprice
- Columns used in classification for average price calculation
- Columns which values are used in analysis

In [24]:
val colDurationMinutes: String = "duration_minutes"
val colDurationMinutesBinLabel: String = "duration_minutes_bin_label"
val colYear: String = "year"
val colWeekdaySurcharge: String = "weekday_surcharge"
val colAggregateFee: String = "fees"
val colAggregateFeeBin: String = "agg_fee_bin_label"
val colDistanceBin: String = "distance_bin_label"
val colFareAmount: String = "fare_amount"
val colPricePerDistance: String = "cost_per_distance"
val colPricePerTime: String = "cost_per_time"
val colAvgPricePerDistance: String = "avg_cost_per_distance"
val colAvgPricePerTime: String = "avg_cost_per_time"
val colPricePerDistanceDiff: String = "cost_per_distance_diff"
val colPricePerDistanceDiffPcg: String = "cost_per_distance_diff_pcg"
val colPricePerTimeDiff: String = "cost_per_time_diff"
val colPricePerTimeDiffPcg: String = "cost_per_time_diff_pcg"
val colPricePerDistanceDiffPcgLabel: String = colPricePerDistanceDiffPcg + "_label"
val colPricePerTimeDiffPcgLabel: String = colPricePerTimeDiffPcg + "_label"

val timeZoneOver: String = "overnight"
val timeZones = Map(timeZoneOver -> (20, 6), "regular" -> (6, 20))
val weekDaySurcharge: Double = 2.5

val colDurationOvernightPcg: String = s"${timeZoneOver}_duration_pcg"

val colToUse: Set[String] = Set(
  "tpep_pickup_datetime",
  "tpep_dropoff_datetime",
  "lpep_pickup_datetime",
  "lpep_dropoff_datetime",
  "passenger_count",
  "trip_distance",
  "ratecodeid",
  "store_and_fwd_flag",
  "payment_type",
  "fare_amount",
  "extra",
  "mta_tax",
  "tip_amount",
  "tolls_amount",
  "improvement_surcharge",
  "total_amount",
  "congestion_surcharge",
  "airport_fee")

val colFees: Set[String] = Set(
  "extra",
  "mta_tax",
  "improvement_surcharge",
  "congestion_surcharge",
  "airport_fee")

val colsForClassification: Seq[String] = Seq(
  "passenger_count",
  "store_and_fwd_flag",
  "payment_type",
  colAggregateFeeBin,
  colDurationMinutesBinLabel,
  colDistanceBin,
  colYear,
  s"${colDurationOvernightPcg}_label",
  colPricePerDistanceDiffPcgLabel,
  colPricePerTimeDiffPcgLabel
)

val colsForValuesAnalysis: Seq[String] = Seq(
  "passenger_count",
  "store_and_fwd_flag",
  "payment_type",
  colAggregateFeeBin,
  colDurationMinutesBinLabel,
  colDistanceBin,
  colYear,
  s"${colDurationOvernightPcg}_label",
)

colDurationMinutes: String = duration_minutes
colDurationMinutesBinLabel: String = duration_minutes_bin_label
colYear: String = year
colWeekdaySurcharge: String = weekday_surcharge
colAggregateFee: String = fees
colAggregateFeeBin: String = agg_fee_bin_label
colDistanceBin: String = distance_bin_label
colFareAmount: String = fare_amount
colPricePerDistance: String = cost_per_distance
colPricePerTime: String = cost_per_time
colAvgPricePerDistance: String = avg_cost_per_distance
colAvgPricePerTime: String = avg_cost_per_time
colPricePerDistanceDiff: String = cost_per_distance_diff
colPricePerDistanceDiffPcg: String = cost_per_distance_diff_pcg
colPricePerTimeDiff: String = cost_per_time_diff
colPricePerTimeDiffPcg: String = cost_per_time_diff_pcg
colPricePerDistanceDiffPcgLabel: String = ...


### Define preprocess rules

In [25]:
val featureFilters: Map[String, Any => Boolean] = Map(
  "passenger_count" -> {
    case i: Int => i > 0
    case f: Float => val i = f.toInt; i > 0
    case d: Double => val i = d.toInt; i > 0
    case _ => false
  },
  "trip_distance" -> {
    case i: Int => i > 0
    case i: Float => i > 0
    case i: Double => i > 0
    case _ => false
  },
  "ratecodeid" -> {
    case i: Int => (i >= 1 && i <= 6) || i == 99
    case f: Float => val i = f.toInt; (i >= 1 && i <= 6) || i == 99
    case d: Double => val i = d.toInt; (i >= 1 && i <= 6) || i == 99
    case _ => false
  },
  "store_and_fwd_flag" -> {
    case i: String => i == "Y" || i == "N"
    case _ => false
  },
  "payment_type" -> {
    case i: Int => i >= 1 && i <= 6
    case f: Float => val i = f.toInt; i >= 1 && i <= 6
    case d: Double => val i = d.toInt; i >= 1 && i <= 6
    case _ => false
  },
  "fare_amount" -> {
    case i: Int => i > 0
    case i: Float => i > 0
    case i: Double => i > 0
    case _ => false
  },
  "tolls_amount" -> {
    case i: Int => i >= 0 && i < 200
    case i: Float => i >= 0 && i < 200
    case i: Double => i >= 0 && i < 200
    case _ => false
  }
)

val taxFilter: Any => Boolean = {
  case tax: Int => tax >= 0 && tax < 20
  case tax: Float => tax >= 0 && tax < 20
  case tax: Double => tax >= 0 && tax < 20
  case _ => false
}

featureFilters: Map[String,Any => Boolean] = Map(trip_distance -> $Lambda$5740/0x0000000801cb8040@7895dff9, tolls_amount -> $Lambda$5745/0x0000000801cbc040@758bf804, payment_type -> $Lambda$5743/0x0000000801cba840@7b0ab4a0, fare_amount -> $Lambda$5744/0x0000000801cbb040@402229d4, passenger_count -> $Lambda$5739/0x0000000801caf040@161e5c3e, store_and_fwd_flag -> $Lambda$5742/0x0000000801cb9840@3c7fcd21, ratecodeid -> $Lambda$5741/0x0000000801cb9040@12fed1ba)
taxFilter: Any => Boolean = $Lambda$5746/0x0000000801cbc840@43c336b8


### Utils functions for rdd

In [26]:
import java.time.temporal.ChronoUnit
import java.time.{DayOfWeek, LocalDate, LocalDateTime}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import scala.math.BigDecimal.RoundingMode

val projectDir: String = "/Users/luca/Desktop/Luca/Università/Magistrale/Corsi/BigData/Drivers"

def getDatasetPath(localPath: String): String = {
  "file://" + projectDir + "/" + localPath
}

def binColByStepValue(rdd: RDD[Row], indexOfColToDiscrete: Int, stepValue: Int = 5): RDD[Row] = {
  rdd.map { row =>
    val value: Double = row.get(indexOfColToDiscrete) match {
      case i: Int => i.toDouble
      case d: Double => d
      case l: Long => l.toDouble
      case s: String => try { s.toDouble } catch { case _: Throwable => Double.NaN}
      case _ => Double.NaN
    }

    val rawBin = (value / stepValue).toInt * stepValue
    val binBase = if (value < 0 && value % stepValue == 0) rawBin + stepValue else rawBin
    val label = if (value < 0) { s"[${(binBase - stepValue).toInt}|${binBase.toInt})" } else { s"[${binBase.toInt}|${(binBase + stepValue).toInt})" }

    Row.fromSeq(row.toSeq :+ label)
  }
}

val castForFilter: Any => Any = {
  case s: String => if (s.matches("""^-?\d+\.\d+$""")) s.toDouble else if (s.matches("""^-?\d+$""")) s.toInt else s.trim
  case d: Double => d
  case i: Int => i
  case l: Long => l.toDouble
  case f: Float => f.toDouble
  case b: Boolean => b
  case null => null
  case other => other.toString.trim
}

val preciseBucketUDF: (Map[String, (Int, Int)], LocalDateTime, LocalDateTime, Int) => Map[String, Double] = { (timeZones: Map[String, (Int, Int)], start: LocalDateTime, end: LocalDateTime, decimals: Int) =>

  val overlap: (LocalDateTime, LocalDateTime, LocalDateTime, LocalDateTime, Int) => Double = { (start1: LocalDateTime, end1: LocalDateTime, start2: LocalDateTime, end2: LocalDateTime, decimals: Int) =>
    val overlapStart = if (start1.isAfter(start2)) start1 else start2
    val overlapEnd = if (end1.isBefore(end2)) end1 else end2
    if (overlapEnd.isAfter(overlapStart)) BigDecimal(ChronoUnit.MILLIS.between(overlapStart, overlapEnd) / 60000.0).setScale(decimals, RoundingMode.HALF_UP).toDouble else 0.0
  }

  var result = timeZones.keys.map(_ -> 0.0).toMap

  if (!(start == null || end == null)) {

    if (!end.isBefore(start)) {

      var current = start.toLocalDate.atStartOfDay

      while (!current.isAfter(end)) {
        val nextDay = current.plusDays(1)

        timeZones.foreach {
          case (label, (startHour, endHour)) if startHour > endHour =>
            val bucketStartBeforeMidnight = current.withHour(startHour).withMinute(0).withSecond(0).withNano(0)
            val bucketEndBeforeMidnight = current.withHour(23).withMinute(59).withSecond(59)
            val bucketStartAfterMidnight = current.withHour(0).withMinute(0).withSecond(0).withNano(0)
            val bucketEndAfterMidnight = current.withHour(endHour).withMinute(0).withSecond(0).withNano(0)

            val minutesBeforeMidnight = overlap(start, end, bucketStartBeforeMidnight, bucketEndBeforeMidnight, decimals)
            val minutesAfterMidnight = overlap(start, end, bucketStartAfterMidnight, bucketEndAfterMidnight, decimals)

            result = result.updated(label, result(label) + minutesBeforeMidnight + minutesAfterMidnight)

          case (label, (startHour, endHour)) =>
            val bucketStart = current.withHour(startHour).withMinute(0).withSecond(0).withNano(0)
            val bucketEnd = if (endHour == 24) current.plusDays(1).withHour(0).withMinute(0).withSecond(0).withNano(0) else current.withHour(endHour).withMinute(0).withSecond(0).withNano(0)

            val minutes = overlap(start, end, bucketStart, bucketEnd, decimals)

            result = result.updated(label, result(label) + minutes)
        }

        current = nextDay
      }
    }
  }
  result
}

val isUSHolidayOrWeekend: LocalDate => Boolean = { date =>
  val month = date.getMonthValue
  val day = date.getDayOfMonth
  val dayOfWeek = date.getDayOfWeek

  val isIndependenceDay = month == 7 && day == 4
  val isChristmas = month == 12 && day == 25
  val isNewYear = month == 1 && day == 1
  val isLaborDay = month == 9 && dayOfWeek == DayOfWeek.MONDAY && day <= 7

  val isThanksgiving = month == 11 && dayOfWeek == DayOfWeek.THURSDAY && day >= 22 && day <= 28 && ((day - 1) / 7 + 1 == 4)

  isIndependenceDay || isChristmas || isNewYear || isLaborDay || isThanksgiving || dayOfWeek == DayOfWeek.SATURDAY || dayOfWeek == DayOfWeek.SUNDAY
}

import java.time.temporal.ChronoUnit
import java.time.{DayOfWeek, LocalDate, LocalDateTime}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import scala.math.BigDecimal.RoundingMode
projectDir: String = /Users/luca/Desktop/Luca/Università/Magistrale/Corsi/BigData/Drivers
getDatasetPath: (localPath: String)String
binColByStepValue: (rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row], indexOfColToDiscrete: Int, stepValue: Int)org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
castForFilter: Any => Any = $Lambda$5747/0x0000000801cbf040@4e8178d0
preciseBucketUDF: (Map[String,(Int, Int)], java.time.LocalDateTime, java.time.LocalDateTime, Int) => Map[String,Double] = $Lambda$5748/0x0000000801cbfc40@1673a4c5
isUSHolidayOrWeekend: java.time.LocalDate => Boolean = $Lambda$57...


# Actual job

1) Select dataset [yellow or green]

In [27]:
val name: String = "yellow"
val (dropoff, pickup) = datasetIterator(name)

name: String = yellow
dropoff: String = tpep_dropoff_datetime
pickup: String = tpep_pickup_datetime


2) Load dataset

In [28]:
val startTime = System.currentTimeMillis()

val dataset = spark.read.parquet(getDatasetPath(datasetDirMap(name)))
var headers: Seq[String] = dataset.columns.map(_.toLowerCase)
val indexesToUse: Seq[Int] = headers.zipWithIndex.collect {
  case(h, i) if colToUse.contains(h.toLowerCase) => i
}

startTime: Long = 1750336187435
dataset: org.apache.spark.sql.DataFrame = [VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 17 more fields]
headers: Seq[String] = ArraySeq(vendorid, tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count, trip_distance, ratecodeid, store_and_fwd_flag, pulocationid, dolocationid, payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount, improvement_surcharge, total_amount, congestion_surcharge, airport_fee)
indexesToUse: Seq[Int] = ArraySeq(1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18)


3) Filter taxes and features based on filter conditions previously defined

In [29]:
import org.apache.spark.sql.DataFrame
import org.apache.spark.storage.StorageLevel

def transformRDD(dataset: DataFrame, headers: Seq[String], idxs: Seq[Int], castFunc: Any => Any): RDD[Row] = {
  dataset.rdd.map(row => Row.fromSeq(idxs.map(row.get).map(castFunc)))
}

val rdd = transformRDD(dataset, headers, indexesToUse, castForFilter).persist(StorageLevel.MEMORY_ONLY)
headers = headers.filter(head => colToUse.contains(head.toLowerCase))

def applyFilters(rdd: RDD[Row], headers: Seq[String], colOfFees: Set[String], taxFilter: Any => Boolean, featFilter: Map[String, Any => Boolean]): RDD[Row] = {
  rdd.filter { row =>
    headers.zip(row.toSeq).forall {case(header: String, value) =>
      val taxFilterCondition = if (colOfFees.contains(header.toLowerCase)) taxFilter(value) else true
      featFilter.get(header.toLowerCase) match {
        case Some(filterFunc) => taxFilterCondition && filterFunc(value)
        case None => taxFilterCondition // no filter defined for this column, so accept it
      }
    }
  }
}

val rddFiltered = applyFilters(rdd, headers, colFees, taxFilter, featureFilters)
rdd.unpersist()

import org.apache.spark.sql.DataFrame
import org.apache.spark.storage.StorageLevel
transformRDD: (dataset: org.apache.spark.sql.DataFrame, headers: Seq[String], idxs: Seq[Int], castFunc: Any => Any)org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[75] at map at <console>:59
headers: Seq[String] = ArraySeq(tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count, trip_distance, ratecodeid, store_and_fwd_flag, payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount, improvement_surcharge, total_amount, congestion_surcharge, airport_fee)
applyFilters: (rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row], headers: Seq[String], colOfFees: Set[String], taxFilter: Any => Boolean, featFilter: Map[Str...


4) Add duration and timezones

In [30]:
import java.time.format.DateTimeFormatter
import java.time.Duration

def addDuration(rdd: RDD[Row], headers: Seq[String], pickup: String, dropoff: String, decimals: Int): RDD[Row] = {
  rdd.map {row =>
    val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm[:ss]")

    val pickupStr = row.getAs[String](headers.indexOf(pickup)).trim
    val dropoffStr = row.getAs[String](headers.indexOf(dropoff)).trim

    val pickupTS = LocalDateTime.parse(pickupStr, formatter)
    val dropoffTS = LocalDateTime.parse(dropoffStr, formatter)
    val durationMillis = Duration.between(pickupTS, dropoffTS).toMillis
    val durationMinutes = BigDecimal(durationMillis / 60000.0).setScale(decimals, RoundingMode.HALF_UP).toDouble

    val pickupYear = pickupTS.getYear

    Row.fromSeq(row.toSeq ++ Seq(durationMinutes, pickupYear))
  }.filter { row => row.getAs[Double](row.toSeq.length - 2) > 0.0 }
}

val rddDuration = addDuration(rddFiltered, headers, pickup, dropoff, decimals)
headers = headers ++ Seq(colDurationMinutes, colYear)

val rddDurationBin = binColByStepValue(rddDuration, headers.indexOf(colDurationMinutes), 5)
headers = headers :+ colDurationMinutesBinLabel

def addTimeZones(rdd: RDD[Row], headers: Seq[String], timezones: Map[String, (Int, Int)], weekDaySurcharge: Double, colDuration: String, pickup: String, dropoff: String, decimals: Int, preciseBucketUDF: (Map[String, (Int, Int)], LocalDateTime, LocalDateTime, Int) => Map[String, Double], isUSHolidayOrWeekendTZ: LocalDate => Boolean): RDD[Row] = {
  rdd.map { row =>
    val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm[:ss]")

    val timeZonesDuration: Map[String, Double] = preciseBucketUDF(timezones, LocalDateTime.parse(row.getAs[String](headers.indexOf(pickup)).trim, formatter), LocalDateTime.parse(row.getAs[String](headers.indexOf(dropoff)).trim, formatter), decimals)

    val weekday_surcharge: Double = if (isUSHolidayOrWeekendTZ(LocalDateTime.parse(row.getAs[String](headers.indexOf(pickup)).trim, formatter).toLocalDate)) 0 else weekDaySurcharge
    val colsToAdd: Seq[Double] = timezones.keys.toSeq.flatMap { tz =>
      val duration = timeZonesDuration.getOrElse(tz, 0.0)
      val totalDuration = row.getAs[Double](headers.indexOf(colDuration))
      Seq(duration, BigDecimal(duration * 100 / totalDuration).setScale(decimals, RoundingMode.HALF_UP).toDouble)
    }
    Row.fromSeq((row.toSeq ++ colsToAdd) :+ weekday_surcharge)
  }
}

val rddTimeZones = addTimeZones(rddDurationBin, headers, timeZones, weekDaySurcharge, colDurationMinutes, pickup, dropoff, decimals, preciseBucketUDF, isUSHolidayOrWeekend)

val headersToAdd: Seq[String] = timeZones.keys.toSeq.flatMap { tz =>
  Seq(tz + "_duration", tz + "_duration_pcg")
} :+ colWeekdaySurcharge

headers = headers ++ headersToAdd

import java.time.format.DateTimeFormatter
import java.time.Duration
addDuration: (rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row], headers: Seq[String], pickup: String, dropoff: String, decimals: Int)org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
rddDuration: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[78] at filter at <console>:84
headers: Seq[String] = ArraySeq(tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count, trip_distance, ratecodeid, store_and_fwd_flag, payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount, improvement_surcharge, total_amount, congestion_surcharge, airport_fee, duration_minutes, year, duration_minutes_bin_label, overnight_duration, overnight_duration_pcg, regular_duration, regular_duration_pcg, weekday...


5) Add Aggregate fees and bins

In [31]:
def addAggregateFees(rdd: RDD[Row], headers: Seq[String], colOfFees: Set[String]): RDD[Row] = {
  rdd.map { row =>
    val fees = colOfFees
      .filter(col => headers.contains(col.toLowerCase))
      .map(col => row.getAs[Double](headers.indexOf(col.toLowerCase))).sum

    Row.fromSeq(row.toSeq :+ fees)
  }
}

val rddAggFees = addAggregateFees(rddTimeZones, headers, colFees)
headers = headers :+ colAggregateFee

val rddAggFeesBin = binColByStepValue(rddAggFees, headers.indexOf(colAggregateFee), 2)
headers = headers :+ colAggregateFeeBin

addAggregateFees: (rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row], headers: Seq[String], colOfFees: Set[String])org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
rddAggFees: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[81] at map at <console>:58
headers: Seq[String] = ArraySeq(tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count, trip_distance, ratecodeid, store_and_fwd_flag, payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount, improvement_surcharge, total_amount, congestion_surcharge, airport_fee, duration_minutes, year, duration_minutes_bin_label, overnight_duration, overnight_duration_pcg, regular_duration, regular_duration_pcg, weekday_surcharge, fees, agg_fee_bin_label)
rddAggFeesBin: org.apache.spark.rdd.RDD[org.apache.spa...


6) Add price per mile and minute

In [32]:
def addPricePerDistanceAndTime(rdd: RDD[Row], headers: Seq[String], colFareAmount: String, colDuration: String, colDistance: String): RDD[Row] = {
  rdd.map { row =>
    val pricePerTime = Math.round(row.getAs[Double](headers.indexOf(colFareAmount)) / row.getAs[Double](headers.indexOf(colDuration)) * 100) / 100.0
    val pricePerDistance = Math.round(row.getAs[Double](headers.indexOf(colFareAmount)) / row.getAs[Double](headers.indexOf(colDistance)) * 100) / 100.0

    Row.fromSeq(row.toSeq ++ Seq(pricePerTime, pricePerDistance))
  }
}

val rddPriced = addPricePerDistanceAndTime(rddAggFeesBin, headers, colFareAmount, colDurationMinutes, "trip_distance")
headers = headers ++ Seq(colPricePerTime, colPricePerDistance)

addPricePerDistanceAndTime: (rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row], headers: Seq[String], colFareAmount: String, colDuration: String, colDistance: String)org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
rddPriced: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[83] at map at <console>:57
headers: Seq[String] = ArraySeq(tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count, trip_distance, ratecodeid, store_and_fwd_flag, payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount, improvement_surcharge, total_amount, congestion_surcharge, airport_fee, duration_minutes, year, duration_minutes_bin_label, overnight_duration, overnight_duration_pcg, regular_duration, regular_duration_pcg, weekday_surcharge, fees, agg_fee_bin_label, cost...


7) Add distance bin and duration in overnight time zone

In [33]:
val rddDistBin = binColByStepValue(rddPriced, headers.indexOf("trip_distance"), 5)
headers = headers :+ colDistanceBin

val rddOvernightBin = binColByStepValue(rddDistBin, headers.indexOf(colDurationOvernightPcg), 5)
headers = headers :+ (colDurationOvernightPcg + "_label")

rddDistBin: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[84] at map at <console>:56
headers: Seq[String] = ArraySeq(tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count, trip_distance, ratecodeid, store_and_fwd_flag, payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount, improvement_surcharge, total_amount, congestion_surcharge, airport_fee, duration_minutes, year, duration_minutes_bin_label, overnight_duration, overnight_duration_pcg, regular_duration, regular_duration_pcg, weekday_surcharge, fees, agg_fee_bin_label, cost_per_time, cost_per_distance, distance_bin_label, overnight_duration_pcg_label)
rddOvernightBin: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[85] at map at <console>:56
headers: Seq[String] = Array...


8) Add key for average calculation based on columns for classification

In [34]:
import org.apache.spark.HashPartitioner

val numPartitions = spark.sparkContext.defaultParallelism
val partitioner = new HashPartitioner(numPartitions)

val actualHeader = headers
def addKey(rdd: RDD[Row], colsClassification: Seq[String], headers: Seq[String]): RDD[(String, Row)] = {
  rdd.map { row =>
    val key = colsClassification.filter(col => headers.contains(col.toLowerCase))
    .map(col => row.get(headers.indexOf(col.toLowerCase)))
    .mkString("_")
    (key, row)
  }
}

val rddWithKey = addKey(rddOvernightBin, colsForClassification, actualHeader).partitionBy(partitioner).persist(StorageLevel.MEMORY_ONLY)

import org.apache.spark.HashPartitioner
numPartitions: Int = 12
partitioner: org.apache.spark.HashPartitioner = org.apache.spark.HashPartitioner@c
actualHeader: Seq[String] = ArraySeq(tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count, trip_distance, ratecodeid, store_and_fwd_flag, payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount, improvement_surcharge, total_amount, congestion_surcharge, airport_fee, duration_minutes, year, duration_minutes_bin_label, overnight_duration, overnight_duration_pcg, regular_duration, regular_duration_pcg, weekday_surcharge, fees, agg_fee_bin_label, cost_per_time, cost_per_distance, distance_bin_label, overnight_duration_pcg_label)
addKey: (rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row], colsClassification: Seq[String], ...


9) Calculate prices per distance and time

In [35]:
def calculatePrices(rdd: RDD[(String, Row)], headers: Seq[String], colPriceDistance: String, colPriceTime: String): RDD[(String, (Double, Double, Long))] = {
  rdd.mapValues { row =>
    val costPerDistance = row.getAs[Double](headers.indexOf(colPriceDistance))
    val costPerTime = row.getAs[Double](headers.indexOf(colPriceTime))
    (costPerDistance, costPerTime, 1L)
  }
}

val rddForAvg = calculatePrices(rddWithKey, headers, colPricePerDistance, colPricePerTime)

calculatePrices: (rdd: org.apache.spark.rdd.RDD[(String, org.apache.spark.sql.Row)], headers: Seq[String], colPriceDistance: String, colPriceTime: String)org.apache.spark.rdd.RDD[(String, (Double, Double, Long))]
rddForAvg: org.apache.spark.rdd.RDD[(String, (Double, Double, Long))] = MapPartitionsRDD[88] at mapValues at <console>:56


10) Calculate average prices per distance and time

In [36]:
def calculateAvgPrices(rdd: RDD[(String, (Double, Double, Long))], decimals: Int): RDD[(String, (Double, Double))] = {
  rdd.aggregateByKey((0.0, 0.0, 0L))((acc, v) => (acc._1 + v._1, acc._2 + v._2, acc._3 + v._3), (a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3)).mapValues {
    case(sumDist, sumTime, count) =>
      val avgDist = BigDecimal(sumDist / count).setScale(decimals, BigDecimal.RoundingMode.HALF_UP).toDouble
      val avgTime = BigDecimal(sumTime / count).setScale(decimals, BigDecimal.RoundingMode.HALF_UP).toDouble
      (avgDist, avgTime)
  }.filter { case(_, (dist, time)) => dist > 0.0 && time > 0.0 }
}

val rddWithAvgPrices = calculateAvgPrices(rddForAvg, decimals)

calculateAvgPrices: (rdd: org.apache.spark.rdd.RDD[(String, (Double, Double, Long))], decimals: Int)org.apache.spark.rdd.RDD[(String, (Double, Double))]
rddWithAvgPrices: org.apache.spark.rdd.RDD[(String, (Double, Double))] = MapPartitionsRDD[91] at filter at <console>:59


11) Join average prices to previous rdd

In [37]:
import org.apache.spark.broadcast.Broadcast

val broadcastAvgPrices: Broadcast[Map[String, (Double, Double)]] = spark.sparkContext.broadcast(rddWithAvgPrices.collectAsMap().toMap)

def applyJoin(rdd: RDD[(String, Row)], broadcastMap: Broadcast[Map[String, (Double, Double)]]): RDD[Row] = {
  rdd.flatMap { case (key, originalRow) =>
    broadcastMap.value.get(key).map { case(avgCostPerDistance, avgCostPerTime) =>
      Row.fromSeq(originalRow.toSeq ++ Seq(avgCostPerDistance, avgCostPerTime))
    }
  }
}

val rddJoined = applyJoin(rddWithKey, broadcastAvgPrices)

rddWithKey.unpersist()

headers = headers ++ Seq(colAvgPricePerDistance, colAvgPricePerTime)

import org.apache.spark.broadcast.Broadcast
broadcastAvgPrices: org.apache.spark.broadcast.Broadcast[Map[String,(Double, Double)]] = Broadcast(20)
applyJoin: (rdd: org.apache.spark.rdd.RDD[(String, org.apache.spark.sql.Row)], broadcastMap: org.apache.spark.broadcast.Broadcast[Map[String,(Double, Double)]])org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
rddJoined: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[92] at flatMap at <console>:63
headers: Seq[String] = ArraySeq(tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count, trip_distance, ratecodeid, store_and_fwd_flag, payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount, improvement_surcharge, total_amount, congestion_surcharge, airport_fee, duration_minutes, year, duration_minutes_bin_...


12) Add price comparison w.r.t. average price and actual price difference

In [38]:
def addPriceComparison(rdd: RDD[Row], headers: Seq[String], colPriceDistance: String, colAvgPriceDistance: String, colPriceTime: String, colAvgPriceTime: String, decimals: Int) = {
  rdd.map { row =>
    val priceColsToAdd: Seq[Double] = Seq((colPriceDistance, colAvgPriceDistance), (colPriceTime, colAvgPriceTime))
      .flatMap { case (colPrice, colAvgPrice) =>
        val price = row.getAs[Double](headers.indexOf(colPrice))
        val priceAvg = row.getAs[Double](headers.indexOf(colAvgPrice))
        val priceDiff = BigDecimal(price - priceAvg).setScale(decimals, BigDecimal.RoundingMode.HALF_UP).toDouble
        val priceDiffPcg = BigDecimal(priceDiff / priceAvg * 100).setScale(decimals, BigDecimal.RoundingMode.HALF_UP).toDouble

        Seq(priceDiff, priceDiffPcg)
      }
    Row.fromSeq(row.toSeq ++ priceColsToAdd)
  }
}

val rddPriceComparison = addPriceComparison(rddJoined, headers, colPricePerDistance, colAvgPricePerDistance, colPricePerTime, colAvgPricePerTime, decimals)
headers = headers ++ Seq(colPricePerDistanceDiff, colPricePerDistanceDiffPcg, colPricePerTimeDiff, colPricePerTimeDiffPcg)

addPriceComparison: (rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row], headers: Seq[String], colPriceDistance: String, colAvgPriceDistance: String, colPriceTime: String, colAvgPriceTime: String, decimals: Int)org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
rddPriceComparison: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[93] at map at <console>:64
headers: Seq[String] = ArraySeq(tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count, trip_distance, ratecodeid, store_and_fwd_flag, payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount, improvement_surcharge, total_amount, congestion_surcharge, airport_fee, duration_minutes, year, duration_minutes_bin_label, overnight_duration, overnight_duration_pcg, regular_duration, regular_duration_...


13) Bin price difference per time and distance

In [39]:
val rddPriceDistBin = binColByStepValue(rddPriceComparison,headers.indexOf(colPricePerDistanceDiffPcg), 5)
val rddPriceDistTimeBin = binColByStepValue(rddPriceDistBin, headers.indexOf(colPricePerTimeDiffPcg), 5)

headers = headers ++ Seq(colPricePerDistanceDiffPcgLabel, colPricePerTimeDiffPcgLabel)

rddPriceDistBin: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[94] at map at <console>:56
rddPriceDistTimeBin: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[95] at map at <console>:56
headers: Seq[String] = ArraySeq(tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count, trip_distance, ratecodeid, store_and_fwd_flag, payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount, improvement_surcharge, total_amount, congestion_surcharge, airport_fee, duration_minutes, year, duration_minutes_bin_label, overnight_duration, overnight_duration_pcg, regular_duration, regular_duration_pcg, weekday_surcharge, fees, agg_fee_bin_label, cost_per_time, cost_per_distance, distance_bin_label, overnight_duration_pcg_label, avg_cost_per_distan...


14) Reduce to analysis columns only

In [40]:
val headersForAnalysis = headers.zipWithIndex.filter(head => colsForClassification.contains(head._1.toLowerCase))

val headersForAnalysisIdxs = headersForAnalysis.map(_._2)
val headersForAnalysisCols = headersForAnalysis.map(_._1)

def reduceToAnalysis(rdd: RDD[Row], idxs: Seq[Int]): RDD[Row] = {
  rdd.map { row =>
    Row.fromSeq(idxs.map(row.get))
  }
}

val rddAnalysis = reduceToAnalysis(rddPriceDistTimeBin, headersForAnalysisIdxs)

val totalCount = rddAnalysis.count()

headersForAnalysis: Seq[(String, Int)] = ArraySeq((passenger_count,2), (store_and_fwd_flag,5), (payment_type,6), (year,17), (duration_minutes_bin_label,18), (agg_fee_bin_label,25), (distance_bin_label,28), (overnight_duration_pcg_label,29), (cost_per_distance_diff_pcg_label,36), (cost_per_time_diff_pcg_label,37))
headersForAnalysisIdxs: Seq[Int] = ArraySeq(2, 5, 6, 17, 18, 25, 28, 29, 36, 37)
headersForAnalysisCols: Seq[String] = ArraySeq(passenger_count, store_and_fwd_flag, payment_type, year, duration_minutes_bin_label, agg_fee_bin_label, distance_bin_label, overnight_duration_pcg_label, cost_per_distance_diff_pcg_label, cost_per_time_diff_pcg_label)
reduceToAnalysis: (rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row], idxs: Seq[Int])org.apache.spark.rdd.RDD[org.apache.spark.sql...


15) Group by feature value

In [41]:
def groupByFeatures(rdd: RDD[Row], colForValuesAnalysis: Seq[String], colPriceDistanceDiffPcgLabel: String, colPriceTimeDiffPcgLabel: String, headersAnalysis: Seq[String], decimals: Int, totalCount: Long): Seq[RDD[Row]] = {
  colForValuesAnalysis.map { colName =>
    val groupCols = Seq(colPriceDistanceDiffPcgLabel, colPriceTimeDiffPcgLabel):+ colName
    val grouped = rdd.map { row =>
      val key = groupCols.map(col => row.get(headersAnalysis.indexOf(col.toLowerCase)))
      (key, 1)
    }.reduceByKey(_ + _).map { case (keySeq, count) =>
      val value = keySeq.last.toString
      val costDistLabel = keySeq(0).toString
      val costTimeLabel = keySeq(1).toString
      val pcg = BigDecimal(count.toDouble / totalCount * 100).setScale(decimals, BigDecimal.RoundingMode.HALF_UP).toDouble
      Row.fromSeq(Seq(colName, value, count, pcg, costDistLabel, costTimeLabel))
    }
    grouped
  }
}

val rddFeatures = groupByFeatures(rddAnalysis, colsForValuesAnalysis, colPricePerDistanceDiffPcgLabel, colPricePerTimeDiffPcgLabel, headersForAnalysisCols, decimals, totalCount)

groupByFeatures: (rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row], colForValuesAnalysis: Seq[String], colPriceDistanceDiffPcgLabel: String, colPriceTimeDiffPcgLabel: String, headersAnalysis: Seq[String], decimals: Int, totalCount: Long)Seq[org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]]
rddFeatures: Seq[org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]] = List(MapPartitionsRDD[99] at map at <console>:65, MapPartitionsRDD[102] at map at <console>:65, MapPartitionsRDD[105] at map at <console>:65, MapPartitionsRDD[108] at map at <console>:65, MapPartitionsRDD[111] at map at <console>:65, MapPartitionsRDD[114] at map at <console>:65, MapPartitionsRDD[117] at map at <console>:65, MapPartitionsRDD[120] at map at <console>:65)


16) Reduce to single rdd and write output

In [42]:
import org.apache.spark.sql.types._

val headersForSchema = Seq(
  StructField("feature", StringType),
  StructField("value", StringType),
  StructField("count", IntegerType),
  StructField("pcg", DoubleType),
  StructField("cost_distance_label", StringType),
  StructField("cost_time_label", StringType)
)

val schema = StructType(headersForSchema)

val dfForAnalysis = spark.createDataFrame(rddFeatures.reduce(_ union _).coalesce(1), schema)

dfForAnalysis.show(1)
val endTime = System.currentTimeMillis()
val durationMs = endTime - startTime

println(s"Job $name-dataset optimized executed in $durationMs ms")

dfForAnalysis.write.mode("overwrite").parquet(getDatasetPath(outputDir + f"/$name"))

+---------------+-----+-----+---+-------------------+---------------+
|        feature|value|count|pcg|cost_distance_label|cost_time_label|
+---------------+-----+-----+---+-------------------+---------------+
|passenger_count|  1.0|    2|0.0|          [565|570)|      [-40|-35)|
+---------------+-----+-----+---+-------------------+---------------+
only showing top 1 row

Job yellow-dataset optimized executed in 165740 ms


import org.apache.spark.sql.types._
headersForSchema: Seq[org.apache.spark.sql.types.StructField] = List(StructField(feature,StringType,true), StructField(value,StringType,true), StructField(count,IntegerType,true), StructField(pcg,DoubleType,true), StructField(cost_distance_label,StringType,true), StructField(cost_time_label,StringType,true))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(feature,StringType,true),StructField(value,StringType,true),StructField(count,IntegerType,true),StructField(pcg,DoubleType,true),StructField(cost_distance_label,StringType,true),StructField(cost_time_label,StringType,true))
dfForAnalysis: org.apache.spark.sql.DataFrame = [feature: string, value: string ... 4 more fields]
endTime: Long = 1750336353175
durationMs: Long = 165740
