#Cloning

In [0]:
USE usertraining.flights;

CREATE OR REPLACE TABLE airport_codes_clone_bronze
DEEP CLONE airportcodes_bronze;  --SHALLOW


#Complete overwrite

In [0]:
CREATE OR REPLACE TABLE departuredelays_bronze AS
  SELECT * FROM read_files("dbfs:/databricks-datasets/flights/departuredelays.csv",
    format => "csv",
    sep => ",",
    header => true,
    mode => "FAILFAST");

DESCRIBE HISTORY departuredelays_bronze;

In [0]:
INSERT OVERWRITE departuredelays_bronze  --note this wont support scheme evolution unlike CRAS
  SELECT * FROM read_files("dbfs:/databricks-datasets/flights/departuredelays.csv",
      format => "csv",
      sep => ",",
      header => true,
      mode => "FAILFAST");

  DESCRIBE HISTORY departuredelays_bronze;

#Merge updates - Upserts

In [0]:
create schema if not exists tpch;
use tpch;

CREATE OR REPLACE TABLE customer_bronze AS
   SELECT *, current_timestamp() as updated
   FROM DELTA.`dbfs:/databricks-datasets/tpch/delta-001/customer/`;

select * from customer_bronze;

In [0]:
CREATE OR REPLACE TEMP VIEW customer_updates_view10u AS
   SELECT
      c_custkey,
      c_name,
      CONCAT('Address_', CAST(RAND() * 1000000 AS INT)) AS c_address,
      c_nationkey,
      c_phone,
      c_acctbal,
      c_mktsegment,
      c_comment,
      current_timestamp() as updated
   FROM customer_bronze
   ORDER BY RAND()
   LIMIT 10;

CREATE OR REPLACE TEMP VIEW customer_updates_view20i AS
   SELECT
      CAST(RAND() * 1000000 AS INT) AS c_custkey,
      c_name,
      CONCAT('Address_', CAST(RAND() * 1000000 AS INT)) AS c_address,
      c_nationkey,
      c_phone,
      c_acctbal,
      c_mktsegment,
      c_comment,
      current_timestamp() as updated
   FROM customer_bronze
   ORDER BY RAND()
   LIMIT 20;

CREATE OR REPLACE TEMP VIEW customer_updates_view AS
   SELECT * FROM customer_updates_view10u
   UNION
   SELECT * FROM customer_updates_view20i;


select * from customer_updates_view

In [0]:
MERGE INTO customer_bronze a
USING customer_updates_view b
ON a.c_custkey = b.c_custkey
WHEN MATCHED THEN
  UPDATE SET c_address = b.c_address, updated = b.updated
WHEN NOT MATCHED THEN
  INSERT *

#Insert only merge for deduplication

In [0]:
MERGE INTO customer_bronze a
USING customer_updates_view b
ON a.c_custkey = b.c_custkey
WHEN NOT MATCHED AND b.c_mktsegment = 'HOUSEHOLD' THEN
  INSERT *

#Filter & rename columns

In [0]:
CREATE OR REPLACE TABLE customer_narrow AS
   SELECT
      c_custkey as id,
      c_name as name,
      c_address as address
   FROM customer_bronze;

SELECT * FROM customer_narrow;

#Declare schema & generated columns

In [0]:
CREATE OR REPLACE TABLE customer_enhanced_bronze (
      c_custkey BIGINT,
      c_name STRING,
      c_address STRING,
      c_nationkey BIGINT,
      c_phone STRING,
      c_acctbal DECIMAL(18,2),
      c_mktsegment STRING,
      c_comment STRING,
      updated TIMESTAMP,
      date DATE GENERATED ALWAYS AS (
        cast(updated AS DATE))
        COMMENT 'generated based on updated'
      );

In [0]:
-- SET spark.databricks.delta.schema.autoMerge.enabled=true;

MERGE INTO customer_enhanced_bronze a
USING customer_bronze b
ON a.c_custkey = b.c_custkey
WHEN NOT MATCHED THEN
  INSERT (c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment, updated)
  VALUES (b.c_custkey, b.c_name, b.c_address, b.c_nationkey, b.c_phone, b.c_acctbal, b.c_mktsegment, b.c_comment, b.updated);
  -- INSERT *;

SELECT * FROM customer_enhanced_bronze;

#Table constraint

In [0]:
ALTER TABLE customer_enhanced_bronze ADD CONSTRAINT valide_date CHECK (date > '2025-01-01')

In [0]:
DESCRIBE EXTENDED customer_enhanced_bronze