## Data Ingestion Sales  Data Set
We are going to load the Sales Data Set from a private Azure Storage Account.

<font size="2" color="red" face="sans-serif" bold> 

<b> <i> <u>No changes are required to this cell, This cell have all the necessary credentials to Ingest data from storage account
</font>

In [24]:
from azure.identity import DefaultAzureCredential, ClientSecretCredential
from azure.keyvault.secrets import SecretClient
import notebookutils

StatementMeta(, 95611309-dc88-4dc3-a398-c8b106f472e2, 26, Finished, Available, Finished)

# Example of a function
This is a function that abstracts the complexity of getting an oauth token for use in fabric rest apis

In [17]:
def get_api_token_via_akv(kv_uri:str, client_id_secret:str, tenant_id_secret:str, client_secret_name:str)->str:
    """
    Function to retrieve an api token used to authenticate with Microsoft Fabric APIs

    kv_uri:str: The uri of the azure key vault
    client_id_secret:str: The name of the key used to store the value for the client id in the akv
    tenant_id_secret:str: The name of the key used to store the value for the tenant id in the akv
    client_secret_name:str: The name of the key used to store the value for the client secret in the akv

    """
    client_id = notebookutils.credentials.getSecret(kv_uri, client_id_secret)
    tenant_id = notebookutils.credentials.getSecret(kv_uri, tenant_id_secret)
    client_secret = notebookutils.credentials.getSecret(kv_uri, client_secret_name)

    credential = ClientSecretCredential(tenant_id, client_id, client_secret)
    scope = 'https://analysis.windows.net/powerbi/api/.default'
    token = credential.get_token(scope).token

    return token

StatementMeta(, 95611309-dc88-4dc3-a398-c8b106f472e2, 19, Finished, Available, Finished)

In [None]:

kv_uri = 'https://kvfabricprodeus2rh.vault.azure.net/'
client_id_secret = 'fuam-spn-client-id'
tenant_id_secret = 'fuam-spn-tenant-id'
client_secret_name = 'fuam-spn-secret'


# Example using a sas token

In [23]:
# get sas token
kv_uri = 'https://kvfabricprodeus2rh.vault.azure.net/'
secret_name = 'public-sa-sas'

sas_value = notebookutils.credentials.getSecret(kv_uri, secret_name)

StatementMeta(, 95611309-dc88-4dc3-a398-c8b106f472e2, 25, Finished, Available, Finished)

In [25]:
# Providing the details for the Azure Storage account
# Mention about the SAS key
# sv=2024-11-04&ss=bf&srt=c&sp=rl&se=2025-09-04T20:57:43Z&st=2025-08-01T12:42:43Z&spr=https&sig=Cfe8rDObBLLws3fItrJZ4BlYdYhrSI5M5sT5Z8TKkK0%3D
storage_account = "publicdatasetssa"
container = "fabricdatafactorylab"


# sas_token = "sv=2024-11-04&ss=bf&srt=c&sp=rl&se=2025-09-04T20:57:43Z&st=2025-08-01T12:42:43Z&spr=https&sig=Cfe8rDObBLLws3fItrJZ4BlYdYhrSI5M5sT5Z8TKkK0%3D" 

# Set Spark config to access  dfs storage
spark.conf.set("fs.azure.sas.%s.%s.dfs.core.windows.net" % (container, storage_account),sas_value)


StatementMeta(, 95611309-dc88-4dc3-a398-c8b106f472e2, 27, Finished, Available, Finished)

In [26]:
# We specify the path for the csv files we need to read. By reading the *.csv, we are reading all the files from the folder Facts
abfss_path1 = f"abfss://{container}@{storage_account}.dfs.core.windows.net/csv/Facts/Sales_File1.csv"
abfss_path2 = f"abfss://{container}@{storage_account}.dfs.core.windows.net/csv/Facts/Sales_File2.csv"

# We are specifying the file type, csv and displaying the headers of the sales files
sales_df1 = spark.read.format("csv").option("delimiter", ";").option("header","true").option("inferSchema", "true").load(abfss_path1)
sales_df2 = spark.read.format("csv").option("delimiter", ";").option("header","true").option("inferSchema", "true").load(abfss_path2)

