In [0]:
# Declare the "p_file_date" text widget (empty default) and capture its current value.
dbutils.widgets.text("p_file_date", "")
v_file_date = dbutils.widgets.get("p_file_date")

In [0]:
# Declare the "p_data_source" text widget (empty default) and capture its current value.
dbutils.widgets.text("p_data_source", "")
v_data_source = dbutils.widgets.get("p_data_source")

In [0]:
from pyspark.sql.types import StructType,StructField,StringType,IntegerType,DateType
# Nested struct used for the "name" field: forename and surname, both nullable strings.
name_schema = StructType(
    fields=[
        StructField('forename', StringType(), True),
        StructField('surname', StringType(), True),
    ]
)

# Explicit schema applied when reading drivers.json. Every field is nullable;
# "name" relies on StructField's default nullable=True.
driver_schema = StructType(
    fields=[
        StructField("driverId", IntegerType(), True),
        StructField("driverRef", StringType(), True),
        StructField("number", IntegerType(), True),
        StructField("code", StringType(), True),
        StructField("name", name_schema),
        StructField('dob', DateType(), True),
        StructField('nationality', StringType(), True),
        StructField('url', StringType(), True),
    ]
)


In [0]:
# Read the drivers file for the selected file date, applying driver_schema
# rather than letting Spark infer one.
drivers_df = (
    spark.read
    .schema(driver_schema)
    .json(f"/mnt/varunstorage321/raw/{v_file_date}/drivers.json")
)

# Quick look at the loaded rows and the resulting schema.
drivers_df.show()
drivers_df.printSchema()

