In [5]:
import findspark
findspark.init

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [6]:
from  pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

In [15]:
results_schema = StructType(fields=[StructField("resultId",IntegerType(),False),
                                    StructField("raceId",IntegerType(),True),
                                    StructField("driverId",IntegerType(),True),
                                    StructField("constructorId",IntegerType(),True),
                                    StructField("number",IntegerType(),True),
                                    StructField("grid",IntegerType(),True),
                                    StructField("position",IntegerType(),True),
                                    StructField("positionText",StringType(),True),
                                    StructField("positionOrder",IntegerType(),True),
                                    StructField("points",FloatType(),True),
                                    StructField("laps",IntegerType(),True),
                                    StructField("time",StringType(),True),
                                    StructField("milliseconds",IntegerType(),True),
                                    StructField("fastestLap",IntegerType(),True),
                                    StructField("rank",IntegerType(),True),
                                    StructField("fastestLapTime",StringType(),True),
                                    StructField("fastestLapSpeed",FloatType(),True),
                                    StructField("statusId",StringType(),True),
                                    
    
])

In [16]:
results_df = spark.read.schema(results_schema).json('C:/Users/ACER/Documents/Burgo Juan/Udemi - Azure Databricks/results.json')

In [17]:
results_df.show()

+--------+------+--------+-------------+------+----+--------+------------+-------------+------+----+-----------+------------+----------+----+--------------+---------------+--------+
|resultId|raceId|driverId|constructorId|number|grid|position|positionText|positionOrder|points|laps|       time|milliseconds|fastestLap|rank|fastestLapTime|fastestLapSpeed|statusId|
+--------+------+--------+-------------+------+----+--------+------------+-------------+------+----+-----------+------------+----------+----+--------------+---------------+--------+
|       1|    18|       1|            1|    22|   1|       1|           1|            1|  10.0|  58|1:34:50.616|     5690616|        39|   2|      1:27.452|          218.3|       1|
|       2|    18|       2|            2|     3|   5|       2|           2|            2|   8.0|  58|     +5.478|     5696094|        41|   3|      1:27.739|        217.586|       1|
|       3|    18|       3|            3|     7|   7|       3|           3|            3|  

### Rename columns and add new columns 

In [18]:
from pyspark.sql.functions import col, current_timestamp

In [19]:
results_with_columns_df=  results_df.withColumnRenamed("resultId","result_id")\
.withColumnRenamed("raceId","race_id")\
.withColumnRenamed("driverId","driver_id")\
.withColumnRenamed("constructorId","constructor_id")\
.withColumnRenamed("positionText","position_text")\
.withColumnRenamed("positionOrder","position_order")\
.withColumnRenamed("fastestLap","fastest_lap")\
.withColumnRenamed("fastestLapTime","fastest_lap_time")\
.withColumnRenamed("fastestLapSpeed","fastest_lap_spedd")\
.withColumn("ingestion_date",current_timestamp())


In [20]:
results_with_columns_df.show()

+---------+-------+---------+--------------+------+----+--------+-------------+--------------+------+----+-----------+------------+-----------+----+----------------+-----------------+--------+--------------------+
|result_id|race_id|driver_id|constructor_id|number|grid|position|position_text|position_order|points|laps|       time|milliseconds|fastest_lap|rank|fastest_lap_time|fastest_lap_spedd|statusId|      ingestion_date|
+---------+-------+---------+--------------+------+----+--------+-------------+--------------+------+----+-----------+------------+-----------+----+----------------+-----------------+--------+--------------------+
|        1|     18|        1|             1|    22|   1|       1|            1|             1|  10.0|  58|1:34:50.616|     5690616|         39|   2|        1:27.452|            218.3|       1|2022-09-06 14:34:...|
|        2|     18|        2|             2|     3|   5|       2|            2|             2|   8.0|  58|     +5.478|     5696094|         41| 

### Drop  the unwaned column

In [21]:
results_final_df=results_with_columns_df.drop(col("statusId"))

In [22]:
results_final_df.show()

+---------+-------+---------+--------------+------+----+--------+-------------+--------------+------+----+-----------+------------+-----------+----+----------------+-----------------+--------------------+
|result_id|race_id|driver_id|constructor_id|number|grid|position|position_text|position_order|points|laps|       time|milliseconds|fastest_lap|rank|fastest_lap_time|fastest_lap_spedd|      ingestion_date|
+---------+-------+---------+--------------+------+----+--------+-------------+--------------+------+----+-----------+------------+-----------+----+----------------+-----------------+--------------------+
|        1|     18|        1|             1|    22|   1|       1|            1|             1|  10.0|  58|1:34:50.616|     5690616|         39|   2|        1:27.452|            218.3|2022-09-06 14:43:...|
|        2|     18|        2|             2|     3|   5|       2|            2|             2|   8.0|  58|     +5.478|     5696094|         41|   3|        1:27.739|          217.5

### Write to outpt to processed container in parque format

In [24]:
results_final_df.write.mode("overwrite").partitionBy("race_id").parquet('C:/Users/ACER/Documents/Burgo Juan/Udemi - Azure Databricks/results')

In [25]:
spark.read.parquet('C:/Users/ACER/Documents/Burgo Juan/Udemi - Azure Databricks/results').show()

+---------+---------+--------------+------+----+--------+-------------+--------------+------+----+----------+------------+-----------+----+----------------+-----------------+--------------------+-------+
|result_id|driver_id|constructor_id|number|grid|position|position_text|position_order|points|laps|      time|milliseconds|fastest_lap|rank|fastest_lap_time|fastest_lap_spedd|      ingestion_date|race_id|
+---------+---------+--------------+------+----+--------+-------------+--------------+------+----+----------+------------+-----------+----+----------------+-----------------+--------------------+-------+
|    19232|      657|           113|    14|  19|       1|            1|             1|   8.0| 200|3:49:17.27|    13757270|       null|null|              \N|             null|2022-09-06 14:45:...|    800|
|    19233|      525|           114|     9|   3|       2|            2|             2|   6.0| 200|  +1:09.95|    13827220|       null|null|              \N|             null|2022-09-06