### Creating star schema ; dimensions and fact tables that I will use in queries

#### Kreiranje Delta Tabela

In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS gold;

In [0]:
%sql
CREATE TABLE gold.dim_customers
USING DELTA
LOCATION 'dbfs:/FileStore/MedallionArchitecture/gold/Dim_Customers';

CREATE TABLE gold.dim_products
USING DELTA
LOCATION 'dbfs:/FileStore/MedallionArchitecture/gold/Dim_Products';

CREATE TABLE gold.dim_shippers
USING DELTA
LOCATION 'dbfs:/FileStore/MedallionArchitecture/gold/Dim_Shippers';

CREATE TABLE gold.dim_orderdate
USING DELTA
LOCATION "dbfs:/FileStore/MedallionArchitecture/gold/Dim_Order_Date";

CREATE TABLE gold.dim_shipmentdate
USING DELTA
LOCATION "dbfs:/FileStore/MedallionArchitecture/gold/Dim_Shipment_Date";


CREATE TABLE gold.factFINALtable
USING DELTA
LOCATION "dbfs:/FileStore/MedallionArchitecture/gold/FactTableFINAL"


In [0]:
%sql
SELECT COUNT(DISTINCT(OrderId))
FROM silver.orderdetails

count(DISTINCT OrderId)
6571


In [0]:
%sql
SELECT od.OrderID
FROM silver.orderdetails od
LEFT JOIN silver.orders o ON od.OrderID = o.OrderID
WHERE o.CustomerID IS NULL;


OrderID


In [0]:
%sql
SELECT COUNT(DISTINCT(OrderID))
FROM silver.orders;

count(DISTINCT OrderID)
4266


In [0]:
# Load Silver Layer Tables
df_article_categories = spark.read.format("delta").load("dbfs:/FileStore/MedallionArchitecture/silver/ArticleCategories")
df_customers = spark.read.format("delta").load("dbfs:/FileStore/MedallionArchitecture/silver/Customers")
df_divisions = spark.read.format("delta").load("dbfs:/FileStore/MedallionArchitecture/silver/Divisions")
df_order_details = spark.read.format("delta").load("dbfs:/FileStore/MedallionArchitecture/silver/OrderDetails")
df_orders = spark.read.format("delta").load("dbfs:/FileStore/MedallionArchitecture/silver/Orders")
df_products = spark.read.format("delta").load("dbfs:/FileStore/MedallionArchitecture/silver/Products")
df_shipments = spark.read.format("delta").load("dbfs:/FileStore/MedallionArchitecture/silver/Shipments")
df_shippers = spark.read.format("delta").load("dbfs:/FileStore/MedallionArchitecture/silver/Shippers")

#### kreiranje dimenzije Dim_Customers

In [0]:
%sql
SELECT *
FROM silver.Customers
LIMIT 20;

CustomerID,CompanyName,ContactName,City,Country,DivisionID,Address,Fax,Phone,PostalCode,StateProvince,Street,StreetNumber
62,Roba di Piel,Jorge Alemaio,São Paulo,Brazil,4,"Alameda dos Canàrios, 891",,115551189.0,05487020,,Alameda dos Canàrios,891.0
35,El Zapato Rojo,Nicolas Balines,San Cristóbal,Venezuela,4,Carrera 22 con Ave. Carlos Soublette #8-35,55551948.0,55551340.0,5022,,Carrera con Ave. Carlos Soublette #8-35,22.0
6,Man Kleider,Herman Hinschler,Mannheim,Germany,1,Forsterstr. 57,62108924.0,62108460.0,68306,,Forsterstr,57.0
9,La Legion Mercenaire,Bernard de Gaule,Marseille,France,1,"12, rue des Bouchers",91244541.0,91244540.0,13008,,rue des Bouchers,12.0
13,Los Sombreros Gigantes,Speedy Gonzales,México D.F.,Mexico,2,El Barrio Chino 12,55557293.0,55553392.0,5021,,El Barrio Chino,12.0
16,The sharped dressed man,Ian Wright,London,UK,1,Berkeley Gardens,,,,,Berkeley Gardens,
73,Rode & Vite,Preben Elkjaer,København,Denmark,3,Vinbæltet 34,31133557.0,1234.0,1734,,Vinbæltet,34.0
52,Kohl Industries AG,Helmuth Klein,Leipzig,Germany,1,Heerstr. 22,,342023176.0,4179,,Heerstr,22.0
14,Das Alpen Shoe,Alfred Neumann,Bern,Switzerland,1,Hauptstr. 29,,452076545.0,3012,,Hauptstr,29.0
59,Extrawagens,Herbert Bernstorf,Salzburg,Austria,1,Geislweg 14,65629723.0,65629722.0,5020,,Geislweg,14.0


