### Spark session configuration
This cell sets Spark session settings to enable _Verti-Parquet_ and _Optimize on Write_. More details about _Verti-Parquet_ and _Optimize on Write_ in tutorial document.

In [6]:
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

spark.conf.set("sprk.sql.parquet.vorder.enabled", "true")
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.microsoft.delta.optimizeWrite.binSize", "1073741824")

StatementMeta(, 130188a5-6608-4480-97e7-d80e27ce6821, 8, Finished, Available, Finished)

### Fact - Sale

This cell reads raw data from the _Files_ section of the lakehouse, adds additional columns for different date parts and the same information is being used to create partitioned fact delta table.

In [7]:
from pyspark.sql.functions import col, year, month, quarter

table_name = 'fact_sale'

# Update the format to CSV and set options for header and schema inference
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load('Files/0e7c2ef8-0e90-4aa3-8f71-551c25cc3016/WideWorldImportersDW/csv/full/fact_sale_1y_full')

df = df.withColumn('Year', year(col("InvoiceDateKey")))
df = df.withColumn('Quarter', quarter(col("InvoiceDateKey")))
df = df.withColumn('Month', month(col("InvoiceDateKey")))

df.write.mode("overwrite").format("delta").partitionBy("Year", "Quarter").save("Tables/" + table_name)


StatementMeta(, 130188a5-6608-4480-97e7-d80e27ce6821, 9, Finished, Available, Finished)

### Dimensions
This cell creates a function to read raw data from the _Files_ section of the lakehouse for the table name passed as a parameter. Next, it creates a list of dimension tables. Finally, it has a _for loop_ to loop through the list of tables and call above function with each table name as parameter to read data for that specific table and create delta table.

In [2]:
def loadFullDataFromSource(table_name):
    path = "Files/0e7c2ef8-0e90-4aa3-8f71-551c25cc3016/WideWorldImportersDW/csv/full/" + table_name
    # Read CSV with header and schema inference
    df = (
        spark.read
             .format("csv")
             .option("header", "true")
             .option("inferSchema", "true")
             .load(path)
    )
    
    # Special case for dimension_customer which seems to have a more complex duplication issue
    if table_name == 'dimension_customer':
        # List all columns and manually rename them to ensure uniqueness
        print(f"Column names for {table_name}: {df.columns}")
        
        # Select each column with explicit aliases to avoid duplicates
        columns_with_aliases = [f"`{col}` as `{col}_{i}`" for i, col in enumerate(df.columns)]
        select_expr = ", ".join(columns_with_aliases)
        df = df.selectExpr(*columns_with_aliases)
    else:
        # Standard duplicate column handling for other tables
        cols = df.columns
        new_cols = []
        seen = {}
        for name in cols:
            if name in seen:
                seen[name] += 1
                new_cols.append(f"{name}_{seen[name]}")
            else:
                seen[name] = 0
                new_cols.append(name)
        
        # Rename DataFrame columns if duplicates were found
        if new_cols != cols:
            print(f"Renaming columns for table '{table_name}':")
            print("Original:", cols)
            print("New:     ", new_cols)
            df = df.toDF(*new_cols)
    
    # Write the DataFrame to Delta format
    df.write.mode("overwrite").format("delta").save("Tables/" + table_name)

# List of dimension tables to ingest
dimension_tables = [
    'dimension_city',
    'dimension_customer',
    'dimension_date',
    'dimension_employee',
    'dimension_stock_item'
]

for table in dimension_tables:
    try:
        loadFullDataFromSource(table)
        print(f"Successfully loaded {table}")
    except Exception as e:
        print(f"Error loading {table}: {str(e)}")

StatementMeta(, e83de5de-6890-4da9-9b6e-eb734c0fdd75, 4, Finished, Available, Finished)

Successfully loaded dimension_city
Column names for dimension_customer: ['CustomerKey', 'WWICustomerID', 'Customer', 'BillToCustomer', 'Category', 'BuyingGroup', 'PrimaryContact', 'PostalCode', 'ValidFrom', 'ValidTo', 'LineageKey']
Error loading dimension_customer: [_LEGACY_ERROR_TEMP_DELTA_0007] A schema mismatch detected when writing to the Delta table (Table ID: f128e820-453c-487a-9d79-2d4335ba0d89).
To enable schema migration using DataFrameWriter or DataStreamWriter, please set:
'.option("mergeSchema", "true")'.
For other operations, set the session configuration
spark.databricks.delta.schema.autoMerge.enabled to "true". See the documentation
specific to the operation for details.

