Mount Google Drive

In [None]:
from google.colab import drive

# Mount Drive explicitly so the notebook works on a fresh runtime; without
# this call the paths below only exist if Drive was mounted by hand first.
drive.mount("/content/drive")

# Set the base path to your lab folder in Drive
base_path = "/content/drive/MyDrive/"
station_path = base_path + "station_data.csv"
trip_path    = base_path + "trip_data.csv"


Install required packages

In [None]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!pip install pyspark
!pip install graphframes
!pip install tqdm


E: Failed to fetch http://security.ubuntu.com/ubuntu/pool/main/o/openjdk-lts/openjdk-11-jre-headless_11.0.28%2b6-1ubuntu1%7e22.04.1_amd64.deb  404  Not Found [IP: 185.125.190.82 80]
E: Failed to fetch http://security.ubuntu.com/ubuntu/pool/main/o/openjdk-lts/openjdk-11-jdk-headless_11.0.28%2b6-1ubuntu1%7e22.04.1_amd64.deb  404  Not Found [IP: 185.125.190.82 80]
E: Unable to fetch some archives, maybe run apt-get update or try with --fix-missing?
Collecting graphframes
  Downloading graphframes-0.6-py2.py3-none-any.whl.metadata (934 bytes)
Collecting nose (from graphframes)
  Downloading nose-1.3.7-py3-none-any.whl.metadata (1.7 kB)
