In [0]:
import pyspark
from pyspark.sql import SparkSession
from delta.tables import *

# Crear sesión de Spark
spark = SparkSession.builder \
    .appName("DeltaLakeExample") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()


In [0]:
# Configuración de rutas
sourcePath = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
deltaPath = "/tmp/loans_delta"

In [0]:
# Crear tabla Delta Lake con datos de préstamos
df = spark.read.format("parquet").load(sourcePath)
df.write.format("delta").save(deltaPath)

In [0]:
# Crear vista temporal sobre los datos
spark.read.format("delta").load(deltaPath).createOrReplaceTempView("loans_delta")

In [0]:
# Consultar los datos
spark.sql("SELECT count(*) FROM loans_delta").show()

+--------+
|count(1)|
+--------+
|   14705|
+--------+



In [0]:
spark.sql("SELECT * FROM loans_delta LIMIT 5").show()

+-------+-----------+---------+----------+
|loan_id|funded_amnt|paid_amnt|addr_state|
+-------+-----------+---------+----------+
|      0|       1000|   182.22|        CA|
|      1|       1000|   361.19|        WA|
|      2|       1000|   176.26|        TX|
|      3|       1000|   1000.0|        OK|
|      4|       1000|   249.98|        PA|
+-------+-----------+---------+----------+



In [0]:
df.printSchema()

root
 |-- loan_id: long (nullable = true)
 |-- funded_amnt: integer (nullable = true)
 |-- paid_amnt: double (nullable = true)
 |-- addr_state: string (nullable = true)



In [0]:
import random, os

# Crear un nuevo DataFrame de streaming con datos aleatorios
streamingSourceDir = "/tmp/streaming_source"

# Crear directorio de fuente de streaming si no existe
if not os.path.exists(streamingSourceDir):
    os.makedirs(streamingSourceDir)

random_data = [(random.randint(1000000, 9999999), random.randint(1000, 1000), random.uniform(0, 1000), random.choice(['CA', 'WA', 'TX', 'OK', 'PA','NY','FL','NM'])) for _ in range(5)]
schema = """
    loan_id LONG,
    funded_amnt INT,
    paid_amnt DOUBLE,
    addr_state STRING
"""
newLoanStreamDF = spark.createDataFrame(random_data, schema=schema)
newLoanStreamDF.show()

+-------+-----------+------------------+----------+
|loan_id|funded_amnt|         paid_amnt|addr_state|
+-------+-----------+------------------+----------+
|8275334|       1000| 536.4856148926217|        TX|
|9971909|       1000|108.93329938258101|        NM|
|2367400|       1000| 761.1484110892422|        OK|
|7149065|       1000| 695.8854301025499|        TX|
|6233620|       1000| 570.1251780892048|        CA|
+-------+-----------+------------------+----------+



In [0]:
# Guardar datos aleatorios en la carpeta de streaming
newLoanStreamDF.write.mode("append").json(streamingSourceDir)

In [0]:
# Crear DataFrame de streaming leyendo de la carpeta
streamingDF = spark.readStream.schema(schema).json(streamingSourceDir)

In [0]:
# Cargar datos de streaming en una tabla Delta Lake
# Suponiendo que tenemos un DataFrame de streaming `newLoanStreamDF` con los mismos datos
checkpointDir = "/tmp/checkpoints"
streamingQuery = (streamingDF.writeStream
                  .format("delta")
                  .option("checkpointLocation", checkpointDir)
                  .trigger(once=True)
                  .start(deltaPath))

In [0]:
# Esperar a que termine la consulta de streaming
streamingQuery.awaitTermination()
streamingQuery.stop()

In [0]:
streamingQuery.isActive

False

In [0]:
# Consultar los datos
spark.sql("SELECT count(*) FROM loans_delta").show()

+--------+
|count(1)|
+--------+
|   14710|
+--------+



option("mergeSchema", "true"): Esta opción permite la evolución del esquema. Cuando se establece en "true", permite que Delta Lake realice cambios en el esquema de la tabla existente para acomodar nuevas columnas que puedan no estar presentes en el esquema actual. Es decir, si el DataFrame loanUpdates contiene columnas adicionales que no están en el esquema actual del Delta Lake, el esquema se actualizará automáticamente para incluir estas nuevas columnas.

In [0]:
from pyspark.sql.functions import col
# Forzar esquema en escritura para evitar corrupción de datos
cols = ['loan_id', 'funded_amnt', 'paid_amnt', 'addr_state', 'closed']
items = [(1111111, 1000, 1000.0, 'TX', True),
         (2222222, 2000, 0.0, 'CA', False)]
loanUpdates = spark.createDataFrame(items, cols)
# Asegurar que los tipos de datos coincidan
loanUpdates = loanUpdates.withColumn("loan_id", col("loan_id").cast("long")) \
                         .withColumn("funded_amnt", col("funded_amnt").cast("int")) \
                         .withColumn("paid_amnt", col("paid_amnt").cast("double")) \
                         .withColumn("addr_state", col("addr_state").cast("string"))

loanUpdates.write.format("delta").mode("append").option("mergeSchema", "true").save(deltaPath)

In [0]:
deltaTable = DeltaTable.forPath(spark, deltaPath)
# Upsert de datos: Fusionar cambios de datos nuevos
deltaTable.alias("t").merge(loanUpdates.alias("s"), "t.loan_id = s.loan_id").whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