Table schema:
root
-- CustomerKey: long (nullable = true)
-- WWICustomerID: long (nullable = true)
-- Customer: string (nullable = true)
-- BillToCustomer: string (nullable = true)
-- Category: string (nullable = true)
-- BuyingGroup: string (nullable = true)
-- PrimaryContact: string (nullable = true)


In [3]:
# Register dimension tables
spark.sql("CREATE OR REPLACE VIEW wwilakehouse.dimension_city AS SELECT * FROM delta.`Tables/dimension_city`")
spark.sql("CREATE OR REPLACE VIEW wwilakehouse.dimension_date AS SELECT * FROM delta.`Tables/dimension_date`")
spark.sql("CREATE OR REPLACE VIEW wwilakehouse.dimension_employee AS SELECT * FROM delta.`Tables/dimension_employee`")

# Register fact table
spark.sql("CREATE OR REPLACE VIEW wwilakehouse.fact_sale AS SELECT * FROM delta.`Tables/fact_sale`")

StatementMeta(, e83de5de-6890-4da9-9b6e-eb734c0fdd75, 5, Finished, Available, Finished)

AnalysisException: [SCHEMA_NOT_FOUND] The schema `wwilakehouse` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a catalog, verify the current_schema() output, or qualify the name with the correct catalog.
To tolerate the error on drop use DROP SCHEMA IF EXISTS.

In [4]:
# First create the schema
spark.sql("CREATE SCHEMA IF NOT EXISTS wwilakehouse")

# Then register dimension tables
spark.sql("CREATE OR REPLACE VIEW wwilakehouse.dimension_city AS SELECT * FROM delta.`Tables/dimension_city`")
spark.sql("CREATE OR REPLACE VIEW wwilakehouse.dimension_date AS SELECT * FROM delta.`Tables/dimension_date`")
spark.sql("CREATE OR REPLACE VIEW wwilakehouse.dimension_employee AS SELECT * FROM delta.`Tables/dimension_employee`") 

# Register fact table
spark.sql("CREATE OR REPLACE VIEW wwilakehouse.fact_sale AS SELECT * FROM delta.`Tables/fact_sale`")

StatementMeta(, e83de5de-6890-4da9-9b6e-eb734c0fdd75, 6, Finished, Available, Finished)

Py4JJavaError: An error occurred while calling o341.sql.
: java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
	at com.microsoft.azure.trident.spark.TridentCoreProxy.failCreateDbIfTrident(TridentCoreProxy.java:275)
	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.createDatabase(SessionCatalog.scala:309)
	at org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog.createNamespace(V2SessionCatalog.scala:327)
	at org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension.createNamespace(DelegatingCatalogExtension.java:163)
	at org.apache.spark.sql.execution.datasources.v2.CreateNamespaceExec.run(CreateNamespaceExec.scala:47)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:132)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:220)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:101)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:160)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:148)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:33)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:148)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:132)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:126)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:231)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:101)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:98)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:638)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:629)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:659)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.reflect.InvocationTargetException
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at com.microsoft.azure.trident.spark.TridentCoreProxy.failCreateDbIfTrident(TridentCoreProxy.java:272)
	... 46 more
