### Import libraries

In [2]:
from pyspark.sql.functions import lit, to_date, col
from pyspark import SparkFiles
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType, DecimalType
import requests
import base64
import os

StatementMeta(, bbb2dded-1266-4104-933b-c5e028b3a03f, 4, Finished, Available, Finished)

### Initialize parameters

In [None]:
GITHUB_PATH = (
    "https://raw.githubusercontent.com/martinabrle/demo-data/main/Sample_Bike_Sales"
)
TENANT_ID="" # only if updating the semantic model from Metadata
CLIENT_ID = "" # only if updating the semantic model from Metadata
CLIENT_SECRET = "" # only if updating the semantic model from Metadata
WORKSPACE_NAME = "demo-bike-sales"

StatementMeta(, bbb2dded-1266-4104-933b-c5e028b3a03f, 5, Finished, Available, Finished)

### Load Addresses

In [None]:
file_name="Addresses.csv"

schema = StructType([
    StructField("address_id", IntegerType(), False, metadata={"description": "Unique identifier for the address", "comment": "Address identifier", "display_name": "Address ID", "primary_key": True}),
    StructField("city", StringType(), True, metadata={"description": "City of the address", "comment": "City name", "display_name": "City"}),
    StructField("postal_code", StringType(), True, metadata={"description": "Postal code of the address", "comment": "Postal code", "display_name": "Postal Code"}),
    StructField("street", StringType(), True, metadata={"description": "Street name of the address", "comment": "Street name", "display_name": "Street"}),
    StructField("building", StringType(), True, metadata={"description": "Building number of the address", "comment": "Building number", "display_name": "Building"}),
    StructField("country", StringType(), True, metadata={"description": "Country of the address", "comment": "Country code", "display_name": "Country"}),
    StructField("region", StringType(), True, metadata={"description": "Administrative region, state, or province", "comment": "Region code", "display_name": "Region"}),
    StructField("address_type", StringType(), True, metadata={"description": "Classification of the address such as billing, shipping, or home", "comment": "Address type", "display_name": "Address Type"}),
    StructField("validity_start_date", StringType(), True, metadata={"description": "Date from which the address is considered valid", "comment": "Validity start date", "display_name": "Valid From"}),
    StructField("validity_end_date", StringType(), True, metadata={"description": "Date until which the address is considered valid", "comment": "Validity end date", "display_name": "Valid To"}),
    StructField("latitude", DoubleType(), True, metadata={"description": "Geographical latitude coordinate of the address", "comment": "Latitude", "display_name": "Latitude"}),
    StructField("longitude", DoubleType(), True, metadata={"description": "Geographical longitude coordinate of the address", "comment": "Longitude", "display_name": "Longitude"})
])

gh_full_file_name = f"{GITHUB_PATH}/{file_name}"
display(f"Downloading {gh_full_file_name}")

sc.addFile(gh_full_file_name)
gh_file_name  = 'file://' +SparkFiles.get(file_name)

addresses_df = spark.read.csv(path=gh_file_name,header=True,schema=schema)
print(f"Downloaded data {gh_file_name}")

addresses_df = addresses_df.withColumn("validity_start_date", to_date(col("validity_start_date"), "yyyyMMdd"))
addresses_df = addresses_df.withColumn("validity_end_date", to_date(col("validity_end_date"), "yyyyMMdd"))


StatementMeta(, bbb2dded-1266-4104-933b-c5e028b3a03f, 7, Finished, Available, Finished)

'Downloading https://raw.githubusercontent.com/martinabrle/demo-data/main/Sample_Bike_Sales/Addresses.csv'

Downloaded data file:///mnt/var/hadoop/tmp/nm-secondary-local-dir/usercache/trusted-service-user/appcache/application_1770568009607_0001/spark-4e4ee70a-3207-4b39-b7af-aa00a2e2be54/userFiles-bc845dab-4a20-4f93-8a29-d411e91d3f5f/Addresses.csv


### Load Employees and Employee Addresses

In [8]:
file_name="Employees.csv"

schema = StructType([
    StructField("employee_id", IntegerType(), False, metadata={"description": "Unique identifier of the employee", "comment": "Employee identifier", "display_name": "Employee ID", "primary_key": True}),
    StructField("first_name", StringType(), True, metadata={"description": "Employee's first name", "comment": "First name", "display_name": "First Name"}),
    StructField("middle_name", StringType(), True, metadata={"description": "Employee's middle name", "comment": "Middle name", "display_name": "Middle Name"}),
    StructField("last_name", StringType(), True, metadata={"description": "Employee's last name", "comment": "Last name", "display_name": "Last Name"}),
    StructField("initials", StringType(), True, metadata={"description": "Employee initials", "comment": "Initials", "display_name": "Initials"}),
    StructField("country", StringType(), True, metadata={"description": "Country associated with the employee", "comment": "Country", "display_name": "Country"}),
    StructField("gender", StringType(), True, metadata={"description": "Gender of the employee", "comment": "Gender", "display_name": "Gender"}),
    StructField("language", StringType(), True, metadata={"description": "Preferred language of the employee", "comment": "Language", "display_name": "Language"}),
    StructField("phone_no", StringType(), True, metadata={"description": "Employee phone number", "comment": "Phone number", "display_name": "Phone Number"}),
    StructField("email", StringType(), True, metadata={"description": "Employee email address", "comment": "Email address", "display_name": "Email"}),
    StructField("login_name", StringType(), True, metadata={"description": "Login name used by the employee", "comment": "Login name", "display_name": "Login Name"}),
    StructField("address_id", IntegerType(), True, metadata={"description": "Identifier of the employee's address", "comment": "Address identifier", "display_name": "Address ID"}),
    StructField("validity_start_date", StringType(), True, metadata={"description": "Date from which the employee record is valid", "comment": "Validity start date", "display_name": "Valid From"}),
    StructField("validity_end_date", StringType(), True, metadata={"description": "Date until which the employee record is valid", "comment": "Validity end date", "display_name": "Valid To"})
])

gh_full_file_name = f"{GITHUB_PATH}/{file_name}"
display(f"Downloading {gh_full_file_name}")

sc.addFile(gh_full_file_name)
gh_file_name  = 'file://' +SparkFiles.get(file_name)

employees_df = spark.read.csv(path=gh_file_name,header=True,schema=schema)
print(f"Downloaded data {gh_file_name}")

employees_df = employees_df.withColumn("validity_start_date", to_date(col("validity_start_date"), "yyyyMMdd"))
employees_df = employees_df.withColumn("validity_end_date", to_date(col("validity_end_date"), "yyyyMMdd"))

employees_df.write.format("delta").mode("overwrite").saveAsTable("employees")

df_employee_addresses = addresses_df.join(
    employees_df,
    on="address_id",
    how="left_semi"
)

df_employee_addresses.write.format("delta").mode("overwrite").saveAsTable("employee_addresses")


StatementMeta(, 7e17825f-b6fa-45fb-b49f-f2aecc437fb6, 10, Finished, Available, Finished)

'Downloading https://raw.githubusercontent.com/martinabrle/demo-data/main/Sample_Bike_Sales/Employees.csv'

