In [0]:
from pyspark.sql import SparkSession
from pyspark.dbutils import DBUtils
from datetime import datetime
from pyspark.sql.functions import *
from pyspark.sql.types import *
import logging
from pyspark.dbutils import DBUtils
from pyspark.sql import functions as F
from datetime import datetime, timedelta
from pyspark.sql import functions as F
from datetime import date

# Obtain the Spark session for this notebook (an existing session is reused if there is one).
spark = (
    SparkSession.builder
    .appName("PROJECT")
    .getOrCreate()
)

In [0]:
import os  # imported in this cell so the connection settings can be evaluated on their own

# Connection settings for the SQL Server database used by the read and write cells below.
jdbcHostname = "dss-database.database.windows.net"
jdbcDatabase = "dss-db"
jdbcPort = 1433
jdbcUsername = "dss-database"

# Prefer a password supplied through the DSS_JDBC_PASSWORD environment variable so the
# secret does not have to live in the notebook source. The previous literal is kept only
# as a fallback so existing runs behave as before.
# NOTE(review): move this to a secret store and drop the fallback once one is configured.
jdbcPassword = os.environ.get("DSS_JDBC_PASSWORD", "**********")

# The URL intentionally carries no credentials: every reader/writer in this notebook
# passes `user` and `password` as separate JDBC options, and embedding them here as well
# would expose the password wherever the URL is printed or logged.
jdbcUrl = (
    f"jdbc:sqlserver://{jdbcHostname}:{jdbcPort};"
    f"database={jdbcDatabase};"
    "encrypt=true;trustServerCertificate=false;"
    "hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
)


In [0]:
# Read every row of DailyClaimsRequests over JDBC; the table is wrapped in an
# aliased subquery because it is passed through the `dbtable` option.
query = "(SELECT * FROM DailyClaimsRequests) AS df"

df = (
    spark.read
    .format("jdbc")
    .options(
        url=jdbcUrl,
        dbtable=query,
        user=jdbcUsername,
        password=jdbcPassword,
    )
    .load()
)

# Today's date as a 'YYYY-MM-DD' string (computed in Python), used to keep only
# the requests whose EndDate is today.
today_date = date.today().isoformat()
df1 = df.where(F.col('EndDate') == today_date)
display(df1)

EndDate,Customer_ID,Medicine,Claim_Amount,Service_Date,Submission_Date,Status,Required_Docs,Insurance_Coverage,Remaining_Coverage
2024-09-28,U017,Metformin,70.0,2024-09-27,2024-09-28,submitted,,1500.0,200.0
2024-09-28,U022,Aspirin,1100.0,2024-06-03,2024-09-27,submitted,Yes,1000.0,100.0
2024-09-28,U049,Aspirin,800.0,2024-09-20,2024-09-21,submitted,,600.0,200.0
2024-09-28,U065,Ciprofloxacin,180.0,2024-09-20,2024-09-27,submitted,Yes,1000.0,200.0
2024-09-28,U087,Emtricitabine,111.0,2024-09-20,2024-09-24,submitted,Yes,2200.0,300.0


In [0]:
#Process Valid Claims

# Claimable medications, grouped by the condition category they belong to.
medications_list = {
    'Cardiovascular Diseases': [
        'Atenolol',
        'Lisinopril',
        'Metoprolol',
    ],
    'Diabetes': [
        'Metformin',
        'Insulin',
        'Glyburide',
        'Glipizide',
    ],
    'Pain Relief': [
        'Ibuprofen',
        'Aspirin',
        'Acetaminophen',
        'Oxycodone',
    ],
    'Respiratory Diseases': [
        'Albuterol',
        'Fluticasone',
        'Montelukast',
    ],
    'Gastrointestinal': [
        'Omeprazole',
        'Loperamide',
        'Ranitidine',
    ],
    'Antibiotics': [
        'Amoxicillin',
        'Ciprofloxacin',
        'Azithromycin',
    ],
}

# Single flat list of every claimable medicine name, in category order,
# for use in membership checks.
medications_flat = [
    medicine_name
    for category_medicines in medications_list.values()
    for medicine_name in category_medicines
]

today = F.current_date()

# Keep only the claims that pass every validity check. The checks are applied as
# successive filters, so a row survives only if it satisfies all of them.
processed_claims_df = (
    df1
    # Enough remaining coverage to pay the claim
    .filter(F.col("Remaining_Coverage") >= F.col("Claim_Amount"))
    # Submission date must be on or after the service date
    .filter(F.col("Submission_Date") >= F.col("Service_Date"))
    # Submission within 90 days of the service date
    .filter(F.col("Submission_Date") <= F.date_add(F.col("Service_Date"), 90))
    # Required documents must be Yes
    .filter(F.col("Required_Docs") == "Yes")
    # Medicine must be on the claimable list
    .filter(F.col("Medicine").isin(medications_flat))
)

