#  Bronze Layer Notebook

##  Objective:
This notebook handles:
- Reading raw CSV files from the Azure Blob Storage (`raw` container).
- Adding audit columns.
- Converting the data into optimized Parquet format.
- Writing the processed files to the Bronze layer in ADLS Gen2.

---

##  Step 1: Start Spark Session
Initializes the Spark session required to perform all operations.

---

##  Step 2: Mount Raw Container
Connects Databricks to the Azure Blob storage (`raw`) container using a mount point, enabling access to CSV files.

---

##  Step 3: List Files
Lists all files present in the raw container to verify visibility and structure.

---

##  Step 4: Read CSV Files into DataFrames
Each raw CSV file (e.g., `match_performance.csv`, `team.csv`, etc.) is read into its own PySpark DataFrame.

---

##  Step 5: Add Audit Columns
Adds:
- `ingestion_date`: Date on which the file is processed (populated with `current_date()`, so it has day precision, not a full timestamp).
- `source_file`: Name of the file being read.

These help with data lineage and tracking.

---

##  Step 6: Write to Bronze Layer
Data is saved in Parquet format (optimized for analytics) to the Bronze container using `overwrite` mode.


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Obtain the Spark session used by every cell below; getOrCreate() reuses an
# already-running session when there is one instead of starting a second.
spark = (
    SparkSession.builder
    .appName("Statement of Work")
    .getOrCreate()
)

In [0]:
# Drop a previous mount of the raw container so the next cell can mount it
# again. dbutils.fs.unmount raises if the path is not currently mounted, which
# broke this cell on a fresh workspace / "Run All"; only unmount when the
# mount point is actually present in the mount table.
if any(m.mountPoint == "/mnt/blob_storage" for m in dbutils.fs.mounts()):
    dbutils.fs.unmount("/mnt/blob_storage")

/mnt/blob_storage has been unmounted.
Out[4]: True

In [0]:
# Mount the `raw` container of the `iplblob` storage account at /mnt/blob_storage.
#
# The account key must never be committed in the notebook: it is read from a
# Databricks secret scope at run time instead.
# NOTE(review): the scope/key names below are placeholders - create them (or
# point these at the existing scope) before running, and rotate the key that
# was previously hard-coded here, since it has been exposed in source history.
raw_account_key = dbutils.secrets.get(scope="ipl-storage", key="iplblob-account-key")

dbutils.fs.mount(
  source="wasbs://raw@iplblob.blob.core.windows.net",
  mount_point="/mnt/blob_storage",
  extra_configs={"fs.azure.account.key.iplblob.blob.core.windows.net": raw_account_key}
)

Out[5]: True

In [0]:
# Show the current mount table to confirm /mnt/blob_storage is attached.
current_mounts = dbutils.fs.mounts()
display(current_mounts)

mountPoint,source,encryptionType
/databricks-datasets,databricks-datasets,
/mnt/blob_storage,wasbs://raw@iplblob.blob.core.windows.net,
/Volumes,UnityCatalogVolumes,
/databricks/mlflow-tracking,databricks/mlflow-tracking,
/mnt/adls_storage/silver,wasbs://silver-ipl@adlsipl.blob.core.windows.net,
/databricks-results,databricks-results,
/databricks/mlflow-registry,databricks/mlflow-registry,
/mnt/adls_storage/bronze,wasbs://bronze-ipl@adlsipl.blob.core.windows.net,
/Volume,DbfsReserved,
/volumes,DbfsReserved,


In [0]:
# List the contents of the raw mount so the CSV files read below are visible.
raw_files = dbutils.fs.ls("/mnt/blob_storage")
display(raw_files)

path,name,size,modificationTime
dbfs:/mnt/blob_storage/match_performance.csv,match_performance.csv,482,1743689136000
dbfs:/mnt/blob_storage/match_stadium.csv,match_stadium.csv,180,1743689028000
dbfs:/mnt/blob_storage/player_performance.csv,player_performance.csv,396,1743689136000
dbfs:/mnt/blob_storage/player_team.csv,player_team.csv,602,1743689136000
dbfs:/mnt/blob_storage/stadium.csv,stadium.csv,225,1743689028000
dbfs:/mnt/blob_storage/team.csv,team.csv,281,1743689028000


In [0]:
# Read match_performance.csv from the raw mount (first row is the header,
# column types are inferred) and add the two audit columns: the load date and
# the name of the file the rows came from.
df_match_performance = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("dbfs:/mnt/blob_storage/match_performance.csv")
    .withColumn("ingestion_date", current_date())
    .withColumn("source_file", lit("match_performance.csv"))
)

In [0]:
# df_match_performance.display()

In [0]:
# Read match_stadium.csv from the raw mount (first row is the header, column
# types are inferred) and add the two audit columns: the load date and the
# name of the file the rows came from.
df_match_stadium = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("dbfs:/mnt/blob_storage/match_stadium.csv")
    .withColumn("ingestion_date", current_date())
    .withColumn("source_file", lit("match_stadium.csv"))
)

In [0]:
# df_match_stadium.display()

In [0]:
# Read player_team.csv from the raw mount (first row is the header, column
# types are inferred) and add the two audit columns: the load date and the
# name of the file the rows came from.
df_player_team = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("dbfs:/mnt/blob_storage/player_team.csv")
    .withColumn("ingestion_date", current_date())
    .withColumn("source_file", lit("player_team.csv"))
)

