# Citibike Network Analysis Using GraphFrames

This notebook analyzes the bicycle flow network of New York City's CitiBike system using **Apache Spark** and **GraphFrames**, applying graph analytics to identify station importance and local clustering structure.

We construct a directed and undirected graph of station-to-station flows, run PageRank to measure inflow and outflow importance, and compute clustering coefficients to identify tightly connected neighborhoods within the bike network.


In [1]:
%AddJar https://repos.spark-packages.org/graphframes/graphframes/0.8.4-spark3.5-s_2.12/graphframes-0.8.4-spark3.5-s_2.12.jar

Starting download from https://repos.spark-packages.org/graphframes/graphframes/0.8.4-spark3.5-s_2.12/graphframes-0.8.4-spark3.5-s_2.12.jar
Finished download of graphframes-0.8.4-spark3.5-s_2.12.jar
Using cached version of graphframes-0.8.4-spark3.5-s_2.12.jar


In [2]:
//GraphFrame imports
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._


//GraphX imports
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD




## Load and Preprocess CitiBike Trip Data and constructing the Graph

We begin by loading a preprocessed dataset of Citibike trips.  
Each trip contains:

- start station  
- end station  
- usage counts  

We aggregate the data to produce weighted edges of the form: (start_station → end_station, number_of_trips)
This will serve as the edge list for constructing the CitiBike network graph.

We build the graph using:

- A **vertex DataFrame** representing all unique stations  
- An **edge DataFrame** representing station-to-station flows  
- A **weight column** (trip counts) representing flow magnitude

The result is a directed graph capturing mobility patterns between stations.



In [3]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.graphframes.GraphFrame

val spark = SparkSession.builder()
  .appName("CitiBike Graph")
  .getOrCreate()

val text = sc.textFile("201710-citibike-tripdata.csv")


val header = text.first()
val data = text.filter(row => row != header)


val rows = data.map(line => line.split(","))

val df = rows.map(r => (
  r(3).toInt,  // start station id
  r(4),        // start station name
  r(7).toInt,  // end station id
  r(8),        // end station name
  r(0).toInt   // tripduration
)).toDF("start_id", "start_name", "end_id", "end_name", "duration")


val startStations = df.select(
  col("start_id").alias("id"),
  col("start_name").alias("name")
)

val endStations = df.select(
  col("end_id").alias("id"),
  col("end_name").alias("name")
)

val vertices = startStations.union(endStations).distinct()


val edges = df.select(
  col("start_id").alias("src"),
  col("end_id").alias("dst"),
  col("duration")
)


val g = GraphFrame(vertices, edges)


spark = org.apache.spark.sql.SparkSession@7b66a474
text = 201710-citibike-tripdata.csv MapPartitionsRDD[1] at textFile at <console>:61
header = "tripduration","starttime","stoptime","start station id","start station name","start station latitude","start station longitude","end station id","end station name","end station latitude","end station longitude","bikeid","usertype","birth year","gender"
data = MapPartitionsRDD[2] at filter at <console>:65
rows = MapPartitionsRDD[3] at map at <console>:68


df: org.apache.spark.sql.Data...


MapPartitionsRDD[3] at map at <console>:68

##  Graph Summary Statistics

We explore basic graph properties such as:

- Number of vertices (stations)
- Number of edges (unique station-to-station flows)
- Degree distribution
- In-degree and out-degree rankings

These metrics help reveal which stations act as major hubs in the network.

In [4]:

def make_undirected_graph(g: GraphFrame) = {
    val u_edge_df = g.find("(a)-[]->(b)")
        .select($"a.id".as("src"),$"b.id".as("dst"))
        .withColumn("swap",when(col("src")<col("dst"),col("dst")))
        .withColumn("dst",
                    when(col("swap").isNotNull,col("src"))
                    .otherwise(col("dst")))
        .withColumn("src",
                    when(col("swap").isNotNull,col("swap"))
                   .otherwise(col("src")))
        .drop(col("swap"))
        .distinct
    val u_vertices_df = g.vertices
    val u_g = GraphFrame(u_vertices_df,u_edge_df)    
    u_g
}

make_undirected_graph: (g: org.graphframes.GraphFrame)org.graphframes.GraphFrame


