# Transforming data from silver to gold 

In [5]:
df = spark.read.table("trade_silver")


StatementMeta(, 51b415ed-aefb-4147-97d2-963d3a00779f, 7, Finished, Available, Finished)

# Creating date dimension table 

In [6]:
from pyspark.sql.types import *
from delta.tables import*

# # Define the schema for the dimdate_gold table
DeltaTable.createIfNotExists(spark) \
.tableName("dimdate_gold") \
.addColumn("Date", DateType()) \
.addColumn("DateID", LongType())\
.addColumn("Day", IntegerType()) \
.addColumn("Month", IntegerType()) \
.addColumn("Year", IntegerType()) \
.addColumn("mmmyyyy", StringType()) \
.addColumn("yyyymm", StringType()) \
.execute()

StatementMeta(, 51b415ed-aefb-4147-97d2-963d3a00779f, 8, Finished, Available, Finished)

<delta.tables.DeltaTable at 0x75bdd8c5f890>

In [7]:
from pyspark.sql.functions import col, dayofmonth, month, year, date_format

# # Create dataframe for dimDate_gold

dfdimDate_gold = df.dropDuplicates(["Date"]).select(col("Date"), \
dayofmonth("Date").alias("Day"), \
month("Date").alias("Month"),\
year("Date").alias("Year"), \
date_format(col("Date"), "MMM-yyyy").alias("mmmyyyy"), \
date_format(col("Date"), "yyyyMM").alias("yyyymm"), \
).orderBy("Date")

# # Display the first 10 rows of the dataframe to preview your data

display(dfdimDate_gold.head(10))

StatementMeta(, 51b415ed-aefb-4147-97d2-963d3a00779f, 9, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 9b74435b-3307-48bb-8613-afbb592d95cf)

# Creating Date ID

In [8]:
from pyspark.sql.functions import monotonically_increasing_id, col, when, coalesce, max, lit

dfdimDate_temp = spark.read.table("dimDate_gold")

MAXDateID = dfdimDate_temp.select(coalesce(max(col("DateID")),lit(0)).alias("MAXDateID")).first()[0]

dfdimDate_gold = dfdimDate_gold.join(dfdimDate_temp,(dfdimDate_gold.Date == dfdimDate_temp.Date), "left_anti")

dfdimDate_gold = dfdimDate_gold.withColumn("DateID",monotonically_increasing_id() + MAXDateID + 1)

# # Display the first 10 rows of the dataframe to preview your data

display(dfdimDate_gold.head(10))

StatementMeta(, 51b415ed-aefb-4147-97d2-963d3a00779f, 10, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, a5d884fb-afc6-4813-a9cd-b39fec344a2a)

# Writing the dim date table into the dimdate_gold table


In [9]:
dfdimDate_gold.write.format("delta").mode("overwrite").saveAsTable("dimDate_gold")


StatementMeta(, 51b415ed-aefb-4147-97d2-963d3a00779f, 11, Finished, Available, Finished)

In [10]:
spark.read.table("dbo.dimDate_gold").show(10, False)

StatementMeta(, 51b415ed-aefb-4147-97d2-963d3a00779f, 12, Finished, Available, Finished)

+----------+------+---+-----+----+--------+------+
|Date      |DateID|Day|Month|Year|mmmyyyy |yyyymm|
+----------+------+---+-----+----+--------+------+
|2015-01-01|1     |1  |1    |2015|Jan-2015|201501|
|2015-04-01|2     |1  |4    |2015|Apr-2015|201504|
|2015-07-01|3     |1  |7    |2015|Jul-2015|201507|
|2015-10-01|4     |1  |10   |2015|Oct-2015|201510|
|2016-01-01|5     |1  |1    |2016|Jan-2016|201601|
|2016-04-01|6     |1  |4    |2016|Apr-2016|201604|
|2016-07-01|7     |1  |7    |2016|Jul-2016|201607|
|2016-10-01|8     |1  |10   |2016|Oct-2016|201610|
|2017-01-01|9     |1  |1    |2017|Jan-2017|201701|
|2017-04-01|10    |1  |4    |2017|Apr-2017|201704|
+----------+------+---+-----+----+--------+------+
only showing top 10 rows



# Create a dimension table for region with ID

In [11]:
from pyspark.sql.types import *
from delta.tables import *

