In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

# initialise sparkContext
spark = SparkSession.builder \
    .master('local') \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\
    .appName('Apache Spark Bogota - Demo') \
    .getOrCreate()

In [2]:
spark.version

'3.0.0'

In [3]:
df = spark.read.parquet('covid_parquet')

In [4]:
display(df.show(truncate=False))

+-------+-----------------+----------------+---------------------+----------+----+----+-----------+-------------------------+
|id_caso|fecha_diagnostico|ciudad_ubicacion|departamento_distrito|atencion  |edad|sexo|tipo       |pais_procedencia         |
+-------+-----------------+----------------+---------------------+----------+----+----+-----------+-------------------------+
|1      |06/03/2020       |Bogotá          |Bogotá D.C.          |Recuperado|19  |F   |Importado  |Italia                   |
|2      |09/03/2020       |Buga            |Valle del Cauca      |Recuperado|34  |M   |Importado  |España                   |
|3      |09/03/2020       |Medellín        |Antioquia            |Recuperado|50  |F   |Importado  |España                   |
|4      |11/03/2020       |Medellín        |Antioquia            |Recuperado|55  |M   |Relacionado|Colombia                 |
|5      |11/03/2020       |Medellín        |Antioquia            |Recuperado|25  |M   |Relacionado|Colombia           

None

In [5]:
print(df.printSchema())

root
 |-- id_caso: string (nullable = true)
 |-- fecha_diagnostico: string (nullable = true)
 |-- ciudad_ubicacion: string (nullable = true)
 |-- departamento_distrito: string (nullable = true)
 |-- atencion: string (nullable = true)
 |-- edad: string (nullable = true)
 |-- sexo: string (nullable = true)
 |-- tipo: string (nullable = true)
 |-- pais_procedencia: string (nullable = true)

None


In [6]:
df.createOrReplaceTempView("covid_parquet")

In [7]:
display(spark.sql("select count(1) from covid_parquet").show())

+--------+
|count(1)|
+--------+
|    1161|
+--------+



None

In [9]:
import random
import shutil
from pyspark.sql.functions import *
from pyspark.sql.types import *

def random_checkpoint_dir():
    return "covid_parquet/chkpt/%s" % str(random.randint(0, 10000))

@udf(returnType=StringType())
def random_ciudad_ubicacion():
    return str(random.choice(["Bogotá", "Medellín", "Itagüí", "Cartagena", "Palmira", "Neiva", "Villavicencio"]))

@udf(returnType=StringType())
def random_departamento_distrito():
    return str(random.choice(["Bogotá D.C.", "Antioquia", "Valle del Cauca", "Meta", "Huila", "Cartagena D.T. y C"]))

@udf(returnType=StringType())
def random_atencion():
    return str(random.choice(["Recuperado", "Casa", "Hospital", "Hospital UCI"]))

@udf(returnType=StringType())
def random_edad():
    return str(random.randint(1, 100))

@udf(returnType=StringType())
def random_sexo():
    return str(random.choice(["F", "M"]))

@udf(returnType=StringType())
def random_tipo():
    return str(random.choice(["Importado", "Relacionado", "En estudio"]))

@udf(returnType=StringType())
def random_pais_procedencia():
    return str(random.choice(["Colombia", "España", "Estados Unidos", "Italia"]))

# Creamos una flujo de datos
def generate_include_data_stream_parquet(tbl_format, tbl_path):
  
  stream_data = spark.readStream.format("rate").option("rowsPerSecond", 5).load() \
    .withColumn("id_caso", (1161 + col("value")).cast(StringType())) \
    .withColumn("fecha_diagnostico", current_date().cast(StringType())) \
    .withColumn("ciudad_ubicacion", random_ciudad_ubicacion()) \
    .withColumn("departamento_distrito", random_departamento_distrito()) \
    .withColumn("atencion", random_atencion()) \
    .withColumn("edad", random_edad()) \
    .withColumn("sexo", random_sexo()) \
    .withColumn("tipo", random_tipo()) \
    .withColumn("pais_procedencia", random_pais_procedencia())

  query = stream_data.writeStream \
    .format(tbl_format) \
    .option("checkpointLocation", random_checkpoint_dir()) \
    .trigger(processingTime = "10 seconds") \
    .start(tbl_path)

  return query

