# Dimension tables

In [1]:
# Dimension table - City
#
# Reads the CSV files under 'Files/full/dimension_city' with an explicit schema
# and overwrites the Delta table stored at 'Tables/dimension_city'.
# `spark` is not defined in this cell; it is expected to be supplied by the
# notebook runtime.

# Explicit imports instead of a wildcard so the cell's dependencies are visible.
from pyspark.sql.types import (
    IntegerType,
    LongType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

table_name = 'dimension_city'

# StructField(name, dataType, nullable): every column is declared nullable.
dimension_city_schema = StructType([
    StructField('CityKey', IntegerType(), True),
    StructField('WWICityID', IntegerType(), True),
    StructField('City', StringType(), True),
    StructField('StateProvince', StringType(), True),
    StructField('Country', StringType(), True),
    StructField('Continent', StringType(), True),
    StructField('SalesTerritory', StringType(), True),
    StructField('Region', StringType(), True),
    StructField('Subregion', StringType(), True),
    StructField('Location', StringType(), True),
    StructField('LatestRecordedPopulation', LongType(), True),
    StructField('ValidFrom', TimestampType(), True),
    StructField('ValidTo', TimestampType(), True),
    StructField('LineageKey', IntegerType(), True),
])

# header=true: the first line of each CSV file is a header row, not data.
df = (
    spark.read.format("csv")
    .schema(dimension_city_schema)
    .option("header", "true")
    .load('Files/full/' + table_name)
)

# "overwrite" replaces whatever is already stored at the target path.
df.write.mode("overwrite").format("delta").save("Tables/" + table_name)

StatementMeta(, f8eb6fd9-ff3f-46ed-ab6c-e9809cff3ab4, 3, Finished, Available, Finished)

In [2]:
%%sql
-- Return every column of the dimension_city table.
SELECT *
FROM dimension_city;

StatementMeta(, f8eb6fd9-ff3f-46ed-ab6c-e9809cff3ab4, 4, Finished, Available, Finished)

<Spark SQL result set with 1000 rows and 14 fields>

In [3]:
# Dimension table - customer
#
# Reads the CSV files under 'Files/full/dimension_customer' with an explicit
# schema and overwrites the Delta table stored at 'Tables/dimension_customer'.
# `spark` is not defined in this cell; it is expected to be supplied by the
# notebook runtime.

# Explicit imports instead of a wildcard so the cell's dependencies are visible.
from pyspark.sql.types import (
    IntegerType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

table_name = 'dimension_customer'

# StructField(name, dataType, nullable): every column is declared nullable.
dimension_customer_schema = StructType([
    StructField('CustomerKey', IntegerType(), True),
    StructField('WWICustomerID', IntegerType(), True),
    StructField('Customer', StringType(), True),
    StructField('BillToCustomer', StringType(), True),
    StructField('Category', StringType(), True),
    StructField('BuyingGroup', StringType(), True),
    StructField('PrimaryContact', StringType(), True),
    StructField('PostalCode', StringType(), True),
    StructField('ValidFrom', TimestampType(), True),
    StructField('ValidTo', TimestampType(), True),
    StructField('LineageKey', IntegerType(), True),
])

# header=true: the first line of each CSV file is a header row, not data.
df = (
    spark.read.format("csv")
    .schema(dimension_customer_schema)
    .option("header", "true")
    .load('Files/full/' + table_name)
)

# "overwrite" replaces whatever is already stored at the target path.
df.write.mode("overwrite").format("delta").save("Tables/" + table_name)

StatementMeta(, f8eb6fd9-ff3f-46ed-ab6c-e9809cff3ab4, 5, Finished, Available, Finished)

In [4]:
# Dimension table - date
#
# Reads the CSV files under 'Files/full/dimension_date' with an explicit schema
# and overwrites the Delta table stored at 'Tables/dimension_date'.
# `spark` is not defined in this cell; it is expected to be supplied by the
# notebook runtime.

# Explicit imports instead of a wildcard so the cell's dependencies are visible.
from pyspark.sql.types import (
    IntegerType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

table_name = 'dimension_date'

# StructField(name, dataType, nullable): every column is declared nullable.
dimension_date_schema = StructType([
    StructField('Date', TimestampType(), True),
    StructField('DayNumber', IntegerType(), True),
    StructField('Day', StringType(), True),
    StructField('Month', StringType(), True),
    StructField('ShortMonth', StringType(), True),
    StructField('CalendarMonthNumber', IntegerType(), True),
    StructField('CalendarMonthLabel', StringType(), True),
    StructField('CalendarYear', IntegerType(), True),
    StructField('CalendarYearLabel', StringType(), True),
    StructField('FiscalMonthNumber', IntegerType(), True),
    StructField('FiscalMonthLabel', StringType(), True),
    StructField('FiscalYear', IntegerType(), True),
    StructField('FiscalYearLabel', StringType(), True),
    StructField('ISOWeekNumber', IntegerType(), True),
])

# header=true: the first line of each CSV file is a header row, not data.
df = (
    spark.read.format("csv")
    .schema(dimension_date_schema)
    .option("header", "true")
    .load('Files/full/' + table_name)
)

# "overwrite" replaces whatever is already stored at the target path.
df.write.mode("overwrite").format("delta").save("Tables/" + table_name)

StatementMeta(, f8eb6fd9-ff3f-46ed-ab6c-e9809cff3ab4, 6, Finished, Available, Finished)

In [6]:
# Dimension table - employee
#
# Reads the CSV files under 'Files/full/dimension_employee' with an explicit
# schema and overwrites the Delta table stored at 'Tables/dimension_employee'.
# `spark` is not defined in this cell; it is expected to be supplied by the
# notebook runtime.

# Explicit imports instead of a wildcard so the cell's dependencies are visible.
from pyspark.sql.types import (
    BooleanType,
    IntegerType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

table_name = 'dimension_employee'

# StructField(name, dataType, nullable): every column is declared nullable.
dimension_employee_schema = StructType([
    StructField('EmployeeKey', IntegerType(), True),
    StructField('WWIEmployeeID', IntegerType(), True),
    StructField('Employee', StringType(), True),
    StructField('PreferredName', StringType(), True),
    StructField('IsSalesperson', BooleanType(), True),
    StructField('Photo', StringType(), True),
    StructField('ValidFrom', TimestampType(), True),
    StructField('ValidTo', TimestampType(), True),
    StructField('LineageKey', IntegerType(), True),
])

# header=true: the first line of each CSV file is a header row, not data.
df = (
    spark.read.format("csv")
    .schema(dimension_employee_schema)
    .option("header", "true")
    .load('Files/full/' + table_name)
)

# "overwrite" replaces whatever is already stored at the target path.
df.write.mode("overwrite").format("delta").save("Tables/" + table_name)

StatementMeta(, f8eb6fd9-ff3f-46ed-ab6c-e9809cff3ab4, 8, Finished, Available, Finished)

In [7]:
# Dimension table - stock item
#
# Reads the CSV files under 'Files/full/dimension_stock_item' with an explicit
# schema and overwrites the Delta table stored at 'Tables/dimension_stock_item'.
# `spark` is not defined in this cell; it is expected to be supplied by the
# notebook runtime.

# Explicit imports instead of a wildcard so the cell's dependencies are visible.
from pyspark.sql.types import (
    BooleanType,
    DecimalType,
    IntegerType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

table_name = 'dimension_stock_item'

# StructField(name, dataType, nullable): every column is declared nullable.
# DecimalType arguments are (precision, scale).
dimension_stock_item_schema = StructType([
    StructField('StockItemKey', IntegerType(), True),
    StructField('WWIStockItemID', IntegerType(), True),
    StructField('StockItem', StringType(), True),
    StructField('Color', StringType(), True),
    StructField('SellingPackage', StringType(), True),
    StructField('BuyingPackage', StringType(), True),
    StructField('Brand', StringType(), True),
    StructField('Size', StringType(), True),
    StructField('LeadTimeDays', IntegerType(), True),
    StructField('QuantityPerOuter', IntegerType(), True),
    StructField('IsChillerStock', BooleanType(), True),
    StructField('Barcode', StringType(), True),
    StructField('TaxRate', DecimalType(18, 3), True),
    StructField('UnitPrice', DecimalType(18, 2), True),
    StructField('RecommendedRetailPrice', DecimalType(18, 2), True),
    StructField('TypicalWeightPerUnit', DecimalType(18, 3), True),
    StructField('Photo', StringType(), True),
    StructField('ValidFrom', TimestampType(), True),
    StructField('ValidTo', TimestampType(), True),
    StructField('LineageKey', IntegerType(), True),
])

# header=true: the first line of each CSV file is a header row, not data.
df = (
    spark.read.format("csv")
    .schema(dimension_stock_item_schema)
    .option("header", "true")
    .load('Files/full/' + table_name)
)

# "overwrite" replaces whatever is already stored at the target path.
df.write.mode("overwrite").format("delta").save("Tables/" + table_name)

StatementMeta(, f8eb6fd9-ff3f-46ed-ab6c-e9809cff3ab4, 9, Finished, Available, Finished)

# Fact Table

In [1]:
# Fact table - fact_sale
#
# Reads the CSV files under 'Files/full/fact_sale' with an explicit schema,
# derives Year / Quarter / Month columns from InvoiceDateKey, and overwrites
# the Delta table stored at 'Tables/fact_sale', partitioned by Year and Quarter.
# `spark` is not defined in this cell; it is expected to be supplied by the
# notebook runtime.

from pyspark.sql.functions import col, month, quarter, year
# Explicit imports instead of a wildcard so the cell's dependencies are visible.
from pyspark.sql.types import (
    DecimalType,
    IntegerType,
    LongType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

table_name = 'fact_sale'

# StructField(name, dataType, nullable): every column is declared nullable.
# DecimalType arguments are (precision, scale).
fact_sale_schema = StructType([
    StructField('SaleKey', LongType(), True),
    StructField('CityKey', IntegerType(), True),
    StructField('CustomerKey', IntegerType(), True),
    StructField('BillToCustomerKey', IntegerType(), True),
    StructField('StockItemKey', IntegerType(), True),
    StructField('InvoiceDateKey', TimestampType(), True),
    StructField('DeliveryDateKey', TimestampType(), True),
    StructField('SalespersonKey', IntegerType(), True),
    StructField('WWIInvoiceID', IntegerType(), True),
    StructField('Description', StringType(), True),
    StructField('Package', StringType(), True),
    StructField('Quantity', IntegerType(), True),
    StructField('UnitPrice', DecimalType(18, 2), True),
    StructField('TaxRate', DecimalType(18, 3), True),
    StructField('TotalExcludingTax', DecimalType(29, 2), True),
    StructField('TaxAmount', DecimalType(38, 6), True),
    StructField('Profit', DecimalType(18, 2), True),
    StructField('TotalIncludingTax', DecimalType(38, 6), True),
    StructField('TotalDryItems', IntegerType(), True),
    StructField('TotalChillerItems', IntegerType(), True),
    StructField('LineageKey', IntegerType(), True),
])

# header=true: the first line of each CSV file is a header row, not data.
df = (
    spark.read.format("csv")
    .schema(fact_sale_schema)
    .option("header", "true")
    .load('Files/full/' + table_name)
)

# Calendar parts of the invoice timestamp. Year and Quarter are used as
# partition columns below; Month is written as an ordinary column.
invoice_date = col("InvoiceDateKey")
df = (
    df.withColumn('Year', year(invoice_date))
    .withColumn('Quarter', quarter(invoice_date))
    .withColumn('Month', month(invoice_date))
)

# "overwrite" replaces whatever is already stored at the target path.
(
    df.write.mode("overwrite")
    .format("delta")
    .partitionBy("Year", "Quarter")
    .save("Tables/" + table_name)
)

StatementMeta(, b6be8f03-70de-487a-9256-d4469cfa8959, 3, Finished, Available, Finished)

In [3]:
%%sql
-- Count fact_sale rows per (Year, Quarter, Month), sorted by those same columns.
SELECT Year, Quarter, Month, count(*)
FROM fact_sale 
GROUP BY Year, Quarter, Month
ORDER BY Year, Quarter, Month;

StatementMeta(, b6be8f03-70de-487a-9256-d4469cfa8959, 5, Finished, Available, Finished)

<Spark SQL result set with 11 rows and 4 fields>