Downloaded data file:///mnt/var/hadoop/tmp/nm-secondary-local-dir/usercache/trusted-service-user/appcache/application_1770479201446_0001/spark-d7ad3cdc-6a3b-4e2a-a913-5ebe7243d33f/userFiles-4d17422f-1ff8-428e-bff3-73b493f4847d/Employees.csv


### Load Customers and Customer Addresses

In [None]:
file_name="Customers.csv"

schema = StructType([
    StructField("customer_id", IntegerType(), False, metadata={"description": "Unique identifier of the customer", "comment": "Customer identifier", "display_name": "Customer ID", "primary_key": True}),
    StructField("email", StringType(), True, metadata={"description": "Customer email address", "comment": "Email address", "display_name": "Email"}),
    StructField("phone_no", StringType(), True, metadata={"description": "Customer phone number", "comment": "Phone number", "display_name": "Phone Number"}),
    StructField("fax_no", StringType(), True, metadata={"description": "Customer fax number", "comment": "Fax number", "display_name": "Fax Number"}),
    StructField("url", StringType(), True, metadata={"description": "Customer website or URL", "comment": "Website URL", "display_name": "Website"}),
    StructField("address_id", IntegerType(), True, metadata={"description": "Identifier of the customer's address", "comment": "Address identifier", "display_name": "Address ID"}),
    StructField("company_name", StringType(), True, metadata={"description": "Legal or registered name of the company", "comment": "Company name", "display_name": "Customer"}),
    StructField("legal_form", StringType(), True, metadata={"description": "Legal form of the company, such as Ltd or Inc", "comment": "Legal form", "display_name": "Legal Form"}),
    StructField("created_by", IntegerType(), True, metadata={"description": "Identifier of the user who created the customer record", "comment": "Created by", "display_name": "Created By"}),
    StructField("created_date", StringType(), True, metadata={"description": "Date when the customer record was created", "comment": "Created date", "display_name": "Created Date"}),
    StructField("modified_by", IntegerType(), True, metadata={"description": "Identifier of the user who last modified the customer record", "comment": "Modified by", "display_name": "Modified By"}),
    StructField("modified_date", StringType(), True, metadata={"description": "Date when the customer record was last modified", "comment": "Modified date", "display_name": "Modified Date"}),
    StructField("currency", StringType(), True, metadata={"description": "Default currency associated with the customer", "comment": "Currency", "display_name": "Currency" })
])

gh_full_file_name = f"{GITHUB_PATH}/{file_name}"
display(f"Downloading {gh_full_file_name}")

sc.addFile(gh_full_file_name)
gh_file_name  = 'file://' +SparkFiles.get(file_name)

customers_df = spark.read.csv(path=gh_file_name,header=True,schema=schema)
print(f"Downloaded data {gh_file_name}")

customers_df = customers_df.withColumn("created_date", to_date(col("created_date"), "yyyyMMdd"))
customers_df = customers_df.withColumn("modified_date", to_date(col("modified_date"), "yyyyMMdd"))

customers_df.write.format("delta").mode("overwrite").saveAsTable("customers")

df_customer_addresses = addresses_df.join(
    customers_df,
    on="address_id",
    how="left_semi"
)

df_customer_addresses.write.format("delta").mode("overwrite").saveAsTable("customer_addresses")


StatementMeta(, bbb2dded-1266-4104-933b-c5e028b3a03f, 8, Finished, Available, Finished)

'Downloading https://raw.githubusercontent.com/martinabrle/demo-data/main/Sample_Bike_Sales/Customers.csv'

Downloaded data file:///mnt/var/hadoop/tmp/nm-secondary-local-dir/usercache/trusted-service-user/appcache/application_1770568009607_0001/spark-4e4ee70a-3207-4b39-b7af-aa00a2e2be54/userFiles-bc845dab-4a20-4f93-8a29-d411e91d3f5f/Customers.csv


Py4JJavaError: An error occurred while calling o12143.saveAsTable.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 45.0 failed 4 times, most recent failure: Lost task 0.3 in stage 45.0 (TID 196) (vm-a4082385 executor 2): org.apache.spark.SparkFileNotFoundException: File file:/mnt/var/hadoop/tmp/nm-secondary-local-dir/usercache/trusted-service-user/appcache/application_1770568009607_0001/spark-4e4ee70a-3207-4b39-b7af-aa00a2e2be54/userFiles-bc845dab-4a20-4f93-8a29-d411e91d3f5f/Customers.csv does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:783)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:285)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:348)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:188)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:413)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:900)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:900)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:57)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:368)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:332)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:636)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:639)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3111)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3047)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3046)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3046)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1303)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1303)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1303)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3318)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3249)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3238)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1037)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2580)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2601)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2620)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2645)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1056)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:411)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1055)
	at org.apache.spark.sql.execution.SparkPlan.executeCollectIterator(SparkPlan.scala:480)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:149)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$2(SQLExecution.scala:354)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:349)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkFileNotFoundException: File file:/mnt/var/hadoop/tmp/nm-secondary-local-dir/usercache/trusted-service-user/appcache/application_1770568009607_0001/spark-4e4ee70a-3207-4b39-b7af-aa00a2e2be54/userFiles-bc845dab-4a20-4f93-8a29-d411e91d3f5f/Customers.csv does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:783)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:285)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:348)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:188)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:413)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:900)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:900)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:57)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:368)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:332)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:636)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:639)
	... 3 more


### Load Vendors and Vendor Addresses

In [None]:
file_name="Vendors.csv"

schema = StructType([
    StructField("vendor_id", IntegerType(), False, metadata={"description": "Unique identifier of the vendor", "comment": "Vendor identifier", "display_name": "Vendor ID", "primary_key": True}),
    StructField("email", StringType(), True, metadata={"description": "Vendor email address", "comment": "Email address", "display_name": "Email"}),
    StructField("phone_no", StringType(), True, metadata={"description": "Vendor phone number", "comment": "Phone number", "display_name": "Phone Number"}),
    StructField("fax_no", StringType(), True, metadata={"description": "Vendor fax number", "comment": "Fax number", "display_name": "Fax Number"}),
    StructField("url", StringType(), True, metadata={"description": "Vendor website or URL", "comment": "Website URL", "display_name": "Website"}),
    StructField("address_id", IntegerType(), True, metadata={"description": "Identifier for the vendor's address", "comment": "Address identifier", "display_name": "Address ID"}),
    StructField("company_name", StringType(), True, metadata={"description": "Legal or registered name of the vendor company", "comment": "Company name", "display_name": "Vendor"}),
    StructField("legal_form", StringType(), True, metadata={"description": "Legal form of the vendor company, such as Ltd or Inc", "comment": "Legal form", "display_name": "Legal Form"}),
    StructField("created_by", IntegerType(), True, metadata={"description": "Identifier of the user who created the vendor record", "comment": "Created by", "display_name": "Created By"}),
    StructField("created_date", StringType(), True, metadata={"description": "Date when the vendor record was created", "comment": "Created date", "display_name": "Created Date"}),
    StructField("modified_by", IntegerType(), True, metadata={"description": "Identifier of the user who last modified the vendor record", "comment": "Modified by", "display_name": "Modified By"}),
    StructField("modified_date", StringType(), True, metadata={"description": "Date when the vendor record was last modified", "comment": "Modified date", "display_name": "Modified Date"}),
    StructField("currency", StringType(), True, metadata={"description": "Default currency associated with the vendor", "comment": "Currency", "display_name": "Currency"})
])

