![Verne Academy Summit 2024](https://github.com/javendia/verne-academy-summit-2024/blob/main/header.png?raw=true)

## Step 1: Data ingestion
- We read the **StockItems.csv** file, specifying the format (CSV) and indicating that the file contains the column headers
- We create the target Delta table if it does not already exist
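
Because the read sets `header=True` but supplies no schema and does not enable schema inference, Spark loads every CSV column as a string. The plain-Python analogy below, using the standard `csv` module rather than Spark, shows the same effect: the header row supplies the column names and every value stays text until it is cast. The sample row is hypothetical and is not taken from the real file.

```python
import csv
import io

# Hypothetical two-line sample: a header row plus one data row.
# The price uses a decimal comma, so it must be quoted in a comma-delimited file.
sample = io.StringIO(
    'StockItemID,StockItemName,UnitPrice\n'
    '1,USB hub,"12,50"\n'
)

# DictReader takes the column names from the first line, like header=True.
rows = list(csv.DictReader(sample))

# Every value is still a string, including the numeric-looking ones.
first = rows[0]
value_types = {name: type(value).__name__ for name, value in first.items()}
```

This is why the later steps cast the decimal columns explicitly before the data reaches the typed Delta table.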

In [1]:
from pyspark.sql.types import *
from delta.tables import *
from pyspark.sql.functions import when, lit, col, regexp_replace

df = spark.read.load('Files/wwi/StockItems.csv',
    format='csv',
    header=True
)

DeltaTable.createIfNotExists(spark) \
     .tableName("Stock_Items") \
     .addColumn("StockItemID", IntegerType()) \
     .addColumn("StockItemName", StringType()) \
     .addColumn("SupplierID", IntegerType()) \
     .addColumn("ColorID", IntegerType()) \
     .addColumn("UnitPackageID", IntegerType()) \
     .addColumn("OuterPackageID", IntegerType()) \
     .addColumn("Brand", StringType()) \
     .addColumn("Size", StringType()) \
     .addColumn("LeadTimeDays", IntegerType()) \
     .addColumn("QuantityPerOuter", IntegerType()) \
     .addColumn("IsChillerStock", BooleanType()) \
     .addColumn("Barcode", StringType()) \
     .addColumn("TaxRate", DecimalType(18,3)) \
     .addColumn("UnitPrice", DecimalType(18,2)) \
     .addColumn("RecommendedRetailPrice", DecimalType(18,2)) \
     .addColumn("TypicalWeightPerUnit", DecimalType(18,3)) \
     .addColumn("LastUpdated", TimestampType()) \
     .execute()

deltaTable = DeltaTable.forPath(spark, 'Tables/stock_items')


## Step 2: Transformation
- We return a controlled value (**N/A**) for the **StockItemName**, **Brand**, **Size** and **Barcode** columns when they are null or empty
- We normalize the decimal separator, replacing **','** with **'.'**, and cast each column to the data type of the target
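
The two rules above can be sketched per row in plain Python. This is a simplified model and not the Spark code itself: the helper names `default_if_blank` and `parse_decimal` are hypothetical, the sample values are invented, and returning `None` for unparseable text is an assumption that mirrors a non-ANSI cast.

```python
from decimal import Decimal, InvalidOperation
from typing import Optional


def default_if_blank(value: Optional[str], default: str = "N/A") -> str:
    """Return `default` when the value is None or an empty string."""
    return default if value is None or value == "" else value


def parse_decimal(value: Optional[str]) -> Optional[Decimal]:
    """Replace ',' with '.' and parse; None or unparseable text yields None."""
    if value is None:
        return None
    try:
        return Decimal(value.replace(",", "."))
    except InvalidOperation:
        return None


# Hypothetical input row, with every value read as text (or missing).
row = {"Brand": "", "Size": None, "UnitPrice": "12,50", "TaxRate": "1,234.5"}

cleaned = {
    "Brand": default_if_blank(row["Brand"]),
    "Size": default_if_blank(row["Size"]),
    "UnitPrice": parse_decimal(row["UnitPrice"]),
    # "1,234.5" becomes "1.234.5", which is not a valid number.
    "TaxRate": parse_decimal(row["TaxRate"]),
}
```

The last value illustrates a limitation of a blanket replacement: if a field ever contains both a thousands separator and a decimal separator, the result is no longer a valid number.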

In [2]:
df = df.withColumn("StockItemName", when((col("StockItemName").isNull() | (col("StockItemName")=="")),lit("N/A")).otherwise(col("StockItemName"))) \
        .withColumn("Brand", when((col("Brand").isNull() | (col("Brand")=="")),lit("N/A")).otherwise(col("Brand"))) \
        .withColumn("Size", when((col("Size").isNull() | (col("Size")=="")),lit("N/A")).otherwise(col("Size"))) \
        .withColumn("Barcode", when((col("Barcode").isNull() | (col("Barcode")=="")),lit("N/A")).otherwise(col("Barcode")))

df = df.withColumn('TaxRate', regexp_replace('TaxRate', ',', '.').cast(DecimalType(18,3))) \
        .withColumn('UnitPrice', regexp_replace('UnitPrice', ',', '.').cast(DecimalType(18,2))) \
        .withColumn('RecommendedRetailPrice', regexp_replace('RecommendedRetailPrice', ',', '.').cast(DecimalType(18,2))) \
        .withColumn('TypicalWeightPerUnit', regexp_replace('TypicalWeightPerUnit', ',', '.').cast(DecimalType(18,3)))


## Step 3: Creating a temporary view
- We create the temporary view **vw_stock_items** to expose the data prepared in the previous step

In [3]:
df.createOrReplaceTempView("vw_stock_items")


## Step 4: MERGE statement

- If the record's **StockItemID** value exists in the target table **(MATCHED)** and any column differs from the existing record, we update the row
- If the record's **StockItemID** value does not exist in the target table **(NOT MATCHED)**, we insert a new row
- If a record in the target table does not exist in the source file **(NOT MATCHED BY SOURCE)**, we delete that row
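
The three branches can be modelled in plain Python on dictionaries keyed by **StockItemID**. This is only an illustrative sketch of the semantics, not how Delta Lake executes a merge: the function name `merge_by_key` and the sample rows are hypothetical, and the **LastUpdated** timestamp is left out for brevity.

```python
from typing import Dict, Tuple

Row = Dict[str, object]


def merge_by_key(target: Dict[int, Row], source: Dict[int, Row]) -> Tuple[int, int, int]:
    """Apply update/insert/delete to `target` in place.

    Returns the counts as (updated, inserted, deleted).
    """
    updated = inserted = deleted = 0
    for key, src_row in source.items():
        if key not in target:
            # NOT MATCHED: the key is new, so insert the row.
            target[key] = dict(src_row)
            inserted += 1
        elif target[key] != src_row:
            # MATCHED and at least one column differs: update the row.
            target[key] = dict(src_row)
            updated += 1
    for key in list(target):
        if key not in source:
            # NOT MATCHED BY SOURCE: the row disappeared from the source.
            del target[key]
            deleted += 1
    return updated, inserted, deleted


# Hypothetical data: item 1 changes price, item 2 is unchanged,
# item 3 is missing from the source, item 4 is new.
target = {
    1: {"StockItemName": "USB hub", "UnitPrice": "10.00"},
    2: {"StockItemName": "Mug", "UnitPrice": "4.50"},
    3: {"StockItemName": "Old item", "UnitPrice": "1.00"},
}
source = {
    1: {"StockItemName": "USB hub", "UnitPrice": "12.00"},
    2: {"StockItemName": "Mug", "UnitPrice": "4.50"},
    4: {"StockItemName": "New item", "UnitPrice": "7.25"},
}
counts = merge_by_key(target, source)
```

Note that Python's `!=` treats `None` as an ordinary value, whereas SQL's `<>` evaluates to NULL when either side is NULL, so this sketch corresponds to a null-safe comparison.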

In [4]:
%%sql

MERGE INTO stock_items AS target
USING vw_stock_items AS source
ON target.StockItemID = source.StockItemID
WHEN MATCHED AND 
(
        -- Null-safe comparison (<=>): also detects changes to or from NULL,
        -- which a plain <> would skip because it evaluates to NULL
        NOT (target.StockItemName <=> source.StockItemName)
        OR NOT (target.SupplierID <=> source.SupplierID)
        OR NOT (target.ColorID <=> source.ColorID)
        OR NOT (target.UnitPackageID <=> source.UnitPackageID)
        OR NOT (target.OuterPackageID <=> source.OuterPackageID)
        OR NOT (target.Brand <=> source.Brand)
        OR NOT (target.Size <=> source.Size)
        OR NOT (target.LeadTimeDays <=> source.LeadTimeDays)
        OR NOT (target.QuantityPerOuter <=> source.QuantityPerOuter)
        OR NOT (target.IsChillerStock <=> source.IsChillerStock)
        OR NOT (target.Barcode <=> source.Barcode)
        OR NOT (target.TaxRate <=> source.TaxRate)
        OR NOT (target.UnitPrice <=> source.UnitPrice)
        OR NOT (target.RecommendedRetailPrice <=> source.RecommendedRetailPrice)
        OR NOT (target.TypicalWeightPerUnit <=> source.TypicalWeightPerUnit)
)
THEN
UPDATE SET
        target.StockItemName = source.StockItemName
        ,target.SupplierID = source.SupplierID
        ,target.ColorID = source.ColorID
        ,target.UnitPackageID = source.UnitPackageID
        ,target.OuterPackageID = source.OuterPackageID
        ,target.Brand = source.Brand
        ,target.Size = source.Size
        ,target.LeadTimeDays = source.LeadTimeDays
        ,target.QuantityPerOuter = source.QuantityPerOuter
        ,target.IsChillerStock = source.IsChillerStock    
        ,target.Barcode = source.Barcode
        ,target.TaxRate = source.TaxRate
        ,target.UnitPrice = source.UnitPrice
        ,target.RecommendedRetailPrice = source.RecommendedRetailPrice
        ,target.TypicalWeightPerUnit = source.TypicalWeightPerUnit
        ,target.LastUpdated = CURRENT_TIMESTAMP()
WHEN NOT MATCHED THEN
INSERT (StockItemID, StockItemName, SupplierID, ColorID, UnitPackageID, OuterPackageID, Brand, Size, 
        LeadTimeDays, QuantityPerOuter, IsChillerStock, Barcode, TaxRate, UnitPrice, RecommendedRetailPrice, TypicalWeightPerUnit, LastUpdated)
VALUES (source.StockItemID, source.StockItemName, source.SupplierID, source.ColorID, source.UnitPackageID, source.OuterPackageID, source.Brand, source.Size, 
        source.LeadTimeDays, source.QuantityPerOuter, source.IsChillerStock, source.Barcode, source.TaxRate, source.UnitPrice, 
        source.RecommendedRetailPrice, source.TypicalWeightPerUnit, CURRENT_TIMESTAMP())
WHEN NOT MATCHED BY SOURCE THEN
DELETE;


<Spark SQL result set with 1 rows and 4 fields>

## Step 5: Dropping the temporary view

In [5]:
spark.catalog.dropTempView("vw_stock_items")


True