In [5]:
val ug = make_undirected_graph(g)


val numStations = g.vertices.count()


val numTrips = g.edges.count()


val sameStationTrips = g.edges.filter("src = dst").count()


val uniqueConnections = ug.edges.filter("src != dst").count()

println("===== CitiBike Network Statistics =====")
println(s"Number of stations: $numStations")
println(s"Number of trips: $numTrips")
println(s"Trips starting and ending at same station: $sameStationTrips")
println(s"Unique station-to-station connections: $uniqueConnections")

===== CitiBike Network Statistics =====
Number of stations: 785
Number of trips: 1897592
Trips starting and ending at same station: 33245
Unique station-to-station connections: 107524


ug = GraphFrame(v:[id: int, name: string], e:[src: int, dst: int])
numStations = 785
numTrips = 1897592
sameStationTrips = 33245
uniqueConnections = 107524


107524

In [9]:

val maxOut = outDeg.orderBy(desc("outDegree")).limit(1)

outDeg.show()

+----+---------+
|  id|outDegree|
+----+---------+
| 496|     7169|
| 471|     1644|
| 392|     2692|
| 540|     5398|
| 243|     1517|
|3179|      420|
|3226|     2667|
| 516|     1966|
|3352|      322|
| 251|     5735|
|3490|      308|
|3377|     1113|
| 458|     6038|
|3561|      286|
|3425|      947|
|3566|      307|
|3042|      734|
|3105|      846|
| 481|     2151|
|3242|     1700|
+----+---------+
only showing top 20 rows



maxOut = [id: int, outDegree: int]


[id: int, outDegree: int]

In [11]:

val outDeg = g.outDegrees

val maxOut = outDeg.orderBy(desc("outDegree")).limit(1)

val mostActiveStation = maxOut
  .join(g.vertices, maxOut("id") === g.vertices("id"))
  .select(g.vertices("name"), maxOut("outDegree"))

println("===== Station with Most Originating Trips =====")
mostActiveStation.show(false)

===== Station with Most Originating Trips =====
+-----------------------+---------+
|name                   |outDegree|
+-----------------------+---------+
|"Pershing Square North"|17995    |
+-----------------------+---------+



outDeg = [id: int, outDegree: int]
maxOut = [id: int, outDegree: int]
mostActiveStation = [name: string, outDegree: int]


[name: string, outDegree: int]

## STEP 4: Proportion of trips for each station that start and end at that same station
* Create a GraphX graph from the GraphFrames graph (use the method that retains datatypes)
* Use aggregateMessages to calculate the number of trips that start and end at the same vertex (for each vertex)
* Convert the resulting (VertexRDD) to a DataFrame
* Using join add the location of the station column to the result df from the previous step and then use select to create a dataframe with the schema (vertex, location, trips)
* Join this df to the out degrees df created earlier
* Divide the "same trips" column by the "out degrees column" 
* Sort the resulting df by this proportion in descending order


In [12]:
val v = g.vertices.rdd.map(r => (r(0).toString.toLong,r(1).toString))
val e = g.edges.rdd.map(r => Edge(r(0).toString.toLong,r(1).toString.toLong,r(2).toString.toDouble))
val gx: Graph[String, Double] = Graph(v, e)

v = MapPartitionsRDD[100] at map at <console>:58
e = MapPartitionsRDD[106] at map at <console>:59
gx = org.apache.spark.graphx.impl.GraphImpl@5b34b8e


org.apache.spark.graphx.impl.GraphImpl@5b34b8e

In [13]:
import org.apache.spark.graphx._
import spark.implicits._


val sameTrips: VertexRDD[Int] = gx.aggregateMessages[Int](
  ctx => {
    if (ctx.srcId == ctx.dstId) {
      ctx.sendToSrc(1)
    }
  },
  _ + _
)


val sameTripsDF = sameTrips.toDF("vertex", "sameTrips")


val stationDF = sameTripsDF
  .join(g.vertices, sameTripsDF("vertex") === g.vertices("id"))
  .select(
    sameTripsDF("vertex"),
    col("name").alias("location"),
    col("sameTrips")
  )


val combinedDF = stationDF
  .join(outDeg, stationDF("vertex") === outDeg("id"))


