<a href="https://colab.research.google.com/github/jquesada92/delta_lake_project/blob/main/Untitled0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install delta-spark==3.0.0

Collecting delta-spark==3.0.0
  Downloading delta_spark-3.0.0-py3-none-any.whl.metadata (2.0 kB)
Downloading delta_spark-3.0.0-py3-none-any.whl (21 kB)
Installing collected packages: delta-spark
Successfully installed delta-spark-3.0.0


In [2]:
# Project root (presumably on the mounted Google Drive - confirm); every other
# location below is derived from it.
SOURCE_PATH = '/content/drive/MyDrive/Colab Notebooks/contraloria'

# Folder read by the streaming source, and the root for streaming checkpoints.
STAGING_PATH = SOURCE_PATH + '/staging'
CHECKPOINT_PATH = SOURCE_PATH + '/checkpoint'

# Spark warehouse directory and the bronze SCD2 table stored beneath it.
spark_warehouse = SOURCE_PATH + '/spark-warehouse'
bronze_path = spark_warehouse + '/bronze_scd_type_2'

# Column that orders the versions of a record, and the columns used as its key.
update_col = 'fecha_actualizacion'
key_cols = ['cedula', 'institucion']

In [3]:
from pyspark.sql import SparkSession
from delta import *
from delta.tables import DeltaTable
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *
from pyspark.sql import DataFrame

# Session builder with the Delta Lake SQL extension and Delta catalog registered,
# and the warehouse directory pointed at `spark_warehouse`.
builder = (
    SparkSession.builder
    .appName("DeltaLakeAlternativeSession")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.warehouse.dir", spark_warehouse)
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)

# Let the delta helper add the Delta Lake packages to the builder, then start
# (or reuse) the session.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [4]:
def create_table_if_not_exists(df:DataFrame,table_path:str)->bool:
  """
  Create a Delta table from `df` when the target does not exist yet.

  `table_path` may be a storage location or a catalog table name:
  - a value containing "/" is treated as a path: existence is checked with
    `DeltaTable.isDeltaTable` and the data is written with `.save(...)`;
  - anything else is treated as a table name: existence is checked with
    `spark.catalog.tableExists` and the data is written with
    `.saveAsTable(...)` so the table is registered in the catalog.

  Args:
    df: DataFrame used to create the table when it is missing.
    table_path: path or catalog name of the target Delta table.

  Returns:
    True if the table already existed (nothing is written), False if it was
    missing and has just been created from `df`.
  """
  # The catalog API expects table names, not filesystem paths, so the two
  # kinds of identifier need different existence checks.
  is_path = "/" in table_path
  if is_path:
    already_there = DeltaTable.isDeltaTable(spark, table_path)
  else:
    already_there = spark.catalog.tableExists(table_path)

  if already_there:
    return True

  writer = df.write.format("delta").mode("overwrite")
  if is_path:
    writer.save(table_path)
  else:
    writer.saveAsTable(table_path)
  return False

In [5]:

def bronze_type2_upsert( microbatch_df: DataFrame, batch_id: int, table_path:str = bronze_path )-> None:
    """
    Apply one micro-batch to the bronze Delta table as a Type 2 SCD upsert.

    Steps:
    - Keep a single row per (key_cols, update_col) version of the batch.
    - Add the SCD2 columns start_date, end_date and last_update
      (last_update is True only for the newest version of each key in the batch).
    - If no Delta table exists at `table_path`, create it from the batch and stop.
    - Otherwise close the open target row of every key that received a newer
      version, then append only the versions the table does not hold yet.

    Args:
        microbatch_df: rows of the current micro-batch.
        batch_id: micro-batch identifier supplied by `foreachBatch` (unused).
        table_path: location of the target Delta table.

    NOTE(review): a late-arriving version that is older than the row currently
    open in the table is still appended with the flags computed inside the
    batch; reconciling it against the stored history is not handled here.
    """
    if microbatch_df.isEmpty():
        return

    def newest_first(partition_cols):
        # Window over the given columns, newest `update_col` first.
        return Window.partitionBy(*partition_cols).orderBy(F.desc(update_col))

    version_cols = key_cols + [update_col]

    # Deduplicate and add SCD2 columns
    df_updates = (
        microbatch_df.withColumn(
            "duplicated",
            F.row_number().over(newest_first(version_cols)),
        )
        .where("duplicated = 1")
        .drop("duplicated")
        .withColumn("row_num", F.row_number().over(newest_first(key_cols)))
        .withColumn("start_date", F.col(update_col))
        # Rows are ordered newest first, so lag() returns the timestamp of the
        # next newer version; the newest row of each key gets NULL.
        .withColumn("end_date", F.lag(update_col).over(newest_first(key_cols)))
        .withColumn(
            "last_update",
            F.when(F.col("row_num") == F.lit(1), F.lit(True)).otherwise(F.lit(False)),
        )
        .drop("row_num")
    )

    # The batch is used by several actions below; persisting avoids recomputing
    # it and keeps the arbitrary tie-break of the deduplication identical
    # between the merge and the append.
    df_updates.persist()
    try:
        # First run: the batch itself becomes the table. Returning here avoids
        # appending the same rows a second time.
        if not DeltaTable.isDeltaTable(spark, table_path):
            df_updates.write.format("delta").mode("overwrite").save(table_path)
            return

        # Reference to the Delta table
        delta_target = DeltaTable.forPath(spark, table_path)

        # Merge condition on key columns
        merge_condition = " AND ".join([f"target.{k} = updates.{k}" for k in key_cols])

        # 1. Mark the old record as not current if a newer version arrives
        delta_target.alias("target").merge(
            df_updates.alias("updates"),
            f"({merge_condition} AND target.last_update = true) AND (updates.last_update = true) AND (updates.{update_col} > target.{update_col})",
        ).whenMatchedUpdate(
            set={"end_date": "updates.start_date", "last_update": F.lit(False)}
        ).execute()

        # 2. Append only versions the table does not contain yet. Re-delivered
        #    rows (same key and same update timestamp) would otherwise become a
        #    second "current" record for the key.
        stored_versions = spark.read.format("delta").load(table_path).select(*version_cols)
        new_versions = (
            df_updates.join(stored_versions, on=version_cols, how="left_anti")
            # The join moves the key columns to the front; restore the batch order.
            .select(*df_updates.columns)
        )
        new_versions.write.format("delta").mode("append").save(table_path)
    finally:
        df_updates.unpersist()



In [6]:
# Explicit schema applied to the parquet files read from the staging folder.
schema = (
    StructType()
    .add('nombre', StringType(), True)
    .add('apellido', StringType(), True)
    .add('cedula', StringType(), True)
    .add('cargo', StringType(), True)
    .add('salario', DoubleType(), True)
    .add('gasto', DoubleType(), True)
    .add('estado', StringType(), True)
    .add('fecha_de_inicio', DateType(), True)
    .add('fecha_actualizacion', TimestampType(), True)
    .add('fecha_consulta', TimestampType(), True)
    .add('archivo', StringType(), True)
    .add('institucion', StringType(), True)
)

# Streaming read of the staged parquet files; each row also records the file
# it was read from.
source_staging_sdf = (
    spark.readStream
    .format("parquet")
    .schema(schema)
    .load(STAGING_PATH)
    .withColumn('file_path', F.input_file_name())
)

# Process whatever is currently available, handing each micro-batch to the
# bronze upsert, then stop.
bronze_query = (
    source_staging_sdf.writeStream
    .outputMode("append")
    .option("checkpointLocation", CHECKPOINT_PATH + '/bronze')
    .trigger(availableNow=True)
    .foreachBatch(bronze_type2_upsert)
    .start()
)

# Block until the available-now run has finished.
bronze_query.awaitTermination()

In [10]:
  # Display the first rows of the bronze Delta table, addressed by its path.
  spark.sql(
      f"select * from delta.`{bronze_path}`"
  ).show()

