# Install PySpark

In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 45 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 56.1 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845513 sha256=e5760341495ce787563017298262035f4876fe306fe60e04b12b31207711d547
  Stored in directory: /root/.cache/pip/wheels/42/59/f5/79a5bf931714dcd201b26025347785f087370a10a3329a899c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


In [2]:
import pyspark

sc = pyspark.SparkContext()
sql_sc = pyspark.SQLContext(sc)



In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, TimestampType, DateType, FloatType
from pyspark.sql.functions import current_timestamp, lit, col, to_timestamp, concat

In [5]:
spark = (pyspark.sql.SparkSession
         .builder
         .appName('FormulaOne')
         .getOrCreate()
)

# Import datasets 

In [6]:
!curl -O http://ergast.com/downloads/f1db_csv.zip

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 5836k  100 5836k    0     0  27.1M      0 --:--:-- --:--:-- --:--:-- 27.1M


In [7]:
!unzip /content/f1db_csv.zip -d /content/f1_datasets

Archive:  /content/f1db_csv.zip
  inflating: /content/f1_datasets/circuits.csv  
  inflating: /content/f1_datasets/constructor_results.csv  
  inflating: /content/f1_datasets/constructors.csv  
  inflating: /content/f1_datasets/constructor_standings.csv  
  inflating: /content/f1_datasets/drivers.csv  
  inflating: /content/f1_datasets/driver_standings.csv  
  inflating: /content/f1_datasets/lap_times.csv  
  inflating: /content/f1_datasets/pit_stops.csv  
  inflating: /content/f1_datasets/qualifying.csv  
  inflating: /content/f1_datasets/races.csv  
  inflating: /content/f1_datasets/results.csv  
  inflating: /content/f1_datasets/seasons.csv  
  inflating: /content/f1_datasets/sprint_results.csv  
  inflating: /content/f1_datasets/status.csv  


In [8]:
!ls -lh /content/f1_datasets

total 20M
-rw-rw-r-- 1 root root 9.8K Oct 31 08:51 circuits.csv
-rw-rw-r-- 1 root root 206K Oct 31 08:51 constructor_results.csv
-rw-rw-r-- 1 root root  17K Oct 31 08:51 constructors.csv
-rw-rw-r-- 1 root root 298K Oct 31 08:51 constructor_standings.csv
-rw-rw-r-- 1 root root  92K Oct 31 08:51 drivers.csv
-rw-rw-r-- 1 root root 837K Oct 31 08:51 driver_standings.csv
-rw-rw-r-- 1 root root  16M Oct 31 08:51 lap_times.csv
-rw-rw-r-- 1 root root 363K Oct 31 08:51 pit_stops.csv
-rw-rw-r-- 1 root root 408K Oct 31 08:51 qualifying.csv
-rw-rw-r-- 1 root root 151K Oct 31 08:51 races.csv
-rw-rw-r-- 1 root root 1.6M Oct 31 08:51 results.csv
-rw-rw-r-- 1 root root 4.4K Oct 31 08:51 seasons.csv
-rw-r--r-- 1 root root 6.8K Oct 31 08:51 sprint_results.csv
-rw-rw-r-- 1 root root 2.1K Oct 31 08:51 status.csv


# Ingestion

## Load circuits file, using InferSchema 

In [9]:
circuits_schema = StructType(fields=[StructField("circuitId", IntegerType(), False),
                                     StructField("circuitRef", StringType(), True),
                                     StructField("name", StringType(), True),
                                     StructField("location", StringType(), True),
                                     StructField("country", StringType(), True),
                                     StructField("lat", DoubleType(), True),
                                     StructField("lng", DoubleType(), True),
                                     StructField("alt", IntegerType(), True),
                                     StructField("url", StringType(), True)
])

circuits_df = (spark.read
               .schema(circuits_schema)
               .csv('/content/f1_datasets/circuits.csv', sep=',', header = True)
)

circuits_selected_df = circuits_df.select(col("circuitId"),col("circuitRef"),col("name"),col("location"),col("country"),\
                                          col("lat"),col("lng"),col("alt"))

circuits_renamed_df = circuits_selected_df.withColumnRenamed("circuitId","circuit_id")\
.withColumnRenamed("circuitRef","circuit_ref")\
.withColumnRenamed("lat","latitude")\
.withColumnRenamed("lng","longitude")\
.withColumnRenamed("alt","altitude")

circuits_final_df = circuits_renamed_df.withColumn("ingestion_date",current_timestamp())\
.withColumn("env",lit("Production"))