In [0]:
%sql
SELECT sc.CompanyName, sd.Division, sd.Subdivision
FROM silver.Customers AS sc
INNER JOIN silver.divisions AS sd
ON
sc.DivisionID = sd.DivisionID

CompanyName,Division,Subdivision
Roba di Piel,South America,South America
El Zapato Rojo,South America,South America
Man Kleider,Europe,Europe
La Legion Mercenaire,Europe,Europe
Los Sombreros Gigantes,North America,North America
The sharped dressed man,Europe,Europe
Rode & Vite,Europe,Scandinavia
Kohl Industries AG,Europe,Europe
Das Alpen Shoe,Europe,Europe
Extrawagens,Europe,Europe


In [0]:
df_dim_customers = (
    df_customers.join(df_divisions, "DivisionID", "left").select("CustomerID", "CompanyName", "ContactName", "City", "Country", 
            "Division", "Subdivision","Address", "Street", "PostalCode", "Phone")  
)

# Save to Gold Layer
df_dim_customers.write.format("delta").mode("overwrite").save("dbfs:/FileStore/MedallionArchitecture/gold/Dim_Customers")

#### kreiranje dimenzije Dim_Products

In [0]:
%sql
SELECT *
FROM silver.Products
LIMIT 20;

ProductID,ProductName,SupplierID,CategoryID,QuantityPerUnit,UnitCost,UnitPrice,UnitsInStock,UnitsOnOrder,Margin_Percent,Profit
1,Lenin Jeansshorts,1,1,10,15.37,20.0,39,0,23.15,4.63
2,Mr2 Trousers,1,1,10,16.17,19.0,17,40,14.89,2.83
3,Chantell Shirt,1,2,20,15.45,20.0,13,70,22.75,4.55
4,Rossi Shorts,2,2,15,19.34,22.0,53,0,12.09,2.66
5,O-Man Underwear,2,1,100,4.72,6.0,0,0,21.33,1.28
6,Shagall Socks,3,2,50,3.21,4.0,120,0,19.75,0.79
7,Runner Shoes,3,7,40,33.11,40.0,15,0,17.23,6.89
8,Tuxedo Top,3,2,20,16.14,20.0,6,0,19.3,3.86
9,Fuji Boots,4,6,10,32.9,39.0,29,0,15.64,6.1
10,Sapporoo Gloves,4,8,20,6.2,7.0,31,0,11.43,0.8


In [0]:
dbutils.fs.rm("dbfs:/FileStore/MedallionArchitecture/gold/Dim_Products",True)

Out[11]: True

In [0]:
df_products = spark.read.format("delta").load("dbfs:/FileStore/MedallionArchitecture/silver/Products")
df_article_categories = spark.read.format("delta").load("dbfs:/FileStore/MedallionArchitecture/silver/ArticleCategories")
df_dim_products = (
    df_products
    .join(df_article_categories, "CategoryID", "left") 
    .select("ProductID", "ProductName", "Category", "SubCategory")
)

# Save to Gold Layer
df_dim_products.write.format("delta").mode("overwrite").save("dbfs:/FileStore/MedallionArchitecture/gold/Dim_Products")


In [0]:
%sql
CREATE TABLE gold.dim_products
USING DELTA
LOCATION "dbfs:/FileStore/MedallionArchitecture/gold/Dim_Products"

In [0]:
%sql
SELECT *
FROM gold.dim_products

ProductID,ProductName,Category,SubCategory
1,Lenin Jeansshorts,Clothes,Men's Clothes
2,Mr2 Trousers,Clothes,Men's Clothes
3,Chantell Shirt,Clothes,Women's Clothes
4,Rossi Shorts,Clothes,Women's Clothes
5,O-Man Underwear,Clothes,Men's Clothes
6,Shagall Socks,Clothes,Women's Clothes
7,Runner Shoes,Clothes,Children's Clothes
8,Tuxedo Top,Clothes,Women's Clothes
9,Fuji Boots,Footwear,Men's Footwear
10,Sapporoo Gloves,Clothes,Baby Clothes


