In [0]:
%run "/Workspace/Users/shoyofromconcrete@gmail.com/claims_risk_leakage/config"





In [0]:
import logging
import time

# Notebook-wide logger; its name and level come from the shared CONFIG.
logger = logging.getLogger(CONFIG["logging"]["logger_name"])
logger.setLevel(CONFIG["logging"]["log_level"])

# Attach the console handler only when the logger has none, so re-running
# this cell does not stack handlers and duplicate every log line.
if not logger.handlers:
    formatter = logging.Formatter("%(asctime)s | %(levelname)s | %(message)s")
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)


In [0]:

from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import *

In [0]:
# Make the configured catalog and the bronze schema the session defaults.
# Both identifiers are read from CONFIG and interpolated into the SQL text.
spark.sql(f"USE CATALOG {CONFIG['catalog']}")
spark.sql(f"USE SCHEMA {CONFIG['schemas']['bronze']}")


In [0]:
def build_s3_path(prefix):
    """Return the ``s3://`` URI for ``prefix`` inside the configured bucket.

    Args:
        prefix: Key prefix appended after the bucket name; it is inserted
            as-is, with a single ``/`` separating it from the bucket.

    Returns:
        A string of the form ``s3://<bucket>/<prefix>`` where ``<bucket>``
        is ``CONFIG["s3"]["bucket"]``.
    """
    bucket = CONFIG["s3"]["bucket"]
    return f"s3://{bucket}/{prefix}"

In [0]:
def read_stream(entity_name):
    """Open a ``cloudFiles`` streaming reader for one configured entity.

    Args:
        entity_name: Key into ``CONFIG["streaming"]``; also used to look up
            ``CONFIG["s3"]["<entity_name>_prefix"]`` for the source location.

    Returns:
        The streaming DataFrame produced by ``spark.readStream...load(path)``.
    """
    settings = CONFIG["streaming"][entity_name]

    # Option order mirrors the reader configuration: file format, type
    # inference flag (sent as a lowercase string), schema evolution mode
    # and the schema tracking location.
    reader_options = (
        ("cloudFiles.format", settings["file_format"]),
        ("cloudFiles.inferColumnTypes", str(settings["infer_schema"]).lower()),
        ("cloudFiles.schemaEvolutionMode", settings["schema_evolution_mode"]),
        ("cloudFiles.schemaLocation", settings["schema_location"]),
    )

    reader = spark.readStream.format("cloudFiles")
    for option_name, option_value in reader_options:
        reader = reader.option(option_name, option_value)

    source_path = build_s3_path(CONFIG["s3"][f"{entity_name}_prefix"])
    return reader.load(source_path)

In [0]:
# One raw streaming DataFrame per source entity, opened in the order
# fnol -> claims -> policy.
fnol_df_raw, claims_df_raw, policy_df_raw = [
    read_stream(entity) for entity in ("fnol", "claims", "policy")
]

In [0]:
def enrich_bronze(df, rename_claim=False):
    """Add the bronze audit columns to ``df``.

    Args:
        df: Input DataFrame; ``_metadata.file_path`` must be resolvable on it.
        rename_claim: When True, column ``claim_id`` is renamed to
            ``fnol_id`` before the audit columns are added.

    Returns:
        DataFrame with ``source_file``, ``ingest_date``, ``start_ts``,
        ``end_ts`` (NULL timestamp) and ``is_current`` (literal 1) added,
        in that order.
    """
    base_df = df.withColumnRenamed("claim_id", "fnol_id") if rename_claim else df

    audit_columns = (
        ("source_file", col("_metadata.file_path")),
        ("ingest_date", current_date()),
        ("start_ts", current_timestamp()),
        ("end_ts", lit(None).cast("timestamp")),
        ("is_current", lit(1)),
    )

    enriched_df = base_df
    for column_name, expression in audit_columns:
        enriched_df = enriched_df.withColumn(column_name, expression)
    return enriched_df

In [0]:
# Enrich each raw stream with the bronze audit columns. Only the FNOL feed
# has its claim_id column renamed to fnol_id.
fnol_df_bronze = enrich_bronze(df=fnol_df_raw, rename_claim=True)
claims_df_bronze = enrich_bronze(df=claims_df_raw, rename_claim=False)
policy_df_bronze = enrich_bronze(df=policy_df_raw, rename_claim=False)

In [0]:
def build_metrics(df):
    """Aggregate per-file ingestion quality metrics.

    ``min``, ``sum`` and ``count`` here are the Spark SQL functions brought
    in by ``from pyspark.sql.functions import *``, not the Python builtins.

    Args:
        df: DataFrame that contains at least the columns ``source_file``,
            ``start_ts`` and ``_rescued_data``.

    Returns:
        DataFrame grouped by ``source_file`` with:
          * ``file_ingestion_time`` - earliest ``start_ts`` in the file,
          * ``record_count`` - number of rows,
          * ``<column>_null_count`` - one per column of ``df`` (same names
            and order as ``df.columns``),
          * ``rescued_record_count`` - rows whose ``_rescued_data`` is set.
    """

    def _missing_count(field):
        # A value is "missing" when it is NULL. The blank-after-trim test is
        # only applied to string columns: trim is a string function, and
        # applying it to struct/array/map columns fails query analysis.
        column = col(field.name)
        is_missing = column.isNull()
        if isinstance(field.dataType, StringType):
            is_missing = is_missing | (trim(column) == "")
        return sum(when(is_missing, 1).otherwise(0)).alias(
            f"{field.name}_null_count"
        )

    return (
        df.groupBy("source_file")
          .agg(
              min("start_ts").alias("file_ingestion_time"),
              count("*").alias("record_count"),
              *[_missing_count(field) for field in df.schema.fields],
              sum(
                  when(col("_rescued_data").isNotNull(), 1)
                  .otherwise(0)
              ).alias("rescued_record_count"),
          )
    )


