## Create spark context

In [1]:
#Entrypoint 2.x
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
spark = SparkSession.builder.config("spark.sql.shuffle.partitions", "2").appName("Analysis").master("local[2]").getOrCreate()

# On yarn:
# spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().master("yarn").getOrCreate()
# specify .master("yarn")

sc = spark.sparkContext



In [2]:
%run "../includes/configuration"

## Read all the data required

In [3]:
# Renaming columns so there will be less ambiguity because of similar names
drivers_df = spark.read.parquet(f"{processed_folder_path}/drivers").withColumnRenamed("number", "driver_number")\
.withColumnRenamed("name", "driver_name").withColumnRenamed("nationality", "driver_nationality")

constructors_df = spark.read.parquet(f"{processed_folder_path}/constructors").withColumnRenamed("name", "team")

circuits_df = spark.read.parquet(f"{processed_folder_path}/circuits").withColumnRenamed("location", "circuit_location")

races_df = spark.read.parquet(f"{processed_folder_path}/races").withColumnRenamed("name", "race_name")\
.withColumnRenamed("race_timestamp", "race_date")

results_df = spark.read.parquet(f"{processed_folder_path}/results").withColumnRenamed("time", "race_time")

## Join circuits to races

In [4]:
race_circuits_df = races_df.join(circuits_df, races_df.circuit_id == circuits_df.circuit_id, "inner")\
.select(races_df.race_id, races_df.race_year, races_df.race_name, races_df.race_date, circuits_df.circuit_location)

In [5]:
race_circuits_df.show(5)

+-------+---------+--------------------+-------------------+----------------+
|race_id|race_year|           race_name|          race_date|circuit_location|
+-------+---------+--------------------+-------------------+----------------+
|   1053|     2021|Emilia Romagna Gr...|2021-04-18 13:00:00|           Imola|
|   1052|     2021|  Bahrain Grand Prix|2021-03-28 15:00:00|          Sakhir|
|   1051|     2021|Australian Grand ...|2021-11-21 06:00:00|       Melbourne|
|   1054|     2021|                 TBC|               null|         Nürburg|
|   1055|     2021|  Spanish Grand Prix|2021-05-09 13:00:00|        Montmeló|
+-------+---------+--------------------+-------------------+----------------+
only showing top 5 rows



In [6]:
## Join results to all other dataframes

In [7]:
race_results_df = results_df.join(race_circuits_df, results_df.race_id == race_circuits_df.race_id)\
.join(drivers_df, results_df.driver_id == drivers_df.driver_id)\
.join(constructors_df, results_df.constructor_id == constructors_df.constructor_id)

In [8]:
# Check if there are repeated columns
race_results_df.columns

['result_id',
 'race_id',
 'driver_id',
 'constructor_id',
 'number',
 'grid',
 'position',
 'position_text',
 'position_order',
 'points',
 'laps',
 'race_time',
 'milliseconds',
 'fastest_lap',
 'rank',
 'fastest_lap_time',
 'fastest_lap_speed',
 'ingestion_date',
 'race_id',
 'race_year',
 'race_name',
 'race_date',
 'circuit_location',
 'driver_id',
 'driver_ref',
 'driver_number',
 'code',
 'driver_name',
 'dob',
 'driver_nationality',
 'ingestion_date',
 'constructor_id',
 'constructor_ref',
 'team',
 'nationality',
 'ingestion_date']

In [22]:
final_df = race_results_df.select('result_id',
 races_df['race_id'],
 drivers_df['driver_id'],
 constructors_df['constructor_id'],
 'number',
 'grid',
 'position',
 'position_text',
 'position_order',
 'points',
 'laps',
 'race_time',
 'milliseconds',
 'fastest_lap',
 'rank',
 'fastest_lap_time',
 'fastest_lap_speed',
 'race_year',
 'race_name',
 'race_date',
 'circuit_location',
 'driver_ref',
 'driver_number',
 'code',
 'driver_name',
 'dob',
 'driver_nationality',
 'constructor_ref',
 'team',
 'nationality',
 drivers_df['ingestion_date'])\
.withColumn("created_date", current_timestamp())

In [23]:
final_df.write.csv("/home/sunbeam/Desktop/FastLaneForecast/combinedCsv/all_tables", header=True)

In [25]:
from functools import reduce

# List of CSV files to merge
csv_files = ["/home/sunbeam/Desktop/FastLaneForecast/combinedCsv/all_tables/part-00000-45594a6b-c8ec-425b-a9cc-cda013f4c5cb-c000.csv", "/home/sunbeam/Desktop/FastLaneForecast/combinedCsv/all_tables/part-00001-45594a6b-c8ec-425b-a9cc-cda013f4c5cb-c000.csv", "/home/sunbeam/Desktop/FastLaneForecast/combinedCsv/all_tables/part-00002-45594a6b-c8ec-425b-a9cc-cda013f4c5cb-c000.csv"]

# Read CSV files and create DataFrames
dataframes = [spark.read.csv(file, header=True, inferSchema=True) for file in csv_files]

