## 0. Create Spark session

In [63]:
import os
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark import SparkContext

os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages org.apache.hadoop:hadoop-aws:3.2.0,org.apache.hadoop:hadoop-common:3.2.0 '
    'pyspark-shell'
)
os.environ['S3_ENDPOINT'] = "http://minio:9000"
os.environ['AWS_ACCESS_KEY_ID'] = "minio"
os.environ['AWS_SECRET_ACCESS_KEY'] = "minio123"

if 'spark' in globals():
    spark.stop()
    print("Spark session stopped.")
else:
    print("No Spark session to stop.")

spark = (
    SparkSession.builder.master("spark://spark:7077")
    .appName("notebook-velib-to-siler")
    .config("spark.hadoop.fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID"))
    .config("spark.hadoop.fs.s3a.secret.key", os.getenv("AWS_SECRET_ACCESS_KEY"))
    .config("spark.hadoop.fs.s3a.endpoint", os.getenv("S3_ENDPOINT"))
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.attempts.maximum", "1")
    .config("spark.hadoop.fs.s3a.connection.establish.timeout", "5000")
    .config("spark.hadoop.fs.s3a.connection.timeout", "10000")
    .getOrCreate()
)

Spark session stopped.


## 1. Extract from object storage

In [64]:
os.environ['S3_INPUT_PATH'] = "s3a://velib/silver/velib-disponibilite-en-temps-reel"

spark.sparkContext.setLogLevel("WARN")

df = spark.read.parquet(os.getenv("S3_INPUT_PATH"))
df.printSchema()
print(f'COUNT = {df.count()}')
df.limit(10).toPandas()

                                                                                

