# Description

We have datasets containing a **list of health inspections of establishments** (restaurants, supermarkets, etc.), together with each establishment's health risk level. A second dataset provides a **description of each risk level**.

**The goal is to load these datasets according to specific requirements and manipulate them following the instructions in each exercise.**

All the required operations are described in the exercises, although extra tasks done on the student's own initiative will be valued. Use of the DataFrame API will also be valued.

**The submission will be a zip file named with your first and last name** containing:
- This `ipynb` file, with the corresponding solutions.
- An image of the execution plan for exercise 4.
- A description of the execution plan, which may be embedded in the `ipynb` file.

For exercise 10 there are two options: you can choose `10a`, which is a continuation of this project, or `10b`.

# Download Datasets

In [0]:
%sh 
curl -O 'https://raw.githubusercontent.com/masfworld/datahack_docker/master/zeppelin/data/food_inspections_lite.csv' --output-dir /databricks/driver
curl -O 'https://raw.githubusercontent.com/masfworld/datahack_docker/master/zeppelin/data/risk_description.csv'  --output-dir /databricks/driver



In [0]:
dbutils.fs.cp('file:/databricks/driver/food_inspections_lite.csv','dbfs:/dataset/food_inspections_lite.csv')
dbutils.fs.cp('file:/databricks/driver/risk_description.csv','dbfs:/dataset/risk_description.csv')

True

In [0]:
dbutils.fs.ls('/dataset/')

[FileInfo(path='dbfs:/dataset/Coral_cover_data.csv', name='Coral_cover_data.csv', size=5161518, modificationTime=1748072373000),
 FileInfo(path='dbfs:/dataset/OnlineRetail.csv', name='OnlineRetail.csv', size=45038728, modificationTime=1748072380000),
 FileInfo(path='dbfs:/dataset/bank.csv', name='bank.csv', size=461474, modificationTime=1746871963000),
 FileInfo(path='dbfs:/dataset/books/', name='books/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/dataset/characters.csv', name='characters.csv', size=5462, modificationTime=1747469386000),
 FileInfo(path='dbfs:/dataset/departuredelays.csv', name='departuredelays.csv', size=33396236, modificationTime=1748072375000),
 FileInfo(path='dbfs:/dataset/food_inspections_lite.csv', name='food_inspections_lite.csv', size=7538077, modificationTime=1748789816000),
 FileInfo(path='dbfs:/dataset/frankenstein.txt', name='frankenstein.txt', size=421623, modificationTime=1747469385000),
 FileInfo(path='dbfs:/dataset/loan.csv', name='loan.csv', siz

In [0]:
KAFKA_BOOSTRAP_SERVER="35.227.18.205:9094"

In [0]:
checkpoint_path = "/tmp/project_spark/_checkpoint"

In [0]:
spark.conf.set("spark.sql.streaming.checkpointLocation", checkpoint_path)
spark.conf.get("spark.sql.streaming.checkpointLocation")

'/tmp/project_spark/_checkpoint'

# Exercise 1
---

1. **Create two dataframes, one from the file `food_inspections_lite.csv` and another from `risk_description.csv`**
2. **Convert those two dataframes to Delta tables**


In [0]:
dbutils.fs.head("/dataset/food_inspections_lite.csv")

[Truncated to first 65536 bytes]


'Inspection ID,DBA Name,AKA Name,License #,Facility Type,Risk,Address,City,State,Zip,Inspection Date,Inspection Type,Results,Violations,Latitude,Longitude,Location\n2373596,COUNTY BBQ,COUNTY BBQ,2732781,Restaurant,Risk 3 (Low),1352 W TAYLOR ST ,CHICAGO,IL,60607,06/11/2020,License,Fail,,41.86945393993995,-87.66133829204541,"(-87.66133829204541, 41.86945393993995)"\n2373587,KIDZ CREATIVE CORNER,KIDZ CREATIVE CORNER,2555611,Daycare Above and Under 2 Years,Risk 1 (High),4259 N WESTERN AVE ,CHICAGO,IL,60618,06/11/2020,License Re-Inspection,Pass,,41.95944802136064,-87.68848155320953,"(-87.68848155320953, 41.95944802136064)"\n2373578,DSD DELI,DSD DELI,2626186,DELI/GROCERY,Risk 3 (Low),5205 N MILWAUKEE ,CHICAGO,IL,60630,06/11/2020,License Re-Inspection,Pass,,41.97484853157327,-87.76682905043768,"(-87.76682905043768, 41.97484853157327)"\n2373575,DSD DELI,DSD DELI,2626178,DELI/GROCERY,Risk 1 (High),5205 N MILWAUKEE ,CHICAGO,IL,60630,06/11/2020,License Re-Inspection,Pass,,41.97484853157327,-87.76

In [0]:
dbutils.fs.head("/dataset/risk_description.csv")

'risk_id,description\n1,Este riesgo significar la clausura inmediata del local\n2,Este riesgo está cerca de convertirse en la clausura del local\n3,Necesita una mejora importante\n'

In [0]:
# Part 1 -> Create the dataframes
food_inspections_df = spark.read.format("csv") \
  .option("sep", ",") \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .load("/dataset/food_inspections_lite.csv")

risk_df = spark.read.format("csv") \
  .option("sep", ",") \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .load("/dataset/risk_description.csv")


In [0]:
%sql
DROP TABLE IF EXISTS food_inspections;
DROP TABLE IF EXISTS risk_descriptions;

In [0]:
dbutils.fs.rm("/user/hive/warehouse/food_inspections", recurse=True)
dbutils.fs.rm("/user/hive/warehouse/risk_descriptions", recurse=True)

False

In [0]:
# Part 2 -> Create the Delta tables
food_inspections_df \
  .withColumnsRenamed( # Delta does not like spaces in column names :(
    {
      "Inspection ID": "Inspection_ID",
      "DBA Name": "DBA_Name",
      "AKA Name": "AKA_Name",
      "License #": "License",
      "Facility Type": "Facility_Type",
      "Inspection Date": "Inspection_Date",
      "Inspection Type": "Inspection_Type",
    }
  ) \
  .write \
  .mode("overwrite") \
  .format("delta") \
  .saveAsTable("food_inspections")

risk_df \
  .write \
  .mode("overwrite") \
  .format("delta") \
  .saveAsTable("risk_descriptions")

In [0]:
%sql
SELECT * FROM food_inspections LIMIT 10;

Inspection_ID,DBA_Name,AKA_Name,License,Facility_Type,Risk,Address,City,State,Zip,Inspection_Date,Inspection_Type,Results,Violations,Latitude,Longitude,Location
2373596,COUNTY BBQ,COUNTY BBQ,2732781,Restaurant,Risk 3 (Low),1352 W TAYLOR ST,CHICAGO,IL,60607,2020-06-11,License,Fail,,41.86945393993995,-87.66133829204541,"(-87.66133829204541, 41.86945393993995)"
2373587,KIDZ CREATIVE CORNER,KIDZ CREATIVE CORNER,2555611,Daycare Above and Under 2 Years,Risk 1 (High),4259 N WESTERN AVE,CHICAGO,IL,60618,2020-06-11,License Re-Inspection,Pass,,41.95944802136064,-87.68848155320953,"(-87.68848155320953, 41.95944802136064)"
2373578,DSD DELI,DSD DELI,2626186,DELI/GROCERY,Risk 3 (Low),5205 N MILWAUKEE,CHICAGO,IL,60630,2020-06-11,License Re-Inspection,Pass,,41.97484853157327,-87.76682905043768,"(-87.76682905043768, 41.97484853157327)"
2373575,DSD DELI,DSD DELI,2626178,DELI/GROCERY,Risk 1 (High),5205 N MILWAUKEE,CHICAGO,IL,60630,2020-06-11,License Re-Inspection,Pass,,41.97484853157327,-87.76682905043768,"(-87.76682905043768, 41.97484853157327)"
2373576,COUNTY BBQ,COUNTY BBQ,2732780,Restaurant,Risk 1 (High),1352 W TAYLOR ST,CHICAGO,IL,60607,2020-06-11,License,Fail,"3. MANAGEMENT, FOOD EMPLOYEE AND CONDITIONAL EMPLOYEE; KNOWLEDGE, RESPONSIBILITIES AND REPORTING - Comments: NO EMPLOYEE HEALTH POLICY ON PREMISES AS REQUIRED; MUST PROVIDE. INSTRUCTED THE PERSON IN CHARGE TO PROVIDE DOCUMENTATION OF EMPLOYEES RESPONSIBILITY TO REPORT ANY ILLNESS TRANSMISSIBLE THROUGH FOOD. PRIORITY FOUNDATION VIOLATION 7-38-010. | 16. FOOD-CONTACT SURFACES: CLEANED & SANITIZED - Comments: THE LOW TEMP DISH MACHINE IN THE PREP AREA IS NOT SANITIZING PROPERLY; MUST CORRECT. FOUND NO CHEMICAL READING WITH THE TEST STRIP AFTER TWO CYCLES. MACHINE MUST BE ABLE TO PROPERLY SANITIZE AT 100PPM CHLORINE. PRIORITY VIOLATION 7-38-025. | 51. PLUMBING INSTALLED; PROPER BACKFLOW DEVICES - Comments: OBSERVED THE EXPOSED HANDSINK IN THE BASEMENT PREP AREA DRAINING SLOWLY; MUST CORRECT. | 57. ALL FOOD EMPLOYEES HAVE FOOD HANDLER TRAINING - Comments: INSTRUCTED THE PERSON IN CHARGE THAT ALL FOOD HANDLERS MUST OBTAIN THE REQUIRED FOOD HANDLER TRAINING.",41.86945393993995,-87.66133829204541,"(-87.66133829204541, 41.86945393993995)"
2373547,NEW SEOUL,NEW SEOUL,62062,,Risk 3 (Low),5351 N LINCOLN AVE,CHICAGO,IL,60625,2020-06-11,Canvass,Out of Business,,41.97923607630261,-87.69252686676323,"(-87.69252686676323, 41.97923607630261)"
2373543,"CAFE UTJEHA, INC.",CAFE UTJEHA,1592606,Restaurant,Risk 2 (Medium),5350 N LINCOLN AVE,CHICAGO,IL,60625,2020-06-11,Canvass,Pass w/ Conditions,"2. CITY OF CHICAGO FOOD SERVICE SANITATION CERTIFICATE - Comments: NO CITY OF CHICAGO CERTIFIED FOOD MANAGER ON SITE DURING THE INSPECTION WHILE THE TEMPERATURE CONTROL FOR SAFETY FOODS (MILK) HAS BEEN PREPARED AND SERVED. INSTRUCTED A CITY OF CHICAGO CERTIFIED FOOD MANAGER MUST BE ON SITE AT ALL TIMES FOODS ARE BEING PREPARED AND SERVED. PRIORITY FOUNDATION VIOLATION 7-38-012.CITATION ISSUED | 3. MANAGEMENT, FOOD EMPLOYEE AND CONDITIONAL EMPLOYEE; KNOWLEDGE, RESPONSIBILITIES AND REPORTING - Comments: MANAGER PROVIDED NO PROOF OF EMPLOYEE HEALTH POLICY ON SITE. INSTRUCTED TO PROVIDE AN EMPLOYEE HEALTH POLICY THAT INCLUDES A SIGNED ACKNOWLEDGEMENT FROM EACH EMPLOYEE. PRIORITY FOUNDATION. 7.38.010. NO CITATION ISSUED. | 36. THERMOMETERS PROVIDED & ACCURATE - Comments: OBSERVED NO PROBE THERMOMETER FOR TAKING FOOD TEMPERATURES.MUST PROVIDE.(PRIORITY FOUNDATION-NO CITATION ISSUED) 7-38-005 | 44. UTENSILS, EQUIPMENT & LINENS: PROPERLY STORED, DRIED, & HANDLED - Comments: MUST REMOVE SINGLE SERVICE FOOD UTENSILS AND CUPS STORED IN KITCHEN CABINETS UNDER HAND WASHING SINK | 51. PLUMBING INSTALLED; PROPER BACKFLOW DEVICES - Comments: BACKFLOW PREVENTION DEVICES NOT LOCATED CAPPUCCINO/ESPRESSO MACHINE. MUST INSTALL BACKFLOW PREVENTION DEVICES OR AIR GAP BETWEEN THE WATER SUPPLY INLET AND THE UNIT SO IT MAY BE LOCATED TO BE SERVICED AND MAINTAINED | 55. PHYSICAL FACILITIES INSTALLED, MAINTAINED & CLEAN - Comments: OBSERVED CARPET IN LADY'S BATHROOM.MUST REMOVE TO MAINTAIN EASY CLEANABLE SURFACES",41.97918308087692,-87.69281717734684,"(-87.69281717734684, 41.979183080876915)"
2373523,MARATHON GAS,MARATHON GAS,2156705,Grocery Store,Risk 3 (Low),7100 S HALSTED ST,CHICAGO,IL,60621,2020-06-10,Canvass,Out of Business,,41.76510943191112,-87.64455695176326,"(-87.64455695176326, 41.76510943191112)"
2373490,HALSTED BOWL,HALSTED BOWL,1221174,Restaurant,Risk 2 (Medium),12345 S HALSTED ST,CHICAGO,IL,60628,2020-06-10,Canvass,Out of Business,,41.66920524810207,-87.6414739367472,"(-87.6414739367472, 41.66920524810207)"
2373485,HAROLD'S CHICKEN,HAROLD'S CHICKEN,2488469,Restaurant,Risk 2 (Medium),12020 S HALSTED ST,CHICAGO,IL,60628,2020-06-10,Canvass,Out of Business,,41.67536971040201,-87.64197314637781,"(-87.64197314637781, 41.67536971040201)"


In [0]:
%sql
SELECT * FROM risk_descriptions LIMIT 10;

risk_id,description
1,Este riesgo significar la clausura inmediata del local
2,Este riesgo está cerca de convertirse en la clausura del local
3,Necesita una mejora importante


# Exercise 2
**Get the number of distinct inspections with high risk, `Risk 1 (High)`**

---



In [0]:
from pyspark.sql.functions import col

# Each inspection (row) is assumed to be distinct
food_inspections_df \
  .filter(food_inspections_df.Risk == "Risk 1 (High)") \
  .groupBy(col("Risk")) \
  .count() \
  .select(
    col("Risk"),
    col("count").alias("number")
  ).display()


Risk,number
Risk 1 (High),7215



# Exercise 3
**From the dataframes loaded earlier, build a table with the following columns:**<br>
1. `DBA Name`
2. `Facility Type`
3. `Risk`
4. `Risk description`

---

In [0]:
# This shows which values appear in the Risk column, so we can take them into account in the next cell.

food_inspections_df \
  .select(food_inspections_df.Risk) \
  .distinct().display()

Risk
""
Risk 1 (High)
All
Risk 2 (Medium)
Risk 3 (Low)


In [0]:
# We need to join the two dataframes, but the risk level must first be reformatted to match the second one
# Built-in functions could do this, but to show another feature covered in class we create a UDF

from pyspark.sql.functions import col, udf

@udf()
def get_risk_level(risk):
  if risk is not None:
    return risk.split(" ")[1] # second "word" of [Risk, 1, (High)]

# Sorting is not required, so we skip it and avoid an unnecessary shuffle

food_inspections_df \
  .na.drop(subset=["Risk"]) \
  .filter(food_inspections_df.Risk != "All") \
  .withColumn("risk_level", get_risk_level(col("Risk"))) \
  .join(risk_df, col("risk_level") == risk_df.risk_id) \
  .select(
    food_inspections_df["DBA Name"],
    food_inspections_df["Facility Type"],
    col("Risk"),
    risk_df["description"].alias("Risk description")
  ).display()

DBA Name,Facility Type,Risk,Risk description
COUNTY BBQ,Restaurant,Risk 3 (Low),Necesita una mejora importante
KIDZ CREATIVE CORNER,Daycare Above and Under 2 Years,Risk 1 (High),Este riesgo significar la clausura inmediata del local
DSD DELI,DELI/GROCERY,Risk 3 (Low),Necesita una mejora importante
DSD DELI,DELI/GROCERY,Risk 1 (High),Este riesgo significar la clausura inmediata del local
COUNTY BBQ,Restaurant,Risk 1 (High),Este riesgo significar la clausura inmediata del local
NEW SEOUL,,Risk 3 (Low),Necesita una mejora importante
"CAFE UTJEHA, INC.",Restaurant,Risk 2 (Medium),Este riesgo está cerca de convertirse en la clausura del local
MARATHON GAS,Grocery Store,Risk 3 (Low),Necesita una mejora importante
HALSTED BOWL,Restaurant,Risk 2 (Medium),Este riesgo está cerca de convertirse en la clausura del local
HAROLD'S CHICKEN,Restaurant,Risk 2 (Medium),Este riesgo está cerca de convertirse en la clausura del local
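
The join in this exercise depends on turning a value such as `Risk 1 (High)` into `1`. As a plain-Python sketch of that parsing rule only (not Spark code), the hypothetical helper `parse_risk_level` below extracts the level with a regular expression and returns `None` for values such as `All` or an empty string. Spark's built-in `regexp_extract` accepts a similar pattern, so the same idea could probably replace the UDF, although that variant has not been run against this notebook.

```python
import re

# Hypothetical helper (not part of the notebook): same rule as get_risk_level,
# but tolerant of values that do not follow the "Risk <n> (<label>)" shape.
RISK_PATTERN = re.compile(r"^Risk (\d+)")

def parse_risk_level(risk):
    """Return the numeric risk level as an int, or None when it cannot be parsed."""
    if risk is None:
        return None
    match = RISK_PATTERN.match(risk)
    return int(match.group(1)) if match else None

# Values observed in the Risk column, plus None for missing data.
for value in ["Risk 1 (High)", "Risk 2 (Medium)", "Risk 3 (Low)", "All", "", None]:
    print(repr(value), "->", parse_risk_level(value))
```

Returning `None` for unparseable values means those rows simply find no match in an inner join, which is the same effect the explicit `All` filter achieves.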


# Exercise 4
**Open the Spark UI to view the execution plan. Describe each of the pieces/boxes that make up the execution plan (a brief one-line description per box is enough).**<br><br>**Remember to take a screenshot of the analyzed execution plan**

---



Going from bottom to top in the screenshot `plan_ejecucion_sergio_yunta.png`:

1. At the very bottom is the part of the plan that reads the CSV behind `food_inspections_df` and filters out the null values of the `Risk` column **together** with the `"All"` values. There is no shuffle so far, because a filter is a narrow operation that works on each partition independently.
2. Following the path, we reach the evaluation of our UDF `get_risk_level` (shown as `BatchEvalPython`), which consists of a null filter (the `if ... is not None`) and a projection of values. The plan then shows it being evaluated again before the projection, to add an "auxiliary" column used by the join that follows.
3. On the right-hand side there is another CSV read, this time for `risk_df`, where Spark also adds a null filter automatically.
4. Next comes the join which, as expected, is a `Broadcast Hash Join`: the second table (`risk_df`) is very small, so it is more efficient to replicate it on every node.
5. Finally, there is a projection of the columns requested in the exercise. The remaining boxes are Spark utilities, such as adaptive query execution and the optimizations it applies automatically.
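
The `Broadcast Hash Join` from step 4 can be pictured with a small plain-Python model. This is only an illustration of the build and probe phases, not Spark's internals; the function name `broadcast_hash_join` is hypothetical and the sample rows are taken from the outputs shown earlier in this notebook.

```python
# Small side: (risk_id, description), as in risk_description.csv.
risk_rows = [
    (1, "Este riesgo significar la clausura inmediata del local"),
    (2, "Este riesgo está cerca de convertirse en la clausura del local"),
    (3, "Necesita una mejora importante"),
]

# Large side: (DBA Name, risk level already extracted from the Risk column).
inspection_rows = [
    ("COUNTY BBQ", 3),
    ("KIDZ CREATIVE CORNER", 1),
    ("CAFE UTJEHA, INC.", 2),
    ("UNKNOWN LEVEL", 9),  # made-up row with no matching risk_id
]

def broadcast_hash_join(large_rows, small_rows):
    # Build phase: the small side becomes an in-memory hash table. In Spark,
    # this table is what gets shipped (broadcast) to every executor.
    lookup = {risk_id: description for risk_id, description in small_rows}
    # Probe phase: each row of the large side is matched locally, so the large
    # side never has to be shuffled across the cluster.
    joined = []
    for name, risk_level in large_rows:
        if risk_level in lookup:  # inner join: unmatched rows are dropped
            joined.append((name, risk_level, lookup[risk_level]))
    return joined

for row in broadcast_hash_join(inspection_rows, risk_rows):
    print(row)
```

The cost of the build phase grows with the small table only, which is why this strategy is attractive when one side has just a handful of rows.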

# Exercise 5
**1. Get the number of inspections for each establishment (column `DBA Name`) and result (column `Results`).**<br><br>
**2. Get the two establishments (`DBA Name`) with the most inspections for each result**<br><br>
**3. Save the results in a new Delta table called `inspections_results`**

---

In [0]:
# This shows which values appear in the Results column, so we can take them into account in the next cell.

food_inspections_df \
  .select(food_inspections_df.Results) \
  .distinct().display()

Results
Not Ready
Fail
No Entry
Business Not Located
Pass w/ Conditions
Out of Business
Pass


In [0]:
from pyspark.sql.functions import col

# First step: get the number of inspections per establishment and result

inspections_grouped = food_inspections_df \
  .groupBy(
    food_inspections_df["DBA Name"],
    food_inspections_df["Results"]
  ) \
  .count() \
  .select(
    food_inspections_df["DBA Name"],
    col("count").alias("Num inspections"),
    food_inspections_df["Results"]
  )

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, desc

# Second step: use window partitioning to get the top two establishments per result

by_result = Window.partitionBy("Results").orderBy(desc("Num inspections"))

inspections_results_df = inspections_grouped \
  .withColumn("inspection_row_num", row_number().over(by_result)) \
  .filter(col("inspection_row_num") <= 2) \
  .select(
    inspections_grouped["DBA Name"],
    inspections_grouped["Num inspections"],
    inspections_grouped["Results"],
  )

inspections_results_df.display()

DBA Name,Num inspections,Results
RICE THAI CAFE,1,Business Not Located
CHINA STATION,1,Business Not Located
SUBWAY,10,Fail
DUNKIN DONUTS,5,Fail
LANS,5,No Entry
LA PENA RESTAURANTE,4,No Entry
PALETERIA Y NEVERIA LA MEXICANA YOGURT AND CHURRO,5,Not Ready
SUBWAY,4,Not Ready
SUBWAY,16,Out of Business
DUNKIN DONUTS,10,Out of Business
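
One detail worth noting about `row_number()`: it assigns consecutive positions, so when several establishments share the same count (as with `Business Not Located`, where the top entries all have a count of 1), which ones make the top two is arbitrary. The plain-Python sketch below models the difference between a `row_number`-style and a `rank`-style selection. The helper `top_n` and the sample counts are made up for illustration and are not part of the notebook.

```python
from collections import defaultdict

# (establishment, result, inspections): made-up counts for illustration.
rows = [
    ("A", "Fail", 10),
    ("B", "Fail", 5),
    ("C", "Fail", 5),
    ("D", "Pass", 7),
]

def top_n(rows, n, keep_ties):
    """Select the top n rows per result, optionally keeping tied rows."""
    groups = defaultdict(list)
    for name, result, count in rows:
        groups[result].append((name, count))
    selected = []
    for result, members in groups.items():
        members.sort(key=lambda m: m[1], reverse=True)
        for position, (name, count) in enumerate(members, start=1):
            if keep_ties:
                # rank-like: 1 + number of rows with a strictly larger count
                position = 1 + sum(1 for _, other in members if other > count)
            if position <= n:
                selected.append((name, result, count))
    return selected

print(top_n(rows, 2, keep_ties=False))  # row_number-like: drops one of B or C
print(top_n(rows, 2, keep_ties=True))   # rank-like: keeps both B and C
```

Which behavior is preferable depends on whether "the two establishments" should be read as exactly two rows or as every establishment tied for the top two positions.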


In [0]:
%sql
DROP TABLE IF EXISTS inspections_results

In [0]:
dbutils.fs.rm("/user/hive/warehouse/inspections_results", recurse=True)

False

In [0]:
# Third step: write it out as a Delta table
inspections_results_df \
  .withColumnsRenamed( # Delta does not like spaces in column names :(
    {
      "DBA Name": "DBA_Name",
      "Num inspections": "Num_inspections",
    }
  ) \
  .write \
  .mode("overwrite") \
  .format("delta") \
  .saveAsTable("inspections_results")

In [0]:
%sql
SELECT * FROM inspections_results

DBA_Name,Num_inspections,Results
RICE THAI CAFE,1,Business Not Located
CHINA STATION,1,Business Not Located
SUBWAY,10,Fail
DUNKIN DONUTS,5,Fail
LANS,5,No Entry
LA PENA RESTAURANTE,4,No Entry
PALETERIA Y NEVERIA LA MEXICANA YOGURT AND CHURRO,5,Not Ready
SUBWAY,4,Not Ready
SUBWAY,16,Out of Business
DUNKIN DONUTS,10,Out of Business


# Exercise 6
1. **Update the Delta table created in the previous exercise with the value `DBA_Name = "error"`**
2. **Restore the table to its original state**

---



In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import lit

inspections_results_table = DeltaTable.forName(spark, "inspections_results")

inspections_results_table.update(set = { "DBA_Name": lit("error") } )

In [0]:
%sql
SELECT * FROM inspections_results

DBA_Name,Num_inspections,Results
error,1,Business Not Located
error,1,Business Not Located
error,10,Fail
error,5,Fail
error,5,No Entry
error,4,No Entry
error,5,Not Ready
error,4,Not Ready
error,16,Out of Business
error,10,Out of Business


In [0]:
%sql
DESCRIBE HISTORY inspections_results

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
1,2025-06-01T15:00:32Z,7726499663423251,sergio.yunta@hotmail.com,UPDATE,Map(predicate -> []),,List(4402192642578873),0601-134109-24xz8dde,0.0,WriteSerializable,False,"Map(numRemovedFiles -> 1, numRemovedBytes -> 1631, numCopiedRows -> 0, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 677, numDeletionVectorsUpdated -> 0, scanTimeMs -> 62, numAddedFiles -> 1, numUpdatedRows -> 14, numAddedBytes -> 1480, rewriteTimeMs -> 615)",,Databricks-Runtime/15.4.x-scala2.12
0,2025-06-01T15:00:20Z,7726499663423251,sergio.yunta@hotmail.com,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {}, statsOnLoad -> false)",,List(4402192642578873),0601-134109-24xz8dde,,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 14, numOutputBytes -> 1631)",,Databricks-Runtime/15.4.x-scala2.12


In [0]:
inspections_results_table.restoreToVersion(0)

DataFrame[table_size_after_restore: bigint, num_of_files_after_restore: bigint, num_removed_files: bigint, num_restored_files: bigint, removed_files_size: bigint, restored_files_size: bigint]

In [0]:
%sql
SELECT * FROM inspections_results

DBA_Name,Num_inspections,Results
RICE THAI CAFE,1,Business Not Located
CHINA STATION,1,Business Not Located
SUBWAY,10,Fail
DUNKIN DONUTS,5,Fail
LANS,5,No Entry
LA PENA RESTAURANTE,4,No Entry
PALETERIA Y NEVERIA LA MEXICANA YOGURT AND CHURRO,5,Not Ready
SUBWAY,4,Not Ready
SUBWAY,16,Out of Business
DUNKIN DONUTS,10,Out of Business


# Exercise 7

**Create a Structured Streaming application that reads data from the Kafka topic `inspections`. The Kafka server URL is defined at the beginning of this notebook**

**The data in this topic is exactly the same data analyzed throughout this notebook, `Food Inspections`, so the schema is the same**

In [0]:
# Utility to reset the checkpoint
checkpoint_path = "/tmp/project_spark/_checkpoint"

dbutils.fs.rm(checkpoint_path, True)

spark.conf.set("spark.sql.streaming.checkpointLocation", checkpoint_path)
spark.conf.get("spark.sql.streaming.checkpointLocation")

'/tmp/project_spark/_checkpoint'

In [0]:
KAFKA_BOOSTRAP_SERVER="35.227.18.205:9094"

In [0]:
from pyspark.sql.functions import from_json, col, to_date
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, DoubleType
# Without the maxOffsetsPerTrigger option, the stream blows up :(
inspections_kafka_df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", KAFKA_BOOSTRAP_SERVER) \
  .option("subscribe", "inspections") \
  .option("startingOffsets", "earliest") \
  .option("maxOffsetsPerTrigger", 500) \
  .load()

"""
An interesting point: the schema from exercise 1 (its data types) does not work here, because every value in the
Kafka messages arrives as a string, so any non-string type declared in the schema
yields null in the corresponding column.

We keep every field as a string and do the casting afterwards.
"""

schema = StructType(
    [
        StructField('Inspection ID', StringType(), True),
        StructField('DBA Name', StringType(), True),
        StructField('AKA Name', StringType(), True),
        StructField('License #', StringType(), True),
        StructField('Facility Type', StringType(), True),
        StructField('Risk', StringType(), True),
        StructField('Address', StringType(), True),
        StructField('City', StringType(), True),
        StructField('State', StringType(), True),
        StructField('Zip', StringType(), True),
        StructField('Inspection Date', StringType(), True),
        StructField('Inspection Type', StringType(), True),
        StructField('Results', StringType(), True),
        StructField('Violations', StringType(), True),
        StructField('Latitude', StringType(), True),
        StructField('Longitude', StringType(), True),
        StructField('Location', StringType(), True),
    ]
)
inspections_stream = inspections_kafka_df \
  .selectExpr("CAST(value AS STRING)", "timestamp") \
  .withColumn("value", from_json("value", schema)) \
  .select(col('value.*'), col("timestamp")) \
  .withColumn("Inspection ID", col("Inspection ID").cast("int")) \
  .withColumn("License #", col("License #").cast("int")) \
  .withColumn("Zip", col("Zip").cast("int")) \
  .withColumn("Latitude", col("Latitude").cast("double")) \
  .withColumn("Longitude", col("Longitude").cast("double")) \
  .withColumn("Inspection Date", to_date(col("Inspection Date"), "MM/dd/yyyy"))
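
The "parse everything as a string, then cast" approach used in the cell above can be sketched in plain Python. The message below is a hypothetical payload built from the first CSV row; the exact shape of the Kafka messages is an assumption, and only four fields are shown.

```python
import json
from datetime import datetime

# Hypothetical message shaped like the topic's payload: every value is a JSON string.
message = (
    '{"Inspection ID": "2373596", "Zip": "60607", '
    '"Latitude": "41.86945393993995", "Inspection Date": "06/11/2020"}'
)

record = json.loads(message)  # all values are str at this point

# Explicit casts, one per column, mirroring the chain of withColumn(...).cast(...) calls.
typed = {
    "Inspection ID": int(record["Inspection ID"]),
    "Zip": int(record["Zip"]),
    "Latitude": float(record["Latitude"]),
    # Same date layout as to_date(..., "MM/dd/yyyy")
    "Inspection Date": datetime.strptime(record["Inspection Date"], "%m/%d/%Y").date(),
}
print(typed)
```

Keeping the parse step and the cast step separate also makes it easier to spot which column is responsible when a value fails to convert.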

In [0]:
inspections_stream.display()

Inspection ID,DBA Name,AKA Name,License #,Facility Type,Risk,Address,City,State,Zip,Inspection Date,Inspection Type,Results,Violations,Latitude,Longitude,Location,timestamp
2373596,COUNTY BBQ,COUNTY BBQ,2732781,Restaurant,Risk 3 (Low),1352 W TAYLOR ST,CHICAGO,IL,60607.0,2020-06-11,License,Fail,,41.86945393993995,-87.66133829204541,"(-87.66133829204541, 41.86945393993995)",2025-05-18T16:47:28.588Z
2373587,KIDZ CREATIVE CORNER,KIDZ CREATIVE CORNER,2555611,Daycare Above and Under 2 Years,Risk 1 (High),4259 N WESTERN AVE,CHICAGO,IL,60618.0,2020-06-11,License Re-Inspection,Pass,,41.95944802136064,-87.68848155320953,"(-87.68848155320953, 41.95944802136064)",2025-05-18T16:47:28.598Z
2373578,DSD DELI,DSD DELI,2626186,DELI/GROCERY,Risk 3 (Low),5205 N MILWAUKEE,CHICAGO,IL,60630.0,2020-06-11,License Re-Inspection,Pass,,41.97484853157327,-87.76682905043768,"(-87.76682905043768, 41.97484853157327)",2025-05-18T16:47:28.607Z
2373575,DSD DELI,DSD DELI,2626178,DELI/GROCERY,Risk 1 (High),5205 N MILWAUKEE,CHICAGO,IL,60630.0,2020-06-11,License Re-Inspection,Pass,,41.97484853157327,-87.76682905043768,"(-87.76682905043768, 41.97484853157327)",2025-05-18T16:47:28.613Z
2373576,COUNTY BBQ,COUNTY BBQ,2732780,Restaurant,Risk 1 (High),1352 W TAYLOR ST,CHICAGO,IL,60607.0,2020-06-11,License,Fail,"3. MANAGEMENT, FOOD EMPLOYEE AND CONDITIONAL EMPLOYEE; KNOWLEDGE, RESPONSIBILITIES AND REPORTING - Comments: NO EMPLOYEE HEALTH POLICY ON PREMISES AS REQUIRED; MUST PROVIDE. INSTRUCTED THE PERSON IN CHARGE TO PROVIDE DOCUMENTATION OF EMPLOYEES RESPONSIBILITY TO REPORT ANY ILLNESS TRANSMISSIBLE THROUGH FOOD. PRIORITY FOUNDATION VIOLATION 7-38-010. | 16. FOOD-CONTACT SURFACES: CLEANED & SANITIZED - Comments: THE LOW TEMP DISH MACHINE IN THE PREP AREA IS NOT SANITIZING PROPERLY; MUST CORRECT. FOUND NO CHEMICAL READING WITH THE TEST STRIP AFTER TWO CYCLES. MACHINE MUST BE ABLE TO PROPERLY SANITIZE AT 100PPM CHLORINE. PRIORITY VIOLATION 7-38-025. | 51. PLUMBING INSTALLED; PROPER BACKFLOW DEVICES - Comments: OBSERVED THE EXPOSED HANDSINK IN THE BASEMENT PREP AREA DRAINING SLOWLY; MUST CORRECT. | 57. ALL FOOD EMPLOYEES HAVE FOOD HANDLER TRAINING - Comments: INSTRUCTED THE PERSON IN CHARGE THAT ALL FOOD HANDLERS MUST OBTAIN THE REQUIRED FOOD HANDLER TRAINING.",41.86945393993995,-87.66133829204541,"(-87.66133829204541, 41.86945393993995)",2025-05-18T16:47:28.623Z
2373547,NEW SEOUL,NEW SEOUL,62062,,Risk 3 (Low),5351 N LINCOLN AVE,CHICAGO,IL,60625.0,2020-06-11,Canvass,Out of Business,,41.97923607630261,-87.69252686676323,"(-87.69252686676323, 41.97923607630261)",2025-05-18T16:47:28.637Z
2373543,"CAFE UTJEHA, INC.",CAFE UTJEHA,1592606,Restaurant,Risk 2 (Medium),5350 N LINCOLN AVE,CHICAGO,IL,60625.0,2020-06-11,Canvass,Pass w/ Conditions,"2. CITY OF CHICAGO FOOD SERVICE SANITATION CERTIFICATE - Comments: NO CITY OF CHICAGO CERTIFIED FOOD MANAGER ON SITE DURING THE INSPECTION WHILE THE TEMPERATURE CONTROL FOR SAFETY FOODS (MILK) HAS BEEN PREPARED AND SERVED. INSTRUCTED A CITY OF CHICAGO CERTIFIED FOOD MANAGER MUST BE ON SITE AT ALL TIMES FOODS ARE BEING PREPARED AND SERVED. PRIORITY FOUNDATION VIOLATION 7-38-012.CITATION ISSUED | 3. MANAGEMENT, FOOD EMPLOYEE AND CONDITIONAL EMPLOYEE; KNOWLEDGE, RESPONSIBILITIES AND REPORTING - Comments: MANAGER PROVIDED NO PROOF OF EMPLOYEE HEALTH POLICY ON SITE. INSTRUCTED TO PROVIDE AN EMPLOYEE HEALTH POLICY THAT INCLUDES A SIGNED ACKNOWLEDGEMENT FROM EACH EMPLOYEE. PRIORITY FOUNDATION. 7.38.010. NO CITATION ISSUED. | 36. THERMOMETERS PROVIDED & ACCURATE - Comments: OBSERVED NO PROBE THERMOMETER FOR TAKING FOOD TEMPERATURES.MUST PROVIDE.(PRIORITY FOUNDATION-NO CITATION ISSUED) 7-38-005 | 44. UTENSILS, EQUIPMENT & LINENS: PROPERLY STORED, DRIED, & HANDLED - Comments: MUST REMOVE SINGLE SERVICE FOOD UTENSILS AND CUPS STORED IN KITCHEN CABINETS UNDER HAND WASHING SINK | 51. PLUMBING INSTALLED; PROPER BACKFLOW DEVICES - Comments: BACKFLOW PREVENTION DEVICES NOT LOCATED CAPPUCCINO/ESPRESSO MACHINE. MUST INSTALL BACKFLOW PREVENTION DEVICES OR AIR GAP BETWEEN THE WATER SUPPLY INLET AND THE UNIT SO IT MAY BE LOCATED TO BE SERVICED AND MAINTAINED | 55. PHYSICAL FACILITIES INSTALLED, MAINTAINED & CLEAN - Comments: OBSERVED CARPET IN LADY'S BATHROOM.MUST REMOVE TO MAINTAIN EASY CLEANABLE SURFACES",41.97918308087692,-87.69281717734684,"(-87.69281717734684, 41.979183080876915)",2025-05-18T16:47:28.641Z
2373523,MARATHON GAS,MARATHON GAS,2156705,Grocery Store,Risk 3 (Low),7100 S HALSTED ST,CHICAGO,IL,60621.0,2020-06-10,Canvass,Out of Business,,41.76510943191112,-87.64455695176326,"(-87.64455695176326, 41.76510943191112)",2025-05-18T16:47:28.649Z
2373490,HALSTED BOWL,HALSTED BOWL,1221174,Restaurant,Risk 2 (Medium),12345 S HALSTED ST,CHICAGO,IL,60628.0,2020-06-10,Canvass,Out of Business,,41.66920524810207,-87.6414739367472,"(-87.6414739367472, 41.66920524810207)",2025-05-18T16:47:28.656Z
2373485,HAROLD'S CHICKEN,HAROLD'S CHICKEN,2488469,Restaurant,Risk 2 (Medium),12020 S HALSTED ST,CHICAGO,IL,60628.0,2020-06-10,Canvass,Out of Business,,41.67536971040201,-87.64197314637781,"(-87.64197314637781, 41.67536971040201)",2025-05-18T16:47:28.665Z


# Exercise 8
**Using the data source from the previous exercise, get the number of inspections per `Facility Type` every 5 seconds**

In [0]:
# Utility to reset the checkpoint
checkpoint_path = "/tmp/project_spark/_checkpoint"

dbutils.fs.rm(checkpoint_path, True)

spark.conf.set("spark.sql.streaming.checkpointLocation", checkpoint_path)
spark.conf.get("spark.sql.streaming.checkpointLocation")

'/tmp/project_spark/_checkpoint'

In [0]:
from pyspark.sql.functions import window

grouped_by_facility = inspections_stream \
  .groupBy(
    window(col("timestamp"), "5 seconds"),
    col("Facility Type")
  ).count()

In [0]:
grouped_by_facility.display()

window,Facility Type,count
"List(2025-05-18T17:23:15Z, 2025-05-18T17:23:20Z)",Grocery Store,3
"List(2025-05-18T16:57:45Z, 2025-05-18T16:57:50Z)",Restaurant,7
"List(2025-05-18T17:02:45Z, 2025-05-18T17:02:50Z)",School,1
"List(2025-05-18T16:59:25Z, 2025-05-18T16:59:30Z)",Grocery Store,1
"List(2025-05-18T16:47:45Z, 2025-05-18T16:47:50Z)",Grocery Store,1
"List(2025-05-18T17:24:55Z, 2025-05-18T17:25:00Z)",Restaurant,6
"List(2025-05-18T17:13:15Z, 2025-05-18T17:13:20Z)",Grocery Store,1
"List(2025-05-18T16:56:45Z, 2025-05-18T16:56:50Z)",Grocery Store,3
"List(2025-05-18T17:29:55Z, 2025-05-18T17:30:00Z)",Grocery Store,2
"List(2025-05-18T17:31:55Z, 2025-05-18T17:32:00Z)",Restaurant,8


# Exercise 9
**Using the data source from exercise 7, get every 5 seconds the number of inspections per `Results` over the last 30 seconds**

In [0]:
# Utility to reset the checkpoint
checkpoint_path = "/tmp/project_spark/_checkpoint"

dbutils.fs.rm(checkpoint_path, True)

spark.conf.set("spark.sql.streaming.checkpointLocation", checkpoint_path)
spark.conf.get("spark.sql.streaming.checkpointLocation")

'/tmp/project_spark/_checkpoint'

In [0]:
grouped_by_results = inspections_stream.groupBy(
  window(col("timestamp"), "30 seconds", "5 seconds"),
  col("Results")
).count()
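
With a 30-second window sliding every 5 seconds, each event is counted in six overlapping windows. The plain-Python sketch below models that assignment, assuming windows aligned to the Unix epoch (Spark's default when no `startTime` is given). The helper `sliding_windows` and the sample event time are hypothetical.

```python
from datetime import datetime, timezone

def sliding_windows(event_time, window_seconds=30, slide_seconds=5):
    """Return the [start, end) windows containing event_time, earliest first."""
    epoch_seconds = int(event_time.timestamp())
    # Latest window start that is <= the event, aligned to the slide interval.
    last_start = epoch_seconds - (epoch_seconds % slide_seconds)
    # Walk backwards one slide at a time while the window still covers the event.
    starts = range(last_start, epoch_seconds - window_seconds, -slide_seconds)
    return [
        (
            datetime.fromtimestamp(s, tz=timezone.utc),
            datetime.fromtimestamp(s + window_seconds, tz=timezone.utc),
        )
        for s in reversed(starts)
    ]

# Hypothetical event timestamp.
event = datetime(2025, 5, 18, 17, 29, 57, tzinfo=timezone.utc)
for start, end in sliding_windows(event):
    print(start.isoformat(), "->", end.isoformat())
```

Setting the slide equal to the window length, as in `sliding_windows(event, 5, 5)`, yields a single window, which corresponds to the tumbling window used in exercise 8.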

In [0]:
grouped_by_results.display()

window,Results,count
"List(2025-05-18T17:29:30Z, 2025-05-18T17:30:00Z)",Pass w/ Conditions,4
"List(2025-05-18T17:31:15Z, 2025-05-18T17:31:45Z)",No Entry,2
"List(2025-05-18T17:25:55Z, 2025-05-18T17:26:25Z)",Pass w/ Conditions,3
"List(2025-05-18T16:57:25Z, 2025-05-18T16:57:55Z)",Pass,13
"List(2025-05-18T17:24:20Z, 2025-05-18T17:24:50Z)",Out of Business,2
"List(2025-05-18T16:55:15Z, 2025-05-18T16:55:45Z)",Not Ready,1
"List(2025-05-18T17:32:15Z, 2025-05-18T17:32:45Z)",Pass,7
"List(2025-05-18T17:22:30Z, 2025-05-18T17:23:00Z)",Out of Business,5
"List(2025-05-18T17:30:55Z, 2025-05-18T17:31:25Z)",Pass,15
"List(2025-05-18T17:29:55Z, 2025-05-18T17:30:25Z)",Pass w/ Conditions,3


# Ejercicio 10a
1. **Actualiza la columna `Results` de la tabla delta de food inspections creada en el ejercicio 1 con el valor `No result`**
2. **Ahora que tenemos la tabla delta corrompida con el valor `No result`, vamos a resolver el problema con los datos que nos llegan de Kafka, los cuales vamos a suponer como verdad absoluta, y por tanto tendremos que actualizar en tiempor real conforme vayan llegando elementos en Kafka**.<br>

---

It is advisable to stop all previous streams, since the stream in this exercise tends to be resource-intensive.
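
One possible way to stop the earlier streams is to iterate over the active queries exposed by the session. The sketch below relies on `spark.streams.active` and `StreamingQuery.stop()`; the function name `stop_active_streams` is a hypothetical helper introduced here, and it assumes the notebook's `spark` session is passed in.

```python
def stop_active_streams(spark):
    """Stop every active Structured Streaming query of the given session.

    Returns the number of queries that were stopped.
    """
    # Take a snapshot first: the list of active queries shrinks as we stop them
    active_queries = list(spark.streams.active)
    for query in active_queries:
        query.stop()
    return len(active_queries)
```

In the notebook this would be called as `stop_active_streams(spark)` before starting the stream of this exercise.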

In [0]:
# Utility to reset the checkpoint
checkpoint_path = "/tmp/project_spark/_checkpoint"

dbutils.fs.rm(checkpoint_path, True)

spark.conf.set("spark.sql.streaming.checkpointLocation", checkpoint_path)
spark.conf.get("spark.sql.streaming.checkpointLocation")

'/tmp/project_spark/_checkpoint'

In [0]:
# Corrupt the table
from delta.tables import DeltaTable
from pyspark.sql.functions import lit

# The table must exist (run exercise 1 first)
inspections_table = DeltaTable.forName(spark, "food_inspections")
inspections_table.update(set={"Results": lit("No result")})

In [0]:
%sql
SELECT Inspection_ID, Results FROM food_inspections LIMIT 10

Inspection_ID,Results
2373596,No result
2373587,No result
2373578,No result
2373575,No result
2373576,No result
2373547,No result
2373543,No result
2373523,No result
2373490,No result
2373485,No result


In [0]:
# The stream was already prepared in exercise 7; here we adapt the DataFrame slightly by renaming its columns
inspections_to_fix_stream = inspections_kafka_df \
  .selectExpr("CAST(value AS STRING)") \
  .withColumn("value", from_json("value", schema)) \
  .select(col('value.*')) \
  .withColumn("Inspection ID", col("Inspection ID").cast("int")) \
  .withColumn("License #", col("License #").cast("int")) \
  .withColumn("Zip", col("Zip").cast("int")) \
  .withColumn("Latitude", col("Latitude").cast("double")) \
  .withColumn("Longitude", col("Longitude").cast("double")) \
  .withColumn("Inspection Date", to_date(col("Inspection Date"), "MM/dd/yyyy")) \
  .withColumnsRenamed(
    {
      "Inspection ID": "Inspection_ID",
      "DBA Name": "DBA_Name",
      "AKA Name": "AKA_Name",
      "License #": "License",
      "Facility Type": "Facility_Type",
      "Inspection Date": "Inspection_Date",
      "Inspection Type": "Inspection_Type",
    }
  )
  

I did exercise 10b before this one, so I am leaving the same comment here:

For this part I found this section of the documentation very useful: https://docs.databricks.com/gcp/en/structured-streaming/delta-lake#upsert-from-streaming-queries-using-foreachbatch. It describes how to update or insert rows in a table from a streaming query using the `foreachBatch` method.
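
The idea behind the upsert can be modeled without Spark. The following is only a sketch of the semantics, not of the Delta Lake API: a dict stands in for the target table, `deduplicate` keeps the most recent row per key, and `upsert` emulates "update when matched, insert when not matched". Deduplicating first matters because, as far as I know, a Delta `merge` can fail when several source rows match the same target row. All names and sample rows below are made up for illustration.

```python
def deduplicate(batch, key, order_by):
    """Keep, for each key, the row with the largest order_by value."""
    latest = {}
    for row in batch:
        current = latest.get(row[key])
        if current is None or row[order_by] > current[order_by]:
            latest[row[key]] = row
    return list(latest.values())


def upsert(table, batch, key):
    """Overwrite rows whose key already exists and insert the rest."""
    for row in batch:
        table[row[key]] = dict(row)


# Hypothetical sample data: the target "table" holds one corrupted row
table = {1: {"id": 1, "date": "2024-01-01", "Results": "No result"}}

# Hypothetical micro-batch with two versions of id 1 and a new id 2
micro_batch = [
    {"id": 1, "date": "2024-01-01", "Results": "Pass"},
    {"id": 1, "date": "2024-02-01", "Results": "Fail"},
    {"id": 2, "date": "2024-01-15", "Results": "Pass"},
]

upsert(table, deduplicate(micro_batch, key="id", order_by="date"), key="id")
print(table)
```

After the call, the row with `id` 1 carries the value from its most recent version and the row with `id` 2 has been inserted.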

In [0]:
from delta.tables import DeltaTable
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

# The data contains repeated records, so we need to deduplicate it
def deduplicate(batch_df):
    window = Window.partitionBy("Inspection_ID").orderBy(col("Inspection_Date").desc())
    return batch_df.withColumn("rn", row_number().over(window)) \
                   .filter("rn = 1") \
                   .drop("rn")

# This function upserts the data coming from the stream into our Delta table
def upsert_to_delta(microBatchOutputDF, _):
  # Assumes the target Delta table already exists
  micro_batch_deduplicated = deduplicate(microBatchOutputDF)
  inspections_table = DeltaTable.forName(spark, "food_inspections")
  inspections_table.alias("t").merge(
      micro_batch_deduplicated.alias("s"), 
      "s.Inspection_ID = t.Inspection_ID"
    ) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()

# Pass the function to foreachBatch and simulate real time by lowering the processing trigger interval
inspections_to_fix_stream.writeStream \
  .foreachBatch(upsert_to_delta) \
  .outputMode("update") \
  .trigger(processingTime="5 seconds") \
  .start()

<pyspark.sql.streaming.query.StreamingQuery at 0x7fd018d06590>

There are several ways to verify this exercise. I used the query below, watching the number of rows whose value differs from `No result` grow until it reaches 10000 rows.

To reset it, go to exercise 1 and re-run the drop and create table cells (cells 14, 15 and 16), although I think re-running the Delta table update in cell 56 is also enough. Then reset the checkpoint here.

In [0]:
%sql
SELECT Inspection_ID, DBA_Name, Results FROM food_inspections WHERE Results <> 'No result'

Inspection_ID,DBA_Name,Results
2290819,UVA KITCHEN AND WINE BAR,Pass w/ Conditions
2290829,TACOS Y TAMALES EL POLLO.,Pass
2290848,DELTA SKY CLUB,Pass w/ Conditions
2290852,"G.N.P.H. #EIGHT, INC.",Pass
2290861,CAFE FURAIBO JAPANESE RESTAURANT & SUSHI,Out of Business
2290864,TONYS PANTRY MART INC.,Out of Business
2290874,TACOS Y TAMALES EL POLLO INC.,Out of Business
2290883,HONEY 1 BBQ,Pass
2290885,MCDONALD'S RESTAURANT,Pass
2290886,MADISON MINI MART,Out of Business


# Exercise 10b
Design a real-time analytics solution using Apache Spark on Databricks to consume flight data streamed through Kafka, store it in a Delta table, and visualize the current position of the flights on a map.
- The flight data is in a topic called `flights`
- Store all flights in a Delta table, but with only one entry per flight code, so that when position updates for a flight arrive, the corresponding record is updated. This must happen in real time.

---

In [0]:
# Utility to reset the checkpoint
checkpoint_path = "/tmp/project_spark/_checkpoint"

dbutils.fs.rm(checkpoint_path, True)

spark.conf.set("spark.sql.streaming.checkpointLocation", checkpoint_path)
spark.conf.get("spark.sql.streaming.checkpointLocation")

'/tmp/project_spark/_checkpoint'

In [0]:
KAFKA_BOOSTRAP_SERVER="35.227.18.205:9094"

In [0]:
from pyspark.sql.functions import from_json, col, to_date
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, DoubleType, BooleanType

flights_kafka_df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", KAFKA_BOOSTRAP_SERVER) \
  .option("subscribe", "flights") \
  .option("startingOffsets", "earliest") \
  .option("maxOffsetsPerTrigger", 500) \
  .load()
  
schema_flights = StructType([
    StructField("icao24", StringType(), True),
    StructField("callsign", StringType(), True),
    StructField("origin_country", StringType(), True),
    StructField("time_position", LongType(), True),
    StructField("last_contact", LongType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("on_ground", BooleanType(), True),
    StructField("velocity", DoubleType(), True),
    StructField("heading", DoubleType(), True),
    StructField("squawk", StringType(), True),
    StructField("spi", BooleanType(), True),
    StructField("position_source", IntegerType(), True)
])

flights_stream = flights_kafka_df \
  .selectExpr("CAST(value AS STRING)") \
  .withColumn("value", from_json(col("value"), schema_flights)) \
  .select(col('value.*'))

In [0]:
%sql
DROP TABLE IF EXISTS flights;

In [0]:
FLIGHTS_TABLE_PATH = "/user/hive/warehouse/flights"
dbutils.fs.rm(FLIGHTS_TABLE_PATH, recurse=True)

True

For this part I found this section of the documentation very useful: [https://docs.databricks.com/gcp/en/structured-streaming/delta-lake#upsert-from-streaming-queries-using-foreachbatch](https://docs.databricks.com/gcp/en/structured-streaming/delta-lake#upsert-from-streaming-queries-using-foreachbatch). It describes how to update or insert rows in a table from a streaming query using the `foreachBatch` method.

In [0]:
from delta.tables import DeltaTable
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

# The data contains repeated records, so we need to deduplicate it
def deduplicate(batch_df):
    window = Window.partitionBy("icao24").orderBy(col("last_contact").desc())
    return batch_df.withColumn("rn", row_number().over(window)) \
                   .filter("rn = 1") \
                   .drop("rn")

# This function upserts the data coming from the stream into our Delta table
def upsert_to_delta(microBatchOutputDF, _):
  # Deduplicate first so that the initial write also keeps a single row per aircraft
  micro_batch_deduplicated = deduplicate(microBatchOutputDF)

  if not DeltaTable.isDeltaTable(spark, FLIGHTS_TABLE_PATH):
    # First batch: the Delta table does not exist yet, so create it from the deduplicated data
    micro_batch_deduplicated \
      .write \
      .format("delta") \
      .save(FLIGHTS_TABLE_PATH)
    spark.sql("CREATE TABLE flights USING DELTA LOCATION '" + FLIGHTS_TABLE_PATH + "'")
    return

  flights_table = DeltaTable.forPath(spark, FLIGHTS_TABLE_PATH)
  flights_table.alias("t").merge(
      micro_batch_deduplicated.alias("s"), 
      "s.icao24 = t.icao24"
    ) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()

# Pass the function to foreachBatch and simulate real time by lowering the processing trigger interval
flights_stream.writeStream \
  .foreachBatch(upsert_to_delta) \
  .outputMode("update") \
  .trigger(processingTime="20 seconds") \
  .start()

<pyspark.sql.streaming.query.StreamingQuery at 0x7f71de326810>

The result is a bit confusing: it looks correct, but with so much data the map is crowded and hard to read.

I tried limiting it to a few flight ids, but I could not get anything legible.

In [0]:
%sql
SELECT latitude, longitude from flights

latitude,longitude
36.1642,-4.9265
41.2912,2.0882
41.2912,2.0882
41.8286,-2.3029
36.3159,-7.8402
37.3288,-4.1372
39.386,-4.0925
39.386,-4.0925
36.8676,-2.5789
38.6698,0.873


Databricks visualization. Run in Databricks to view.

For more information about the input data, see [OpenSky Network](https://openskynetwork.github.io/opensky-api/rest.html#all-state-vectors). Below is a screenshot of the visualization we are aiming for. Keep in mind that this map visualization is available in Databricks, so you will not need to import any external library.

![Flight Map](https://raw.githubusercontent.com/masfworld/datahack_docker/ab487794745499248388b67cf574085c5d86746e/zeppelin/data/image.png)