In [0]:
# Declare the notebook's input widgets: (name, default value, label).
dbutils.widgets.text("process_date", "", "Processing Date")
dbutils.widgets.text("recipient_email", "", "Recipient Email")

# Read the current widget values; both arrive as plain strings.
date = dbutils.widgets.get("process_date")
recipient_email = dbutils.widgets.get("recipient_email")

In [0]:
#Import Libraries
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime
import pandas as pd
from pandas import *
from functools import reduce

In [0]:
# Fully-qualified table names used by the rest of the notebook.

# Source table the claim lines are read from.
bronze_table = "workspace.`tegge-insurance-data`.claim_lines_bronze"

# Target table, plus the staging table that is written before the MERGE.
silver_table = "workspace.`tegge-insurance-data-silver`.claim_lines_silver"
silver_table_temp = f"{silver_table}_temp"

# Table that receives the rows flagged as anomalies (name ends with an underscore).
anomalies_table = "workspace.`tegge-insurance-data-anomalies`.claim_lines_silver_anomalies_"

In [0]:
# Load the bronze claim lines whose load_timestamp contains the requested processing date.

# The widget defaults to "", which would make the filter match every non-null
# load_timestamp and silently reprocess the whole bronze table, so fail fast instead.
if not date.strip():
    raise ValueError("process_date widget must not be empty")

# Column.contains() passes the widget value as a literal rather than splicing it into
# SQL text, so quotes or LIKE wildcards in the input cannot alter the predicate.
# The explicit cast mirrors the implicit string comparison the LIKE filter performed.
claim_lines = (
    spark.read.format("delta")
    .table(bronze_table)
    .where(col("load_timestamp").cast("string").contains(date))
)

In [0]:
# Stamp the silver lineage columns and rename the incoming metadata columns with a
# "bronze_" prefix so both layers' provenance can be kept side by side.
claim_lines = (
    claim_lines
    .withColumn("silver_source", lit(bronze_table))
    .withColumn("silver_load_timestamp", current_timestamp())
    .withColumnRenamed("source", "bronze_source")
    .withColumnRenamed("load_timestamp", "bronze_load_timestamp")
)

In [0]:
# Target type for each silver column, listed in output column order.
# NOTE(review): casting cpt_code / rev_code to int assumes the codes are purely
# numeric — confirm against the bronze data.
silver_column_types = [
    ("claim_line_id", "int"),
    ("claim_id", "int"),
    ("cpt_code", "int"),
    ("diagnosis_code", "string"),
    ("rev_code", "int"),
    ("units", "int"),
    ("line_billed", "double"),
    ("line_allowed", "double"),
    ("line_paid", "double"),
    ("denial_reason_code", "string"),
    ("bronze_load_timestamp", "timestamp"),
    ("bronze_source", "string"),
    ("silver_load_timestamp", "timestamp"),
    ("silver_source", "string"),
]

# Keep only the listed columns, each cast to its target type under its own name.
claim_lines = claim_lines.select(
    *[
        col(column_name).cast(target_type).alias(column_name)
        for column_name, target_type in silver_column_types
    ]
)

In [0]:
# A missing denial reason is recorded as "approved".
claim_lines = claim_lines.na.fill("approved", subset=["denial_reason_code"])

In [0]:
# Every column is checked for nulls. denial_reason_code is included as well, but it
# was already filled with "approved" above, so it cannot trigger the condition.
columns_to_check = list(claim_lines.columns)

# OR together one isNull() test per column: true when any column of the row is null.
is_null_checks = [col(column_name).isNull() for column_name in columns_to_check]
null_condition = reduce(lambda combined, check: combined | check, is_null_checks)

# Keep just the offending rows and tag them with the anomaly type "null".
nulls = claim_lines.filter(null_condition).withColumn("anomaly", lit("null"))

In [0]:
# Drop rows with a null in any column.
# Every column of the silver table is declared NOT NULL, so filtering on claim_line_id
# alone let rows with other null values through to the MERGE, where they violate the
# table constraints. These rows are already captured in the `nulls` anomaly frame.
claim_lines = claim_lines.dropna(how="any")

In [0]:
# A claim line is identified by (claim_id, claim_line_id).
duplicate_keys = ["claim_id", "claim_line_id"]

# Keys that occur more than once.
dup_records = (
    claim_lines
    .groupBy(*duplicate_keys)
    .count()
    .where(col("count") > 1)
    .select(*duplicate_keys)
)

# Every row carrying one of those keys, tagged with the anomaly type "duplicate".
# The join places the key columns first in the result.
dups = (
    claim_lines
    .join(dup_records, on=duplicate_keys, how="inner")
    .withColumn("anomaly", lit("duplicate"))
)

In [0]:
# Keep a single row per (claim_id, claim_line_id) key.
claim_lines = claim_lines.drop_duplicates(subset=["claim_id", "claim_line_id"])

