# Transform data from the Silver layer (cleansed and conformed data) to the Gold layer (curated business-level tables)
##### Once the data is available in the open Delta Parquet format, we can transform and aggregate it using Spark or SQL compute. This notebook shows how we build the gold layer, which is curated for direct consumption in Power BI reports.
##### These gold layer data products can also be shared with other departments without copying the data anywhere.

![Medallion Architecture](https://fabricddib.blob.core.windows.net/notebookimage/MedallionArchitecture.png)

## Spark configurations

In [4]:
spark.conf.set("spark.sql.parquet.vorder.enabled", "true")
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.microsoft.delta.optimizeWrite.binSize", "1073741824")
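
The `binSize` value is easier to read once converted. A minimal sketch, assuming the setting is expressed in bytes (the magnitude suggests so, but check the Fabric documentation for your runtime):

```python
# Assumption: optimizeWrite.binSize is a byte count.
BIN_SIZE_BYTES = 1073741824

bin_size_gib = BIN_SIZE_BYTES / (1024 ** 3)   # bytes -> GiB
bin_size_mib = BIN_SIZE_BYTES // (1024 ** 2)  # bytes -> MiB

print(f"{bin_size_gib} GiB ({bin_size_mib} MiB)")  # 1.0 GiB (1024 MiB)
```

Under that assumption, the configuration asks optimized writes to target files of roughly 1 GiB.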


## Load silver layer Delta tables into Spark DataFrames
##### Earlier, we created shortcuts to the Delta tables in the silver layer lakehouse. Those tables are now mounted (synced) and available for consumption here in the gold layer lakehouse. We can read them into Spark DataFrames and then perform transformations and aggregations.

In [2]:
df_fact_sale = spark.read.table("#LAKEHOUSE_GOLD#.fact_sales")
df_dim_date = spark.read.table("#LAKEHOUSE_GOLD#.dim_date")
df_dimension_product = spark.read.table("#LAKEHOUSE_GOLD#.dimension_product")
df_fact_campaigndata = spark.read.table("#LAKEHOUSE_GOLD#.fact_campaigndata")


## Aggregated Table: Total Sales By Product
##### Here we create an aggregate table from the fact and dimension tables using PySpark. We could also use SQL syntax, which we explore later in this notebook.

In [3]:
# Join sales facts to the date and product dimensions, then total the
# sales and profit amounts per date and product.
sale_by_date_product = (
    df_fact_sale.alias("sale")
    .join(df_dim_date.alias("date"), df_fact_sale.TransactionDate == df_dim_date.DateValue, "inner")
    .join(df_dimension_product.alias("product"), df_fact_sale.ProductId == df_dimension_product.Products_ID, "inner")
    .select("date.DateValue", "date.MonthName", "product.Name", "product.Category", "sale.TotalAmount", "sale.ProfitAmount")
    .groupBy("date.DateValue", "date.MonthName", "product.Name", "product.Category")
    .sum("sale.TotalAmount", "sale.ProfitAmount")
    .withColumnRenamed("sum(TotalAmount)", "SumOfTotalAmount")
    .withColumnRenamed("sum(ProfitAmount)", "SumOfProfit")
    .orderBy("date.DateValue", "product.Name")
)

# Persist the aggregate as a Delta table under the lakehouse Tables folder.
sale_by_date_product.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/aggregate_sale_by_product")
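
To make the shape of this aggregation concrete, here is a plain-Python sketch of the same steps (inner join, group, sum, sort) on a few invented rows. The column names mirror the notebook; the values, and the assumption that each dimension key is unique, are hypothetical and only for illustration.

```python
from collections import defaultdict

# Hypothetical toy rows; values are invented for illustration.
fact_sale = [
    {"TransactionDate": "2023-01-01", "ProductId": 1, "TotalAmount": 100.0, "ProfitAmount": 20.0},
    {"TransactionDate": "2023-01-01", "ProductId": 1, "TotalAmount": 50.0, "ProfitAmount": 10.0},
    {"TransactionDate": "2023-01-02", "ProductId": 2, "TotalAmount": 80.0, "ProfitAmount": 16.0},
    {"TransactionDate": "2023-01-03", "ProductId": 9, "TotalAmount": 5.0, "ProfitAmount": 1.0},
]
dim_date = {"2023-01-01": "January", "2023-01-02": "January", "2023-01-03": "January"}
dim_product = {1: ("Shirt", "Apparel"), 2: ("Mug", "Kitchen")}  # Products_ID -> (Name, Category)

totals = defaultdict(lambda: [0.0, 0.0])
for row in fact_sale:
    month_name = dim_date.get(row["TransactionDate"])
    product = dim_product.get(row["ProductId"])
    if month_name is None or product is None:
        continue  # inner join: rows without a match in a dimension are dropped
    key = (row["TransactionDate"], month_name, product[0], product[1])
    totals[key][0] += row["TotalAmount"]
    totals[key][1] += row["ProfitAmount"]

# Sort by date, then product name, like the orderBy in the notebook cell.
result = [
    {"DateValue": k[0], "MonthName": k[1], "Name": k[2], "Category": k[3],
     "SumOfTotalAmount": totals[k][0], "SumOfProfit": totals[k][1]}
    for k in sorted(totals, key=lambda k: (k[0], k[2]))
]
for r in result:
    print(r)
```

The sale for `ProductId` 9 disappears from the result because it has no matching product, which is the behavior to expect from the `"inner"` joins in the PySpark version.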


## Aggregated Table: Total Campaign Revenue By Product Category

In [4]:
# Join campaign facts to the product dimension on category, then total
# revenue and revenue target per category and campaign.
revenue_by_campaign_product = (
    df_fact_campaigndata.alias("factcampdata")
    .join(df_dimension_product.alias("product"), df_fact_campaigndata.ProductCategory == df_dimension_product.Category, "inner")
    .select("product.Category", "factcampdata.Revenue", "factcampdata.Campaign_Name", "factcampdata.Revenue_Target")
    .groupBy("product.Category", "factcampdata.Campaign_Name")
    .sum("factcampdata.Revenue", "factcampdata.Revenue_Target")
    .withColumnRenamed("sum(Revenue_Target)", "TotalRevenueTarget")
    .withColumnRenamed("sum(Revenue)", "TotalRevenue")
    .withColumnRenamed("Category", "ProductCategory")
    .withColumnRenamed("Campaign_Name", "CampaignName")
    # Sort by the renamed column so the reference matches the final schema.
    .orderBy("CampaignName")
)

# Persist the aggregate as a managed Delta table.
revenue_by_campaign_product.write.mode("overwrite").format("delta").option("overwriteSchema", "true").saveAsTable("total_campaigns_revenue_by_product")
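
One property of the join above is worth checking against your data: `dimension_product` is joined on `Category`, so if it holds several products per category (an assumption, since the table contents are not shown here), each campaign row is repeated once per product before the sums are taken. The plain-Python sketch below uses invented rows to show the effect.

```python
# Hypothetical toy data; names mirror the notebook, values are invented.
campaigns = [
    {"Campaign_Name": "Spring Sale", "ProductCategory": "Apparel", "Revenue": 100.0},
]
products = [
    {"Name": "Shirt", "Category": "Apparel"},
    {"Name": "Jacket", "Category": "Apparel"},
]

# Inner join on category: the campaign row pairs with every product in "Apparel".
joined = [(c, p) for c in campaigns for p in products
          if c["ProductCategory"] == p["Category"]]
revenue_after_join = sum(c["Revenue"] for c, _ in joined)

# Matching against the distinct categories keeps one row per campaign.
distinct_categories = {p["Category"] for p in products}
revenue_distinct = sum(c["Revenue"] for c in campaigns
                       if c["ProductCategory"] in distinct_categories)

print(revenue_after_join, revenue_distinct)  # 200.0 100.0
```

If the product dimension does have multiple rows per category, joining against its distinct categories would be one way to keep the totals unchanged; whether that is needed depends on the actual data.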


## Temporary view
##### Here is an example of aggregation using SQL syntax. Depending on the expertise of your data citizens, you can choose your preferred syntax and work in parallel on the same lakehouse. The real-time tracking feature enables parallel collaboration and lets you see where your colleagues are in the same notebook.

In [11]:
%%sql
CREATE OR REPLACE TEMPORARY VIEW total_sale_by_customer
AS
SELECT
    DE.FirstName,
    DE.LastName,
    SUM(FS.TotalAmount) AS SumOfTotalExcludingTax,
    SUM(FS.ProfitAmount) AS SumOfTotalProfit
FROM _LAKEHOUSE_GOLD_.fact_sales FS
INNER JOIN _LAKEHOUSE_GOLD_.dimension_Customer DE ON FS.CustomerId = DE.Id
GROUP BY DE.FirstName, DE.LastName
ORDER BY DE.FirstName ASC



<Spark SQL result set with 0 rows and 0 fields>
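
To see what the view returns without a Spark session, the same aggregation can be tried on a few rows with Python's built-in `sqlite3` module. This is a stand-in sketch, not Spark SQL: SQLite has no `CREATE OR REPLACE`, so a plain `CREATE TEMP VIEW` is used, the lakehouse qualifier is dropped, and the customer rows are invented for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE fact_sales (CustomerId INTEGER, TotalAmount REAL, ProfitAmount REAL);
CREATE TABLE dimension_customer (Id INTEGER, FirstName TEXT, LastName TEXT);

-- Hypothetical sample rows
INSERT INTO fact_sales VALUES (1, 100.0, 20.0), (1, 50.0, 10.0), (2, 80.0, 16.0);
INSERT INTO dimension_customer VALUES (1, 'Ada', 'Lovelace'), (2, 'Alan', 'Turing');

CREATE TEMP VIEW total_sale_by_customer AS
SELECT
    DE.FirstName,
    DE.LastName,
    SUM(FS.TotalAmount) AS SumOfTotalExcludingTax,
    SUM(FS.ProfitAmount) AS SumOfTotalProfit
FROM fact_sales FS
INNER JOIN dimension_customer DE ON FS.CustomerId = DE.Id
GROUP BY DE.FirstName, DE.LastName;
""")

# The view exists only for this connection, much like a session-scoped temporary view.
rows = conn.execute(
    "SELECT * FROM total_sale_by_customer ORDER BY FirstName ASC"
).fetchall()
for row in rows:
    print(row)
conn.close()
```

Each customer appears once, with sales and profit summed across all of that customer's rows in `fact_sales`.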

### View Results From Temporary View 'total_sale_by_customer'

In [None]:
%%sql
-- Temporary views are session-scoped, so they are referenced without a lakehouse qualifier.
SELECT * FROM total_sale_by_customer


<Spark SQL result set with 0 rows and 4 fields>