### Write data as parquet 

In [10]:
circuits_df.write.mode("overwrite").parquet("/content/f1_datasets/circuits")
circuits_parquet = spark.read.parquet("/content/f1_datasets/circuits")

## Ingest races file

In [13]:
races_schema = StructType(fields=[StructField("raceId", IntegerType(), False),
                                     StructField("year", IntegerType(), True),
                                     StructField("round", IntegerType(), True),
                                     StructField("circuitId", StringType(), True),
                                     StructField("name", StringType(), True),
                                     StructField("date", DateType(), True),
                                     StructField("time", StringType(), True),
                                     StructField("url", StringType(), True)
])

races_df = (spark.read
               .schema(races_schema)
               .csv('/content/f1_datasets/races.csv', sep=',', header = True)
)

races_with_df = (races_df
                  .withColumn("race_timestamp", to_timestamp(concat(col("date"),lit(' '),col("time")),'yyyy-MM-dd HH:mm:ss'))
                  .withColumn("ingestion_date",current_timestamp())
)

races_selected_df = (races_with_df.select(col("raceId").alias("race_id"),col("year").alias("race_year"),
                                          col("round"),col("circuitId").alias("circuit_id"),col("name"),col("ingestion_date"),col("race_timestamp"))
)

races_selected_df.write.mode("overwrite").parquet("/content/f1_datasets/races")
races_parquet = spark.read.parquet("/content/f1_datasets/races")

+-------+---------+-----+----------+--------------------+--------------------+-------------------+
|race_id|race_year|round|circuit_id|                name|      ingestion_date|     race_timestamp|
+-------+---------+-----+----------+--------------------+--------------------+-------------------+
|      1|     2009|    1|         1|Australian Grand ...|2022-11-09 09:31:...|2009-03-29 06:00:00|
|      2|     2009|    2|         2|Malaysian Grand Prix|2022-11-09 09:31:...|2009-04-05 09:00:00|
|      3|     2009|    3|        17|  Chinese Grand Prix|2022-11-09 09:31:...|2009-04-19 07:00:00|
|      4|     2009|    4|         3|  Bahrain Grand Prix|2022-11-09 09:31:...|2009-04-26 12:00:00|
|      5|     2009|    5|         4|  Spanish Grand Prix|2022-11-09 09:31:...|2009-05-10 12:00:00|
|      6|     2009|    6|         6|   Monaco Grand Prix|2022-11-09 09:31:...|2009-05-24 12:00:00|
|      7|     2009|    7|         5|  Turkish Grand Prix|2022-11-09 09:31:...|2009-06-07 12:00:00|
|      8| 

### Add partition by 

In [14]:
races_selected_df.write.mode("overwrite").partitionBy('race_year').parquet("/content/f1_datasets/processed/races")
races_parquet = spark.read.parquet("/content/f1_datasets/processed/races")

+-------+-----+----------+--------------------+--------------------+-------------------+---------+
|race_id|round|circuit_id|                name|      ingestion_date|     race_timestamp|race_year|
+-------+-----+----------+--------------------+--------------------+-------------------+---------+
|   1074|    1|         3|  Bahrain Grand Prix|2022-11-09 09:31:...|2022-03-20 15:00:00|     2022|
|   1075|    2|        77|Saudi Arabian Gra...|2022-11-09 09:31:...|2022-03-27 17:00:00|     2022|
|   1076|    3|         1|Australian Grand ...|2022-11-09 09:31:...|2022-04-10 05:00:00|     2022|
|   1077|    4|        21|Emilia Romagna Gr...|2022-11-09 09:31:...|2022-04-24 13:00:00|     2022|
|   1078|    5|        79|    Miami Grand Prix|2022-11-09 09:31:...|2022-05-08 19:30:00|     2022|
|   1079|    6|         4|  Spanish Grand Prix|2022-11-09 09:31:...|2022-05-22 13:00:00|     2022|
|   1080|    7|         6|   Monaco Grand Prix|2022-11-09 09:31:...|2022-05-29 13:00:00|     2022|
|   1081| 

## Constructors file

In [15]:
constructors_schema = StructType(fields=[StructField("constructorId", StringType(), False),
                                     StructField("constructorRef", StringType(), True),
                                     StructField("name", StringType(), True),
                                     StructField("nationality", StringType(), True),
                                     StructField("url", StringType(), True)
])

constructors_df = (spark.read
               .schema(constructors_schema)
               .csv('/content/f1_datasets/constructors.csv', sep=',', header = True)
)