StatementMeta(, 95611309-dc88-4dc3-a398-c8b106f472e2, 28, Finished, Available, Finished)

In [27]:
# We are currently showcasing both data frames
display(sales_df2)
display(sales_df1)

StatementMeta(, 95611309-dc88-4dc3-a398-c8b106f472e2, 29, Submitted, Running, Running)

SynapseWidget(Synapse.DataFrame, 5c821a31-4a05-4c81-9965-ff43575904bd)

In [None]:
# We print only the schema to better understand the dataframes data types structure

sales_df1.printSchema()
sales_df1.count()


StatementMeta(, , -1, Waiting, , Waiting)

In [7]:
# We print only the schema to better understand the dataframes data types structure. 
# As you can see these two dataframes have different columns. Hence a merge join is required.

sales_df2.printSchema()
sales_df2.count()

StatementMeta(, 95611309-dc88-4dc3-a398-c8b106f472e2, 9, Finished, Available, Finished)

root
 |-- SalesOrderNumber: string (nullable = true)
 |-- SalesOrderLineNumber: integer (nullable = true)
 |-- SalesQuantity: integer (nullable = true)
 |-- SalesAmount: string (nullable = true)
 |-- ReturnQuantity: integer (nullable = true)
 |-- ReturnAmount: string (nullable = true)
 |-- DiscountQuantity: integer (nullable = true)
 |-- DiscountAmount: string (nullable = true)
 |-- TotalCost: string (nullable = true)
 |-- UnitCost: string (nullable = true)
 |-- UnitPrice: string (nullable = true)



1000000

In [None]:
# By ordering the data frames based on the columns that will be used for the merge join, we can significantly improve the efficiency of the join operation.
# In this case the column SalesOrderNumber
from pyspark.sql.functions import col

sorted_sales_df1 = sales_df1.orderBy(col("SalesOrderNumber"),col("SalesOrderLineNumber"))

display(sorted_sales_df1)

StatementMeta(, , -1, Waiting, , Waiting)

In [None]:
# By ordering the data frames based on the columns that will be used for the merge join, we can significantly improve the efficiency of the join operation.
# In this case the column SalesOrderNumber
from pyspark.sql.functions import col

sorted_sales_df2 = sales_df2.orderBy(col("SalesOrderNumber"),col("SalesOrderLineNumber"))
display(sorted_sales_df2)

StatementMeta(, , -1, Waiting, , Waiting)

In [None]:
# This cell contains an inner join operation on the 'SalesOrderNumber' and 'SalesOrderLineNumber' columns.

merged_df = sorted_sales_df1.join(
    sorted_sales_df2,
    on=["SalesOrderNumber", "SalesOrderLineNumber"],
    how="inner"
)

# Show the result
merged_df.printSchema()

StatementMeta(, , -1, Waiting, , Waiting)

##### Prior to providing a table name, we verify that no other table shares the same name.

In [None]:
%%sql
DROP TABLE IF EXISTS Sales;


StatementMeta(, , -1, Waiting, , Waiting)

##  Sales DataSet ingestion onto a Managed Lakehouse Table


<font size="2" color="red" face="sans-serif" bold> 

<b> <i> <u>
</font>

In [None]:
# Writing the Data Frame directly into the Delta Table from Managed Zone
table_name = 'Sales'

merged_df \
    .write \
    .mode("overwrite") \
    .format("delta") \
    .save("Tables/" + table_name)

StatementMeta(, , -1, Waiting, , Waiting)

In [None]:
# Now that the table has been created, we can utilize Pyspark SQL to generate a new data frame and load the table
table_name = "Sales"

# Read the table into a DataFrame
df = spark.read.table(table_name)

# Calculate the number of rows
row_count = df.count()

# Print or use the row count as needed
print("Number of rows:", row_count)

StatementMeta(, , -1, Waiting, , Waiting)

In [None]:
# To display the content of the table as dataframe we run the display command:
display(df)

StatementMeta(, , -1, Waiting, , Waiting)