gh_full_file_name = f"{GITHUB_PATH}/{file_name}"
display(f"Downloading {gh_full_file_name}")

sc.addFile(gh_full_file_name)
gh_file_name  = 'file://' +SparkFiles.get(file_name)

vendors_df = spark.read.csv(path=gh_file_name,header=True,schema=schema)
print(f"Downloaded data {gh_file_name}")

vendors_df = vendors_df.withColumn("created_date", to_date(col("created_date"), "yyyyMMdd"))
vendors_df = vendors_df.withColumn("modified_date", to_date(col("modified_date"), "yyyyMMdd"))

vendors_df.write.format("delta").mode("overwrite").saveAsTable("vendors")

df_vendor_addresses = addresses_df.join(
    vendors_df,
    on="address_id",
    how="left_semi"
)

df_vendor_addresses.write.format("delta").mode("overwrite").saveAsTable("vendor_addresses")


StatementMeta(, cbc235ee-e949-4a4f-bc4e-8f3159d74f2d, 10, Finished, Available, Finished)

'Downloading https://raw.githubusercontent.com/martinabrle/demo-data/main/Sample_Bike_Sales/Vendors.csv'

Downloaded data file:///mnt/var/hadoop/tmp/nm-local-dir/usercache/trusted-service-user/appcache/application_1770492411878_0001/spark-400262e1-c607-4b54-bca3-63b28d3d28fd/userFiles-fcb2385f-723c-40b8-9274-532143b2746f/Vendors.csv


Py4JJavaError: An error occurred while calling o14943.saveAsTable.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 60.0 failed 4 times, most recent failure: Lost task 0.3 in stage 60.0 (TID 221) (vm-22754175 executor 3): org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to abfss://1a7264cf-5a08-47ab-aa00-41a6f005f2d3@onelake.dfs.fabric.microsoft.com/6f4189fd-1e9b-4b03-b188-66b54ecf0c9b/Tables/vendors.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:777)
	at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.executeTask(DeltaFileFormatWriter.scala:621)
	at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.$anonfun$executeWrite$4(DeltaFileFormatWriter.scala:387)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:636)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:639)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkFileNotFoundException: File file:/mnt/var/hadoop/tmp/nm-local-dir/usercache/trusted-service-user/appcache/application_1770492411878_0001/spark-400262e1-c607-4b54-bca3-63b28d3d28fd/userFiles-fcb2385f-723c-40b8-9274-532143b2746f/Vendors.csv does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:783)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:285)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:348)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:188)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:121)
	at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.$anonfun$executeTask$3(DeltaFileFormatWriter.scala:603)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
	at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.executeTask(DeltaFileFormatWriter.scala:611)
	... 12 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3111)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3047)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3046)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3046)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1303)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1303)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1303)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3318)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3249)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3238)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1037)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2580)
	at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.$anonfun$executeWrite$1(DeltaFileFormatWriter.scala:384)
	at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.writeAndCommit(DeltaFileFormatWriter.scala:418)
	at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.executeWrite(DeltaFileFormatWriter.scala:315)
	at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.write(DeltaFileFormatWriter.scala:283)
	at org.apache.spark.sql.delta.files.TransactionalWrite.callDeltaFileFormatWriter$1(TransactionalWrite.scala:582)
	at org.apache.spark.sql.delta.files.TransactionalWrite.$anonfun$tryWriteFiles$5(TransactionalWrite.scala:593)
	at scala.Option.flatMap(Option.scala:271)
	at org.apache.spark.sql.delta.files.TransactionalWrite.$anonfun$tryWriteFiles$3(TransactionalWrite.scala:593)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$2(SQLExecution.scala:274)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:331)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:270)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:955)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:261)
	at org.apache.spark.sql.delta.files.TransactionalWrite.tryWriteFiles(TransactionalWrite.scala:528)
	at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:412)
	at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:386)
	at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:151)
	at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:383)
	at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:373)
	at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:151)
	at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:258)
	at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:254)
	at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:151)
	at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:243)
	at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:240)
	at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:151)
	at org.apache.spark.sql.delta.commands.WriteIntoDelta.writeFiles(WriteIntoDelta.scala:362)
	at org.apache.spark.sql.delta.commands.WriteIntoDelta.writeAndReturnCommitData(WriteIntoDelta.scala:310)
	at org.apache.spark.sql.delta.commands.CreateDeltaTableCommand.doDeltaWrite$1(CreateDeltaTableCommand.scala:258)
	at org.apache.spark.sql.delta.commands.CreateDeltaTableCommand.handleCreateTableAsSelect(CreateDeltaTableCommand.scala:285)
	at org.apache.spark.sql.delta.commands.CreateDeltaTableCommand.handleCommit(CreateDeltaTableCommand.scala:158)
	at org.apache.spark.sql.delta.commands.CreateDeltaTableCommand.$anonfun$run$4(CreateDeltaTableCommand.scala:118)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:169)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:167)
	at org.apache.spark.sql.delta.commands.CreateDeltaTableCommand.recordFrameProfile(CreateDeltaTableCommand.scala:59)
	at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:137)
	at com.microsoft.spark.telemetry.delta.SynapseLoggingShim.recordOperation(SynapseLoggingShim.scala:111)
	at com.microsoft.spark.telemetry.delta.SynapseLoggingShim.recordOperation$(SynapseLoggingShim.scala:93)
	at org.apache.spark.sql.delta.commands.CreateDeltaTableCommand.recordOperation(CreateDeltaTableCommand.scala:59)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:136)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:126)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:116)
	at org.apache.spark.sql.delta.commands.CreateDeltaTableCommand.recordDeltaOperation(CreateDeltaTableCommand.scala:59)
	at org.apache.spark.sql.delta.commands.CreateDeltaTableCommand.run(CreateDeltaTableCommand.scala:118)
	at org.apache.spark.sql.delta.catalog.DeltaCatalog.$anonfun$createDeltaTable$1(DeltaCatalog.scala:214)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:169)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:167)
	at org.apache.spark.sql.delta.catalog.DeltaCatalog.recordFrameProfile(DeltaCatalog.scala:66)
	at org.apache.spark.sql.delta.catalog.DeltaCatalog.org$apache$spark$sql$delta$catalog$DeltaCatalog$$createDeltaTable(DeltaCatalog.scala:102)
	at org.apache.spark.sql.delta.catalog.DeltaCatalog$StagedDeltaTableV2.$anonfun$commitStagedChanges$1(DeltaCatalog.scala:601)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:169)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:167)
	at org.apache.spark.sql.delta.catalog.DeltaCatalog.recordFrameProfile(DeltaCatalog.scala:66)
	at org.apache.spark.sql.delta.catalog.DeltaCatalog$StagedDeltaTableV2.commitStagedChanges(DeltaCatalog.scala:587)
	at org.apache.spark.sql.execution.datasources.v2.V2CreateTableAsSelectBaseExec.$anonfun$writeToTable$1(WriteToDataSourceV2Exec.scala:595)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
	at org.apache.spark.sql.execution.datasources.v2.V2CreateTableAsSelectBaseExec.writeToTable(WriteToDataSourceV2Exec.scala:588)
	at org.apache.spark.sql.execution.datasources.v2.V2CreateTableAsSelectBaseExec.writeToTable$(WriteToDataSourceV2Exec.scala:582)
	at org.apache.spark.sql.execution.datasources.v2.AtomicReplaceTableAsSelectExec.writeToTable(WriteToDataSourceV2Exec.scala:196)
	at org.apache.spark.sql.execution.datasources.v2.AtomicReplaceTableAsSelectExec.run(WriteToDataSourceV2Exec.scala:231)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:235)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$2(SQLExecution.scala:274)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:331)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:270)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:955)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:261)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:235)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:223)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:36)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:278)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:274)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:36)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:36)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:223)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:207)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:201)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:283)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:915)
	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:675)
	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:604)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to abfss://1a7264cf-5a08-47ab-aa00-41a6f005f2d3@onelake.dfs.fabric.microsoft.com/6f4189fd-1e9b-4b03-b188-66b54ecf0c9b/Tables/vendors.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:777)
	at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.executeTask(DeltaFileFormatWriter.scala:621)
	at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.$anonfun$executeWrite$4(DeltaFileFormatWriter.scala:387)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:636)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:639)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: org.apache.spark.SparkFileNotFoundException: File file:/mnt/var/hadoop/tmp/nm-local-dir/usercache/trusted-service-user/appcache/application_1770492411878_0001/spark-400262e1-c607-4b54-bca3-63b28d3d28fd/userFiles-fcb2385f-723c-40b8-9274-532143b2746f/Vendors.csv does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:783)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:285)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:348)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:188)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:121)
	at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.$anonfun$executeTask$3(DeltaFileFormatWriter.scala:603)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
	at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.executeTask(DeltaFileFormatWriter.scala:611)
	... 12 more