In [0]:
# Per-file metrics for each bronze stream, built in the order
# fnol -> claims -> policy.
fnol_metrics_df, claims_metrics_df, policy_metrics_df = [
    build_metrics(bronze_df)
    for bronze_df in (fnol_df_bronze, claims_df_bronze, policy_df_bronze)
]

In [0]:
import time
import traceback
import builtins



def write_stream(df, entity_name, table_name, mode="append", metrics=False,
                 await_termination=False):
    """Start an ``availableNow`` Delta stream write into ``table_name``.

    ``toTable`` starts the streaming query and returns its handle; the
    actual processing runs in the background. By default this function
    therefore returns as soon as the query has been started, and the logged
    duration is the startup time only. Pass ``await_termination=True`` to
    block until the query finishes, so that the logged duration covers the
    whole write and a failure inside the stream is logged and re-raised here.

    Args:
        df: Streaming DataFrame to write.
        entity_name: Key into ``CONFIG["streaming"]`` used to pick the
            checkpoint path.
        table_name: Target table name passed to ``toTable``.
        mode: Streaming output mode (default ``"append"``).
        metrics: When True the entity's ``"metrics_checkpoint"`` is used,
            otherwise its ``"checkpoint_location"``.
        await_termination: When True, call ``awaitTermination()`` on the
            query before returning. Defaults to False so existing callers
            keep starting their streams without blocking.

    Returns:
        The StreamingQuery returned by ``toTable``.

    Raises:
        Exception: Any error raised while starting (or, when awaited,
            running) the query is logged with its traceback and re-raised.
    """
    checkpoint_key = "metrics_checkpoint" if metrics else "checkpoint_location"
    checkpoint_path = CONFIG["streaming"][entity_name][checkpoint_key]

    try:
        # perf_counter is monotonic, so the elapsed time cannot be skewed by
        # wall-clock adjustments.
        started_at = time.perf_counter()

        logger.info(
            f"[START] Stream write | entity={entity_name} | table={table_name} | mode={mode}"
        )

        query = (
            df.writeStream
              .format("delta")
              .outputMode(mode)
              .option("checkpointLocation", checkpoint_path)
              .option("mergeSchema", "true")
              .trigger(availableNow=True)
              .toTable(table_name)
        )

        if await_termination:
            # Blocks until the query stops; raises if the stream failed.
            query.awaitTermination()

        # builtins.round is used explicitly because the notebook's
        # `from pyspark.sql.functions import *` shadows the builtin `round`.
        duration = builtins.round(time.perf_counter() - started_at, 2)

        if CONFIG["logging"]["track_execution_time"]:
            if await_termination:
                logger.info(
                    f"[END] Stream write | entity={entity_name} | table={table_name} | "
                    f"{duration} seconds"
                )
            else:
                # The query is still running at this point, so do not claim
                # that the write has ended.
                logger.info(
                    f"[STARTED] Stream write | entity={entity_name} | table={table_name} | "
                    f"{duration} seconds to start"
                )

        return query

    except Exception:
        # logger.exception records the message and the active traceback.
        logger.exception(
            f"[ERROR] Stream write failed | entity={entity_name} | table={table_name}"
        )
        raise


In [0]:
# Start the bronze table writes (default append mode, data checkpoints).
fnol_query = write_stream(
    df=fnol_df_bronze,
    entity_name="fnol",
    table_name=CONFIG["tables"]["bronze"]["fnol"],
)

claims_query = write_stream(
    df=claims_df_bronze,
    entity_name="claims",
    table_name=CONFIG["tables"]["bronze"]["claims"],
)

policy_query = write_stream(
    df=policy_df_bronze,
    entity_name="policy",
    table_name=CONFIG["tables"]["bronze"]["policy"],
)

In [0]:
# Start the metrics table writes. The aggregated metrics are written in
# complete output mode and use the dedicated metrics checkpoints.
fnol_metrics_query = write_stream(
    df=fnol_metrics_df,
    entity_name="fnol",
    table_name=CONFIG["tables"]["bronze_metrics"]["fnol"],
    mode="complete",
    metrics=True,
)

claims_metrics_query = write_stream(
    df=claims_metrics_df,
    entity_name="claims",
    table_name=CONFIG["tables"]["bronze_metrics"]["claims"],
    mode="complete",
    metrics=True,
)

policy_metrics_query = write_stream(
    df=policy_metrics_df,
    entity_name="policy",
    table_name=CONFIG["tables"]["bronze_metrics"]["policy"],
    mode="complete",
    metrics=True,
)