## Import Libraries

In [1]:
# Import Necessary Libraries
import os
import hashlib
import urllib.request
import json
from datetime import timedelta, date

from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import *
from pyspark.sql.types import *

from delta.tables import DeltaTable

## Initiate Spark Session

In [2]:
# Create (or reuse) the Spark session used by the rest of the notebook.
# SparkSession is already imported in the imports cell at the top of the
# notebook, so it is not imported again here.
# The fs.s3a.impl setting tells Hadoop to serve s3a:// paths with S3AFileSystem.
spark = (
    SparkSession.builder
    .appName("DeltaLake")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate()
)


23/10/10 14:36:56 WARN Utils: Your hostname, OASIS-CORP.local resolves to a loopback address: 127.0.0.1; using 192.168.225.160 instead (on interface en0)
23/10/10 14:36:56 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/Users/oasis/sources/spark-3.2.0/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/oasis/.ivy2/cache
The jars for the packages stored in: /Users/oasis/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-626de8e2-d36b-48bb-9140-d99a5612330f;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.0.0 in central
	found io.delta#delta-storage;2.0.0 in central
	found org.antlr#antlr4-runtime;4.8 in central
	found org.codehaus.jackson#jackson-core-asl;1.9.13 in central
:: resolution report :: resolve 228ms :: artifacts dl 10ms
	:: modules in use:
	io.delta#delta-core_2.12;2.0.0 from central in [default]
	io.delta#delta-storage;2.0.0 from central in [default]
	org.antlr#antlr4-runtime;4.8 from central in [default]
	org.codehaus.jackson#jackson-core-asl;1.9.13 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|ev

## Data Lake Paths

In [3]:
# Root S3 (s3a) location that the raw CSV inputs are read from.
raw_layer = "s3a://oasiscorp-raw/formula-oasis"

## Define Schema

In [4]:
# Explicit schema for races.csv: every column is loaded as a string.
# Only raceID is declared non-nullable; all remaining columns are nullable.
_nullable_race_columns = ("year", "round", "circuitId", "name", "date", "time", "url")

races_schema = StructType(
    fields=[StructField("raceID", StringType(), False)]
    + [StructField(column_name, StringType(), True) for column_name in _nullable_race_columns]
)

## Read the Data Using an Explicit Schema (DataFrameReader API)

In [5]:
# Load races.csv from the raw layer, treating the first line as a header
# and applying the explicit schema instead of inferring column types.
races_csv_path = f"{raw_layer}/races.csv"

races_sdf = (
    spark.read
    .option("header", "true")
    .schema(races_schema)
    .csv(races_csv_path)
)

23/10/10 14:37:01 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


In [6]:
# Preview the first 10 rows to sanity-check the parsed CSV.
races_sdf.show(10)

[Stage 0:>                                                          (0 + 1) / 1]

+------+----+-----+---------+--------------------+----------+--------+--------------------+
|raceID|year|round|circuitId|                name|      date|    time|                 url|
+------+----+-----+---------+--------------------+----------+--------+--------------------+
|     1|2009|    1|        1|Australian Grand ...|2009-03-29|06:00:00|http://en.wikiped...|
|     2|2009|    2|        2|Malaysian Grand Prix|2009-04-05|09:00:00|http://en.wikiped...|
|     3|2009|    3|       17|  Chinese Grand Prix|2009-04-19|07:00:00|http://en.wikiped...|
|     4|2009|    4|        3|  Bahrain Grand Prix|2009-04-26|12:00:00|http://en.wikiped...|
|     5|2009|    5|        4|  Spanish Grand Prix|2009-05-10|12:00:00|http://en.wikiped...|
|     6|2009|    6|        6|   Monaco Grand Prix|2009-05-24|12:00:00|http://en.wikiped...|
|     7|2009|    7|        5|  Turkish Grand Prix|2009-06-07|12:00:00|http://en.wikiped...|
|     8|2009|    8|        9|  British Grand Prix|2009-06-21|12:00:00|http://en.

                                                                                

In [7]:
# List the column names of the loaded DataFrame.
races_sdf.columns

['raceID', 'year', 'round', 'circuitId', 'name', 'date', 'time', 'url']

In [8]:
# Add two derived columns to the raw races data:
#   - ingestion_timestamp: the current timestamp at the time of this load.
#   - race_timestamp: "date" and "time" joined with a single space and parsed
#     with the pattern 'yyyy-MM-dd HH:mm:ss'.
race_datetime_text = concat(col("date"), lit(" "), col("time"))

races_sdf_curated = (
    races_sdf
    .withColumn("ingestion_timestamp", current_timestamp())
    .withColumn("race_timestamp", to_timestamp(race_datetime_text, 'yyyy-MM-dd HH:mm:ss'))
)

races_sdf_curated.show(10)

[Stage 1:>                                                          (0 + 1) / 1]

+------+----+-----+---------+--------------------+----------+--------+--------------------+--------------------+-------------------+
|raceID|year|round|circuitId|                name|      date|    time|                 url| ingestion_timestamp|     race_timestamp|
+------+----+-----+---------+--------------------+----------+--------+--------------------+--------------------+-------------------+
|     1|2009|    1|        1|Australian Grand ...|2009-03-29|06:00:00|http://en.wikiped...|2023-10-10 14:37:...|2009-03-29 06:00:00|
|     2|2009|    2|        2|Malaysian Grand Prix|2009-04-05|09:00:00|http://en.wikiped...|2023-10-10 14:37:...|2009-04-05 09:00:00|
|     3|2009|    3|       17|  Chinese Grand Prix|2009-04-19|07:00:00|http://en.wikiped...|2023-10-10 14:37:...|2009-04-19 07:00:00|
|     4|2009|    4|        3|  Bahrain Grand Prix|2009-04-26|12:00:00|http://en.wikiped...|2023-10-10 14:37:...|2009-04-26 12:00:00|
|     5|2009|    5|        4|  Spanish Grand Prix|2009-05-10|12:00:00

                                                                                

