### Step 1: Initialize Parameters
This step sets up the widgets to receive parameters specific for Sales transformations.


In [0]:
# Set up widgets to receive parameters
dbutils.widgets.text("business_unit", "")
dbutils.widgets.text("source_path", "")
dbutils.widgets.text("destination_path", "")
dbutils.widgets.text("source_name", "")
dbutils.widgets.text("transformation_rules", "{}")

# Retrieve the parameter values
business_unit = dbutils.widgets.get("business_unit")
source_path = dbutils.widgets.get("source_path")
destination_path = dbutils.widgets.get("destination_path")
source_name = dbutils.widgets.get("source_name")
transformation_rules = dbutils.widgets.get("transformation_rules")

# Convert transformation rules to JSON (if provided)
import json
try:
    rules = json.loads(transformation_rules)
except Exception as e:
    rules = {}

### Step 2: Read Data from Bronze Layer
This step reads the Sales data from the Bronze layer.


In [0]:
source_path = "bronze"
business_unit = "Sales"
source_name = "SalesTable"

In [0]:
# Build the file path for the Bronze layer file (assuming Parquet format)
spark.conf.set(
    "fs.azure.account.key.stgdatapoc.dfs.core.windows.net",
    "QD5wJQ6uA9+sdAdrFma1Qvn/GhfQN5Ivs7rIqUxUy4tfS//N+TPaeU1xpEWMHJaKtvzndAa2EHGj+ASt32QmjQ=="
)
bronze_file = f"abfss://{source_path}@stgdatapoc.dfs.core.windows.net/{business_unit}/{source_name}.parquet"

# Read the data from the Bronze layer
df = spark.read.parquet(bronze_file)


### Step 3: Apply Sales-Specific Transformations
This step applies both the common transformations and Sales-specific business logic.


In [0]:
# Apply common transformations: remove duplicates
df_transformed = df.dropDuplicates()

# Sales-specific transformation: if business_unit is Sales, apply extra logic
if business_unit.lower() == "sales":
    from pyspark.sql.functions import col
    # Example: convert 'amount' from USD to EUR using a conversion rate from transformation rules or default value
    conversion_rate = rules.get("conversion_rate", 0.85)
    df_transformed = df_transformed.withColumn("amount_eur", col("amount") * conversion_rate)
    
    # Filter records with a valid 'amount'
    df_transformed = df_transformed.filter(col("amount") > 0)


### Step 4: Write Transformed Data to Silver Layer
Write the transformed Sales data to the Silver layer.


In [0]:
# Build the file path for the Silver layer output
silver_file = f"abfss://{destination_path}@stgdatapoc.dfs.core.windows.net/{business_unit}/{source_name}.parquet"

# Write the transformed data to the Silver layer (overwrite mode)
df_transformed.write.mode("overwrite").parquet(silver_file)

# Exit the notebook indicating success
dbutils.notebook.exit("Success")