# Function to stop all streaming queries 
def stop_all_streams():
    # Stop all the streams
    print("Stopping all streams")
    for s in spark.streams.active:
        s.stop()
    print("Stopped all streams")
    print("Deleting checkpoints")
    shutil.rmtree("covid_parquet/chkpt")
    print("Deleted checkpoints")

In [10]:
stream = generate_include_data_stream_parquet(
    tbl_format = "parquet", 
    tbl_path = "covid_parquet")

In [21]:
df2 = spark.read.format("parquet").load("covid_parquet")

In [22]:
print(df2.count())

440


In [18]:
display(df2.show(truncate=False))

+-----------------------+-----+-------+-----------------+----------------+---------------------+------------+----+----+-----------+----------------+
|timestamp              |value|id_caso|fecha_diagnostico|ciudad_ubicacion|departamento_distrito|atencion    |edad|sexo|tipo       |pais_procedencia|
+-----------------------+-----+-------+-----------------+----------------+---------------------+------------+----+----+-----------+----------------+
|2020-07-29 18:56:19.818|90   |1251   |2020-07-29       |Villavicencio   |Valle del Cauca      |Recuperado  |93  |F   |Importado  |España          |
|2020-07-29 18:56:20.018|91   |1252   |2020-07-29       |Bogotá          |Valle del Cauca      |Casa        |99  |M   |En estudio |España          |
|2020-07-29 18:56:20.218|92   |1253   |2020-07-29       |Villavicencio   |Valle del Cauca      |Hospital    |31  |M   |En estudio |España          |
|2020-07-29 18:56:20.418|93   |1254   |2020-07-29       |Itagüí          |Antioquia            |Recuperado

None

In [19]:
display(df2.printSchema())

root
 |-- timestamp: timestamp (nullable = true)
 |-- value: long (nullable = true)
 |-- id_caso: string (nullable = true)
 |-- fecha_diagnostico: string (nullable = false)
 |-- ciudad_ubicacion: string (nullable = true)
 |-- departamento_distrito: string (nullable = true)
 |-- atencion: string (nullable = true)
 |-- edad: string (nullable = true)
 |-- sexo: string (nullable = true)
 |-- tipo: string (nullable = true)
 |-- pais_procedencia: string (nullable = true)



None

In [23]:
stop_all_streams()

Stopping all streams
Stopped all streams
Deleting checkpoints
Deleted checkpoints


In [24]:
df3 = spark.read.option("mergeSchema", "true").format("parquet").load("covid_parquet/*")

In [25]:
print(df3.count())

1651


In [26]:
print(df3.printSchema())

root
 |-- timestamp: timestamp (nullable = true)
 |-- value: long (nullable = true)
 |-- id_caso: string (nullable = true)
 |-- fecha_diagnostico: string (nullable = true)
 |-- ciudad_ubicacion: string (nullable = true)
 |-- departamento_distrito: string (nullable = true)
 |-- atencion: string (nullable = true)
 |-- edad: string (nullable = true)
 |-- sexo: string (nullable = true)
 |-- tipo: string (nullable = true)
 |-- pais_procedencia: string (nullable = true)

None


In [27]:
print(df3.show(truncate=False))

+---------+-----+-------+-----------------+----------------+---------------------+----------+----+----+-----------+-------------------------+
|timestamp|value|id_caso|fecha_diagnostico|ciudad_ubicacion|departamento_distrito|atencion  |edad|sexo|tipo       |pais_procedencia         |
+---------+-----+-------+-----------------+----------------+---------------------+----------+----+----+-----------+-------------------------+
|null     |null |1      |06/03/2020       |Bogotá          |Bogotá D.C.          |Recuperado|19  |F   |Importado  |Italia                   |
|null     |null |2      |09/03/2020       |Buga            |Valle del Cauca      |Recuperado|34  |M   |Importado  |España                   |
|null     |null |3      |09/03/2020       |Medellín        |Antioquia            |Recuperado|50  |F   |Importado  |España                   |
|null     |null |4      |11/03/2020       |Medellín        |Antioquia            |Recuperado|55  |M   |Relacionado|Colombia                 |
|null 

In [28]:
df_delta = spark.read.format("delta").load("covid_delta")
df_delta.createOrReplaceTempView("covid_delta")

In [37]:
display(spark.sql("select count(1) from covid_delta").show())

+--------+
|count(1)|
+--------+
|    6521|
+--------+



None

In [30]:
print(df_delta.printSchema())