val resultDF = combinedDF.withColumn(
  "proportion",
  col("sameTrips") / col("outDegree")
)


val finalDF = resultDF.orderBy(desc("proportion"))


println("===== Same-Station Trip Proportion per Station =====")
finalDF.select("vertex", "location", "sameTrips", "outDegree", "proportion").show(20, false)


===== Same-Station Trip Proportion per Station =====
+------+-----------------------------------------------+---------+---------+-------------------+
|vertex|location                                       |sameTrips|outDegree|proportion         |
+------+-----------------------------------------------+---------+---------+-------------------+
|3488  |"8D QC Station 01"                             |1        |1        |1.0                |
|3245  |"NYCBS DEPOT - DELANCEY"                       |1        |2        |0.5                |
|3182  |"Yankee Ferry Terminal"                        |309      |900      |0.3433333333333333 |
|3254  |"Soissons Landing"                             |358      |1100     |0.32545454545454544|
|3342  |"Pioneer St & Richards St"                     |59       |299      |0.19732441471571907|
|3477  |"39 St & 2 Ave - Citi Bike HQ at Industry City"|45       |245      |0.1836734693877551 |
|3532  |"Ditmars Blvd & 19 St"                         |70       |407     

sameTrips = VertexRDDImpl[126] at RDD at VertexRDD.scala:57
sameTripsDF = [vertex: bigint, sameTrips: int]
stationDF = [vertex: bigint, location: string ... 1 more field]
combinedDF = [vertex: bigint, location: string ... 3 more fields]
resultDF = [vertex: bigint, location: string ... 4 more fields]
finalDF = [vertex: bigint, location: string ... 4 more fields]


[vertex: bigint, location: string ... 4 more fields]

## Create a new graph that contains all edges except for those between the same station


In [17]:

val new_g = g.edges.filter("src != dst")


val g_noSelf = GraphFrame(g.vertices, new_g)

g_noSelf.edges.show()


new_g = [src: int, dst: int ... 1 more field]
g_noSelf = GraphFrame(v:[id: int, name: string], e:[src: int, dst: int ... 1 more field])


+----+----+--------+
| src| dst|duration|
+----+----+--------+
| 479| 478|     457|
| 279| 307|    6462|
| 504| 350|     761|
|3236|3233|    1193|
|2006| 469|    2772|
|3443| 468|     260|
| 478|3224|     715|
|3305| 305|     808|
| 284| 355|    1143|
| 432| 295|     527|
|3430|3090|     516|
|2006| 469|    2764|
| 470| 458|     439|
|3414|3346|     292|
|3328|3314|     242|
|3414|3346|     284|
|3430|3109|     573|
| 161| 537|    1289|
|2006| 469|    2739|
| 440| 445|     949|
+----+----+--------+
only showing top 20 rows



GraphFrame(v:[id: int, name: string], e:[src: int, dst: int ... 1 more field])

## Calculate the average duration between every pair of stations


In [16]:
g_noSelf.edges.createOrReplaceTempView("edges")
val avgDurationDF = spark.sql("""
  SELECT 
    src,
    dst,
    AVG(duration) AS avg_duration
  FROM edges
  GROUP BY src, dst

""")
avgDurationDF.show(20)

+----+----+------------------+
| src| dst|      avg_duration|
+----+----+------------------+
| 504| 350| 772.7647058823529|
| 433| 527| 532.9677419354839|
| 434| 470|316.52272727272725|
| 438| 151|  546.195652173913|
| 445| 507| 553.3947368421053|
|2021| 446| 827.6904761904761|
| 116| 518| 1115.857142857143|
|3435| 358| 895.6666666666666|
|3402|3414| 634.1666666666666|
| 498| 495| 801.2272727272727|
|3637| 418|             889.7|
| 380|3260|419.42105263157896|
|3360| 507|            1442.0|
| 326| 247|             713.0|
|3358| 467| 500.6666666666667|
|3164| 457|502.97241379310344|
| 498| 528| 434.3207547169811|
| 405|3256| 843.2168674698795|
| 477|2000|2672.3333333333335|
|3226|3163| 686.4444444444445|
+----+----+------------------+
only showing top 20 rows



