### Ingest races.csv file

In [0]:
# Declare the `p_data_source` text widget (empty default) and read its current
# value; the value is later attached to every row as the `data_source` column.
dbutils.widgets.text("p_data_source", "")
v_data_source = dbutils.widgets.get("p_data_source")

In [0]:
%run "../includes/configuration"

In [0]:
%run "../includes/common_functions"

##### Step 1 - Read the CSV file using the Spark DataFrame reader

In [0]:
from pyspark.sql.types import StructType,StructField,IntegerType,StringType,DateType

In [0]:
# Explicit schema for races.csv so column types are declared up front rather
# than inferred. Only `raceId` is declared non-nullable.
races_schema = (
    StructType()
    .add("raceId", IntegerType(), False)
    .add("year", IntegerType(), True)
    .add("round", IntegerType(), True)
    .add("circuitId", IntegerType(), True)
    .add("name", StringType(), True)
    .add("date", DateType(), True)
    .add("time", StringType(), True)
    .add("url", StringType(), True)
)

In [0]:
# Load races.csv from the raw folder, treating the first line as a header and
# applying the explicit schema defined above.
races_df = (
    spark.read
    .schema(races_schema)
    .option("header", True)
    .csv(f"{raw_folder_path}/races.csv")
)

In [0]:
# Preview the first 10 rows of the raw races data.
races_df.show(n=10)

+------+----+-----+---------+--------------------+----------+--------+--------------------+
|raceId|year|round|circuitId|                name|      date|    time|                 url|
+------+----+-----+---------+--------------------+----------+--------+--------------------+
|     1|2009|    1|        1|Australian Grand ...|2009-03-29|06:00:00|http://en.wikiped...|
|     2|2009|    2|        2|Malaysian Grand Prix|2009-04-05|09:00:00|http://en.wikiped...|
|     3|2009|    3|       17|  Chinese Grand Prix|2009-04-19|07:00:00|http://en.wikiped...|
|     4|2009|    4|        3|  Bahrain Grand Prix|2009-04-26|12:00:00|http://en.wikiped...|
|     5|2009|    5|        4|  Spanish Grand Prix|2009-05-10|12:00:00|http://en.wikiped...|
|     6|2009|    6|        6|   Monaco Grand Prix|2009-05-24|12:00:00|http://en.wikiped...|
|     7|2009|    7|        5|  Turkish Grand Prix|2009-06-07|12:00:00|http://en.wikiped...|
|     8|2009|    8|        9|  British Grand Prix|2009-06-21|12:00:00|http://en.

##### Step 2 - Add ingestion date and race_timestamp to the dataframe

In [0]:
from pyspark.sql.functions import current_timestamp,to_timestamp,col,lit,concat

In [0]:
# Enrich the raw rows with three columns:
#   ingestion_date - timestamp taken when this notebook runs
#   race_timestamp - `date` and `time` joined with a space, parsed as a timestamp
#   data_source    - the value of the `p_data_source` widget
races_with_timestamp_df = (
    races_df
    .withColumn('ingestion_date', current_timestamp())
    .withColumn(
        'race_timestamp',
        to_timestamp(
            concat(col('date'), lit(' '), col('time')),
            'yyyy-MM-dd HH:mm:ss',
        ),
    )
    .withColumn('data_source', lit(v_data_source))
)

In [0]:
# Preview the first 10 rows, including the newly added columns.
races_with_timestamp_df.show(n=10)

+------+----+-----+---------+--------------------+----------+--------+--------------------+--------------------+-------------------+-----------+
|raceId|year|round|circuitId|                name|      date|    time|                 url|      ingestion_date|     race_timestamp|data_source|
+------+----+-----+---------+--------------------+----------+--------+--------------------+--------------------+-------------------+-----------+
|     1|2009|    1|        1|Australian Grand ...|2009-03-29|06:00:00|http://en.wikiped...|2023-09-30 17:57:...|2009-03-29 06:00:00| Ergast API|
|     2|2009|    2|        2|Malaysian Grand Prix|2009-04-05|09:00:00|http://en.wikiped...|2023-09-30 17:57:...|2009-04-05 09:00:00| Ergast API|
|     3|2009|    3|       17|  Chinese Grand Prix|2009-04-19|07:00:00|http://en.wikiped...|2023-09-30 17:57:...|2009-04-19 07:00:00| Ergast API|
|     4|2009|    4|        3|  Bahrain Grand Prix|2009-04-26|12:00:00|http://en.wikiped...|2023-09-30 17:57:...|2009-04-26 12:00:0

##### Step 3 - Select only the columns required and rename as required

In [0]:
# Keep only the columns needed downstream (dropping `date`, `time` and `url`),
# then rename the camelCase / ambiguous ones to snake_case. Column order is
# unchanged by the renames.
races_selected_df = (
    races_with_timestamp_df
    .select(
        'raceId',
        'year',
        'round',
        'circuitId',
        'name',
        'ingestion_date',
        'race_timestamp',
        'data_source',
    )
    .withColumnRenamed('raceId', 'race_id')
    .withColumnRenamed('year', 'race_year')
    .withColumnRenamed('circuitId', 'circuit_id')
)

In [0]:
# Preview the first 10 rows of the final column set.
races_selected_df.show(n=10)

+-------+---------+-----+----------+--------------------+--------------------+-------------------+-----------+
|race_id|race_year|round|circuit_id|                name|      ingestion_date|     race_timestamp|data_source|
+-------+---------+-----+----------+--------------------+--------------------+-------------------+-----------+
|      1|     2009|    1|         1|Australian Grand ...|2023-09-30 17:57:...|2009-03-29 06:00:00| Ergast API|
|      2|     2009|    2|         2|Malaysian Grand Prix|2023-09-30 17:57:...|2009-04-05 09:00:00| Ergast API|
|      3|     2009|    3|        17|  Chinese Grand Prix|2023-09-30 17:57:...|2009-04-19 07:00:00| Ergast API|
|      4|     2009|    4|         3|  Bahrain Grand Prix|2023-09-30 17:57:...|2009-04-26 12:00:00| Ergast API|
|      5|     2009|    5|         4|  Spanish Grand Prix|2023-09-30 17:57:...|2009-05-10 12:00:00| Ergast API|
|      6|     2009|    6|         6|   Monaco Grand Prix|2023-09-30 17:57:...|2009-05-24 12:00:00| Ergast API|
|

##### Step 4 - Partition the data by race year and write the output to the processed container as Parquet files

In [0]:
# Write the result as parquet under the processed folder, one partition
# directory per `race_year`, replacing any existing output at that path.
(
    races_selected_df.write
    .mode('overwrite')
    .partitionBy('race_year')
    .parquet(f"{processed_folder_path}/races")
)

In [0]:
# End the notebook run and hand the string 'Success' back to the caller.
dbutils.notebook.exit("Success")