In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType,array,ArrayType,DateType,TimestampType,FloatType

In [0]:
# Read the customer CDC extract (CSV files with a header row; column types are
# inferred by Spark), preview it, and register it as the temp view
# `df_customer_cdc` so the SQL MERGE cell can use it as its source.
customers_cdc_path = "/Volumes/ctlg_electroniz/sch_bronze/ext_vol_cdc/store_transactions/customers/"
customers_reader = spark.read.option("header", "true").option("inferSchema", "true")
df_customer_cdc = customers_reader.csv(customers_cdc_path)

display(df_customer_cdc)
df_customer_cdc.createOrReplaceTempView("df_customer_cdc")

In [0]:
%sql
-- Upsert the customer CDC rows (temp view df_customer_cdc) into the bronze
-- customers table, matching on customer_id:
--   * matched rows     -> every non-key column is overwritten from the CDC row
--   * unmatched rows   -> inserted as new customers
MERGE INTO ctlg_electroniz.sch_bronze.tbl_bronze_customers AS tgt
USING df_customer_cdc AS src
ON tgt.customer_id = src.customer_id
WHEN MATCHED THEN UPDATE SET
  tgt.customer_name = src.customer_name,
  tgt.address       = src.address,
  tgt.city          = src.city,
  tgt.postalcode    = src.postalcode,
  tgt.country       = src.country,
  tgt.phone         = src.phone,
  tgt.email         = src.email,
  tgt.credit_card   = src.credit_card,
  tgt.updated_at    = src.updated_at
WHEN NOT MATCHED THEN INSERT (
    customer_id,
    customer_name,
    address,
    city,
    postalcode,
    country,
    phone,
    email,
    credit_card,
    updated_at
  )
  VALUES (
    src.customer_id,
    src.customer_name,
    src.address,
    src.city,
    src.postalcode,
    src.country,
    src.phone,
    src.email,
    src.credit_card,
    src.updated_at
  );

In [0]:
# NOTE(review): wildcard import kept as-is; only `DeltaTable` is used in the
# cells visible here — consider importing it explicitly.
from delta.tables import *

# Handle on the bronze orders Delta table; used as the merge target for the
# orders CDC upsert.
orders_table_name = "ctlg_electroniz.sch_bronze.tbl_bronze_orders"
deltaTableOrders = DeltaTable.forName(spark, orders_table_name)

In [0]:
# Read the orders CDC extract (CSV files with a header row). Schema inference
# is switched off, so Spark reads every column as a string.
orders_cdc_path = "/Volumes/ctlg_electroniz/sch_bronze/ext_vol_cdc/store_transactions/orders/"
orders_reader = spark.read.option("header", "true").option("inferSchema", "false")
df_orders_cdc = orders_reader.csv(orders_cdc_path)

# Preview the batch, expose it to SQL cells as `df_orders_cdc`, and print the
# resulting schema for a quick sanity check.
display(df_orders_cdc)
df_orders_cdc.createOrReplaceTempView("df_orders_cdc")
df_orders_cdc.printSchema()