### Load Product Categories

In [6]:
file_name="ProductCategories.csv"

schema = StructType([
    StructField("product_category_id", StringType(), False, metadata={"description": "Unique identifier of the product category", "comment": "Product category ID", "display_name": "Product Category ID", "primary_key": True}),
    StructField("created_by", IntegerType(), True, metadata={"description": "Identifier of the user who created the product category", "comment": "Created by", "display_name": "Created By"}),
    StructField("created_date", StringType(), True, metadata={"description": "Date when the product category record was created", "comment": "Created date", "display_name": "Created Date"})
])
# schema = StructType([
#     StructField("product_category_id", StringType(), False),
#     StructField("created_by", IntegerType(), True),
#     StructField("created_date", StringType(), True)
# ])

gh_full_file_name = f"{GITHUB_PATH}/{file_name}"
display(f"Downloading {gh_full_file_name}")

sc.addFile(gh_full_file_name)
gh_file_name  = 'file://' +SparkFiles.get(file_name)

product_categories_df = spark.read.csv(path=gh_file_name,header=True,schema=schema)
print(f"Downloaded data {gh_file_name}")

product_categories_df = product_categories_df.withColumn("created_date", to_date(col("created_date"), "yyyyMMdd"))

product_categories_df.write.format("delta").mode("overwrite").saveAsTable("product_categories")

StatementMeta(, cbc235ee-e949-4a4f-bc4e-8f3159d74f2d, 8, Finished, Available, Finished)

'Downloading https://raw.githubusercontent.com/martinabrle/demo-data/main/Sample_Bike_Sales/ProductCategories.csv'

Downloaded data file:///mnt/var/hadoop/tmp/nm-local-dir/usercache/trusted-service-user/appcache/application_1770492411878_0001/spark-400262e1-c607-4b54-bca3-63b28d3d28fd/userFiles-fcb2385f-723c-40b8-9274-532143b2746f/ProductCategories.csv


### Load Product Category Texts

In [None]:
file_name="ProductCategoryTexts.csv"

schema = StructType([
    StructField("product_category_id", StringType(), False, metadata={"description": "Unique identifier of the product category", "comment": "Product category ID", "display_name": "Product Category ID"}),
    StructField("language", StringType(), True, metadata={"description": "Language code for the product category description", "comment": "Language", "display_name": "Language"}),
    StructField("short_description", StringType(), True, metadata={"description": "Category", "comment": "Short description", "display_name": "Short Description"}),
    StructField("medium_description", StringType(), True, metadata={"description": "Medium-length description of the product category", "comment": "Medium description", "display_name": "Medium Description"}),
    StructField("long_description", StringType(), True, metadata={"description": "Long, detailed description of the product category", "comment": "Long description", "display_name": "Long Description"})
])

gh_full_file_name = f"{GITHUB_PATH}/{file_name}"
display(f"Downloading {gh_full_file_name}")

sc.addFile(gh_full_file_name)
gh_file_name  = 'file://' +SparkFiles.get(file_name)

product_category_texts_df = spark.read.csv(path=gh_file_name,header=True,schema=schema)
print(f"Downloaded data {gh_file_name}")

product_category_texts_df.write.format("delta").mode("overwrite").saveAsTable("product_category_texts")

StatementMeta(, bbb2dded-1266-4104-933b-c5e028b3a03f, 6, Finished, Available, Finished)

'Downloading https://raw.githubusercontent.com/martinabrle/demo-data/main/Sample_Bike_Sales/ProductCategoryTexts.csv'

Downloaded data file:///mnt/var/hadoop/tmp/nm-secondary-local-dir/usercache/trusted-service-user/appcache/application_1770568009607_0001/spark-4e4ee70a-3207-4b39-b7af-aa00a2e2be54/userFiles-bc845dab-4a20-4f93-8a29-d411e91d3f5f/ProductCategoryTexts.csv


### Load Products

In [4]:
file_name="Products.csv"

