# Apache Spark GCP Citibike Analysis #

# Citibike Data Engineering with Apache Spark and Google Cloud Platform

This project demonstrates large-scale data processing using **Apache Spark (RDD API)** and **Scala**, applied to a ~15GB Citibike dataset stored on **Google Cloud Storage (GCS)**.  
It includes data ingestion, preprocessing, time calculations, performance comparisons using different partitioning strategies, and the creation of a custom geospatial partitioner using zipcode polygons.

The goal of this notebook is to simulate real-world data engineering tasks involving massive datasets, geo-coordinate handling, and partition optimization for distributed computing systems.


### Required Files ###

In [4]:
val citibike_data_file = "gs://analytics_on_cloud_bucket/data.csv"

val zipcode_geojson_file = "gs://analytics_on_cloud_bucket/data/zipcode_coordinates.json"


citibike_data_file = gs://analytics_on_cloud_bucket/data.csv
zipcode_geojson_file = gs://analytics_on_cloud_bucket/data/zipcode_coordinates.json


gs://analytics_on_cloud_bucket/data/zipcode_coordinates.json

In [5]:
val df = spark.read.option("header","true").csv(citibike_data_file)
df.show(5)

+----------------+-------------+--------------------+--------------------+------------------+----------------+--------------------+--------------+---------+----------+---------+----------+-------------+
|         ride_id|rideable_type|          started_at|            ended_at|start_station_name|start_station_id|    end_station_name|end_station_id|start_lat| start_lng|  end_lat|   end_lng|member_casual|
+----------------+-------------+--------------------+--------------------+------------------+----------------+--------------------+--------------+---------+----------+---------+----------+-------------+
|7FB44ACE772F2B5E| classic_bike|2020-01-16 11:29:...|2020-01-16 11:38:...|E 47 St & Park Ave|         6584.12|     5 Ave & E 63 St|       6904.06|40.755103|-73.974987|40.766368|-73.971518|       member|
|514091422D32E0D4| classic_bike|2020-01-15 12:49:...|2020-01-15 13:23:...|Carmine St & 6 Ave|         5763.03| Mott St & Prince St|       5561.04|40.730386| -74.00215| 40.72318|  -73.9948|

df = [ride_id: string, rideable_type: string ... 11 more fields]


[ride_id: string, rideable_type: string ... 11 more fields]

## Helper functions

We define several helper functions used throughout the analysis:

- `timer`: measure execution time of a code block  
- `time_difference`: compute duration between timestamps  
- `get_lat_lon`: extract latitude and longitude safely  
- `read_zipcode_polygons`: read zipcode geojson and parse polygon boundaries  
- `find_zipcode`: identify zipcode using point-in-polygon lookup  

These abstractions keep the rest of the notebook cleaner and more modular.

In [6]:
//times the execution of the enclosed block
def timer[R](block: => R): R = { //R is the placeholder return type for the block
  val t0 = System.nanoTime() //Reads the JVM's high-resolution timer in nanoseconds (only differences are meaningful)
  val result = block //executes the block and stores the result
  val t1 = System.nanoTime()
  println(s"Elapsed time: ${(t1 - t0)/1e9} seconds") //the time difference 
  result //return the result
}


timer: [R](block: => R)R


In [7]:
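//Example
//Note: the sum 1 + 2 + ... + 1000000 = 500000500000 overflows Int, so the result below
//wraps around to 1784293664; use (1L to 1000000L) to get the exact value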

timer {
    val seq = (1 to 1000000).toSeq
    seq.reduce(_+_)
}

Elapsed time: 0.02721878 seconds


1784293664

In [8]:
//This function returns the difference between two times, in seconds
//The times must be in the citibike format
//i.e., "yyyy-MM-dd HH:mm:ss.SSS"

def time_difference(t1:String, t2:String) = {
    import java.time.LocalDateTime
    import java.time.format.DateTimeFormatter
    import java.time.Duration    

    val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
    // Parse into LocalDateTime objects
    val startTime = LocalDateTime.parse(t1, formatter)
    val endTime   = LocalDateTime.parse(t2, formatter)
    
    //Find the difference
    val diffSeconds = Duration.between(startTime, endTime).toMillis / 1000.0
    
    //Return the difference
    diffSeconds
}

time_difference: (t1: String, t2: String)Double


In [9]:
//Example
time_difference("2020-01-16 11:29:00.511","2020-01-16 11:35:30.200")

389.689

In [10]:
//The latitude and longitude fields in the file need to be converted into (Double, Double)
//However, some of the data is bad (non-numeric characters or empty strings)
//This function returns (Double,Double) if the data is good and (0.0,0.0) if the data is bad
import scala.util.control.NonFatal

def get_lat_lon(lat: Any, lon: Any): (Double,Double) = {
    try {
       (lat.toString.toDouble,lon.toString.toDouble)
    } catch {
        //NumberFormatException for bad strings, NullPointerException for nulls
        case NonFatal(_) => (0.0,0.0)
    }
}
val invalid_lat_lon = get_lat_lon("aa", -73.974987)
val valid_lat_lon = get_lat_lon("40.755103",-73.974987)

invalid_lat_lon = (0.0,0.0)
valid_lat_lon = (40.755103,-73.974987)


get_lat_lon: (lat: Any, lon: Any)(Double, Double)


(40.755103,-73.974987)

In [11]:
//Zipcode data is in the geojson file zipcode_coordinates.json
//The file contains information about each zipcode, including the polygon of coordinates that defines it
//This function reads the file and returns an rdd containing:
/* 
each zipcode, a unique id for each zipcode (0...261), and an Array containing the (lon, lat) coordinates of the polygon
*/
def read_zipcode_polygons(file: String) = {
    val df = spark.read.json(file)

    // convert to RDD
    val rdd = df.rdd.map { row =>
      val zipcode = row.getAs[String]("zipcode")
      val coords = row.getAs[Seq[Seq[Seq[Double]]]]("coords")
      (zipcode, coords)
    }

    // keep the outer ring of each polygon as (lon, lat) pairs, then number the zipcodes
    // zipWithIndex assigns contiguous ids 0..n-1; zipWithUniqueId does not guarantee this when
    // the data has more than one partition, and the ids are later used directly as partition numbers
    val polygon_rdd = rdd.map(t=>(t._1,t._2(0).toArray.map(a=>a.toArray).map(a=>(a(0),a(1)))))
    .zipWithIndex()
    .map(t => (t._1._1,t._2.toInt,t._1._2))
    polygon_rdd
}


read_zipcode_polygons: (file: String)org.apache.spark.rdd.RDD[(String, Int, Array[(Double, Double)])]


In [12]:
//Given a polygon and a (lon, lat) point, this function returns the zipcode id if the point
//is inside the polygon, or -1 if it is not (ray-casting test)
def find_zipcode(
    p: (String, Int, Array[(Double, Double)]), //The polygon: (zipcode, id, vertices as (lon, lat))
    c: (Double, Double)                        //The point as (lon, lat), matching the vertex order
): Int = {
  val (zipcode, id, polygon) = p
  val (x, y) = c

  var inside = false //start by assuming it is not in the polygon
  var j = polygon.length - 1

  for (i <- polygon.indices) {
    val (xi, yi) = polygon(i)
    val (xj, yj) = polygon(j)

    // Check if the ray crosses the edge
    val intersects =
      ((yi > y) != (yj > y)) &&
      (x < (xj - xi) * (y - yi) / (yj - yi + 1e-12) + xi)

    if (intersects) inside = !inside
    j = i
  }

  if (inside) id else -1 //if it is inside the polygon, return the id (partition number)
}

val r1 = find_zipcode( //5 Ave & E 63 St (Manhattan), outside this polygon: should return -1
("11372",0,Array((-73.86942457284177,40.74915687096788), (-73.89143129977276,40.74684466041932), 
(-73.89507143240859,40.746465470812154), (-73.8961873786782,40.74850942518088), 
(-73.8958395418514,40.74854687570604), (-73.89525242774397,40.748306609450246), 
(-73.89654041085562,40.75054199814359), (-73.89579868613829,40.75061972133262), 
(-73.89652230661434,40.75438879610903), (-73.88164812188481,40.75595161704187), 
(-73.87221855882478,40.75694324806748), (-73.87167992356792,40.75398717439604), 
(-73.8720704651389,40.753862007052064), (-73.86942457284177,40.74915687096788))),
    (-73.971518, 40.766368) //(lon, lat), matching the polygon's vertex order
)


val r2 = find_zipcode( //Times Square - should return 105
("10036",105,Array((-73.98134106844775,40.758645323851304), (-73.9781160605814,40.7572844526636), (-73.98088709157031,40.753481009691896), (-73.98411754582567,40.754842071375435), (-73.98459583212677,40.754176960006426), (-73.99273700488898,40.75760450929245), (-73.99395968106315,40.758252769648045), (-74.00170164222186,40.76138075542977), (-74.00116297247455,40.76202186185266), (-73.99939972778823,40.76337781582372), (-73.99894124905622,40.76387032527473), (-73.99773597314044,40.765557990235976), (-73.98134106844775,40.758645323851304))),
    (-73.985130, 40.758896)
)

r1 = -1
r2 = 105


find_zipcode: (p: (String, Int, Array[(Double, Double)]), c: (Double, Double))Int


105

In [13]:
//How to add an element to an array
val sample_rdd = sc.parallelize(Array(Array(1,"jack",37.2),Array(2,"Jill",23.1)))
val new_sample_rdd = sample_rdd.map(a => a:+ a(2).toString.toDouble*2)
new_sample_rdd.collect

sample_rdd = ParallelCollectionRDD[14] at parallelize at <console>:25
new_sample_rdd = MapPartitionsRDD[15] at map at <console>:26


Array(Array(1, jack, 37.2, 74.4), Array(2, Jill, 23.1, 46.2))

### Scala help: Type safety ###
* When you declare a value without an explicit type, Scala infers the type for you
* For collections that mix element types, inference picks their common supertype, so something like Array(1, "jack", 37.2) becomes Array[Any]; while working on the code below you may find values turning into Any unexpectedly
* Sometimes you can fix this by converting things back to String or Double
* But not always, so it is safer to always specify types explicitly
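A minimal sketch of how inference produces Any, assuming Scala 2 as used with Spark in this notebook (Scala 3 may infer a different common supertype). The values mirror the examples in this section; the names `mixed`, `viaString`, `typed`, and `direct` are illustrative only.

```scala
// Mixed element types: Scala 2 infers their common supertype, which here is Any.
val mixed = Array(1, "jack", 37.2)

// mixed(2) * 2 would not compile: value * is not a member of Any.
// Going through String works, but only because 37.2 prints as a valid number.
val viaString: Double = mixed(2).toString.toDouble * 2

// A tuple keeps each field's own type, so no conversion is needed.
val typed: (Int, String, Double) = (1, "jack", 37.2)
val direct: Double = typed._3 * 2
```

The tuple version is the pattern the explicit-type advice above points to: the compiler keeps track of each field, so arithmetic on `typed._3` needs no conversion.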

In [14]:
//Implicit specification
val implicit_data = Array(
  ("Jack", 34.5, 22.7),
  ("Jill", 31.3, 41.2)
)

//Explicit specification
//Try to do this as often as possible
val explicit_data: Array[(String,Double,Double)] = Array(
  ("Jack", 34.5, 22.7),
  ("Jill", 31.3, 41.2)
)

//When doing pattern matching, try to give explicit types in the pattern
explicit_data.map(t => t match {
    case (name: String, d1: Double, d2: Double) => (name, d1 + d2)
    case _ => ("Missing",0.0,0.0)
})



implicit_data = Array((Jack,34.5,22.7), (Jill,31.3,41.2))
explicit_data = Array((Jack,34.5,22.7), (Jill,31.3,41.2))


Array((Jack,57.2), (Jill,72.5))

### Scala help: ClassTag ###

In [15]:
//If you're confused about the type of a data item
//Use the ClassTag function

val implicit_data = Array(
  ("Jack", 34.5, 22.7),
  ("Jill", 31.3, 41.2)
)

import scala.reflect.ClassTag
val type_of_implicit_data = ClassTag(implicit_data.getClass)
val type_of_data_of_array_element = ClassTag(implicit_data(0).getClass)
val type_of_data_of_first_element_of_tuple = ClassTag(implicit_data(0)._1.getClass)
val type_of_data_of_second_element_of_tuple = ClassTag(implicit_data(0)._2.getClass)

implicit_data = Array((Jack,34.5,22.7), (Jill,31.3,41.2))
type_of_implicit_data = Array[scala.Tuple3]
type_of_data_of_array_element = scala.Tuple3
type_of_data_of_first_element_of_tuple = java.lang.String
type_of_data_of_second_element_of_tuple = Double


Double

### Scala help: Mutable Map ###
* Map objects in Scala are like Python dictionaries: they store (key, value) pairs and give access to values by key
* Unlike Python dictionaries, the default Scala Map is immutable
* Scala also provides a mutable Map (scala.collection.mutable.Map) that behaves more like a Python dictionary: you can add new keys and change the value of a key
* Accessing a missing key with map(key) throws a NoSuchElementException; use get (which returns an Option) or getOrElse to avoid this
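A short plain-Scala sketch of the safe lookup and update patterns described above. The department values reuse the cells below; `johnOpt`, `johnDept`, `withJohn`, and `m` are illustrative names.

```scala
import scala.collection.mutable

val departments = Map("Jack" -> "Sales", "Jill" -> "Pails")

// get returns an Option instead of throwing on a missing key.
val johnOpt: Option[String] = departments.get("John")
val johnDept: String = departments.getOrElse("John", "Unassigned")

// An immutable Map is "updated" by building a new Map; the original is unchanged.
val withJohn = departments + ("John" -> "Accounting")

// A mutable Map is changed in place (m(k) = v calls m.update(k, v)).
val m = mutable.Map("Jack" -> "Sales")
m("Jack") = "Accounting"
m("John") = "Accounting"
```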

In [16]:
//Immutable map
val departments = Map(("Jack","Sales"),("Jill","Pails"))
val jack_dept = departments("Jack")

//departments("Jack")="Accounting" //Error since departments is immutable
//val john_dept = departments("John")//throws an exception since John is not in the map


departments = Map(Jack -> Sales, Jill -> Pails)
jack_dept = Sales


Sales

In [17]:
//mutable map
import scala.collection.mutable //brings mutable.Map into scope without shadowing the default Map

val m_departments = mutable.Map(("Jack","Sales"),("Jill","Pails"))
val jack_dept = m_departments("Jack")
//val john_dept = m_departments("John")//throws an exception
m_departments("Jack") = "Accounting" //Can change the department since it is mutable
m_departments

m_departments = Map(Jill -> Pails, Jack -> Accounting)
jack_dept = Sales


Map(Jill -> Pails, Jack -> Accounting)

In [18]:
//Add John to m_departments
m_departments("John") = m_departments.getOrElse("John","Accounting")
m_departments

Map(Jill -> Pails, Jack -> Accounting, John -> Accounting)

In [15]:
//Add John to departments
//This does not compile because departments is immutable: it has no update method
//Uncomment the line below to see the compile error

//departments("John") = departments.getOrElse("John","Accounting")

### Scala help: Converting Any to a specific type ###
* Any has no toDouble or toInt methods, so to convert an Any to Double or Int, first convert it to a String
* i.e., write x.toString.toDouble; this only works when the string form of the value is a valid number
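When a field might be a number, a numeric string, or garbage, a pattern match plus `scala.util.Try` avoids exceptions altogether. This is a sketch, not part of the original notebook: `anyToDouble` is a hypothetical helper that returns `None` in the cases where `get_lat_lon` falls back to 0.0.

```scala
import scala.util.Try

// Hypothetical helper: converts an Any to Double when possible.
def anyToDouble(x: Any): Option[Double] = x match {
  case d: Double => Some(d)
  case i: Int    => Some(i.toDouble)
  case s: String => Try(s.trim.toDouble).toOption // None for "", "aa", etc.
  case _         => None                          // null and any other type
}
```

Returning an Option lets the caller decide what to do with bad values (for example, `flatMap` them away) instead of relying on a 0.0 sentinel that then has to be filtered out.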

In [19]:
val x1:Any = "342.56"
val x2:Any = "42"

val x1_double = x1.toString.toDouble
val x2_int = x2.toString.toInt



//x1.toDouble  //this won't work

x1 = 342.56
x2 = 42
x1_double = 342.56
x2_int = 42


42

### Scala help: Scala find function ###
* ***find***, applied to a collection, returns the first element of the collection that satisfies the supplied condition (predicate)
* find returns an Option: Some(element) if a match exists, otherwise None. You need to unwrap it, e.g. with match or getOrElse
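Besides `match`, an `Option` can be unwrapped with `map` and `getOrElse`. A minimal sketch using the same sample data as the cell below (the result names are illustrative):

```scala
val sample = Array((1, "jack", 37.2), (2, "Jill", 23.1))

// find returns Option[(Int, String, Double)]; map picks the third field, getOrElse supplies a default.
val jackScore: Double = sample.find(_._2 == "jack").map(_._3).getOrElse(0.0)
val johnScore: Double = sample.find(_._2 == "John").map(_._3).getOrElse(0.0)
```

Using `0.0` rather than `0` as the default keeps the result a `Double`; mixing the two would widen the result type.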

In [20]:
//find returns an Option, which match unwraps
val sample = Array((1,"jack",37.2),(2,"Jill",23.1))
val result: Double = sample.find(t => t._2 == "jack") match {
    case Some((_,_,d)) => d
    case None => 0.0 //a Double default keeps result a Double (0 would widen it to AnyVal)
}


sample = Array((1,jack,37.2), (2,Jill,23.1))
result = 37.2


37.2

### Spark help: sortBy functions ###

In [21]:
val rdd = sc.parallelize(Array((1,"jack",37.2),(2,"Jill",23.1),(2,"John",11.7),(4,"Qing",42.1)))
val sorted_by_3 = rdd.sortBy(_._3).collect
val sorted_by_3_descending = rdd.sortBy(_._3,ascending=false).collect
val sorted_by_sum = rdd.sortBy(t => t._1.toDouble+t._3).collect



rdd = ParallelCollectionRDD[16] at parallelize at <console>:28
sorted_by_3 = Array((2,John,11.7), (2,Jill,23.1), (1,jack,37.2), (4,Qing,42.1))
sorted_by_3_descending = Array((4,Qing,42.1), (1,jack,37.2), (2,Jill,23.1), (2,John,11.7))
sorted_by_sum = Array((2,John,11.7), (2,Jill,23.1), (1,jack,37.2), (4,Qing,42.1))


Array((2,John,11.7), (2,Jill,23.1), (1,jack,37.2), (4,Qing,42.1))

### Scala help: maxBy function ###
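* Scala collections have a maxBy function that can flexibly find the max of a collection by any derived value
* RDDs have no maxBy method; RDD.max exists but needs an Ordering, and the result must be combined across all partitions
* A simple alternative is reduce with a function that keeps the larger of two elements: because that function is associative and commutative, Spark can apply it within each partition and then across partitions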
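The plain-Scala sketch below simulates two partitions with `grouped(2)` to show that reducing within each partition and then across the partial results gives the same answer. It also builds an `Ordering` with `Ordering.by`, which is the form `RDD.max()` accepts as its implicit argument (e.g. `rdd.max()(byThird)`, not tested here). The names are illustrative.

```scala
val records = Seq((1, "jack", 37.2), (2, "Jill", 23.1), (2, "John", 11.7), (4, "Qing", 42.1))

// Keep the record with the larger third field; associative and commutative, so grouping does not matter.
val maxBy3 = (a: (Int, String, Double), b: (Int, String, Double)) => if (a._3 >= b._3) a else b

// Simulated partitions: reduce within each, then reduce the partial results.
val partitions = records.grouped(2).toList
val partials = partitions.map(_.reduce(maxBy3))
val overall = partials.reduce(maxBy3)

// The same comparison expressed as an Ordering.
val byThird = Ordering.by[(Int, String, Double), Double](_._3)
val viaOrdering = records.max(byThird)
```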

In [22]:
val arr = Array((1,"jack",37.2),(2,"Jill",23.1),(2,"John",11.7),(4,"Qing",42.1))
val highest_3 = arr.maxBy(_._3)
val highest_sum = arr.maxBy(t => t._1 + t._3)

arr = Array((1,jack,37.2), (2,Jill,23.1), (2,John,11.7), (4,Qing,42.1))
highest_3 = (4,Qing,42.1)
highest_sum = (4,Qing,42.1)


(4,Qing,42.1)

In [23]:
//maxBy for RDDs
val rdd = sc.parallelize(Array((1,"jack",37.2),(2,"Jill",23.1),(2,"John",11.7),(4,"Qing",42.1)))

val highest_3 = rdd.reduce((a,b) => if (a._3>b._3) a else b)
val highest_sum = rdd.reduce((a,b) => if ((a._1+a._3)>(b._1+b._3)) a else b)

rdd = ParallelCollectionRDD[32] at parallelize at <console>:30
highest_3 = (4,Qing,42.1)
highest_sum = (4,Qing,42.1)


(4,Qing,42.1)

### STEP 1: Data Preparation ###

Write a function ***prepare_data(file)*** that:

1. Reads a citibike data file into an RDD
2. Splits each line on commas
3. Removes header lines (the file contains several copies of the header line):
    * ride_id, rideable_type, started_at, ended_at, start_station_name, start_station_id, end_station_name, end_station_id, start_lat, start_lng, end_lat, end_lng, member_casual
4. Computes the difference between the bike pickup time and the bike dropoff time and adds it as the last element of each record
5. Saves the prepared data in an RDD named ***data_rdd***
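The steps above can also be written as a per-line function that returns an `Option`, so a single unparseable timestamp does not fail the whole job. This is a hypothetical sketch (`parseTrip` is not part of the notebook) that assumes the same comma split and timestamp format as `time_difference`; unlike `prepare_data`, it keeps the fields and the duration in a typed tuple instead of an `Array[Any]`.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.format.DateTimeFormatter
import scala.util.Try

// Hypothetical per-line version of prepare_data: None for header copies, short rows,
// or unparseable timestamps; otherwise the fields plus the trip duration in seconds.
def parseTrip(line: String): Option[(Array[String], Double)] = {
  val fields = line.split(",")
  if (fields(0).contains("ride_id") || fields.length < 13) None
  else Try {
    // Built per call, as in time_difference.
    val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
    val start = LocalDateTime.parse(fields(2), formatter)
    val end   = LocalDateTime.parse(fields(3), formatter)
    (fields, Duration.between(start, end).toMillis / 1000.0)
  }.toOption
}
```

In Spark it could be applied as `sc.textFile(citibike_data_file).flatMap(line => parseTrip(line))`, which drops the rejected lines.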


In [24]:
def prepare_data(file: String) = {

  val header = "ride_id"

  sc.textFile(file)
    .map(_.split(","))
    // remove all header lines
    .filter(row => !row(0).contains(header))
    // keep only rows with the expected number of fields (guards against malformed rows)
    .filter(row => row.length >= 13)
    // add duration
    .map(row => {
      val start = row(2)  // started_at
      val end = row(3)    // ended_at
      val duration = time_difference(start, end)
      row :+ duration
    })
}

val data_rdd = prepare_data(citibike_data_file)


data_rdd = MapPartitionsRDD[38] at map at <console>:41


prepare_data: (file: String)org.apache.spark.rdd.RDD[Array[Any]]


MapPartitionsRDD[38] at map at <console>:41

### STEP 2: Report the following statistics using data_rdd ###
1. The count of the records (***use count***)
2. The average duration that a bike is out (use ***reduce***)
3. The number of partitions
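data_rdd is not cached, so `count()` and the `reduce` each run a separate job that re-reads and re-parses the source file. One way to save a pass is to compute the sum and the count together; the plain-Scala sketch below shows the pattern on a few hypothetical durations. On the RDD it would read `data_rdd.map(r => (r.last.toString.toDouble, 1L)).reduce(...)`; alternatively, calling `data_rdd.cache()` before the two actions helps if the data fits in memory.

```scala
// Hypothetical durations (seconds) standing in for the last field of each record.
val durations = Seq(577.7, 389.689, 1200.0, 60.0)

// Map each value to (value, 1) and add the pairs element-wise: one pass yields both sum and count.
val (sum, count) = durations.map(d => (d, 1L)).reduce((a, b) => (a._1 + b._1, a._2 + b._2))
val avg = sum / count
```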

In [26]:

val total_count = data_rdd.count()

val avg_duration = data_rdd.map(row => row.last.toString.toDouble).reduce(_ + _) / total_count.toDouble

val numPartitions = data_rdd.getNumPartitions

total_count = 76531242
avg_duration = 1689.2749273438053
numPartitions = 110


110

### STEP 3: Combine By Key ###

**Part 1**

Starting with data_rdd, create a (key,value) paired data that contains:

(starting station,duration)

For example, if the record is:

Array(7FB44ACE772F2B5E, classic_bike, 2020-01-16 11:29:00.511, 2020-01-16 11:38:38.211, E 47 St & Park Ave, 6584.12, 5 Ave & E 63 St, 6904.06, 40.755103, -73.974987, 40.766368, -73.971518, member, 577.7)

The corresponding (key,value) pair is:

(E 47 St & Park Ave,577.7)

Store the key,value pairs in a variable named ***station_time_pair***

**Part 2**
1. Using ***combineByKey***, calculate the average duration a bike is borrowed from each station
2. Using ***sortBy***, sort the averages in descending order
3. Report the top ten results (using ***take***, ***foreach***, and ***println***)
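combineByKey takes three functions: `createCombiner` turns the first value seen for a key in a partition into an accumulator, `mergeValue` folds further values from that partition into it, and `mergeCombiners` merges accumulators for the same key coming from different partitions. The plain-Scala simulation below mimics this on two hypothetical partitions, with the same (count, sum) logic as the notebook's `combiner`, `merger`, and `mergeAndCombiner` (renamed after combineByKey's parameters). It illustrates the semantics, not Spark's implementation.

```scala
// Same logic as the notebook's functions: the accumulator is (count, sum).
val createCombiner = (x: Double) => (1, x)
val mergeValue = (acc: (Int, Double), x: Double) => (acc._1 + 1, acc._2 + x)
val mergeCombiners = (a: (Int, Double), b: (Int, Double)) => (a._1 + b._1, a._2 + b._2)

// Two hypothetical partitions of (station, duration) pairs.
val partitions: Seq[Seq[(String, Double)]] = Seq(
  Seq(("A", 100.0), ("B", 50.0), ("A", 300.0)),
  Seq(("A", 200.0), ("B", 150.0))
)

// Map side: within one partition, the first value of a key creates an accumulator; later values merge in.
def combineLocally(part: Seq[(String, Double)]): Map[String, (Int, Double)] =
  part.foldLeft(Map.empty[String, (Int, Double)]) { case (m, (k, v)) =>
    m.updated(k, m.get(k).map(acc => mergeValue(acc, v)).getOrElse(createCombiner(v)))
  }

// Reduce side: accumulators for the same key are merged across partitions.
val merged: Map[String, (Int, Double)] = partitions.map(combineLocally).reduce { (m1, m2) =>
  m2.foldLeft(m1) { case (m, (k, acc)) =>
    m.updated(k, m.get(k).map(prev => mergeCombiners(prev, acc)).getOrElse(acc))
  }
}

val averages: Map[String, Double] = merged.map { case (k, (count, sum)) => (k, sum / count) }
```

Because only one (count, sum) pair per key leaves each partition, this map-side combining is what keeps the shuffle in the cell below small.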



In [27]:
val station_time_pair = data_rdd.map(row => {
  val station = row(4).toString
  val duration = row.last.toString.toDouble
  (station, duration)
})


val combiner = (x: Double) => (1, x)   // (count, sum)
val merger = (x: (Int, Double), y: Double) => {
  val (c, acc) = x
  (c + 1, acc + y)
}
val mergeAndCombiner = (x1: (Int, Double), x2: (Int, Double)) => {
  val (c1, acc1) = x1
  val (c2, acc2) = x2
  (c1 + c2, acc1 + acc2)
}


timer {
  // Combine durations by station
  val result = station_time_pair.combineByKey(combiner, merger, mergeAndCombiner)

  // Compute average, sort, take top 10
  val average = result
    .mapValues{ case (count, sum) => sum / count }
    .sortBy(_._2, ascending = false)
    .take(10)

  average.foreach(println)
}

(Lafayette St & Jersey St S,3.0070220633E7)
(Amsterdam Ave & W 189 St old,5406588.302)
(Propsect Ave & E 151 St,157955.35202380954)
(Liberty State Park,150289.825)
(,77299.60101684534)
(Morris Ave & E 153 St,23464.733022277564)
(Southern Blvd & E 172 St,22514.056611127464)
(Clinton St & Newark St,22424.762615384618)
(Grand Concourse & E 144 St,21538.461101160196)
(E 12 St & 4 Av,20832.740200941913)
Elapsed time: 56.233019626 seconds


station_time_pair = MapPartitionsRDD[41] at map at <console>:32

### STEP 4: Hash Partition and Combine By Key ###
1. Get the total number of starting stations in the data
2. Hash partition the data into that many partitions and store the result in hash_data
3. Run combineByKey again and measure how long it takes to return the top ten stations (Elapsed time: 123.860593854 seconds)
4. Run it a second time on hash_data, without including the HashPartitioner statement, and report the time (Elapsed time: 27.922944971 seconds)
5. Explain the three different times (station_time_pair, hashing + hash_data, hash_data alone)
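Spark's HashPartitioner assigns a key to partition `key.hashCode` modulo `numPartitions`, shifted to be non-negative, and sends null keys to partition 0. The plain-Scala sketch below reproduces that rule (`hashPartition` is a hypothetical name) to show why every record for a station lands in the same partition, and why `partitionBy` has to move every raw record to get there.

```scala
// Mirrors HashPartitioner's rule: null keys go to partition 0; otherwise the key's hashCode
// modulo numPartitions, shifted into [0, numPartitions) when it is negative.
def hashPartition(key: Any, numPartitions: Int): Int = key match {
  case null => 0
  case k =>
    val raw = k.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
}

val numStations = 1863
val stations = Seq("E 47 St & Park Ave", "Carmine St & 6 Ave", "5 Ave & E 63 St", "E 47 St & Park Ave")

// Every record for a given station maps to the same partition, whichever executor computes it.
val assigned = stations.map(s => (s, hashPartition(s, numStations)))
```

If hash_data is queried repeatedly, `hash_data.persist()` keeps the partitioned data explicitly instead of relying on leftover shuffle files.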

In [28]:
val numStations = station_time_pair.keys.distinct().count().toInt
println(s"Number of stations = $numStations")

Number of stations = 1863


numStations = 1863


1863

In [29]:
import org.apache.spark.HashPartitioner
val hash_data = station_time_pair.partitionBy(new HashPartitioner(numStations))
timer {
  val h_avg_times = hash_data
    .combineByKey(combiner, merger, mergeAndCombiner)
    .mapValues{case (count, sum) => sum / count}
    .sortBy(_._2, ascending = false)
    .take(10)

  h_avg_times.foreach(println)
}


(Lafayette St & Jersey St S,3.0070220633E7)
(Amsterdam Ave & W 189 St old,5406588.302)
(Propsect Ave & E 151 St,157955.35202380954)
(Liberty State Park,150289.825)
(,77299.60101684532)
(Morris Ave & E 153 St,23464.733022277527)
(Southern Blvd & E 172 St,22514.056611127475)
(Clinton St & Newark St,22424.762615384618)
(Grand Concourse & E 144 St,21538.461101160232)
(E 12 St & 4 Av,20832.740200941917)
Elapsed time: 86.413022711 seconds


hash_data = ShuffledRDD[53] at partitionBy at <console>:34


ShuffledRDD[53] at partitionBy at <console>:34

In [30]:
//Second run, without creating hash_data since it is already created in the previous cell
timer {
  val h_avg_times2 = hash_data
    .combineByKey(combiner, merger, mergeAndCombiner)
    .mapValues{case (count, sum) => sum / count}
    .sortBy(_._2, ascending = false)
    .take(10)

  h_avg_times2.foreach(println)
}

(Lafayette St & Jersey St S,3.0070220633E7)
(Amsterdam Ave & W 189 St old,5406588.302)
(Propsect Ave & E 151 St,157955.35202380954)
(Liberty State Park,150289.825)
(,77299.6010168454)
(Morris Ave & E 153 St,23464.733022277564)
(Southern Blvd & E 172 St,22514.056611127442)
(Clinton St & Newark St,22424.762615384618)
(Grand Concourse & E 144 St,21538.4611011603)
(E 12 St & 4 Av,20832.740200941917)
Elapsed time: 14.954544727 seconds


***Explanation***

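In the first run, on station_time_pair, combineByKey performs a single shuffle with map-side combining: each of the 110 input partitions first reduces its records to one (count, sum) pair per station, so only these partial aggregates cross the network. Because data_rdd is not cached, most of the 56 seconds is likely spent reading the ~15GB file from GCS, splitting it, and computing durations.

In the second run, partitionBy(new HashPartitioner(numStations)) is lazy, so its cost is paid inside the timed block. partitionBy shuffles every individual (station, duration) record (about 76.5 million) into 1863 partitions with no map-side combining, which makes this the slowest run (86 seconds). The combineByKey that follows reuses the same partitioner and needs no further shuffle, but that saving is much smaller than the cost of the full shuffle.

In the third run, the shuffle output that produced hash_data is still on the executors' local disks, so Spark skips the stages that read the source file and perform the partitionBy shuffle. combineByKey again runs without a shuffle, leaving only the small sortBy over 1863 station averages, which makes this the fastest run (15 seconds).

***In summary, the measured times rank hashing + hash_data (slowest) > station_time_pair > hash_data alone (fastest). Pre-partitioning pays off only when the partitioned RDD is reused; a one-time partitionBy costs more than the map-side-combined shuffle it replaces.***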


### STEP 5: Repartition the data using a custom partitioner ###
In this step, you will repartition the data as follows:
1. Each bike trip has a starting station, and that starting station is in a New York City zipcode
2. We don't know the zipcode, but using the ***find_zipcode*** function, you can get it given the latitude and longitude of the starting station
3. We'll construct new records of the form:
    * (starting station, duration, (lat, lon))
4. You'll then write a custom partitioner that uses this data to allocate each record to its zipcode partition
5. The function ***read_zipcode_polygons*** reads zipcode polygons and returns data in the format explained above
6. In the custom partitioner, find the zipcode for each record and use the id of the zipcode as the partition number

**NOTES**

* Custom partitioners only have access to the key of a key-value pair. Since you need to partition the data using the latitude and longitude, change the key to (lat, lon) before passing it to the partitioner
* The final form of the data in each custom partition should be (starting station, (duration, (lat, lon))). We want the data to be stored with the starting station as the key. Use a map transformation after partitioning the data
* The data contains a glitch that you will need to handle:
    * the latitude and longitude sometimes contain bad data (non-numeric characters or an empty string). Use the ***get_lat_lon*** function to get the latitude and longitude as Doubles. Bad values are converted to 0.0. Therefore, after you've constructed the (starting station, duration, (lat, lon)) records, filter out any trips whose latitude or longitude equals 0.0

**Steps**

1. Create a ***station_latlon_rdd*** from ***data_rdd*** with the format specified above (see the notes above as well!)
2. Get the zipcode polygons and save them in a variable ***polygons***. Note that although the function returns an RDD, you should run ***collect*** and save the result as a local Scala object, because an RDD cannot be used inside code that runs on the executors, such as a partitioner (doing so leads to serialization errors)
3. Write a custom partitioner ***zipcodePartitioner*** that takes the polygons as an initializer (i.e., as an argument to the class definition) and does the following:
    * sets the number of partitions to the number of zipcodes, computed with ***distinct*** and ***size*** on the zipcode ids. Because getPartition returns these ids directly as partition numbers, they must run from 0 to numPartitions - 1
    * write getPartition
        * Use Scala's find function [Scala find](https://www.geeksforgeeks.org/scala/scala-iterator-find-method-with-example/) to apply ***find_zipcode*** to each key (the key should be (lat, lon)) to find the zipcode. Note that find_zipcode expects the point as (lon, lat), so swap the key's components. If the zipcode is found, return the id of the zipcode (0, 1, ..., the second element of the polygon tuple). Otherwise, return 0 (if we can't find the zipcode, we'll just add the trip to the 0th partition).
        * Note that find returns an Option object. Use ***match*** to unwrap the Some or handle the None
        * the value obtained from find after unwrapping is the partition number, and that's what getPartition returns
4. Do the partitioning
    * remember to use map to convert the key to (lat,lon)
    * and remember to use map on the result to convert the data into (station,(duration,(lat,lon)))
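
The getPartition logic can be checked without Spark. The sketch below uses two hypothetical unit-square "zipcodes" and a ray-casting test equivalent to `find_zipcode`; `inside`, `polys`, and `partitionFor` are illustrative names, not part of the notebook. Note the swap: the key arrives as (lat, lon), while the polygon vertices are (lon, lat), so the point is passed on as (lon, lat).

```scala
// Ray-casting point-in-polygon test; points are (x, y) = (lon, lat), as in find_zipcode.
def inside(poly: Array[(Double, Double)], p: (Double, Double)): Boolean = {
  val (x, y) = p
  var in = false
  var j = poly.length - 1
  for (i <- poly.indices) {
    val (xi, yi) = poly(i)
    val (xj, yj) = poly(j)
    // The first condition guarantees yi != yj, so the division is safe.
    if (((yi > y) != (yj > y)) && (x < (xj - xi) * (y - yi) / (yj - yi) + xi)) in = !in
    j = i
  }
  in
}

// Hypothetical toy zipcodes: two unit squares side by side, with ids 0 and 1.
val polys: Array[(String, Int, Array[(Double, Double)])] = Array(
  ("Z0", 0, Array((0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0))),
  ("Z1", 1, Array((1.0, 0.0), (2.0, 0.0), (2.0, 1.0), (1.0, 1.0)))
)

// getPartition-style logic: the key is (lat, lon); the first containing polygon wins, else partition 0.
def partitionFor(key: Any): Int = key match {
  case (lat: Double, lon: Double) =>
    polys.find { case (_, _, poly) => inside(poly, (lon, lat)) } match {
      case Some((_, id, _)) => id
      case None             => 0
    }
  case _ => 0
}
```

Because unmatched points fall back to 0, partition 0 collects both its own zipcode and every trip outside all polygons.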
        
        
    


In [33]:
val station_latlon_rdd = data_rdd.map(row => {
  val station = row(4).toString
  val duration = row(13).asInstanceOf[Double]

  // get_lat_lon expects (lat, lon) as two inputs!
  val (lat, lon) = get_lat_lon(row(8), row(9))  // returns (Double, Double)

  (station, duration, (lat, lon))
}).filter{ case (_,_,(lat,lon)) => lat != 0.0 && lon != 0.0 }
val polygons = read_zipcode_polygons(zipcode_geojson_file).collect

station_latlon_rdd = MapPartitionsRDD[71] at filter at <console>:40
polygons = Array((11372,0,Array((-73.86942457284177,40.74915687096788), (-73.89143129977276,40.74684466041932), (-73.89507143240859,40.746465470812154), (-73.8961873786782,40.74850942518088), (-73.8958395418514,40.74854687570604), (-73.89525242774397,40.748306609450246), (-73.89654041085562,40.75054199814359), (-73.89579868613829,40.75061972133262), (-73.89652230661434,40.75438879610903), (-73.88164812188481,40.75595161704187), (-73.87221855882478,40.75694324806748), (-73.87167992356792,40.75398717439604), (-73.8720704651389,40.753862007052064), (-73.86942457284177,40.74915687096788))), (11004,1,Array((-73.711329...


Array((11372,0,Array((-73.86942457284177,40.74915687096788), (-73.89143129977276,40.74684466041932), (-73.89507143240859,40.746465470812154), (-73.8961873786782,40.74850942518088), (-73.8958395418514,40.74854687570604), (-73.89525242774397,40.748306609450246), (-73.89654041085562,40.75054199814359), (-73.89579868613829,40.75061972133262), (-73.89652230661434,40.75438879610903), (-73.88164812188481,40.75595161704187), (-73.87221855882478,40.75694324806748), (-73.87167992356792,40.75398717439604), (-73.8720704651389,40.753862007052064), (-73.86942457284177,40.74915687096788))), (11004,1,Array((-73.711329...

In [38]:
import org.apache.spark.Partitioner

class ZipcodePartitioner(polygons: Array[(String, Int, Array[(Double, Double)])])
  extends org.apache.spark.Partitioner {

  override val numPartitions: Int = polygons.map(_._2).distinct.size

  def findZip(lat: Double, lon: Double): Int = {
    polygons.find { case (zip, id, coords) =>
      find_zipcode((zip, id, coords), (lon, lat)) != -1
    } match {
      case Some((_, id, _)) => id
      case None => 0
    }
  }

  override def getPartition(key: Any): Int = {
    key match {
      case (lat: Double, lon: Double) => findZip(lat, lon)
      case _ => 0
    }
  }
}


defined class ZipcodePartitioner


In [39]:
import org.apache.spark.rdd.RDD

val custom_partition_rdd =
  station_latlon_rdd
    .map { case (station, duration, (lat, lon)) =>
      ((lat, lon), (station, duration))
    }
    .partitionBy(new ZipcodePartitioner(polygons))
    .map { case ((lat, lon), (station, duration)) =>
      (station, (duration, (lat, lon)))
    }

custom_partition_rdd = MapPartitionsRDD[87] at map at <console>:42


MapPartitionsRDD[87] at map at <console>:42

In [40]:
//Test: You should get 112 partitions with non-zero elements (not every zipcode has a citibike station!)
custom_partition_rdd.mapPartitions(iter => Array(iter.size).iterator).filter(v => v!=0).count

112

### STEP 6: Find the most active stations for each zipcode ###
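* Write a function ***mostActiveStationPerZip*** that returns the most active station in each zipcode, measured here as the station with the highest total trip duration
* Since the data for each zipcode is in its own partition, the summing can be done separately on each partition
* Avoid ***byKey*** operations here: after the final map, custom_partition_rdd no longer has a partitioner (map drops it, and the key is now the station), so a byKey operation would trigger a new shuffle and undo the zipcode layout
* Instead, since the reduction needs to be done on each partition separately, use ***mapPartitionsWithIndex***, which processes each partition separately and passes in its index
* Once we've found the most active station, look up the polygons object to match the partition number with the zipcode
* Partition 0 holds zipcode id 0 and also every trip whose coordinates matched no polygon, so its result does not describe a single zipcode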
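A function passed to `mapPartitionsWithIndex` receives the partition index and an iterator over that partition's records, and returns a new iterator. The plain-Scala sketch below simulates this with hypothetical partitions of (station, duration) pairs; `mostActive` has the shape `(Int, Iterator[T]) => Iterator[U]`, so on an RDD of (station, duration) pairs it could be passed as `rdd.mapPartitionsWithIndex(mostActive)` (not tested here). It uses an immutable `foldLeft` instead of the mutable Map in the cell below; both produce the same totals.

```scala
// Hypothetical partitions of (station, duration) records; the list position plays the partition index.
val partitions: Seq[Seq[(String, Double)]] = Seq(
  Seq(("A", 100.0), ("B", 250.0), ("A", 200.0)),
  Seq.empty,
  Seq(("C", 10.0), ("D", 5.0), ("C", 1.0))
)

// One partition: total duration per station, then keep the station with the largest total.
def mostActive(pid: Int, records: Iterator[(String, Double)]): Iterator[(Int, (String, Double))] = {
  val totals = records.foldLeft(Map.empty[String, Double]) { case (m, (s, d)) =>
    m.updated(s, m.getOrElse(s, 0.0) + d)
  }
  if (totals.isEmpty) Iterator.empty else Iterator((pid, totals.maxBy(_._2)))
}

// Empty partitions produce no output, as in mostActiveStationPerZip.
val result = partitions.zipWithIndex.flatMap { case (part, pid) => mostActive(pid, part.iterator) }
```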


In [41]:
import org.apache.spark.rdd.RDD
import scala.collection.mutable

// Computes the most active (highest total duration) station per partition.
def mostActiveStationPerZip(
  rdd: RDD[(String, (Double, (Double, Double)))]
): RDD[(Int, (String, Double))] = {

  rdd.mapPartitionsWithIndex { (pid, iter) =>

    // Create a mutable Map: station -> totalDuration
    val stationTotals = mutable.Map[String, Double]()

    // Aggregate duration per station
    iter.foreach { case (station, (duration, (lat, lon))) =>
      val current = stationTotals.getOrElse(station, 0.0)
      stationTotals(station) = current + duration
    }

    // Return max station from this partition
    if (stationTotals.nonEmpty) {
      val (maxStation, maxDuration) = stationTotals.maxBy(_._2)
      Iterator((pid, (maxStation, maxDuration)))
    } else {
      Iterator.empty
    }
  }
}


mostActiveStationPerZip: (rdd: org.apache.spark.rdd.RDD[(String, (Double, (Double, Double)))])org.apache.spark.rdd.RDD[(Int, (String, Double))]


In [44]:
val mostActives = mostActiveStationPerZip(custom_partition_rdd)

mostActives = MapPartitionsRDD[91] at mapPartitionsWithIndex at <console>:41


MapPartitionsRDD[91] at mapPartitionsWithIndex at <console>:41

In [46]:
mostActives.take(5).foreach(println)

(0,(South St & Gouverneur Ln,2.2749358650000027E8))
(21,(Lewis Ave & Madison St,2.350225190540018E8))
(25,(Hancock St & Bedford Ave,2.5918673494900116E8))
(27,(Fulton St & Utica Ave,3.386516076930045E8))
(29,(Brooklyn Ave & Prospect Pl,839592.471000001))


In [51]:
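//Build the id -> zipcode lookup once, instead of once per record inside the map
val id_to_zip: Map[Int, String] = polygons.map{ case (zip, id, _) => (id, zip) }.toMap
val final_result = mostActives.map{ case (pid, (station, duration)) =>
  val zip = id_to_zip.getOrElse(pid, "Unknown")
  (zip, pid, (station, duration))
}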

final_result = MapPartitionsRDD[94] at map at <console>:37


MapPartitionsRDD[94] at map at <console>:37

In [52]:
final_result.collect

Array((11372,0,(South St & Gouverneur Ln,2.2749358649999747E8)), (11221,21,(Lewis Ave & Madison St,2.350225190539995E8)), (11216,25,(Hancock St & Bedford Ave,2.5918673494900003E8)), (11233,27,(Fulton St & Utica Ave,3.3865160769300246E8)), (11213,29,(Brooklyn Ave & Prospect Pl,839592.4710000005)), (11225,31,(Franklin Ave & Empire Blvd,3.3806142728299993E8)), (11218,32,(Ocean Pkwy & Church Ave,3.1160455834999934E7)), (11226,33,(Parade Pl & Crooke Ave,1.2468014570000012E8)), (11230,36,(E 14 St & 1 Ave,1345.643)), (10467,41,(Jerome Ave & E Mosholu Parkway S,1.1223364132000007E7)), (10463,42,(Bailey Ave & W 193 St,3898469.234999996)), (10468,46,(Jerome Ave & W 193 St,2.141050999199991E7)), (10458,48,(Marion Ave & Mosholu Pkwy,3.826985452299983E...

# Summary

This notebook demonstrates advanced Spark and data engineering techniques:

- Loading large datasets from Google Cloud Storage  
- Distributed data preparation & time computations  
- Aggregations using RDD API (`combineByKey`, `reduce`)  
- Evaluating partitioning strategies and shuffle behavior  
- Implementing a custom geospatial partitioner for zipcode-based locality  

The project highlights how partitioning and geospatial awareness affect performance in distributed systems: pre-partitioned data pays off when it is reused, while a one-time repartition can cost more than the shuffle it saves.