In [9]:
# Keep only the columns needed downstream and rename them to snake_case.
# The source "date", "time" and "url" columns are dropped; the combined
# race_timestamp column is kept instead.
# `col` comes from the imports cell at the top of the notebook, so no
# additional import is needed in this cell.
races_sdf_select = races_sdf_curated.select(
    col("raceID").alias("race_id"),
    col("year").alias("race_year"),
    col("circuitId").alias("circuit_id"),
    col("round"),
    col("name"),
    col("ingestion_timestamp").alias("ingested_date"),
    col("race_timestamp"),
)

races_sdf_select.show(10)

[Stage 2:>                                                          (0 + 1) / 1]

+-------+---------+----------+-----+--------------------+--------------------+-------------------+
|race_id|race_year|circuit_id|round|                name|       ingested_date|     race_timestamp|
+-------+---------+----------+-----+--------------------+--------------------+-------------------+
|      1|     2009|         1|    1|Australian Grand ...|2023-10-10 14:37:...|2009-03-29 06:00:00|
|      2|     2009|         2|    2|Malaysian Grand Prix|2023-10-10 14:37:...|2009-04-05 09:00:00|
|      3|     2009|        17|    3|  Chinese Grand Prix|2023-10-10 14:37:...|2009-04-19 07:00:00|
|      4|     2009|         3|    4|  Bahrain Grand Prix|2023-10-10 14:37:...|2009-04-26 12:00:00|
|      5|     2009|         4|    5|  Spanish Grand Prix|2023-10-10 14:37:...|2009-05-10 12:00:00|
|      6|     2009|         6|    6|   Monaco Grand Prix|2023-10-10 14:37:...|2009-05-24 12:00:00|
|      7|     2009|         5|    7|  Turkish Grand Prix|2023-10-10 14:37:...|2009-06-07 12:00:00|
|      8| 

                                                                                

In [10]:
# Print the schema to confirm the selected column names and their types.

races_sdf_select.printSchema()

root
 |-- race_id: string (nullable = true)
 |-- race_year: string (nullable = true)
 |-- circuit_id: string (nullable = true)
 |-- round: string (nullable = true)
 |-- name: string (nullable = true)
 |-- ingested_date: timestamp (nullable = false)
 |-- race_timestamp: timestamp (nullable = true)



## Write Data to S3 in parquet format

In [11]:
# Root S3 (s3a) location that the transformed output is written to.
processed_layer = "s3a://oasiscorp-curated/formula-oasis"

In [12]:
# Persist the selected columns as Parquet under the processed layer,
# replacing any previous output and partitioning the files by race_year.
races_output_path = f"{processed_layer}/races"

races_writer = races_sdf_select.write.mode("overwrite").partitionBy("race_year")
races_writer.parquet(races_output_path)

                                                                                

## Read Data Back From S3

In [13]:
# Read the partitioned Parquet output back to verify the write, then
# preview the first 10 rows.
races_parquet_path = f"{processed_layer}/races"

races_df = spark.read.parquet(races_parquet_path)
races_df.show(10)

[Stage 6:>                                                          (0 + 1) / 1]

+-------+----------+-----+--------------------+--------------------+-------------------+---------+
|race_id|circuit_id|round|                name|       ingested_date|     race_timestamp|race_year|
+-------+----------+-----+--------------------+--------------------+-------------------+---------+
|   1053|        21|    2|Emilia Romagna Gr...|2023-10-10 14:38:...|2021-04-18 13:00:00|     2021|
|   1052|         3|    1|  Bahrain Grand Prix|2023-10-10 14:38:...|2021-03-28 15:00:00|     2021|
|   1051|         1|   21|Australian Grand ...|2023-10-10 14:38:...|2021-11-21 06:00:00|     2021|
|   1054|        20|    3|                 TBC|2023-10-10 14:38:...|               null|     2021|
|   1055|         4|    4|  Spanish Grand Prix|2023-10-10 14:38:...|2021-05-09 13:00:00|     2021|
|   1056|         6|    5|   Monaco Grand Prix|2023-10-10 14:38:...|2021-05-23 13:00:00|     2021|
|   1057|        73|    6|Azerbaijan Grand ...|2023-10-10 14:38:...|2021-06-06 12:00:00|     2021|
|   1058| 

                                                                                

In [15]:
# Show the distinct race_year values present in the Parquet output.
# show() with no arguments prints only the first 20 rows.
distinct_race_years = races_df.select("race_year").distinct()
distinct_race_years.show()



+---------+
|race_year|
+---------+
|     2018|
|     2019|
|     2012|
|     2016|
|     2017|
|     2021|
|     2015|
|     2013|
|     2014|
|     2020|
|     2010|
|     2011|
|     2007|
|     2006|
|     2004|
|     2009|
|     2005|
|     2008|
|     1977|
|     1978|
+---------+
only showing top 20 rows



                                                                                