root
 |-- id_caso: string (nullable = true)
 |-- fecha_diagnostico: string (nullable = true)
 |-- ciudad_ubicacion: string (nullable = true)
 |-- departamento_distrito: string (nullable = true)
 |-- atencion: string (nullable = true)
 |-- edad: string (nullable = true)
 |-- sexo: string (nullable = true)
 |-- tipo: string (nullable = true)
 |-- pais_procedencia: string (nullable = true)

None


In [31]:
display(spark.sql("select * from covid_delta").show(truncate=False))

+-------+-----------------+----------------+---------------------+----------+----+----+-----------+-------------------------+
|id_caso|fecha_diagnostico|ciudad_ubicacion|departamento_distrito|atencion  |edad|sexo|tipo       |pais_procedencia         |
+-------+-----------------+----------------+---------------------+----------+----+----+-----------+-------------------------+
|1      |06/03/2020       |Bogotá          |Bogotá D.C.          |Recuperado|19  |F   |Importado  |Italia                   |
|2      |09/03/2020       |Buga            |Valle del Cauca      |Recuperado|34  |M   |Importado  |España                   |
|3      |09/03/2020       |Medellín        |Antioquia            |Recuperado|50  |F   |Importado  |España                   |
|4      |11/03/2020       |Medellín        |Antioquia            |Recuperado|55  |M   |Relacionado|Colombia                 |
|5      |11/03/2020       |Medellín        |Antioquia            |Recuperado|25  |M   |Relacionado|Colombia           

None

In [32]:
spark.readStream.format("delta").load("covid_delta").createOrReplaceTempView("covid_delta_stream")

spark.sql("select count(*) from covid_delta_stream") \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

<pyspark.sql.streaming.StreamingQuery at 0x117964760>

In [33]:
stream2 = generate_include_data_stream_parquet(
    tbl_format = "delta", 
    tbl_path = "covid_delta")

In [34]:
# Creamos una flujo de datos corregido, elegimos columnas de acuerdo al esquema
def generate_include_data_stream_parquet_fix(tbl_format, tbl_path):
  
  stream_data = spark.readStream.format("rate").option("rowsPerSecond", 80).load() \
    .withColumn("id_caso", (1161 + col("value")).cast(StringType())) \
    .withColumn("fecha_diagnostico", current_date().cast(StringType())) \
    .withColumn("ciudad_ubicacion", random_ciudad_ubicacion()) \
    .withColumn("departamento_distrito", random_departamento_distrito()) \
    .withColumn("atencion", random_atencion()) \
    .withColumn("edad", random_edad()) \
    .withColumn("sexo", random_sexo()) \
    .withColumn("tipo", random_tipo()) \
    .withColumn("pais_procedencia", random_pais_procedencia()) \
    .select("id_caso", "fecha_diagnostico", "ciudad_ubicacion", "departamento_distrito", "atencion", "edad", "sexo", "tipo", "pais_procedencia")

  query = stream_data.writeStream \
    .format(tbl_format) \
    .option("checkpointLocation", random_checkpoint_dir()) \
    .trigger(processingTime = "10 seconds") \
    .start(tbl_path)

  return query

In [35]:
stream2 = generate_include_data_stream_parquet_fix(
    tbl_format = "delta", 
    tbl_path = "covid_delta")

In [39]:
stop_all_streams()

Stopping all streams
Stopped all streams
Deleting checkpoints
Deleted checkpoints


In [38]:
stream3 = generate_include_data_stream_parquet_fix(
    tbl_format = "delta", 
    tbl_path = "covid_delta")

# Evolución de esquema

In [40]:
diagnosticos = [
    ("cov1", "29/07/2020", "Bogotá", "Bogotá D.C.", "Hospital UCI", "78", "M", "Relacionado", "Colombia", "Molecular", True),
    ("cov2", "29/07/2020", "Bogotá", "Bogotá D.C.", "Hospital", "38", "F", "Relacionado", "Colombia", "Antígeno", False),
    ("cov3", "29/07/2020", "Medellín", "Antioquia", "Casa", "24", "F", "Relacionado", "Colombia", "Antígeno", True),
    ("cov4", "29/07/2020", "Itagüí", "Antioquia", "Recuperado", "42", "M", "Relacionado", "Colombia", "Anticuerpos", True)
]

columnas = ["id_caso", "fecha_diagnostico", "ciudad_ubicacion", "departamento_distrito", "atencion", "edad", "sexo", "tipo", "pais_procedencia", "tipo_prueba", "confirmado_pcr"]

covid_data_update = spark.createDataFrame(diagnosticos, columnas)
  