In [0]:
# Upsert the orders CDC batch into the bronze orders table, matching on
# order_number: matched rows have all columns updated from the batch,
# unmatched rows are inserted.
(
    deltaTableOrders.alias("orders")
    .merge(
        df_orders_cdc.alias("df_orders_cdc"),
        "orders.order_number = df_orders_cdc.order_number",
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

In [0]:
# Exploratory read of one day's ecommerce JSON drop (multi-line JSON documents),
# letting Spark infer the schema, then preview the result.
ecommerce_preview_path = "/Volumes/ctlg_electroniz/sch_bronze/ext_vol_cdc/ecommerce_transactions/2025/07/15/"
ecommerce_preview_reader = spark.read.option("multiline", "true")
df_ecommerce_cdc = ecommerce_preview_reader.json(ecommerce_preview_path)

display(df_ecommerce_cdc)

In [0]:
%sql
-- Bronze landing table for ecommerce transactions. Every column is kept as a
-- raw STRING; the statement is a no-op when the table already exists.
CREATE TABLE IF NOT EXISTS ctlg_electroniz.sch_bronze.tbl_bronze_ecommerce (
  customer_name  STRING,
  address        STRING,
  city           STRING,
  country        STRING,
  currency       STRING,
  email          STRING,
  order_date     STRING,
  order_mode     STRING,
  order_number   STRING,
  phone          STRING,
  postalcode     STRING,
  product_name   STRING,
  sale_price     STRING
)

In [0]:
# Explicit all-string schema for the ecommerce JSON drop, as (column name, type)
# pairs. The column names and order mirror the DDL of
# ctlg_electroniz.sch_bronze.tbl_bronze_ecommerce.
ECOMMERCE_SCHEMA = [
    ('customer_name', StringType()),
    ('address', StringType()),
    ('city', StringType()),
    ('country', StringType()),
    ('currency', StringType()),
    ('email', StringType()),
    ('order_date', StringType()),
    ('order_mode', StringType()),
    ('order_number', StringType()),
    ('phone', StringType()),
    ('postalcode', StringType()),
    ('product_name', StringType()),
    ('sale_price', StringType()),
]

ecommerce_fields = [StructField(*field) for field in ECOMMERCE_SCHEMA]
schema_ecomm = StructType(ecommerce_fields)

# Re-read the day's multi-line JSON files with the explicit schema instead of
# relying on inference.
df_ecommerce_cdc = spark.read.json(
    path="/Volumes/ctlg_electroniz/sch_bronze/ext_vol_cdc/ecommerce_transactions/2025/07/15/",
    schema=schema_ecomm,
    multiLine=True,
)

# The batch is NOT written with writeTo(...).createOrReplace() before merging:
# that would replace the whole bronze table with just this batch (dropping
# previously loaded rows) and reduce the merge to comparing the batch with
# itself. The target table is expected to exist already (it is created with
# CREATE TABLE IF NOT EXISTS in the preceding SQL cell).
deltaTableEcommerce = DeltaTable.forName(spark, "ctlg_electroniz.sch_bronze.tbl_bronze_ecommerce")

# Upsert on order_number: matched rows get all columns updated from the batch,
# unmatched rows are inserted.
# NOTE(review): assumes order_number is unique within a batch — Delta rejects a
# merge where several source rows match the same target row. TODO confirm.
(
    deltaTableEcommerce.alias('ecommerce')
    .merge(
        df_ecommerce_cdc.alias('df_ecommerce_cdc'),
        'ecommerce.order_number = df_ecommerce_cdc.order_number',
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

In [0]:
# Explicit schema for the IP-location CSV extract, as (column name, type) pairs.
# NOTE(review): IntegerType is a 32-bit signed integer. If ip1/ip2 hold numeric
# IPv4 range bounds, values above 2147483647 will not fit — confirm against the
# data (the bronze table DDL uses INTEGER for these columns as well).
ILOCATION_SCHEMA = [
    ('ip1', IntegerType()),
    ('ip2', IntegerType()),
    ('country_code', StringType()),
    ('country_name', StringType()),
]

fields = [StructField(*field) for field in ILOCATION_SCHEMA]
schema_iplocation = StructType(fields)

# Read the CSV files (with a header row) using the explicit schema above.
# `inferSchema` is deliberately not set: a user-supplied schema takes
# precedence over inference, so enabling both was contradictory.
df_iplocation_cdc = (
    spark.read.option("header", "true")
    .schema(schema_iplocation)
    .csv(
        "/Volumes/ctlg_electroniz/sch_bronze/ext_vol_cdc/iplocation/"
    )
)

# Preview the data, expose it to SQL cells as `df_iplocation_cdc`, and print
# the applied schema.
display(df_iplocation_cdc)
df_iplocation_cdc.createOrReplaceTempView("df_iplocation_cdc")
df_iplocation_cdc.printSchema()

In [0]:
%sql
-- Bronze table for the IP-location reference data; the statement is a no-op
-- when the table already exists.
CREATE TABLE IF NOT EXISTS ctlg_electroniz.sch_bronze.tbl_bronze_iplocation (
  ip1           INTEGER,
  ip2           INTEGER,
  country_code  STRING,
  country_name  STRING
);

In [0]:
%sql
-- Full reload of the IP-location bronze table from the de-duplicated CDC view.

-- De-duplicate the incoming rows (exact duplicates across all columns).
CREATE OR REPLACE TEMP VIEW iplocation_src AS
SELECT DISTINCT * FROM df_iplocation_cdc;

-- Replace the table contents in a single statement instead of DELETE followed
-- by INSERT, so the table is never left empty between the two steps or when
-- the insert fails.
INSERT OVERWRITE ctlg_electroniz.sch_bronze.tbl_bronze_iplocation
SELECT * FROM iplocation_src;
