# 💪 **CDF en acción** 💪

En este notebook, veremos como aplicar la propiedad Change Data Feed a nuestras tablas para poder actualizarlas a partir del aislamiento de los cambios acontecidos.

⚠️ Ten en cuenta que para poder ejecutar la mayoría de pasos de este cuaderno, necesitarás tener un lakehouse agregado a él. Para ello, puedes seguir [los pasos indicados aquí](https://learn.microsoft.com/es-es/fabric/data-engineering/lakehouse-notebook-explore).

Recuerda que dispones de los ficheros **Product.csv** y **Stores.csv** dentro de la ruta de GitHub.

## Imports y referencias
---

In [9]:
import os
from delta.tables import *
from pyspark.sql.functions import lit,max,to_timestamp,min,when

StatementMeta(, f3935762-0836-4ade-8e0e-3ac9ac2c584d, 13, Finished, Available, Finished)

## Carguemos nuestros datos 
---


In [2]:
product_df = spark.read.format("csv").options(delimiter=";", header=True).load("Files/raw/20240930/Product.csv")
display(product_df)

# Cargamos nuestros datos en una tabla del Lakehouse asociado
product_df.write.saveAsTable("products")

StatementMeta(, 82747795-ffd0-4f1a-b7fe-d10fdc3c0897, 4, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 805cd05d-4511-4e0f-831c-4c58d9e82aad)

## Rastreamos nuestros cambios 👀
---

### Activación de la propiedad en la tabla

Para poder llevar a cabo el registro de los cambios es necesario activar la propiedad previamente.

In [1]:
%%sql

ALTER TABLE products SET TBLPROPERTIES (delta.enableChangeDataFeed = true)

StatementMeta(, f3935762-0836-4ade-8e0e-3ac9ac2c584d, 2, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

Una vez tenemos habilitado el CDF, podemos comprobar esto en el historial de la tabla delta, como vemos a continuación tenemos una nueva versión con la operación de tipo **_SET TBLPROPERTIES_**.

In [2]:
%%sql

DESCRIBE HISTORY products

StatementMeta(, f3935762-0836-4ade-8e0e-3ac9ac2c584d, 3, Finished, Available, Finished)

<Spark SQL result set with 2 rows and 15 fields>

### ¿Qué pasaría si añadimos, modificamos o eliminamos registros de la tabla?

In [3]:
%%sql

UPDATE products SET Category = 'PC Games' WHERE ProductKey = 1662;

INSERT INTO products (ProductKey,ProductCode,Color) VALUES (9000,100100,'Yellow');

DELETE FROM products WHERE ProductKey = 30;

StatementMeta(, f3935762-0836-4ade-8e0e-3ac9ac2c584d, 6, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 1 fields>

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 1 rows and 1 fields>

In [4]:
%%sql

DESCRIBE HISTORY products

StatementMeta(, f3935762-0836-4ade-8e0e-3ac9ac2c584d, 7, Finished, Available, Finished)

<Spark SQL result set with 5 rows and 15 fields>

### Pero... ¿y cómo accedo a mis modificaciones? ¡Pues mediante la función table_changes! 🎉

In [5]:
%%sql
-- table_changes(tabla, version inicial, version final) nos permite acceder a los cambios de nuestra tabla en el rango de versiones especificado. 
-- En caso de que solo indiquemos una versión se leen todos los cambios a partir de dicha versión.

SELECT * FROM table_changes('products',1)

StatementMeta(, f3935762-0836-4ade-8e0e-3ac9ac2c584d, 8, Finished, Available, Finished)

<Spark SQL result set with 4 rows and 17 fields>

Consultemos los cambios de la primera versión registrada para la tabla, ¿seremos capaces?

In [11]:
%%sql

SELECT * FROM table_changes('products',0)

StatementMeta(, 1ad01294-8fec-4768-8786-9077d270c902, 13, Finished, Available, Finished)

Error: [DELTA_MISSING_CHANGE_DATA] Error getting change data for range [0 , 4] as change data was not
recorded for version [0]. If you've enabled change data feed on this table,
use `DESCRIBE HISTORY` to see when it was first enabled.
Otherwise, to start recording change data, use `ALTER TABLE table_name SET TBLPROPERTIES
(delta.enableChangeDataFeed=true)`.

Estabais avisados... no podemos acceder a cambios realizados antes de activar la propiedad! 😉😉

## Actualicemos nuestra tabla 

In [6]:
# Cargamos los datos en una nueva tabla products_cdf para tener una versión inicial de la tabla products
products_cdf_df = spark.read.format("csv").options(delimiter=";", header=True).load("Files/raw/20240930/Product.csv")
products_cdf_df.write.format("delta").mode("overwrite").save("Tables/products_cdf")

StatementMeta(, f3935762-0836-4ade-8e0e-3ac9ac2c584d, 10, Finished, Available, Finished)

In [7]:
# Consultamos los cambios sobre la tabla base
# Descartamos los cambios de tipo "preimage" ya que son informativos pero no los debemos aplicar sobre nuestra tabla 
changes_df = spark.sql('SELECT * FROM table_changes("products",1) WHERE _change_type<>"update_preimage"')

StatementMeta(, f3935762-0836-4ade-8e0e-3ac9ac2c584d, 11, Finished, Available, Finished)

In [10]:
# Una vez tenemos los cambios sobre un DataFrame, aplicaremos un merge sobre la tabla products_scd2 de forma que actualice, inserte
# o elimine los cambios registrados
path_curated = 'Tables/products_cdf'
pks_str = '''curated.ProductKey = updates.ProductKey'''

curatedTable = DeltaTable.forPath(spark, path_curated)

curatedTable.alias('curated') \
.merge(
    changes_df.alias('updates'),
    pks_str) \
.whenMatchedDelete(condition = 'updates._change_type="delete"') \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()

StatementMeta(, f3935762-0836-4ade-8e0e-3ac9ac2c584d, 14, Finished, Available, Finished)

In [11]:
%%sql

-- Comprobamos los resultados
SELECT * FROM products_cdf
WHERE ProductKey IN (1662,30,9000)

StatementMeta(, f3935762-0836-4ade-8e0e-3ac9ac2c584d, 15, Finished, Available, Finished)

<Spark SQL result set with 2 rows and 14 fields>

# 😎 Actualizando como un jefe gracias al CDF 😎

### Pero... ¿cómo puedo actualizar mi tabla únicamente con los últimos cambios? 

En este caso, utilizaremos datos de los taxis de Nueva York. 

En nuestro lakehouse disponemos de una tabla **_nyc_taxi_** que contiene cambios aplicados sobre ella gracias a [los conjuntos de datos de ejemplo dentro de Fabric](https://learn.microsoft.com/es-es/fabric/data-factory/create-first-pipeline-with-sample-data).

In [1]:
%%sql

-- Vemos que la propiedad CDF está activada desde la versión 2 y que se han aplicado 4 merge posteriormente
DESCRIBE HISTORY nyc_taxi

StatementMeta(, 2f85bcbe-dfae-476a-8f5f-ddcd0fad4a39, 2, Finished, Available, Finished)

<Spark SQL result set with 7 rows and 15 fields>

Queremos aplicar los cambios de esos merge sobre una nueva tabla que contiene los datos de la versión inicial de la tabla **_nyc_taxi_**

In [12]:
# Creamos la tabla nyc_taxi_cdf con la versión inicial de nuestros datos
df = spark.sql('SELECT * FROM nyc_taxi_base')
df.write.format("delta").mode("overwrite").save("Tables/nyc_taxi_cdf")

StatementMeta(, f3935762-0836-4ade-8e0e-3ac9ac2c584d, 16, Finished, Available, Finished)

### Recogemos el último estado de nuestros cambios

Primero, necesitamos saber desde que versión de la delta debemos analizar nuestros cambios.

In [13]:
history_taxi = spark.sql("DESCRIBE HISTORY nyc_taxi")
init_version_taxi = history_taxi.filter((history_taxi.operation=='SET TBLPROPERTIES') \
    & (history_taxi.operationParameters["properties"]=='{"delta.enableChangeDataFeed":"true"}')).select('version').first()[0]
init_version_taxi

StatementMeta(, f3935762-0836-4ade-8e0e-3ac9ac2c584d, 17, Finished, Available, Finished)

2

Posteriormente, vamos a almacenar nuestros cambios en un dataframe, sobre el que operaremos.

In [14]:
# Vamos a leer los cambios desde la versión inicial y le añadimos un ranking, de forma que tengamos el ranking de cambios aplicados sobre cada registro
#Descartamos las preimages ya que solo queremos cambios a procesar en nuestra tabla
query_ranking  = f"""SELECT *, 
            --particionamos el ranking sobre nuestra PK
            RANK() OVER (PARTITION BY vendorID, lpepPickupDatetime, lpepDropoffDatetime  ORDER BY _commit_version desc) AS rank 
            FROM table_changes('nyc_taxi', {init_version_taxi}) 
            WHERE _change_type !='update_preimage' """ 
changes_ranking_df = spark.sql(query_ranking)
display(changes_ranking_df)

StatementMeta(, f3935762-0836-4ade-8e0e-3ac9ac2c584d, 18, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 7a27c325-615b-4f65-b5c9-7eff20d0befb)

In [15]:
# Nos quedamos con la última versión de nuestros cambios para optimizar la actualización de nuestra tabla
last_changes_df = changes_ranking_df.filter(changes_ranking_df.rank==1)
display(last_changes_df)

StatementMeta(, f3935762-0836-4ade-8e0e-3ac9ac2c584d, 19, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, a28abf0b-30c9-4850-bab5-f7baffe83475)

Realizamos el merge a partir de nuestro DataFrame de últimos cambios

In [16]:
# Preparar valores necesarios
path_curated = 'Tables/nyc_taxi'
pks_str = '''curated.vendorID = updates.vendorID AND curated.lpepPickupDatetime = updates.lpepPickupDatetime 
            AND curated.lpepDropoffDatetime = updates.lpepDropoffDatetime'''

# Cargar tabla de destino
curatedTable = DeltaTable.forPath(spark, path_curated)

# Realizar el merge
curatedTable.alias('curated') \
    .merge(
        last_changes_df.alias('updates'),pks_str) \
    .whenMatchedDelete(condition = 'updates._change_type="delete"') \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()

StatementMeta(, f3935762-0836-4ade-8e0e-3ac9ac2c584d, 20, Finished, Available, Finished)

# 🐱‍💻 Generando tu SCD2 gracias al poder de CDF 🐱‍💻 
---

A continuación, vamos a mostrar como a partir del CDF podemos llegar a construir nuestra tabla SCD2. Para ello utilizaremos la tabla **_Store_** de [la BBDD de **<u>_<u>Contoso</u>_</u>**](https://www.sqlbi.com/tools/contoso-data-generator/). 

Al igual que en el apartado anterior, tenemos una tabla base (**_store_base_**) en la que se han aplicado distintos cambios.

In [5]:
%%sql

DESCRIBE HISTORY store_base

StatementMeta(, , , Waiting, , Waiting)

<Spark SQL result set with 8 rows and 15 fields>

### Generamos la versión inicial de nuestra SCD2

A partir de la versión principal de la tabla, construiremos nuestra SCD2. 

Para ello, crearemos una tabla llamada _**store_scd2**_ que contenga los datos de la versión inicial y además las columnas de inicio y fin de validez, así como una columna de borrado lógico que nos indique si el registro ha sido eliminado o no.

In [11]:
# Obtenemos la versión inicial y el timestamp de creación de la misma
table_history = spark.sql('DESCRIBE HISTORY store_base')
init_version = history.filter((table_history.operation=='SET TBLPROPERTIES') & (table_history.operationParameters["properties"]=='{"delta.enableChangeDataFeed":"true"}'))\
    .select('version').first()[0]
datetime_init = history.filter(history.version==init_version).select('timestamp').first()[0]

print(init_version,datetime_init)

StatementMeta(, 2f85bcbe-dfae-476a-8f5f-ddcd0fad4a39, 13, Finished, Available, Finished)

1 2024-10-19 19:25:19.534000


In [12]:
# Creamos la tabla store_scd2 a partir de la versión inicial de store_base y le añadimos las columnas de inicio y fin de validez y la de control de borrados
scd2_df = spark.read.option("versionAsOf", init_version).table("store_base")
scd2_df = scd2_df.withColumn("scd2_start_datetime",lit(datetime_init))\
    .scd2_df.withColumn("scd2_end_datetime",to_timestamp(lit(None)))\
    .scd2_df.withColumn("is_deleted",lit(0))

display(scd2_df)

scd2_df.write.format('delta').mode("overwrite").saveAsTable('store_scd2')

StatementMeta(, 2f85bcbe-dfae-476a-8f5f-ddcd0fad4a39, 14, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 8f046504-2ee9-4e6f-a777-262e93870a87)

Vamos a tratar los cambios realizados en la tabla de origen para luego actualizar nuestra SCD2 a partir de estos.

In [13]:
# Leemos los cambios generados desde la versión inicial y los rankeamos para tener un orden de los mismos. 
# En este caso aplicamos dense_rank() para que no haya saltos en la enumeración
sql_query  = f"""SELECT *
            ,dense_rank() OVER (PARTITION BY StoreKey ORDER BY _commit_version asc) AS rank 
            FROM table_changes('store_base', {init_version}) """
base_changes_df = spark.sql(sql_query)


# Filtramos nuestros cambios para descartar todos los preimage excepto el primero, ya que es necesario para actualizar la fecha fin del registro
scd2_changes_df = base_changes_df.filter((base_changes_df._change_type!='update_preimage') | \
    (base_changes_df._change_type=='update_preimage') & (base_changes_df.rank == 1))

# Añadimos la columna de control de borrados, de forma que para aquellas operaciones de tipo delete tenga el valor 1
scd2_changes_df = scd2_changes_df.withColumn("is_deleted",when(scd2_changes_df._change_type=='delete',1).otherwise(0))

# Creamos la columna operation que nos diga que operación corresponde en el merge según el tipo de cambio
scd2_changes_df = scd2_changes_df.withColumn("operation",when((scd2_changes_df._change_type=='update_preimage') | \
    (scd2_changes_df._change_type=='delete'),'update').otherwise('insert'))

# Finalmente añadimos una columna que determine la fecha de fin a asignar con cada cambio.
# Para ello, en el caso de un cambio de tipo postimage, debemos recurrir a la fecha de realización del siguiente cambio del resigstro, si existe. 
# En el dataframe versions guardamos un resumen de la información de la pk, la versión y el timestamp de dicha versión de nuestros cambios
versions = base_changes_df.select(base_changes_df['StoreKey'],base_changes_df['_commit_version'].alias('version'), \
    base_changes_df['_commit_timestamp'].alias('commit_timestamp'),base_changes_df['rank'].alias('rank_value')).dropDuplicates()

# Aplicamos el join con el dataframe anterior para obtener para cada cambio, el tiemstamp de su siguiente versión
scd2_changes_df = scd2_changes_df.join(versions, (scd2_changes_df.StoreKey==versions.StoreKey) & \
    (scd2_changes_df.rank == versions.rank_value-1) , 'left') \
    .select(scd2_changes_df["*"],versions["commit_timestamp"].alias("next_commit_timestamp"))

# Generamos la columna end_datetime que contenga la fecha de fin de validez de cada registro en función de la operación realizada
complete_changes_df = scd2_changes_df.withColumn("end_datetime",\
    when(scd2_changes_df.operation=='update',scd2_changes_df._commit_timestamp)\
    .when(scd2_changes_df._change_type=='update_postimage',scd2_changes_df.next_commit_timestamp)\
    .otherwise(None))

display(complete_changes_df)

StatementMeta(, 2f85bcbe-dfae-476a-8f5f-ddcd0fad4a39, 15, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, b4334a42-0235-4da5-b2c3-b697e5d5686f)

Finalmente, realizamos el merge para construir nuestra SCD2 en función de los cambios aplicados

In [14]:
# Preparar valores necesarios
path_curated = 'Tables/store_scd2'
pks_str = 'curated.StoreKey == changes.StoreKey and changes.operation=="update" and isnull(curated.scd2_end_datetime)'

# Cargar tabla de destino
curatedTable = DeltaTable.forPath(spark, path_curated)

# Realizar el merge
curatedTable.alias('curated') \
  .merge(complete_changes_df.alias('changes'),pks_str) \
  .whenMatchedUpdate(
      set = {
        "scd2_end_datetime":"changes.end_datetime - INTERVAL '1' SECOND",
        "is_deleted":"changes.is_deleted"}) \
  .whenNotMatchedInsert(
      values = {
        "StoreKey": "changes.StoreKey",  
        "StoreCode": "changes.StoreCode",
        "Country": "changes.Country",
        "State": "changes.State",
        "Name": "changes.Name",
        "SquareMeters": "changes.SquareMeters",
        "OpenDate": "changes.OpenDate",
        "CloseDate": "changes.CloseDate",
        "Status": "changes.Status",
        "scd2_start_datetime": "changes._commit_timestamp",
        "scd2_end_datetime": "changes.end_datetime - INTERVAL '1' SECOND",
        "is_deleted":"changes.is_deleted"
      }) \
  .execute()

StatementMeta(, 2f85bcbe-dfae-476a-8f5f-ddcd0fad4a39, 16, Finished, Available, Finished)

Comprobamos que la SCD2 se ha construido correctamente en función de la _**StoreKey**_. Recordemos que según su valor:

- StoreKey=150: eliminado el registro
- StoreKey=160: actualizado 3 veces
- StoreKey=200: actualizado 1 vez
- StoreKey=777: insertado el registro

In [15]:
%%sql

SELECT * 
FROM store_scd2
WHERE StoreKey in (150,160,200,777)

StatementMeta(, 2f85bcbe-dfae-476a-8f5f-ddcd0fad4a39, 17, Finished, Available, Finished)

<Spark SQL result set with 8 rows and 12 fields>