# Data Exploration and Data Pre-processing

This notebook sets up and configures a Spark session to process large datasets from the Swiss public transport timetables (SBB). It performs data cleaning, filtering, and transformation steps to prepare the data for further analysis. The goal is to prepare the data for robust journey planning applications by filtering out irrelevant records, managing data types, and ensuring data integrity and accuracy.


# Initialize the Spark environment

In [3]:
%%configure -f
{ "conf": {
        "mapreduce.input.fileinputformat.input.dir.recursive": true,
        "spark.sql.extensions": "com.hortonworks.spark.sql.rule.Extensions",
        "spark.kryo.registrator": "com.qubole.spark.hiveacid.util.HiveAcidKyroRegistrator",
        "spark.sql.hive.hiveserver2.jdbc.url": "jdbc:hive2://iccluster065.iccluster.epfl.ch:2181,iccluster080.iccluster.epfl.ch:2181,iccluster066.iccluster.epfl.ch:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2",
        "spark.datasource.hive.warehouse.read.mode": "JDBC_CLUSTER",
        "spark.driver.extraClassPath": "/opt/cloudera/parcels/SPARK3/lib/hwc_for_spark3/hive-warehouse-connector-spark3-assembly-1.0.0.3.3.7190.2-1.jar",
        "spark.executor.extraClassPath": "/opt/cloudera/parcels/SPARK3/lib/hwc_for_spark3/hive-warehouse-connector-spark3-assembly-1.0.0.3.3.7190.2-1.jar",
        "spark.kryoserializer.buffer.max": "2000m"
    }
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
5770,application_1713270977862_6554,pyspark,idle,Link,Link,,


In [4]:
print(f'Start Spark name:{spark._sc.appName}, version:{spark.version}')

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
5772,application_1713270977862_6556,pyspark,idle,Link,Link,,✔



SparkSession available as 'spark'.



Start Spark name:livy-session-5772, version:3.3.2.3.3.7190.2-1

In [5]:
%%local
import os
username=os.getenv('USER', 'anonymous')
hadoop_fs=os.getenv('HADOOP_DEFAULT_FS', 'hdfs://iccluster067.iccluster.epfl.ch:8020')
print(f"local username={username}\nhadoop_fs={hadoop_fs}")

local username=malahlou
hadoop_fs=hdfs://iccluster067.iccluster.epfl.ch:8020


In [6]:
# (prevent deprecated np.bool error since numpy 1.24, until a new version of pandas/Spark fixes this)
import numpy as np
np.bool = np.bool_

username=spark.conf.get('spark.executorEnv.USERNAME', 'anonymous')
hadoop_fs=spark.conf.get('spark.executorEnv.HADOOP_DEFAULT_FS','hdfs://iccluster067.iccluster.epfl.ch:8020')
print(f"remote username={username}\nhadoop_fs={hadoop_fs}")


remote username=malahlou
hadoop_fs=hdfs://iccluster067.iccluster.epfl.ch:8020

# Start the pre-processing steps

From the exercises, we know that the stops data contains the geographical locations of the stops. The stops are provided with the SBB timetables and are updated once a week (usually on Wednesdays).
We therefore use the snapshot of Wednesday, February 21st, 2024 for all timetable datasets (stops, stop times, trips and calendar).
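The snapshot is selected through a partition string of the form `year=…/month=…/day=…/` without zero padding, as in the `date` variable of the next cell. The helper `snapshot_partition` below is hypothetical and not part of the notebook. It is a small sketch that builds that string from a `datetime.date` and, as a safeguard, refuses dates that are not Wednesdays, the usual weekly update day.

```python
import datetime

def snapshot_partition(day: datetime.date) -> str:
    """Build the 'year=YYYY/month=M/day=D/' partition string (hypothetical helper).

    Assumes the unpadded layout used by the timetable ORC paths in this notebook.
    """
    # date.weekday(): Monday == 0, so Wednesday == 2
    if day.weekday() != 2:
        raise ValueError(f"{day} is not a Wednesday; timetables are usually updated on Wednesdays")
    return f"year={day.year}/month={day.month}/day={day.day}/"

print(snapshot_partition(datetime.date(2024, 2, 21)))  # year=2024/month=2/day=21/
```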

In [7]:
base_path = '/data/sbb/orc/timetables/'
date = 'year=2024/month=2/day=21/'

def generate_path(base_path, dataset, date):
    return f"{base_path}{dataset}/{date}"

stops_path = generate_path(base_path, 'stops', date)
stop_times_path = generate_path(base_path, 'stop_times', date)
trips_path = generate_path(base_path, 'trips', date)
calendar_path = generate_path(base_path, 'calendar', date)

stops = spark.read.orc(stops_path)
stop_times = spark.read.orc(stop_times_path)
trips = spark.read.orc(trips_path)
calendar = spark.read.orc(calendar_path)


#### Keep only services that operate on every weekday (Monday to Friday)

In [8]:
from functools import reduce
from pyspark.sql.functions import col

weekdays = ['monday', 'tuesday', 'wednesday', 'thursday', 'friday']
condition = reduce(lambda acc, day: acc & (col(day) == 'TRUE'), weekdays[1:], col(weekdays[0]) == 'TRUE')
weekday_ids = calendar.filter(condition).select('service_id')
weekday_trips = trips.join(weekday_ids, on='service_id', how='inner').distinct()
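The `reduce` call folds the five per-day comparisons into a single conjunction, so a service is kept only if it runs on every day from Monday to Friday. The same fold can be checked in plain Python on a few calendar rows. The service ids and flag values below are invented. The day flags are the strings `'TRUE'`/`'FALSE'`, as the notebook's comparison with `'TRUE'` suggests.

```python
from functools import reduce

weekdays = ['monday', 'tuesday', 'wednesday', 'thursday', 'friday']

# Toy calendar rows (invented); later keys in a dict display override earlier ones
calendar_rows = [
    {'service_id': 'A', **{d: 'TRUE' for d in weekdays}},
    {'service_id': 'B', **{d: 'TRUE' for d in weekdays}, 'friday': 'FALSE'},
    {'service_id': 'C', **{d: 'FALSE' for d in weekdays}},
]

def runs_every_weekday(row):
    # Same shape as the Spark condition: start from Monday, AND in the remaining days
    return reduce(lambda acc, day: acc and row[day] == 'TRUE', weekdays[1:], row[weekdays[0]] == 'TRUE')

weekday_ids = [r['service_id'] for r in calendar_rows if runs_every_weekday(r)]
print(weekday_ids)  # ['A']
```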



#### Merge with the stop times and stops

In [9]:
nodes = weekday_trips.join(stop_times, on='trip_id', how='inner')
final_nodes = stops.join(nodes, on='stop_id', how='inner')


In [10]:
from pyspark.sql.functions import lit

# Split every stop time into an arrival node and a departure node that share a "time" column
final_nodes_arr = (final_nodes.drop("departure_time")
             .withColumnRenamed("arrival_time","time")
             .withColumn("is_arrival",lit(1)))

final_nodes_dep = (final_nodes.drop("arrival_time")
             .withColumnRenamed("departure_time","time")
             .withColumn("is_arrival", lit(0)))

# unionByName aligns columns by name rather than by position
final_nodes = final_nodes_arr.unionByName(final_nodes_dep)
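After this step, each stop time has become two nodes: an arrival event and a departure event. Both keep the stop and trip attributes and carry a single `time` column plus an `is_arrival` flag. Below is a plain-Python sketch of the same transformation on one invented stop-time record. `split_events` is a hypothetical name.

```python
def split_events(row):
    """Turn one stop-time row into an arrival event and a departure event."""
    base = {k: v for k, v in row.items() if k not in ('arrival_time', 'departure_time')}
    arrival = {**base, 'time': row['arrival_time'], 'is_arrival': 1}
    departure = {**base, 'time': row['departure_time'], 'is_arrival': 0}
    return [arrival, departure]

# Invented values, for illustration only
row = {'stop_id': 'S1', 'trip_id': 't1', 'arrival_time': '08:00:00', 'departure_time': '08:02:00'}
for event in split_events(row):
    print(event)
```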


### Enable support for the ESRI UDFs

In [11]:
spark.sql(f"""
ADD JARS
    {hadoop_fs}/data/jars/esri-geometry-api-2.2.4.jar
    {hadoop_fs}/data/jars/spatial-sdk-hive-2.2.0.jar
    {hadoop_fs}/data/jars/spatial-sdk-json-2.2.0.jar
""")
spark.sql("CREATE OR REPLACE TEMPORARY FUNCTION ST_Point AS 'com.esri.hadoop.hive.ST_Point'")
spark.sql("CREATE OR REPLACE TEMPORARY FUNCTION ST_Distance AS 'com.esri.hadoop.hive.ST_Distance'")
spark.sql("CREATE OR REPLACE TEMPORARY FUNCTION ST_SetSRID AS 'com.esri.hadoop.hive.ST_SetSRID'")
spark.sql("CREATE OR REPLACE TEMPORARY FUNCTION ST_GeodesicLengthWGS84 AS 'com.esri.hadoop.hive.ST_GeodesicLengthWGS84'")
spark.sql("CREATE OR REPLACE TEMPORARY FUNCTION ST_LineString AS 'com.esri.hadoop.hive.ST_LineString'")
spark.sql("CREATE OR REPLACE TEMPORARY FUNCTION ST_AsBinary AS 'com.esri.hadoop.hive.ST_AsBinary'")
spark.sql("CREATE OR REPLACE TEMPORARY FUNCTION ST_PointFromWKB AS 'com.esri.hadoop.hive.ST_PointFromWKB'")
spark.sql("CREATE OR REPLACE TEMPORARY FUNCTION ST_GeomFromWKB AS 'com.esri.hadoop.hive.ST_GeomFromWKB'")
spark.sql("CREATE OR REPLACE TEMPORARY FUNCTION ST_Contains AS 'com.esri.hadoop.hive.ST_Contains'")


DataFrame[]

### Choose the region: Lausanne

We keep only the stops that lie within a region, here the Lausanne region. To use a different region, change `object_id` (currently `1`); it selects the row of `com490.geo_shapes` whose `objectid` matches.

In [12]:
object_id = 1


In [13]:
final_nodes.createOrReplaceTempView("final_nodes")
geo_shapes = spark.read.table("com490.geo_shapes")
geo_shapes.createOrReplaceTempView("geo_shapes")


9801210

In [14]:
stops_area = spark.sql(f"""
SELECT
    n.stop_id,
    n.stop_name,
    n.stop_lat,
    n.stop_lon,
    n.location_type,
    n.parent_station,
    n.trip_id,
    n.service_id,
    n.route_id,
    n.trip_headsign,
    n.trip_short_name,
    n.direction_id,
    n.time,
    n.stop_sequence,
    n.pickup_type,
    n.drop_off_type,
    n.is_arrival
FROM final_nodes n
JOIN com490.geo_shapes g
ON ST_Contains(g.geometry, ST_Point(n.stop_lon, n.stop_lat))
WHERE g.objectid = {object_id}
""")


In [15]:
nodes_area = stops_area.select(stops_area.stop_id,
                               stops_area.stop_name,
                               stops_area.stop_lat,
                               stops_area.stop_lon,
                               stops_area.parent_station).distinct()


Each node gets a unique id that encodes the stop id, the time, the trip id, and whether the node is an arrival (`1`) or a departure (`0`).
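The id is built with `concat_ws("_", ...)` and later split on `"_"` to recover the stop id, time and trip id. That round trip only works if none of the components contains an underscore. Below is a plain-Python sketch of the encoding that checks this assumption explicitly. `make_node_id` and `parse_node_id` are hypothetical names.

```python
def make_node_id(stop_id, time, trip_id, is_arrival):
    parts = [str(stop_id), str(time), str(trip_id), str(is_arrival)]
    # The id is split on "_" later, so a component containing "_" would corrupt the fields
    if any('_' in p for p in parts):
        raise ValueError(f"component contains '_': {parts}")
    return '_'.join(parts)

def parse_node_id(node_id):
    stop_id, time, trip_id, is_arrival = node_id.split('_')
    return stop_id, time, trip_id, int(is_arrival)

node = make_node_id('S1', '08:02:00', 't1', 0)
print(node)                 # S1_08:02:00_t1_0
print(parse_node_id(node))  # ('S1', '08:02:00', 't1', 0)
```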

In [16]:
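import pyspark.sql.functions as F

# Encode stop id, time, trip id and arrival/departure flag into one node id
stops_area = stops_area.withColumn("unique_stop_id", F.concat_ws("_", stops_area.stop_id, stops_area.time,
                                                                 stops_area.trip_id, stops_area.is_arrival))
# Keep is_arrival: the next cell uses it to split the nodes into arrivals and departures
filtered_data = stops_area.select("stop_id", "trip_id", "route_id", "unique_stop_id", "time", "is_arrival").distinct()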


### Building edges
We add edges that connect each departure node to the arrival node at the next stop of the same trip.
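The cells below order the events of each trip with a Spark window and pair each event with the following one via `lead`, keeping only pairs that change stop. Here is a plain-Python sketch of that pairing for a single trip. It assumes events are ordered by stop sequence with the arrival before the departure at each stop. The events are made up, and `trip_edges` is a hypothetical name.

```python
def trip_edges(events):
    """Pair each event with the next one along the trip and keep stop-changing pairs.

    events: dicts with stop_id, stop_sequence, time ('HH:MM:SS') and is_arrival.
    """
    # Route order; at the same stop, arrival (is_arrival=1) sorts before departure (is_arrival=0)
    ordered = sorted(events, key=lambda e: (e['stop_sequence'], -e['is_arrival']))
    edges = []
    # zip emulates lead(): the last event has no successor and produces no pair
    for current, nxt in zip(ordered, ordered[1:]):
        if current['stop_id'] != nxt['stop_id']:
            edges.append((current['stop_id'], current['time'], nxt['stop_id'], nxt['time']))
    return edges

trip = [
    {'stop_id': 'S1', 'stop_sequence': 1, 'time': '08:00:00', 'is_arrival': 1},
    {'stop_id': 'S1', 'stop_sequence': 1, 'time': '08:00:00', 'is_arrival': 0},
    {'stop_id': 'S2', 'stop_sequence': 2, 'time': '08:04:00', 'is_arrival': 1},
    {'stop_id': 'S2', 'stop_sequence': 2, 'time': '08:05:00', 'is_arrival': 0},
    {'stop_id': 'S3', 'stop_sequence': 3, 'time': '08:09:00', 'is_arrival': 1},
    {'stop_id': 'S3', 'stop_sequence': 3, 'time': '08:09:00', 'is_arrival': 0},
]
print(trip_edges(trip))
# [('S1', '08:00:00', 'S2', '08:04:00'), ('S2', '08:05:00', 'S3', '08:09:00')]
```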

In [17]:
# Select relevant columns for arrival and departure
arrival_data = filtered_data.filter(F.col("is_arrival") == 1).select(
    F.col("stop_id"),
    F.col("trip_id"),
    F.col("route_id"),
    F.col("unique_stop_id"),
    F.col("time").alias("arrival_time")
)


departure_data = filtered_data.filter(F.col("is_arrival") == 0).select(
    F.col("stop_id"),
    F.col("trip_id"),
    F.col("route_id"),
    F.col("unique_stop_id"),
    F.col("time").alias("departure_time")
)


# Join stop_times with filtered arrival and departure data
filtered_data_arr = stop_times.join(arrival_data, on=["stop_id", "trip_id", "arrival_time"], how="inner")
filtered_data_dep = stop_times.join(departure_data, on=["stop_id", "trip_id", "departure_time"], how="inner")

# Union the results by name: the two joins place arrival_time and departure_time
# at different positions, so a positional union() would swap the two columns
filtered_data = filtered_data_dep.unionByName(filtered_data_arr)

In [18]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F

# Order the events of each trip along the route; at the same stop the arrival node
# (unique_stop_id ending in "_1") comes before the departure node (ending in "_0"),
# so lead() pairs each departure with the arrival at the next stop
window_spec = Window.partitionBy('trip_id').orderBy([
    F.col('stop_sequence').cast('int').asc(),  # cast in case stop_sequence is stored as a string
    F.substring_index('unique_stop_id', '_', -1).desc()
])

filtered_data_pairs = filtered_data.withColumn('stop_id_dest', F.lead('stop_id').over(window_spec)) \
    .withColumn('arrival_time_dest', F.lead('arrival_time').over(window_spec)) \
    .withColumn('unique_stop_id_dest', F.lead('unique_stop_id').over(window_spec))

filtered_data_pairs = filtered_data_pairs.drop('arrival_time') \
    .withColumnRenamed('arrival_time_dest', 'arrival_time')

filtered_data_pairs = filtered_data_pairs.dropna(subset=['stop_id_dest'])

filtered_data_pairs = filtered_data_pairs.withColumn(
    'expected_travel_time',
    F.unix_timestamp('arrival_time', 'HH:mm:ss') - F.unix_timestamp('departure_time', 'HH:mm:ss')
)

filtered_data_pairs = filtered_data_pairs.filter(col('stop_id') != col('stop_id_dest'))
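`expected_travel_time` is the difference, in seconds, between two `HH:mm:ss` strings. GTFS allows times past `24:00:00` for trips that run after midnight on the same service day, and a clock-time parser may not accept such values. Below is a plain-Python sketch of a parser that simply counts seconds and therefore tolerates them. `gtfs_seconds` and `travel_time` are hypothetical names.

```python
def gtfs_seconds(hms):
    """Seconds since the start of the service day for an 'HH:MM:SS' string (HH may exceed 23)."""
    hours, minutes, seconds = (int(part) for part in hms.split(':'))
    return hours * 3600 + minutes * 60 + seconds

def travel_time(departure, arrival):
    return gtfs_seconds(arrival) - gtfs_seconds(departure)

print(travel_time('08:05:00', '08:09:00'))  # 240
print(travel_time('23:58:00', '24:03:00'))  # 300
```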


In [19]:
from pyspark.sql.functions import col, split, lit

filtered_data_pairs = filtered_data_pairs.select(
    col("unique_stop_id").alias("start_id"),
    col("unique_stop_id_dest").alias("end_id"),
    col("expected_travel_time")
)

# Split start_id column into separate columns
filtered_data_pairs = filtered_data_pairs.withColumn("start_stop_id", split(col("start_id"), "_")[0]) \
    .withColumn("start_time", split(col("start_id"), "_")[1]) \
    .withColumn("trip_id", split(col("start_id"), "_")[2])

# Split end_id column into separate columns
filtered_data_pairs = filtered_data_pairs.withColumn("end_stop_id", split(col("end_id"), "_")[0]) \
    .withColumn("end_time", split(col("end_id"), "_")[1])

filtered_data_pairs = filtered_data_pairs.drop("start_id", "end_id")

filtered_data_pairs = filtered_data_pairs.withColumn("is_walking", lit(0))


### Walking edges
We add walking edges between stops of the same station (same `parent_station`) and between stops that are at most 500 m apart.

In [20]:
from pyspark.sql.functions import col, lit

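time_in_station = 2 * 60  # seconds, same unit as expected_travel_time

# Keep one row per stop with a non-null parent_station
# (stops_area has one row per stop event, which would blow up the self-join)
filtered_stops = (stops_area
                  .filter(col("parent_station").isNotNull())
                  .select("stop_id", "parent_station")
                  .distinct())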

# Self-join to find possible transfers
station_transfer_edges = (
    filtered_stops.alias("a")
    .join(
        filtered_stops.alias("b"),
        (col("a.parent_station") == col("b.parent_station")) &
        (col("a.stop_id") != col("b.stop_id")),
        "inner"
    )
    .select(
        col("a.stop_id").alias("stop_1"),
        col("b.stop_id").alias("stop_2")
    )
    .withColumn("transfer_time", lit(time_in_station))
    .distinct()
)
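The self-join links every pair of distinct stops (for example, platforms) that share a `parent_station`, in both directions. Each link gets a fixed transfer time of 2 minutes (`time_in_station`, in seconds like the trip travel times). The same logic in plain Python groups stops by parent instead of joining. The stop ids are made up, and `station_transfers` is a hypothetical name.

```python
from collections import defaultdict
from itertools import permutations

TIME_IN_STATION = 2 * 60  # seconds

def station_transfers(stops):
    """stops: iterable of (stop_id, parent_station); returns {(stop_1, stop_2, transfer_time)}."""
    by_parent = defaultdict(set)
    for stop_id, parent in stops:
        if parent is not None:  # mirrors the isNotNull() filter
            by_parent[parent].add(stop_id)
    edges = set()
    for members in by_parent.values():
        # permutations yields ordered pairs of distinct stops, i.e. both directions
        for a, b in permutations(sorted(members), 2):
            edges.add((a, b, TIME_IN_STATION))
    return edges

stops = [('P1:1', 'P1'), ('P1:2', 'P1'), ('P2:1', 'P2'), ('X', None)]
print(sorted(station_transfers(stops)))
# [('P1:1', 'P1:2', 120), ('P1:2', 'P1:1', 120)]
```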


In [21]:
from pyspark.sql.functions import col, expr, round

max_walking_distance = 500  # meters
walking_speed = 50.0  # meters per minute

# Filter and prepare stops data
filtered_stops = stops_area.select(
    col("stop_id").alias("stop_1"),
    col("stop_lat").alias("lat_1"),
    col("stop_lon").alias("lon_1"),
    col("parent_station").alias("par_1")
).distinct()

# Self-join to find pairs of stops within walking distance
walking_distance_station_edges = (
    filtered_stops.alias("a")
    .crossJoin(
        filtered_stops.alias("b")
    )
    .filter(expr("a.stop_1 != b.stop_1"))
    .withColumn("line", expr("ST_SetSRID(ST_LineString(a.lon_1, a.lat_1, b.lon_1, b.lat_1), 4326)"))
    .withColumn("distance", expr("ST_GeodesicLengthWGS84(line)"))
    .filter((col("distance") <= max_walking_distance) & (col("distance") > 0.0))
    .withColumn("transfer_time", round(col("distance") * (60.0 / walking_speed), 0))
    .select(
        col("a.stop_1"),
        col("b.stop_1").alias("stop_2"),
        "transfer_time"
    )
    .distinct()
)
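Walking edges use the geodesic length on the WGS84 ellipsoid (`ST_GeodesicLengthWGS84`). That length is converted to seconds at 50 m/min, i.e. 1.2 s per metre. The sketch below swaps in the haversine formula on a sphere of radius 6,371 km, which only approximates the ellipsoidal length and is meant to show the arithmetic. The 500 m cut-off and the walking speed are the notebook's values; the coordinates are made up.

```python
import math

MAX_WALKING_DISTANCE = 500.0  # metres
WALKING_SPEED = 50.0          # metres per minute
EARTH_RADIUS = 6_371_000.0    # metres, mean radius (spherical approximation)

def haversine_m(lat1, lon1, lat2, lon2):
    """Great-circle distance in metres between two (lat, lon) points in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS * math.asin(math.sqrt(a))

def walking_transfer_time(lat1, lon1, lat2, lon2):
    """Seconds to walk between two stops, or None if the distance is outside (0, MAX_WALKING_DISTANCE]."""
    distance = haversine_m(lat1, lon1, lat2, lon2)
    if not (0.0 < distance <= MAX_WALKING_DISTANCE):
        return None
    # Python's round() rounds halves to even, Spark's round() rounds half up;
    # the difference only matters for exact .5 values
    return round(distance * (60.0 / WALKING_SPEED))

print(walking_transfer_time(46.516, 6.629, 46.517, 6.629))  # 133 (about 111 m)
```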


In [22]:
#station_transfer_edges.show(2, vertical=True, truncate=True)
#station_transfer_edges.count()


In [23]:
#walking_distance_station_edges.show(2, vertical=True, truncate=True)


3474

In [24]:
walking_edges = (
    walking_distance_station_edges.unionByName(station_transfer_edges)
    .withColumn("is_walking", lit(1))
    .withColumnRenamed("stop_1", "start_stop_id")
    .withColumnRenamed("stop_2", "end_stop_id")
    .withColumnRenamed("transfer_time", "expected_travel_time")
)


In [25]:
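# Walking edges have no trip or schedule: add typed NULL columns so unionByName can align the schemas
walking_edges_copy = (walking_edges
                      .withColumn("trip_id", lit(None).cast("string"))
                      .withColumn("start_time", lit(None).cast("string"))
                      .withColumn("end_time", lit(None).cast("string")))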
all_edges = filtered_data_pairs.unionByName(walking_edges_copy)
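`unionByName` aligns columns by name, whereas `union` aligns them by position. Ride edges and walking edges are built with different column orders, and walking edges carry no trip or times, which is why they first get `NULL` placeholders. Below is a plain-Python sketch of name-based alignment on made-up rows. `union_by_name` is a hypothetical helper.

```python
def union_by_name(columns, rows_a, columns_b, rows_b):
    """Union two row lists whose columns come in different orders, aligning by name."""
    index_b = {name: i for i, name in enumerate(columns_b)}
    realigned = [tuple(row[index_b[name]] for name in columns) for row in rows_b]
    return rows_a + realigned

ride_cols = ['expected_travel_time', 'start_stop_id', 'end_stop_id', 'trip_id', 'is_walking']
walk_cols = ['start_stop_id', 'end_stop_id', 'expected_travel_time', 'is_walking', 'trip_id']
ride_rows = [(240, 'S2', 'S3', 't1', 0)]
walk_rows = [('S3', 'S4', 133, 1, None)]  # walking edges have no trip

edges = union_by_name(ride_cols, ride_rows, walk_cols, walk_rows)
print(edges)
# [(240, 'S2', 'S3', 't1', 0), (133, 'S3', 'S4', None, 1)]
```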


In [26]:
all_edges.cache()


DataFrame[expected_travel_time: double, start_stop_id: string, start_time: string, trip_id: string, end_stop_id: string, end_time: string, is_walking: int]

In [27]:
nodes_area.cache()


DataFrame[stop_id: string, stop_name: string, stop_lat: decimal(15,12), stop_lon: decimal(15,12), parent_station: string]

In [28]:
nodes_area_path = f"/user/{username}/graph/nodes_area"
nodes_area.write.mode("overwrite").format("orc").save(nodes_area_path)


In [29]:
all_edges_path = f"/user/{username}/graph/all_edges"
all_edges.write.mode("overwrite").format("orc").save(all_edges_path)


An error was encountered:
py4j does not exist in the JVM
Traceback (most recent call last):
  File "/opt/cloudera/parcels/SPARK3-3.3.2.3.3.7190.2-1-1.p0.46867244/lib/spark3/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 968, in save
    self._jwrite.save(path)
  File "/opt/cloudera/parcels/SPARK3-3.3.2.3.3.7190.2-1-1.p0.46867244/lib/spark3/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/opt/cloudera/parcels/SPARK3-3.3.2.3.3.7190.2-1-1.p0.46867244/lib/spark3/python/lib/pyspark.zip/pyspark/sql/utils.py", line 192, in deco
    converted = convert_exception(e.java_exception)
  File "/opt/cloudera/parcels/SPARK3-3.3.2.3.3.7190.2-1-1.p0.46867244/lib/spark3/python/lib/pyspark.zip/pyspark/sql/utils.py", line 156, in convert_exception
    elif is_instance_of(gw, e, "org.apache.spark.sql.AnalysisException"):
  File "/opt/cloudera/parcels/SPARK3-3.3.2.3.3.7190.2-1-1.p0.46867244/lib/spark3/python/lib/py4j-0.10.9.

In [30]:
spark.stop()