Caused by: java.lang.RuntimeException: Feature not supported on Apache Spark in Microsoft Fabric. Provided context: { spark.trident.pbiHost=api.fabric.microsoft.com, fs.defaultFS=abfss://125317d8-020f-4416-b744-c2611ae9d283@onelake.dfs.fabric.microsoft.com/, trident.capacity.id=2005b2a4-a182-4d80-9c05-36d0b7f0a3c9, spark.fabric.environmentDetails={}, trident.operation.type=SessionCreation, trident.workspace.id=125317d8-020f-4416-b744-c2611ae9d283, trident.tokenservice.zkcache.enabled=*****, trident.catalog.metastore.workspaceId=125317d8-020f-4416-b744-c2611ae9d283, spark.fabric.pools.category=Starter, spark.fabric.pools.poolHitEventTime=2025-04-06T13:34:24.6954002Z, trident.artifact.workspace.id=125317d8-020f-4416-b744-c2611ae9d283, trident.activity.id=e83de5de-6890-4da9-9b6e-eb734c0fdd75, spark.trident.lineage.enabled=false, trident.artifact.type=SynapseNotebook, trident.lineage.enabled=False, trident.materializedview.libraries.enabled=false, trident.lakehouse.tokenservice.endpoint=htt***ken, spark.trident.disable_autolog=false, fs.homeDir=/b8977ecf-a27a-4dfe-9042-39d9955a0d83, spark.fabric.pool.name=Starter Pool, spark.synapse.nbs.session.timeout=1200000, trident.tenant.id=f74b1450-e46a-41df-abee-ebf3621bfd85, trident.esri.libraries.enabled=false, trident.catalog.metastore.lakehouseName=wwi_lakehouse, spark.fabric.resourceProfile=writeHeavy, spark.trident.autotune.fetchSAS.url=https://pbipncus19-northcentralus.pbidedicated.windows.net/webapi/capacities/2005b2a4-a182-4d80-9c05-36d0b7f0a3c9/workloads/SparkCore/SparkCoreService/automatic/v1/autotune/fetchAutotuneStorageSasUrl, trident.moniker.id=e83de5de-6890-4da9-9b6e-eb734c0fdd75, spark.trident.highconcurrency.enabled=false, spark.trident.session.submittedAt=1743946464768, trident.artifact.id=f5bb1cf2-8900-4ca2-9f01-0aba84e048a0, spark.cluster.type=trident, spark.synapse.context.notebookname=01-Create-Delta-Tables, spark.fabric.pools.vhdOverride=false, spark.fabric.pools.poolHit=true, trident.lakehouse.name=wwi_lakehouse, trident.lakehouse.id=b8977ecf-a27a-4dfe-9042-39d9955a0d83, spark.synapse.nbs.kernelid=63d4f92c-7090-402b-9b02-13d94b122e0f, trident.session.token=eyJ***DMQ }
	at com.microsoft.azure.trident.core.TridentHelper.failIfValidTridentContext(TridentHelper.java:249)
	... 51 more


In [5]:
# Use direct table references instead
df_fact_sale = spark.read.table("delta.`Tables/fact_sale`") 
df_dimension_date = spark.read.table("delta.`Tables/dimension_date`")
df_dimension_city = spark.read.table("delta.`Tables/dimension_city`")
df_dimension_employee = spark.read.table("delta.`Tables/dimension_employee`")

StatementMeta(, e83de5de-6890-4da9-9b6e-eb734c0fdd75, 7, Finished, Available, Finished)

In [6]:
# Create temporary views
df_fact_sale.createOrReplaceTempView("fact_sale")
df_dimension_date.createOrReplaceTempView("dimension_date")
df_dimension_city.createOrReplaceTempView("dimension_city")
df_dimension_employee.createOrReplaceTempView("dimension_employee")

StatementMeta(, e83de5de-6890-4da9-9b6e-eb734c0fdd75, 8, Finished, Available, Finished)

In [7]:
%%sql
CREATE OR REPLACE TEMPORARY VIEW sale_by_date_employee
AS
SELECT
    DD.Date, DD.CalendarMonthLabel
    , DD.Day, DD.ShortMonth Month, CalendarYear Year
    ,DE.PreferredName, DE.Employee
    ,SUM(FS.TotalExcludingTax) SumOfTotalExcludingTax
    ,SUM(FS.TaxAmount) SumOfTaxAmount
    ,SUM(FS.TotalIncludingTax) SumOfTotalIncludingTax
    ,SUM(Profit) SumOfProfit 
FROM fact_sale FS
INNER JOIN dimension_date DD ON FS.InvoiceDateKey = DD.Date
INNER JOIN dimension_employee DE ON FS.SalespersonKey = DE.EmployeeKey
GROUP BY DD.Date, DD.CalendarMonthLabel, DD.Day, DD.ShortMonth, DD.CalendarYear, DE.PreferredName, DE.Employee
ORDER BY DD.Date ASC, DE.PreferredName ASC, DE.Employee ASC

StatementMeta(, e83de5de-6890-4da9-9b6e-eb734c0fdd75, 9, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>