Graph Analytics with Apache Spark and GraphFrames
Interactive Practical Session

In [1]:
import warnings
warnings.filterwarnings('ignore')

In [None]:
# Please make sure GraphFrames is installed in your Spark environment.
# Install GraphFrames if necessary:
! pip install graphframes

In [2]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import desc
from graphframes import GraphFrame

In [3]:
# Initialize SparkSession
spark = SparkSession.builder \
    .appName("GraphFrames Example") \
    .config("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.2-s_2.12") \
    .getOrCreate()

24/10/25 20:29:16 WARN Utils: Your hostname, mouhcine resolves to a loopback address: 127.0.1.1; using 192.168.11.103 instead (on interface wlo1)
24/10/25 20:29:16 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /home/yuguerten/.ivy2/cache
The jars for the packages stored in: /home/yuguerten/.ivy2/jars
graphframes#graphframes added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-42f910af-ae15-4a01-af4c-41d27e56fa76;1.0
	confs: [default]


:: loading settings :: url = jar:file:/home/yuguerten/studies/Semestre_3/Big_data_2/Tps/tp4/.venv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found graphframes#graphframes;0.8.2-spark3.2-s_2.12 in spark-packages
	found org.slf4j#slf4j-api;1.7.16 in central
:: resolution report :: resolve 104ms :: artifacts dl 5ms
	:: modules in use:
	graphframes#graphframes;0.8.2-spark3.2-s_2.12 from spark-packages in [default]
	org.slf4j#slf4j-api;1.7.16 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-42f910af-ae15-4a01-af4c-41d27e56fa76
	confs: [default]
	0 artifacts copied, 2 already retrieved (0kB/4ms)
24/10/25 20:29:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your

## Task 1: Load the Data
Load the station and trip data as DataFrames.
QUESTION: Inspect the loaded data and understand its structure.

In [5]:
# Load station data
station_data_path = "data/201508_station_data.csv"
bikeStations = spark.read.option("header", "true").csv(station_data_path)

In [7]:
# Load trip data
trip_data_path = "data/201508_trip_data.csv"
tripData = spark.read.option("header", "true").csv(trip_data_path)

In [8]:
# Display the first few rows of each DataFrame
print("Station Data:")
bikeStations.show(5)
print("Trip Data:")
tripData.show(5)

Station Data:
+----------+--------------------+---------+-----------+---------+--------+------------+
|station_id|                name|      lat|       long|dockcount|landmark|installation|
+----------+--------------------+---------+-----------+---------+--------+------------+
|         2|San Jose Diridon ...|37.329732|-121.901782|       27|San Jose|    8/6/2013|
|         3|San Jose Civic Ce...|37.330698|-121.888979|       15|San Jose|    8/5/2013|
|         4|Santa Clara at Al...|37.333988|-121.894902|       11|San Jose|    8/6/2013|
|         5|    Adobe on Almaden|37.331415|  -121.8932|       19|San Jose|    8/5/2013|
|         6|    San Pedro Square|37.336721|-121.894074|       15|San Jose|    8/7/2013|
+----------+--------------------+---------+-----------+---------+--------+------------+
only showing top 5 rows

Trip Data:
+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+------+---------------+--------+
|Trip

## Task 2: Define Vertices and Edges
Prepare vertices and edges for the graph using the GraphFrames conventions.
Rename columns to match GraphFrames requirements: the vertex DataFrame needs an `id` column, and the edge DataFrame needs `src` and `dst` columns.

In [9]:
stationVertices = bikeStations.withColumnRenamed("name", "id").distinct()
tripEdges = tripData \
    .withColumnRenamed("Start Station", "src") \
    .withColumnRenamed("End Station", "dst")

In [10]:
# Display the prepared vertices and edges
print("Vertices (Stations):")
stationVertices.show(5)
print("Edges (Trips):")
tripEdges.show(5)

Vertices (Stations):
+----------+--------------------+---------+-----------+---------+-------------+------------+
|station_id|                  id|      lat|       long|dockcount|     landmark|installation|
+----------+--------------------+---------+-----------+---------+-------------+------------+
|        51|Embarcadero at Fo...|37.791464|-122.391034|       19|San Francisco|   8/20/2013|
|        58|San Francisco Cit...| 37.77865|-122.418235|       19|San Francisco|   8/21/2013|
|        60|Embarcadero at Sa...| 37.80477|-122.403234|       15|San Francisco|   8/21/2013|
|        65|     Townsend at 7th|37.771058|-122.402717|       15|San Francisco|   8/22/2013|
|        63|       Howard at 2nd|37.786978|-122.398108|       19|San Francisco|   8/22/2013|
+----------+--------------------+---------+-----------+---------+-------------+------------+
only showing top 5 rows

Edges (Trips):
+-------+--------+---------------+--------------------+--------------+---------------+----------------

## Task 3: Construct the Graph
Use the GraphFrames library to create a graph from the vertex and edge DataFrames.

In [11]:
stationGraph = GraphFrame(stationVertices, tripEdges)
stationGraph.cache()

GraphFrame(v:[id: string, station_id: string ... 5 more fields], e:[src: string, dst: string ... 9 more fields])

In [12]:
# Confirm graph statistics
print("Total Number of Stations:", stationGraph.vertices.count())
print("Total Number of Trips in Graph:", stationGraph.edges.count())

                                                                                

Total Number of Stations: 70
Total Number of Trips in Graph: 354152

## Task 4: Basic Graph Queries
Count the trips for each (source, destination) station pair and rank the pairs by trip count.

In [13]:
popular_trips = stationGraph.edges \
    .groupBy("src", "dst") \
    .count() \
    .orderBy(desc("count"))

In [14]:
# Display the top 10 popular trips
print("Top 10 Popular Trips:")
popular_trips.show(10)

Top 10 Popular Trips:
+--------------------+--------------------+-----+
|                 src|                 dst|count|
+--------------------+--------------------+-----+
|San Francisco Cal...|     Townsend at 7th| 3748|
|Harry Bridges Pla...|Embarcadero at Sa...| 3145|
|     2nd at Townsend|Harry Bridges Pla...| 2973|
|     Townsend at 7th|San Francisco Cal...| 2734|
|Harry Bridges Pla...|     2nd at Townsend| 2640|
|Embarcadero at Fo...|San Francisco Cal...| 2439|
|   Steuart at Market|     2nd at Townsend| 2356|
|Embarcadero at Sa...|   Steuart at Market| 2330|
|     Townsend at 7th|San Francisco Cal...| 2192|
|Temporary Transba...|San Francisco Cal...| 2184|
+--------------------+--------------------+-----+
only showing top 10 rows



QUESTION: Modify the query to filter trips from or to a specific station.
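
As a way to reason about the question above, here is a minimal plain-Python sketch of the same idea: count trips per (src, dst) pair, optionally keeping only trips that start or end at one station. The toy `trips` list and the `count_trips` helper are hypothetical and not part of the dataset or of GraphFrames. In Spark, the equivalent step would presumably be a `filter` on `src` or `dst` applied before the `groupBy`.

```python
from collections import Counter

# Hypothetical toy trips as (src, dst) pairs; not taken from the dataset.
trips = [
    ("A", "B"), ("A", "B"), ("B", "A"),
    ("A", "C"), ("C", "B"), ("C", "D"),
]


def count_trips(trips, station=None):
    """Count trips per (src, dst) pair.

    When `station` is given, keep only trips that start or end there.
    """
    if station is not None:
        trips = [(s, d) for s, d in trips if s == station or d == station]
    return Counter(trips)


all_counts = count_trips(trips)
b_counts = count_trips(trips, station="B")

# Pairs sorted by descending count, like orderBy(desc("count")).
print(all_counts.most_common(3))
print(b_counts.most_common())
```

Filtering before counting keeps the aggregation small, which matters more on the full trip table than on this toy list.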

## Task 5: PageRank Calculation
Use PageRank to identify influential stations.

In [15]:
ranks = stationGraph.pageRank(resetProbability=0.15, maxIter=10)
# Display stations with highest PageRank values
print("Top 10 Stations by PageRank:")
ranks.vertices.orderBy(desc("pagerank")).select("id", "pagerank").show(10)

24/10/25 20:40:55 WARN MemoryStore: Not enough space to cache rdd_148_2 in memory! (computed 30.7 MiB so far)
24/10/25 20:40:55 WARN BlockManager: Block rdd_148_2 could not be removed as it was not found on disk or in memory
24/10/25 20:40:55 WARN BlockManager: Putting block rdd_148_2 failed
                                                                                

Top 10 Stations by PageRank:
+--------------------+------------------+
|                  id|          pagerank|
+--------------------+------------------+
|San Jose Diridon ...| 4.051504835989957|
|San Francisco Cal...| 3.351183296428704|
|Mountain View Cal...| 2.514390771015558|
|Redwood City Calt...|2.3263087713711688|
|San Francisco Cal...|2.2311442913698567|
|Harry Bridges Pla...|1.8251120118882902|
|     2nd at Townsend|1.5821217785039197|
|Santa Clara at Al...| 1.573007408490752|
|     Townsend at 7th| 1.568456580534067|
|Embarcadero at Sa...|1.5414242087748948|
+--------------------+------------------+
only showing top 10 rows



QUESTION: Try adjusting the `resetProbability` and `maxIter` parameters.
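
To build intuition for the two parameters, the following is a simplified plain-Python sketch of an iterative PageRank update. It is not the GraphFrames implementation: the `pagerank` function, the toy edge list, and the choice to start every rank at 1.0 are assumptions made for illustration, and the sketch assumes every vertex has at least one outgoing edge.

```python
def pagerank(edges, reset_probability=0.15, max_iter=10):
    """Simplified PageRank on a list of (src, dst) edges.

    Assumes every vertex has at least one outgoing edge.
    Ranks start at 1.0, so they sum to the number of vertices.
    """
    vertices = sorted({v for edge in edges for v in edge})
    out_degree = {v: 0 for v in vertices}
    for src, _ in edges:
        out_degree[src] += 1

    ranks = {v: 1.0 for v in vertices}
    for _ in range(max_iter):
        # Each vertex splits its current rank evenly over its outgoing edges.
        incoming = {v: 0.0 for v in vertices}
        for src, dst in edges:
            incoming[dst] += ranks[src] / out_degree[src]
        ranks = {
            v: reset_probability + (1 - reset_probability) * incoming[v]
            for v in vertices
        }
    return ranks


# Hypothetical toy graph: a 3-cycle A -> B -> C -> A, plus D feeding into A.
toy_edges = [("A", "B"), ("B", "C"), ("C", "A"), ("D", "A")]

ranks_10 = pagerank(toy_edges, reset_probability=0.15, max_iter=10)
ranks_100 = pagerank(toy_edges, reset_probability=0.15, max_iter=100)
print(ranks_10)
print(ranks_100)
```

In this sketch, a larger `reset_probability` pulls all ranks toward the same baseline value, while a larger `max_iter` lets the ranks settle closer to their steady state. Comparing `ranks_10` with `ranks_100` shows how much the values still move after ten iterations.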

## Task 6: In-Degree and Out-Degree Analysis
Calculate the in-degree and out-degree for each station.
In-degree represents incoming trips, out-degree represents outgoing trips.

In [16]:
inDeg = stationGraph.inDegrees
outDeg = stationGraph.outDegrees

In [17]:
# Display stations with the highest in-degree
print("Top 5 Stations by In-Degree:")
inDeg.orderBy(desc("inDegree")).show(5)

Top 5 Stations by In-Degree:
+--------------------+--------+
|                  id|inDegree|
+--------------------+--------+
|San Francisco Cal...|   34810|
|San Francisco Cal...|   22523|
|Harry Bridges Pla...|   17810|
|     2nd at Townsend|   15463|
|     Townsend at 7th|   15422|
+--------------------+--------+
only showing top 5 rows



In [18]:
print("Top 5 Stations by Out-Degree:")
outDeg.orderBy(desc("outDegree")).show(5)

Top 5 Stations by Out-Degree:
+--------------------+---------+
|                  id|outDegree|
+--------------------+---------+
|San Francisco Cal...|    26304|
|San Francisco Cal...|    21758|
|Harry Bridges Pla...|    17255|
|Temporary Transba...|    14436|
|Embarcadero at Sa...|    14158|
+--------------------+---------+
only showing top 5 rows



QUESTION: What does the degree ratio (inDegree/outDegree) tell us about the station’s activity?
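
One way to explore the question is to compute the ratio on a small example first. The sketch below uses a hypothetical toy trip list and a hypothetical `degree_ratio` helper; in Spark, the same ratio would presumably come from joining `inDeg` and `outDeg` on `id`.

```python
from collections import Counter

# Hypothetical toy trips as (src, dst) pairs; not taken from the dataset.
trips = [
    ("A", "B"), ("A", "B"), ("B", "A"),
    ("A", "C"), ("C", "B"), ("C", "D"),
]

in_degree = Counter(dst for _, dst in trips)   # trips arriving at a station
out_degree = Counter(src for src, _ in trips)  # trips leaving a station


def degree_ratio(station):
    """Return inDegree / outDegree, or None when no trip starts at the station."""
    if out_degree[station] == 0:
        return None
    return in_degree[station] / out_degree[station]


for station in sorted(set(in_degree) | set(out_degree)):
    print(station, in_degree[station], out_degree[station], degree_ratio(station))
```

A ratio above 1 means more trips end at the station than start there, a ratio below 1 means the opposite, and a ratio near 1 suggests balanced traffic. Stations with no outgoing trips need separate handling, which the sketch does by returning `None`.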

## Task 7: Breadth-First Search (BFS)
Use BFS to find shortest paths between two stations.
Set `fromExpr` and `toExpr` to identify specific stations for the BFS traversal.

In [19]:
bfs_result = stationGraph.bfs(
    fromExpr="id = 'Townsend at 7th'",
    toExpr="id = 'Spear at Folsom'",
    maxPathLength=2
)

In [20]:
# Display the BFS result
print("BFS Path from 'Townsend at 7th' to 'Spear at Folsom':")
bfs_result.show(5)

BFS Path from 'Townsend at 7th' to 'Spear at Folsom':
+--------------------+--------------------+--------------------+
|                from|                  e0|                  to|
+--------------------+--------------------+--------------------+
|{65, Townsend at ...|{913371, 663, 8/3...|{49, Spear at Fol...|
|{65, Townsend at ...|{913265, 658, 8/3...|{49, Spear at Fol...|
|{65, Townsend at ...|{911919, 722, 8/3...|{49, Spear at Fol...|
|{65, Townsend at ...|{910777, 704, 8/2...|{49, Spear at Fol...|
|{65, Townsend at ...|{908994, 1115, 8/...|{49, Spear at Fol...|
+--------------------+--------------------+--------------------+
only showing top 5 rows



QUESTION: Experiment with different stations to find paths between them.
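
To see what a bounded breadth-first search does, here is a minimal plain-Python sketch. The `bfs_shortest_path` function and the toy edge list are hypothetical. Unlike the GraphFrames result above, which lists one row per matching trip, this sketch returns a single shortest path as a list of vertex names.

```python
from collections import deque


def bfs_shortest_path(edges, start, goal, max_path_length=2):
    """Return one shortest directed path from start to goal as a list of
    vertices, or None if no path uses at most max_path_length edges."""
    adjacency = {}
    for src, dst in edges:
        adjacency.setdefault(src, []).append(dst)

    queue = deque([[start]])
    visited = {start}
    while queue:
        path = queue.popleft()
        if path[-1] == goal:
            return path
        # Do not extend paths that already use the maximum number of edges.
        if len(path) - 1 >= max_path_length:
            continue
        for neighbor in adjacency.get(path[-1], []):
            if neighbor not in visited:
                visited.add(neighbor)
                queue.append(path + [neighbor])
    return None


# Hypothetical toy graph with two routes from A to D.
toy_edges = [("A", "B"), ("B", "C"), ("C", "D"), ("A", "E"), ("E", "D")]

print(bfs_shortest_path(toy_edges, "A", "D", max_path_length=2))
print(bfs_shortest_path(toy_edges, "A", "D", max_path_length=1))
```

Because the search expands paths level by level, the first path that reaches the goal uses the fewest edges. Lowering `max_path_length` can turn a reachable pair into one with no result.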

## Task 8: Connected Components
Use connected components to find subgraphs with connected stations.
This method treats the edges as undirected. The default GraphFrames algorithm requires a Spark checkpoint directory, so set one with `setCheckpointDir` before calling it.

In [23]:
spark.sparkContext.setCheckpointDir("tmp/checkpoints")
sampledGraph = GraphFrame(stationVertices, tripEdges.sample(False, 0.1))
cc = sampledGraph.connectedComponents()

24/10/25 20:43:40 WARN CacheManager: Asked to cache already cached data.

Py4JJavaError: An error occurred while calling o158.run.
: java.lang.OutOfMemoryError: Java heap space
	at java.base/java.lang.AbstractStringBuilder.<init>(AbstractStringBuilder.java:101)
	at java.base/java.lang.StringBuilder.<init>(StringBuilder.java:119)
	at org.apache.spark.sql.catalyst.util.StringConcat.toString(StringUtils.scala:62)
	at org.apache.spark.sql.catalyst.util.StringUtils$PlanStringConcat.toString(StringUtils.scala:152)
	at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:254)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.onUpdatePlan(AdaptiveSparkPlanExec.scala:777)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$9(AdaptiveSparkPlanExec.scala:373)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$Lambda/0x0000702c40fdfa30.apply$mcVJ$sp(Unknown Source)
	at scala.runtime.java8.JFunction1$mcVJ$sp.apply(JFunction1$mcVJ$sp.java:23)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:373)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$Lambda/0x0000702c40f717f8.apply(Unknown Source)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:272)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:417)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:390)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4333)
	at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3575)
	at org.apache.spark.sql.Dataset$$Lambda/0x0000702c4113d4b8.apply(Unknown Source)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4323)
	at org.apache.spark.sql.Dataset$$Lambda/0x0000702c40d2fd30.apply(Unknown Source)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4321)
	at org.apache.spark.sql.Dataset$$Lambda/0x0000702c40c4e3a0.apply(Unknown Source)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$$$Lambda/0x0000702c40c51c08.apply(Unknown Source)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.execution.SQLExecution$$$Lambda/0x0000702c40c4e668.apply(Unknown Source)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4321)


In [None]:
# Display connected components
print("Connected Components:")
cc.show(5)

QUESTION: How many unique connected components are found? Does the result align with your expectation?
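
Since the cell above did not complete, a small plain-Python sketch may help to form an expectation. It labels components with a union-find structure and treats edges as undirected. The `connected_components` function and the toy data are hypothetical, and this is not the algorithm GraphFrames uses. With a successful Spark run, the count would presumably come from the distinct values of the `component` column in `cc`.

```python
def connected_components(vertices, edges):
    """Map each vertex to a representative of its component,
    treating (src, dst) edges as undirected.
    Every edge endpoint must appear in `vertices`."""
    parent = {v: v for v in vertices}

    def find(v):
        # Follow parent links to the root, shortening the chain on the way.
        while parent[v] != v:
            parent[v] = parent[parent[v]]
            v = parent[v]
        return v

    for src, dst in edges:
        root_src, root_dst = find(src), find(dst)
        if root_src != root_dst:
            parent[root_dst] = root_src

    return {v: find(v) for v in vertices}


# Hypothetical toy graph: {A, B, C}, {D, E}, and an isolated vertex F.
toy_vertices = ["A", "B", "C", "D", "E", "F"]
toy_edges = [("A", "B"), ("C", "B"), ("D", "E")]

labels = connected_components(toy_vertices, toy_edges)
num_components = len(set(labels.values()))
print(labels)
print("Unique components:", num_components)
```

The isolated vertex `F` forms its own component. The same can happen in the notebook when edges are sampled: a station whose trips are all dropped by `sample` stays in the vertex set but has no edges.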

End of Practical Notebook Session

In [24]:
# Stop the Spark session after the practical session is complete.
spark.stop()