schema = StructType([
    StructField("product_id", StringType(), False, metadata={"description": "Unique identifier of the product", "comment": "Product ID", "display_name": "Product ID", "primary_key": True}),
    StructField("type_code", StringType(), True, metadata={"description": "Type code of the product", "comment": "Type code", "display_name": "Type Code"}),
    StructField("product_category_id", StringType(), True, metadata={"description": "Identifier of the product category", "comment": "Product category ID", "display_name": "Product Category ID"}),
    StructField("created_by", IntegerType(), True, metadata={"description": "Identifier of the user who created the product record", "comment": "Created by", "display_name": "Created By"}),
    StructField("created_date", StringType(), True, metadata={"description": "Date when the product record was created", "comment": "Created date", "display_name": "Created Date"}),
    StructField("modified_by", IntegerType(), True, metadata={"description": "Identifier of the user who last modified the product record", "comment": "Modified by", "display_name": "Modified By"}),
    StructField("modified_date", StringType(), True, metadata={"description": "Date when the product record was last modified", "comment": "Modified date", "display_name": "Modified Date"}),
    StructField("vendor_id", IntegerType(), True, metadata={"description": "Identifier of the vendor supplying the product", "comment": "Vendor ID", "display_name": "Vendor ID"}),
    StructField("tax_tariff_code", StringType(), True, metadata={"description": "Tax tariff or classification code for the product", "comment": "Tax tariff code", "display_name": "Tax Tariff Code"}),
    StructField("quantity_unit", StringType(), True, metadata={"description": "Unit of measure for the product quantity", "comment": "Quantity unit", "display_name": "Quantity Unit"}),
    StructField("weight_measure", DoubleType(), True, metadata={"description": "Weight of the product", "comment": "Weight measure", "display_name": "Weight"}),
    StructField("weight_unit", StringType(), True, metadata={"description": "Unit of weight", "comment": "Weight unit", "display_name": "Weight Unit"}),
    StructField("currency", StringType(), True, metadata={"description": "Currency for the product pricing", "comment": "Currency", "display_name": "Currency"}),
    StructField("price", DecimalType(), True, metadata={"description": "Price of the product", "comment": "Price", "display_name": "Price"}),
    StructField("width", DoubleType(), True, metadata={"description": "Width of the product", "comment": "Width", "display_name": "Width"}),
    StructField("depth", DoubleType(), True, metadata={"description": "Depth of the product", "comment": "Depth", "display_name": "Depth"}),
    StructField("height", DoubleType(), True, metadata={"description": "Height of the product", "comment": "Height", "display_name": "Height"}),
    StructField("dimension_unit", StringType(), True, metadata={"description": "Unit for product dimensions", "comment": "Dimension unit", "display_name": "Dimension Unit"}),
    StructField("product_pic_url", StringType(), True, metadata={"description": "URL to the product image", "comment": "Product picture URL", "display_name": "Product Picture URL"})
])

gh_full_file_name = f"{GITHUB_PATH}/{file_name}"
display(f"Downloading {gh_full_file_name}")

sc.addFile(gh_full_file_name)
gh_file_name  = 'file://' +SparkFiles.get(file_name)

products_df = spark.read.csv(path=gh_file_name,header=True,schema=schema)
print(f"Downloaded data {gh_file_name}")

products_df = products_df.withColumn("created_date", to_date(col("created_date"), "yyyyMMdd"))
products_df = products_df.withColumn("modified_date", to_date(col("modified_date"), "yyyyMMdd"))

products_df.write.format("delta").mode("overwrite").saveAsTable("products")


StatementMeta(, 0958bb78-4d2e-4bb0-8a5e-e96cb671914c, 6, Finished, Available, Finished)

'Downloading https://raw.githubusercontent.com/martinabrle/demo-data/main/Sample_Bike_Sales/Products.csv'

Downloaded data file:///mnt/var/hadoop/tmp/nm-secondary-local-dir/usercache/trusted-service-user/appcache/application_1770543756420_0001/spark-ba0537c7-9332-4ab2-a377-5e54be62443f/userFiles-d84a831e-0708-41c7-b574-f77b3fc6db24/Products.csv


### Load Product Texts

In [None]:
file_name="ProductTexts.csv"

schema = StructType([
    StructField("product_id", StringType(), False, metadata={"description": "Unique identifier of the product", "comment": "Product ID", "display_name": "Product ID"}),
    StructField("language", StringType(), False, metadata={"description": "Language code for the product description", "comment": "Language", "display_name": "Language"}),
    StructField("short_description", StringType(), True, metadata={"description": "Product", "comment": "Short description", "display_name": "Short Description"}),
    StructField("medium_description", StringType(), True, metadata={"description": "Medium-length description of the product", "comment": "Medium description", "display_name": "Medium Description"}),
    StructField("long_description", StringType(), True, metadata={"description": "Long, detailed description of the product", "comment": "Long description", "display_name": "Long Description"})
])

gh_full_file_name = f"{GITHUB_PATH}/{file_name}"
display(f"Downloading {gh_full_file_name}")

sc.addFile(gh_full_file_name)
gh_file_name  = 'file://' +SparkFiles.get(file_name)

product_texts_df = spark.read.csv(path=gh_file_name,header=True,schema=schema)
print(f"Downloaded data {gh_file_name}")

product_texts_df.write.format("delta").mode("overwrite").saveAsTable("product_texts")

StatementMeta(, cbc235ee-e949-4a4f-bc4e-8f3159d74f2d, 7, Finished, Available, Finished)

'Downloading https://raw.githubusercontent.com/martinabrle/demo-data/main/Sample_Bike_Sales/ProductTexts.csv'

Downloaded data file:///mnt/var/hadoop/tmp/nm-local-dir/usercache/trusted-service-user/appcache/application_1770492411878_0001/spark-400262e1-c607-4b54-bca3-63b28d3d28fd/userFiles-fcb2385f-723c-40b8-9274-532143b2746f/ProductTexts.csv


### Load Sales Orders

In [None]:
file_name="SalesOrders.csv"

schema = StructType([
    StructField("sales_order_id", IntegerType(), False, metadata={"description": "Unique identifier of the sales order", "comment": "Sales order ID", "display_name": "Sales Order ID", "primary_key": True}),
    StructField("created_by", IntegerType(), True, metadata={"description": "Identifier of the user who created the sales order", "comment": "Created by", "display_name": "Created By"}),
    StructField("created_date", StringType(), True, metadata={"description": "Date when the sales order was created", "comment": "Created date", "display_name": "Created Date"}),
    StructField("modified_by", IntegerType(), True, metadata={"description": "Identifier of the user who last modified the sales order", "comment": "Modified by", "display_name": "Modified By"}),
    StructField("modified_date", StringType(), True, metadata={"description": "Date when the sales order was last modified", "comment": "Modified date", "display_name": "Modified Date"}),
    StructField("fisc_variant", StringType(), True, metadata={"description": "Fiscal variant associated with the sales order", "comment": "Fiscal variant", "display_name": "Fiscal Variant"}),
    StructField("fiscal_year_period", StringType(), True, metadata={"description": "Fiscal year and period for the sales order", "comment": "Fiscal year period", "display_name": "Fiscal Year/Period"}),
    StructField("note_id", IntegerType(), True, metadata={"description": "Identifier of associated note", "comment": "Note ID", "display_name": "Note ID"}),
    StructField("customer_id", IntegerType(), False, metadata={"description": "Identifier of the customer for this sales order", "comment": "Customer ID", "display_name": "Customer ID"}),
    StructField("sales_org", StringType(), True, metadata={"description": "Sales organization responsible for the order", "comment": "Sales organization", "display_name": "Sales Organization"}),
    StructField("currency", StringType(), True, metadata={"description": "Currency of the sales order amounts", "comment": "Currency", "display_name": "Currency"}),
    StructField("gross_amount", DecimalType(), True, metadata={"description": "Gross amount of the sales order", "comment": "Gross amount", "display_name": "Gross Amount"}),
    StructField("net_amount", DecimalType(), True, metadata={"description": "Net amount of the sales order", "comment": "Net amount", "display_name": "Net Amount"}),
    StructField("tax_amount", DecimalType(), True, metadata={"description": "Tax amount of the sales order", "comment": "Tax amount", "display_name": "Tax Amount"}),
    StructField("lifecycle_status", StringType(), True, metadata={"description": "Lifecycle status of the sales order (In Process, Completed, Cancelled, Unknown)", "comment": "Lifecycle status", "display_name": "Lifecycle Status"}),
    StructField("billing_status", StringType(), True, metadata={"description": "Billing status of the sales order (Not Billed, Partially Billed, Completely Billed, Cancelled, Unknown)", "comment": "Billing status", "display_name": "Billing Status"}),
    StructField("delivery_status", StringType(), True, metadata={"description": "Delivery status of the sales order (Not Delivered, Partially Delivered, Completely Delivered, Cancelled, Unknown)", "comment": "Delivery status", "display_name": "Delivery Status"})
])