constructors_dropped_df = constructors_df.drop("url")
constructors_with_df = constructors_dropped_df.withColumn("ingestion_date",current_timestamp())
constructors_selected_df = (constructors_with_df.select(col("constructorId").alias("constructor_id"),
                                                        col("constructorRef").alias("constructor_ref"),
                                                        col("name"),col("nationality"),col("ingestion_date"))
)

constructors_selected_df.write.mode("overwrite").parquet("/content/f1_datasets/constructors")
constructors_parquet = spark.read.parquet("/content/f1_datasets/constructors")

+--------------+---------------+-----------+-----------+--------------------+
|constructor_id|constructor_ref|       name|nationality|      ingestion_date|
+--------------+---------------+-----------+-----------+--------------------+
|             1|        mclaren|    McLaren|    British|2022-11-09 09:31:...|
|             2|     bmw_sauber| BMW Sauber|     German|2022-11-09 09:31:...|
|             3|       williams|   Williams|    British|2022-11-09 09:31:...|
|             4|        renault|    Renault|     French|2022-11-09 09:31:...|
|             5|     toro_rosso| Toro Rosso|    Italian|2022-11-09 09:31:...|
|             6|        ferrari|    Ferrari|    Italian|2022-11-09 09:31:...|
|             7|         toyota|     Toyota|   Japanese|2022-11-09 09:31:...|
|             8|    super_aguri|Super Aguri|   Japanese|2022-11-09 09:31:...|
|             9|       red_bull|   Red Bull|   Austrian|2022-11-09 09:31:...|
|            10|    force_india|Force India|     Indian|2022-11-

## Drivers file

In [16]:
drivers_schema = StructType(fields=[StructField("driverId", IntegerType(), False),
                                    StructField("driverRef", StringType(), True),
                                    StructField("number", IntegerType(), True),
                                    StructField("code", StringType(), True),
                                    StructField("forename",StringType(),True),
                                    StructField("surname",StringType(),True),
                                    StructField("dob", DateType(), True),
                                    StructField("nationality", StringType(), True),
                                    StructField("url", StringType(), True)
                                    ])

drivers_df = (spark.read
              .schema(drivers_schema)
              .csv('/content/f1_datasets/drivers.csv', sep=',', header = True)
)
drivers_renamed_df = (drivers_df
                      .withColumnRenamed("driverId","driver_id")
                      .withColumnRenamed("driverRef","driver_ref")
                      .withColumn("ingestion_date",current_timestamp())
                      .withColumn("name",concat(col("forename"),lit(" "), col("surname")))
                     )
drivers_renamed_df.write.mode("overwrite").parquet("/content/f1_datasets/drivers")
drivers_parquet = spark.read.parquet("/content/f1_datasets/drivers")

+---------+----------+------+----+---------+----------+----------+-----------+--------------------+--------------------+------------------+
|driver_id|driver_ref|number|code| forename|   surname|       dob|nationality|                 url|      ingestion_date|              name|
+---------+----------+------+----+---------+----------+----------+-----------+--------------------+--------------------+------------------+
|        1|  hamilton|    44| HAM|    Lewis|  Hamilton|1985-01-07|    British|http://en.wikiped...|2022-11-09 09:31:...|    Lewis Hamilton|
|        2|  heidfeld|  null| HEI|     Nick|  Heidfeld|1977-05-10|     German|http://en.wikiped...|2022-11-09 09:31:...|     Nick Heidfeld|
|        3|   rosberg|     6| ROS|     Nico|   Rosberg|1985-06-27|     German|http://en.wikiped...|2022-11-09 09:31:...|      Nico Rosberg|
|        4|    alonso|    14| ALO| Fernando|    Alonso|1981-07-29|    Spanish|http://en.wikiped...|2022-11-09 09:31:...|   Fernando Alonso|
|        5|kovalaine

## Results file

In [17]:
results_schema = StructType(fields=[StructField("resultId", IntegerType(), False),
                                    StructField("raceId", IntegerType(), True),
                                    StructField("driverId", IntegerType(), True),
                                    StructField("constructorId", IntegerType(), True),
                                    StructField("number", IntegerType(), True),
                                    StructField("grid", IntegerType(), True),
                                    StructField("position", IntegerType(), True),
                                    StructField("positionText", StringType(), True),
                                    StructField("positionOrder", IntegerType(), True),
                                    StructField("points", FloatType(), True),
                                    StructField("laps", IntegerType(), True),
                                    StructField("time", StringType(), True),
                                    StructField("milliseconds", IntegerType(), True),
                                    StructField("fastestLap", IntegerType(),True),
                                    StructField("rank", IntegerType(),True),
                                    StructField("fastestLapTime", StringType(), True),
                                    StructField("fastestLapSpeed", FloatType(),True),
                                    StructField("statusId", StringType(), True)
                                    ])

