In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, BooleanType, DateType, DecimalType, DoubleType
from pyspark.sql.functions import col, when, sum, avg, row_number
from pyspark.sql.window import Window

In [3]:
spark = SparkSession.builder.appName("F1 Data Analysis").config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()
 # .config("spark.hadoop.fs.s3a.access.key", "AKIA6FXCAGDVQXS4E6WW") \
    # .config("spark.hadoop.fs.s3a.secret.key", "zRNKGEOArshxuzKBtMu5Wxi9OF5meB++kYf05Apt") \
    # .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") \

24/09/25 19:48:09 WARN Utils: Your hostname, Admins-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.10.104 instead (on interface en0)
24/09/25 19:48:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/25 19:48:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
circuit_schema = StructType([
    StructField("circuitId", IntegerType(), False),
    StructField("circuitRef", StringType(), True),
    StructField("name", StringType(), True),
    StructField("location", StringType(), True),
    StructField("country", StringType(), True),
    StructField("lat", DoubleType(), True),
    StructField("lng", DoubleType(), True),
    StructField("alt", IntegerType(), True),
    StructField("url", StringType(), True)
])

In [6]:
circuits = spark.read.schema(circuit_schema).format("csv").option("header","true").load("data/circuits.csv")

In [7]:
circuits.show(6)

                                                                                

+---------+-----------+--------------------+------------+---------+--------+-------+---+--------------------+
|circuitId| circuitRef|                name|    location|  country|     lat|    lng|alt|                 url|
+---------+-----------+--------------------+------------+---------+--------+-------+---+--------------------+
|        1|albert_park|Albert Park Grand...|   Melbourne|Australia|-37.8497|144.968| 10|http://en.wikiped...|
|        2|     sepang|Sepang Internatio...|Kuala Lumpur| Malaysia| 2.76083|101.738| 18|http://en.wikiped...|
|        3|    bahrain|Bahrain Internati...|      Sakhir|  Bahrain| 26.0325|50.5106|  7|http://en.wikiped...|
|        4|  catalunya|Circuit de Barcel...|    Montmeló|    Spain|   41.57|2.26111|109|http://en.wikiped...|
|        5|   istanbul|       Istanbul Park|    Istanbul|   Turkey| 40.9517| 29.405|130|http://en.wikiped...|
|        6|     monaco|   Circuit de Monaco| Monte-Carlo|   Monaco| 43.7347|7.42056|  7|http://en.wikiped...|
+---------

In [8]:
races = spark.read.format("csv").option("header","true").option("inferSchema","True").load("data/races.csv")

In [9]:
races.columns
#races.schema

['raceId',
 'year',
 'round',
 'circuitId',
 'name',
 'date',
 'time',
 'url',
 'fp1_date',
 'fp1_time',
 'fp2_date',
 'fp2_time',
 'fp3_date',
 'fp3_time',
 'quali_date',
 'quali_time',
 'sprint_date',
 'sprint_time']

In [10]:
circuits.createOrReplaceTempView("circuits")
races.createOrReplaceTempView("races")

In [11]:
races_in__each_circuits = spark.sql("select count(*),c.name from races r join circuits c on c.circuitId=r.circuitId group by c.name order by count(*) desc")


In [12]:
# most popular circuits
races_in__each_circuits.show(10)


+--------+--------------------+
|count(1)|                name|
+--------+--------------------+
|      74|Autodromo Naziona...|
|      70|   Circuit de Monaco|
|      59| Silverstone Circuit|
|      57|Circuit de Spa-Fr...|
|      43|Circuit Gilles Vi...|
|      41|Autódromo José Ca...|
|      41|         Nürburgring|
|      39|         Hungaroring|
|      38|       Red Bull Ring|
|      37|      Hockenheimring|
+--------+--------------------+
only showing top 10 rows



In [13]:
# races per year 
races.groupBy('year').count().sort(col("year").desc()).show(10)

+----+-----+
|year|count|
+----+-----+
|2024|   24|
|2023|   22|
|2022|   22|
|2021|   22|
|2020|   17|
|2019|   21|
|2018|   21|
|2017|   20|
|2016|   21|
|2015|   19|
+----+-----+
only showing top 10 rows



In [21]:
# average races per decade 
races_in__each_decade = spark.sql("select count(*),(int(year/10))*10 from races r join circuits c on c.circuitId=r.circuitId group by (int(year/10))*10 order by (int(year/10))*10 desc")

In [22]:
races_in__each_decade.show()

+--------+------------------+
|count(1)|((year / 10) * 10)|
+--------+------------------+
|     107|              2020|
|     198|              2010|
|     174|              2000|
|     162|              1990|
|     156|              1980|
|     144|              1970|
|     100|              1960|
|      84|              1950|
+--------+------------------+

