In [12]:
from pyspark.sql import SparkSession

# Start Spark session
spark = SparkSession.builder.appName("Gold_Data").getOrCreate()

# -------------------------
# Predictive Table (ML) standalone table
# -------------------------
# Will be added later for predictive values
# speak with team on what columns


# -------------------------
# Fact Tables
# -------------------------

# Fact_Trade_Import
spark.sql("""
CREATE TABLE IF NOT EXISTS Fact_Trade_Import (
    id BIGINT NOT NULL,
    trade_value FLOAT,
    Item_Dim_FK INT NOT NULL,
    Partner_Dim_FK INT NOT NULL,
    Reporter_Dim_FK INT NOT NULL,
    Date_Dim_FK INT NOT NULL,
    trade_flow_type_FK INT NOT NULL
)
USING DELTA
""")

# Fact_Trade_Export
spark.sql("""
CREATE TABLE IF NOT EXISTS Fact_Trade_Export (
    id BIGINT NOT NULL,
    trade_value FLOAT,
    Item_Dim_FK INT NOT NULL,
    Partner_Dim_FK INT NOT NULL,
    Reporter_Dim_FK INT NOT NULL,
    Date_Dim_FK INT NOT NULL,
    trade_flow_type_FK INT NOT NULL
)
USING DELTA
""")


# -------------------------
# Dimension Tables
# -------------------------

# Dim_Date
spark.sql("""
CREATE TABLE IF NOT EXISTS Dim_Date (
    date_id BIGINT NOT NULL,
    full_date DATE,
    year_num INT,
    month_num INT,
    month_name STRING,
    quarter_num INT
)
USING DELTA
""")

# Dim_Reporter (Ontario only)
spark.sql("""
CREATE TABLE IF NOT EXISTS Dim_Reporter (
    reporter_id INT NOT NULL, -- location key
    reporter_iso STRING, -- country prefix
    province STRING
)
USING DELTA
""")

# Dim_Partner (US states)
spark.sql("""
CREATE TABLE IF NOT EXISTS Dim_Partner (
    partner_id INT NOT NULL, -- location key
    partner_iso STRING, -- country prefix
    state STRING
)
USING DELTA
""")

# Dim_Item (HS Sections)
spark.sql("""
CREATE TABLE IF NOT EXISTS Dim_Item (
    item_id INT NOT NULL, -- hs code
    item_name STRING -- descipriton
)
USING DELTA
""")

# TRADE FLOW TABLE

spark.sql("""
CREATE TABLE IF NOT EXISTS trade_flow_type (
    trade_flow_type_id INT NOT NULL, -- hs code
    trade_flow_type_name STRING -- descipriton
)
USING DELTA
""")


StatementMeta(, 868b9c7f-3471-47e3-a3d1-1baf7ffd6c5e, 13, Finished, Available, Finished)

DataFrame[]

In [13]:
%%sql

-- LOGIC Get DISITNCT DATES FROM SilverData trade_details
-- OPERATIONS DATE_ID, count year num,month num, derive month name and quarter_name




MERGE INTO dim_date  as dd
 USING
 (
    SELECT DISTINCT
        CAST(date_format(td.date,'ddMMyyyy') AS BIGINT) AS date_id,
        td.date       AS full_date,
        YEAR(td.date) AS year_num,
        MONTH(td.date) AS month_num,                                  
        date_format(td.date,'MMMM') AS month_name,
        QUARTER(td.date)  AS quarter_num                                
    FROM SilverData.trades AS td
    )  
    AS src 
    ON src.date_id=dd.date_id
    WHEN NOT MATCHED THEN
        INSERT  (date_id,full_date,year_num,month_num,month_name,quarter_num)
            VALUES(
                src.date_id,
                src.full_date,
                src.year_num,
                src.month_num,
                src.month_name,
                src.quarter_num);