Downloading graphframes-0.6-py2.py3-none-any.whl (18 kB)
Downloading nose-1.3.7-py3-none-any.whl (154 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m154.7/154.7 kB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: nose, graphframes
Successfully installed graphframes-0.6 nose-1.3.7


Imports and device setup

In [None]:
import torch
import numpy as np
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql.functions import desc, lit
from graphframes import GraphFrame
from tqdm.notebook import tqdm

# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)
print(f"Using device: {device}")


Using device: cpu


Start SparkSession

In [None]:
# The GraphFrames JAR has to match the running Spark/Scala build. The session
# reports Spark 3.5.x, so use the spark3.5 / Scala 2.12 artifact rather than
# the Spark 3.1 build.
GRAPHFRAMES_PACKAGE = "graphframes:graphframes:0.8.3-spark3.5-s_2.12"

# NOTE: getOrCreate() reuses an already-running session, in which case
# spark.jars.packages is not re-applied; restart the runtime after changing it.
spark = (
    SparkSession.builder
    .appName("TP05_GraphFrames")
    .config("spark.jars.packages", GRAPHFRAMES_PACKAGE)
    .getOrCreate()
)

print("Spark session started. Version:", spark.version)


Spark session started. Version: 3.5.1


Load CSV files

In [None]:
# Both files have a header row; let Spark infer the column types.
csv_options = {"header": True, "inferSchema": True}

stations_df = spark.read.csv(station_path, **csv_options)
trips_df = spark.read.csv(trip_path, **csv_options)

# Preview the first rows of each table without truncating long values.
for loaded_frame in (stations_df, trips_df):
    loaded_frame.show(5, truncate=False)


+----------+---------------------------------+---------+-----------+---------+--------+------------+
|station_id|name                             |lat      |long       |dockcount|landmark|installation|
+----------+---------------------------------+---------+-----------+---------+--------+------------+
|2         |San Jose Diridon Caltrain Station|37.329732|-121.901782|27       |San Jose|8/6/2013    |
|3         |San Jose Civic Center            |37.330698|-121.888979|15       |San Jose|8/5/2013    |
|4         |Santa Clara at Almaden           |37.333988|-121.894902|11       |San Jose|8/6/2013    |
|5         |Adobe on Almaden                 |37.331415|-121.8932  |19       |San Jose|8/5/2013    |
|6         |San Pedro Square                 |37.336721|-121.894074|15       |San Jose|8/7/2013    |
+----------+---------------------------------+---------+-----------+---------+--------+------------+
only showing top 5 rows

+-------+--------+---------------+--------------------------------

Prepare vertices and edges for GraphFrame

In [None]:
# GraphFrame expects an 'id' column on vertices; the station name plays that role.
vertices = stations_df.withColumnRenamed("name", "id")
vertices.show(5, truncate=False)

# GraphFrame expects 'src' and 'dst' columns on edges; map the trip endpoints onto them.
edge_column_renames = (
    ("Start Station", "src"),
    ("End Station", "dst"),
)
edges = trips_df
for old_name, new_name in edge_column_renames:
    edges = edges.withColumnRenamed(old_name, new_name)
edges.show(5, truncate=False)


+----------+---------------------------------+---------+-----------+---------+--------+------------+
|station_id|id                               |lat      |long       |dockcount|landmark|installation|
+----------+---------------------------------+---------+-----------+---------+--------+------------+
|2         |San Jose Diridon Caltrain Station|37.329732|-121.901782|27       |San Jose|8/6/2013    |
|3         |San Jose Civic Center            |37.330698|-121.888979|15       |San Jose|8/5/2013    |
|4         |Santa Clara at Almaden           |37.333988|-121.894902|11       |San Jose|8/6/2013    |
|5         |Adobe on Almaden                 |37.331415|-121.8932  |19       |San Jose|8/5/2013    |
|6         |San Pedro Square                 |37.336721|-121.894074|15       |San Jose|8/7/2013    |
+----------+---------------------------------+---------+-----------+---------+--------+------------+
only showing top 5 rows

+-------+--------+---------------+--------------------------------

Create GraphFrame

In [None]:
# Build the graph: stations are vertices, trips are directed edges.
g = GraphFrame(vertices, edges)

station_total = g.vertices.count()
trip_total = g.edges.count()

print("Number of stations:", station_total)
print("Number of trips:", trip_total)




Number of stations: 70
Number of trips: 99


Query 5: Trips count between each source-destination

In [None]:
# Count trips per (source, destination) pair, busiest pairs first.
trips_per_pair = edges.groupBy("src", "dst").count()
trip_counts = trips_per_pair.orderBy(desc("count"))
trip_counts.show(50, truncate=False)


+---------------------------------------------+---------------------------------------------+-----+
|src                                          |dst                                          |count|
+---------------------------------------------+---------------------------------------------+-----+
|San Francisco Caltrain 2 (330 Townsend)      |Townsend at 7th                              |4    |
|5th at Howard                                |San Francisco Caltrain 2 (330 Townsend)      |3    |
|San Francisco Caltrain 2 (330 Townsend)      |Powell Street BART                           |2    |
|2nd at Townsend                              |Market at Sansome                            |2    |
|Spear at Folsom                              |2nd at Townsend                              |2    |
|Market at Sansome                            |Broadway St at Battery St                    |2    |
|Davis at Jackson                             |Embarcadero at Sansome                       |2    |


Query 6: Trips starting or ending at 'Townsend at 7th'

In [None]:
station = "Townsend at 7th"

# Split the trips touching the station by direction.
departing_trips = edges.where(edges["src"] == station)
arriving_trips = edges.where(edges["dst"] == station)

# Trips starting from the station, counted per destination.
trips_from = (
    departing_trips
    .groupBy("dst")
    .count()
    .withColumnRenamed("dst", "other_station")
    .withColumn("direction", lit("from"))
    .orderBy(desc("count"))
)

# Trips ending at the station, counted per origin.
trips_to = (
    arriving_trips
    .groupBy("src")
    .count()
    .withColumnRenamed("src", "other_station")
    .withColumn("direction", lit("to"))
    .orderBy(desc("count"))
)

for direction_counts in (trips_from, trips_to):
    direction_counts.show(50, truncate=False)


+------------------------------------+-----+---------+
|other_station                       |count|direction|
+------------------------------------+-----+---------+
|Spear at Folsom                     |1    |from     |
|Harry Bridges Plaza (Ferry Building)|1    |from     |
+------------------------------------+-----+---------+

+---------------------------------------+-----+---------+
|other_station                          |count|direction|
+---------------------------------------+-----+---------+
|San Francisco Caltrain 2 (330 Townsend)|4    |to       |
|Spear at Folsom                        |1    |to       |
|5th at Howard                          |1    |to       |
|Howard at 2nd                          |1    |to       |
+---------------------------------------+-----+---------+



Query 7: Stations never a destination from 'Spear at Folsom'

In [None]:
src_station = "Spear at Folsom"

# Every distinct destination reached by a trip that started at src_station.
dsts_from_spear = (
    edges
    .where(edges["src"] == src_station)
    .select("dst")
    .dropDuplicates()
)

# The anti join keeps only the stations that have no match in that destination list.
never_dest = vertices.join(
    dsts_from_spear,
    on=vertices["id"] == dsts_from_spear["dst"],
    how="left_anti",
)
never_dest.show(50, truncate=False)


+----------+---------------------------------------------+----------+------------+---------+-------------+------------+
|station_id|id                                           |lat       |long        |dockcount|landmark     |installation|
+----------+---------------------------------------------+----------+------------+---------+-------------+------------+
|2         |San Jose Diridon Caltrain Station            |37.329732 |-121.901782 |27       |San Jose     |8/6/2013    |
|3         |San Jose Civic Center                        |37.330698 |-121.888979 |15       |San Jose     |8/5/2013    |
|4         |Santa Clara at Almaden                       |37.333988 |-121.894902 |11       |San Jose     |8/6/2013    |
|5         |Adobe on Almaden                             |37.331415 |-121.8932   |19       |San Jose     |8/5/2013    |
|6         |San Pedro Square                             |37.336721 |-121.894074 |15       |San Jose     |8/7/2013    |
|7         |Paseo de San Antonio        

Query 8: Station with max incoming trips

In [None]:
# Number of trips arriving at each station, busiest first.
arrivals_per_station = edges.groupBy("dst").count()
incoming_counts = arrivals_per_station.orderBy(desc("count"))
incoming_counts.show(10, truncate=False)

# The first row of the ordered result is the station with the most arrivals.
top_station = incoming_counts.limit(1)
top_station.show()


+----------------------------------------+-----+
|dst                                     |count|
+----------------------------------------+-----+
|San Francisco Caltrain 2 (330 Townsend) |9    |
|2nd at Townsend                         |8    |
|Townsend at 7th                         |7    |
|Embarcadero at Sansome                  |7    |
|San Francisco Caltrain (Townsend at 4th)|7    |
|Market at Sansome                       |5    |
|Spear at Folsom                         |5    |
|Steuart at Market                       |5    |
|2nd at South Park                       |4    |
|Harry Bridges Plaza (Ferry Building)    |3    |
+----------------------------------------+-----+
only showing top 10 rows

+--------------------+-----+
|                 dst|count|
+--------------------+-----+
|San Francisco Cal...|    9|
+--------------------+-----+



Query 9: Trip with max duration

In [None]:
duration_col = "Duration"

# Sort by duration, longest first, and keep only the top row.
longest_trip = edges.orderBy(desc(duration_col)).limit(1)
longest_trip.show(truncate=False)


+-------+--------+---------------+---------------------+--------------+---------------+------------------------------------+------------+------+---------------+--------+
|Trip ID|Duration|Start Date     |src                  |Start Terminal|End Date       |dst                                 |End Terminal|Bike #|Subscriber Type|Zip Code|
+-------+--------+---------------+---------------------+--------------+---------------+------------------------------------+------------+------+---------------+--------+
|913386 |1808    |8/31/2015 20:23|Embarcadero at Bryant|54            |8/31/2015 20:53|Harry Bridges Plaza (Ferry Building)|50          |524   |Subscriber     |94105   |
+-------+--------+---------------+---------------------+--------------+---------------+------------------------------------+------------+------+---------------+--------+



Query 10: Subgraph for 'Townsend at 7th'

In [None]:
# GraphFrame is already imported in the imports cell at the top of the
# notebook, so it is not re-imported here.
station = "Townsend at 7th"

# Keep only the trips that start or end at the station.
sub_edges = edges.filter((edges.src == station) | (edges.dst == station))

# Collect every station name that appears on either end of those trips.
# Both sides are aliased to 'id' explicitly instead of relying on union()
# taking its column name from the left-hand DataFrame.
station_ids = (
    sub_edges.select(sub_edges.src.alias("id"))
    .union(sub_edges.select(sub_edges.dst.alias("id")))
    .distinct()
)

# Restrict the vertex table to the stations that take part in the subgraph.
sub_vertices = vertices.join(station_ids, "id", "inner").distinct()

# Build subgraph
sub_g = GraphFrame(sub_vertices, sub_edges)

# Show results
print("Subgraph edges:")
sub_g.edges.show(truncate=False)

print("Subgraph vertices:")
sub_g.vertices.show(truncate=False)


Subgraph edges:
+-------+--------+---------------+---------------------------------------+--------------+---------------+------------------------------------+------------+------+---------------+--------+
|Trip ID|Duration|Start Date     |src                                    |Start Terminal|End Date       |dst                                 |End Terminal|Bike #|Subscriber Type|Zip Code|
+-------+--------+---------------+---------------------------------------+--------------+---------------+------------------------------------+------------+------+---------------+--------+
|913434 |283     |8/31/2015 21:19|San Francisco Caltrain 2 (330 Townsend)|69            |8/31/2015 21:24|Townsend at 7th                     |65          |521   |Subscriber     |94107   |
|913404 |273     |8/31/2015 20:39|San Francisco Caltrain 2 (330 Townsend)|69            |8/31/2015 20:44|Townsend at 7th                     |65          |287   |Subscriber     |94107   |
|913382 |645     |8/31/2015 20:20|Townsend a

Query 11: Triangle motifs

In [None]:
# Directed 3-cycles a -> b -> c -> a over three different stations.
triangles = (
    g.find("(a)-[e1]->(b); (b)-[e2]->(c); (c)-[e3]->(a)")
    .filter("a.id != b.id AND b.id != c.id AND c.id != a.id")
)

# Display the station names only. Each column is aliased so the headers are
# not all "id", and distinct() removes rows repeated because of parallel
# trips between the same stations. A cycle still appears once per rotation,
# i.e. (a, b, c), (b, c, a) and (c, a, b).
(
    triangles
    .selectExpr("a.id AS a", "b.id AS b", "c.id AS c")
    .distinct()
    .show(50, truncate=False)
)




+---------------------------------------+---------------------------------------+---------------------------------------+
|id                                     |id                                     |id                                     |
+---------------------------------------+---------------------------------------+---------------------------------------+
|Embarcadero at Sansome                 |Steuart at Market                      |2nd at Townsend                        |
|Embarcadero at Sansome                 |Market at Sansome                      |2nd at Townsend                        |
|San Francisco Caltrain 2 (330 Townsend)|Townsend at 7th                        |Spear at Folsom                        |
|Spear at Folsom                        |San Francisco Caltrain 2 (330 Townsend)|2nd at Townsend                        |
|Spear at Folsom                        |San Francisco Caltrain 2 (330 Townsend)|Townsend at 7th                        |
|Spear at Folsom        

Query 12: Paths of length 3 from 'Townsend at 7th'

In [None]:
# All 3-hop walks a -> b -> c -> d in the graph.
three_hop_walks = g.find("(a)-[e1]->(b); (b)-[e2]->(c); (c)-[e3]->(d)")

# `station` comes from the earlier Query 6 / Query 10 cells. It is compared
# as a Column value rather than interpolated into a SQL string, so a station
# name containing an apostrophe cannot break the filter expression.
paths3 = (
    three_hop_walks
    .filter(three_hop_walks["a.id"] == station)
    # Only consecutive stations are required to differ; a walk may still
    # revisit a station (e.g. a == c).
    .filter("a.id != b.id AND b.id != c.id AND c.id != d.id")
)

# Alias each column so the headers are not all "id", and drop rows repeated
# because of parallel trips between the same stations.
(
    paths3
    .selectExpr("a.id AS a", "b.id AS b", "c.id AS c", "d.id AS d")
    .distinct()
    .show(50, truncate=False)
)


+---------------+------------------------------------+----------------------------------------+---------------------------------------------+
|id             |id                                  |id                                      |id                                           |
+---------------+------------------------------------+----------------------------------------+---------------------------------------------+
|Townsend at 7th|Harry Bridges Plaza (Ferry Building)|2nd at Townsend                         |Spear at Folsom                              |
|Townsend at 7th|Harry Bridges Plaza (Ferry Building)|2nd at Townsend                         |Market at Sansome                            |
|Townsend at 7th|Harry Bridges Plaza (Ferry Building)|2nd at Townsend                         |Howard at 2nd                                |
|Townsend at 7th|Harry Bridges Plaza (Ferry Building)|2nd at Townsend                         |Market at Sansome                            |
|Towns