### **Step 5.2.7 - Ingesting the "lap_times" folder from the "2021-03-21" directory**

Widgets let us create parameters and supply their values at run time.

<center><img src="https://i.postimg.cc/Sx9fDwyH/db151.png"></center>
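`dbutils` only exists inside Databricks. If you want to try the parameter logic locally, here is a minimal sketch of a hypothetical stand-in class (`LocalWidgets` is not part of any Databricks API). It mimics only the two calls used in the cells below: `text(name, default)` registers a parameter with a default value, and `get(name)` returns the value set at run time, falling back to that default.

```python
class LocalWidgets:
    """Hypothetical local stand-in for dbutils.widgets (text/get only)."""

    def __init__(self, overrides=None):
        # Values "typed in" at run time, e.g. supplied by a job or a caller notebook
        self._overrides = dict(overrides or {})
        self._defaults = {}

    def text(self, name, default_value, label=None):
        # Register a text parameter with its default value (label is accepted but unused)
        self._defaults[name] = default_value

    def get(self, name):
        # A run-time value wins over the default; undefined names raise KeyError
        if name not in self._defaults:
            raise KeyError(f"widget {name!r} is not defined")
        return self._overrides.get(name, self._defaults[name])


widgets = LocalWidgets(overrides={"p_data_source": "Eargast"})
widgets.text("p_data_source", "")
widgets.text("p_file_date", "2021-03-21")
v_data_source = widgets.get("p_data_source")
v_file_date = widgets.get("p_file_date")
print(v_data_source, v_file_date)  # Eargast 2021-03-21
```

The real widgets also render input boxes in the notebook UI. This sketch models only how a value is resolved.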

In [None]:
dbutils.widgets.text("p_data_source", "")
v_data_source = dbutils.widgets.get("p_data_source")

In [None]:
v_data_source

Out[3]: 'Eargast'

In [None]:
dbutils.widgets.text("p_file_date", "2021-03-21")
v_file_date = dbutils.widgets.get("p_file_date")

In [None]:
v_file_date

Out[5]: '2021-03-21'

In [None]:
%run "../includes/configuration"

In [None]:
%run "../includes/common_functions"

#### Step 1 - Read the **lap_times** directory, which contains multiple CSV files

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [None]:
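# Note: Spark's file readers mark every column as nullable, so printSchema() below
# reports raceId as nullable = true even though it is declared with False here
lap_times_schema = StructType(fields=[StructField("raceId", IntegerType(), False),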
                                      StructField("driverId", IntegerType(), True),
                                      StructField("lap", IntegerType(), True),
                                      StructField("position", IntegerType(), True),
                                      StructField("time", StringType(), True),
                                      StructField("milliseconds", IntegerType(), True)
                                     ])
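The `spark.read` call in the next cell passes this explicit schema and no header option. Spark therefore does not infer types; it assigns the declared names and types to the CSV columns in order. The rough pure-Python analogue below uses the standard `csv` module to show that positional mapping. `LAP_TIMES_COLUMNS` and `parse_row` are illustrative names, and the sketch does not reproduce Spark's handling of malformed rows.

```python
import csv
import io

# Column names and converters in the same order as lap_times_schema above
LAP_TIMES_COLUMNS = [
    ("raceId", int),
    ("driverId", int),
    ("lap", int),
    ("position", int),
    ("time", str),
    ("milliseconds", int),
]


def parse_row(fields):
    """Map raw CSV fields to typed values by position; empty fields become None."""
    row = {}
    for (name, cast), raw in zip(LAP_TIMES_COLUMNS, fields):
        row[name] = cast(raw) if raw != "" else None
    return row


# Two lines in the same shape as the rows shown by lap_times_df.show()
sample = "841,20,1,1,1:38.109,98109\n841,20,2,1,1:33.006,93006\n"
rows = [parse_row(fields) for fields in csv.reader(io.StringIO(sample))]
print(rows[0])
```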

In [None]:
# "raw_folder_path" is defined in the "configuration" notebook
# "v_file_date" is defined in this notebook; its value is set at run time through the p_file_date widget
lap_times_df = spark.read \
    .schema(lap_times_schema) \
    .csv(f"{raw_folder_path}/{v_file_date}/lap_times")
# This resolves to the path: /mnt/formula1dl/raw/2021-03-21/lap_times
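The source path is built with plain string interpolation. A malformed `p_file_date` therefore goes unnoticed until Spark tries to read that path. The small sketch below validates the date before composing the path. It assumes `raw_folder_path` is `/mnt/formula1dl/raw` (the real value lives in the `configuration` notebook). `build_source_path` is a hypothetical helper, not part of the course's `common_functions`.

```python
from datetime import datetime


def build_source_path(raw_folder_path, file_date, folder):
    """Hypothetical helper: check file_date, then compose <raw>/<date>/<folder>."""
    # strptime raises ValueError unless file_date parses as a real year-month-day date
    datetime.strptime(file_date, "%Y-%m-%d")
    return f"{raw_folder_path.rstrip('/')}/{file_date}/{folder}"


# Assumed value; the real one is defined in ../includes/configuration
raw_folder_path = "/mnt/formula1dl/raw"
source_path = build_source_path(raw_folder_path, "2021-03-21", "lap_times")
print(source_path)
```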

In [None]:
lap_times_df.show(truncate=False)

+------+--------+---+--------+--------+------------+
|raceId|driverId|lap|position|time    |milliseconds|
+------+--------+---+--------+--------+------------+
|841   |20      |1  |1       |1:38.109|98109       |
|841   |20      |2  |1       |1:33.006|93006       |
|841   |20      |3  |1       |1:32.713|92713       |
|841   |20      |4  |1       |1:32.803|92803       |
|841   |20      |5  |1       |1:32.342|92342       |
|841   |20      |6  |1       |1:32.605|92605       |
|841   |20      |7  |1       |1:32.502|92502       |
|841   |20      |8  |1       |1:32.537|92537       |
|841   |20      |9  |1       |1:33.240|93240       |
|841   |20      |10 |1       |1:32.572|92572       |
|841   |20      |11 |1       |1:32.669|92669       |
|841   |20      |12 |1       |1:32.902|92902       |
|841   |20      |13 |1       |1:33.698|93698       |
|841   |20      |14 |3       |1:52.075|112075      |
|841   |20      |15 |4       |1:38.385|98385       |
|841   |20      |16 |2       |1:31.548|91548  

In [None]:
lap_times_df.printSchema()

root
 |-- raceId: integer (nullable = true)
 |-- driverId: integer (nullable = true)
 |-- lap: integer (nullable = true)
 |-- position: integer (nullable = true)
 |-- time: string (nullable = true)
 |-- milliseconds: integer (nullable = true)
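The `time` column stores the lap time as an `m:ss.fff` string, while `milliseconds` holds the same duration as an integer. The quick pure-Python check below confirms the two columns agree for rows from the output above. `lap_time_to_ms` is an illustrative helper and assumes the `m:ss.fff` format seen in this sample; other formats are not handled.

```python
def lap_time_to_ms(lap_time):
    """Convert an 'm:ss.fff' lap time string into integer milliseconds."""
    minutes, seconds = lap_time.split(":")
    whole, fraction = seconds.split(".")
    # Pad/trim the fraction to 3 digits so '1:32.5' and '1:32.500' both mean 500 ms
    millis = int(fraction.ljust(3, "0")[:3])
    return (int(minutes) * 60 + int(whole)) * 1000 + millis


# (time, milliseconds) pairs copied from the lap_times_df.show() output above
samples = [("1:38.109", 98109), ("1:33.006", 93006), ("1:52.075", 112075)]
for time_str, expected_ms in samples:
    assert lap_time_to_ms(time_str) == expected_ms, time_str
print("time and milliseconds are consistent for the sample rows")
```

Integer arithmetic is used on purpose. Going through `float` could introduce rounding errors in the millisecond value.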



#### Step 2 - Rename columns and add new columns
1. Rename driverId and raceId to driver_id and race_id
2. Add ingestion_date with the current timestamp
3. Add data_source and file_date from the widget parameters
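The column changes in this step amount to a per-row rename plus a few constant columns. The pure-Python sketch below works on dictionaries (it is not Spark code) and shows the resulting row shape and column order. `transform_row` and its arguments are illustrative. The fixed `loaded_at` value stands in for the timestamp that `add_ingestion_date()` adds; that function's implementation lives in `common_functions` and is not shown here.

```python
from datetime import datetime

RENAMES = {"raceId": "race_id", "driverId": "driver_id"}


def transform_row(row, ingestion_time, data_source, file_date):
    """Rename camelCase ids to snake_case and append audit columns (illustrative only)."""
    out = {RENAMES.get(name, name): value for name, value in row.items()}
    out["ingestion_date"] = ingestion_time  # one value shared by every row of a load
    out["data_source"] = data_source        # from the p_data_source widget
    out["file_date"] = file_date            # from the p_file_date widget
    return out


row = {"raceId": 841, "driverId": 20, "lap": 1, "position": 1,
       "time": "1:38.109", "milliseconds": 98109}
loaded_at = datetime(2023, 6, 15, 17, 28, 1)
result = transform_row(row, loaded_at, "Eargast", "2021-03-21")
print(result)
```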

In [None]:
from pyspark.sql.functions import lit

In [None]:
# The function "add_ingestion_date()" is defined in the "common_functions" notebook
lap_times_with_ingestion_date_df = add_ingestion_date(lap_times_df)

In [None]:
lap_times_with_ingestion_date_df.show(truncate=False)

+------+--------+---+--------+--------+------------+-----------------------+
|raceId|driverId|lap|position|time    |milliseconds|ingestion_date         |
+------+--------+---+--------+--------+------------+-----------------------+
|841   |20      |1  |1       |1:38.109|98109       |2023-06-15 17:28:01.211|
|841   |20      |2  |1       |1:33.006|93006       |2023-06-15 17:28:01.211|
|841   |20      |3  |1       |1:32.713|92713       |2023-06-15 17:28:01.211|
|841   |20      |4  |1       |1:32.803|92803       |2023-06-15 17:28:01.211|
|841   |20      |5  |1       |1:32.342|92342       |2023-06-15 17:28:01.211|
|841   |20      |6  |1       |1:32.605|92605       |2023-06-15 17:28:01.211|
|841   |20      |7  |1       |1:32.502|92502       |2023-06-15 17:28:01.211|
|841   |20      |8  |1       |1:32.537|92537       |2023-06-15 17:28:01.211|
|841   |20      |9  |1       |1:33.240|93240       |2023-06-15 17:28:01.211|
|841   |20      |10 |1       |1:32.572|92572       |2023-06-15 17:28:01.211|

In [None]:
final_df = lap_times_with_ingestion_date_df.withColumnRenamed("driverId", "driver_id") \
                                           .withColumnRenamed("raceId", "race_id") \
                                           .withColumn("data_source", lit(v_data_source)) \
                                           .withColumn("file_date", lit(v_file_date))

In [None]:
final_df.show(truncate=False)

+-------+---------+---+--------+--------+------------+-----------------------+-----------+----------+
|race_id|driver_id|lap|position|time    |milliseconds|ingestion_date         |data_source|file_date |
+-------+---------+---+--------+--------+------------+-----------------------+-----------+----------+
|841    |20       |1  |1       |1:38.109|98109       |2023-06-15 17:28:01.793|Eargast    |2021-03-21|
|841    |20       |2  |1       |1:33.006|93006       |2023-06-15 17:28:01.793|Eargast    |2021-03-21|
|841    |20       |3  |1       |1:32.713|92713       |2023-06-15 17:28:01.793|Eargast    |2021-03-21|
|841    |20       |4  |1       |1:32.803|92803       |2023-06-15 17:28:01.793|Eargast    |2021-03-21|
|841    |20       |5  |1       |1:32.342|92342       |2023-06-15 17:28:01.793|Eargast    |2021-03-21|
|841    |20       |6  |1       |1:32.605|92605       |2023-06-15 17:28:01.793|Eargast    |2021-03-21|
|841    |20       |7  |1       |1:32.502|92502       |2023-06-15 17:28:01.793|Earg

#### Step 3 - Write the data to the data lake as Parquet and create the **lap_times** table in the **f1_processed** database

In [None]:
final_df.select('race_id').distinct().collect()

Out[18]: [Row(race_id=858),
 Row(race_id=897),
 Row(race_id=879),
 Row(race_id=883),
 Row(race_id=898),
 Row(race_id=853),
 Row(race_id=918),
 Row(race_id=857),
 Row(race_id=876),
 Row(race_id=847),
 Row(race_id=874),
 Row(race_id=842),
 Row(race_id=914),
 Row(race_id=844),
 Row(race_id=908),
 Row(race_id=916),
 Row(race_id=860),
 Row(race_id=926),
 Row(race_id=875),
 Row(race_id=855),
 Row(race_id=861),
 Row(race_id=882),
 Row(race_id=906),
 Row(race_id=912),
 Row(race_id=896),
 Row(race_id=852),
 Row(race_id=887),
 Row(race_id=867),
 Row(race_id=881),
 Row(race_id=868),
 Row(race_id=846),
 Row(race_id=871),
 Row(race_id=886),
 Row(race_id=904),
 Row(race_id=931),
 Row(race_id=845),
 Row(race_id=929),
 Row(race_id=849),
 Row(race_id=843),
 Row(race_id=854),
 Row(race_id=862),
 Row(race_id=903),
 Row(race_id=891),
 Row(race_id=850),
 Row(race_id=880),
 Row(race_id=870),
 Row(race_id=900),
 Row(race_id=884),
 Row(race_id=890),
 Row(race_id=930),
 Row(race_id=877),
 Row(race_id=856),
 Ro

In [None]:
for race_id_list in final_df.select('race_id').distinct().collect():
  print(race_id_list)

Row(race_id=858)
Row(race_id=897)
Row(race_id=879)
Row(race_id=883)
Row(race_id=898)
Row(race_id=853)
Row(race_id=918)
Row(race_id=857)
Row(race_id=876)
Row(race_id=847)
Row(race_id=874)
Row(race_id=842)
Row(race_id=914)
Row(race_id=844)
Row(race_id=908)
Row(race_id=916)
Row(race_id=860)
Row(race_id=926)
Row(race_id=875)
Row(race_id=855)
Row(race_id=861)
Row(race_id=882)
Row(race_id=906)
Row(race_id=912)
Row(race_id=896)
Row(race_id=852)
Row(race_id=887)
Row(race_id=867)
Row(race_id=881)
Row(race_id=868)
Row(race_id=846)
Row(race_id=871)
Row(race_id=886)
Row(race_id=904)
Row(race_id=931)
Row(race_id=845)
Row(race_id=929)
Row(race_id=849)
Row(race_id=843)
Row(race_id=854)
Row(race_id=862)
Row(race_id=903)
Row(race_id=891)
Row(race_id=850)
Row(race_id=880)
Row(race_id=870)
Row(race_id=900)
Row(race_id=884)
Row(race_id=890)
Row(race_id=930)
Row(race_id=877)
Row(race_id=856)
Row(race_id=928)
Row(race_id=869)
Row(race_id=885)
Row(race_id=907)
Row(race_id=848)
Row(race_id=895)
Row(race_id=89

In [None]:
for race_id_list in final_df.select('race_id').distinct().collect():
  print(race_id_list.race_id)

858
897
879
883
898
853
918
857
876
847
874
842
914
844
908
916
860
926
875
855
861
882
906
912
896
852
887
867
881
868
846
871
886
904
931
845
929
849
843
854
862
903
891
850
880
870
900
884
890
930
877
856
928
869
885
907
848
895
893
866
911
933
863
859
899
865
913
872
888
910
902
909
892
905
901
915
873
894
864
851
927
932
841
878
211
193
939
210
183
192
159
236
223
222
209
230
225
232
233
190
224
177
185
206
212
182
218
205
168
178
142
164
169
944
235
191
163
227
936
165
207
179
189
937
197
202
231
175
196
948
217
229
220
173
161
176
162
238
171
194
166
234
239
938
216
181
945
167
237
160
219
143
226
214
941
195
221
203
141
943
200
145
940
228
213
170
188
204
198
199
158
201
942
184
186
215
174
172
144
949
934
180
208
187
31
961
950
34
28
26
27
44
12
350
22
346
47
353
355
955
1
52
13
348
6
16
3
40
20
340
339
48
5
959
19
41
347
43
15
37
9
17
35
960
4
8
39
23
49
7
958
343
956
51
957
69
10
50
951
45
952
38
25
24
70
351
29
21
338
352
345
32
341
953
354
33
11
68
14
344
349
342
42
337
2


In [None]:
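# Count the distinct race_id values (each one becomes a partition).
# final_df.select('race_id').distinct().count() returns the same number without collecting rows to the driver
n = 0
for race_id_list in final_df.select('race_id').distinct().collect():
  n = n + 1
print(n)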

454
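When the table is written with `partitionBy('race_id')`, each distinct `race_id` becomes its own Hive-style directory (`race_id=<value>/`) under the table location. That is why the distinct count equals the number of partitions. The plain-Python sketch below uses a handful of illustrative `race_id` values, not the real data.

```python
# race_id values of a few rows; duplicates are expected (one row per driver per lap)
race_ids = [841, 841, 841, 842, 842, 843]

# Each distinct race_id maps to one partition folder such as race_id=841/
partitions = sorted(set(race_ids))
folders = [f"race_id={race_id}" for race_id in partitions]
print(len(partitions), folders)
```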


1. Without the **IF** statement, if the **f1_processed.lap_times** table did not exist yet, `insertInto` would return an **ERROR**, because it can only write into an existing table. That is why we use the **IF**
2. **spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")** lets us use **insertInto** in **overwrite** mode so that it ONLY OVERWRITES the partitions that match the incoming data (with the default `static` mode, the overwrite would replace the whole table). I have an example in my **Apache Spark con Python** material
3. Therefore, if the table already exists, only the matching partitions are overwritten
4. And if the table does not exist, it is created
5. Since we are ingesting the **lap_times** CSV directory from the **2021-03-21** folder, there are 454 partitions, one per distinct race_id counted above
6. With this method, we can RUN THE NOTEBOOK AS MANY TIMES AS WE WANT AND THE DATA WILL NOT BE DUPLICATED
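The behaviour described in the list above can be modelled in plain Python by treating the table as a dictionary that maps each partition value to its rows. This is a simplified sketch, not Spark's implementation:

- `write_static` mimics the default `static` mode, where an overwrite with no partition spec replaces the whole table.
- `write_dynamic` mimics `partitionOverwriteMode=dynamic`.
- A `None` table stands for a table that does not exist yet (the `saveAsTable` branch).

All names are illustrative.

```python
from collections import defaultdict


def group_by_partition(rows, key="race_id"):
    """Group rows into {partition_value: [rows]}."""
    parts = defaultdict(list)
    for row in rows:
        parts[row[key]].append(row)
    return dict(parts)


def write_static(table, rows):
    """Static overwrite without a partition spec: the existing table is discarded."""
    return group_by_partition(rows)


def write_dynamic(table, rows):
    """Create the table if missing; otherwise replace only partitions present in rows."""
    incoming = group_by_partition(rows)
    if table is None:  # table does not exist yet -> create it
        return incoming
    merged = dict(table)     # partitions not in the new data are kept as they are
    merged.update(incoming)  # matching partitions are replaced, never appended to
    return merged


first_load = [{"race_id": 841, "lap": 1}, {"race_id": 842, "lap": 1}]
table = write_dynamic(None, first_load)
table = write_dynamic(table, first_load)  # rerunning the same load adds no duplicates
later_load = [{"race_id": 1047, "lap": 1}]
dynamic_result = write_dynamic(table, later_load)
static_result = write_static(table, later_load)
print(sorted(dynamic_result), sorted(static_result))  # [841, 842, 1047] [1047]
```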

In [None]:
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")

In [None]:
# Put race_id (the partition column) last: insertInto() matches columns by position, not by name
final_df = final_df.select("driver_id","lap","position","time","milliseconds","ingestion_date","data_source","file_date","race_id")
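A table created with `partitionBy('race_id')` stores the partition column after all data columns, as the `DESCRIBE FORMATTED` output further down shows. The DataFrame passed to `insertInto` must follow that same order. The pure-Python sketch below derives the order instead of typing it by hand. `partition_last` is a hypothetical helper name.

```python
def partition_last(columns, partition_cols):
    """Return columns with the partition columns moved to the end, order otherwise kept."""
    data_cols = [c for c in columns if c not in partition_cols]
    return data_cols + [c for c in partition_cols if c in columns]


final_columns = ["race_id", "driver_id", "lap", "position", "time", "milliseconds",
                 "ingestion_date", "data_source", "file_date"]
ordered = partition_last(final_columns, ["race_id"])
print(ordered)
```

With Spark, the result could be passed as `final_df.select(*partition_last(final_df.columns, ["race_id"]))`, which yields the same order as the explicit `select` above.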

In [None]:
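# insertInto() only works on an existing table, so the first run creates it with saveAsTable()
# (spark._jsparkSession is an internal handle; PySpark 3.3+ also offers the public spark.catalog.tableExists())
if spark._jsparkSession.catalog().tableExists("f1_processed.lap_times"):
  final_df.write.mode('overwrite').insertInto("f1_processed.lap_times")
else:
  final_df.write.partitionBy('race_id').format('parquet').saveAsTable("f1_processed.lap_times")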

In [None]:
spark.read.parquet("/mnt/formula1dl/processed/lap_times").show(truncate=False)

+---------+---+--------+--------+------------+-----------------------+-----------+----------+-------+
|driver_id|lap|position|time    |milliseconds|ingestion_date         |data_source|file_date |race_id|
+---------+---+--------+--------+------------+-----------------------+-----------+----------+-------+
|4        |1  |1       |1:22.551|82551       |2023-06-15 17:28:14.555|Eargast    |2021-03-21|59     |
|4        |2  |1       |1:18.209|78209       |2023-06-15 17:28:14.555|Eargast    |2021-03-21|59     |
|4        |3  |1       |1:17.071|77071       |2023-06-15 17:28:14.555|Eargast    |2021-03-21|59     |
|4        |4  |1       |1:16.751|76751       |2023-06-15 17:28:14.555|Eargast    |2021-03-21|59     |
|4        |5  |1       |1:16.472|76472       |2023-06-15 17:28:14.555|Eargast    |2021-03-21|59     |
|4        |6  |1       |1:16.826|76826       |2023-06-15 17:28:14.555|Eargast    |2021-03-21|59     |
|4        |7  |1       |1:16.698|76698       |2023-06-15 17:28:14.555|Eargast    |

In [None]:
%sql
SELECT * FROM f1_processed.lap_times;

| driver_id | lap | position | time | milliseconds | ingestion_date | data_source | file_date | race_id |
|---|---|---|---|---|---|---|---|---|
| 4 | 1 | 1 | 1:22.551 | 82551 | 2023-06-15T17:28:14.555+0000 | Eargast | 2021-03-21 | 59 |
| 4 | 2 | 1 | 1:18.209 | 78209 | 2023-06-15T17:28:14.555+0000 | Eargast | 2021-03-21 | 59 |
| 4 | 3 | 1 | 1:17.071 | 77071 | 2023-06-15T17:28:14.555+0000 | Eargast | 2021-03-21 | 59 |
| 4 | 4 | 1 | 1:16.751 | 76751 | 2023-06-15T17:28:14.555+0000 | Eargast | 2021-03-21 | 59 |
| 4 | 5 | 1 | 1:16.472 | 76472 | 2023-06-15T17:28:14.555+0000 | Eargast | 2021-03-21 | 59 |
| 4 | 6 | 1 | 1:16.826 | 76826 | 2023-06-15T17:28:14.555+0000 | Eargast | 2021-03-21 | 59 |
| 4 | 7 | 1 | 1:16.698 | 76698 | 2023-06-15T17:28:14.555+0000 | Eargast | 2021-03-21 | 59 |
| 4 | 8 | 1 | 1:16.854 | 76854 | 2023-06-15T17:28:14.555+0000 | Eargast | 2021-03-21 | 59 |
| 4 | 9 | 1 | 1:16.734 | 76734 | 2023-06-15T17:28:14.555+0000 | Eargast | 2021-03-21 | 59 |
| 4 | 10 | 1 | 1:15.924 | 75924 | 2023-06-15T17:28:14.555+0000 | Eargast | 2021-03-21 | 59 |


In [None]:
%sql
-- We can see that the data goes up to race_id = 1047
SELECT race_id, COUNT(1)
FROM f1_processed.lap_times
GROUP BY race_id
ORDER BY race_id DESC

| race_id | count(1) |
|---|---|
| 1047 | 1043 |
| 1046 | 1531 |
| 1045 | 1016 |
| 1044 | 1076 |
| 1043 | 1128 |
| 1042 | 1288 |
| 1041 | 1017 |
| 1040 | 946 |
| 1039 | 778 |
| 1038 | 924 |


In [None]:
%sql
DESCRIBE FORMATTED f1_processed.lap_times;

| col_name | data_type | comment |
|---|---|---|
| driver_id | int | |
| lap | int | |
| position | int | |
| time | string | |
| milliseconds | int | |
| ingestion_date | timestamp | |
| data_source | string | |
| file_date | string | |
| race_id | int | |
| # Partition Information | | |


<center><img src="https://i.postimg.cc/Dfp89bw7/db118.png"></center>

In [None]:
dbutils.notebook.exit("Success")

Success