In [0]:
from datetime import datetime , timedelta
from pyspark.sql import Window
from pyspark.sql.functions import col, lit, input_file_name, current_timestamp,regexp_extract, to_date , explode , from_json
from pyspark.sql.types import ArrayType , StringType , StructField , StructType
# --- Job parameters ----------------------------------------------------------
# Widgets expose start_date / end_date / mode so the notebook can be driven by
# a job or run interactively with today's date as the default.
date_format = "%Y-%m-%d"

# Take "today" once so both widgets receive the same default value even if the
# cell happens to run across midnight.
_today_str = datetime.now().strftime(date_format)
dbutils.widgets.text("start_date", _today_str, "Start date")
dbutils.widgets.text("end_date", _today_str, "End date")

dbutils.widgets.text("mode", "INCREMENTAL", "mode")

start_date_str = dbutils.widgets.get("start_date")
end_date_str = dbutils.widgets.get("end_date")
# Later cells compare `mode` against the exact string "INCREMENTAL" and treat
# every other value as a full overwrite, so normalise case and whitespace here
# to keep a value such as " incremental" from silently taking the full path.
mode = dbutils.widgets.get("mode").strip().upper()

# strptime raises ValueError when a value does not match date_format.
start_date = datetime.strptime(start_date_str.strip(), date_format).date()
end_date = datetime.strptime(end_date_str.strip(), date_format).date()

if start_date > end_date:
    raise ValueError(f"CRITICAL CONFIG ERROR: Start Date ({start_date}) is after End Date ({end_date}). Please check your parameters.")


In [0]:


# Credentials stored in the "ticker" secret scope.
_SECRET_SCOPE = "ticker"
ACCESS_KEY = dbutils.secrets.get(scope=_SECRET_SCOPE, key="access_key")
SECRET_KEY = dbutils.secrets.get(scope=_SECRET_SCOPE, key="secret_key")
SESSION_TOKEN = dbutils.secrets.get(scope=_SECRET_SCOPE, key="session_key")

# Temporary credentials published as task values by the "Init_Auth" task.
# These (not the secrets above) are what the reader and writer in this
# notebook pass as fs.s3a.* options. `debugValue` supplies a placeholder;
# per the Databricks taskValues API it is used when running outside a job.
_AUTH_TASK = "Init_Auth"
temp_ak = dbutils.jobs.taskValues.get(taskKey=_AUTH_TASK, key="temp_ak", debugValue="debug-key")
temp_sk = dbutils.jobs.taskValues.get(taskKey=_AUTH_TASK, key="temp_sk", debugValue="debug-secret")
temp_token = dbutils.jobs.taskValues.get(taskKey=_AUTH_TASK, key="temp_token", debugValue="debug-token")




# Architecture Overview: Distributed Quality Gate & Environment Constraints

## 1. Environment Constraints
This pipeline operates under strict environmental constraints:
- **No Instance Profiles**: Unable to assign IAM roles directly to clusters.
- **Shared Compute**: Restricted access to cluster-level configurations (`spark.conf.set`) and prohibited mount points.
- **Legacy Catalog**: Reliance on `hive_metastore` due to Unity Catalog absence.

## 2. Strategic Solution: Session-Based Access
To bypass these limitations, we implement **Session-Based Authentication**:
- **Mechanism**: AWS credentials (ID/Secret/Token) are injected securely into the options of individual Reader and Writer objects.
- **Outcome**: This allows direct, isolated S3 access (`s3a://`) without requiring cluster-level privileges.

---

## 3. Distributed Quality Gate Implementation

We implement a **Distributed Validation Framework** to ensure data quality at scale:
1.  **Contract Definition**: A reference SQL schema defines the expected types and constraints.
2.  **Alignment**: Incoming data is cast to match the contract, with missing columns filled as NULL.
3.  **Validation Logic**: Row-level checks identify type mismatches or constraint violations.
4.  **Routing**: 
    - **Valid Rows** -> Silver Layer (idempotent partition overwrite via `replaceWhere`)
    - **Invalid Rows** -> Quarantine Layer (written separately and kept for audit)

---