# # Create customer_gold dimension delta table
DeltaTable.createIfNotExists(spark) \
.tableName("dimRegion_gold") \
.addColumn("Region", StringType()) \
.addColumn("RegionID", LongType()) \
.execute()

StatementMeta(, 51b415ed-aefb-4147-97d2-963d3a00779f, 13, Finished, Available, Finished)

<delta.tables.DeltaTable at 0x75bdd8c5ce50>

In [12]:
from pyspark.sql.functions import col, trim

# Create Region dimension table
dfdimRegion_gold = df.select(trim(col("Region")).alias("Region")) \
                     .filter((col("Region").isNotNull()) & (col("Region") != "")) \
                   

# Preview
dfdimRegion_gold.show()

StatementMeta(, 51b415ed-aefb-4147-97d2-963d3a00779f, 14, Finished, Available, Finished)

+-------------+
|       Region|
+-------------+
|All countries|
|All countries|
|All countries|
|All countries|
|All countries|
|All countries|
|All countries|
|All countries|
|All countries|
|All countries|
|All countries|
|All countries|
|All countries|
|All countries|
|All countries|
|All countries|
|All countries|
|All countries|
|All countries|
|All countries|
+-------------+
only showing top 20 rows



In [13]:
display(dfdimRegion_gold.show(10))

StatementMeta(, 51b415ed-aefb-4147-97d2-963d3a00779f, 15, Finished, Available, Finished)

+-------------+
|       Region|
+-------------+
|All countries|
|All countries|
|All countries|
|All countries|
|All countries|
|All countries|
|All countries|
|All countries|
|All countries|
|All countries|
+-------------+
only showing top 10 rows



# Adding regional ID 

In [14]:
from pyspark.sql.functions import monotonically_increasing_id, col, when, coalesce, max, lit

dfdimRegion_temp = spark.read.table("dimRegion_gold")

MAXRegionID = dfdimRegion_temp.select(coalesce(max(col("RegionID")),lit(0)).alias("MAXRegionID")).first()[0]

dfdimRegion_gold = dfdimRegion_gold.join(dfdimRegion_temp,(dfdimRegion_gold.Region == dfdimRegion_temp.Region), "left_anti")

dfdimRegion_gold = dfdimRegion_gold.withColumn("RegionID",monotonically_increasing_id() + MAXRegionID + 1)

# # Display the first 10 rows of the dataframe to preview your data


display(dfdimRegion_gold.head(10))

StatementMeta(, 51b415ed-aefb-4147-97d2-963d3a00779f, 16, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 3fd90c3a-2d47-48b2-9615-70f254bc6a71)

# Writing the updated region data into the dimregion_gold table


In [15]:
dfdimRegion_gold.write.format("delta").mode("overwrite").saveAsTable("dimRegion_gold")

StatementMeta(, 51b415ed-aefb-4147-97d2-963d3a00779f, 17, Finished, Available, Finished)

In [16]:
spark.read.table("dimRegion_gold").show(10, False)

StatementMeta(, 51b415ed-aefb-4147-97d2-963d3a00779f, 18, Finished, Available, Finished)

+-------------+--------+
|Region       |RegionID|
+-------------+--------+
|All countries|1       |
|All countries|2       |
|All countries|3       |
|All countries|4       |
|All countries|5       |
|All countries|6       |
|All countries|7       |
|All countries|8       |
|All countries|9       |
|All countries|10      |
+-------------+--------+
only showing top 10 rows



# Create Fact trade table


In [17]:
from pyspark.sql.types import *
from delta.tables import *

