### Create the Spark session

In [54]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
import os

# Shared secret for Spark authentication, read from the environment so it is
# never stored in the notebook. Fail fast when it is missing: formatting None
# into the JVM option below would otherwise configure the literal secret "None".
secret = os.environ.get("SPARK_AUTH_SECRET")
if not secret:
    raise RuntimeError(
        "SPARK_AUTH_SECRET is not set; export it before creating the Spark session."
    )

conf = SparkConf()
conf.set("spark.authenticate", "true")
# The secret is handed to the driver JVM as a -D system property.
conf.set("spark.driver.extraJavaOptions", f"-Dspark.authenticate.secret={secret}")

# The master URL lists two host:port endpoints (node1 and node2).
spark = (
    SparkSession.builder.config(conf=conf)
    .appName("merge_into_bronze")
    .master("spark://node1.dw.felicity.net.bd:7077,node2.dw.felicity.net.bd:7077")
    .getOrCreate()
)


### Create temporary views over the landing files

In [79]:
# geo_location_csv: temporary view over the geo_location landing CSV, declared
# with an explicit column schema and the header option set to 'true'.
geo_location_view_ddl = """
CREATE OR REPLACE TEMPORARY VIEW geo_location_csv(
    location_id BIGINT, 
    country STRING, 
    state STRING, 
    city STRING, 
    postal_code STRING
)
USING csv
OPTIONS (
  path 's3a://spark-data/landing/geo_location.csv',
  header 'true'
);
"""
spark.sql(geo_location_view_ddl)

# customer_csv: temporary view over the customer landing CSV, declared with an
# explicit column schema and the header option set to 'true'.
customer_view_ddl = """ 
CREATE OR REPLACE TEMPORARY VIEW customer_csv (
    customer_id INT,
    full_name STRING,
    email STRING,
    phone_number STRING,
    location_id INT,
    created_at TIMESTAMP
)
USING csv
OPTIONS (
  path 's3a://spark-data/landing/customer.csv',
  header 'true'
);
"""
spark.sql(customer_view_ddl)

# product_csv: temporary view over the product landing CSV, declared with an
# explicit column schema and the header option set to 'true'.
product_view_ddl = """ 
CREATE OR REPLACE TEMPORARY VIEW product_csv (
    product_id INT,
    product_name STRING,
    description STRING,
    category STRING,
    price DECIMAL(10,2),
    in_stock BOOLEAN,
    created_at TIMESTAMP
)
USING csv
OPTIONS (
  path 's3a://spark-data/landing/product.csv',
  header 'true'
);
"""
spark.sql(product_view_ddl);

# sales_order_csv: temporary view over the sales_order landing CSV, declared
# with an explicit column schema and the header option set to 'true'.
sales_order_view_ddl = """ 
CREATE OR REPLACE TEMPORARY VIEW sales_order_csv (
    order_id INT,
    customer_id INT,
    order_date TIMESTAMP,
    total_amount DECIMAL(12,2),
    status STRING,
    updated_at TIMESTAMP
)
USING csv
OPTIONS (
  path 's3a://spark-data/landing/sales_order.csv',
  header 'true'
);
"""
spark.sql(sales_order_view_ddl);

# order_item_csv: temporary view over the order_item landing CSV, declared with
# an explicit column schema and the header option set to 'true'.
order_item_view_ddl = """ 
CREATE OR REPLACE TEMPORARY VIEW order_item_csv (
    order_item_id INT,
    order_id INT,
    product_id INT,
    quantity INT,
    unit_price DECIMAL(10,2),
    total_price DECIMAL(12,2)
)
USING csv
OPTIONS (
  path 's3a://spark-data/landing/order_item.csv',
  header 'true'
);
"""
# Trailing semicolon keeps the cell from echoing the returned DataFrame.
spark.sql(order_item_view_ddl);

### Merge landing data into the bronze layer