In [0]:
# Row count of the claim lines after null filtering and de-duplication, shown as the
# cell output. The value is not stored anywhere.
claim_lines.count()

In [0]:
# Combine duplicate and null anomalies and stamp them with the processing date.
# The union must be by name: `dups` comes out of a join and has claim_id first, while
# `nulls` has claim_line_id first, so a positional union silently swaps the two ID
# columns for every null-anomaly row.
anomalies = dups.unionByName(nulls).withColumn("load_timestamp", lit(date))

In [0]:
# Number of anomalous rows found in this run; it gates the write below.
total_anomalies = anomalies.count()

# Persist the anomalies only when at least one was found.
# NOTE(review): "overwrite" replaces whatever an earlier run stored in this table —
# confirm that keeping only the latest run's anomalies is intended.
if total_anomalies > 0:
    (
        anomalies.write
        .format("delta")
        .mode("overwrite")
        .saveAsTable(anomalies_table)
    )

In [0]:
# DDL for the silver table; every column is declared NOT NULL.
create_silver_table_sql = f"""
CREATE TABLE IF NOT EXISTS {silver_table} (
    claim_line_id INT NOT NULL,
    claim_id INT NOT NULL,
    cpt_code INT NOT NULL,
    diagnosis_code STRING NOT NULL,
    rev_code INT NOT NULL,
    units INT NOT NULL,
    line_billed DOUBLE NOT NULL,
    line_allowed DOUBLE NOT NULL,
    line_paid DOUBLE NOT NULL,
    denial_reason_code STRING NOT NULL,
    bronze_load_timestamp TIMESTAMP NOT NULL,
    bronze_source STRING NOT NULL,
    silver_load_timestamp TIMESTAMP NOT NULL,
    silver_source STRING NOT NULL
) USING DELTA
"""

# Create the table on first run; later runs leave the existing table in place.
spark.sql(create_silver_table_sql)

In [0]:
# Stage the cleaned claim lines in a Delta table so the MERGE can read them by name.
# "overwrite" replaces the contents left by any earlier run.
(
    claim_lines.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(silver_table_temp)
)

In [0]:
# Upsert the staged rows into silver, matching on (claim_id, claim_line_id):
# existing keys are updated with all staged columns, new keys are inserted.
merge_into_silver_sql = f"""
          MERGE INTO {silver_table} AS target
          using {silver_table_temp} AS source
          on target.claim_id = source.claim_id AND target.claim_line_id = source.claim_line_id
          WHEN MATCHED THEN
          UPDATE SET *
          WHEN NOT MATCHED THEN
          INSERT *
          """
spark.sql(merge_into_silver_sql)

In [0]:
# Remove the staging table once its rows have been merged into silver.
# IF EXISTS keeps this cell safe to re-run when the table has already been dropped.
spark.sql(f"DROP TABLE IF EXISTS {silver_table_temp}")

In [0]:
# NOTE: this entire cell is commented out, so the notebook as written sends no email.
# It is kept as a template for mailing a per-anomaly-type summary to `recipient_email`.
# NOTE(review): before re-enabling, load the SMTP credentials from a secret store
# instead of the placeholder literals below, and confirm that `anomaly_table_name`
# (anomalies_table plus a date suffix) matches the table the anomalies are written to.

# import smtplib
# from email.mime.text import MIMEText
# from email.mime.multipart import MIMEMultipart

# # Email configuration (customize these values)
# smtp_server = "smtp.example.com"
# smtp_port = 587
# smtp_user = "stegge"
# smtp_password = "your_password"
# sender_email = "samuel.tegge@sogeti.com"
# recipient_email = recipient_email
# subject = f"Anomalies Detected in claim_lines_silver for {date}"

# anomaly_table_name = f"{anomalies_table}{date.replace('-', '_')}"

# # Group and summarize anomalies by type
# df_anomaly_counts = anomalies.groupBy("anomaly").count().collect()

# if total_anomalies > 0:
#     body = ""
#     for row in df_anomaly_counts:
#         anomaly_type = row[0]
#         anomaly_count = row[1]
#         body += f"""
# Anomaly: {anomaly_type.capitalize()}
# Total: {anomaly_count}
# Table: {anomaly_table_name}
# """
#     msg = MIMEMultipart()
#     msg["From"] = sender_email
#     msg["To"] = recipient_email
#     msg["Subject"] = subject
#     msg.attach(MIMEText(body, "plain"))

#     try:
#         with smtplib.SMTP(smtp_server, smtp_port) as server:
#             server.starttls()
#             server.login(smtp_user, smtp_password)
#             server.sendmail(sender_email, recipient_email, msg.as_string())
#         print(f"Email sent to {recipient_email}.")
#     except Exception as e:
#         print(f"Failed to send email: {e}")