DeltaTable.createIfNotExists(spark) \
.tableName("facttrade_gold") \
.addColumn("RegionID", LongType()) \
.addColumn("DateID", LongType())\
.addColumn("E_Total_Current_Account", StringType())\
.addColumn("E_Good_and_Services", StringType())\
.addColumn("E_Goods", StringType())\
.addColumn("E_Services", StringType())\
.addColumn("E_Primary_Income", StringType())\
.addColumn("E_Compensation_of_Employee", StringType())\
.addColumn("E_Investment_Income", StringType())\
.addColumn("E_Direct_Investment_Income", StringType())\
.addColumn("E_Portfolio_Investment_Income", StringType())\
.addColumn("E_Other_Investment_Income", StringType())\
.addColumn("E_Secondary_Income", StringType())\
.addColumn("E_Private_Transfer", StringType())\
.addColumn("E_Government_Transfer", StringType())\
.addColumn("I_Total_Current_Account", StringType())\
.addColumn("I_Goods_and_Services", StringType())\
.addColumn("I_Goods", StringType())\
.addColumn("I_Services", StringType())\
.addColumn("I_Primary_Income", StringType())\
.addColumn("I_Compensation_of_Employee", StringType())\
.addColumn("I_Investment_Income", StringType())\
.addColumn("I_Direct_Investment_Income", StringType())\
.addColumn("I_Portfolio_Investment_Income", StringType())\
.addColumn("I_Other_Investment_Income", StringType())\
.addColumn("I_Secondary_Income", StringType())\
.addColumn("I_Private_Transfer", StringType())\
.addColumn("I_Government_Transfer", StringType())\
.execute()


StatementMeta(, 51b415ed-aefb-4147-97d2-963d3a00779f, 19, Finished, Available, Finished)

<delta.tables.DeltaTable at 0x75bdd8c43050>

In [18]:

dffacttrade_gold = df.alias("df1") \
    .join(dfdimDate_temp.alias("df2"), col("df1.Date") == col("df2.Date"), "left") \
    .join(dfdimRegion_temp.alias("df3"), col("df1.Region") == col("df3.Region"), "left") \
    .select(
        col("df2.DateID"),
        col("df3.RegionID"),
        col("df1.E_Total_Current_Account"),
        col("df1.E_Good_and_Services"),
        col("df1.E_Goods"),
        col("df1.E_Services"),
        col("df1.E_Primary_Income"),
        col("df1.E_Compensation_of_Employee"),
        col("df1.E_Investment_Income"),
        col("df1.E_Direct_Investment_Income"),
        col("df1.E_Portfolio_Investment_Income"),
        col("df1.E_Other_Investment_Income"), 
        col("df1.E_Secondary_Income"),
        col("df1.E_Private_Transfer"), 
        col("df1.E_Government_Transfer"),
        col("df1.I_Total_Current_Account"),
        col("df1.I_Goods_and_Services"),
        col("df1.I_Goods"),
        col("df1.I_Services"),
        col("df1.I_Primary_Income"), 
        col("df1.I_Compensation_of_Employee"), 
        col("df1.I_Investment_Income"), 
        col("df1.I_Direct_Investment_Income"),
        col("df1.I_Portfolio_Investment_Income"),
        col("df1.I_Other_Investment_Income"),
        col("df1.I_Secondary_Income"),
        col("df1.I_Private_Transfer"),
        col("df1.I_Government_Transfer")
    ).orderBy(col("df1.Date"), col("df2.DateID"), col("df3.RegionID"))
display(dffacttrade_gold.head(10))

StatementMeta(, 51b415ed-aefb-4147-97d2-963d3a00779f, 20, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, f1baf25e-f18d-421c-a314-72d21d1e178e)

# Writing the updated Fact table into fact trade table

In [19]:
dffacttrade_gold.write.format("delta").mode("overwrite").saveAsTable("facttrade_gold")

StatementMeta(, 51b415ed-aefb-4147-97d2-963d3a00779f, 21, Finished, Available, Finished)

In [20]:
spark.read.table("facttrade_gold").show(10, False)

StatementMeta(, 51b415ed-aefb-4147-97d2-963d3a00779f, 22, Finished, Available, Finished)

+--------+------+-----------------------+-------------------+-------+----------+----------------+--------------------------+-------------------+--------------------------+-----------------------------+-------------------------+------------------+------------------+---------------------+-----------------------+--------------------+-------+----------+----------------+--------------------------+-------------------+--------------------------+-----------------------------+-------------------------+------------------+------------------+---------------------+
|RegionID|DateID|E_Total_Current_Account|E_Good_and_Services|E_Goods|E_Services|E_Primary_Income|E_Compensation_of_Employee|E_Investment_Income|E_Direct_Investment_Income|E_Portfolio_Investment_Income|E_Other_Investment_Income|E_Secondary_Income|E_Private_Transfer|E_Government_Transfer|I_Total_Current_Account|I_Goods_and_Services|I_Goods|I_Services|I_Primary_Income|I_Compensation_of_Employee|I_Investment_Income|I_Direct_Investment_Income