In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import re

In [0]:
# Contracts: read the raw parquet extract from the data lake
df_contracts_parquet = spark.read.parquet(
    "abfss://ifrs-project@storagezyesnazarov.dfs.core.windows.net/project-data/contracts/parquet/contracts_data"
)

In [0]:
# Remove rows that are exact duplicates across every column
df_contracts_cleaned = df_contracts_parquet.drop_duplicates()

In [0]:
# Normalise column names: lower-case them and turn spaces into underscores
# (e.g. a header "Contract Amount" would become "contract_amount")
df_contracts_cleaned = df_contracts_cleaned.toDF(
    *(name.lower().replace(" ", "_") for name in df_contracts_cleaned.columns)
)

In [0]:
# Replace nulls with placeholders: 0 for the amount, "Unknown" for text fields
df_contracts_cleaned = df_contracts_cleaned.na.fill(
    {
        "contract_amount": 0,
        "contract_lead": "Unknown",
        "contract_id": "Unknown",
        "contractor": "Unknown",
    }
)

In [0]:
# Parse the M/d/yy date columns into DateType and fix the numeric types.
#
# Target names are lower-case to match the normalised column names produced
# earlier: withColumn replaces a case-insensitively matching column but keeps
# the *new* spelling, so writing "Date_signed" would re-introduce mixed-case
# names into the Delta schema (and fails outright if spark.sql.caseSensitive
# is enabled).
#
# Amounts are cast to double rather than float: a 32-bit float keeps only ~7
# significant digits, which silently corrupts large monetary values. This
# table is written with overwriteSchema, so widening the type is safe.
#
# `round` below is pyspark.sql.functions.round (brought in by the star
# import), not Python's builtin round.
df_contracts_cleaned = (
    df_contracts_cleaned
    .withColumn("date_signed", to_date(col("date_signed"), "M/d/yy"))
    .withColumn("maturity_date", to_date(col("maturity_date"), "M/d/yy"))
    .withColumn("contract_amount", col("contract_amount").cast("double"))
    .withColumn("estimated_costs", round(col("estimated_costs"), 0).cast("double"))
)

In [0]:
# Persist cleaned contracts as Delta, replacing both the data and the table schema
(
    df_contracts_cleaned.write
    .mode("overwrite")
    .format("delta")
    .option("overwriteSchema", "true")
    .save("abfss://ifrs-project@storagezyesnazarov.dfs.core.windows.net/project-data/contracts/delta/contracts_data_delta")
)

In [0]:
# Transactions: read the raw accounting parquet extract from the data lake
df_transactions_parquet = spark.read.parquet(
    "abfss://ifrs-project@storagezyesnazarov.dfs.core.windows.net/project-data/transactions/parquet/accounting_data"
)

In [0]:
# Remove rows that are exact duplicates across every column (within this batch only)
df_transactions_cleaned = df_transactions_parquet.drop_duplicates()

In [0]:
# Normalise column names: lower-case them and turn spaces into underscores
df_transactions_cleaned = df_transactions_cleaned.toDF(
    *(name.lower().replace(" ", "_") for name in df_transactions_cleaned.columns)
)

In [0]:
# Replace nulls with placeholders: 0 for the amount, "Unknown" for text fields
df_transactions_cleaned = df_transactions_cleaned.na.fill(
    {
        "transaction_amount": 0,
        "invoice_no": "Unknown",
        "description": "Unknown",
    }
)

In [0]:
# Cast to target types: M/d/yy strings -> DateType, amount -> 64-bit integer.
# NOTE(review): casting to bigint discards any fractional part of the amount
# (e.g. cents). Confirm the source amounts are whole numbers; changing the type
# would also require a schema change on the append-mode Delta target.
df_transactions_cleaned = (
    df_transactions_cleaned
    .withColumn("transaction_date", to_date(col("transaction_date"), "M/d/yy"))
    .withColumn("transaction_amount", col("transaction_amount").cast("bigint"))
)

In [0]:
# Append the cleaned transactions to the Delta table.
# NOTE(review): duplicates are only removed within the current batch. If the
# parquet source is a full snapshot, re-running this notebook appends the same
# rows again - confirm the source is incremental, or switch to overwrite/MERGE.
(
    df_transactions_cleaned.write
    .mode("append")
    .format("delta")
    .save("abfss://ifrs-project@storagezyesnazarov.dfs.core.windows.net/project-data/transactions/delta/accounting_data_delta")
)

In [0]:
# Employees: read the raw parquet extract from the data lake
df_employees_parquet = spark.read.parquet(
    "abfss://ifrs-project@storagezyesnazarov.dfs.core.windows.net/project-data/employees/parquet/employees_data"
)

In [0]:
# Remove rows that are exact duplicates across every column (within this batch only)
df_employees_cleaned = df_employees_parquet.drop_duplicates()


In [0]:
# Normalise column names: lower-case them and turn spaces into underscores
df_employees_cleaned = df_employees_cleaned.toDF(
    *(name.lower().replace(" ", "_") for name in df_employees_cleaned.columns)
)

In [0]:
# Replace nulls with placeholders: 0 for the numeric fields, "Unknown" for text fields
df_employees_cleaned = df_employees_cleaned.na.fill(
    {
        "hourly_rate": 0,
        "number_of_hours": 0,
        "fixed_or_outsourcing": "Unknown",
        "supervisor_name": "Unknown",
        "grade": "Unknown",
    }
)

In [0]:
# Parse the M/d/yy date and cast the rate and hours to int.
# NOTE(review): int casts drop any fractional part (e.g. 7.5 hours -> 7);
# confirm hourly_rate and number_of_hours are whole numbers in the source.
df_employees_cleaned = (
    df_employees_cleaned
    .withColumn("date", to_date(col("date"), "M/d/yy"))
    .withColumn("hourly_rate", col("hourly_rate").cast("int"))
    .withColumn("number_of_hours", col("number_of_hours").cast("int"))
)

In [0]:
# Derived column: total_costs = hourly_rate * number_of_hours for each row
df_employees_cleaned = df_employees_cleaned.withColumn(
    "total_costs",
    df_employees_cleaned["hourly_rate"] * df_employees_cleaned["number_of_hours"],
)

In [0]:
# Append the cleaned employee records to the Delta table.
# NOTE(review): duplicates are only removed within the current batch. If the
# parquet source is a full snapshot, re-running this notebook appends the same
# rows again - confirm the source is incremental, or switch to overwrite/MERGE.
(
    df_employees_cleaned.write
    .mode("append")
    .format("delta")
    .save("abfss://ifrs-project@storagezyesnazarov.dfs.core.windows.net/project-data/employees/delta/employees_data_delta")
)