# Define region names here

In [25]:
region_names = ['Lausanne', 'Ouest lausannois']
region_tuple_sql = "(" + ",".join(f"'{r}'" for r in region_names) + ")"

# Environment settings

In [2]:
groupName='L1'

In [3]:
!pip install networkx
import networkx as nx

Defaulting to user installation because normal site-packages is not writeable

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.2[0m[39;49m -> [0m[32;49m25.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [4]:
import os
import warnings

warnings.simplefilter(action='ignore', category=UserWarning)
warnings.filterwarnings("ignore", category=UserWarning, message="pandas only supports SQLAlchemy connectable .*")

In [5]:
import base64 as b64
import json
import time
import re

def getUsername():
    payload = os.environ.get('EPFL_COM490_TOKEN').split('.')[1]
    payload=payload+'=' * (4 - len(payload) % 4)
    obj = json.loads(b64.urlsafe_b64decode(payload))
    if (time.time() > int(obj.get('exp')) - 3600):
        raise Exception('Your credentials have expired, please restart your Jupyter Hub server:'
                        'File>Hub Control Panel, Stop My Server, Start My Server.')
    time_left = int((obj.get('exp') - time.time())/3600)
    return obj.get('sub'), time_left

In [6]:
username, validity_h = getUsername()
hadoopFS = os.environ.get('HADOOP_FS')
namespace = 'iceberg.' + username
sharedNS = 'iceberg.com490_iceberg'

if not re.search('[A-Z][0-9]', groupName):
    raise Exception('Invalid group name {groupName}')

print(f"you are: {username}")
print(f"credentials validity: {validity_h} hours left.")
print(f"shared namespace is: {sharedNS}")
print(f"your namespace is: {namespace}")
print(f"your group is: {groupName}")

you are: valat
credentials validity: 167 hours left.
shared namespace is: iceberg.com490_iceberg
your namespace is: iceberg.valat
your group is: L1


In [7]:
import trino
from contextlib import closing
from urllib.parse import urlparse
from trino.dbapi import connect
from trino.auth import BasicAuthentication, JWTAuthentication

trinoAuth = JWTAuthentication(os.environ.get('EPFL_COM490_TOKEN'))
trinoUrl  = urlparse(os.environ.get('TRINO_URL'))
Query=[]

print(f"Warehouse URL: {trinoUrl.scheme}://{trinoUrl.hostname}:{trinoUrl.port}/")

conn = connect(
    host=trinoUrl.hostname,
    port=trinoUrl.port,
    auth=trinoAuth,
    http_scheme=trinoUrl.scheme,
    verify=True
)

print('Connected!')

Warehouse URL: https://iccluster028.iccluster.epfl.ch:8443/
Connected!


In [8]:
import pandas as pd
pd.read_sql(f"""SHOW TABLES IN {sharedNS}""", conn)

Unnamed: 0,Table
0,geo
1,sbb_calendar
2,sbb_calendar_dates
3,sbb_istdaten
4,sbb_routes
5,sbb_stop_times
6,sbb_stops
7,sbb_stops_lausanne_region
8,sbb_transfers
9,sbb_trips


In [9]:
pd.read_sql(f"""SHOW TABLES IN {namespace}""", conn)

Unnamed: 0,Table
0,sbb_stop_times_lausanne_region
1,sbb_stops_cleaned
2,sbb_stops_lausanne_region
3,sbb_stops_to_stops_lausanne_region


In [10]:
def sql_fetch(queries, conn, batch_size=100, include_header=True):
    if isinstance(queries, str):
        queries = [queries]
        
    with closing(conn.cursor()) as cur:
        for query in queries:
            cur.execute(query)
            # Only attempt to fetch results if the query is a SELECT statement
            # or another statement that returns data
            if cur.description:  # Check if description exists (not None)
                if include_header:
                    yield [desc[0] for desc in cur.description]
                while (rows := cur.fetchmany(batch_size)):
                    for row in rows:
                        yield row
            else:
                # For non-SELECT queries (like CREATE, DROP), just yield a status message
                yield ["Query executed successfully"]

# Create and save in HDFS tables related to selected region(s)

### First drop all tables

In [26]:
list(sql_fetch([
    f"""DROP SCHEMA IF EXISTS {namespace} CASCADE""", # CASCADE will drop all the tables
    f"""CREATE SCHEMA IF NOT EXISTS {namespace}""",
], conn))

cursor = conn.cursor()
cursor.execute(f"CREATE SCHEMA IF NOT EXISTS {namespace}")
query = f"SHOW TABLES IN {namespace}"
pd.read_sql(query, conn)

Unnamed: 0,Table


### Delete already saved tables from hdfs 

In [27]:
!hdfs dfs -rm -r /user/com-490/group/L1/sbb_stops_cleaned
!hdfs dfs -rm -r /user/com-490/group/L1/sbb_stops_selected_regions
!hdfs dfs -rm -r /user/com-490/group/L1/sbb_stops_to_stops_selected_regions
!hdfs dfs -rm -r /user/com-490/group/L1/sbb_stop_times_selected_regions

rm: `/user/com-490/group/L1/sbb_stops_cleaned': No such file or directory
rm: `/user/com-490/group/L1/sbb_stops_selected_regions': No such file or directory
rm: `/user/com-490/group/L1/sbb_stops_to_stops_selected_regions': No such file or directory
rm: `/user/com-490/group/L1/sbb_stop_times_selected_regions': No such file or directory


### sbb_stops_cleaned

In [28]:
cursor.execute(f"DROP TABLE IF EXISTS {namespace}.sbb_stops_cleaned")

query = f"""
CREATE TABLE {namespace}.sbb_stops_cleaned
WITH (
  format   = 'PARQUET',
  location = '{hadoopFS}/user/com-490/group/L1/sbb_stops_cleaned/'
)
AS
SELECT 
    stop_id,
    SPLIT(
        REGEXP_REPLACE(
            REGEXP_REPLACE(stop_id, '^Parent', ''),
            'P$', ''
        ), ':'
    )[1] AS stop_id_cleaned,
    stop_name,
    stop_lat,
    stop_lon,
    pub_date
FROM iceberg.com490_iceberg.sbb_stops
WHERE stop_id IS NOT NULL
"""

cursor = conn.cursor()
cursor.execute(query)

<trino.dbapi.Cursor at 0x7f2845a8e4a0>

### sbb_stops_region 

In [29]:
# Drop the table if exists
drop_ctas = f"""
    DROP TABLE IF EXISTS {namespace}.sbb_stops_selected_regions
"""

# Create the required table
create_ctas = f"""
CREATE TABLE {namespace}.sbb_stops_selected_regions
WITH (
  format   = 'PARQUET',
  location = '{hadoopFS}/user/com-490/group/L1/sbb_stops_selected_regions/'
)
AS
SELECT DISTINCT
    stops.stop_id_cleaned,
    stops.stop_name,
    stops.stop_lat,
    stops.stop_lon
FROM {namespace}.sbb_stops_cleaned AS stops
JOIN {sharedNS}.geo AS g
  ON g.name IN {region_tuple_sql}
WHERE 
      stops.pub_date >= DATE '2024-07-01'
  AND stops.pub_date <  DATE '2024-07-08'
  AND ST_Contains(
        ST_GeomFromBinary(g.wkb_geometry),
        ST_Point(stops.stop_lon, stops.stop_lat)
      )
"""

result = list(sql_fetch([drop_ctas, create_ctas], conn))

## sbb_stops_to_stops_region
Contains stops within 500m of each other, as directed pairs.

In [30]:
%%time
drop_table_sql = f"""
    DROP TABLE IF EXISTS {namespace}.sbb_stops_to_stops_selected_regions
"""

create_table_sql = f"""
    CREATE TABLE {namespace}.sbb_stops_to_stops_selected_regions
    WITH (
      format   = 'PARQUET',
      location = '{hadoopFS}/user/com-490/group/L1/sbb_stops_to_stops_selected_regions/'
    )
    AS
    WITH cte_stops AS (
        SELECT 
            stop_id_cleaned,
            stop_name,
            stop_lat,
            stop_lon
        FROM {namespace}.sbb_stops_selected_regions
    ),
    pairs_with_distance AS (
        SELECT
            A.stop_id_cleaned AS stop_id_a,
            B.stop_id_cleaned AS stop_id_b,
            (
                6371000.0 * 2 * ASIN(
                    SQRT(
                        SIN(RADIANS((B.stop_lat - A.stop_lat) / 2)) * SIN(RADIANS((B.stop_lat - A.stop_lat) / 2)) +
                        COS(RADIANS(A.stop_lat)) * COS(RADIANS(B.stop_lat)) *
                        SIN(RADIANS((B.stop_lon - A.stop_lon) / 2)) * SIN(RADIANS((B.stop_lon - A.stop_lon) / 2))
                    )
                )
            ) AS distance
        FROM cte_stops A
        CROSS JOIN cte_stops B
        WHERE A.stop_id_cleaned <> B.stop_id_cleaned
    )
    SELECT *
    FROM pairs_with_distance
    WHERE distance <= 500
"""

list(sql_fetch([drop_table_sql, create_table_sql], conn))

CPU times: user 32.7 ms, sys: 10 μs, total: 32.7 ms
Wall time: 1.13 s


[['Query executed successfully'], ['rows'], [2920]]

## sbb_stop_times_region 
It contains the stop times and weekdays of trips (trip_id) servicing stops found previously in the selected region(s).

In [31]:
%%time
drop_table_sql = f"""
    DROP TABLE IF EXISTS {namespace}.sbb_stop_times_selected_regions
"""

query = f"""
CREATE TABLE {namespace}.sbb_stop_times_selected_regions
    WITH (
      format   = 'PARQUET',
      location = '{hadoopFS}/user/com-490/group/L1/sbb_stop_times_selected_regions/'
    )
    AS
    SELECT 
        st.trip_id,
        st.stop_id,
        st.departure_time,
        st.arrival_time,
        c.monday,
        c.tuesday,
        c.wednesday,
        c.thursday,
        c.friday,
        c.saturday,
        c.sunday
    FROM {sharedNS}.sbb_stop_times st
    JOIN {sharedNS}.sbb_trips t
        ON st.trip_id = t.trip_id
        AND st.pub_date = t.pub_date
    JOIN {sharedNS}.sbb_calendar c 
        ON t.service_id = c.service_id 
        AND t.pub_date = c.pub_date
    JOIN {namespace}.sbb_stops_selected_regions slr 
        ON st.stop_id = slr.stop_id_cleaned
    WHERE st.pub_date = (
        SELECT MAX(pub_date) 
        FROM {sharedNS}.sbb_calendar 
        WHERE pub_date <= DATE '2024-07-07'
    )
"""

list(sql_fetch([drop_table_sql, query], conn))

CPU times: user 56.6 ms, sys: 193 μs, total: 56.8 ms
Wall time: 7.29 s


[['Query executed successfully'], ['rows'], [989352]]