# Sales Data Cleaning & Preparation (Databricks)

This notebook performs data cleaning, validation, and transformation on raw sales data
ingested into Azure Data Lake Storage Gen2.  

The transformations include:
- Loading data from ADLS mount points  
- Handling missing or inconsistent values  
- Standardizing column formats  
- Creating derived metrics  
- Generating cleaned datasets for Synapse Analytics  

## Import Required Libraries
We begin by importing PySpark functions needed for data cleaning and transformations.

In [None]:
# Note: importing `sum` here shadows Python's built-in sum() for the rest of the notebook
from pyspark.sql.functions import col, sum, when


## 🔐 Mount Azure Data Lake Storage (ADLS Gen2) in Databricks

To perform data cleaning in Databricks, we first mount the Azure Data Lake Storage 
containers (`raw-data` and `transformed-data`) into the Databricks File System (DBFS).

### Why Mount ADLS?
Mounting allows:
- Secure access to Azure Blob/File storage  
- Direct read/write operations using `/mnt/...` paths  
- Simplified ETL workflows  

### Authentication
Access is authenticated using:
- A **Databricks Secret Scope** (`databricksScope`)
- A **Storage Account Access Key**, securely stored as a secret (`secretskv`, the key name used in the mount cell)

### Important Notes
- Storage keys must never appear in plain text  
- Secrets must always be retrieved using `dbutils.secrets.get()`  
- Mounting is done once; the mount persists, so future sessions can directly access `/mnt/raw-data` and `/mnt/transformed-data`


In [None]:
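# Unmount only if /mnt/raw-data is currently mounted and must be recreated;
# unmounting a path that is not mounted raises an error
if any(m.mountPoint == "/mnt/raw-data" for m in dbutils.fs.mounts()):
    dbutils.fs.unmount("/mnt/raw-data")

# Then remount the raw-data container with the new config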
dbutils.fs.mount(
    source="wasbs://raw-data@crmstorageaccountyt.blob.core.windows.net",
    mount_point="/mnt/raw-data",
    extra_configs = {
       "fs.azure.account.key.crmstorageaccountyt.blob.core.windows.net":
           dbutils.secrets.get("databricksScope","secretskv")
    }
)

# Mount the transformed-data container (this fails if /mnt/transformed-data is already mounted)
dbutils.fs.mount(
    source="wasbs://transformed-data@crmstorageaccountyt.blob.core.windows.net",
    mount_point="/mnt/transformed-data",
    extra_configs = {
       "fs.azure.account.key.crmstorageaccountyt.blob.core.windows.net":
           dbutils.secrets.get("databricksScope","secretskv")
    }
)
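
The two mount calls above differ only in the container name, and `dbutils.fs.mount` fails when the mount point is already in use. A minimal sketch of an idempotent helper follows. The name `ensure_mounted` and its parameters are hypothetical, and `dbutils` is passed in explicitly so the function can also be exercised outside a notebook.

```python
def ensure_mounted(dbutils, container, account, scope, key_name):
    """Mount a Blob container under /mnt/<container> unless it is already mounted.

    Hypothetical helper; returns True if a new mount was created.
    """
    mount_point = f"/mnt/{container}"
    # dbutils.fs.mounts() lists existing mounts; do nothing if ours is present.
    if any(m.mountPoint == mount_point for m in dbutils.fs.mounts()):
        return False

    host = f"{account}.blob.core.windows.net"
    dbutils.fs.mount(
        source=f"wasbs://{container}@{host}",
        mount_point=mount_point,
        extra_configs={
            # The key is read from the secret scope, never written in the notebook.
            f"fs.azure.account.key.{host}": dbutils.secrets.get(scope, key_name)
        },
    )
    return True


# In a Databricks notebook, where `dbutils` is predefined:
# for container in ("raw-data", "transformed-data"):
#     ensure_mounted(dbutils, container, "crmstorageaccountyt", "databricksScope", "secretskv")
```

Unlike the unmount-then-mount cell, this variant leaves an existing mount untouched, so it suits re-running the notebook but not rotating the storage key.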


In [None]:
# list all mounts (look for /mnt/raw-data)
for m in dbutils.fs.mounts():
    print(m.mountPoint, "->", m.source)

# list the content to confirm mount works
try:
    display(dbutils.fs.ls("/mnt/raw-data"))
except Exception as e:
    print("ls error:", e)


/databricks-datasets -> databricks-datasets
/Volumes -> UnityCatalogVolumes
/databricks/mlflow-tracking -> databricks/mlflow-tracking
/mnt/raw-data -> wasbs://raw-data@crmstorageaccountyt.blob.core.windows.net
/databricks-results -> databricks-results
/databricks/mlflow-registry -> databricks/mlflow-registry
/mnt/transformed-data -> wasbs://transformed-data@crmstorageaccountyt.blob.core.windows.net
/Volume -> DbfsReserved
/volumes -> DbfsReserved
/ -> DatabricksRoot
/volume -> DbfsReserved


path,name,size,modificationTime
dbfs:/mnt/raw-data/accounts.csv,accounts.csv,4670,1764567390000
dbfs:/mnt/raw-data/data_dictionary.csv,data_dictionary.csv,996,1764567405000
dbfs:/mnt/raw-data/products.csv,products.csv,171,1764567448000
dbfs:/mnt/raw-data/sales_pipeline.csv,sales_pipeline.csv,637773,1764567432000
dbfs:/mnt/raw-data/sales_teams.csv,sales_teams.csv,1284,1764567465000


## 📂 Explore Raw Data in ADLS (Mounted Storage)

We list the raw files stored in the Azure Data Lake (mounted on `/mnt/raw-data`).  
This allows us to verify file availability before performing transformations.

In [None]:
dbutils.fs.ls("/mnt/raw-data")

[FileInfo(path='dbfs:/mnt/raw-data/accounts.csv', name='accounts.csv', size=4670, modificationTime=1764567390000),
 FileInfo(path='dbfs:/mnt/raw-data/data_dictionary.csv', name='data_dictionary.csv', size=996, modificationTime=1764567405000),
 FileInfo(path='dbfs:/mnt/raw-data/products.csv', name='products.csv', size=171, modificationTime=1764567448000),
 FileInfo(path='dbfs:/mnt/raw-data/sales_pipeline.csv', name='sales_pipeline.csv', size=637773, modificationTime=1764567432000),
 FileInfo(path='dbfs:/mnt/raw-data/sales_teams.csv', name='sales_teams.csv', size=1284, modificationTime=1764567465000)]
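
Checking the listing by eye works for five files, but the same check can be made explicit so that a missing file stops the notebook early. A small sketch, assuming only that each entry returned by `dbutils.fs.ls` exposes a `name` attribute (as the `FileInfo` output above shows); `missing_files` and `EXPECTED_FILES` are hypothetical names.

```python
EXPECTED_FILES = {
    "accounts.csv",
    "data_dictionary.csv",
    "products.csv",
    "sales_pipeline.csv",
    "sales_teams.csv",
}


def missing_files(listing, expected=EXPECTED_FILES):
    """Return the expected file names that are absent from a directory listing."""
    present = {entry.name for entry in listing}
    return sorted(set(expected) - present)


# In a Databricks notebook:
# missing = missing_files(dbutils.fs.ls("/mnt/raw-data"))
# if missing:
#     raise FileNotFoundError(f"Missing raw files: {missing}")
```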

In [None]:
dbutils.fs.ls("/mnt/transformed-data")  

[]

## 📥 Load Raw CSV Files

The raw datasets are loaded into Spark DataFrames for processing.

Datasets:
- accounts.csv  
- products.csv  
- sales_pipeline.csv  
- sales_teams.csv  
- data_dictionary.csv  

In [None]:
accounts_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/mnt/raw-data/accounts.csv")
data_dictionary_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/mnt/raw-data/data_dictionary.csv")
products_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/mnt/raw-data/products.csv")
sales_pipeline_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/mnt/raw-data/sales_pipeline.csv")
sales_teams_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/mnt/raw-data/sales_teams.csv")

accounts_df.show(2)


+----------------+---------+----------------+-------+---------+---------------+-------------+
|         account|   sector|year_established|revenue|employees|office_location|subsidiary_of|
+----------------+---------+----------------+-------+---------+---------------+-------------+
|Acme Corporation|technolgy|            1996|1100.04|     2822|  United States|         NULL|
|      Betasoloin|  medical|            1999| 251.41|      495|  United States|         NULL|
+----------------+---------+----------------+-------+---------+---------------+-------------+
only showing top 2 rows
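
The five read calls above use identical options and differ only in the file name, so they can be collapsed into a loop. A sketch, assuming a hypothetical helper `load_raw_csvs` that receives the `spark` session as an argument and returns a dict keyed by dataset name:

```python
def load_raw_csvs(spark, base_path, names):
    """Read <base_path>/<name>.csv for each name, with header and schema inference."""
    frames = {}
    for name in names:
        frames[name] = (
            spark.read.format("csv")
            .option("header", "true")
            .option("inferSchema", "true")
            .load(f"{base_path}/{name}.csv")
        )
    return frames


# In a Databricks notebook, where `spark` is predefined:
# frames = load_raw_csvs(
#     spark,
#     "/mnt/raw-data",
#     ["accounts", "data_dictionary", "products", "sales_pipeline", "sales_teams"],
# )
# accounts_df = frames["accounts"]
```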


## 🧪 Data Overview & Schema Validation

In [None]:
print(accounts_df.columns)
print(data_dictionary_df.columns)
print(products_df.columns)
print(sales_pipeline_df.columns)
print(sales_teams_df.columns)

['account', 'sector', 'year_established', 'revenue', 'employees', 'office_location', 'subsidiary_of']
['Table', 'Field', 'Description']
['product', 'series', 'sales_price']
['opportunity_id', 'sales_agent', 'product', 'account', 'deal_stage', 'engage_date', 'close_date', 'close_value']
['sales_agent', 'manager', 'regional_office']


## 🧹 Data Cleaning & Transformations

Cleaning operations performed:
- Handling null values
- Renaming inconsistent columns
- Removing duplicates
- Type casting numeric fields
- Standardizing string formats
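
The cells in this section demonstrate renaming and null handling. For duplicate removal, type casting, and string standardization, one possible shape is to build Spark SQL expression strings and pass them to `DataFrame.selectExpr`. The helper name `clean_exprs` and the column choices in the usage comment are assumptions for illustration, not the notebook's own code.

```python
def clean_exprs(columns, trim_lower=(), casts=None):
    """Build one Spark SQL expression per column.

    trim_lower: columns whose text is trimmed and lower-cased.
    casts: mapping of column name -> Spark SQL type to cast to.
    Columns named in neither are passed through unchanged.
    """
    casts = casts or {}
    exprs = []
    for c in columns:
        if c in casts:
            exprs.append(f"CAST(`{c}` AS {casts[c]}) AS `{c}`")
        elif c in trim_lower:
            exprs.append(f"lower(trim(`{c}`)) AS `{c}`")
        else:
            exprs.append(f"`{c}`")
    return exprs


# In a Databricks notebook (column choices are illustrative):
# exprs = clean_exprs(
#     accounts_df.columns,
#     trim_lower=("sector",),
#     casts={"revenue": "DOUBLE", "employees": "INT"},
# )
# accounts_clean = accounts_df.selectExpr(*exprs).dropDuplicates()
```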

In [None]:
accounts_df = accounts_df.withColumnRenamed("subsidiary_of", "parent_company")
data_dictionary_df = data_dictionary_df.withColumnRenamed("Table", "table").withColumnRenamed("Field", "field").withColumnRenamed("Description", "description")

accounts_df.show(1)


+----------------+---------+----------------+-------+---------+---------------+--------------+
|         account|   sector|year_established|revenue|employees|office_location|parent_company|
+----------------+---------+----------------+-------+---------+---------------+--------------+
|Acme Corporation|technolgy|            1996|1100.04|     2822|  United States|          NULL|
+----------------+---------+----------------+-------+---------+---------------+--------------+
only showing top 1 row


In [None]:
data_dictionary_df.show(1)

+--------+-------+------------+
|   table|  field| description|
+--------+-------+------------+
|accounts|account|Company name|
+--------+-------+------------+
only showing top 1 row


In [None]:
# 1. Null counts for accounts_df
null_counts_accounts_df = accounts_df.select(
    [sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in accounts_df.columns]
)

# 2. Null counts for data_dictionary_df
null_counts_data_dictionary_df = data_dictionary_df.select(
    [sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in data_dictionary_df.columns]
)

# 3. Null counts for products_df
null_counts_products_df = products_df.select(
    [sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in products_df.columns]
)

# 4. Null counts for sales_pipeline_df
null_counts_sales_pipeline_df = sales_pipeline_df.select(
    [sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in sales_pipeline_df.columns]
)

# 5. Null counts for sales_teams_df
null_counts_sales_teams_df = sales_teams_df.select(
    [sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in sales_teams_df.columns]
)

# SHOW RESULTS
null_counts_accounts_df.show()
null_counts_data_dictionary_df.show()
null_counts_products_df.show()
null_counts_sales_pipeline_df.show()
null_counts_sales_teams_df.show()

+-------+------+----------------+-------+---------+---------------+--------------+
|account|sector|year_established|revenue|employees|office_location|parent_company|
+-------+------+----------------+-------+---------+---------------+--------------+
|      0|     0|               0|      0|        0|              0|            70|
+-------+------+----------------+-------+---------+---------------+--------------+

+-----+-----+-----------+
|table|field|description|
+-----+-----+-----------+
|    0|    0|          0|
+-----+-----+-----------+

+-------+------+-----------+
|product|series|sales_price|
+-------+------+-----------+
|      0|     0|          0|
+-------+------+-----------+

+--------------+-----------+-------+-------+----------+-----------+----------+-----------+
|opportunity_id|sales_agent|product|account|deal_stage|engage_date|close_date|close_value|
+--------------+-----------+-------+-------+----------+-----------+----------+-----------+
|             0|          0|      
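
The five null-count blocks above repeat one pattern. The same counts can be expressed as SQL strings and applied to any DataFrame with `selectExpr`. This is an alternative formulation rather than the notebook's own, and `null_count_exprs` is a hypothetical name.

```python
def null_count_exprs(columns):
    """One aggregate expression per column, counting that column's NULL values."""
    return [
        f"sum(CASE WHEN `{c}` IS NULL THEN 1 ELSE 0 END) AS `{c}`"
        for c in columns
    ]


# In a Databricks notebook:
# for df in (accounts_df, data_dictionary_df, products_df, sales_pipeline_df, sales_teams_df):
#     df.selectExpr(*null_count_exprs(df.columns)).show()
```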

In [None]:
accounts_df = accounts_df.fillna({
    "parent_company": "Independent"
})

sales_pipeline_df = sales_pipeline_df.fillna({
    "account": "unknown"
})

In [None]:
null_counts_accounts_df = accounts_df.select([sum(when(col(column).isNull(), 1).otherwise(0)).alias(column) for column in accounts_df.columns])
null_counts_accounts_df.display()

account,sector,year_established,revenue,employees,office_location,parent_company
0,0,0,0,0,0,0
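
The re-check above is read by eye. To make it fail loudly instead, the single row of counts can be turned into a dict and inspected. `columns_with_nulls` is a hypothetical helper, and the commented usage assumes the counts DataFrame has exactly one row, converted with `Row.asDict()`.

```python
def columns_with_nulls(counts):
    """Given {column: null_count}, return the columns whose count is non-zero."""
    return [name for name, n in counts.items() if n]


# In a Databricks notebook:
# counts = null_counts_accounts_df.first().asDict()
# remaining = columns_with_nulls(counts)
# assert not remaining, f"Nulls remain in: {remaining}"
```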


## 💾 Save Cleaned Data to ADLS (Transformed Container)

The cleaned datasets are written back to Azure Data Lake under the `transformed-data` container.  
These files are later used by Azure Synapse for querying via serverless SQL.


In [None]:
accounts_df.write.option("header", "true").mode("overwrite").csv("/mnt/transformed-data/accounts")
data_dictionary_df.write.option("header", "true").mode("overwrite").csv("/mnt/transformed-data/data_dictionary")
products_df.write.option("header", "true").mode("overwrite").csv("/mnt/transformed-data/products")
sales_pipeline_df.write.option("header", "true").mode("overwrite").csv("/mnt/transformed-data/sales_pipeline")
sales_teams_df.write.option("header", "true").mode("overwrite").csv("/mnt/transformed-data/sales_teams")    
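
The five write calls share the same options, so they can also be driven from a dict of DataFrames. A sketch with a hypothetical helper `write_csvs`; note that Spark writes each target as a directory of part files rather than a single CSV file, so downstream queries should point at the folder.

```python
def write_csvs(frames, base_path):
    """Write each DataFrame in `frames` to <base_path>/<name> as CSV with a header row.

    Returns the list of target paths that were written.
    """
    written = []
    for name, df in frames.items():
        target = f"{base_path}/{name}"
        df.write.option("header", "true").mode("overwrite").csv(target)
        written.append(target)
    return written


# In a Databricks notebook:
# write_csvs(
#     {
#         "accounts": accounts_df,
#         "data_dictionary": data_dictionary_df,
#         "products": products_df,
#         "sales_pipeline": sales_pipeline_df,
#         "sales_teams": sales_teams_df,
#     },
#     "/mnt/transformed-data",
# )
```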