### dimenzija dim Shippers

In [0]:
df_dim_shippers = df_shippers.select("ShipperID", "CompanyName")
df_dim_shippers.write.format("delta").mode("overwrite").save("dbfs:/FileStore/MedallionArchitecture/gold/Dim_Shippers")

### dimenzija Date that I decided not to use (instead I incorporated logic of using dimesion order and dimension shipment date)

In [0]:
from pyspark.sql.functions import col, year, month, quarter
df_dim_date = (
    df_orders.select(col("OrderDate").alias("Date"))
    .union(df_shipments.select(col("ShipmentDate").alias("Date")))
    .distinct()
    .withColumn("Year", year(col("Date")))
    .withColumn("Month", month(col("Date")))
    .withColumn("Quarter", quarter(col("Date")))
)
df_dim_date.write.format("delta").mode("overwrite").save("dbfs:/FileStore/MedallionArchitecture/gold/Dim_Date")


In [0]:
dbutils.fs.rm("dbfs:/FileStore/MedallionArchitecture/gold/Dim_Date", recurse=True)

Out[13]: True

In [0]:
from pyspark.sql.functions import col, year, month, quarter, monotonically_increasing_id

df_dim_date = (
    df_orders.select(col("OrderDate").alias("Date"))
    .union(df_shipments.select(col("ShipmentDate").alias("Date")))
    .distinct()
    .withColumn("Year", year(col("Date")))
    .withColumn("Month", month(col("Date")))
    .withColumn("Quarter", quarter(col("Date")))
    .withColumn("DateKey", monotonically_increasing_id())  # Generate unique DateKey
)

# Save the updated Date Dimension table
df_dim_date.write.format("delta").mode("overwrite").save("dbfs:/FileStore/MedallionArchitecture/gold/Dim_Date")

In [0]:
%sql
CREATE TABLE gold.dim_date
USING DELTA
LOCATION 'dbfs:/FileStore/MedallionArchitecture/gold/Dim_Date';

In [0]:
%sql
SELECT *
FROM gold.dim_date

Date,Year,Month,Quarter,DateKey
2007-11-23,2007,11,4,0
2010-08-11,2010,8,3,1
2009-07-25,2009,7,3,2
2009-11-22,2009,11,4,3
2011-01-29,2011,1,1,4
2011-01-30,2011,1,1,5
2011-10-07,2011,10,4,6
2009-09-27,2009,9,3,7
2010-09-24,2010,9,3,8
2009-06-28,2009,6,2,9


In [0]:
%sql
DROP TABLE IF EXISTS gold.dim_date

### Fact tabela Sales

In [0]:
%sql
SELECT *
FROM silver.products

ProductID,ProductName,SupplierID,CategoryID,QuantityPerUnit,UnitCost,UnitPrice,UnitsInStock,UnitsOnOrder,Margin_Percent,Profit
1,Lenin Jeansshorts,1,1,10,15.37,20.0,39,0,23.15,4.63
2,Mr2 Trousers,1,1,10,16.17,19.0,17,40,14.89,2.83
3,Chantell Shirt,1,2,20,15.45,20.0,13,70,22.75,4.55
4,Rossi Shorts,2,2,15,19.34,22.0,53,0,12.09,2.66
5,O-Man Underwear,2,1,100,4.72,6.0,0,0,21.33,1.28
6,Shagall Socks,3,2,50,3.21,4.0,120,0,19.75,0.79
7,Runner Shoes,3,7,40,33.11,40.0,15,0,17.23,6.89
8,Tuxedo Top,3,2,20,16.14,20.0,6,0,19.3,3.86
9,Fuji Boots,4,6,10,32.9,39.0,29,0,15.64,6.1
10,Sapporoo Gloves,4,8,20,6.2,7.0,31,0,11.43,0.8


In [0]:
%sql
SELECT o.OrderID, s.ShipperID, s.ShipmentDate
FROM silver.orders o
LEFT JOIN silver.shipments s ON o.OrderID = s.OrderID
WHERE s.ShipperID IS NULL;

OrderID,ShipperID,ShipmentDate


### Kreiram fact tabelu. Ovde kreiram Shipment i Order date key. NE KORISTIM OVU FACT TABELU