In [0]:
# df_player_team.display()

In [0]:
# Read player_performance.csv from the raw mount (first row is the header,
# column types are inferred) and add the two audit columns: the load date and
# the name of the file the rows came from.
df_player_performance = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("dbfs:/mnt/blob_storage/player_performance.csv")
    .withColumn("ingestion_date", current_date())
    .withColumn("source_file", lit("player_performance.csv"))
)

In [0]:
# df_player_performance.display()

In [0]:
# Read stadium.csv from the raw mount (first row is the header, column types
# are inferred) and add the two audit columns: the load date and the name of
# the file the rows came from.
df_stadium = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("dbfs:/mnt/blob_storage/stadium.csv")
    .withColumn("ingestion_date", current_date())
    .withColumn("source_file", lit("stadium.csv"))
)

In [0]:
# df_stadium.display()

In [0]:
# Read team.csv from the raw mount (first row is the header, column types are
# inferred) and add the two audit columns: the load date and the name of the
# file the rows came from.
df_team = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("dbfs:/mnt/blob_storage/team.csv")
    .withColumn("ingestion_date", current_date())
    .withColumn("source_file", lit("team.csv"))
)

In [0]:
# df_team.display()

In [0]:
# Drop a previous mount of the bronze container so the next cell can mount it
# again. dbutils.fs.unmount raises if the path is not currently mounted, which
# broke this cell on a fresh workspace / "Run All"; only unmount when the
# mount point is actually present in the mount table.
if any(m.mountPoint == "/mnt/adls_storage/bronze" for m in dbutils.fs.mounts()):
    dbutils.fs.unmount("/mnt/adls_storage/bronze")

/mnt/adls_storage/bronze has been unmounted.
Out[22]: True

In [0]:
# Mount the `bronze-ipl` container of the `adlsipl` storage account at
# /mnt/adls_storage/bronze.
#
# The account key must never be committed in the notebook: it is read from a
# Databricks secret scope at run time instead.
# NOTE(review): the scope/key names below are placeholders - create them (or
# point these at the existing scope) before running, and rotate the key that
# was previously hard-coded here, since it has been exposed in source history.
bronze_account_key = dbutils.secrets.get(scope="ipl-storage", key="adlsipl-account-key")

dbutils.fs.mount(
  source="wasbs://bronze-ipl@adlsipl.blob.core.windows.net",
  mount_point="/mnt/adls_storage/bronze",
  extra_configs={
      "fs.azure.account.key.adlsipl.blob.core.windows.net": bronze_account_key
  }
)


Out[23]: True

In [0]:
# Show the current mount table to confirm /mnt/adls_storage/bronze is attached.
current_mounts = dbutils.fs.mounts()
display(current_mounts)

mountPoint,source,encryptionType
/databricks-datasets,databricks-datasets,
/mnt/blob_storage,wasbs://raw@iplblob.blob.core.windows.net,
/Volumes,UnityCatalogVolumes,
/databricks/mlflow-tracking,databricks/mlflow-tracking,
/mnt/adls_storage/silver,wasbs://silver-ipl@adlsipl.blob.core.windows.net,
/databricks-results,databricks-results,
/databricks/mlflow-registry,databricks/mlflow-registry,
/mnt/adls_storage/bronze,wasbs://bronze-ipl@adlsipl.blob.core.windows.net,
/Volume,DbfsReserved,
/volumes,DbfsReserved,


In [0]:
# List what is already present under the bronze mount before writing to it.
bronze_entries = dbutils.fs.ls("/mnt/adls_storage/bronze")
display(bronze_entries)

path,name,size,modificationTime
dbfs:/mnt/adls_storage/bronze/match_performance_bronze.parquet/,match_performance_bronze.parquet/,0,0
dbfs:/mnt/adls_storage/bronze/match_stadium_bronze.parquet/,match_stadium_bronze.parquet/,0,0
dbfs:/mnt/adls_storage/bronze/player_performance_bronze.parquet/,player_performance_bronze.parquet/,0,0
dbfs:/mnt/adls_storage/bronze/player_team_bronze.parquet/,player_team_bronze.parquet/,0,0
dbfs:/mnt/adls_storage/bronze/stadium_bronze.parquet/,stadium_bronze.parquet/,0,0
dbfs:/mnt/adls_storage/bronze/team_bronze.parquet/,team_bronze.parquet/,0,0


In [0]:
# Persist each audited DataFrame as Parquet under the bronze mount. The mode is
# "overwrite", so whatever an earlier run left at the same path is replaced.
# Order matches the original cell: one write per (DataFrame, target folder).
bronze_targets = [
    (df_match_performance, "match_performance_bronze.parquet"),
    (df_match_stadium, "match_stadium_bronze.parquet"),
    (df_player_performance, "player_performance_bronze.parquet"),
    (df_player_team, "player_team_bronze.parquet"),
    (df_stadium, "stadium_bronze.parquet"),
    (df_team, "team_bronze.parquet"),
]

for bronze_df, target_folder in bronze_targets:
    bronze_df.write.format("parquet").mode("overwrite").save("/mnt/adls_storage/bronze/" + target_folder)