### A. Setup & Schema Definition
```python
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType
from pyspark.sql.functions import col, lit, when, concat_ws
from delta.tables import *

# 1. Configuration
ACCESS_KEY = dbutils.secrets.get(scope="ticker", key="access_key")
SECRET_KEY = dbutils.secrets.get(scope="ticker", key="secret_key")
SESSION_TOKEN = dbutils.secrets.get(scope="ticker", key="session_key")

# 2. Define the "Contract" (Target Schema) using SQL
# We use a managed table just to hold the definition.
spark.sql("""
CREATE TABLE IF NOT EXISTS company_financials_schema_holder (
    date DATE NOT NULL,
    symbol STRING NOT NULL,
    revenue BIGINT NOT NULL,
    fiscalYear INT,
    eps DECIMAL(10, 4),
    -- ... add all other columns ...
    reportedCurrency STRING
) USING DELTA
""")

target_schema = spark.table("company_financials_schema_holder").schema
```

In [0]:
# 1. LOAD MASTER SCHEMA
target_schema = spark.table("company_financials_master_def").schema
base_path = "s3a://mzon-to-databricks-5482/bronze/source=fmp/"

# 2. Always read from the Base Path (The Root)
# Delta will automatically look at _delta_log to find the files.
# The temporary AWS credentials are passed as reader options because
# cluster-level configuration is not available in this environment.
df_bronze_raw = (
    spark.read
    .format("delta")
    .option("fs.s3a.access.key", temp_ak)
    .option("fs.s3a.secret.key", temp_sk)
    .option("fs.s3a.session.token", temp_token)
    .option("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
    # NOTE(review): mode / dateFormat / columnNameOfCorruptRecord are parser
    # options of the text-based readers; presumably they have no effect on a
    # Delta read - TODO confirm and drop them if so. The value was previously
    # misspelled "PERMESSIVE" and dateFormat was set twice.
    .option("mode", "PERMISSIVE")
    .option("dateFormat", "yyyy-MM-dd")
    .option("columnNameOfCorruptRecord", "_rescued_data")
    .load(base_path)  # <--- No wildcards, no date=... loops
)

# 3. Apply "Pushdown Predicate" (The Filter)
# Spark sends this logic to the Delta Log BEFORE reading data.
if mode == "INCREMENTAL":
    print(f"Filtering for range: {start_date} to {end_date}")
    df_bronze = df_bronze_raw.filter(
        (col("date") >= lit(start_date)) &
        (col("date") <= lit(end_date))
    )
else:
    # FULL_RELOAD: Just take everything
    print("Full Reload: Reading all history.")
    df_bronze = df_bronze_raw

# 4. Verify (count() is an action and triggers a read of the selected data)
df_bronze.printSchema()
print(f"Row Count: {df_bronze.count()}")

In [0]:
def schema_generator(schema):
    """
    Derive the two schema variants used by the validation pipeline.

    Args:
        schema: a ``StructType`` describing the expected columns.

    Returns:
        A tuple ``(schema_modified, json_schema)`` where
        - ``schema_modified`` is a new ``StructType`` holding the input fields
          plus a nullable string field ``_corrupt_record``;
        - ``json_schema`` is ``ArrayType(schema_modified)``.

    The input ``schema`` is left untouched. ``StructType.add`` appends to the
    object it is called on and returns that same object, so calling it
    directly on ``schema`` would silently change the caller's schema.
    """
    # Copy the field list so the caller's StructType is not mutated.
    schema_modified = StructType(list(schema.fields))
    # Avoid a duplicate field when the contract already declares the column.
    if "_corrupt_record" not in schema_modified.fieldNames():
        schema_modified.add("_corrupt_record", StringType(), True)
    json_schema = ArrayType(schema_modified)
    return schema_modified , json_schema

In [0]:

def _load_contract(holder_table):
    """Read the schema of `holder_table` and pass it through schema_generator.

    Returns (schema, schema_modified, schema_json) in that order.
    """
    holder_schema = spark.table(holder_table).schema
    modified, as_json = schema_generator(holder_schema)
    return holder_schema, modified, as_json


# One schema-holder table per layer / statement type.
(bronze_schema,
 bronze_schema_modified,
 bronze_schema_json) = _load_contract("bronze_schema_holder")

(income_statement_schema,
 income_statement_schema_modified,
 income_statement_schema_json) = _load_contract("income_staement_silver_schema_holder")

(balance_sheet_statement_schema,
 balance_sheet_statement_schema_modified,
 balance_sheet_statement_schema_json) = _load_contract("balance_sheet_staement_silver_schema_holder")

(cashflow_statement_schema,
 cashflow_statement_schema_modified,
 cashflow_statement_schema_json) = _load_contract("cash_flow_staement_silver_schema_holder")




In [0]:
from pyspark.sql.functions import col, lit, when, concat_ws,row_number
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType

def align_and_validate_strict(df: DataFrame, target_schema: StructType):
    """
    Align ``df`` to ``target_schema`` and record NOT NULL violations per row.

    Alignment:
    - Columns present in both ``df`` and ``target_schema`` are selected as-is
      (no cast is applied).
    - Columns in ``target_schema`` that ``df`` lacks are added as typed NULLs.
    - Columns of ``df`` that are not in ``target_schema`` are dropped.

    Validation:
    - For every field declared non-nullable in ``target_schema``, a row whose
      value is NULL has that column name recorded. A missing non-nullable
      column therefore fails every row.
    - Nullable fields are never checked. No type-mismatch check is performed
      here.

    Args:
        df: input DataFrame.
        target_schema: contract whose field names, types and nullability
            drive the alignment and checks.

    Returns:
        The aligned DataFrame with an extra string column ``_failed_cols``:
        a comma-separated list of violated column names, or ``""`` when the
        row passes.
    """
    existing_cols = set(df.columns)

    # A NULL literal casts cleanly to any type, so a plain cast is sufficient
    # for the filler columns.
    selected_cols = [
        field.name
        if field.name in existing_cols
        else lit(None).cast(field.dataType).alias(field.name)
        for field in target_schema
    ]
    df_aligned = df.select(*selected_cols)

    # Build one rule per NON-nullable field only. Previously the rule variable
    # was assigned solely for non-nullable fields but used for every field,
    # which raised a NameError when the first field was nullable and otherwise
    # reported nullable columns using the preceding field's rule.
    row_validation = [
        when(col(field.name).isNull(), lit(field.name)).otherwise(lit(None))
        for field in target_schema
        if not field.nullable
    ]

    # concat_ws skips NULL entries, so a clean row yields the empty string.
    failed_cols = concat_ws(",", *row_validation) if row_validation else lit("")
    df_scored = df_aligned.withColumn("_failed_cols", failed_cols)
    return df_scored




The function `DeltaTable.isDeltaTable(spark, path)` creates a fresh connection to S3 to check for the `_delta_log` folder.

**The Problem**: This new connection does not know about the AWS keys (ACCESS_KEY, etc.) injected into the reader. It attempts an anonymous connection and is rejected by AWS.

**The Constraint**: On a Shared Cluster, global keys (`spark.conf.set`) are banned, making the `DeltaTable` utilities effectively unusable.

In [0]:
def parse_df(df,json_schema,statement):
    """
    Expand the JSON array held in the ``value`` column into one row per element.

    Args:
        df: DataFrame with at least ``value``, ``ingestion_timestamp`` and
            ``source_file`` columns.
        json_schema: schema handed to ``from_json`` for ``value``; it is
            exploded, so it is expected to be an array type.
        statement: statement name; written to ``statement_type`` and also
            used as a column name to drop.

    Returns:
        DataFrame with ``ingestion_timestamp``, ``source_file`` and the
        fields of each exploded element. ``statement_type`` is not part of
        the final projection.
    """
    # Parse in PERMISSIVE mode; unparseable input goes to _corrupt_record.
    json_options = {"mode": "PERMISSIVE", "columnNameOfCorruptRecord": "_corrupt_record"}
    parsed_array = from_json(col("value"), json_schema, options=json_options)

    # Handle statement normalization: tag the rows, drop any column named
    # after the statement, then emit one row per array element.
    tagged = df.withColumn("statement_type", lit(statement)).drop(statement)
    exploded = tagged.withColumn("fiscal_year_norm", explode(parsed_array))

    return exploded.select(
        col("ingestion_timestamp"),
        col("source_file"),
        col("fiscal_year_norm.*"),
    )



In [0]:
def clean_df(df):
    """
    Keep a single row per (symbol, date): the one with the latest
    ``ingestion_timestamp``.

    Args:
        df: DataFrame with ``symbol``, ``date`` and ``ingestion_timestamp``.

    Returns:
        The deduplicated DataFrame with the same columns as ``df``.

    NOTE(review): rows that tie on ``ingestion_timestamp`` have no further
    ordering key here, so which of them is kept is presumably arbitrary.
    """
    newest_first = (
        Window.partitionBy("symbol", "date")
        .orderBy(col("ingestion_timestamp").desc())
    )
    return (
        df.withColumn("_rank", row_number().over(newest_first))
        # row_number() yields an integer; compare with an integer literal
        # rather than relying on an implicit string-to-int cast.
        .filter(col("_rank") == 1)
        .drop("_rank")
    )


In [0]:
def validate_df(df,schema):
    """
    Split ``df`` into valid and invalid rows according to ``schema``.

    A row is invalid when ``align_and_validate_strict`` reported at least one
    failed column, or when its ``_corrupt_record`` column (if present) is
    non-NULL. Every other row is valid, so the two outputs are disjoint and
    together cover the aligned input.

    Args:
        df: DataFrame to validate.
        schema: target ``StructType`` passed to ``align_and_validate_strict``.

    Returns:
        ``(valid_records, invalid_records)``. ``valid_records`` has
        ``_failed_cols`` dropped; ``invalid_records`` keeps it.
    """
    df_evaluated = align_and_validate_strict(df, schema)
    #PERSIST TABLE is not supported on serverless compute. SQLSTATE: 0A000
    #df_evaluated.cache()

    is_invalid = col("_failed_cols") != ""
    # Only consult _corrupt_record when the aligned frame actually has it.
    if "_corrupt_record" in df_evaluated.columns:
        is_invalid = is_invalid | col("_corrupt_record").isNotNull()

    # Valid is the exact complement of invalid. Previously a row with a
    # corrupt record but no failed columns was returned in BOTH outputs.
    valid_records = df_evaluated.filter(~is_invalid).drop("_failed_cols")
    invalid_records = df_evaluated.filter(is_invalid)
    # df_evaluated.unpersist()
    return valid_records, invalid_records


In [0]:
# Run the bronze rows through validation before any statement-level parsing.
(df_bronze_valid_records,
 df_bronze_invalid_records) = validate_df(df_bronze, bronze_schema_modified)


In [0]:

# Income statement: parse -> deduplicate -> validate.
# Each .show() prints a preview of the intermediate result.
_income_rows = df_bronze_valid_records.filter("statement_type = 'income-statement'")
df_income_statement = parse_df(_income_rows, income_statement_schema_json, "income-statement")
df_income_statement.show()

df_income_statement = clean_df(df_income_statement)
df_income_statement.show()

(df_income_statement_valid_records,
 df_income_statement_invalid_records) = validate_df(df_income_statement, income_statement_schema_modified)
# The corrupt-record helper column is not kept on the valid output.
df_income_statement_valid_records = df_income_statement_valid_records.drop("_corrupt_record")
df_income_statement_valid_records.show()

df_income_statement_invalid_records.show()


In [0]:
# Balance sheet: parse -> deduplicate -> validate.
# Each .show() prints a preview of the intermediate result.
_balance_rows = df_bronze_valid_records.filter("statement_type = 'balance-sheet-statement'")
df_balance_sheet = parse_df(_balance_rows, balance_sheet_statement_schema_json, "balance-sheet-statement")
df_balance_sheet.show()

df_balance_sheet = clean_df(df_balance_sheet)
df_balance_sheet.show()

(df_balance_sheet_valid_records,
 df_balance_sheet_invalid_records) = validate_df(df_balance_sheet, balance_sheet_statement_schema_modified)
# The corrupt-record helper column is not kept on the valid output.
df_balance_sheet_valid_records = df_balance_sheet_valid_records.drop("_corrupt_record")
df_balance_sheet_valid_records.show()
df_balance_sheet_invalid_records.show()

In [0]:
# Cash-flow statement: parse -> deduplicate -> validate.
# Each .show() prints a preview of the intermediate result.
_cashflow_rows = df_bronze_valid_records.filter("statement_type = 'cash-flow-statement'")
df_cashflow_statement = parse_df(_cashflow_rows, cashflow_statement_schema_json, "cash-flow-statement")
df_cashflow_statement.show()

df_cashflow_statement = clean_df(df_cashflow_statement)
df_cashflow_statement.show()

(df_cashflow_statement_valid_records,
 df_cashflow_statement_invalid_records) = validate_df(df_cashflow_statement, cashflow_statement_schema_modified)
# The corrupt-record helper column is not kept on the valid output.
df_cashflow_statement_valid_records = df_cashflow_statement_valid_records.drop("_corrupt_record")
df_cashflow_statement_valid_records.show()
df_cashflow_statement_invalid_records.show()

In [0]:
# from delta.tables import *
# # 1. Define your paths and data
# silver_path = "s3a://mzon-to-databricks-5482/silver/income_statement/valid"

# if DeltaTable.isDeltaTable(spark, silver_path):
#     # MERGE (Upsert)
#     target_table = DeltaTable.forPath(spark, silver_path)
#     (target_table.alias("target")
#         .merge(
#             valid_records.alias("source"), 
#             "target.symbol = source.symbol AND target.date = source.date"
#         )
#         .whenMatchedUpdateAll()
#         .whenNotMatchedInsertAll()
#         .execute()
#     )
# else:
#     # INITIALIZE (Create)
#     (valid_records.write
#         .format("delta")
#         .mode("overwrite") 
#         .partitionBy("date")  # <--- CRITICAL: Partitioning Strategy
#         .save(silver_path)
#     )

### Write Strategy: Credential-Injection Overwrite

We use **Overwrite by Partition** with a `replaceWhere` condition. This achieves Idempotency while allowing us to explicitly pass the `fs.s3a.access.key` credentials in the `.write` options, bypassing the need for an Instance Profile.

In [0]:
def write_df(df,label,path,mode):
    """
    Write ``df`` as a Delta table partitioned by ``date`` at ``{path}/{label}``.

    Args:
        df: DataFrame to persist; must contain a ``date`` column.
        label: sub-folder appended to ``path``.
        path: base S3 location.
        mode: ``"INCREMENTAL"`` overwrites only rows matching the
            ``replaceWhere`` date range built from the notebook-level
            ``start_date`` / ``end_date``; any other value overwrites the
            whole table and allows the schema to be overwritten.

    The notebook-level ``temp_ak`` / ``temp_sk`` / ``temp_token`` are passed
    as writer options in both modes.
    """
    destination = path + f"/{label}"

    # Shared part of the writer: Delta, overwrite, partitioned by date, with
    # the credentials injected again (required for the writer).
    writer = (
        df.write
        .format("delta")
        .mode("overwrite")
        .partitionBy("date")
        .option("fs.s3a.access.key", temp_ak)
        .option("fs.s3a.secret.key", temp_sk)
        .option("fs.s3a.session.token", temp_token)
        .option("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
    )

    if mode == "INCREMENTAL":
        # CRITICAL: restrict the overwrite to the date range of this batch.
        writer = writer.option("replaceWhere", f"date >= '{start_date}' AND date <= '{end_date}'")
    else:
        writer = writer.option("overwriteSchema", "true")

    writer.save(destination)



In [0]:
# Inspect the rejected bronze rows, then write them under <bronze>/invalid
# using the non-incremental (full overwrite) branch of write_df.
bronze_path_invalid = "s3a://mzon-to-databricks-5482/bronze"
df_bronze_invalid_records.printSchema()
df_bronze_invalid_records.show()
write_df(
    df=df_bronze_invalid_records,
    label="invalid",
    path=bronze_path_invalid,
    mode="FULL",
)




In [0]:
# Persist the valid rows of each statement type using the job's `mode`.
silver_path_valid = "s3a://mzon-to-databricks-5482/silver/valid"
for _label, _valid_df in (
    ("income_statement", df_income_statement_valid_records),
    ("balance_sheet", df_balance_sheet_valid_records),
    ("cashflow_statement", df_cashflow_statement_valid_records),
):
    write_df(_valid_df, _label, silver_path_valid, mode)

In [0]:
# Persist the rejected rows of each statement type; always uses the
# non-incremental (full overwrite) branch of write_df.
silver_path_invalid = "s3a://mzon-to-databricks-5482/silver/invalid"
for _label, _invalid_df in (
    ("income_statement", df_income_statement_invalid_records),
    ("balance_sheet", df_balance_sheet_invalid_records),
    ("cashflow_statement", df_cashflow_statement_invalid_records),
):
    write_df(_invalid_df, _label, silver_path_invalid, "FULL")