# Union all DataFrames
merged_dataframe = reduce(lambda x, y: x.union(y), dataframes)

# Show the merged DataFrame
merged_dataframe.show()

+---------+-------+---------+--------------+------+----+--------+-------------+--------------+------+----+-----------+------------+-----------+----+----------------+-----------------+---------+--------------------+-------------------+----------------+---------------+-------------+----+-----------------+----------+------------------+---------------+------------+-----------+--------------------+--------------------+
|result_id|race_id|driver_id|constructor_id|number|grid|position|position_text|position_order|points|laps|  race_time|milliseconds|fastest_lap|rank|fastest_lap_time|fastest_lap_speed|race_year|           race_name|          race_date|circuit_location|     driver_ref|driver_number|code|      driver_name|       dob|driver_nationality|constructor_ref|        team|nationality|      ingestion_date|        created_date|
+---------+-------+---------+--------------+------+----+--------+-------------+--------------+------+----+-----------+------------+-----------+----+----------------

In [26]:
merged_dataframe.write.csv("/home/sunbeam/Desktop/FastLaneForecast/combinedCsv/one_table", header=True)

In [27]:
#analysis

In [30]:
merged_csv = spark.read.csv(r"/home/sunbeam/Desktop/FastLaneForecast/combinedCsv/one_table/merged.csv", header=True)

merged_csv.columns

In [36]:
# Define era ranges
era_ranges = {
    "Pre-Modern Era": (1950, 1969),
    "Transition Era": (1970, 1982),
    "Turbo Era": (1983, 1989),
    "Modern Era": (1990, 2021)
}

# Function to determine era based on year
def get_era(year):
    for era, (start, end) in era_ranges.items():
        if start <= year and year <= end:
            return era
    return "Unknown"

In [38]:
# Add Era column to races DataFrame
races_with_era_df = merged_csv.withColumn("Era", get_era(merged_csv["race_year"]))

ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.

In [None]:

# Perform analysis within each era
era_analysis_df = races_with_era_df.groupBy("Era").agg(
    # Add your analysis calculations here
)

# Show the era-wise analysis
era_analysis_df.show()

# Stop the Spark session
# spark.stop()

In [39]:
era_csv = spark.read.csv(r"/home/sunbeam/Desktop/FastLaneForecast/combinedCsv/one_table/merged_era_file.csv", header=True)

In [40]:
era_csv.columns

['result_id',
 'race_id',
 'driver_id',
 'constructor_id',
 'number',
 'grid',
 'position',
 'position_text',
 'position_order',
 'points',
 'laps',
 'race_time',
 'milliseconds',
 'fastest_lap',
 'rank',
 'fastest_lap_time',
 'fastest_lap_speed',
 'race_year',
 'race_name',
 'race_date',
 'circuit_location',
 'driver_ref',
 'driver_number',
 'code',
 'driver_name',
 'dob',
 'driver_nationality',
 'constructor_ref',
 'team',
 'nationality',
 'ingestion_date',
 'created_date',
 'era',
 '_c33']

In [42]:
final_df = era_csv.drop("_c33")

In [53]:

from pyspark.sql.window import Window
from pyspark.sql import functions as F

# Define a window specification partitioned by driver name and ordered by points
window_spec = Window.partitionBy("driver_name").orderBy(F.desc("points"))

# Create a new column with the rank of each driver based on points
df_with_rank = final_df.withColumn("rank", F.row_number().over(window_spec))

# Select the top 5 drivers' data based on the rank
top_5_drivers = df_with_rank.filter(F.col("rank") <= 5)

# Show the result
top_5_drivers.select("driver_name", "points").show()

+-------------+------+
|  driver_name|points|
+-------------+------+
| Adolf Brudes|     0|
|  Adolfo Cruz|     0|
| Adrian Sutil|     8|
| Adrian Sutil|     8|
| Adrian Sutil|     8|
| Adrian Sutil|     6|
| Adrian Sutil|     6|
|Adrián Campos|     0|
|Adrián Campos|     0|
|Adrián Campos|     0|
|Adrián Campos|     0|
|Adrián Campos|     0|
| Aguri Suzuki|     4|
| Aguri Suzuki|     1|
| Aguri Suzuki|     1|
| Aguri Suzuki|     1|
| Aguri Suzuki|     1|
|    Al Herman|     0|
|    Al Herman|     0|
|    Al Herman|     0|
+-------------+------+
only showing top 20 rows



In [75]:
# Create a temporary SQL view from the DataFrame
final_df.createOrReplaceTempView("f1_data")

# Write and execute the SQL query to get top 5 drivers' points for each era
spark.sql("SELECT driver_name, points, DENSE_RANK() OVER(PARTITION BY era ORDER BY points DESC) AS points_rank, era FROM f1_data WHERE DENSE_RANK() OVER(PARTITION BY era ORDER BY points DESC) <= 5").show()

AnalysisException: It is not allowed to use window functions inside WHERE clause

