### Config (ADLS Gen2 via SAS) + DB setup

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date
from pyspark.sql.types import IntegerType, DoubleType, DateType


SAS_TOKEN = "sv=2024-11-04&ss=bfqt&srt=sco&sp=rwdlacupyx&se=2025-08-24T00:33:44Z&st=2025-08-23T16:18:44Z&spr=https&sig=23T04F6gi0UoDFPg6LFEylxjw4da2Zicur8%2Fpl7CRLg%3D"

# Your storage info
ACCOUNT_NAME = "sivagamistudentstorage"
CONTAINER    = "mycontainer"

# Lake paths 

LAKE_BASE   = f"abfss://{CONTAINER}@{ACCOUNT_NAME}.dfs.core.windows.net"
RAW_PATH    = f"{LAKE_BASE}/raw/employees_csv"         
DELTA_PATH  = f"{LAKE_BASE}/silver/employees_delta"  

print("RAW_PATH :", RAW_PATH)
print("DELTA_PATH:", DELTA_PATH)

# Configure Spark to use SAS for ADLS Gen2 

fqdn = f"{ACCOUNT_NAME}.dfs.core.windows.net"
spark.conf.set(f"fs.azure.account.auth.type.{fqdn}", "SAS")
spark.conf.set(f"fs.azure.sas.token.provider.type.{fqdn}", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set(f"fs.azure.sas.fixed.token.{fqdn}", SAS_TOKEN)

# Lightweight performance defaults

spark.conf.set("spark.sql.adaptive.enabled", True)
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", True)
spark.conf.set("spark.databricks.delta.autoCompact.enabled", True)

# Create / use a workspace-local database (you’re not using Unity Catalog)
spark.sql("CREATE DATABASE IF NOT EXISTS employees_demo")
spark.sql("USE employees_demo")


RAW_PATH : abfss://mycontainer@sivagamistudentstorage.dfs.core.windows.net/raw/employees_csv
DELTA_PATH: abfss://mycontainer@sivagamistudentstorage.dfs.core.windows.net/silver/employees_delta


DataFrame[]

### Read CSV from the lake (RAW zone)

In [0]:
# Point to the exact file in RAW
raw_csv = f"{RAW_PATH}/employees.csv"

# Read with header + infer schema
df_raw = (spark.read
          .option("header", True)
          .option("inferSchema", True)
          .csv(raw_csv))

df_raw.show(5)
df_raw.printSchema()
print("Rows:", df_raw.count())


+-----------+----------+---------+--------+------------+---------+--------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|  JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+--------+------------+---------+--------+------+--------------+----------+-------------+
|        198|    Donald| OConnell|DOCONNEL|650.507.9833|21-JUN-07|SH_CLERK|  2600|            - |       124|           50|
|        199|   Douglas|    Grant|  DGRANT|650.507.9844|13-JAN-08|SH_CLERK|  2600|            - |       124|           50|
|        200|  Jennifer|   Whalen| JWHALEN|515.123.4444|17-SEP-03| AD_ASST|  4400|            - |       101|           10|
|        201|   Michael|Hartstein|MHARTSTE|515.123.5555|17-FEB-04|  MK_MAN| 13000|            - |       100|           20|
|        202|       Pat|      Fay|    PFAY|603.123.6666|17-AUG-05|  MK_REP|  6000|            - |       201|           20|
+-----------+---

### Minimal typing/cleanup + Temp View

In [0]:
# Cast common columns (adjust names if your CSV differs)
df = (df_raw
      .withColumn("EMPLOYEE_ID",   col("EMPLOYEE_ID").cast(IntegerType()))
      .withColumn("SALARY",        col("SALARY").cast(DoubleType()))
      .withColumn("COMMISSION_PCT",col("COMMISSION_PCT").cast(DoubleType()))
      .withColumn("MANAGER_ID",    col("MANAGER_ID").cast(IntegerType()))
      .withColumn("DEPARTMENT_ID", col("DEPARTMENT_ID").cast(IntegerType()))


      .withColumn("HIRE_DATE",     to_date(col("HIRE_DATE"), "dd-MMM-yy").cast(DateType()))
      )

df.createOrReplaceTempView("employees_raw")

In [0]:
# Quick sanity
spark.sql("""
SELECT COUNT(*) total,
       MIN(SALARY) min_sal,
       MAX(SALARY) max_sal,
       ROUND(AVG(SALARY),2) avg_sal
FROM employees_raw
""").show()

+-----+-------+-------+-------+
|total|min_sal|max_sal|avg_sal|
+-----+-------+-------+-------+
|   50| 2100.0|24000.0|6182.32|
+-----+-------+-------+-------+



### Write Delta (Silver) to the lake + Register table

In [0]:
# Write to Delta (Silver)
(df.coalesce(1)
   .write
   .format("delta")
   .mode("overwrite")
   .save(DELTA_PATH))

# Register external Delta table
spark.sql("DROP TABLE IF EXISTS employees_delta")
spark.sql(f"""
CREATE TABLE employees_delta
USING DELTA
LOCATION '{DELTA_PATH}'
""")
spark.sql("DESCRIBE DETAIL employees_delta").show(truncate=False)
spark.sql("SELECT COUNT(*) AS total FROM employees_delta").show()


+------+------------------------------------+--------------------------------------------+-----------+--------------------------------------------------------------------------------------+-----------------------+-------------------+----------------+-----------------+--------+-----------+-------------------------------------+----------------+----------------+-----------------------------------------+---------------------------------------------------------------+-------------+
|format|id                                  |name                                        |description|location                                                                              |createdAt              |lastModified       |partitionColumns|clusteringColumns|numFiles|sizeInBytes|properties                           |minReaderVersion|minWriterVersion|tableFeatures                            |statistics                                                     |clusterByAuto|
+------+----------------------------

###  PySparkSQL exploration queries

In [0]:
# Top 5 highest paid
spark.sql("""
SELECT FIRST_NAME, LAST_NAME, JOB_ID, SALARY
FROM employees_delta
ORDER BY SALARY DESC
LIMIT 5
""").show()

+----------+---------+-------+-------+
|FIRST_NAME|LAST_NAME| JOB_ID| SALARY|
+----------+---------+-------+-------+
|    Steven|     King|AD_PRES|24000.0|
|     Neena|  Kochhar|  AD_VP|17000.0|
|       Lex|  De Haan|  AD_VP|17000.0|
|   Michael|Hartstein| MK_MAN|13000.0|
|   Shelley|  Higgins| AC_MGR|12008.0|
+----------+---------+-------+-------+



In [0]:
# Employees per department
spark.sql("""
SELECT DEPARTMENT_ID, COUNT(*) AS emp_count
FROM employees_delta
GROUP BY DEPARTMENT_ID
ORDER BY emp_count DESC
""").show()

+-------------+---------+
|DEPARTMENT_ID|emp_count|
+-------------+---------+
|           50|       23|
|          100|        6|
|           30|        6|
|           60|        5|
|           90|        3|
|           20|        2|
|          110|        2|
|           10|        1|
|           40|        1|
|           70|        1|
+-------------+---------+



In [0]:
# Average salary by job role
spark.sql("""
SELECT JOB_ID, ROUND(AVG(SALARY),2) AS avg_salary, COUNT(*) AS emp_count
FROM employees_delta
GROUP BY JOB_ID
ORDER BY avg_salary DESC
""").show()

+----------+----------+---------+
|    JOB_ID|avg_salary|emp_count|
+----------+----------+---------+
|   AD_PRES|   24000.0|        1|
|     AD_VP|   17000.0|        2|
|    MK_MAN|   13000.0|        1|
|    AC_MGR|   12008.0|        1|
|    FI_MGR|   12008.0|        1|
|    PU_MAN|   11000.0|        1|
|    PR_REP|   10000.0|        1|
|AC_ACCOUNT|    8300.0|        1|
|FI_ACCOUNT|    7920.0|        5|
|    ST_MAN|    7280.0|        5|
|    HR_REP|    6500.0|        1|
|    MK_REP|    6000.0|        1|
|   IT_PROG|    5760.0|        5|
|   AD_ASST|    4400.0|        1|
|  PU_CLERK|    2780.0|        5|
|  ST_CLERK|    2750.0|       16|
|  SH_CLERK|    2600.0|        2|
+----------+----------+---------+



In [0]:
# Manager-employee counts
spark.sql("""
SELECT MANAGER_ID, COUNT(*) AS direct_reports
FROM employees_delta
WHERE MANAGER_ID IS NOT NULL
GROUP BY MANAGER_ID
ORDER BY direct_reports DESC
""").show()

+----------+--------------+
|MANAGER_ID|direct_reports|
+----------+--------------+
|       100|             9|
|       101|             5|
|       108|             5|
|       114|             5|
|       123|             4|
|       121|             4|
|       103|             4|
|       122|             4|
|       120|             4|
|       124|             2|
|       102|             1|
|       205|             1|
|       201|             1|
+----------+--------------+



### Optimize Delta

In [0]:
# Compact small files
spark.sql("OPTIMIZE employees_delta")

# Compute stats to help the optimizer
spark.sql("ANALYZE TABLE employees_delta COMPUTE STATISTICS")
spark.sql("ANALYZE TABLE employees_delta COMPUTE STATISTICS FOR COLUMNS SALARY, DEPARTMENT_ID, JOB_ID")


DataFrame[]

### Example: filtered query + plan 

In [0]:
query = """
SELECT DEPARTMENT_ID,
       COUNT(*) AS emp_count,
       ROUND(AVG(SALARY),2) AS avg_sal,
       MAX(SALARY) AS max_sal
FROM employees_delta
WHERE SALARY >= 5000
GROUP BY DEPARTMENT_ID
ORDER BY avg_sal DESC
"""
                     
spark.sql(query).show()
                     
# Explain plan to verify partition/pruning/skipping benefits
spark.sql(f"EXPLAIN FORMATTED {query}").show(truncate=False)

                   

+-------------+---------+--------+-------+
|DEPARTMENT_ID|emp_count| avg_sal|max_sal|
+-------------+---------+--------+-------+
|           90|        3|19333.33|24000.0|
|           30|        1| 11000.0|11000.0|
|          110|        2| 10154.0|12008.0|
|           70|        1| 10000.0|10000.0|
|           20|        2|  9500.0|13000.0|
|          100|        6| 8601.33|12008.0|
|           60|        2|  7500.0| 9000.0|
|           50|        5|  7280.0| 8200.0|
|           40|        1|  6500.0| 6500.0|
+-------------+---------+--------+-------+

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

### Quick chart in Databricks UI

In [0]:
dept_avg = spark.sql("""
SELECT DEPARTMENT_ID, ROUND(AVG(SALARY),2) AS avg_salary
FROM employees_delta
GROUP BY DEPARTMENT_ID
ORDER BY DEPARTMENT_ID
""")
display(dept_avg) 


DEPARTMENT_ID,avg_salary
10,4400.0
20,9500.0
30,4150.0
40,6500.0
50,3721.74
60,5760.0
70,10000.0
90,19333.33
100,8601.33
110,10154.0


Databricks visualization. Run in Databricks to view.