In [0]:
# Load Other Tables
df_order_details = spark.read.table("silver.orderdetails").alias("od")
df_products = spark.read.table("silver.products").alias("p")
df_customers = spark.read.table("silver.customers").alias("c")
df_shippers = spark.read.table("silver.shippers").alias("sh")
df_dim_date = spark.read.table("gold.Dim_Date")
df_orders = spark.read.table("silver.orders").alias("o")
df_shipments = spark.read.table("silver.shipments").alias("s")

df_orders = (
    df_orders
    .join(df_dim_date.alias("d"), col("o.OrderDate") == col("d.Date"), "left")
    .select("o.*", col("d.DateKey").alias("OrderDateKey"))
    .drop("OrderDate")
)

# Join Shipments with Date Dimension to get ShipmentDateKey
df_shipments = (
    df_shipments
    .join(df_dim_date.alias("d"), col("s.ShipmentDate") == col("d.Date"), "left")
    .select("s.*", col("d.DateKey").alias("ShipmentDateKey"))
    .drop("ShipmentDate")
)

# Join Tables to Create Fact Sales with DateKeys
df_fact_sales = (
    df_order_details
    .join(df_orders, "OrderID", "inner")
    .join(df_products, "ProductID", "inner")
    .join(df_customers, "CustomerID", "left")
    .join(df_shippers, "ShipperID", "left")
    .join(df_shipments, "OrderID", "left")
)

# Select Required Columns (Now with DateKeys)
df_fact_sales = df_fact_sales.select(
   df_order_details["OrderID"],
    df_order_details["ProductID"],
    df_orders["CustomerID"], 
    df_orders["ShipperID"], 
    df_orders["OrderDateKey"], 
    df_shipments["ShipmentDateKey"], 
    df_order_details["Quantity"], 
    df_order_details["UnitPrice"],  
    df_order_details["TotalPrice"], 
    df_order_details["Total_Discount"],
    df_products["Profit"],         
    df_products["Margin_Percent"]
)

# Save FactSales Table
df_fact_sales.write.format("delta").mode("overwrite").save("dbfs:/FileStore/MedallionArchitecture/gold/FactTableSales")


In [0]:
df = spark.read.format("delta").load("dbfs:/FileStore/MedallionArchitecture/gold/FactTableSales")
display(df)

OrderID,ProductID,CustomerID,ShipperID,OrderDateKey,ShipmentDateKey,Quantity,UnitPrice,TotalPrice,Total_Discount,Profit,Margin_Percent
15447,12,46,2,39,1430,29,12.72,313.55,55.33,2.08,20.8
15447,12,46,2,39,1430,29,12.72,313.55,55.33,2.08,20.8
15447,23,46,2,39,1430,67,11.74,668.59,117.99,1.57,17.44
15447,23,46,2,39,1430,67,11.74,668.59,117.99,1.57,17.44
10623,19,25,2,656,822,15,9.35,126.23,14.03,1.8,19.57
10623,19,25,2,656,822,15,9.35,126.23,14.03,1.8,19.57
10623,19,25,2,656,822,15,9.35,126.23,14.03,1.8,19.57
10623,19,25,2,656,822,15,9.35,126.23,14.03,1.8,19.57
10623,19,25,2,656,822,15,9.35,126.23,14.03,1.8,19.57
10623,14,25,2,656,822,21,24.21,508.41,0.0,2.89,12.43


In [0]:
%sql
CREATE TABLE gold.fact_table_sales
USING DELTA
LOCATION "dbfs:/FileStore/MedallionArchitecture/gold/FactTableSales"

In [0]:
%sql
SELECT *
FROM gold.fact_table_sales

OrderID,ProductID,CustomerID,ShipperID,OrderDateKey,ShipmentDateKey,Quantity,UnitPrice,TotalPrice,Total_Discount,Profit,Margin_Percent
15447,12,46,2,39,1430,29,12.72,313.55,55.33,2.08,20.8
15447,12,46,2,39,1430,29,12.72,313.55,55.33,2.08,20.8
15447,23,46,2,39,1430,67,11.74,668.59,117.99,1.57,17.44
15447,23,46,2,39,1430,67,11.74,668.59,117.99,1.57,17.44
10623,19,25,2,656,822,15,9.35,126.23,14.03,1.8,19.57
10623,19,25,2,656,822,15,9.35,126.23,14.03,1.8,19.57
10623,19,25,2,656,822,15,9.35,126.23,14.03,1.8,19.57
10623,19,25,2,656,822,15,9.35,126.23,14.03,1.8,19.57
10623,19,25,2,656,822,15,9.35,126.23,14.03,1.8,19.57
10623,14,25,2,656,822,21,24.21,508.41,0.0,2.89,12.43