covid_data_update.write.format("delta").mode("append").option("mergeSchema", "true").save("covid_delta")

In [41]:
df_delta_updated = spark.read.format("delta").load("covid_delta")

In [42]:
df_delta_updated.printSchema()

root
 |-- id_caso: string (nullable = true)
 |-- fecha_diagnostico: string (nullable = true)
 |-- ciudad_ubicacion: string (nullable = true)
 |-- departamento_distrito: string (nullable = true)
 |-- atencion: string (nullable = true)
 |-- edad: string (nullable = true)
 |-- sexo: string (nullable = true)
 |-- tipo: string (nullable = true)
 |-- pais_procedencia: string (nullable = true)
 |-- tipo_prueba: string (nullable = true)
 |-- confirmado_pcr: boolean (nullable = true)



In [43]:
df_delta_updated.show(truncate=True)

+-------+-----------------+----------------+---------------------+----------+----+----+-----------+--------------------+-----------+--------------+
|id_caso|fecha_diagnostico|ciudad_ubicacion|departamento_distrito|  atencion|edad|sexo|       tipo|    pais_procedencia|tipo_prueba|confirmado_pcr|
+-------+-----------------+----------------+---------------------+----------+----+----+-----------+--------------------+-----------+--------------+
|      1|       06/03/2020|          Bogotá|          Bogotá D.C.|Recuperado|  19|   F|  Importado|              Italia|       null|          null|
|      2|       09/03/2020|            Buga|      Valle del Cauca|Recuperado|  34|   M|  Importado|              España|       null|          null|
|      3|       09/03/2020|        Medellín|            Antioquia|Recuperado|  50|   F|  Importado|              España|       null|          null|
|      4|       11/03/2020|        Medellín|            Antioquia|Recuperado|  55|   M|Relacionado|            C

In [51]:
df_delta_updated.filter(df_delta_updated["id_caso"] == "cov4").show()

+-------+-----------------+----------------+---------------------+--------+----+----+----+----------------+-----------+--------------+
|id_caso|fecha_diagnostico|ciudad_ubicacion|departamento_distrito|atencion|edad|sexo|tipo|pais_procedencia|tipo_prueba|confirmado_pcr|
+-------+-----------------+----------------+---------------------+--------+----+----+----+----------------+-----------+--------------+
+-------+-----------------+----------------+---------------------+--------+----+----+----+----------------+-----------+--------------+



## Eliminar pruebas de anticuerpos

In [47]:
spark.read.format("delta").load("covid_delta").createOrReplaceTempView("covid_delta")

In [48]:
from delta.tables import *

deltaTable = DeltaTable.forPath(spark, "covid_delta")
deltaTable.delete("tipo_prueba = 'Anticuerpos'")

In [49]:
display(spark.sql("select * from covid_delta where tipo_prueba='Anticuerpos'").show())

+-------+-----------------+----------------+---------------------+--------+----+----+----+----------------+-----------+--------------+
|id_caso|fecha_diagnostico|ciudad_ubicacion|departamento_distrito|atencion|edad|sexo|tipo|pais_procedencia|tipo_prueba|confirmado_pcr|
+-------+-----------------+----------------+---------------------+--------+----+----+----+----------------+-----------+--------------+
+-------+-----------------+----------------+---------------------+--------+----+----+----+----------------+-----------+--------------+



None

In [50]:
deltaTable.history().show(truncate=False)
deltaTable.history().toPandas().head(50)

+-------+-------------------+------+--------+----------------+--------------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+----------------------------------------------------------------------------------------+------------+
|version|timestamp          |userId|userName|operation       |operationParameters                                                                   |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                                                        |userMetadata|
+-------+-------------------+------+--------+----------------+--------------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+----------------------------------------------------------------------------------------+------------+
|17     |2020-07-29 19:10:47|null  |null    |DELE