gh_full_file_name = f"{GITHUB_PATH}/{file_name}"
display(f"Downloading {gh_full_file_name}")

sc.addFile(gh_full_file_name)
gh_file_name  = 'file://' +SparkFiles.get(file_name)

sales_orders_df = spark.read.csv(path=gh_file_name,header=True,schema=schema)
print(f"Downloaded data {gh_file_name}")

sales_orders_df = sales_orders_df.withColumn("created_date", to_date(col("created_date"), "yyyyMMdd"))
sales_orders_df = sales_orders_df.withColumn("modified_date", to_date(col("modified_date"), "yyyyMMdd"))

sales_orders_df = sales_orders_df.withColumn(
    "lifecycle_status",
    F.when(F.col("lifecycle_status") == "I", "In Process")
     .when(F.col("lifecycle_status") == "C", "Completed")
     .when(F.col("lifecycle_status") == "X", "Cancelled")
     .otherwise("Unknown")
)

sales_orders_df = sales_orders_df.withColumn(
    "billing_status",
    F.when(F.col("billing_status") == "N", "Not Billed")
     .when(F.col("billing_status") == "P", "Partially Billed")
     .when(F.col("billing_status") == "C", "Completely Billed")
     .when(F.col("billing_status") == "X", "Cancelled")
     .otherwise("Unknown")
)

sales_orders_df = sales_orders_df.withColumn(
    "delivery_status",
    F.when(F.col("delivery_status") == "N", "Not Delivered")
     .when(F.col("delivery_status") == "P", "Partially Delivered")
     .when(F.col("delivery_status") == "C", "Completely Delivered")
     .when(F.col("delivery_status") == "X", "Cancelled")
     .otherwise("Unknown")
)

sales_orders_df.write.format("delta").mode("overwrite").saveAsTable("sales_orders")


StatementMeta(, 0958bb78-4d2e-4bb0-8a5e-e96cb671914c, 7, Finished, Available, Finished)

'Downloading https://raw.githubusercontent.com/martinabrle/demo-data/main/Sample_Bike_Sales/SalesOrders.csv'

Downloaded data file:///mnt/var/hadoop/tmp/nm-secondary-local-dir/usercache/trusted-service-user/appcache/application_1770543756420_0001/spark-ba0537c7-9332-4ab2-a377-5e54be62443f/userFiles-d84a831e-0708-41c7-b574-f77b3fc6db24/SalesOrders.csv


### Load Sales Order Items

In [3]:
file_name="SalesOrderItems.csv"

schema = StructType([
    StructField("sales_order_id", IntegerType(), False, metadata={"description": "Identifier of the sales order", "comment": "Sales order ID", "display_name": "Sales Order ID"}),
    StructField("sales_order_item_id", IntegerType(), False, metadata={"description": "Identifier of the sales order item", "comment": "Sales order item ID", "display_name": "Sales Order Item ID"}),
    StructField("product_id", StringType(), False, metadata={"description": "Identifier of the product for this order item", "comment": "Product ID", "display_name": "Product ID"}),
    StructField("note_id", IntegerType(), True, metadata={"description": "Identifier of associated note for this item", "comment": "Note ID", "display_name": "Note ID"}),
    StructField("currency", StringType(), True, metadata={"description": "Currency of the amounts for this order item", "comment": "Currency", "display_name": "Currency"}),
    StructField("gross_amount", DecimalType(), True, metadata={"description": "Gross amount for this order item", "comment": "Gross amount", "display_name": "Gross Amount"}),
    StructField("net_amount", DecimalType(), True, metadata={"description": "Net amount for this order item", "comment": "Net amount", "display_name": "Net Amount"}),
    StructField("tax_amount", DecimalType(), True, metadata={"description": "Tax amount for this order item", "comment": "Tax amount", "display_name": "Tax Amount"}),
    StructField("item_atp_status", StringType(), True, metadata={"description": "Available-to-promise status of the order item (Confirmed, Partially Confirmed, Not Confirmed, Cancelled / Not Relevant, Unknown)", "comment": "Item ATP status", "display_name": "Item ATP Status"}),
    StructField("op_item_pos", StringType(), True, metadata={"description": "Operational item position identifier", "comment": "Operational item position", "display_name": "OP Item Pos"}),
    StructField("quantity", DecimalType(), True, metadata={"description": "Quantity ordered for this item", "comment": "Quantity", "display_name": "Quantity"}),
    StructField("quantity_unit", DecimalType(), True, metadata={"description": "Unit of measure for the ordered quantity", "comment": "Quantity unit", "display_name": "Quantity Unit"}),
    StructField("delivery_date", DateType(), True, metadata={"description": "Planned delivery date for the order item", "comment": "Delivery date", "display_name": "Delivery Date"})
])

gh_full_file_name = f"{GITHUB_PATH}/{file_name}"
display(f"Downloading {gh_full_file_name}")

sc.addFile(gh_full_file_name)
gh_file_name  = 'file://' +SparkFiles.get(file_name)

sales_order_items_df = spark.read.csv(path=gh_file_name,header=True,schema=schema)
print(f"Downloaded data {gh_file_name}")

sales_order_items_df = sales_order_items_df.withColumn("delivery_date", to_date(col("delivery_date"), "yyyyMMdd"))

sales_order_items_df = sales_order_items_df.withColumn(
    "item_atp_status",
    F.when(F.col("item_atp_status") == "C", "Confirmed")
     .when(F.col("item_atp_status") == "P", "Partially Confirmed")
     .when(F.col("item_atp_status") == "N", "Not Confirmed")
     .when(F.col("item_atp_status") == "X", "Cancelled / Not Relevant")
     .otherwise("Unknown")
)
sales_order_items_df.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable("sales_order_items")


StatementMeta(, cbc235ee-e949-4a4f-bc4e-8f3159d74f2d, 5, Finished, Available, Finished)

'Downloading https://raw.githubusercontent.com/martinabrle/demo-data/main/Sample_Bike_Sales/SalesOrderItems.csv'

Downloaded data file:///mnt/var/hadoop/tmp/nm-local-dir/usercache/trusted-service-user/appcache/application_1770492411878_0001/spark-400262e1-c607-4b54-bca3-63b28d3d28fd/userFiles-fcb2385f-723c-40b8-9274-532143b2746f/SalesOrderItems.csv


### Display table info