results_df = (spark.read
              .schema(results_schema)
              .csv('/content/f1_datasets/results.csv', sep=',', header = True)
)

results_clean_df = (results_df
                    .drop("statusId")
                    .withColumnRenamed("resultId","result_id")
                    .withColumnRenamed("raceId","race_id")
                    .withColumnRenamed("driverId","driver_id")
                    .withColumnRenamed("constructorId","constructor_id")
                    .withColumnRenamed("positionText","position_text")
                    .withColumnRenamed("positionOrder","position_order")
                    .withColumnRenamed("fastestLap","fastest_lap")
                    .withColumnRenamed("fastestLapTime","fastest_lap_time")
                    .withColumnRenamed("fastestLapSpeed","fastest_lap_speed")
                    .withColumn("ingestion_date", current_timestamp())
                    )

results_clean_df.write.mode("overwrite").partitionBy("race_id").parquet("/content/f1_datasets/processed/results")
results_parquet = spark.read.parquet("/content/f1_datasets/processed/results")

+---------+---------+--------------+------+----+--------+-------------+--------------+------+----+----------+------------+-----------+----+----------------+-----------------+--------------------+-------+
|result_id|driver_id|constructor_id|number|grid|position|position_text|position_order|points|laps|      time|milliseconds|fastest_lap|rank|fastest_lap_time|fastest_lap_speed|      ingestion_date|race_id|
+---------+---------+--------------+------+----+--------+-------------+--------------+------+----+----------+------------+-----------+----+----------------+-----------------+--------------------+-------+
|    19232|      657|           113|    14|  19|       1|            1|             1|   8.0| 200|3:49:17.27|    13757270|       null|null|              \N|             null|2022-11-09 09:31:...|    800|
|    19233|      525|           114|     9|   3|       2|            2|             2|   6.0| 200|  +1:09.95|    13827220|       null|null|              \N|             null|2022-11-09

## Pitstops file

In [18]:
pitstops_schema = StructType(fields=[StructField("raceId", IntegerType(), True),
                                    StructField("driverId", IntegerType(), True),
                                    StructField("stop", StringType(), False),
                                    StructField("lap", IntegerType(), True),
                                    StructField("time", StringType(), True),
                                    StructField("duration", StringType(), True),
                                    StructField("milliseconds", IntegerType(), True)
])
pitstops_df = (spark.read
              .schema(pitstops_schema)
              .csv('/content/f1_datasets/pit_stops.csv', sep=',', header = True)
)

pitstops_clean_df = (pitstops_df
                    .withColumnRenamed("driverId","driver_id")
                    .withColumnRenamed("raceId","race_id")
                    .withColumn("ingestion_date", current_timestamp())
                    )

pitstops_clean_df.write.mode("overwrite").parquet("/content/f1_datasets/pit_stops")
pitstops_parquet = spark.read.parquet("/content/f1_datasets/pit_stops")

+-------+---------+----+---+--------+--------+------------+--------------------+
|race_id|driver_id|stop|lap|    time|duration|milliseconds|      ingestion_date|
+-------+---------+----+---+--------+--------+------------+--------------------+
|    841|      153|   1|  1|17:05:23|  26.898|       26898|2022-11-09 09:32:...|
|    841|       30|   1|  1|17:05:52|  25.021|       25021|2022-11-09 09:32:...|
|    841|       17|   1| 11|17:20:48|  23.426|       23426|2022-11-09 09:32:...|
|    841|        4|   1| 12|17:22:34|  23.251|       23251|2022-11-09 09:32:...|
|    841|       13|   1| 13|17:24:10|  23.842|       23842|2022-11-09 09:32:...|
|    841|       22|   1| 13|17:24:29|  23.643|       23643|2022-11-09 09:32:...|
|    841|       20|   1| 14|17:25:17|  22.603|       22603|2022-11-09 09:32:...|
|    841|      814|   1| 14|17:26:03|  24.863|       24863|2022-11-09 09:32:...|
|    841|      816|   1| 14|17:26:50|  25.259|       25259|2022-11-09 09:32:...|
|    841|       67|   1| 15|

