In [0]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import re

# OAuth (service-principal) settings passed as extra_configs when mounting the ADLS Gen2 container.
# Every <...> value is a placeholder: supply the real values at runtime (for example from a
# Databricks secret scope via dbutils.secrets.get(...)) and never commit actual credentials.
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "<CLIENT_ID>",
    "fs.azure.account.oauth2.client.secret": "<SECRET>",
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<TENANT_ID>/oauth2/token",
}


In [0]:
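# One-time setup: mount the finance-etl ADLS Gen2 container at /mnt/finance-etl using `configs`.
# Kept commented out because the mount only needs to be created once per workspace (mounting an
# already-mounted path is expected to fail). Uncomment it for a first-time setup.
#dbutils.fs.mount(
#source = "abfss://finance-etl@storagezyesnazarov.dfs.core.windows.net",
#mount_point = "/mnt/finance-etl",
#extra_configs = configs)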

In [0]:
# Obtain the Spark session (getOrCreate reuses an already-running session if there is one) and
# switch on Delta's automatic schema merging for this session.
# NOTE(review): the Delta writes below all use overwrite + overwriteSchema, so autoMerge may be redundant.
spark = (
    SparkSession.builder
    .appName("financials-etl-pipeline")
    .getOrCreate()
)
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")


In [0]:
# Load the three raw statement folders (balance sheet, P&L, cash flow). Each folder is read as CSV
# with a header row, and Spark infers the column types.
bs_csv_data, pl_csv_data, cf_csv_data = (
    spark.read.csv(raw_folder, header=True, inferSchema=True)
    for raw_folder in (
        "/mnt/finance-etl/raw_data/balance_sheet/",
        "/mnt/finance-etl/raw_data/profit_and_loss/",
        "/mnt/finance-etl/raw_data/cashflow/",
    )
)

In [0]:
## Standardize column names before the union: lowercase them and replace every non-alphanumeric character with an underscore, so matching columns line up across the three statements.

def clean_column_names(df):
    """Return ``df`` with standardized column names so the statements can be unioned by name.

    For each column name:
    * every character outside ``[a-zA-Z0-9]`` becomes ``_`` and the name is lowercased;
    * a trailing ``_00_00_00`` is dropped (presumably the ``00:00:00`` time part of date headers);
    * any run of underscores is collapsed to a single ``_``;
    * leading and trailing underscores are stripped.

    Args:
        df: DataFrame-like object exposing ``columns`` and ``toDF(*names)`` (e.g. a Spark DataFrame).

    Returns:
        The result of ``df.toDF`` called with the cleaned names, in the original column order.
    """
    new_columns = []
    for col_name in df.columns:
        clean_name = re.sub(r'[^a-zA-Z0-9]', '_', col_name).lower()
        clean_name = re.sub(r'(_00_00_00)$', '', clean_name)
        # Collapse whole runs of underscores: str.replace('__', '_') only halved them,
        # leaving e.g. "Net - Income" as "net__income".
        clean_name = re.sub(r'_+', '_', clean_name).strip('_')
        new_columns.append(clean_name)

    return df.toDF(*new_columns)

# Apply the same header normalization to each statement so their columns line up for the union.
# NOTE: an overview dataset (`overview_csv_data`) used to be cleaned here as well, but it is never
# loaded in this notebook, so that line raised NameError on a fresh run. Re-add it only together
# with a cell that actually reads the overview CSVs.
df_bs_columns_formatted = clean_column_names(bs_csv_data)
df_pl_columns_formatted = clean_column_names(pl_csv_data)
df_cf_columns_formatted = clean_column_names(cf_csv_data)

In [0]:
# Persist the cleaned balance-sheet data in Delta format, replacing any previous version.
# overwriteSchema lets the stored schema change when the set of columns differs from the last run.
(
    df_bs_columns_formatted.write.format("delta")
    .option("overwriteSchema", "true")
    .mode("overwrite")
    .save("/mnt/finance-etl/delta_data/bs_data")
)

In [0]:
# Persist the cleaned profit-and-loss data in Delta format, replacing data and schema on each run.
(
    df_pl_columns_formatted.write.format("delta")
    .option("overwriteSchema", "true")
    .mode("overwrite")
    .save("/mnt/finance-etl/delta_data/pl_data")
)

In [0]:
# Persist the cleaned cash-flow data in Delta format, replacing data and schema on each run.
(
    df_cf_columns_formatted.write.format("delta")
    .option("overwriteSchema", "true")
    .mode("overwrite")
    .save("/mnt/finance-etl/delta_data/cf_data")
)

In [0]:
# Short aliases for the cleaned statements, used by the column selection and union steps.
df_bs, df_pl, df_cf = (
    df_bs_columns_formatted,
    df_pl_columns_formatted,
    df_cf_columns_formatted,
)

In [0]:
# Combine the balance-sheet, P&L and cash-flow lines into one long table.
# Rows are identified by symbol, description and type; the remaining columns are reporting dates.
# Date columns are the cleaned headers that start with a YYYY_MM_DD pattern (4-digit year,
# 2-digit month, 2-digit day). The statements are quarterly and share the same reporting dates,
# so the balance sheet serves as the reference for which date columns exist.
# NOTE(review): re.match only anchors at the start, so a header such as "2023_03_31_x" is also
# selected. A balance-sheet date missing from the P&L or cash-flow data will make the select in
# the union step fail.
date_columns = [
    column_name
    for column_name in df_bs.columns
    if re.match(r"\d{4}_\d{2}_\d{2}", column_name) is not None
]
common_columns = ["symbol", "description", "type", *date_columns]

In [0]:
# Stack the three statements on the shared column list; unionByName matches columns by name, not position.
combined_financials = (
    df_bs.select(common_columns)
    .unionByName(df_pl.select(common_columns))
    .unionByName(df_cf.select(common_columns))
)

In [0]:
# Save the combined table in Delta format for further querying via SQL. The set of date columns
# depends on which quarters are available, so the schema is overwritten on every run.
# NOTE(review): this path is outside the /mnt/finance-etl mount used by the other writes — confirm intended.
(
    combined_financials.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save("/mnt/delta/financials")
)

In [0]:
%sql
-- Register the combined Delta output stored at /mnt/delta/financials as the table `financials`,
-- replacing any previous definition. CREATE ... AS SELECT materializes the query result into the table.
CREATE OR REPLACE TABLE financials USING DELTA
AS SELECT * FROM delta.`/mnt/delta/financials`

num_affected_rows,num_inserted_rows


In [0]:
# Archive the raw inputs once they have been processed. Every entry inside each subfolder of
# raw_data/ is moved to the same-named subfolder of processed_data/, so the next run only picks up
# newly dropped files.
source_root = "/mnt/finance-etl/raw_data/"
destination_root = "/mnt/finance-etl/processed_data/"


def move_processed_files(src_root, dst_root, fs=None):
    """Move the contents of each immediate subfolder of ``src_root`` into ``dst_root``.

    Files sitting directly in ``src_root`` are left untouched. Only entries inside its subfolders
    are moved, and the subfolder layout is mirrored under ``dst_root``.

    Args:
        src_root: Folder whose subfolders hold the processed raw files.
        dst_root: Archive folder that receives the moved entries.
        fs: Object with ``ls``/``mv`` methods like ``dbutils.fs``; defaults to ``dbutils.fs``.
            Injectable so the logic can be exercised without a Databricks workspace.

    Returns:
        The number of entries moved.
    """
    if fs is None:
        fs = dbutils.fs  # global provided by the Databricks runtime
    dst_base = dst_root.rstrip("/")
    moved = 0
    for folder in fs.ls(src_root):
        if not folder.isDir():
            continue
        # Directory names from ls typically end with "/"; the old concatenation then produced
        # "balance_sheet//". Strip it before building the destination path.
        destination_subfolder_path = f"{dst_base}/{folder.name.rstrip('/')}/"
        for entry in fs.ls(folder.path):
            # Nested directories need recurse=True to be moved; plain files keep the default.
            fs.mv(entry.path, destination_subfolder_path + entry.name, recurse=entry.isDir())
            moved += 1
    return moved


move_processed_files(source_root, destination_root)