# 01-Load a Fact table
This notebook extracts data from various sources and loads it into a staging table in Synapse.
It also runs some common transformations that you will encounter in real-life scenarios.

## Contents
1. Extract
1. Transform
1. Load

In [1]:
# Set Parameters

# Set the date when the target table was last loaded
lastRefreshDate = "2021-10-09"

# Set the name of the source Delta Lake table
sourceTableName = "sparklakehouse.stg_internet_sales"

# Set path to source files
basePath = "abfss://data@REPLACE_DATALAKE_NAME.dfs.core.windows.net/sample/AdventureWorksDW2019/dbo/"


### 1. Extract

a. Extract from Delta Lake table

In [3]:
# Extract from the Delta Lake staging table

newDF = spark.sql("SELECT * FROM "+sourceTableName)

display(newDF.limit(10))

In [4]:
newDF.count()

In [5]:
# Check how many partitions the data is distributed into
newDF.rdd.getNumPartitions()

##### Extract only changes since the last refresh date
This is a brute-force way of identifying the changes.

Ideally, the source system would supply only the changed data.

The code below demonstrates how to use the Delta table history to find the changes instead.

In [6]:
# Get the table history (versions and commit timestamps) and the table details (including the storage location)
tblHistDF = spark.sql("DESCRIBE HISTORY "+sourceTableName)
tblDetDF = spark.sql("DESCRIBE DETAIL "+sourceTableName)

In [7]:
# Get the version numbers between which to extract changes from the source Delta table
cutoff = lastRefreshDate + " 00:00:00"
prevHistDF = tblHistDF.filter("timestamp <= '" + cutoff + "'")
if prevHistDF.count() == 0:
    # No history records at or before the last refresh date, so this is an initial load
    # We set prevVersion to -1 to indicate an initial load
    prevVersion = -1
else:
    prevVersion = prevHistDF.agg({'version': 'max'}).collect()[0][0]
# Latest version committed at or after the last refresh date (None if there is none)
currVersion = tblHistDF.filter("timestamp >= '" + cutoff + "'").agg({'version': 'max'}).collect()[0][0]

print("The latest version at or before the last refresh date is "+str(prevVersion))
print("The latest version at or after the last refresh date is "+str(currVersion))
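
The version selection above can be checked without a Spark session. The following is a minimal sketch in plain Python, assuming the history is available as `(version, timestamp)` pairs; the function name `pick_versions` and the sample history rows are hypothetical and only illustrate the boundary cases (initial load, and no commits after the refresh date).

```python
from datetime import datetime


def pick_versions(history, last_refresh_date):
    """Return (prev_version, curr_version) from (version, timestamp) pairs.

    prev_version is the highest version committed at or before the cutoff,
    or -1 when there is none (initial load). curr_version is the highest
    version committed at or after the cutoff, or None when there is none.
    """
    cutoff = datetime.strptime(last_refresh_date, "%Y-%m-%d")
    before = [version for version, ts in history if ts <= cutoff]
    after = [version for version, ts in history if ts >= cutoff]
    prev_version = max(before) if before else -1
    curr_version = max(after) if after else None
    return prev_version, curr_version


# Hypothetical history rows: (version, commit timestamp)
history = [
    (0, datetime(2021, 10, 1, 8, 0)),
    (1, datetime(2021, 10, 8, 23, 30)),
    (2, datetime(2021, 10, 10, 6, 15)),
]

print(pick_versions(history, "2021-10-09"))  # (1, 2)
print(pick_versions(history, "2021-09-01"))  # (-1, 2)
print(pick_versions(history, "2021-11-01"))  # (2, None)
```

The last case shows that the current version can be empty when nothing was committed after the refresh date, so a production notebook would probably need to skip the extract in that situation.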

In [8]:
# If this is an initial load, get the latest data
if prevVersion == -1:
    newDF = spark.sql("SELECT * FROM "+sourceTableName)
# If it's not an initial load, use MINUS to find the new and changed rows
# This process will get easier with future versions of the delta.io project
else:
    # tablePath is not defined earlier in the notebook; as an assumption,
    # take the table's storage location from the DESCRIBE DETAIL output
    tablePath = tblDetDF.select("location").collect()[0][0]
    prevDF = spark.read.format("delta").option("versionAsOf", prevVersion).load(tablePath)
    currDF = spark.read.format("delta").option("versionAsOf", currVersion).load(tablePath)
    prevDF.createOrReplaceTempView("prev_tmp")
    currDF.createOrReplaceTempView("curr_tmp")
    newDF = spark.sql("SELECT * FROM curr_tmp MINUS SELECT * FROM prev_tmp")
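
To make clear what the MINUS comparison returns, here is a small plain-Python sketch of the same set difference. The helper `rows_minus` and the sample rows are hypothetical. It suggests that the comparison picks up inserted rows and the new values of updated rows, but not deletions, which only show up when the operands are reversed.

```python
def rows_minus(current_rows, previous_rows):
    """Mimic SQL MINUS: distinct rows of current_rows that are absent from previous_rows."""
    previous = set(previous_rows)
    seen = set()
    result = []
    for row in current_rows:
        if row not in previous and row not in seen:
            seen.add(row)
            result.append(row)
    return result


# Hypothetical rows: (sales_order_no, sales_order_line_no, order_qty)
prev_rows = [("SO1", 1, 2), ("SO2", 1, 1), ("SO3", 1, 5)]
curr_rows = [("SO1", 1, 2), ("SO2", 1, 4), ("SO4", 1, 1)]

# New rows and new values of updated rows
changed = rows_minus(curr_rows, prev_rows)
print(changed)  # [('SO2', 1, 4), ('SO4', 1, 1)]

# Reversed operands: deleted rows and old values of updated rows
removed = rows_minus(prev_rows, curr_rows)
print(removed)  # [('SO2', 1, 1), ('SO3', 1, 5)]
```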

In [9]:
newDF.createOrReplaceTempView("new_data_tmp")

###### Extract Other Sources
Extract from Parquet file

In [10]:
# Create a Spark DataFrame from the raw DimCustomer Parquet files
customerDF = spark.read.parquet(basePath + "DimCustomer")
customerDF.createOrReplaceTempView("customer_tmp")
display(customerDF.limit(10))

In [11]:
# Create a Spark DataFrame from the raw DimGeography Parquet files
geographyDF = spark.read.parquet(basePath + "DimGeography")
geographyDF.createOrReplaceTempView("geography_tmp")
display(geographyDF.limit(10))

Extract from a JSON file on the web

In [12]:
import requests
from notebookutils import mssparkutils

url = "https://raw.githubusercontent.com/samayo/country-json/master/src/country-by-population.json"
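r = requests.get(url, allow_redirects=True)
r.raise_for_status()  # Stop here if the download did not succeed

# Use Microsoft Spark Utilities to write the downloaded text to the data lake path (True = overwrite)
mssparkutils.fs.put(basePath+"country-by-population.json", r.text, True)

# Use the multiline option when a JSON record spans several lines,
# as in a file that holds a single array enclosed in []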
countriesDF = spark.read.option("multiline","true").json(basePath+"country-by-population.json")
countriesDF.createOrReplaceTempView("countries_tmp")
display(countriesDF.limit(10))
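
Spark's JSON reader expects one complete JSON record per line by default, which is why a file holding a single array spread over several lines needs the multiline option. The sketch below illustrates the difference between the two layouts with Python's standard `json` module only. The records and the helper `parse_line_by_line` are hypothetical and do not reproduce the downloaded data.

```python
import json

# Hypothetical records, not the real population figures
records = [
    {"country": "Exampleland", "population": 100},
    {"country": "Samplestan", "population": 250},
]

# Layout 1: a single pretty-printed array that spans several lines
array_text = json.dumps(records, indent=2)

# Layout 2: JSON Lines, one complete object per line
json_lines_text = "\n".join(json.dumps(record) for record in records)


def parse_line_by_line(text):
    """Parse every non-empty line as a standalone JSON document."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


# The JSON Lines layout can be read one line at a time
print(parse_line_by_line(json_lines_text) == records)  # True

# The array layout cannot, because its first line is just "["
try:
    parse_line_by_line(array_text)
except json.JSONDecodeError as err:
    print("Line-by-line parsing failed:", err.msg)

# Reading the whole text as one document works
print(json.loads(array_text) == records)  # True
```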

###### Some important performance settings to be adjusted and experimented with

In [13]:
# Check how many partitions will be used to shuffle data between executors
# As a rule of thumb, each partition shouldn't be larger than 128MB
spark.conf.get("spark.sql.shuffle.partitions")
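
The 128 MB rule of thumb can be turned into a rough starting value for `spark.sql.shuffle.partitions`. This is a minimal sketch under the assumption that the shuffled data volume is known or can be estimated; the function name `suggest_shuffle_partitions` and the sample sizes are hypothetical.

```python
import math

# 128 MB per partition, the rule of thumb mentioned above
TARGET_PARTITION_BYTES = 128 * 1024 * 1024


def suggest_shuffle_partitions(shuffle_bytes, min_partitions=1):
    """Return the smallest partition count that keeps each partition at or below the target size."""
    return max(min_partitions, math.ceil(shuffle_bytes / TARGET_PARTITION_BYTES))


# About 50 GiB of shuffled data
print(suggest_shuffle_partitions(50 * 1024**3))  # 400

# Small data sets still get at least min_partitions
print(suggest_shuffle_partitions(10 * 1024**2))  # 1
```

A value derived this way is only a starting point; it could reasonably be rounded up to a multiple of the available cores.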

In [14]:
# Check the default parallelism, which typically matches the total number of cores available to the application
print(spark.sparkContext.defaultParallelism)

In [15]:
# Set a high number of shuffle partitions to be safe with large data sets; tune it to your data volume
spark.conf.set("spark.sql.shuffle.partitions", "1000")

In [16]:
# Check whether Adaptive Query Execution (AQE) is enabled
spark.conf.get("spark.sql.adaptive.enabled")

In [17]:
# Enable AQE so that the number of shuffle partitions is adjusted automatically to the data set
spark.conf.set("spark.sql.adaptive.enabled", "true")

### 2. Transform

In [18]:
# Always import these two sets of libraries at a minimum for spark transformations
from pyspark.sql.functions import *
from pyspark.sql.types import *



In [19]:
%%sql
describe new_data_tmp

In [20]:
%%sql
SELECT 
ndt.order_date_sk,
ndt.order_date,
ndt.product_sk,
ndt.customer_sk,
ndt.ship_date_sk,
ndt.due_date_sk,
ndt.promotion_sk,
ndt.currency_sk,
ndt.sales_territory_sk,
ndt.sales_order_no,
ndt.sales_order_line_no,
ndt.revision_no,
cnt.country,
cnt.population,
ndt.order_qty as quantity,
ndt.sales_amount as sales,
ndt.discount_amount as discount
FROM new_data_tmp ndt
JOIN customer_tmp ct ON ndt.customer_sk = ct.CustomerKey
JOIN geography_tmp gt ON ct.GeographyKey = gt.GeographyKey
JOIN countries_tmp cnt ON gt.EnglishCountryRegionName = cnt.Country
LIMIT 100

In [22]:
%%sql
CREATE OR REPLACE TEMPORARY VIEW stg_sales_by_country AS
SELECT 
ndt.order_date_sk,
ndt.order_date,
ndt.product_sk,
ndt.customer_sk,
ndt.ship_date_sk,
ndt.due_date_sk,
ndt.promotion_sk,
ndt.currency_sk,
ndt.sales_territory_sk,
ndt.sales_order_no,
ndt.sales_order_line_no,
ndt.revision_no,
cnt.country,
cnt.population,
ndt.order_qty as quantity,
ndt.sales_amount as sales,
ndt.discount_amount as discount
FROM new_data_tmp ndt
JOIN customer_tmp ct ON ndt.customer_sk = ct.CustomerKey
JOIN geography_tmp gt ON ct.GeographyKey = gt.GeographyKey
JOIN countries_tmp cnt ON gt.EnglishCountryRegionName = cnt.Country
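
Because the query above is an inner join on the country name, any sales row whose country is spelled differently in the two sources is dropped without warning. A quick check of the unmatched names can catch this before loading. The sketch below uses plain Python sets; the helper `unmatched_keys` and both name lists are hypothetical and do not describe the actual data sets.

```python
def unmatched_keys(left_values, right_values):
    """Return the sorted left values that have no exact match on the right."""
    return sorted(set(left_values) - set(right_values))


# Hypothetical country names from the two sources
geography_countries = ["Australia", "Canada", "United Kingdom", "United States"]
population_countries = ["Australia", "Canada", "United Kingdom", "United States of America"]

# Names listed here would be lost in an inner join on the country name
print(unmatched_keys(geography_countries, population_countries))  # ['United States']
```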

### 3. Load

In [25]:
%%sql
DROP TABLE IF EXISTS sparklakehouse.stg_sales_by_country

In [26]:
%%sql
-- First, load the data into a Delta Lake table
CREATE TABLE sparklakehouse.stg_sales_by_country USING DELTA AS
SELECT * FROM stg_sales_by_country

In [77]:
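# -- %%sql
# -- -- We can merge only the changes into the target table
# -- -- This cell should stay commented out during the initial load
# -- -- Assumption: the order number and line number identify a row; matching on
# -- -- country alone would pair many source rows with the same target row
# -- -- MERGE INTO sparklakehouse.stg_sales_by_country t
# -- -- USING stg_sales_by_country s 
# -- -- ON t.sales_order_no = s.sales_order_no AND t.sales_order_line_no = s.sales_order_line_no
# -- -- WHEN MATCHED THEN UPDATE SET *
# -- -- WHEN NOT MATCHED THEN INSERT *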
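
The merge in the commented cell above updates matched rows and inserts the rest. The following plain-Python sketch shows that upsert logic on lists of dictionaries. The function `merge_rows`, the sample rows, and the choice of `sales_order_no` plus `sales_order_line_no` as the key are assumptions for illustration. Delta Lake generally rejects a merge in which several source rows match the same target row; this sketch applies a stricter check and rejects any repeated source key.

```python
def merge_rows(target, source, key_columns):
    """Upsert source rows into target rows; both are lists of dicts."""

    def key_of(row):
        return tuple(row[column] for column in key_columns)

    merged = {key_of(row): dict(row) for row in target}
    seen = set()
    for row in source:
        key = key_of(row)
        if key in seen:
            raise ValueError(f"Duplicate source key: {key}")
        seen.add(key)
        # Overwrite when the key already exists, insert otherwise
        merged[key] = dict(row)
    return list(merged.values())


# Hypothetical rows keyed by order number and line number
target_rows = [
    {"sales_order_no": "SO1", "sales_order_line_no": 1, "quantity": 2},
    {"sales_order_no": "SO2", "sales_order_line_no": 1, "quantity": 1},
]
source_rows = [
    {"sales_order_no": "SO2", "sales_order_line_no": 1, "quantity": 4},
    {"sales_order_no": "SO3", "sales_order_line_no": 1, "quantity": 5},
]

result = merge_rows(target_rows, source_rows, ["sales_order_no", "sales_order_line_no"])
for row in result:
    print(row)
```

Here SO1 is left unchanged, SO2 takes the new quantity, and SO3 is inserted.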

In [78]:
# -- %%sql
# -- -- VACUUM sparklakehouse.stg_sales_by_country RETAIN 168 HOURS;

In [27]:
%%spark
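// Imports for the Synapse dedicated SQL pool connector (may already be in scope on some runtimes)
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

// Create a Scala DataFrame from the temporary view
val scala_df = spark.sql("select * from stg_sales_by_country")

// Write to a new staging table in Synapse. A stored procedure, executed after this notebook runs,
// loads the final table from this staging table.
scala_df.write.synapsesql("SQLTestPool.dbo.StgFactSalesByCountry", Constants.INTERNAL)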