In [None]:
def show_table_metadata(table_name):
    df=spark.table(table_name)
    """
    Prints schema details and metadata for a Spark DataFrame.

    Args:
        df: Spark DataFrame
    """
    print(" ")
    print(" ")
    print(f"Table: {table_name}:")
    for field in df.schema.fields:
        print(f"Column: {field.name}")
        print(f"  Type: {field.dataType}")
        print(f"  Nullable: {field.nullable}")
        # Print metadata nicely if it exists
        if field.metadata:
            for key, value in field.metadata.items():
                print(f"  {key}: {value}")
        else:
            print("  Metadata: None")
        print("-" * 50)

StatementMeta(, 7e17825f-b6fa-45fb-b49f-f2aecc437fb6, -1, Cancelled, , Cancelled)

In [None]:
#Check Metadata has been successfully written
show_table_metadata("sales_order_items")

### Check data consistency - Vendors

In [7]:
vendors_df = spark.table("vendors")
vendor_addresses_df = spark.table("vendor_addresses")

vendors_with_missing_addresses_df = (
    vendors_df
        .join(
            vendor_addresses_df,
            vendors_df.address_id == vendor_addresses_df.address_id,
            how="left_anti"
        )
)
display(vendors_with_missing_addresses_df)

addresses_with_missing_vendors_df = (
    vendor_addresses_df
        .join(
            vendors_df,
            vendor_addresses_df.address_id == vendors_df.address_id,
            how="left_anti"
        )
)

display(addresses_with_missing_vendors_df)

StatementMeta(, 0958bb78-4d2e-4bb0-8a5e-e96cb671914c, 9, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 8393ed1e-0689-40a2-b0e9-c0af8bab768e)

SynapseWidget(Synapse.DataFrame, 2aca9896-f7f5-4827-98df-ebfd6e0e1156)

### Check data consistency - Customers

In [8]:
customers_df = spark.table("vendors")
customer_addresses_df = spark.table("vendor_addresses")

customers_with_missing_addresses_df = (
    customers_df
        .join(
            customer_addresses_df,
            customers_df.address_id == customer_addresses_df.address_id,
            how="left_anti"
        )
)
display(customers_with_missing_addresses_df)

addresses_with_missing_customers_df = (
    customer_addresses_df
        .join(
            customers_df,
            customer_addresses_df.address_id == customers_df.address_id,
            how="left_anti"
        )
)

display(addresses_with_missing_customers_df)

StatementMeta(, 0958bb78-4d2e-4bb0-8a5e-e96cb671914c, 10, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, cfc1b496-1dca-48ee-be3f-f15d7d4d3ae9)

SynapseWidget(Synapse.DataFrame, 29da78fc-7378-411a-b6dc-61a981a69e0c)

### Check data consistency - Employees

In [9]:
employees_df = spark.table("vendors")
employee_addresses_df = spark.table("vendor_addresses")

employees_with_missing_addresses_df = (
    employees_df
        .join(
            employee_addresses_df,
            employees_df.address_id == employee_addresses_df.address_id,
            how="left_anti"
        )
)
display(employees_with_missing_addresses_df)

addresses_with_missing_employees_df = (
    employee_addresses_df
        .join(
            employees_df,
            employee_addresses_df.address_id == employees_df.address_id,
            how="left_anti"
        )
)

display(addresses_with_missing_employees_df)

StatementMeta(, 0958bb78-4d2e-4bb0-8a5e-e96cb671914c, 11, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 7e28ccc5-78fe-49ba-b607-97e0bbd62ae3)

SynapseWidget(Synapse.DataFrame, 3ec29633-6862-4e4a-9617-a0e60f4b784c)

**Retrieve access token**

In [None]:
import requests

scope = "https://analysis.windows.net/powerbi/api/.default"

url = f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/v2.0/token"

data = {
    "grant_type": "client_credentials",
    "client_id": CLIENT_ID,
    "client_secret": CLIENT_SECRET,
    "scope": scope
}

response = requests.post(url, data=data)
access_token = response.json().get("access_token")
print("Bearer token:", access_token)

In [None]:
import requests

# --- Configuration ---
workspace_name = "demo-bike-sales"
xmla_url = f"https://{workspace_name}.asazure.windows.net/xmla?role=Admin"

# Replace this with a valid bearer token obtained in the notebook


headers = {
    "Content-Type": "text/xml",
    "Authorization": f"Bearer {access_token}"
}

# Minimal XMLA Discover request to list catalogs
xmla_request = """<?xml version="1.0"?>
<Envelope xmlns="http://schemas.xmlsoap.org/soap/envelope/">
  <Body>
    <Discover xmlns="urn:schemas-microsoft-com:xml-analysis">
      <RequestType>DBSCHEMA_CATALOGS</RequestType>
      <Restrictions/>
      <Properties/>
    </Discover>
  </Body>
</Envelope>
"""

# Send request
response = requests.post(xmla_url, headers=headers, data=xmla_request)

# Print status and partial content
print("Status Code:", response.status_code)
print("Response (first 500 chars):")
print(response.text[:500])


In [13]:
from fabric import semantic_model
from sempy.tables import Table

def update_lakehouse_metadata(table_name: str, token: str):
    """
    Connects to a Fabric workspace semantic model, loops over columns
    of the given Lakehouse table, retrieves metadata, and updates 
    description, comment, and display_name.

    Parameters:
        table_name (str): Lakehouse table name (e.g., 'product_texts')
        token (str): Bearer token with XMLA/Admin access
    """
    
    # Load Spark table
    df = spark.table(f"{database_name}.{table_name}")

    # Connect to semantic model
    model = Model(workspace_name=WORKSPACE_NAME,token=token)
    
    # Get the table object
    table = model.get_table(table_name)
    
    print(f"Updating metadata for table: {table_name}")
    
    for field in df.schema.fields:
        col_name = field.name
        col_meta = field.metadata if field.metadata else {}
        
        # Retrieve metadata values
        display_name = col_meta.get("display_name", col_name.replace("_", " ").title())
        description  = col_meta.get("description", "")
        comment      = col_meta.get("comment", "")
        
        # Get the column in the semantic model
        col = table.get_column(col_name)
        
        # Print current and new metadata
        print(f"Column: {col_name}")
        print(f"  Current display_name: {col.display_name}")
        print(f"  Current description: {col.description}")
        print(f"  Current comment: {col.comment}")
        print(f"  New display_name: {display_name}")
        print(f"  New description: {description}")
        print(f"  New comment: {comment}")
        print("-" * 50)
        
        # Update semantic model metadata
        col.display_name = display_name
        col.description = description
        col.comment = comment
        col.save()
    
    update_lakehouse_metadata("products", bearer_token)
    print(f"Metadata update completed for table: {table_name}")


StatementMeta(, 70648f3f-687c-463f-a292-3bf7ebb83fd9, 15, Finished, Available, Finished)

ImportError: cannot import name 'semantic_model' from 'fabric' (unknown location)

In [None]:
update_lakehouse_metadata("products", bearer_token, workspace_name)

### Update semantic model's metadata

In [7]:
import requests

