In [0]:
from pyspark.sql import functions as F

# Path of the raw sales CSV file that the load step below reads via spark.read.
source_path = "/Volumes/workspace/demo_etl/raw/sales.csv"

In [0]:
# Load the raw CSV: the first line is treated as a header and Spark infers
# the column types from the data.
df = spark.read.options(header="true", inferSchema="true").csv(source_path)

# Column name -> expression, applied one by one in this order. "revenue" is
# last on purpose: it is resolved against the frame in which qty and price
# have already been cast.
typed_columns = {
    "order_id": F.col("order_id").cast("long"),
    "order_date": F.to_date("order_date"),
    "qty": F.col("qty").cast("int"),
    "price": F.col("price").cast("double"),
    "revenue": F.col("qty") * F.col("price"),
}

# Start from the raw frame every time so re-running the cell is idempotent.
df_clean = df
for column_name, column_expr in typed_columns.items():
    df_clean = df_clean.withColumn(column_name, column_expr)



In [0]:
target_table = "workspace.demo_etl.sales_raw"

# Make sure the target Delta table exists before merging into it.
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {target_table} (
  order_id BIGINT,
  order_date DATE,
  product STRING,
  qty INT,
  price DOUBLE,
  revenue DOUBLE
) USING DELTA
""")

# Prepare the MERGE source so the upsert is safe to re-run:
#  - A row whose order_id is NULL can never satisfy `t.order_id = s.order_id`,
#    so it would be inserted again on every run; such rows are dropped.
#  - Delta rejects a MERGE in which several source rows match the same target
#    row, so only one row per order_id is kept. Which of the duplicates
#    survives is not deterministic.
incoming_sales = (df_clean
    .where("order_id IS NOT NULL")
    .dropDuplicates(["order_id"]))

# Expose the prepared rows to SQL as a temp view.
incoming_sales.createOrReplaceTempView("incoming_sales")

# Incremental upsert keyed on order_id: existing orders are overwritten with
# the incoming values, unseen orders are inserted. This stays the last
# expression of the cell so the merge metrics frame is shown as its output.
spark.sql(f"""
MERGE INTO {target_table} t
USING incoming_sales s
ON t.order_id = s.order_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [0]:
# Read the merge target back and render it to inspect the result of the upsert.
merged_sales = spark.table(target_table)
display(merged_sales)

order_id,order_date,product,qty,price,revenue
1001,2026-02-18,Apple,2,15000.0,30000.0
1002,2026-02-18,Banana,5,7000.0,35000.0
1003,2026-02-19,Orange,3,12000.0,36000.0
1004,2026-02-19,Apple,1,15000.0,15000.0
1005,2026-02-20,Banana,4,7000.0,28000.0
1006,2026-02-20,Orange,2,12000.0,24000.0
1007,2026-02-21,Apple,5,15000.0,75000.0
1008,2026-02-21,Banana,3,7000.0,21000.0
1009,2026-02-22,Orange,4,12000.0,48000.0
1010,2026-02-22,Apple,2,15000.0,30000.0