avgDurationDF = [src: int, dst: int ... 1 more field]


[src: int, dst: int ... 1 more field]

## PageRank for Over-Fullness Propensity and for Emptiness Propensity

A station is likely to become **over-full** if:

- Many riders **arrive** at the station  
- And those riders come from stations that also have high arrival rates  

This behavior aligns with PageRank logic, where importance is based on weighted incoming edges.

We run weighted PageRank on the directed graph and report the **top 5 stations most prone to over-fullness**.

A station is likely to become **empty** if:

- Many riders **leave** the station  
- And they go to stations with high departures  

To capture this behavior, we **reverse all edges** and run PageRank again.

We report the **top 5 stations most prone to emptiness**.

In [4]:

val results = g.pageRank
  .resetProbability(0.15)
  .tol(0.0001)   
  .run()

val top5 = results.vertices
  .orderBy(desc("pagerank"))
  .limit(5)

top5.show(false)

+---+-----------------------+------------------+
|id |name                   |pagerank          |
+---+-----------------------+------------------+
|519|"Pershing Square North"|4.930887390071603 |
|426|"West St & Chambers St"|3.7410934274030576|
|402|"Broadway & E 22 St"   |3.58520147183096  |
|497|"E 17 St & Broadway"   |3.537658018512581 |
|435|"W 21 St & 6 Ave"      |3.438585855241344 |
+---+-----------------------+------------------+



results = GraphFrame(v:[id: int, name: string ... 1 more field], e:[src: int, dst: int ... 2 more fields])
top5 = [id: int, name: string ... 1 more field]


[id: int, name: string ... 1 more field]

In [7]:

val results = g.pageRank
  .resetProbability(0.15)
  .tol(0.0001)   
  .run()
  .edges


results.show()

+---+---+--------+--------------------+
|src|dst|duration|              weight|
+---+---+--------+--------------------+
| 72|236|    1748|2.259887005649717...|
| 72|236|    1659|2.259887005649717...|
| 72|236|    1413|2.259887005649717...|
| 72|236|    1396|2.259887005649717...|
| 72|236|    1473|2.259887005649717...|
| 72|236|    1748|2.259887005649717...|
| 72|236|    1659|2.259887005649717...|
| 72|236|    1413|2.259887005649717...|
| 72|236|    1396|2.259887005649717...|
| 72|236|    1473|2.259887005649717...|
| 72|236|    1748|2.259887005649717...|
| 72|236|    1659|2.259887005649717...|
| 72|236|    1413|2.259887005649717...|
| 72|236|    1396|2.259887005649717...|
| 72|236|    1473|2.259887005649717...|
| 72|236|    1748|2.259887005649717...|
| 72|236|    1659|2.259887005649717...|
| 72|236|    1413|2.259887005649717...|
| 72|236|    1396|2.259887005649717...|
| 72|236|    1473|2.259887005649717...|
+---+---+--------+--------------------+
only showing top 20 rows



results = [src: int, dst: int ... 2 more fields]


[src: int, dst: int ... 2 more fields]

In [31]:
val reversedEdges = g.edges.select(
  col("dst").alias("src"),
  col("src").alias("dst"),
  col("duration")
)

val g_reversed = GraphFrame(g.vertices, reversedEdges)

val emptyRanks = g_reversed.pageRank
  .resetProbability(0.15)
  .tol(0.0001)
  .run()

val topEmpty = emptyRanks.vertices
  .select("id","name","pagerank")
  .orderBy(col("pagerank").desc)
  .limit(5)

topEmpty.show(false)


+----+-----------------------+------------------+
|id  |name                   |pagerank          |
+----+-----------------------+------------------+
|3197|"Hs Don't Use"         |5.710640869520749 |
|519 |"Pershing Square North"|5.012823444592198 |
|3480|"WS Don't Use"         |4.2722846432845945|
|402 |"Broadway & E 22 St"   |3.45152110690382  |
|497 |"E 17 St & Broadway"   |3.334725974545746 |
+----+-----------------------+------------------+



reversedEdges = [src: int, dst: int ... 1 more field]
g_reversed = GraphFrame(v:[id: int, name: string], e:[src: int, dst: int ... 1 more field])
emptyRanks = GraphFrame(v:[id: int, name: string ... 1 more field], e:[src: int, dst: int ... 2 more fields])
topEmpty = [id: int, name: string ... 1 more field]