root
 |-- capacity: long (nullable = true)
 |-- code_insee_commune: string (nullable = true)
 |-- duedate: string (nullable = true)
 |-- duedate_timestamp_minute: timestamp (nullable = true)
 |-- ebike: long (nullable = true)
 |-- fill_percentage: double (nullable = true)
 |-- fill_ratio: double (nullable = true)
 |-- is_installed: string (nullable = true)
 |-- is_renting: string (nullable = true)
 |-- is_returning: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- mechanical: long (nullable = true)
 |-- name: string (nullable = true)
 |-- nom_arrondissement_communes: string (nullable = true)
 |-- numbikesavailable: long (nullable = true)
 |-- numdocksavailable: long (nullable = true)
 |-- part_minute: string (nullable = true)
 |-- part_month: string (nullable = true)
 |-- polldate: string (nullable = true)
 |-- polldate_timestamp_minute: timestamp (nullable = true)
 |-- stationcode: string (nullable = true)
 |-- part_day: date (nullabl

                                                                                

COUNT = 1127673


Unnamed: 0,capacity,code_insee_commune,duedate,duedate_timestamp_minute,ebike,fill_percentage,fill_ratio,is_installed,is_renting,is_returning,lat,lon,mechanical,name,nom_arrondissement_communes,numbikesavailable,numdocksavailable,part_minute,part_month,polldate,polldate_timestamp_minute,stationcode,part_day
0,35,75056,2024-10-29T10:05:10+00:00,2024-10-29 10:05:00,0,0.0,0.0,OUI,OUI,OUI,48.865983,2.275725,0,Benjamin Godard - Victor Hugo,Paris,0,35,2024-10-29T11:05,2024-10,2024-10-29T11:05:00+00:00,2024-10-29 11:05:00,16107,2024-10-29
1,21,75056,2024-10-29T10:10:45+00:00,2024-10-29 10:10:00,1,4.8,0.048,OUI,OUI,OUI,48.879296,2.33736,0,Toudouze - Clauzel,Paris,1,20,2024-10-29T11:05,2024-10,2024-10-29T11:05:00+00:00,2024-10-29 11:05:00,9020,2024-10-29
2,20,94081,2024-10-29T10:08:38+00:00,2024-10-29 10:08:00,6,50.0,0.5,OUI,OUI,OUI,48.778193,2.396302,4,Rouget de L'isle - Watteau,Vitry-sur-Seine,10,9,2024-10-29T11:05,2024-10,2024-10-29T11:05:00+00:00,2024-10-29 11:05:00,44015,2024-10-29
3,25,75056,2024-10-29T10:06:12+00:00,2024-10-29 10:06:00,0,0.0,0.0,OUI,OUI,OUI,48.837526,2.336035,0,Cassini - Denfert-Rochereau,Paris,0,20,2024-10-29T11:05,2024-10,2024-10-29T11:05:00+00:00,2024-10-29 11:05:00,14111,2024-10-29
4,20,75056,2024-10-29T10:09:41+00:00,2024-10-29 10:09:00,0,5.0,0.05,OUI,OUI,OUI,48.855908,2.392571,1,Charonne - Robert et Sonia Delaunay,Paris,1,19,2024-10-29T11:05,2024-10,2024-10-29T11:05:00+00:00,2024-10-29 11:05:00,11104,2024-10-29
5,60,75056,2024-10-29T10:10:07+00:00,2024-10-29 10:10:00,13,33.3,0.333,OUI,OUI,OUI,48.819428,2.343335,7,Jourdan - Stade Charléty,Paris,20,36,2024-10-29T11:05,2024-10,2024-10-29T11:05:00+00:00,2024-10-29 11:05:00,14014,2024-10-29
6,12,75056,2024-10-29T10:09:20+00:00,2024-10-29 10:09:00,9,91.7,0.917,OUI,OUI,OUI,48.875448,2.315508,2,Messine - Place Du Pérou,Paris,11,1,2024-10-29T11:05,2024-10,2024-10-29T11:05:00+00:00,2024-10-29 11:05:00,8026,2024-10-29
7,22,93066,2024-10-29T10:10:19+00:00,2024-10-29 10:10:00,4,22.7,0.227,OUI,OUI,OUI,48.936269,2.358867,1,Basilique,Saint-Denis,5,16,2024-10-29T11:05,2024-10,2024-10-29T11:05:00+00:00,2024-10-29 11:05:00,32017,2024-10-29
8,48,75056,2024-10-29T10:06:49+00:00,2024-10-29 10:06:00,2,8.3,0.083,OUI,OUI,OUI,48.835093,2.353468,2,Le Brun - Gobelins,Paris,4,44,2024-10-29T11:05,2024-10,2024-10-29T11:05:00+00:00,2024-10-29 11:05:00,13007,2024-10-29
9,21,75056,2024-10-29T10:09:23+00:00,2024-10-29 10:09:00,8,81.0,0.81,OUI,OUI,OUI,48.851654,2.330808,9,Saint-Sulpice,Paris,17,3,2024-10-29T11:05,2024-10,2024-10-29T11:05:00+00:00,2024-10-29 11:05:00,6003,2024-10-29


## 2. Transform

In [79]:
import pyspark.sql.functions as F
from pyspark.sql.window import Window

windowSpec  = Window.partitionBy("stationcode", "ten_minute_interval").orderBy("polldate_timestamp_minute")

df_transformed = df#.dropDuplicates(["stationcode", "polldate_timestamp_minute"])
df_transformed = (
    df_transformed
    .withColumn("ten_minute_interval", F.substring("polldate", 1, 15))
    .withColumn("numbikesavailable_abs_lag_diff",
        F.abs(F.col("numbikesavailable") - lag("numbikesavailable", 1).over(windowSpec)))
    )
df_transformed = (
    df_transformed
    .groupBy("stationcode", "ten_minute_interval")
    .agg(
        F.sum(F.col("numbikesavailable_abs_lag_diff")).alias("turnover_rate_10min"),
        F.first(F.col("name")).alias("name"),
        F.first(F.col("capacity")).alias("capacity"),
        F.first(F.col("part_day")).alias("part_day"),
        F.first(F.col("polldate_timestamp_minute")).alias("polldate_timestamp_minute")
    )
    .filter(F.col("turnover_rate_10min") < 500)
    .na.fill({'turnover_rate_10min': .0})
)
df_transformed = df_transformed.select(*(sorted(df_transformed.columns)))
df_transformed.printSchema()
print(f'COUNT = {df_transformed.count()}')

import pandas as pd
pd.set_option('display.max_columns', None)  # Show all columns
pd.set_option('display.width', 1000)
# df_transformed.orderBy("ten_minute_interval", ascending=True).limit(10).toPandas()
# df_transformed.select("turnover_rate_10min").distinct().orderBy("turnover_rate_10min", ascending=False).limit(100).toPandas()
df_transformed.limit(10).toPandas()
# df_transformed.summary().show()

root
 |-- capacity: long (nullable = true)
 |-- name: string (nullable = true)
 |-- part_day: date (nullable = true)
 |-- polldate_timestamp_minute: timestamp (nullable = true)
 |-- stationcode: string (nullable = true)
 |-- ten_minute_interval: string (nullable = true)
 |-- turnover_rate_10min: long (nullable = true)



                                                                                

COUNT = 126759


                                                                                

Unnamed: 0,capacity,name,part_day,polldate_timestamp_minute,stationcode,ten_minute_interval,turnover_rate_10min
0,48,Metz - Faubourg Saint-Denis,2024-10-29,2024-10-29 21:20:00,10004,2024-10-29T21:2,4
1,48,Metz - Faubourg Saint-Denis,2024-10-29,2024-10-29 22:50:00,10004,2024-10-29T22:5,5
2,18,Cité Riverin - Château d'Eau,2024-10-30,2024-10-30 02:10:00,10008,2024-10-30T02:1,0
3,15,Gare de l'Est - Fidélité,2024-10-30,2024-10-30 01:10:00,10017,2024-10-30T01:1,0
4,11,Chabrol - d'Hauteville,2024-10-29,2024-10-29 09:10:00,10020,2024-10-29T09:1,0
5,11,Chabrol - d'Hauteville,2024-10-29,2024-10-29 20:31:00,10020,2024-10-29T20:3,11
6,19,Buisson Saint-Louis - Saint-Maur,2024-10-29,2024-10-29 14:30:00,10024,2024-10-29T14:3,1
7,59,Gare de l'Est - Chateau Landon,2024-10-30,2024-10-30 00:30:00,10026,2024-10-30T00:3,0
8,24,Dunkerque - Rocroy,2024-10-29,2024-10-29 10:01:00,10029,2024-10-29T10:0,0
9,43,Saint-Denis - Rivoli,2024-10-29,2024-10-29 22:20:00,1003,2024-10-29T22:2,9


## 3. Load into object storage

In [80]:
os.environ['S3_OUTPUT_PATH'] = "s3a://velib/gold/turnover-rate-by-station-10min"

spark.sparkContext.setLogLevel("WARN")

# Write DataFrame to S3
(
    df_transformed.write
    .partitionBy("part_day")
    .format("parquet")
    .mode("overwrite")
    .save(os.getenv('S3_OUTPUT_PATH'))
)

                                                                                

## 4. Update hive metastore

In [5]:
!pip install trino



In [81]:
from pyspark.sql import DataFrame

def generate_trino_create_table(
    df: DataFrame, 
    catalog: str, 
    schema: str, 
    table: str, 
    partitioned_by: str, 
    external_location: str
) -> str:
    # Comprehensive mapping of Spark SQL types to Trino types
    type_mapping = {
        "bigint": "BIGINT",
        "binary": "VARBINARY",
        "boolean": "BOOLEAN",
        "decimal": "DECIMAL",  # Precision and scale will need to be handled separately if defined
        "double": "DOUBLE",
        "float": "REAL",
        "int": "INTEGER",
        "smallint": "SMALLINT",
        "string": "VARCHAR",
        "timestamp": "TIMESTAMP",
        "tinyint": "TINYINT"
    }
    
    # Separate columns to ensure partition column is placed last
    columns = []
    partition_column = None
    for field in df.schema.fields:
        spark_type = field.dataType.simpleString()
        trino_type = type_mapping.get(spark_type, "VARCHAR")  # default to VARCHAR if no match
        # DEBUG # print(f'field.name={field.name}, spark_type={spark_type}, trino_type={trino_type}')
        if field.name == partitioned_by:
            partition_column = f"{field.name} {trino_type}"
        else:
            columns.append(f"{field.name} {trino_type}")
    
    # Add the partition column to the end if it exists in the schema
    if partition_column:
        columns.append(partition_column)
    
    # Join column definitions into a single string
    columns_definition = ",\n    ".join(columns)
    
    # Generate the final CREATE TABLE statement
    create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{table} (
        {columns_definition}
    ) 
    WITH (
        format = 'PARQUET',
        partitioned_by = ARRAY['{partitioned_by}'],
        external_location = '{external_location}'
    )
    """
    
    return create_table_sql.strip()


In [82]:
import trino

host, port, user = 'trino-coordinator', 8080, 'trino'
conn = trino.dbapi.connect(host=host, port=port, user=user)
cur = conn.cursor()

catalog, schema, table = 'minio', 'velib_gold', 'turnover_rate_by_station_10min'
schema_location = 's3a://velib/gold/'
partitioned_by = 'part_day'
external_location = 's3a://velib/gold/turnover-rate-by-station-10min'

# List of queries to execute
queries = [
    f"CREATE SCHEMA IF NOT EXISTS {catalog}.{schema} WITH (location = '{schema_location}')",
    f"DROP TABLE IF EXISTS {catalog}.{schema}.{table}",
    
    generate_trino_create_table(df_transformed, catalog, schema, table, partitioned_by, external_location),
    
    f"USE {catalog}.{schema}",
    f"CALL system.sync_partition_metadata('{schema}', '{table}', 'ADD')",
    f"SELECT * FROM {catalog}.{schema}.{table} LIMIT 5"
]

# Execute each query in the list
for query in queries:
    try:
        cur.execute(query)
        # Check if the query is a SELECT query to fetch results
        if query.startswith("SELECT"):
            results = cur.fetchall()
            for row in results:
                print(row)
        else:
            print(f"Executed: {query}")
    except Exception as e:
        print(f"Error executing query: {query}. Error: {e}")

# Close the cursor and connection
cur.close()
conn.close()


Executed: CREATE SCHEMA IF NOT EXISTS minio.velib_gold WITH (location = 's3a://velib/gold/')
Executed: DROP TABLE IF EXISTS minio.velib_gold.turnover_rate_by_station_10min
Executed: CREATE TABLE IF NOT EXISTS minio.velib_gold.turnover_rate_by_station_10min (
        capacity BIGINT,
    name VARCHAR,
    polldate_timestamp_minute TIMESTAMP,
    stationcode VARCHAR,
    ten_minute_interval VARCHAR,
    turnover_rate_10min BIGINT,
    part_day VARCHAR
    ) 
    WITH (
        format = 'PARQUET',
        partitioned_by = ARRAY['part_day'],
        external_location = 's3a://velib/gold/turnover-rate-by-station-10min'
    )
Executed: USE minio.velib_gold
Executed: CALL system.sync_partition_metadata('velib_gold', 'turnover_rate_by_station_10min', 'ADD')
[62, 'René Boulanger - Lancry', datetime.datetime(2024, 10, 29, 14, 40), '10001', '2024-10-29T14:4', 0, '2024-10-29']
[62, 'René Boulanger - Lancry', datetime.datetime(2024, 10, 29, 16, 0), '10001', '2024-10-29T16:0', 0, '2024-10-29']
[28, 