Unnamed: 0,version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata
0,17,2020-07-29 19:10:47,,,DELETE,"{'predicate': '[""(`tipo_prueba` = 'Anticuerpos...",,,,16.0,,False,"{'numDeletedRows': '1', 'numRemovedFiles': '1'...",
1,16,2020-07-29 19:08:50,,,WRITE,"{'mode': 'Append', 'partitionBy': '[]'}",,,,15.0,,True,"{'numOutputRows': '4', 'numOutputBytes': '3092...",
2,15,2020-07-29 19:06:52,,,STREAMING UPDATE,"{'epochId': '11', 'outputMode': 'Append', 'que...",,,,13.0,,True,"{'numOutputRows': '800', 'numRemovedFiles': '0...",
3,14,2020-07-29 19:06:50,,,STREAMING UPDATE,"{'epochId': '2', 'outputMode': 'Append', 'quer...",,,,13.0,,True,"{'numOutputRows': '800', 'numRemovedFiles': '0...",
4,13,2020-07-29 19:06:42,,,STREAMING UPDATE,"{'epochId': '10', 'outputMode': 'Append', 'que...",,,,11.0,,True,"{'numOutputRows': '800', 'numRemovedFiles': '0...",
5,12,2020-07-29 19:06:40,,,STREAMING UPDATE,"{'epochId': '1', 'outputMode': 'Append', 'quer...",,,,11.0,,True,"{'numOutputRows': '320', 'numRemovedFiles': '0...",
6,11,2020-07-29 19:06:36,,,STREAMING UPDATE,"{'epochId': '0', 'outputMode': 'Append', 'quer...",,,,10.0,,True,"{'numOutputRows': '0', 'numRemovedFiles': '0',...",
7,10,2020-07-29 19:06:30,,,STREAMING UPDATE,"{'epochId': '9', 'outputMode': 'Append', 'quer...",,,,9.0,,True,"{'numOutputRows': '800', 'numRemovedFiles': '0...",
8,9,2020-07-29 19:06:20,,,STREAMING UPDATE,"{'epochId': '8', 'outputMode': 'Append', 'quer...",,,,8.0,,True,"{'numOutputRows': '800', 'numRemovedFiles': '0...",
9,8,2020-07-29 19:06:10,,,STREAMING UPDATE,"{'epochId': '7', 'outputMode': 'Append', 'quer...",,,,7.0,,True,"{'numOutputRows': '800', 'numRemovedFiles': '0...",


In [67]:
spark.read.format("delta") \
  .option("versionAsOf", 17) \
  .load("covid_delta") \
  .createOrReplaceTempView("covid_delta_previous_version")

In [68]:
display(spark.sql("SELECT * FROM covid_delta_previous_version where tipo_prueba='Anticuerpos'").show())

+-------+-----------------+----------------+---------------------+--------+----+----+----+----------------+-----------+--------------+
|id_caso|fecha_diagnostico|ciudad_ubicacion|departamento_distrito|atencion|edad|sexo|tipo|pais_procedencia|tipo_prueba|confirmado_pcr|
+-------+-----------------+----------------+---------------------+--------+----+----+----+----------------+-----------+--------------+
+-------+-----------------+----------------+---------------------+--------+----+----+----+----------------+-----------+--------------+



None

In [71]:
spark.read.format("delta") \
  .option("versionAsOf", 0) \
  .load("covid_delta") \
  .createOrReplaceTempView("covid_delta_previous_version")

In [72]:
display(spark.sql("SELECT count(1) FROM covid_delta_previous_version").show())

+--------+
|count(1)|
+--------+
|    1161|
+--------+



None

In [73]:
display(spark.sql("SELECT * FROM covid_delta_previous_version").show(truncate=False))

+-------+-----------------+----------------+---------------------+----------+----+----+-----------+-------------------------+
|id_caso|fecha_diagnostico|ciudad_ubicacion|departamento_distrito|atencion  |edad|sexo|tipo       |pais_procedencia         |
+-------+-----------------+----------------+---------------------+----------+----+----+-----------+-------------------------+
|1      |06/03/2020       |Bogotá          |Bogotá D.C.          |Recuperado|19  |F   |Importado  |Italia                   |
|2      |09/03/2020       |Buga            |Valle del Cauca      |Recuperado|34  |M   |Importado  |España                   |
|3      |09/03/2020       |Medellín        |Antioquia            |Recuperado|50  |F   |Importado  |España                   |
|4      |11/03/2020       |Medellín        |Antioquia            |Recuperado|55  |M   |Relacionado|Colombia                 |
|5      |11/03/2020       |Medellín        |Antioquia            |Recuperado|25  |M   |Relacionado|Colombia           

None

In [None]:
#spark.sql("SET spark.databricks.delta.retentionDurationCheck.enabled = false")
#deltaTable.vacuum(0)
#deltaTable.history().toPandas().head()

## Actualizar e incorporar nuevos datos mediante MERGE

In [74]:
display(spark.sql("select * from covid_delta where id_caso IN ('cov1','cov2','cov3','cov4','cov5')").show())