## Laptimes file

In [19]:
laptimes_schema = StructType(fields=[StructField("raceId", IntegerType(), True),
                                     StructField("driverId", IntegerType(), True),
                                     StructField("lap", IntegerType(), True),
                                     StructField("position", IntegerType(), False),
                                     StructField("time", StringType(), True),
                                     StructField("milliseconds", IntegerType(), True)
])
laptimes_df = (spark.read
              .schema(laptimes_schema)
              .csv('/content/f1_datasets/lap_times.csv', sep=',', header = True)
)

laptimes_clean_df = (laptimes_df
                    .withColumnRenamed("driverId","driver_id")
                    .withColumnRenamed("raceId","race_id")
                    .withColumn("ingestion_date", current_timestamp())
                    )

laptimes_clean_df.write.mode("overwrite").parquet("/content/f1_datasets/lap_times")
laptimes_parquet = spark.read.parquet("/content/f1_datasets/lap_times")

+-------+---------+---+--------+--------+------------+--------------------+
|race_id|driver_id|lap|position|    time|milliseconds|      ingestion_date|
+-------+---------+---+--------+--------+------------+--------------------+
|    841|       20|  1|       1|1:38.109|       98109|2022-11-09 09:32:...|
|    841|       20|  2|       1|1:33.006|       93006|2022-11-09 09:32:...|
|    841|       20|  3|       1|1:32.713|       92713|2022-11-09 09:32:...|
|    841|       20|  4|       1|1:32.803|       92803|2022-11-09 09:32:...|
|    841|       20|  5|       1|1:32.342|       92342|2022-11-09 09:32:...|
|    841|       20|  6|       1|1:32.605|       92605|2022-11-09 09:32:...|
|    841|       20|  7|       1|1:32.502|       92502|2022-11-09 09:32:...|
|    841|       20|  8|       1|1:32.537|       92537|2022-11-09 09:32:...|
|    841|       20|  9|       1|1:33.240|       93240|2022-11-09 09:32:...|
|    841|       20| 10|       1|1:32.572|       92572|2022-11-09 09:32:...|
|    841|   

## Qualifying file

In [20]:
qualifying_schema = StructType(fields=[
    StructField("qualifyId", IntegerType(), True),
    StructField("raceId", IntegerType(), True),
    StructField("driverId", IntegerType(), True),
    StructField("constructorId", IntegerType(), True),
    StructField("number", IntegerType(), True),
    StructField("position", IntegerType(), False),
    StructField("q1", StringType(), True),
    StructField("q2", StringType(), True),
    StructField("q3", StringType(), True)
])
qualifying_df = (spark.read
              .schema(qualifying_schema)
              .csv('/content/f1_datasets/qualifying.csv', sep=',', header = True)
)

qualifying_clean_df = (qualifying_df
                       .withColumnRenamed("qualifyId","qualify_id")
                       .withColumnRenamed("constructorId","constructor_id")
                       .withColumnRenamed("driverId","driver_id")
                       .withColumnRenamed("raceId","race_id")
                       .withColumn("ingestion_date", current_timestamp())
                    )

qualifying_clean_df.write.mode("overwrite").parquet("/content/f1_datasets/qualifying")
qualifying_parquet = spark.read.parquet("/content/f1_datasets/qualifying")

+----------+-------+---------+--------------+------+--------+--------+--------+--------+--------------------+
|qualify_id|race_id|driver_id|constructor_id|number|position|      q1|      q2|      q3|      ingestion_date|
+----------+-------+---------+--------------+------+--------+--------+--------+--------+--------------------+
|         1|     18|        1|             1|    22|       1|1:26.572|1:25.187|1:26.714|2022-11-09 09:32:...|
|         2|     18|        9|             2|     4|       2|1:26.103|1:25.315|1:26.869|2022-11-09 09:32:...|
|         3|     18|        5|             1|    23|       3|1:25.664|1:25.452|1:27.079|2022-11-09 09:32:...|
|         4|     18|       13|             6|     2|       4|1:25.994|1:25.691|1:27.178|2022-11-09 09:32:...|
|         5|     18|        2|             2|     3|       5|1:25.960|1:25.518|1:27.236|2022-11-09 09:32:...|
|         6|     18|       15|             7|    11|       6|1:26.427|1:26.101|1:28.527|2022-11-09 09:32:...|
|         

In [None]:
print("Notebook f1-csv-ingestion.ipynb has successfully been executed")