+---------+---------+-------------+--------------------+-------+-----+----------+---------------+-------------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------+-----------+
|   nombre| apellido|       cedula|               cargo|salario|gasto|    estado|fecha_de_inicio|fecha_actualizacion|      fecha_consulta|             archivo|         institucion|           file_path|         start_date|           end_date|last_update|
+---------+---------+-------------+--------------------+-------+-----+----------+---------------+-------------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------+-----------+
|     RUTH|  DEL CID|1-0016-000226|EDUCADOR B 1 (MAE...|2068.97|  0.0|PERMANENTE|     1993-03-22|2025-07-16 08:20:06|2025-07-19 15:55:...|InformeConsultaPl...|MINISTERIO DE EDU...|file:///content/d...|2025-07-16 08:20:06|               NU

In [None]:
# Rows of the bronze table flagged last_update = true, with an extra constant
# True column named 'activate'.
bronze_sdf = spark.read.format('delta').load(bronze_path)\
                    .where("last_update = true")\
                    .withColumn('activate',F.lit(True))

# Hand the current bronze rows to the helper that creates the target table
# when it does not exist yet.
create_table_if_not_exists(bronze_sdf,'silver_scd_type_2')

# NOTE(review): no column named 'fecha' is declared in this notebook's schema or
# added by the bronze upsert (the date columns are fecha_de_inicio,
# fecha_actualizacion and fecha_consulta), so this reference looks unfinished and
# would not resolve - confirm the intended column. The expression also replaces
# the boolean `last_update` flag with that column's values.
updates_sdf = bronze_sdf.withColumn('last_update', F.col('fecha'))


In [None]:

def silver_upsert( microbatch_df: DataFrame, batch_id: str, table_name:str = 'silver_scd_type_2' )-> None:
    """
    Apply a batch to a catalog Delta table as a Type 2 SCD upsert.

    Steps:
    - Keep a single row per (key_cols, update_col) version of the batch.
    - Add the SCD2 columns start_date, end_date and last_update
      (last_update is True only for the newest version of each key in the batch).
    - If `table_name` is not in the catalog, create it from the batch and stop.
    - Otherwise close the open target row of every key that received a newer
      version, then append only the versions the table does not hold yet.

    Args:
        microbatch_df: rows to upsert.
        batch_id: batch identifier (unused).
        table_name: catalog name of the target Delta table.

    NOTE(review): the original default referenced an undefined name
    (`bronze_table`); 'silver_scd_type_2' is the silver table name used
    elsewhere in this notebook - confirm it is the intended default.
    NOTE(review): a late-arriving version that is older than the row currently
    open in the table is still appended with the flags computed inside the
    batch; reconciling it against the stored history is not handled here.
    """
    if microbatch_df.isEmpty():
        return

    def newest_first(partition_cols):
        # Window over the given columns, newest `update_col` first.
        return Window.partitionBy(*partition_cols).orderBy(F.desc(update_col))

    version_cols = key_cols + [update_col]

    # Deduplicate and add SCD2 columns
    df_updates = (
        microbatch_df.withColumn(
            "duplicated",
            F.row_number().over(newest_first(version_cols)),
        )
        .where("duplicated = 1")
        .drop("duplicated")
        .withColumn("row_num", F.row_number().over(newest_first(key_cols)))
        .withColumn("start_date", F.col(update_col))
        # Rows are ordered newest first, so lag() returns the timestamp of the
        # next newer version; the newest row of each key gets NULL.
        .withColumn("end_date", F.lag(update_col).over(newest_first(key_cols)))
        .withColumn(
            "last_update",
            F.when(F.col("row_num") == F.lit(1), F.lit(True)).otherwise(F.lit(False)),
        )
        .drop("row_num")
    )

    # The batch is used by several actions below; persisting avoids recomputing
    # it and keeps the arbitrary tie-break of the deduplication identical
    # between the merge and the append.
    df_updates.persist()
    try:
        # First run: register the batch as the table (saveAsTable, so that
        # DeltaTable.forName can find it later). Returning here avoids
        # appending the same rows a second time.
        if not spark.catalog.tableExists(table_name):
            df_updates.write.format("delta").mode("overwrite").saveAsTable(table_name)
            return

        # Reference to the Delta table
        delta_target = DeltaTable.forName(spark, table_name)

        # Merge condition on key columns
        merge_condition = " AND ".join([f"target.{k} = updates.{k}" for k in key_cols])

        # 1. Mark the old record as not current if a newer version arrives
        delta_target.alias("target").merge(
            df_updates.alias("updates"),
            f"({merge_condition} AND target.last_update = true) AND (updates.last_update = true) AND (updates.{update_col} > target.{update_col})",
        ).whenMatchedUpdate(
            set={"end_date": "updates.start_date", "last_update": F.lit(False)}
        ).execute()

        # 2. Append only versions the table does not contain yet. Re-delivered
        #    rows (same key and same update timestamp) would otherwise become a
        #    second "current" record for the key.
        stored_versions = spark.table(table_name).select(*version_cols)
        new_versions = (
            df_updates.join(stored_versions, on=version_cols, how="left_anti")
            # The join moves the key columns to the front; restore the batch order.
            .select(*df_updates.columns)
        )
        new_versions.write.format("delta").mode("append").saveAsTable(table_name)
    finally:
        df_updates.unpersist()