def update_semantic_model_from_spark(df, xmla_url, bearer_token, database_name, table_name):
    """
    Updates a Fabric / Power BI semantic model table's column properties
    using Spark DataFrame metadata (display_name, description, nullable, comment).
    
    Args:
        df: Spark DataFrame with metadata in schema
        xmla_url: XMLA endpoint URL (Admin role)
        bearer_token: Azure AD access token with workspace permissions
        database_name: Semantic model (dataset) name
        table_name: Table name in the semantic model
    """
    headers = {
        "Content-Type": "text/xml",
        "Authorization": f"Bearer {bearer_token}"
    }

    # Build XMLA payload for multiple columns in one request
    columns_xml = ""
    for field in df.schema.fields:
        column_name = field.name
        display_name = field.metadata.get("display_name", column_name)
        description = field.metadata.get("description", "")
        nullable = str(field.nullable).lower()  # 'true' or 'false'
        comment = field.metadata.get("comment", "")

        # Each column alteration XML
        columns_xml += f"""
        <Column>
            <ColumnID>{column_name}</ColumnID>
            <Name>{display_name}</Name>
            <Description>{description}</Description>
            <IsNullable>{nullable}</IsNullable>
            <Annotations>
                <Annotation>
                    <Name>comment</Name>
                    <Value>{comment}</Value>
                </Annotation>
            </Annotations>
        </Column>
        """

    # Full XMLA request
    xmla_payload = f"""<?xml version="1.0"?>
<Envelope xmlns="http://schemas.xmlsoap.org/soap/envelope/">
  <Body>
    <Execute xmlns="urn:schemas-microsoft-com:xml-analysis">
      <Command>
        <Alter xmlns="http://schemas.microsoft.com/analysisservices/2003/engine">
          <Object>
            <DatabaseID>{database_name}</DatabaseID>
            <CubeID>{table_name}</CubeID>
          </Object>
          <AlterCommand>
            <Table>
                {columns_xml}
            </Table>
          </AlterCommand>
        </Alter>
      </Command>
      <Properties>
        <PropertyList>
          <LocaleIdentifier>1033</LocaleIdentifier>
        </PropertyList>
      </Properties>
    </Execute>
  </Body>
</Envelope>
"""

    response = requests.post(xmla_url, headers=headers, data=xmla_payload)
    if response.status_code != 200:
        print(f"Failed to update table {table_name}: {response.text}")
    else:
        print(f"Updated table {table_name} columns successfully!")


StatementMeta(, 70648f3f-687c-463f-a292-3bf7ebb83fd9, 9, Finished, Available, Finished)

In [3]:
import requests

scope = "https://analysis.windows.net/powerbi/api/.default"

url = f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/v2.0/token"

data = {
    "grant_type": "client_credentials",
    "client_id": CLIENT_ID,
    "client_secret": CLIENT_SECRET,
    "scope": scope
}

response = requests.post(url, data=data)
access_token = response.json().get("access_token")
print("Bearer token:", access_token)

StatementMeta(, 70648f3f-687c-463f-a292-3bf7ebb83fd9, 5, Finished, Available, Finished)

Bearer token: eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiIsIng1dCI6IlBjWDk4R1g0MjBUMVg2c0JEa3poUW1xZ3dNVSIsImtpZCI6IlBjWDk4R1g0MjBUMVg2c0JEa3poUW1xZ3dNVSJ9.eyJhdWQiOiJodHRwczovL2FuYWx5c2lzLndpbmRvd3MubmV0L3Bvd2VyYmkvYXBpIiwiaXNzIjoiaHR0cHM6Ly9zdHMud2luZG93cy5uZXQvM2JiMWQzNDgtMTE5NS00MjY3LTllMDItYWMzNDI1Nzc4MGZjLyIsImlhdCI6MTc3MDM4Mzg5NiwibmJmIjoxNzcwMzgzODk2LCJleHAiOjE3NzAzODc3OTYsImFpbyI6ImsyWmdZTmoyYUx2TnRlTVhWTGtjZGk2ODRlWnNBZ0E9IiwiYXBwaWQiOiIyYmY1ZjkyZS0wYTdjLTQ1NGEtOGNjOC0zMWY3MzViZGY3NmMiLCJhcHBpZGFjciI6IjEiLCJpZHAiOiJodHRwczovL3N0cy53aW5kb3dzLm5ldC8zYmIxZDM0OC0xMTk1LTQyNjctOWUwMi1hYzM0MjU3NzgwZmMvIiwiaWR0eXAiOiJhcHAiLCJvaWQiOiJlNWU1OTc4NS1hMTFhLTQ5NjktYTZjZi1lY2IxMTM2MGNjZTAiLCJyaCI6IjEuQVVzQVNOT3hPNVVSWjBLZUFxdzBKWGVBX0FrQUFBQUFBQUFBd0FBQUFBQUFBQUFBQUFCTEFBLiIsInN1YiI6ImU1ZTU5Nzg1LWExMWEtNDk2OS1hNmNmLWVjYjExMzYwY2NlMCIsInRpZCI6IjNiYjFkMzQ4LTExOTUtNDI2Ny05ZTAyLWFjMzQyNTc3ODBmYyIsInV0aSI6IjBTaEhYN0tGcGstQUp1UklaWklkQUEiLCJ2ZXIiOiIxLjAiLCJ4bXNfYWN0X2ZjdCI6IjMgOSIsInhtc19mdGQiOiJsTjZBMGNPV

In [8]:
# Load Spark DataFrame
df = spark.table("products")

# XMLA endpoint (Admin role)
xmla_url = f"https://{WORKSPACE_NAME}.asazure.windows.net/xmla?role=Admin"
# xmla_url = f"https://asazure.windows.net/xmla?role=Admin&workspaceId=6f4189fd-1e9b-4b03-b188-66b54ecf0c9b"


# Azure AD token
bearer_token = access_token

# Semantic model (dataset) and table
database_name = "test"
table_name = "products"

# Push Spark metadata to semantic model
update_semantic_model_from_spark(df, xmla_url, bearer_token, database_name, table_name)


StatementMeta(, 70648f3f-687c-463f-a292-3bf7ebb83fd9, 10, Finished, Available, Finished)

ConnectionError: HTTPSConnectionPool(host='demo-bike-sales.asazure.windows.net', port=443): Max retries exceeded with url: /xmla?role=Admin (Caused by NameResolutionError("<urllib3.connection.HTTPSConnection object at 0x722c16aaec50>: Failed to resolve 'demo-bike-sales.asazure.windows.net' ([Errno -2] Name or service not known)"))

In [11]:
spark.sql("TRUNCATE TABLE customer_addresses")
spark.sql("TRUNCATE TABLE customers")
spark.sql("TRUNCATE TABLE employee_addresses")
spark.sql("TRUNCATE TABLE employees")
spark.sql("TRUNCATE TABLE product_categories")
spark.sql("TRUNCATE TABLE product_category_texts")
spark.sql("TRUNCATE TABLE product_texts")
spark.sql("TRUNCATE TABLE products")
spark.sql("TRUNCATE TABLE sales_order_items")
spark.sql("TRUNCATE TABLE sales_orders")
spark.sql("TRUNCATE TABLE vendor_addresses")
spark.sql("TRUNCATE TABLE vendors")

StatementMeta(, e6cf73bc-46af-4134-b40e-760563cc657c, 25, Finished, Available, Finished)

DataFrame[num_affected_rows: bigint]