In [1]:
import os
import findspark
import boto3
import clickhouse_connect
from clickhouse_driver import Client
os.environ["JAVA_HOME"] = "/Library/Java/JavaVirtualMachines/temurin-22.jdk/Contents/Home"
os.environ["SPARK_HOME"] = "spark-3.5.1-bin-hadoop3"
findspark.init()
findspark.find() # Should return '/content/spark-3.5.1-bin-hadoop3'

'spark-3.5.1-bin-hadoop3'

In [2]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

SparkContext.setSystemProperty('spark.executor.memory', '2g')

spark = (
    SparkSession.builder.master("local")
    .appName("Colab")
    .config("spark.ui.port", "4051")
    .getOrCreate()
)
sc = spark.sparkContext

sc

24/05/12 22:20:45 WARN Utils: Your hostname, Joses-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 172.20.10.4 instead (on interface en0)
24/05/12 22:20:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/12 22:20:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
session = boto3.Session(
    aws_access_key_id=os.environ["CUBBIT_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["CUBBIT_SECRET_ACCESS_KEY"],
    region_name="eu-west-1",
)

s3 = session.client("s3", endpoint_url="https://s3.cubbit.eu")

In [4]:
drivers_parquet = s3.download_file(Bucket="f1-bucket", Key="drivers.parquet", Filename="./data/drivers.parquet")

In [5]:
df = spark.read.parquet('./data/drivers.parquet')

                                                                                

In [10]:
non_duplicates = df.dropDuplicates(['driver_number'])
non_duplicates.show(47)

+-------------+-----------------+------------+---------------+------------+-----------+-----------+
|driver_number|        full_name|name_acronym|      team_name|country_code|session_key|meeting_key|
+-------------+-----------------+------------+---------------+------------+-----------+-----------+
|            1|   Max VERSTAPPEN|         VER|Red Bull Racing|         NED|       7763|       1140|
|            2|   Logan SARGEANT|         SAR|       Williams|         USA|       7763|       1140|
|            3| Daniel RICCIARDO|         RIC|     AlphaTauri|         AUS|       9127|       1215|
|            4|     Lando NORRIS|         NOR|        McLaren|         GBR|       7763|       1140|
|            5|Gabriel BORTOLETO|         BOR|           NULL|        NULL|       9223|       1215|
|            6|    Oliver GOETHE|         GOE|           NULL|        NULL|       9223|       1215|
|            7| Kaylen FREDERICK|         FRE|           NULL|        NULL|       9223|       1215|


In [9]:
drivers = df.toPandas()
drivers['team_name'].value_counts()

team_name
McLaren            180
Alpine             180
Ferrari            180
Mercedes           180
Haas F1 Team       179
Red Bull Racing    178
Aston Martin       178
Williams           174
AlphaTauri         130
Alfa Romeo         128
RB                  50
Kick Sauber         50
Name: count, dtype: int64

In [8]:
pandas_drivers = non_duplicates.toPandas()
pandas_drivers['team_name'].value_counts()

team_name
AlphaTauri         5
Red Bull Racing    4
Williams           3
Alpine             3
Aston Martin       3
Ferrari            3
Haas F1 Team       3
Alfa Romeo         3
Mercedes           3
McLaren            2
Name: count, dtype: int64

In [8]:
s3.download_file(Bucket="f1-bucket", Key="pit.parquet", Filename="./data/pit.parquet")

df_pit = spark.read.parquet('./data/pit.parquet')
df_pit.show()

+------------+----------+-------------+--------------------+-----------+-----------+
|pit_duration|lap_number|driver_number|                date|session_key|meeting_key|
+------------+----------+-------------+--------------------+-----------+-----------+
|        35.3|         3|           14|2023-06-02T11:33:...|       9095|       1211|
|        40.8|         4|           18|2023-06-02T11:35:...|       9095|       1211|
|       155.2|         3|           21|2023-06-02T11:36:...|       9095|       1211|
|       277.0|         2|            2|2023-06-02T11:37:...|       9095|       1211|
|        27.6|         5|           16|2023-06-02T11:37:...|       9095|       1211|
|       343.6|         2|           81|2023-06-02T11:38:...|       9095|       1211|
|        24.9|         6|           11|2023-06-02T11:38:...|       9095|       1211|
|        47.4|         6|           20|2023-06-02T11:38:...|       9095|       1211|
|       381.5|         2|            4|2023-06-02T11:39:...|     

In [9]:
cols_to_drop = ['date', 'meeting_key']

filtered_df_pit = df_pit.drop(*cols_to_drop)
filtered_df_pit.show()

+------------+----------+-------------+-----------+
|pit_duration|lap_number|driver_number|session_key|
+------------+----------+-------------+-----------+
|        35.3|         3|           14|       9095|
|        40.8|         4|           18|       9095|
|       155.2|         3|           21|       9095|
|       277.0|         2|            2|       9095|
|        27.6|         5|           16|       9095|
|       343.6|         2|           81|       9095|
|        24.9|         6|           11|       9095|
|        47.4|         6|           20|       9095|
|       381.5|         2|            4|       9095|
|        23.5|         6|            1|       9095|
|        40.8|         6|           27|       9095|
|       517.4|         3|           77|       9095|
|       298.0|         5|           18|       9095|
|        25.1|         9|           23|       9095|
|       623.1|         3|           24|       9095|
|       245.8|         8|           22|       9095|
|        23.

In [3]:
# s3.download_file(Bucket="f1-bucket", Key="sessions.parquet", Filename="./data/sessions.parquet")

df_sessions = spark.read.parquet('./data/sessions.parquet')
df_sessions.show()

                                                                                

+---------+-----------+------------+------------+-----------+------------------+------------+------------+--------------------+--------------------+----------+-----------+-----------+----+
| location|country_key|country_code|country_name|circuit_key|circuit_short_name|session_type|session_name|          date_start|            date_end|gmt_offset|session_key|meeting_key|year|
+---------+-----------+------------+------------+-----------+------------------+------------+------------+--------------------+--------------------+----------+-----------+-----------+----+
|   Sakhir|         36|         BRN|     Bahrain|         63|            Sakhir|    Practice|  Practice 1|2023-02-23T07:00:...|2023-02-23T16:30:...|  03:00:00|       9222|       1140|2023|
|   Sakhir|         36|         BRN|     Bahrain|         63|            Sakhir|    Practice|  Practice 2|2023-02-24T07:00:...|2023-02-24T16:30:...|  03:00:00|       7763|       1140|2023|
|   Sakhir|         36|         BRN|     Bahrain|      

In [3]:
# s3.download_file(Bucket="f1-bucket", Key="position.parquet", Filename="./data/position.parquet")

df_positions = spark.read.parquet('./data/position.parquet')
df_positions.show(500)
display(df_positions.count())

                                                                                

+--------+-------------+--------------------+-----------+-----------+
|position|driver_number|                date|session_key|meeting_key|
+--------+-------------+--------------------+-----------+-----------+
|       1|            1|2023-02-23T06:54:...|       9222|       1140|
|       2|            1|2023-02-23T07:00:...|       9222|       1140|
|       3|            1|2023-02-23T07:00:...|       9222|       1140|
|       4|            1|2023-02-23T07:00:...|       9222|       1140|
|       5|            1|2023-02-23T07:00:...|       9222|       1140|
|       6|            1|2023-02-23T07:00:...|       9222|       1140|
|       1|            1|2023-02-23T07:04:...|       9222|       1140|
|       2|            1|2023-02-23T07:33:...|       9222|       1140|
|       1|            1|2023-02-23T07:34:...|       9222|       1140|
|       2|            1|2023-02-23T07:56:...|       9222|       1140|
|       3|            1|2023-02-23T08:09:...|       9222|       1140|
|       4|          

104587

In [4]:
_ = spark.sql("DROP TABLE IF EXISTS positions_parquet_v4")
df_positions.write.saveAsTable("positions_parquet_v5")

24/05/10 14:19:22 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [5]:
query = """
WITH ranked_positions AS (
    SELECT 
        driver_number, 
        session_key, 
        date, 
        position,
        ROW_NUMBER() OVER (PARTITION BY driver_number, session_key ORDER BY date DESC) as rn
    FROM positions_parquet_v5
)
SELECT 
    driver_number, 
    session_key, 
    date,
    position
FROM ranked_positions
WHERE rn = 1
"""

df_positions_max_date = spark.sql(query)
df_positions_max_date = df_positions_max_date.withColumn("date", df_positions_max_date["date"].cast("date"))
df_positions_max_date.show(500)

+-------------+-----------+----------+--------+
|driver_number|session_key|      date|position|
+-------------+-----------+----------+--------+
|            1|       7763|2023-02-24|       2|
|            1|       7765|2023-03-03|       3|
|            1|       7766|2023-03-03|       2|
|            1|       7767|2023-03-04|       2|
|            1|       7768|2023-03-04|       1|
|            1|       7772|2023-03-17|       1|
|            1|       7773|2023-03-17|       1|
|            1|       7774|2023-03-18|       1|
|            1|       7775|2023-03-18|      15|
|            1|       7779|2023-03-19|       2|
|            1|       7780|2023-03-31|       1|
|            1|       7781|2023-03-31|       3|
|            1|       7782|2023-04-01|       1|
|            1|       7783|2023-04-01|       1|
|            1|       7787|2023-04-02|       1|
|            1|       7953|2023-03-05|       1|
|            1|       9063|2023-04-28|       1|
|            1|       9064|2023-04-28|  

In [None]:
query = """
SELECT MAX(date), session_key, driver_number, position
FROM position
GROUP BY session_key, driver_number, position;
"""
results = spark.sql(query)
results.show(500)

+--------------------+-----------+-------------+--------+
|           max(date)|session_key|driver_number|position|
+--------------------+-----------+-------------+--------+
|2023-02-24T12:43:...|       7763|            1|       1|
|2023-02-24T16:00:...|       7763|            1|       2|
|2023-02-24T12:21:...|       7763|            1|       3|
|2023-02-24T07:00:...|       7763|            1|       4|
|2023-02-24T07:00:...|       7763|            1|       5|
|2023-02-24T07:00:...|       7763|            1|       6|
|2023-02-24T07:01:...|       7763|            1|       7|
|2023-02-24T07:01:...|       7763|            1|       8|
|2023-02-24T07:02:...|       7763|            1|       9|
|2023-02-24T07:02:...|       7763|            1|      10|
|2023-02-24T07:08:...|       7763|            1|      11|
|2023-02-24T10:31:...|       7763|            2|       2|
|2023-02-24T12:28:...|       7763|            2|       3|
|2023-02-24T13:57:...|       7763|            2|       4|
|2023-02-24T15

In [6]:
# s3.download_file(Bucket="f1-bucket", Key="weather.parquet", Filename="./data/weather.parquet")

df_weather = spark.read.parquet('./data/weather.parquet')
df_weather.show()

+---------------+--------+--------+--------+-----------------+--------------+----------+--------------------+-----------+-----------+
|air_temperature|humidity|pressure|rainfall|track_temperature|wind_direction|wind_speed|                date|session_key|meeting_key|
+---------------+--------+--------+--------+-----------------+--------------+----------+--------------------+-----------+-----------+
|           23.8|    26.0|  1012.2|       0|             29.7|           193|       4.9|2023-02-23T06:54:...|       9222|       1140|
|           23.8|    26.0|  1012.2|       0|             29.7|           193|       4.9|2023-02-23T06:55:...|       9222|       1140|
|           23.8|    27.0|  1012.2|       0|             30.0|           193|       4.3|2023-02-23T06:56:...|       9222|       1140|
|           23.8|    27.0|  1012.2|       0|             30.1|           186|       5.5|2023-02-23T06:57:...|       9222|       1140|
|           23.8|    27.0|  1012.2|       0|             30.1|

In [7]:
df_weather_with_monotonically_increasing_id = df_weather.withColumn("id", monotonically_increasing_id())
df_weather_with_monotonically_increasing_id.show()

+---------------+--------+--------+--------+-----------------+--------------+----------+--------------------+-----------+-----------+---+
|air_temperature|humidity|pressure|rainfall|track_temperature|wind_direction|wind_speed|                date|session_key|meeting_key| id|
+---------------+--------+--------+--------+-----------------+--------------+----------+--------------------+-----------+-----------+---+
|           23.8|    26.0|  1012.2|       0|             29.7|           193|       4.9|2023-02-23T06:54:...|       9222|       1140|  0|
|           23.8|    26.0|  1012.2|       0|             29.7|           193|       4.9|2023-02-23T06:55:...|       9222|       1140|  1|
|           23.8|    27.0|  1012.2|       0|             30.0|           193|       4.3|2023-02-23T06:56:...|       9222|       1140|  2|
|           23.8|    27.0|  1012.2|       0|             30.1|           186|       5.5|2023-02-23T06:57:...|       9222|       1140|  3|
|           23.8|    27.0|  1012.2

In [8]:
df_weather_with_monotonically_increasing_id.dtypes

[('air_temperature', 'double'),
 ('humidity', 'double'),
 ('pressure', 'double'),
 ('rainfall', 'bigint'),
 ('track_temperature', 'double'),
 ('wind_direction', 'bigint'),
 ('wind_speed', 'double'),
 ('date', 'string'),
 ('session_key', 'bigint'),
 ('meeting_key', 'bigint'),
 ('id', 'bigint')]

In [9]:
def add_weather_reference(
    current_race_df: DataFrame, weather_df: DataFrame
) -> DataFrame:
    
    filtered_weather_df = weather_df.drop("session_key").drop("meeting_key")

    cols_to_drop = [x for x in filtered_weather_df.columns if x not in current_race_df.columns and x != "id"]

    try:
        weather_referenced_df = current_race_df.select("*").join(
            filtered_weather_df, how="left", on=["date"]
        )
        return weather_referenced_df.drop(*cols_to_drop)
    except Exception as e:
        print(e)
    
res = add_weather_reference(weather_df=df_weather_with_monotonically_increasing_id, current_race_df=df_positions_max_date)
res = res.withColumnRenamed("session_key", "race_id").withColumnRenamed("driver_number", "driver_id").withColumnRenamed("id", "weather_id")
res.show()

+----------+---------+-------+--------+----------+
|      date|driver_id|race_id|position|weather_id|
+----------+---------+-------+--------+----------+
|2023-02-24|        1|   7763|       2|      1179|
|2023-02-24|        1|   7763|       2|      1178|
|2023-02-24|        1|   7763|       2|      1177|
|2023-02-24|        1|   7763|       2|      1176|
|2023-02-24|        1|   7763|       2|      1175|
|2023-02-24|        1|   7763|       2|      1174|
|2023-02-24|        1|   7763|       2|      1173|
|2023-02-24|        1|   7763|       2|      1172|
|2023-02-24|        1|   7763|       2|      1171|
|2023-02-24|        1|   7763|       2|      1170|
|2023-02-24|        1|   7763|       2|      1169|
|2023-02-24|        1|   7763|       2|      1168|
|2023-02-24|        1|   7763|       2|      1167|
|2023-02-24|        1|   7763|       2|      1166|
|2023-02-24|        1|   7763|       2|      1165|
|2023-02-24|        1|   7763|       2|      1164|
|2023-02-24|        1|   7763| 

In [10]:
res.dtypes

[('date', 'date'),
 ('driver_id', 'bigint'),
 ('race_id', 'bigint'),
 ('position', 'bigint'),
 ('weather_id', 'bigint')]

In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Assuming df_weather is your DataFrame and it has been loaded correctly
# Convert the 'date' column to a date type if it's not already
df_weather = df_weather.withColumn("date", df_weather["date"].cast("date"))


# Now, perform your aggregation on df_with_id instead of df_weather
agg_df_weather = df_weather.groupBy(["date"]).agg(
    {"air_temperature": "avg", "humidity": "avg", "pressure": "avg", "rainfall": "sum", "track_temperature": "avg", "wind_direction": "avg", "wind_speed": "avg"}
)

sorted_agg_df_weather = agg_df_weather.sort(agg_df_weather.date.asc())

sorted_agg_df_weather_with_monotonic_id = sorted_agg_df_weather.withColumn("id", monotonically_increasing_id())

# Show the result
sorted_agg_df_weather_with_monotonic_id.show()


+----------+------------------+----------------------+--------------------+------------------+------------------+-------------+-------------------+---+
|      date|     avg(pressure)|avg(track_temperature)|avg(air_temperature)|     avg(humidity)|   avg(wind_speed)|sum(rainfall)|avg(wind_direction)| id|
+----------+------------------+----------------------+--------------------+------------------+------------------+-------------+-------------------+---+
|2023-02-23|1009.6552542372937|     35.97016949152542|  26.834406779661045|21.859322033898305| 2.112881355932195|            0| 205.49830508474577|  0|
|2023-02-24|1009.9096610169514|     36.44491525423729|   26.83525423728812|26.076271186440678|2.0916949152542292|            0| 215.82542372881355|  1|
|2023-02-25|1013.1206540447558|     36.38468158347681|  26.641996557659162|42.820998278829606| 1.154216867469881|            0| 242.63511187607574|  2|
|2023-03-03|1017.9171597633137|     33.68757396449705|  24.918934911242626| 17.822485207

In [12]:
from pyspark.sql import Window
from pyspark.sql.functions import row_number

# Assuming df_weather is your DataFrame and it has been loaded correctly
# Convert the 'date' column to a date type if it's not already
df_weather = df_weather.withColumn("date", df_weather["date"].cast("date"))

# Group by 'date' only to aggregate metrics for each day
agg_df_weather = df_weather.groupBy("date").agg(
    {"air_temperature": "avg", "humidity": "avg", "pressure": "avg", "rainfall": "sum", "track_temperature": "avg", "wind_direction": "avg", "wind_speed": "avg"}
)

# Show the result
# agg_df_weather.show()


windowSpec = Window.partitionBy("date").orderBy("date")

# Add the new 'id' column
df_with_id = agg_df_weather.withColumn("id", row_number().over(windowSpec))

# Show the updated DataFrame
df_with_id.show()

+----------+------------------+----------------------+--------------------+------------------+------------------+-------------+-------------------+---+
|      date|     avg(pressure)|avg(track_temperature)|avg(air_temperature)|     avg(humidity)|   avg(wind_speed)|sum(rainfall)|avg(wind_direction)| id|
+----------+------------------+----------------------+--------------------+------------------+------------------+-------------+-------------------+---+
|2023-02-23|1009.6552542372937|     35.97016949152542|  26.834406779661045|21.859322033898305| 2.112881355932195|            0| 205.49830508474577|  1|
|2023-02-24|1009.9096610169514|     36.44491525423729|   26.83525423728812|26.076271186440678|2.0916949152542292|            0| 215.82542372881355|  1|
|2023-02-25|1013.1206540447558|     36.38468158347681|  26.641996557659162|42.820998278829606| 1.154216867469881|            0| 242.63511187607574|  1|
|2023-03-03|1017.9171597633137|     33.68757396449705|  24.918934911242626| 17.822485207

In [13]:
res = res.withColumnRenamed("position", "final_position")
res.show()

+----------+---------+-------+--------------+----------+
|      date|driver_id|race_id|final_position|weather_id|
+----------+---------+-------+--------------+----------+
|2023-02-24|        1|   7763|             2|      1179|
|2023-02-24|        1|   7763|             2|      1178|
|2023-02-24|        1|   7763|             2|      1177|
|2023-02-24|        1|   7763|             2|      1176|
|2023-02-24|        1|   7763|             2|      1175|
|2023-02-24|        1|   7763|             2|      1174|
|2023-02-24|        1|   7763|             2|      1173|
|2023-02-24|        1|   7763|             2|      1172|
|2023-02-24|        1|   7763|             2|      1171|
|2023-02-24|        1|   7763|             2|      1170|
|2023-02-24|        1|   7763|             2|      1169|
|2023-02-24|        1|   7763|             2|      1168|
|2023-02-24|        1|   7763|             2|      1167|
|2023-02-24|        1|   7763|             2|      1166|
|2023-02-24|        1|   7763| 

In [None]:
clickhouse_client.insert_df(df=res.toPandas(), table="ResultsFactsFT")

In [12]:
import pandas as pd

team_values = [
  {"team_id": 1, "name": "AlphaTauri", "country_code": "GB", "acronym": "AT"},
  {"team_id": 2, "name": "Red Bull Racing", "country_code": "AU", "acronym": "RBR"},
  {"team_id": 3, "name": "Williams", "country_code": "GB", "acronym": "WILL"},
  {"team_id": 4, "name": "Alpine", "country_code": "FR", "acronym": "ALP"},
  {"team_id": 5, "name": "Aston Martin", "country_code": "GB", "acronym": "AM"},
  {"team_id": 6, "name": "Ferrari", "country_code": "IT", "acronym": "FER"},
  {"team_id": 7, "name": "Haas F1 Team", "country_code": "US", "acronym": "HAA"},
  {"team_id": 8, "name": "Alfa Romeo", "country_code": "IT", "acronym": "AR"},
  {"team_id": 9, "name": "Mercedes", "country_code": "DE", "acronym": "MER"},
  {"team_id": 10, "name": "McLaren", "country_code": "GB", "acronym": "MCL"}
]

team_df = pd.DataFrame(team_values)
team_df

Unnamed: 0,team_id,name,country_code,acronym
0,1,AlphaTauri,GB,AT
1,2,Red Bull Racing,AU,RBR
2,3,Williams,GB,WILL
3,4,Alpine,FR,ALP
4,5,Aston Martin,GB,AM
5,6,Ferrari,IT,FER
6,7,Haas F1 Team,US,HAA
7,8,Alfa Romeo,IT,AR
8,9,Mercedes,DE,MER
9,10,McLaren,GB,MCL


In [13]:
from io import BytesIO

buffer = BytesIO()
team_df.to_parquet(buffer, compression="snappy")

s3.put_object(
    Bucket="f1-bucket",
    Key="teams.parquet",
    Body=buffer.getvalue()
)

{'ResponseMetadata': {'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Sun, 12 May 2024 20:42:11 GMT',
   'content-length': '0',
   'connection': 'keep-alive',
   'access-control-allow-headers': '*',
   'access-control-allow-methods': 'GET, HEAD, POST, PUT, PATCH, DELETE',
   'access-control-allow-origin': '*',
   'access-control-expose-headers': '*',
   'etag': '16fb8a0c13f84e9941fd95c9939c1fc1',
   'strict-transport-security': 'max-age=15724800; includeSubDomains',
   'x-cbt-tenant-name': 'ngc',
   'x-cbt-tenant-id': '00000000-0000-0000-0000-000000000000'},
  'RetryAttempts': 0},
 'ETag': '16fb8a0c13f84e9941fd95c9939c1fc1'}