[id: int, name: string ... 1 more field]

##  Clustering Coefficient Analysis

We compute the clustering coefficient of each station in the **undirected** version of the graph.

For each station:
- Let *k* be the degree (number of neighboring stations)
- Actual triangles = number of closed triplets involving the station
- Possible triangles = k * (k - 1) / 2

The clustering coefficient is: C = actual_triangles / possible_triangles

We report the **top 20 stations with the highest clustering coefficients**, representing highly interconnected sub-networks.



In [49]:
val ug = make_undirected_graph(g)


// GraphFrames -> GraphX
val v = ug.vertices.rdd.map(r => (r.getInt(0).toLong, r.getString(1)))
val e = ug.edges.rdd.map(r => Edge(r.getInt(0).toLong, r.getInt(1).toLong, 1))
val gx = Graph(v, e)

// triangles
val triangleCounts = gx.triangleCount().vertices.toDF("vertex","triangles")

// degrees
val degrees = gx.degrees.toDF("vertex", "degree")

// clustering coefficient
val clusteringDF = triangleCounts
  .join(degrees, "vertex")
  .withColumn("possible", col("degree") * (col("degree") - 1) / 2)
  .withColumn("clusteringCoefficient",
     when(col("possible") > 0, col("triangles") / col("possible"))
       .otherwise(0)
  )

val finalResult = clusteringDF
  .join(ug.vertices, clusteringDF("vertex") === ug.vertices("id"))
  .select(
    col("vertex").alias("id"),
    col("name").alias("location"),
    col("clusteringCoefficient").alias("coeff")
  )
  .orderBy(col("coeff").desc)
  .limit(20)

finalResult.show(false)


+----+--------------------------+------------------+
|id  |location                  |coeff             |
+----+--------------------------+------------------+
|3647|"48 Ave & 30 Pl"          |1.0               |
|3485|"NYCBS Depot - RIS"       |1.0               |
|153 |"E 40 St & 5 Ave"         |1.0               |
|3040|"GOW Tech Shop"           |1.0               |
|3192|"Liberty Light Rail"      |1.0               |
|3186|"Grove St PATH"           |1.0               |
|3279|"Dixon Mills"             |1.0               |
|3639|"Harborside"              |1.0               |
|339 |"Avenue D & E 12 St"      |0.877201420748853 |
|3464|"W 37 St & Broadway"      |0.8679573382796197|
|247 |"Perry St & Bleecker St"  |0.8602079768329604|
|3175|"W 70 St & Amsterdam Ave" |0.8592469808193227|
|3176|"W 64 St & West End Ave"  |0.8568452539928423|
|3623|"W 120 St & Claremont Ave"|0.8549019607843137|
|3491|"E 118 St & 1 Ave"        |0.854122621564482 |
|266 |"Avenue D & E 8 St"       |0.84921898025

ug = GraphFrame(v:[id: int, name: string], e:[src: int, dst: int])
v = MapPartitionsRDD[5455] at map at <console>:78
e = MapPartitionsRDD[5477] at map at <console>:79
gx = org.apache.spark.graphx.impl.GraphImpl@11f060eb
triangleCounts = [vertex: bigint, triangles: int]
degrees = [vertex: bigint, degree: int]
clusteringDF = [vertex: bigint, triangles: int ... 3 more fields]
finalResult = [id: bigint, location: string ... 1 more field]


[id: bigint, location: string ... 1 more field]

##  Summary & Insights

This graph analysis provides several operational insights for CitiBike:

### • Over-fullness Stations (High Inflow PageRank)
These stations attract riders from other highly active stations and may require **frequent bike removal**.

### • Emptiness Stations (High Outflow PageRank on Reversed Graph)
These stations consistently send bikes elsewhere and may need **bike replenishment**.

### • High Clustering Coefficient Stations
These stations sit within highly interconnected neighborhoods, suggesting **dense local mobility** patterns.

Such insights can support:
- Resource deployment planning  
- Rebalancing operations  
- Infrastructure upgrades  
- Urban mobility optimization  