### 1. Setting up Spark environment

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Storage account name plus the application (client) ID and directory (tenant) ID
# that the OAuth configuration of this notebook relies on.
storage_account = "mystoacc1kad"
application_id = "d3637ee5-7e31-4579-87ed-e334690b695e"
directory_id = "26af9d76-35fe-404a-b312-869c37aec9c7"

# Root of the Sales data in the bronze (source CSV) and silver (Delta output) containers.
bronze_container_path, silver_container_path = (
    f"abfss://{layer}@{storage_account}.dfs.core.windows.net/Sales/"
    for layer in ("bronze", "silver")
)

### 2. Configuring storage account

In [0]:
# Client secret for the application, fetched from a Databricks secret scope
# so that it never appears in the notebook source.
service_credential = dbutils.secrets.get(scope="databricks-secrets-sql-migration", key="appsecretvalue")

# OAuth client-credentials settings; every key is suffixed with the storage
# account's DFS endpoint before being applied to the Spark session.
account_endpoint = f"{storage_account}.dfs.core.windows.net"
oauth_settings = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": application_id,
    "fs.azure.account.oauth2.client.secret": service_credential,
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{directory_id}/oauth2/token",
}
for setting_prefix, setting_value in oauth_settings.items():
    spark.conf.set(f"{setting_prefix}.{account_endpoint}", setting_value)

### Function to create and return DataFrames from .csv files in the bronze container

In [0]:
def creating_dataframe(df, file_path):
  """Read one CSV file from the bronze container into a Spark DataFrame.

  Args:
    df: Label of the dataset; it is only used in the progress message.
    file_path: Location of the CSV file relative to ``bronze_container_path``.

  Returns:
    The DataFrame loaded with the header option enabled and the schema inferred.
  """
  print(f"Creating dataframes for {df}")

  source_uri = f"{bronze_container_path}{file_path}"
  csv_reader = spark.read.format('csv').options(header=True, inferSchema=True)
  return csv_reader.load(source_uri)

### Function to load Delta files into Silver Layer Container

In [0]:
def write_to_silver(df,path):
  """Save a DataFrame in Delta format under the silver container.

  Args:
    df: The DataFrame to persist.
    path: Folder name below ``silver_container_path`` that receives the data.
  """
  destination = f"{silver_container_path}{path}/"

  # Collapse to a single partition before writing, then replace whatever
  # already exists at the destination.
  single_partition = df.coalesce(1)
  delta_writer = single_partition.write.format('delta').mode('overwrite')
  delta_writer.save(destination)

  print("File saved succesfully ")

### Function to change datatype of columns from timestamp to date

In [0]:
def changing_timestamp_todate(df):
    """Cast date-like columns of a DataFrame to the Spark ``date`` type.

    A column is converted only when BOTH conditions hold:
      * its name contains "date" (case-insensitive), and
      * its current type can be cast to a date (timestamp, timestamp_ntz,
        date or string).

    The type check matters because the name test alone also matches
    unrelated columns such as ``CandidateID`` or ``Validated``; sending a
    numeric or boolean column through ``to_date`` would fail or wipe its
    values instead of leaving it untouched.

    Args:
        df: The DataFrame whose columns should be inspected.

    Returns:
        A DataFrame with the matching columns replaced by their date values;
        all other columns are returned unchanged.
    """
    date_castable_types = {"timestamp", "timestamp_ntz", "date", "string"}

    # df.dtypes yields (column name, type name) pairs, so name and type can
    # be checked together without touching the data.
    for column_name, column_type in df.dtypes:
        if "date" in column_name.lower() and column_type in date_castable_types:
            df = df.withColumn(column_name, to_date(column_name))
    return df

# Main Pipeline

In [0]:
def run_pipeline():
    """Load every file found one level below the bronze path into the silver layer.

    For each entry listed under ``bronze_container_path`` the contents of that
    entry are listed in turn; every item found is read as CSV, has its
    date-named columns converted, and is written out in Delta format under a
    folder named after the file (the text before its first dot).
    """
    print("Pipeline started...")

    for source_folder in dbutils.fs.ls(bronze_container_path):
        for source_file in dbutils.fs.ls(source_folder.path):
            # Path relative to the bronze root, e.g. "<folder name><file name>".
            relative_path = source_folder.name + source_file.name
            # The part of the file name before the first dot names the dataset.
            dataset_name = source_file.name.partition('.')[0]

            # Read -> convert date columns -> persist to the silver layer.
            raw_frame = creating_dataframe(dataset_name, relative_path)
            dated_frame = changing_timestamp_todate(raw_frame)
            write_to_silver(dated_frame, dataset_name)
    


### Running Pipeline

In [0]:
# Execute the bronze-to-silver load implemented by run_pipeline().
run_pipeline()