## Text Normalization & Structuring

In [0]:
from pyspark.sql.functions import col, regexp_replace, trim
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Location of the lakehouse tables, expressed as `catalog.schema.table`.
catalog_name = "financial_data"
schema_name = "lakehouse"

# Both tables live in the same catalog and schema; only the table name differs.
bronze_table = ".".join([catalog_name, schema_name, "bronze_filings"])
silver_table = ".".join([catalog_name, schema_name, "silver_filings"])

In [0]:
# Load Bronze Data
# Read the bronze table by its fully qualified name; `spark` is the session
# object supplied by the notebook runtime.
bronze_df = spark.table(bronze_table)

In [0]:
# Standardize Column Names
# Re-project every bronze column under its lower-cased name so that later
# cells can refer to columns without worrying about source casing.
lowercased_columns = [
    F.col(name).alias(name.lower()) for name in bronze_df.columns
]
df = bronze_df.select(lowercased_columns)

In [0]:
# Clean Narrative Sections
# Narrative columns are recognised purely by their "item_" name prefix.
text_columns = [c for c in df.columns if c.startswith("item_")]


def _strip_markup(column):
    """Return `column` with tags removed and whitespace normalised.

    Anything of the form `<...>` is deleted, every run of whitespace is
    collapsed to a single space, and leading/trailing spaces are trimmed.
    """
    without_tags = regexp_replace(column, "<[^>]+>", "")
    single_spaced = regexp_replace(without_tags, "\\s+", " ")
    return trim(single_spaced)


# Build every cleaned column in ONE projection. Chaining a withColumn call per
# text column (as a loop would) adds a projection to the plan each time, which
# the PySpark documentation warns can produce very large plans; a single
# select yields the same columns, in the same order, with the same values.
_narrative_names = set(text_columns)
cleaned_df = df.select(
    [
        _strip_markup(col(name)).alias(name) if name in _narrative_names else col(name)
        for name in df.columns
    ]
)

In [0]:
# Reshape Data: Row per Section
# NOTE(review): melt_cols is not referenced by the code in this cell; kept
# because it is a notebook-level name — confirm nothing else relies on it.
melt_cols = [col(c) for c in text_columns]

# Filing-level columns carried unchanged onto every section row.
id_columns = [
    "cik",
    "company",
    "filing_date",
    "filing_type",
    "period_of_report",
    "ingest_timestamp",
]

# One (section, content) struct per narrative column: the column's name as a
# literal paired with its cleaned text.
section_structs = [
    F.struct(F.lit(name).alias("section"), col(name).alias("content"))
    for name in text_columns
]

# Exploding the array of structs turns each filing row into one row per
# narrative column.
exploded_df = cleaned_df.select(
    *id_columns,
    F.explode(F.array(section_structs)).alias("section_data"),
)

# Flatten the struct into plain section_name / section_text columns.
unpivot_df = exploded_df.select(
    *id_columns,
    col("section_data.section").alias("section_name"),
    col("section_data.content").alias("section_text"),
)


In [0]:
# Deduplicate Rows
# Keep exactly one row per (cik, filing_date, section_name): the row with the
# most recent ingest_timestamp.
# NOTE(review): rows that tie on ingest_timestamp get an arbitrary order from
# row_number — confirm ties cannot occur or add a tiebreaker column.
# NOTE(review): filing_type is not part of the partition key — confirm two
# filing types cannot share the same cik and filing_date.
window = (
    Window.partitionBy("cik", "filing_date", "section_name")
    .orderBy(F.desc("ingest_timestamp"))
)

ranked_df = unpivot_df.withColumn("row_number", F.row_number().over(window))

# Rank 1 is the newest row in each partition; the helper column is dropped
# once it has served as the filter.
silver_dedup_df = ranked_df.filter(F.col("row_number") == 1).drop("row_number")

In [0]:
# Data Quality Validation
# dq_missing_text: True when the section has no usable text, i.e. the text is
# NULL or shorter than 50 characters.
# The explicit isNull() check matters: length(NULL) is NULL, NULL < 50 is
# NULL, and a NULL condition in when(...).otherwise(False) falls through to
# False — so without it, NULL sections would be flagged as "not missing" and
# slip through the filter below.
section_text = F.col("section_text")
is_missing_text = section_text.isNull() | (F.length(section_text) < 50)

# dq_section_valid: True when the section name contains an "item_<digits>"
# token.
# NOTE(review): rlike is an unanchored search and section names come from
# lower-cased column names, so the optional "[A-Z]?" never has to match and
# any trailing suffix is accepted — confirm whether an anchored, lower-case
# pattern is intended. The pattern is left unchanged here.
is_valid_section = F.when(
    F.col("section_name").rlike("item_\\d+[A-Z]?"), True
).otherwise(False)

dq_df = (
    silver_dedup_df
    .withColumn("dq_missing_text", is_missing_text)
    .withColumn("dq_section_valid", is_valid_section)
)

# Keep only rows that pass both checks. Both flags are never NULL, so plain
# boolean logic is equivalent to comparing against True/False.
silver_clean_df = dq_df.filter(
    ~F.col("dq_missing_text") & F.col("dq_section_valid")
)

In [0]:
# Write Silver Table
# Replace the silver table's contents with the validated rows, stored as Delta.
# NOTE(review): with mode("overwrite"), Delta's "overwriteSchema" option is the
# one that replaces an existing schema; "mergeSchema" is kept as originally
# written — confirm which behaviour is wanted.
silver_writer = (
    silver_clean_df.write
    .format("delta")
    .option("mergeSchema", "true")
    .mode("overwrite")
)
silver_writer.saveAsTable(silver_table)