# Deduct the claim from the remaining coverage, mark the claim as processed,
# and stamp it with the current date.
remaining_after_claim = F.col("Remaining_Coverage") - F.col("Claim_Amount")
updated_claims_df = (
    processed_claims_df
    .withColumn("Remaining_Coverage", remaining_after_claim)
    .withColumn("Status", F.lit("Processed"))
    .withColumn("Processed_Date", today)
)

# Column order expected for the ProcessedClaims output.
updated_claims_df1 = updated_claims_df.select(
    "Processed_Date",
    "Customer_ID",
    "Medicine",
    "Claim_Amount",
    "Service_Date",
    "Submission_Date",
    "Status",
    "Insurance_Coverage",
    "Remaining_Coverage",
)

display(updated_claims_df1)

#Load Processed Claims to SQL Database
(
    updated_claims_df1.write
    .format("jdbc")
    .options(
        url=jdbcUrl,
        dbtable="ProcessedClaims",
        user=jdbcUsername,
        password=jdbcPassword,
    )
    .mode("append")
    .save()
)

Processed_Date,Customer_ID,Medicine,Claim_Amount,Service_Date,Submission_Date,Status,Insurance_Coverage,Remaining_Coverage
2024-09-28,U065,Ciprofloxacin,180.0,2024-09-20,2024-09-27,Processed,1000.0,20.0


In [0]:
#Filter and Categorize Rejected Claims

# Assign a rejection status to every claim that fails one of the validity checks
# used when processing claims. Branches are evaluated in order and the first match
# wins; claims that match no branch keep their incoming Status.
#
# Two branches mirror checks in the processing filter that previously had no
# counterpart here, which let failing claims drop out of both output tables:
#   * Required_Docs present but not "Yes" (only NULL used to be rejected)
#   * Submission_Date earlier than Service_Date
categorized_claims_df = df1.withColumn(
    "Status",
    F.when(
        # NULL-safe: a missing value OR any value other than "Yes" is invalid
        F.col("Required_Docs").isNull() | (F.col("Required_Docs") != "Yes"),
        "Rejected due to invalid documents. Do manual process at store.",
    )
    .when(
        F.col("Submission_Date") > F.date_add(F.col("Service_Date"), 90),
        "Rejected due to outdated claim",
    )
    .when(
        F.col("Submission_Date") < F.col("Service_Date"),
        "Rejected due to submission date before service date",
    )
    .when(
        F.col("Remaining_Coverage") < F.col("Claim_Amount"),
        "Not Processed due to inadequate remaining coverage",
    )
    .when(
        ~F.col("Medicine").isin(medications_flat),
        "Rejected due to unclaimable medicine",
    )
    .otherwise(F.col("Status")),
).withColumn("Today", F.current_date())  # Add today's date

# Drop the claims whose status is still "submitted" (no rejection branch matched)
categorized_claims_df2 = categorized_claims_df.filter(F.col("Status") != "submitted")

# Expose the date column under the name used by the RejectedClaims output
categorized_claims_df3 = categorized_claims_df2.withColumnRenamed("Today", "Rejected_Date")

# Column order expected for the RejectedClaims output
categorized_claims_df4 = categorized_claims_df3.select(
    "Rejected_Date",
    "Customer_ID",
    "Medicine",
    "Claim_Amount",
    "Service_Date",
    "Submission_Date",
    "Status",
    "Insurance_Coverage",
    "Remaining_Coverage",
)

display(categorized_claims_df4)

#Load Rejected Claims to SQL Database

categorized_claims_df4.write \
    .format("jdbc") \
    .option("url",jdbcUrl) \
    .option("dbtable","RejectedClaims") \
    .option("user",jdbcUsername) \
    .option("password",jdbcPassword) \
    .mode("append") \
    .save()

Rejected_Date,Customer_ID,Medicine,Claim_Amount,Service_Date,Submission_Date,Status,Insurance_Coverage,Remaining_Coverage
2024-09-28,U017,Metformin,70.0,2024-09-27,2024-09-28,Rejected due to invalid documents. Do manual process at store.,1500.0,200.0
2024-09-28,U022,Aspirin,1100.0,2024-06-03,2024-09-27,Rejected due to outdated claim,1000.0,100.0
2024-09-28,U049,Aspirin,800.0,2024-09-20,2024-09-21,Rejected due to invalid documents. Do manual process at store.,600.0,200.0
2024-09-28,U087,Emtricitabine,111.0,2024-09-20,2024-09-24,Rejected due to unclaimable medicine,2200.0,300.0