In [80]:
# Upsert geo_location_csv into bronze.geo_location, keyed on location_id.
# Matched rows have every non-key column overwritten; unmatched rows are
# inserted. Each source row is stamped with current_timestamp() as
# load_timestamp.
geo_location_merge_sql = """
MERGE INTO bronze.geo_location AS target
USING (
  SELECT *, current_timestamp() AS load_timestamp
  FROM geo_location_csv
) AS source
ON target.location_id = source.location_id

WHEN MATCHED THEN
  UPDATE SET
    target.country = source.country,
    target.state = source.state,
    target.city = source.city,
    target.postal_code = source.postal_code,
    target.load_timestamp = source.load_timestamp

WHEN NOT MATCHED THEN
  INSERT (location_id, country, state, city, postal_code, load_timestamp)
  VALUES (source.location_id, source.country, source.state, source.city, source.postal_code, source.load_timestamp);
"""
spark.sql(geo_location_merge_sql);

AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view `bronze`.`geo_location` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS.; line 2 pos 11;
'MergeIntoTable ('target.location_id = 'source.location_id), [updateaction(None, assignment('target.country, 'source.country), assignment('target.state, 'source.state), assignment('target.city, 'source.city), assignment('target.postal_code, 'source.postal_code), assignment('target.load_timestamp, 'source.load_timestamp))], [insertaction(None, assignment('location_id, 'source.location_id), assignment('country, 'source.country), assignment('state, 'source.state), assignment('city, 'source.city), assignment('postal_code, 'source.postal_code), assignment('load_timestamp, 'source.load_timestamp))]
:- 'SubqueryAlias target
:  +- 'UnresolvedRelation [bronze, geo_location], [__required_write_privileges__=INSERT,UPDATE], false
+- SubqueryAlias source
   +- Project [location_id#1010L, country#1011, state#1012, city#1013, postal_code#1014, current_timestamp() AS load_timestamp#1070]
      +- SubqueryAlias geo_location_csv
         +- View (`geo_location_csv`, [location_id#1010L,country#1011,state#1012,city#1013,postal_code#1014])
            +- Relation [location_id#1010L,country#1011,state#1012,city#1013,postal_code#1014] csv


In [77]:
# Row count of bronze.geo_location.
geo_location_row_count = spark.sql(""" select count(*) from  bronze.geo_location; """)
geo_location_row_count.show()

+--------+
|count(1)|
+--------+
|      15|
+--------+



In [64]:
# Upsert customer_csv into bronze.customer, keyed on customer_id.
# Matched rows have every non-key column overwritten; unmatched rows are
# inserted. Each source row is stamped with current_timestamp() as
# load_timestamp.
customer_merge_sql = """
MERGE INTO bronze.customer AS target
USING (
  SELECT *, current_timestamp() AS load_timestamp
  FROM customer_csv
) AS source
ON target.customer_id = source.customer_id

WHEN MATCHED THEN
  UPDATE SET
    full_name     = source.full_name,
    email         = source.email,
    phone_number  = source.phone_number,
    location_id   = source.location_id,
    created_at    = source.created_at,
    load_timestamp = source.load_timestamp

WHEN NOT MATCHED THEN
  INSERT (
    customer_id, full_name, email, phone_number, location_id, created_at, load_timestamp
  )
  VALUES (
    source.customer_id, source.full_name, source.email, source.phone_number,
    source.location_id, source.created_at, source.load_timestamp
  );

"""
spark.sql(customer_merge_sql);

In [76]:
# Row count of bronze.customer.
customer_row_count = spark.sql(""" select count(*) from  bronze.customer; """)
customer_row_count.show()

+--------+
|count(1)|
+--------+
|     250|
+--------+



In [65]:
# Upsert product_csv into bronze.product, keyed on product_id.
# Matched rows have every non-key column overwritten; unmatched rows are
# inserted. Each source row is stamped with current_timestamp() as
# load_timestamp.
product_merge_sql = """
MERGE INTO bronze.product AS target
USING (
  SELECT *, current_timestamp() AS load_timestamp
  FROM product_csv
) AS source
ON target.product_id = source.product_id

WHEN MATCHED THEN
  UPDATE SET
    product_name   = source.product_name,
    description    = source.description,
    category       = source.category,
    price          = source.price,
    in_stock       = source.in_stock,
    created_at     = source.created_at,
    load_timestamp = source.load_timestamp

WHEN NOT MATCHED THEN
  INSERT (
    product_id, product_name, description, category, price,
    in_stock, created_at, load_timestamp
  )
  VALUES (
    source.product_id, source.product_name, source.description, source.category,
    source.price, source.in_stock, source.created_at, source.load_timestamp
  );

"""
spark.sql(product_merge_sql);

In [75]:
# Row count of bronze.product.
product_row_count = spark.sql(""" select count(*) from  bronze.product; """)
product_row_count.show()

+--------+
|count(1)|
+--------+
|      25|
+--------+



In [71]:
# Upsert sales_order_csv into bronze.sales_order, keyed on order_id.
# Matched rows have every non-key column overwritten; unmatched rows are
# inserted. Both updated_at and load_timestamp are set to current_timestamp().
# NOTE(review): the source query replaces the CSV's own updated_at column with
# current_timestamp(), so the value from the landing file is never loaded --
# confirm this is intended.
sales_order_merge_sql = """
MERGE INTO bronze.sales_order AS target
USING (
  SELECT 
    order_id,
    customer_id,
    order_date,
    total_amount,
    status,
    current_timestamp() AS updated_at,
    current_timestamp() AS load_timestamp
  FROM sales_order_csv
) AS source
ON target.order_id = source.order_id

WHEN MATCHED THEN
  UPDATE SET
    customer_id    = source.customer_id,
    order_date     = source.order_date,
    total_amount   = source.total_amount,
    status         = source.status,
    updated_at     = source.updated_at,
    load_timestamp = source.load_timestamp

WHEN NOT MATCHED THEN
  INSERT (
    order_id, customer_id, order_date, total_amount, status,
    updated_at, load_timestamp
  )
  VALUES (
    source.order_id, source.customer_id, source.order_date,
    source.total_amount, source.status, source.updated_at, source.load_timestamp
  );
"""
spark.sql(sales_order_merge_sql)


25/06/08 17:35:50 WARN HiveConf: HiveConf of name hive.metastore.ssl.need.client.auth does not exist


DataFrame[]

In [74]:
# Row count of bronze.sales_order.
sales_order_row_count = spark.sql(""" select count(*) from  bronze.sales_order; """)
sales_order_row_count.show()

+--------+
|count(1)|
+--------+
|    1000|
+--------+



In [69]:
# Upsert order_item_csv into bronze.order_item, keyed on order_item_id.
# Matched rows have every non-key column overwritten; unmatched rows are
# inserted. Each source row is stamped with current_timestamp() as
# load_timestamp.
order_item_merge_sql = """
MERGE INTO bronze.order_item AS target
USING (
  SELECT 
    order_item_id,
    order_id,
    product_id,
    quantity,
    unit_price,
    total_price,
    current_timestamp() AS load_timestamp
  FROM order_item_csv
) AS source
ON target.order_item_id = source.order_item_id

WHEN MATCHED THEN
  UPDATE SET
    order_id       = source.order_id,
    product_id     = source.product_id,
    quantity       = source.quantity,
    unit_price     = source.unit_price,
    total_price    = source.total_price,
    load_timestamp = source.load_timestamp

WHEN NOT MATCHED THEN
  INSERT (
    order_item_id, order_id, product_id, quantity, unit_price, total_price, load_timestamp
  )
  VALUES (
    source.order_item_id, source.order_id, source.product_id, source.quantity,
    source.unit_price, source.total_price, source.load_timestamp
  );

"""
spark.sql(order_item_merge_sql);

In [78]:
# Row count of bronze.order_item.
order_item_row_count = spark.sql(""" select count(*) from  bronze.order_item; """)
order_item_row_count.show()

25/06/08 23:53:59 WARN HiveConf: HiveConf of name hive.metastore.ssl.need.client.auth does not exist


+--------+
|count(1)|
+--------+
|    3000|
+--------+