+-------+-----------------+----------------+---------------------+------------+----+----+-----------+----------------+-----------+--------------+
|id_caso|fecha_diagnostico|ciudad_ubicacion|departamento_distrito|    atencion|edad|sexo|       tipo|pais_procedencia|tipo_prueba|confirmado_pcr|
+-------+-----------------+----------------+---------------------+------------+----+----+-----------+----------------+-----------+--------------+
|   cov1|       29/07/2020|          Bogotá|          Bogotá D.C.|Hospital UCI|  78|   M|Relacionado|        Colombia|  Molecular|          true|
|   cov2|       29/07/2020|          Bogotá|          Bogotá D.C.|    Hospital|  38|   F|Relacionado|        Colombia|   Antígeno|         false|
|   cov3|       29/07/2020|        Medellín|            Antioquia|        Casa|  24|   F|Relacionado|        Colombia|   Antígeno|          true|
+-------+-----------------+----------------+---------------------+------------+----+----+-----------+----------------+------

None

In [75]:
diagnosticos = [
    ("cov2", "29/07/2020", "Bogotá", "Bogotá D.C.", "Hospital", "38", "F", "Relacionado", "Colombia", "Molecular", True),
    ("cov5", "29/07/2020", "Villavicencio", "Antioquia", "Casa", "22", "M", "Relacionado", "Colombia", "Molecular", True),
    ("cov3", "08/08/2020", "Medellín", "Antioquia", "Hospital", "24", "F", "Relacionado", "Colombia", "Antígeno", True),
]

columnas = ["id_caso", "fecha_diagnostico", "ciudad_ubicacion", "departamento_distrito", "atencion", "edad", "sexo", "tipo", "pais_procedencia", "tipo_prueba", "confirmado_pcr"]

covid_data_update = spark.createDataFrame(diagnosticos, columnas)
  
display(covid_data_update.show())

+-------+-----------------+----------------+---------------------+--------+----+----+-----------+----------------+-----------+--------------+
|id_caso|fecha_diagnostico|ciudad_ubicacion|departamento_distrito|atencion|edad|sexo|       tipo|pais_procedencia|tipo_prueba|confirmado_pcr|
+-------+-----------------+----------------+---------------------+--------+----+----+-----------+----------------+-----------+--------------+
|   cov2|       29/07/2020|          Bogotá|          Bogotá D.C.|Hospital|  38|   F|Relacionado|        Colombia|  Molecular|          true|
|   cov5|       29/07/2020|   Villavicencio|            Antioquia|    Casa|  22|   M|Relacionado|        Colombia|  Molecular|          true|
|   cov3|       08/08/2020|        Medellín|            Antioquia|Hospital|  24|   F|Relacionado|        Colombia|   Antígeno|          true|
+-------+-----------------+----------------+---------------------+--------+----+----+-----------+----------------+-----------+--------------+



None

In [76]:
from delta.tables import *

delta_table = DeltaTable.forPath(spark, "covid_delta")

delta_table.alias("destination").merge(
    covid_data_update.alias("source"), 
    "source.id_caso = destination.id_caso") \
  .whenMatchedUpdateAll() \
  .whenNotMatchedInsertAll() \
  .execute()

In [77]:
display(spark.sql("select * from covid_delta where id_caso IN ('cov1','cov2','cov3','cov4','cov5')").show())

+-------+-----------------+----------------+---------------------+------------+----+----+-----------+----------------+-----------+--------------+
|id_caso|fecha_diagnostico|ciudad_ubicacion|departamento_distrito|    atencion|edad|sexo|       tipo|pais_procedencia|tipo_prueba|confirmado_pcr|
+-------+-----------------+----------------+---------------------+------------+----+----+-----------+----------------+-----------+--------------+
|   cov1|       29/07/2020|          Bogotá|          Bogotá D.C.|Hospital UCI|  78|   M|Relacionado|        Colombia|  Molecular|          true|
|   cov2|       29/07/2020|          Bogotá|          Bogotá D.C.|    Hospital|  38|   F|Relacionado|        Colombia|  Molecular|          true|
|   cov3|       08/08/2020|        Medellín|            Antioquia|    Hospital|  24|   F|Relacionado|        Colombia|   Antígeno|          true|
|   cov5|       29/07/2020|   Villavicencio|            Antioquia|        Casa|  22|   M|Relacionado|        Colombia|  Mole

None