### Kreiranje Dim_OrderDate i Dim_ShipmentDate

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, year, month, quarter, row_number

df_orders = spark.read.table("silver.orders")
df_dim_order_date = (
    df_orders.select(col("OrderDate").alias("Date"))
    .distinct()
    .withColumn("Year", year(col("Date")))
    .withColumn("Month", month(col("Date")))
    .withColumn("Quarter", quarter(col("Date")))
)

df_dim_order_date = df_dim_order_date.withColumn(
    "DateKey", row_number().over(Window.orderBy("Date"))
)

df_dim_order_date.write.format("delta").mode("overwrite").save("dbfs:/FileStore/MedallionArchitecture/gold/Dim_Order_Date")

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, year, month, quarter, row_number

df_shipments = spark.read.table("silver.shipments")

df_dim_shipment_date = (
    df_shipments.select(col("ShipmentDate").alias("Date"))
    .distinct()
    .withColumn("Year", year(col("Date")))
    .withColumn("Month", month(col("Date")))
    .withColumn("Quarter", quarter(col("Date")))
)

# Generate Stable DateKey
df_dim_shipment_date = df_dim_shipment_date.withColumn(
    "DateKey", row_number().over(Window.orderBy("Date"))
)

df_dim_shipment_date.write.format("delta").mode("overwrite").save("dbfs:/FileStore/MedallionArchitecture/gold/Dim_Shipment_Date")


In [0]:
%sql
CREATE TABLE gold.dim_orderdate
USING DELTA
LOCATION "dbfs:/FileStore/MedallionArchitecture/gold/Dim_Order_Date";

In [0]:
%sql
CREATE TABLE gold.dim_shipmentdate
USING DELTA
LOCATION "dbfs:/FileStore/MedallionArchitecture/gold/Dim_Shipment_Date";

In [0]:
# Load Other Tables
df_order_details = spark.read.table("silver.orderdetails").alias("od")
df_products = spark.read.table("silver.products").alias("p")
df_customers = spark.read.table("silver.customers").alias("c")
df_shippers = spark.read.table("silver.shippers").alias("sh")
df_orders = spark.read.table("silver.orders").alias("o")
df_shipments = spark.read.table("silver.shipments").alias("s")

df_dim_order_date = spark.read.table("gold.Dim_OrderDate").alias("d1")
df_dim_shipment_date = spark.read.table("gold.Dim_ShipmentDate").alias("d2")

df_orders = (
    df_orders
    .join(df_dim_order_date, df_orders["OrderDate"] == df_dim_order_date["Date"], "left")
    .select("o.*", col("d1.DateKey").alias("OrderDateKey"))
    .drop("OrderDate")
)

df_shipments = (
    df_shipments
    .join(df_dim_shipment_date, df_shipments["ShipmentDate"] == df_dim_shipment_date["Date"], "left")
    .select("s.*", col("d2.DateKey").alias("ShipmentDateKey"))
    .drop("ShipmentDate")
)

df_fact_sales = (
    df_order_details
    .join(df_orders, "OrderID", "inner")
    .join(df_products, "ProductID", "inner")
    .join(df_customers, "CustomerID", "left")
    .join(df_shippers, "ShipperID", "left")
    .join(df_shipments, "OrderID", "left")
)

df_fact_sales = df_fact_sales.select(
    df_order_details["OrderID"],
    df_order_details["ProductID"],
    df_orders["CustomerID"], 
    df_orders["ShipperID"], 
    df_orders["OrderDateKey"], 
    df_shipments["ShipmentDateKey"], 
    df_order_details["Quantity"], 
    df_order_details["UnitPrice"],  
    df_order_details["TotalPrice"], 
    df_order_details["Total_Discount"],
    df_products["Profit"],         
    df_products["Margin_Percent"]
)

df_fact_sales.write.format("delta").mode("overwrite").save("dbfs:/FileStore/MedallionArchitecture/gold/FactTableFINAL")


In [0]:
%sql
CREATE TABLE gold.factFINALtable
USING DELTA
LOCATION "dbfs:/FileStore/MedallionArchitecture/gold/FactTableFINAL"

In [0]:
%sql
DROP TABLE IF EXISTS gold.fact_table_sales

In [0]:
%sql
DROP TABLE IF EXISTS gold.dim_date