In [10]:
%load_ext nb_black
import pyspark
from pyspark import SparkContext, SparkConf

conf = SparkConf()
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

The nb_black extension is already loaded. To reload it, use:
  %reload_ext nb_black


<IPython.core.display.Javascript object>

In [11]:
from pyspark.sql.types import (
    DoubleType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

# Explicit schema for circuits.csv.
# - circuitId is the only column declared non-nullable.
# - lat/lng are read as doubles and alt as an integer.
# - Everything else is a nullable string.
circuits_schema = (
    StructType()
    .add("circuitId", IntegerType(), False)
    .add("circuitRef", StringType(), True)
    .add("name", StringType(), True)
    .add("location", StringType(), True)
    .add("country", StringType(), True)
    .add("lat", DoubleType(), True)
    .add("lng", DoubleType(), True)
    .add("alt", IntegerType(), True)
    .add("url", StringType(), True)
)

<IPython.core.display.Javascript object>

In [12]:
# Read circuits.csv with the Spark DataFrame reader.
# The first row is treated as the header, and columns are typed by the
# explicit `circuits_schema`.
circuits_df = spark.read.csv(
    "../Data/circuits.csv",
    schema=circuits_schema,
    header=True,
)

<IPython.core.display.Javascript object>

In [13]:
# Select the required columns: everything except `url`.
# circuitId is renamed inline with alias(), which is one way to rename a
# column; the rest pass through unchanged.
from pyspark.sql.functions import col

circuits_selected_df = circuits_df.select(
    col("circuitId").alias("circuit_id"),
    *(
        col(column_name)
        for column_name in (
            "circuitRef",
            "name",
            "location",
            "country",
            "lat",
            "lng",
            "alt",
        )
    ),
)

<IPython.core.display.Javascript object>

In [14]:
# Rename columns with withColumnRenamed, applying one (old, new) pair per step.
# NOTE(review): "circuits_ref" is plural while "circuit_id" is singular.
# Confirm whether "circuit_ref" was intended before downstream code relies on it.
circuits_renamed_df = circuits_selected_df
for old_name, new_name in (
    ("circuitRef", "circuits_ref"),
    ("lat", "latitude"),
    ("lng", "longitude"),
    ("alt", "altitude"),
):
    circuits_renamed_df = circuits_renamed_df.withColumnRenamed(old_name, new_name)

<IPython.core.display.Javascript object>

In [15]:
# Add an `ingestion_date` audit column holding the current timestamp.
from pyspark.sql.functions import current_timestamp, lit

circuits_final_df = circuits_renamed_df.withColumn(
    colName="ingestion_date",
    col=current_timestamp(),
)
# lit() adds a constant-valued column the same way, e.g.
# .withColumn("env", lit("Production"))

<IPython.core.display.Javascript object>

In [16]:
# Preview the first 7 rows; long cell values are shown truncated.
circuits_final_df.show(7)

+----------+------------+--------------------+------------+---------+--------+---------+--------+--------------------+
|circuit_id|circuits_ref|                name|    location|  country|latitude|longitude|altitude|      ingestion_date|
+----------+------------+--------------------+------------+---------+--------+---------+--------+--------------------+
|         1| albert_park|Albert Park Grand...|   Melbourne|Australia|-37.8497|  144.968|      10|2022-01-24 00:20:...|
|         2|      sepang|Sepang Internatio...|Kuala Lumpur| Malaysia| 2.76083|  101.738|      18|2022-01-24 00:20:...|
|         3|     bahrain|Bahrain Internati...|      Sakhir|  Bahrain| 26.0325|  50.5106|       7|2022-01-24 00:20:...|
|         4|   catalunya|Circuit de Barcel...|    Montmeló|    Spain|   41.57|  2.26111|     109|2022-01-24 00:20:...|
|         5|    istanbul|       Istanbul Park|    Istanbul|   Turkey| 40.9517|   29.405|     130|2022-01-24 00:20:...|
|         6|      monaco|   Circuit de Monaco| M

<IPython.core.display.Javascript object>

In [150]:
# Writing the data to the data lake in Parquet format can be done in Databricks
# circuits_final_df.write.mode("overwrite").parquet("circuits")

<IPython.core.display.Javascript object>