# Ver esquema de la tabla actualizada
print("Schema de la tabla actualizada:")
deltaTable.toDF().printSchema()

Schema de la tabla actualizada:
root
 |-- loan_id: long (nullable = true)
 |-- funded_amnt: integer (nullable = true)
 |-- paid_amnt: double (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- closed: boolean (nullable = true)



In [0]:
# Mostrar los datos de la tabla actualizada
print("Datos de la tabla actualizada:")
deltaTable.toDF().show(5)

Datos de la tabla actualizada:
+-------+-----------+---------+----------+------+
|loan_id|funded_amnt|paid_amnt|addr_state|closed|
+-------+-----------+---------+----------+------+
|      0|       1000|   182.22|        CA|  NULL|
|      1|       1000|   361.19|        WA|  NULL|
|      2|       1000|   176.26|        TX|  NULL|
|      3|       1000|   1000.0|        OK|  NULL|
|      4|       1000|   249.98|        PA|  NULL|
+-------+-----------+---------+----------+------+
only showing top 5 rows



In [0]:
# Filas con valor WA en addr_state
deltaTable.toDF().filter("addr_state = 'WA'").count()

340

In [0]:
# Filas con valor OR en addr_state
deltaTable.toDF().filter("addr_state = 'OR'").count()

178

In [0]:
# Transformar datos existentes: Actualizar estado de direcciones
deltaTable.update("addr_state = 'OR'", {"addr_state": "'WA'"})

In [0]:
# Filas con valor WA en addr_state
deltaTable.toDF().filter("addr_state = 'WA'").count()

518

In [0]:
# Filas con valor OR en addr_state
deltaTable.toDF().filter("addr_state = 'OR'").count()

0

In [0]:
# Filas con "funded_amnt =< paid_amnt"
deltaTable.toDF().filter("funded_amnt <= paid_amnt").count()

5134

In [0]:
# Eliminar datos de usuarios: Borrar préstamos completamente pagados
deltaTable.delete("funded_amnt <= paid_amnt")

In [0]:
# Filas con "funded_amnt =< paid_amnt"
deltaTable.toDF().filter("funded_amnt <= paid_amnt").count()

0

In [0]:
# Filas totales
deltaTable.toDF().count()

9578

In [0]:
# Auditar cambios de datos: Mostrar historial de operaciones
deltaTable.history().select("version", "timestamp", "operation", "operationParameters").show(truncate=False)


+-------+-------------------+----------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|version|timestamp          |operation       |operationParameters                                                                                                                                                                                            |
+-------+-------------------+----------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|5      |2024-07-17 05:03:06|DELETE          |{predicate -> ["(cast(funded_amnt#2428 as double) <= paid_amnt#2429)"]}                                                                                                                      

In [0]:
# Viajar en el tiempo: Consultar versiones anteriores de la tabla
# De antes de que se hiciera el merge
#spark.read.format("delta").option("timestampAsOf", "2020-01-01").load(deltaPath).show()
spark.read.format("delta").option("versionAsOf", "0").load(deltaPath).show(5)


+-------+-----------+---------+----------+
|loan_id|funded_amnt|paid_amnt|addr_state|
+-------+-----------+---------+----------+
|      0|       1000|   182.22|        CA|
|      1|       1000|   361.19|        WA|
|      2|       1000|   176.26|        TX|
|      3|       1000|   1000.0|        OK|
|      4|       1000|   249.98|        PA|
+-------+-----------+---------+----------+
only showing top 5 rows



In [0]:
# Version despues del merge
spark.read.format("delta").option("versionAsOf", "3").load(deltaPath).show(5)


+-------+-----------+---------+----------+------+
|loan_id|funded_amnt|paid_amnt|addr_state|closed|
+-------+-----------+---------+----------+------+
|      0|       1000|   182.22|        CA|  NULL|
|      1|       1000|   361.19|        WA|  NULL|
|      2|       1000|   176.26|        TX|  NULL|
|      3|       1000|   1000.0|        OK|  NULL|
|      4|       1000|   249.98|        PA|  NULL|
+-------+-----------+---------+----------+------+
only showing top 5 rows



In [0]:
# Detener la sesión de Spark
spark.stop()

Custom TB Handler failed, unregistering


[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JError[0m                                 Traceback (most recent call last)
File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:47[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m [38;5;28;01mtry[39;00m:
[0;32m---> 47[0m     res [38;5;241m=[39m [43mfunc[49m[43m([49m[38;5;241;43m*[39;49m[43margs[49m[43m,[49m[43m [49m[38;5;241;43m*[39;49m[38;5;241;43m*[39;49m[43mkwargs[49m[43m)[49m
[1;32m     48[0m     logger[38;5;241m.[39mlog_success(
[1;32m     49[0m         module_name, class_name, function_name, time[38;5;241m.[39mperf_counter() [38;5;241m-[39m start, signature
[1;32m     50[0m     )

File [0;32m/databricks/spark/python/pyspark/sql/session.py:1919[0m, in [0;36mSparkSession.stop[0;34m(self)[0m
[1;32m   1918[0m [38;5;28;01massert[39;00m [38;5;28mself[39m[38;5;241m.[39m_jvm [38