StatementMeta(, 868b9c7f-3471-47e3-a3d1-1baf7ffd6c5e, 14, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 4 fields>

In [14]:
%%sql
MERGE INTO dim_reporter as dr
USING(
SELECT 
    DISTINCT(sr.reporter_id) as reporter_id,
    sc.iso as reporter_iso,
    sr.reporter_name as province
 FROM 
SilverData.reporter sr
JOIN SilverData.country as sc 
ON sc.country_id=sr.country_id
)
AS src 
ON src.reporter_id=dr.reporter_id
WHEN NOT MATCHED THEN
INSERT (reporter_id,reporter_iso,province)
VALUES(src.reporter_id,src.reporter_iso,src.province) ;


StatementMeta(, 868b9c7f-3471-47e3-a3d1-1baf7ffd6c5e, 15, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 4 fields>

In [15]:
%%sql
MERGE INTO dim_partner as dr 
USING (
    SELECT 
        DISTINCT(st.trade_state_id) as partner_id,
        sc.iso as partner_iso,
        st.state_name as state

        FROM SilverData.trade_partner as st
        JOIN SilverData.country as sc 
        ON st.country_id = sc.country_id
)
as src 
ON src.partner_id = dr.partner_id
WHEN NOT MATCHED THEN 
INSERT (partner_id,partner_iso,state) 
VALUES(src.partner_id,src.partner_iso,src.state);



StatementMeta(, 868b9c7f-3471-47e3-a3d1-1baf7ffd6c5e, 16, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 4 fields>

In [16]:
%%sql

MERGE INTO dim_item as di
USING 
(
    SELECT 
        DISTINCT (sh.section_code) as item_id,
        sh.description as item_name

        FROM SilverData.hs_section as sh
) 
AS src 
ON di.item_id=src.item_id
WHEN NOT MATCHED THEN
INSERT (item_id,item_name)
VALUES(src.item_id,item_name) ;



StatementMeta(, 868b9c7f-3471-47e3-a3d1-1baf7ffd6c5e, 17, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 4 fields>

In [17]:
%%sql
MERGE INTO trade_flow_type tf 
USING (
SELECT 
    trade_flow_type_id,
    trade_flow_type_name 
    FROM SilverData.trade_flow_type
)
AS src 
ON tf.trade_flow_type_id = src.trade_flow_type_id
WHEN NOT MATCHED THEN
INSERT (trade_flow_type_id,trade_flow_type_name)
VALUES(src.trade_flow_type_id,src.trade_flow_type_name);

-- SELECT * FROM trade_flow_type;

StatementMeta(, 868b9c7f-3471-47e3-a3d1-1baf7ffd6c5e, 18, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 4 fields>

In [18]:
%%sql

MERGE INTO Fact_Trade_Export fx 
USING(
    SELECT  
      tdt.dollar_value      AS trade_value, --DOLLAR_VALUE
      hs.section_code      AS item_Dim_FK, --HS_CODE
      td.trade_state_id      AS Partner_Dim_FK,
      td.reporter_id      AS Reporter_Dim_FK,
      CAST(date_format(td.date,"ddMMyyyy") AS BIGINT)      AS Date_Dim_FK,
      td.trade_flow_type_id      AS trade_flow_type_FK

    FROM SilverData.trade_details as tdt
    JOIN  SilverData.hs_section as hs
    ON hs.section_id=tdt.hs_sections_id
    JOIN SilverData.trades as td 
    ON td.trades_id=tdt.trades_id
    WHERE td.trade_flow_type_id IN (1,2,3)
)
AS src
ON fx.trade_value = src.trade_value
AND fx.item_Dim_FK = src.item_Dim_FK
AND fx.Partner_Dim_FK = src.Partner_Dim_FK
AND fx.Reporter_Dim_FK= src.Reporter_Dim_FK
AND fx.Date_Dim_FK = src.Date_Dim_FK
AND fx.trade_flow_type_FK =  src.trade_flow_type_FK
WHEN NOT MATCHED THEN 
INSERT(id,trade_value,item_Dim_FK,Partner_Dim_FK,Reporter_Dim_FK,Date_Dim_FK,trade_flow_type_FK)
VALUES(
    ABS(HASH(src.trade_value, src.item_Dim_FK, src.Partner_Dim_FK, src.Reporter_Dim_FK, src.Date_Dim_FK, src.trade_flow_type_FK)),
    src.trade_value, src.item_Dim_FK, src.Partner_Dim_FK, src.Reporter_Dim_FK, src.Date_Dim_FK, src.trade_flow_type_FK);


StatementMeta(, 868b9c7f-3471-47e3-a3d1-1baf7ffd6c5e, 19, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 4 fields>

In [19]:
%%sql

MERGE INTO Fact_Trade_Import fi 
USING(
    SELECT  
      tdt.dollar_value      AS trade_value, --DOLLAR_VALUE
      hs.section_code      AS item_Dim_FK, --HS_CODE
      td.trade_state_id      AS Partner_Dim_FK,
      td.reporter_id      AS Reporter_Dim_FK,
      CAST(date_format(td.date,"ddMMyyyy") AS BIGINT)      AS Date_Dim_FK,
      td.trade_flow_type_id      AS trade_flow_type_FK

    FROM SilverData.trade_details as tdt
    JOIN  SilverData.hs_section as hs
    ON hs.section_id=tdt.hs_sections_id
    JOIN SilverData.trades as td 
    ON td.trades_id=tdt.trades_id
    WHERE td.trade_flow_type_id IN (4)
)
AS src
ON fi.trade_value = src.trade_value
AND fi.item_Dim_FK = src.item_Dim_FK
AND fi.Partner_Dim_FK = src.Partner_Dim_FK
AND fi.Reporter_Dim_FK= src.Reporter_Dim_FK
AND fi.Date_Dim_FK = src.Date_Dim_FK
AND fi.trade_flow_type_FK =  src.trade_flow_type_FK
WHEN NOT MATCHED THEN 
INSERT(id,trade_value,item_Dim_FK,Partner_Dim_FK,Reporter_Dim_FK,Date_Dim_FK,trade_flow_type_FK)
VALUES(
    ABS(HASH(src.trade_value, src.item_Dim_FK, src.Partner_Dim_FK, src.Reporter_Dim_FK, src.Date_Dim_FK, src.trade_flow_type_FK)),
    src.trade_value, src.item_Dim_FK, src.Partner_Dim_FK, src.Reporter_Dim_FK, src.Date_Dim_FK, src.trade_flow_type_FK);



StatementMeta(, 868b9c7f-3471-47e3-a3d1-1baf7ffd6c5e, 20, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 4 fields>