+--------+----------+------+----+--------------------+----------+-----------+--------------------+
|driverId| driverRef|number|code|                name|       dob|nationality|                 url|
+--------+----------+------+----+--------------------+----------+-----------+--------------------+
|       1|  hamilton|    44| HAM|   {Lewis, Hamilton}|1985-01-07|    British|http://en.wikiped...|
|       2|  heidfeld|  NULL| HEI|    {Nick, Heidfeld}|1977-05-10|     German|http://en.wikiped...|
|       3|   rosberg|     6| ROS|     {Nico, Rosberg}|1985-06-27|     German|http://en.wikiped...|
|       4|    alonso|    14| ALO|  {Fernando, Alonso}|1981-07-29|    Spanish|http://en.wikiped...|
|       5|kovalainen|  NULL| KOV|{Heikki, Kovalainen}|1981-10-19|    Finnish|http://en.wikiped...|
|       6|  nakajima|  NULL| NAK|  {Kazuki, Nakajima}|1985-01-11|   Japanese|http://en.wikiped...|
|       7|  bourdais|  NULL| BOU|{Sébastien, Bourd...|1979-02-28|     French|http://en.wikiped...|
|       8|

In [0]:
# Render the freshly read drivers DataFrame using the notebook's display helper.
display(drivers_df)

In [0]:
from pyspark.sql.functions import current_timestamp,concat,lit,col
# Shape the raw frame for the processed layer:
#   - rename driverId/driverRef to snake_case
#   - stamp each row with the current timestamp as ingestion_date
#   - replace the nested name struct with "<forename> <surname>"
#   - record the data source and file date supplied through the widgets
drivers_with_columns_df = (
    drivers_df
    .withColumnRenamed('driverId', "driver_id")
    .withColumnRenamed('driverRef', 'driver_ref')
    .withColumn('ingestion_date', current_timestamp())
    .withColumn('name', concat(col('name.forename'), lit(' '), col('name.surname')))
    .withColumn('data_source', lit(v_data_source))
    .withColumn('file_date', lit(v_file_date))
)

In [0]:
# Preview the transformed frame (renamed ids, flattened name, audit columns added).
drivers_with_columns_df.show()

+---------+----------+------+----+------------------+----------+-----------+--------------------+--------------------+-----------+----------+
|driver_id|driver_ref|number|code|              name|       dob|nationality|                 url|      ingestion_date|data_source| file_date|
+---------+----------+------+----+------------------+----------+-----------+--------------------+--------------------+-----------+----------+
|        1|  hamilton|    44| HAM|    Lewis Hamilton|1985-01-07|    British|http://en.wikiped...|2025-01-27 07:30:...|    testing|2021-04-18|
|        2|  heidfeld|  NULL| HEI|     Nick Heidfeld|1977-05-10|     German|http://en.wikiped...|2025-01-27 07:30:...|    testing|2021-04-18|
|        3|   rosberg|     6| ROS|      Nico Rosberg|1985-06-27|     German|http://en.wikiped...|2025-01-27 07:30:...|    testing|2021-04-18|
|        4|    alonso|    14| ALO|   Fernando Alonso|1981-07-29|    Spanish|http://en.wikiped...|2025-01-27 07:30:...|    testing|2021-04-18|
|     

In [0]:
# Drop the url column; the resulting frame is what gets written to the processed table.
drivers_final_df = drivers_with_columns_df.drop("url")
drivers_final_df.show()

+---------+----------+------+----+------------------+----------+-----------+--------------------+-----------+----------+
|driver_id|driver_ref|number|code|              name|       dob|nationality|      ingestion_date|data_source| file_date|
+---------+----------+------+----+------------------+----------+-----------+--------------------+-----------+----------+
|        1|  hamilton|    44| HAM|    Lewis Hamilton|1985-01-07|    British|2025-01-27 07:30:...|    testing|2021-04-18|
|        2|  heidfeld|  NULL| HEI|     Nick Heidfeld|1977-05-10|     German|2025-01-27 07:30:...|    testing|2021-04-18|
|        3|   rosberg|     6| ROS|      Nico Rosberg|1985-06-27|     German|2025-01-27 07:30:...|    testing|2021-04-18|
|        4|    alonso|    14| ALO|   Fernando Alonso|1981-07-29|    Spanish|2025-01-27 07:30:...|    testing|2021-04-18|
|        5|kovalainen|  NULL| KOV| Heikki Kovalainen|1981-10-19|    Finnish|2025-01-27 07:30:...|    testing|2021-04-18|
|        6|  nakajima|  NULL| NA

In [0]:
#drivers_final_df.write.mode("overwrite").parquet('/mnt/varunstorage321/processed/drivers')

In [0]:
#drivers_final_df.write.mode('overwrite').format('parquet').saveAsTable('f1_processed.drivers')

In [0]:
# Persist the final frame as the Delta table f1_processed.drivers,
# replacing whatever the table held before (overwrite mode).
(
    drivers_final_df.write
    .format('delta')
    .mode('overwrite')
    .saveAsTable('f1_processed.drivers')
)

In [0]:
%sql
-- Inspect the rows currently stored in the processed drivers table.
SELECT *
FROM f1_processed.drivers

In [0]:
%sql
-- List the recorded versions (one row per write operation) of the drivers table.
DESCRIBE HISTORY f1_processed.drivers

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
3,2025-01-27T07:30:02Z,5539008212065545,vishu.narang@outlook.com,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> false)",,List(3886820591457769),0104-100303-jdstnosa,2.0,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 853, numOutputBytes -> 29895)",,Databricks-Runtime/15.4.x-scala2.12
2,2025-01-27T07:26:39Z,5539008212065545,vishu.narang@outlook.com,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> false)",,List(3886820591457769),0104-100303-jdstnosa,1.0,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 853, numOutputBytes -> 29895)",,Databricks-Runtime/15.4.x-scala2.12
1,2025-01-27T07:26:23Z,5539008212065545,vishu.narang@outlook.com,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> false)",,List(3886820591457769),0104-100303-jdstnosa,0.0,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 853, numOutputBytes -> 29895)",,Databricks-Runtime/15.4.x-scala2.12
0,2025-01-27T07:25:49Z,5539008212065545,vishu.narang@outlook.com,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> false)",,List(3886820591457769),0104-100303-jdstnosa,,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 853, numOutputBytes -> 29895)",,Databricks-Runtime/15.4.x-scala2.12


In [0]:
%sql
-- Count the rows of the drivers table as it existed at the given timestamp.
SELECT COUNT(*)
FROM f1_processed.drivers TIMESTAMP AS OF '2025-01-27T07:26:39.000+00:00'

count(1)
853


In [0]:
# End the notebook run here, handing the string "success" back as its exit value
# (e.g. to a caller that launched this notebook with dbutils.notebook.run).
dbutils.notebook.exit("success")