In [50]:
# Task 1: Average Points and Standings
average_points_by_era = final_df.groupBy("era").agg(round(avg("points"), 2).alias("average_points"))
average_standings_by_era = final_df.groupBy("era").agg(round(avg("position"), 0).alias("average_position"))

In [51]:
average_points_by_era.show()

+--------------+--------------+
|           era|average_points|
+--------------+--------------+
|Transition Era|          0.92|
|Pre-Modern Era|          1.09|
|    Modern Era|          2.56|
|     Turbo Era|          0.86|
+--------------+--------------+



In [52]:
average_standings_by_era.show()

+--------------+----------------+
|           era|average_position|
+--------------+----------------+
|Transition Era|             7.0|
|Pre-Modern Era|             7.0|
|    Modern Era|             9.0|
|     Turbo Era|             7.0|
+--------------+----------------+



In [81]:
final_df.show(1)

+---------+-------+---------+--------------+------+----+--------+-------------+--------------+------+----+---------+------------+-----------+----+----------------+-----------------+---------+--------------------+--------------------+----------------+----------+-------------+----+---------------+--------+------------------+---------------+--------+-----------+--------------------+--------------------+----------+
|result_id|race_id|driver_id|constructor_id|number|grid|position|position_text|position_order|points|laps|race_time|milliseconds|fastest_lap|rank|fastest_lap_time|fastest_lap_speed|race_year|           race_name|           race_date|circuit_location|driver_ref|driver_number|code|    driver_name|     dob|driver_nationality|constructor_ref|    team|nationality|      ingestion_date|        created_date|       era|
+---------+-------+---------+--------------+------+----+--------+-------------+--------------+------+----+---------+------------+-----------+----+----------------+-------

In [77]:
# Task 2: Wins and Podium Finishes
wins_and_podiums_by_era = final_df.groupBy("era").agg(sum("wins").alias("total_wins"), sum("podiums").alias("total_podiums"))


AnalysisException: cannot resolve 'wins' given input columns: [circuit_location, code, constructor_id, constructor_ref, created_date, dob, driver_id, driver_name, driver_nationality, driver_number, driver_ref, era, fastest_lap, fastest_lap_speed, fastest_lap_time, grid, ingestion_date, laps, milliseconds, nationality, number, points, position, position_order, position_text, race_date, race_id, race_name, race_time, race_year, rank, result_id, team];
'Aggregate [era#1716], [era#1716, sum('wins) AS total_wins#2716, sum('podiums) AS total_podiums#2718]
+- Project [result_id#1684, race_id#1685, driver_id#1686, constructor_id#1687, number#1688, grid#1689, position#1690, position_text#1691, position_order#1692, points#1693, laps#1694, race_time#1695, milliseconds#1696, fastest_lap#1697, rank#1698, fastest_lap_time#1699, fastest_lap_speed#1700, race_year#1701, race_name#1702, race_date#1703, circuit_location#1704, driver_ref#1705, driver_number#1706, code#1707, ... 9 more fields]
   +- Relation [result_id#1684,race_id#1685,driver_id#1686,constructor_id#1687,number#1688,grid#1689,position#1690,position_text#1691,position_order#1692,points#1693,laps#1694,race_time#1695,milliseconds#1696,fastest_lap#1697,rank#1698,fastest_lap_time#1699,fastest_lap_speed#1700,race_year#1701,race_name#1702,race_date#1703,circuit_location#1704,driver_ref#1705,driver_number#1706,code#1707,... 10 more fields] csv


In [None]:
#stop

In [9]:
final_df = race_results_df.select("race_year", "race_name", "race_date", "circuit_location", "driver_name", "driver_number", "driver_nationality", "team", "grid", "fastest_lap", "race_time", "points", "position")\
.withColumn("created_date", current_timestamp())

In [10]:
final_df.filter("race_year == 2020 and race_name == 'Abu Dhabi Grand Prix'").orderBy(final_df.points.desc()).show(5)

+---------+--------------------+-------------------+----------------+---------------+-------------+------------------+--------+----+-----------+-----------+------+--------+--------------------+
|race_year|           race_name|          race_date|circuit_location|    driver_name|driver_number|driver_nationality|    team|grid|fastest_lap|  race_time|points|position|        created_date|
+---------+--------------------+-------------------+----------------+---------------+-------------+------------------+--------+----+-----------+-----------+------+--------+--------------------+
|     2020|Abu Dhabi Grand Prix|2020-12-13 13:10:00|       Abu Dhabi| Max Verstappen|           33|             Dutch|Red Bull|   1|         14|1:36:28.645|  25.0|       1|2023-08-20 11:23:...|
|     2020|Abu Dhabi Grand Prix|2020-12-13 13:10:00|       Abu Dhabi|Valtteri Bottas|           77|           Finnish|Mercedes|   2|         40|    +15.976|  18.0|       2|2023-08-20 11:23:...|
|     2020|Abu Dhabi Grand Pri

In [11]:
final_df.write.mode("overwrite").parquet(f"{presentation_folder_path}/race_results")