In [None]:
!pip install pyspark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F
import os
import pandas as pd
import numpy as np
import geopandas as gpd
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
# Create the Spark session (or reuse the one already running) for this notebook
session_builder = SparkSession.builder.appName("ExploratoryAnalysis")
spark = session_builder.getOrCreate()


In [None]:
# Plan attributes CSV: the first row is the header and column types are inferred
df_planAttributes = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("raw_data/PlanAttributes.csv")
)

In [None]:
#Categorization of customers based on the market value
# Categories: 
#Bronze - 60%, Silver - 70%, Gold - 80%, Platinum - 90%, Catastrophic - below 60% sold to individual

In [None]:
# Keep only plans that report an actuarial value, narrow to the columns used
# below, and turn the "NN%" text into an integer percentage.
# The filter has to be chained with the select: selecting again from
# df_planAttributes would silently drop it.
df_IssuerValue = (
    df_planAttributes
    .filter(F.col("IssuerActuarialValue") != "")
    .select("StateCode", "IssuerId", "ServiceAreaId", "IssuerActuarialValue", "MarketCoverage")
    .withColumn(
        "IssuerActuarialValue",
        F.regexp_replace(F.col("IssuerActuarialValue"), "%", "").cast("int"),
    )
)


def _tier_mean_by_state(issuer_values, tier_filter, label):
    """Average actuarial value per state for one tier.

    issuer_values: DataFrame with StateCode and an integer IssuerActuarialValue.
    tier_filter: boolean Column selecting the rows that belong to the tier.
    label: name of the output column holding the mean, rounded to 2 decimals.
    Returns a DataFrame with the columns StateCode and ``label``.
    """
    return (
        issuer_values
        .filter(tier_filter)
        .groupBy("StateCode")
        .agg(F.round(F.mean("IssuerActuarialValue"), 2).alias(label))
    )


_actuarial_value = F.col("IssuerActuarialValue")

# Tiers by actuarial value: Bronze [60, 70), Silver [70, 80), Gold [80, 90),
# Platinum [90, 100], Catastrophic below 60
df_bronze = _tier_mean_by_state(
    df_IssuerValue, (_actuarial_value >= 60) & (_actuarial_value < 70), "Bronze")
df_Silver = _tier_mean_by_state(
    df_IssuerValue, (_actuarial_value >= 70) & (_actuarial_value < 80), "Silver")
df_Gold = _tier_mean_by_state(
    df_IssuerValue, (_actuarial_value >= 80) & (_actuarial_value < 90), "Gold")
df_Platinum = _tier_mean_by_state(
    df_IssuerValue, (_actuarial_value >= 90) & (_actuarial_value <= 100), "Platinum")
df_catastrophic = _tier_mean_by_state(
    df_IssuerValue, _actuarial_value < 60, "Catastrophic")

In [None]:
#Combine all the categories to make a new dataset
from functools import reduce
from pyspark.sql import DataFrame
def join_dfs(df1, df2, join_column_name="StateCode"):
    """Outer-join two DataFrames on a shared key column.

    df1, df2: the frames to combine (df1 is the left side of the join).
    join_column_name: key column present in both frames.
    Returns the result of ``df1.join`` with ``how="outer"``.
    """
    joined = df1.join(df2, on=join_column_name, how="outer")
    return joined

# Fold the per-tier frames into one table: one row per StateCode, one column per tier
all_tiers = [df_bronze, df_Silver, df_Gold, df_Platinum, df_catastrophic]
df_categories = all_tiers[0]
for tier_frame in all_tiers[1:]:
    df_categories = join_dfs(df_categories, tier_frame)

# StateCode values to discard.
# NOTE(review): these look like plan identifiers rather than state codes —
# confirm how they end up in the StateCode column.
remove_list = [
    "30751MT0560005",
    "30751MT0560006",
    "30751MT0560014",
    "30751MT0560015",
    "30751MT0560020",
    "30751MT0560021",
]

# Keep only the rows whose StateCode is not in remove_list
is_unwanted_code = F.col("StateCode").isin(remove_list)
df_categories = df_categories.where(~is_unwanted_code)
df_categories.show()

In [None]:
#Graphic Map of US

In [None]:
# Join categories with map
df_usGeo = spark.read.csv("geo_data/US_GeoCode.csv", header=True, inferSchema=True)

# Use the same key name, 'region', on both sides of the join
df_usGeo = df_usGeo.withColumnRenamed("StateCode", "region")
df_categories = df_categories.withColumnRenamed("StateCode", "region")

# Average longitude and latitude per region.
# NOTE(review): statename is not used by the join below (df_usGeo is joined
# directly); it is kept in case later cells rely on it.
statename = df_usGeo.groupBy("region").agg(
    F.avg("longitude").alias("long"),
    F.avg("latitude").alias("lat"),
)

# Attach the coordinates to the tier table and bring the result to pandas
mapdata = df_categories.join(df_usGeo, on="region", how="left_outer")
mapdata_pd = mapdata.toPandas()

# The left join leaves null coordinates for regions with no geocode row;
# no point can be built from those, so they are left out of the point layer.
mapdata_located = mapdata_pd.dropna(subset=["longitude", "latitude"])

# US states shapefile and the point layer; the CSV coordinates are declared
# as EPSG:4326 when the GeoDataFrame is built.
us_states_detailed = gpd.read_file("geo_data/tl_2022_us_state.shp")
mapdata_gdf = gpd.GeoDataFrame(
    mapdata_located,
    geometry=gpd.points_from_xy(mapdata_located.longitude, mapdata_located.latitude),
    crs="EPSG:4326",
)

# Reproject the points to the shapefile's CRS before the spatial join
mapdata_gdf = mapdata_gdf.to_crs(us_states_detailed.crs)

# Every state polygon is kept; tier values come from the points it intersects
merged_data = gpd.sjoin(us_states_detailed, mapdata_gdf, how="left", predicate="intersects")


In [None]:
# Catastrophic map
fig, ax = plt.subplots(figsize=(20, 15))  # change figsize for a bigger or smaller map

# State outlines first, then the Catastrophic values on the same axes
merged_data.boundary.plot(ax=ax, color='black', linewidth=0.5)
merged_data.plot(
    column='Catastrophic',
    ax=ax,
    legend=True,
    cmap='Reds',
    legend_kwds={'label': "Catastrophic Coverage"},
)

# Limit the view to the contiguous US, add the title, hide the axes
ax.set(xlim=(-130, -60), ylim=(20, 50))
ax.set_title("Catastrophic Coverage in the US", fontsize=20, pad=20)
ax.set_axis_off()
plt.show()

In [None]:
# Platinum map
fig, ax = plt.subplots(figsize=(15, 10))  # change figsize for a bigger or smaller map

# State outlines first, then the Platinum values on the same axes
merged_data.boundary.plot(ax=ax, color='black', linewidth=0.5)
merged_data.plot(
    column='Platinum',
    ax=ax,
    legend=True,
    cmap='Greens',
    legend_kwds={'label': "Platinum Coverage"},
)

# Limit the view to the contiguous US, add the title, hide the axes
ax.set(xlim=(-130, -60), ylim=(20, 50))
ax.set_title("Platinum Coverage in the US", fontsize=20, pad=20)
ax.set_axis_off()
plt.show()

In [None]:
# Bronze map
fig, ax = plt.subplots(figsize=(15, 10))  # change figsize for a bigger or smaller map

# State outlines first, then the Bronze values on the same axes
merged_data.boundary.plot(ax=ax, color='black', linewidth=0.5)
merged_data.plot(
    column='Bronze',
    ax=ax,
    legend=True,
    cmap='Oranges',
    legend_kwds={'label': "Bronze Coverage"},
)

# Limit the view to the contiguous US, add the title, hide the axes
ax.set(xlim=(-130, -60), ylim=(20, 50))
ax.set_title("Bronze Coverage in the US", fontsize=20, pad=20)
ax.set_axis_off()
plt.show()

In [None]:
# Gold map
fig, ax = plt.subplots(figsize=(15, 10))  # change figsize for a bigger or smaller map

# State outlines first, then the Gold values on the same axes
merged_data.boundary.plot(ax=ax, color='black', linewidth=0.5)
merged_data.plot(
    column='Gold',
    ax=ax,
    legend=True,
    cmap='YlOrBr',
    legend_kwds={'label': "Gold  Coverage"},
)

# Limit the view to the contiguous US, add the title, hide the axes
ax.set(xlim=(-130, -60), ylim=(20, 50))
ax.set_title("Gold  Coverage in the US", fontsize=20, pad=20)
ax.set_axis_off()
plt.show()

In [None]:
# Silver map
fig, ax = plt.subplots(figsize=(15, 10))  # change figsize for a bigger or smaller map

# State outlines first, then the Silver values on the same axes
merged_data.boundary.plot(ax=ax, color='black', linewidth=0.5)
merged_data.plot(
    column='Silver',
    ax=ax,
    legend=True,
    cmap='Greens',
    legend_kwds={'label': "Silver  Coverage"},
)

# Limit the view to the contiguous US, add the title, hide the axes
ax.set(xlim=(-130, -60), ylim=(20, 50))
ax.set_title("Silver  Coverage in the US", fontsize=20, pad=20)
ax.set_axis_off()
plt.show()

In [None]:
# Rate by coverage level
# Rate CSV: the first row is the header and column types are inferred
df_rate = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("raw_data/Rate.csv")
)

In [None]:
# 2015 rates only, without "Family Option" rows and with IndividualRate below 9000
in_2015 = F.col("BusinessYear") == 2015
not_family_option = F.col("Age") != "Family Option"
below_rate_cap = F.col("IndividualRate") < 9000
df_rate2015 = df_rate.where(in_2015 & not_family_option & below_rate_cap)
df_rate2015.show(5)

In [None]:
# Subset planAttributes to the columns the following cells use. They are
# selected by name: PlanId, MetalLevel and the three MOOP columns are exactly
# what the cleaning, aggregation and join steps below refer to, and a
# positional index would silently pick other columns if the CSV layout changed.
df_benefit = df_planAttributes.select(
    "PlanId",
    "MetalLevel",
    "TEHBInnTier1IndividualMOOP",
    "TEHBInnTier2IndividualMOOP",
    "TEHBOutOfNetIndividualMOOP",
)

# Keep the first 14 characters of PlanId; PlanId is the join key with the rates
df_benefit = df_benefit.withColumn("PlanId", F.substring(F.col("PlanId"), 1, 14))

# Make the rate table's PlanId a string as well
df_rate2015 = df_rate2015.withColumn("PlanId", F.col("PlanId").cast("string"))

In [None]:
# Display the names of the columns kept in df_benefit
df_benefit.columns

In [None]:
# Clean MOOP columns
def clean_moop(column):
    """Build a double-typed Column from a text amount column.

    Commas and dollar signs are removed from the named column before the
    cast to double.

    column: name of the column to clean.
    """
    without_commas = regexp_replace(F.col(column), ",", "")
    without_dollar_signs = regexp_replace(without_commas, "\\$", "")
    return without_dollar_signs.cast("double")

# Replace each MOOP text column with its numeric version
moop_columns = [
    "TEHBInnTier1IndividualMOOP",
    "TEHBInnTier2IndividualMOOP",
    "TEHBOutOfNetIndividualMOOP",
]
for moop_column in moop_columns:
    df_benefit = df_benefit.withColumn(moop_column, clean_moop(moop_column))

# Aggregate the benefit table: mean of each MOOP per plan and metal level
moop_means = [
    F.mean("TEHBInnTier1IndividualMOOP").alias("innettier1moop"),
    F.mean("TEHBInnTier2IndividualMOOP").alias("innettier2moop"),
    F.mean("TEHBOutOfNetIndividualMOOP").alias("outnetmoop"),
]
benefittouse = df_benefit.groupBy("PlanId", "MetalLevel").agg(*moop_means)

In [None]:
# Attach the per-plan benefit aggregates to the 2015 rates
df_planrates = df_rate2015.join(benefittouse, on="PlanId", how="inner")

# Per state and metal level: distinct plans, mean rate and median rate,
# listed from the most to the fewest plans
state_level_stats = [
    F.countDistinct("PlanId").alias("PlanOffered"),
    F.mean("IndividualRate").alias("MeanIndRate"),
    F.expr("percentile(IndividualRate, 0.5)").alias("MedianIndRate"),  # median
]
bystatecoverage = (
    df_planrates
    .groupBy("StateCode", "MetalLevel")
    .agg(*state_level_stats)
    .orderBy(F.col("PlanOffered").desc())
)

# Exclude dental coverage (metal levels "High" and "Low")
is_dental_level = F.col("MetalLevel").isin("High", "Low")
medicalonly = bystatecoverage.filter(~is_dental_level)

In [None]:
# Medical plans only: "High" and "Low" are the dental coverage levels, so they
# are filtered out here instead of rebinding medicalonly to the unfiltered rates.
medicalonly = df_planrates.filter(~F.col("MetalLevel").isin("High", "Low"))

# Distinct plans per state and metal level. A plain row count would count rate
# rows, and the same PlanId appears on many of them.
plan_counts = (
    medicalonly
    .groupBy("StateCode", "MetalLevel")
    .agg(F.countDistinct("PlanId").alias("count"))
    .toPandas()
)

plt.figure(figsize=(10,6))
sns.barplot(x='StateCode', y='count', hue='MetalLevel', data=plan_counts, errorbar=None)
plt.title("Number of Medical Plans Offered By Coverage Levels")
plt.xlabel("State Code")  # the x axis shows states; metal level is the hue
plt.ylabel("Number of Plans Offered")
plt.show()

In [None]:
#Average Monthly Premium by Age

In [None]:

# Mean and approximate median individual rate per age and metal level,
# highest mean first
age_level_stats = [
    F.mean("IndividualRate").alias("MeanIndRate"),
    F.expr('percentile_approx(IndividualRate, 0.5)').alias('MedianIndRate'),
]
byagecoverage = (
    df_planrates
    .groupBy("Age", "MetalLevel")
    .agg(*age_level_stats)
    .orderBy(F.col("MeanIndRate").desc())
)

byagecoverage_pd = byagecoverage.toPandas()

# Scatter of the mean rate against age, coloured by metal level
fig, ax = plt.subplots(figsize=(30, 6))
sns.scatterplot(x='Age', y='MeanIndRate', hue='MetalLevel', data=byagecoverage_pd, palette='deep', ax=ax)
ax.set_title("Average Monthly Premium by Age")
ax.set_xlabel("Age")
ax.set_ylabel("Average Premium by Coverage")
plt.show()

In [None]:
#Premium Distribution by Coverage Levels in 2015

In [None]:
# Because memory is limited, only a 20% sample of the data is plotted.
# The seed makes the sample repeatable across runs on the same input.
sample_fraction = 0.2
planrates_sampled = df_planrates.sample(withReplacement=False, fraction=sample_fraction, seed=42)

# Identify columns of type TimestampType
timestamp_cols = [f.name for f in planrates_sampled.schema.fields if isinstance(f.dataType, TimestampType)]

# Convert them to StringType before the pandas conversion.
# The loop variable is deliberately not called `col`: that name is the pyspark
# function brought in by the star import, and a loop variable would rebind it
# for every cell that runs afterwards.
for ts_col in timestamp_cols:
    planrates_sampled = planrates_sampled.withColumn(ts_col, planrates_sampled[ts_col].cast(StringType()))

# Convert the Spark dataframe to Pandas
planrates_pd = planrates_sampled.toPandas()

# Convert the string date columns back to datetime64[ns] in Pandas
for ts_col in timestamp_cols:
    planrates_pd[ts_col] = planrates_pd[ts_col].astype('datetime64[ns]')

# Box plot per metal level with the mean drawn on top in dark red
plt.figure(figsize=(10,6))
sns.boxplot(x='MetalLevel', y='IndividualRate', data=planrates_pd)
sns.pointplot(x='MetalLevel', y='IndividualRate', data=planrates_pd, errorbar=None, estimator=np.mean, color='darkred')
plt.title("Individual Rate Distribution of Coverage")
plt.show()

In [None]:
#Determine the plans offered across various US states

In [None]:
# NOTE(review): this rebinds df_benefit, replacing the plan-attribute subset
# built earlier in the notebook with the full BenefitsCostSharing table.
df_benefit = spark.read.csv("raw_data/BenefitsCostSharing.csv", header=True, inferSchema=True)
num_fields = len(df_benefit.columns)
print(f"Number of fields: {num_fields}")

# Remove rows that contain a null, then rows with an empty string in a text
# column. Only string columns are compared with "": the empty string is not a
# valid value of the other types, so that comparison is not a meaningful test
# for them.
df_no_nulls = df_benefit.dropna()
string_columns = [name for name, dtype in df_no_nulls.dtypes if dtype == "string"]
for column in string_columns:
    df_no_nulls = df_no_nulls.filter(F.col(column) != "")

# Row count of the file as loaded (not of df_no_nulls)
num_rows = df_benefit.count()
print(f"The CSV file has {num_rows} rows.")

In [None]:
# Subset for planRateBenefit: rates below 9999, "Family Option" rows excluded
rate_below_cap = F.col("IndividualRate") < 9999
not_family_option = F.col("Age") != "Family Option"
plan_rate_fields = [
    "BusinessYear", "StateCode", "IssuerId", "PlanId",
    "Age", "IndividualRate", "IndividualTobaccoRate",
]
df_planRateBenefit = df_rate.where(rate_below_cap & not_family_option).select(*plan_rate_fields)

df_planRateBenefit.show()

In [None]:
# Number of rate rows (non-null StateCode) per state, issuer and plan
carrier_keys = ["StateCode", "IssuerId", "PlanId"]
df_StateCarrier = (
    df_planRateBenefit
    .groupBy(*carrier_keys)
    .agg(F.count("StateCode").alias("count"))
)

df_StateCarrier.show()

In [None]:
# Benefit names with the year, issuer, component and state they belong to
benefit_name_fields = ["BusinessYear", "IssuerId", "StandardComponentId", "StateCode", "BenefitName"]
dataBenefitsName = df_benefit.select(*benefit_name_fields)
dataBenefitsName.show(5)

In [None]:
# Subset for VariousRatesPerState: rows where at least one family-rate column is filled
family_rate_columns = [
    "Couple",
    "PrimarySubscriberAndOneDependent",
    "PrimarySubscriberAndTwoDependents",
    "PrimarySubscriberAndThreeOrMoreDependents",
    "CoupleAndOneDependent",
    "CoupleAndTwoDependents",
    "CoupleAndThreeOrMoreDependents",
]

# OR together the "is not null" test of every family-rate column
has_any_family_rate = df_rate[family_rate_columns[0]].isNotNull()
for family_rate_column in family_rate_columns[1:]:
    has_any_family_rate = has_any_family_rate | df_rate[family_rate_column].isNotNull()

VariousRatesPerState = (
    df_rate
    .filter(has_any_family_rate)
    .select("StateCode", "IndividualRate", *family_rate_columns)
)
VariousRatesPerState.show()

In [None]:
# Count how often each combination of state and rate values occurs
rate_group_keys = [
    "StateCode", "IndividualRate", "Couple", "PrimarySubscriberAndOneDependent",
    "PrimarySubscriberAndTwoDependents", "PrimarySubscriberAndThreeOrMoreDependents",
    "CoupleAndOneDependent", "CoupleAndTwoDependents", "CoupleAndThreeOrMoreDependents",
]
TotalRatePerState = (
    VariousRatesPerState
    .groupBy(*rate_group_keys)
    .agg(F.count("StateCode").alias("count"))
)

TotalRatePerState.show()

In [None]:
# Melt operation: one (RateType, RateValue) row per rate column, via stack()
melted_columns = [
    "IndividualRate",
    "Couple",
    "PrimarySubscriberAndOneDependent",
    "PrimarySubscriberAndTwoDependents",
    "PrimarySubscriberAndThreeOrMoreDependents",
    "CoupleAndOneDependent",
    "CoupleAndTwoDependents",
    "CoupleAndThreeOrMoreDependents",
]
# stack() takes the number of rows to produce followed by 'label', value pairs
label_value_pairs = ", ".join(f"'{name}', {name}" for name in melted_columns)
stack_expression = f"stack({len(melted_columns)}, {label_value_pairs}) as (RateType, RateValue)"
df_melt = TotalRatePerState.select("StateCode", F.expr(stack_expression))
df_melt.show(5)

In [None]:
# PlanRatevsAge subset: rates below 9999 with state, age and rate present
PlanRatevsAge = df_rate.filter(
    (F.col("IndividualRate") < 9999)
    & F.col("StateCode").isNotNull()
    & F.col("Age").isNotNull()
    & F.col("IndividualRate").isNotNull()
).select("StateCode", "Age", "IndividualRate")

# Age is a text column: besides plain numbers it holds labels such as
# "Family Option" and "65 and over". Only purely numeric labels are parsed, so
# the band tests never depend on how a text label is implicitly cast to a number.
_age_label = F.col("Age")
_age_years = F.when(_age_label.rlike(r"^\s*\d+\s*$"), _age_label.cast("int"))


def _age_band(band_filter):
    """Rows of PlanRatevsAge matching ``band_filter``, one row per distinct IndividualRate."""
    return PlanRatevsAge.filter(band_filter).dropDuplicates(["IndividualRate"])


# Age groups, deduplicated on 'IndividualRate'.
# NOTE(review): the 0-22 group assumes the youngest ages are stored under the
# label "0-20"; a purely numeric comparison leaves that label out — confirm
# against Rate.csv.
TotRatevsAge1 = _age_band((_age_label == "0-20") | (_age_years <= 22))
TotRatevsAge2 = _age_band(_age_years.between(23, 40))
TotRatevsAge3 = _age_band(_age_years.between(41, 59))
TotRatevsAge4 = _age_band(_age_label == "Family Option")
TotRatevsAge5 = _age_band(_age_years.between(60, 64))
TotRatevsAge6 = _age_band(_age_label == "65 and over")

In [None]:
# Bring the melted rates to pandas, ordered by state code
pdf = df_melt.toPandas().sort_values(by='StateCode')

# Split the sorted state codes into two halves (the second half gets the extra
# state when the count is odd)
unique_states = sorted(pdf['StateCode'].unique())
half_length = len(unique_states) // 2
first_half_states, second_half_states = unique_states[:half_length], unique_states[half_length:]

# Plot the first half
first_half_rows = pdf[pdf['StateCode'].isin(first_half_states)]
fig, ax = plt.subplots(figsize=(15, 8))
sns.barplot(data=first_half_rows, x='StateCode', y='RateValue', hue='RateType', order=first_half_states, ax=ax)
ax.set_title("Plot of Health insurance Plan rates")
ax.set_ylabel("Plan rates")
ax.set_xlabel("State Code")
plt.xticks(rotation=90)
ax.legend(title='Rate Plans')
plt.tight_layout()
plt.show()

In [None]:
# Plot the second half of the state codes
second_half_rows = pdf[pdf['StateCode'].isin(second_half_states)]
fig, ax = plt.subplots(figsize=(15, 8))
sns.barplot(data=second_half_rows, x='StateCode', y='RateValue', hue='RateType', order=second_half_states, ax=ax)
ax.set_title("Plot of Health insurance Plan rates")
ax.set_ylabel("Plan rates")
ax.set_xlabel("State Code")
plt.xticks(rotation=90)
ax.legend(title='Rate Plans')
plt.tight_layout()
plt.show()

In [None]:
# Graphs per age: convert each age-group frame to pandas, in order 1..6
pdf1, pdf2, pdf3, pdf4, pdf5, pdf6 = [
    age_group_frame.toPandas()
    for age_group_frame in (
        TotRatevsAge1, TotRatevsAge2, TotRatevsAge3,
        TotRatevsAge4, TotRatevsAge5, TotRatevsAge6,
    )
]
 #Function for creating the plot of each age group
def plot_rate_vs_age(data, title):
    """Draw a bar chart of IndividualRate per StateCode, one hue per Age value.

    data: frame with the columns StateCode, IndividualRate and Age.
    title: text shown above the chart.
    The figure is displayed with plt.show(); nothing is returned.
    """
    fig, ax = plt.subplots(figsize=(50, 10))
    sns.barplot(data=data, x='StateCode', y='IndividualRate', hue='Age', ax=ax)
    ax.set_ylabel("Individual Rates")
    ax.set_xlabel("State Code")
    ax.set_title(title)
    plt.xticks(rotation=90)
    ax.legend(title='Age')
    plt.tight_layout()
    plt.show()

In [None]:
# Ages 0-22
plot_rate_vs_age(data=pdf1, title="Plan Rate vs Age across various US States")

In [None]:
# Ages 23-40
plot_rate_vs_age(data=pdf2, title="Plan Rate vs Age across various US States")

In [None]:
# Ages 41 to 59
plot_rate_vs_age(data=pdf3, title="Plan Rate vs Age across various US States")

In [None]:
# Ages 60 to 64
plot_rate_vs_age(data=pdf5, title="Plan Rate vs Age across various US States")

In [None]:
# Ages 65 and over
plot_rate_vs_age(data=pdf6, title="Plan Rate vs Age across various US States")

In [None]:
# Family Option rows
plot_rate_vs_age(data=pdf4, title="Plan Rate vs Age across various US States")

In [None]:
#DentalCare System

In [None]:
#Prepare Plan Attributes with Dental data

In [None]:
# Keep only the columns with dental data, picked by position in df_planAttributes
plan_attribute_names = df_planAttributes.columns
selected_columns = (
    plan_attribute_names[0:4]
    + [plan_attribute_names[5], plan_attribute_names[6]]
    + plan_attribute_names[9:15]
    + plan_attribute_names[16:21]
    + plan_attribute_names[24:28]
    + plan_attribute_names[29:50]
    + plan_attribute_names[54:125]
)
df_cleanPlan = df_planAttributes.select(*selected_columns)

# Columns to remove
rm_cols = [
    "OutOfCountryCoverageDescription", "OutOfServiceAreaCoverageDescription",
    "DiseaseManagementProgramsOffered", "PlanEffictiveDate", "PlanExpirationDate",
    "OutOfCountryCoverage", "MedicalDrugMaximumOutofPocketIntegrated",
    "FirstTierUtilization", "SecondTierUtilization",
    "MEHBInnTier1IndividualMOOP", "MEHBInnTier1FamilyMOOP",
    "MEHBInnTier2IndividualMOOP", "MEHBInnTier2FamilyMOOP",
    "MEHBOutOfNetIndividualMOOP", "MEHBOutOfNetFamilyMOOP",
    "MEHBCombInnOonIndividualMOOP", "MEHBCombInnOonFamilyMOOP",
    "DEHBInnTier1IndividualMOOP", "DEHBInnTier1FamilyMOOP",
    "DEHBInnTier2IndividualMOOP", "DEHBInnTier2FamilyMOOP",
    "DEHBOutOfNetIndividualMOOP", "DEHBOutOfNetFamilyMOOP",
    "DEHBCombInnOonIndividualMOOP", "DEHBCombInnOonFamilyMOOP",
    "TEHBInnTier1IndividualMOOP", "TEHBInnTier1FamilyMOOP",
    "TEHBInnTier2IndividualMOOP", "TEHBInnTier2FamilyMOOP",
    "TEHBOutOfNetIndividualMOOP", "TEHBOutOfNetFamilyMOOP",
    "TEHBCombInnOonIndividualMOOP", "TEHBCombInnOonFamilyMOOP",
    "MEHBDedInnTier1Individual", "MEHBDedInnTier1Family", "MEHBDedInnTier1Coinsurance",
    "MEHBDedInnTier2Individual", "MEHBDedInnTier2Family", "MEHBDedInnTier2Coinsurance",
    "MEHBDedOutOfNetIndividual", "MEHBDedOutOfNetFamily",
    "MEHBDedCombInnOonIndividual", "MEHBDedCombInnOonFamily",
    "DEHBDedInnTier1Individual", "DEHBDedInnTier1Family", "DEHBDedInnTier1Coinsurance",
    "DEHBDedInnTier2Individual", "DEHBDedInnTier2Family", "DEHBDedInnTier2Coinsurance",
    "DEHBDedOutOfNetIndividual", "DEHBDedOutOfNetFamily",
    "DEHBDedCombInnOonIndividual", "DEHBDedCombInnOonFamily",
    "TEHBDedInnTier1Individual", "TEHBDedInnTier1Family", "TEHBDedInnTier1Coinsurance",
    "TEHBDedInnTier2Individual", "TEHBDedInnTier2Family", "TEHBDedInnTier2Coinsurance",
    "TEHBDedOutOfNetIndividual", "TEHBDedOutOfNetFamily",
    "TEHBDedCombInnOonIndividual", "TEHBDedCombInnOonFamily",
    "SBCHavingaBabyDeductible", "SBCHavingaBabyCopayment",
    "SBCHavingaBabyCoinsurance", "SBCHavingaBabyLimit",
    "SBCHavingDiabetesDeductible", "SBCHavingDiabetesCopayment",
    "SBCHavingDiabetesCoinsurance", "SBCHavingDiabetesLimit",
    "FormularyId", "MedicalDrugDeductiblesIntegrated", "InpatientCopaymentMaximumDays",
    "SpecialtyDrugMaximumCoinsurance", "IsNoticeRequiredForPregnancy",
]

# NOTE(review): the drop starts again from df_planAttributes, so the column
# subset selected above is not carried into df_cleanPlan. The next cell reads
# df_cleanPlan columns by position, so switching this to df_cleanPlan.drop(...)
# would change which columns it picks — confirm the intended layout first.
df_cleanPlan = df_planAttributes.drop(*rm_cols)

In [None]:
# plan_id_short: PlanId without its last three characters
short_plan_id = F.expr("substring(PlanId, 1, length(PlanId) - 3)")
df_cleanPlan = df_cleanPlan.withColumn("plan_id_short", short_plan_id)

# Dental-only plans: PlanId plus five columns taken by position from df_cleanPlan
clean_plan_names = df_cleanPlan.columns
dental_fields = ["PlanId"] + [clean_plan_names[position] for position in (2, 3, 4, 11, 33)]
dental_only_list = (
    df_cleanPlan
    .filter(F.col("DentalOnlyPlan") == "Yes")
    .select(*dental_fields)
)

# Count unique PlanId
unique_dental_plans = dental_only_list.select("PlanId").distinct().count()
print(unique_dental_plans)

In [None]:
#Prepare 'Benefits of Cost Sharing' with Dental data

In [None]:
# Load BenefitsCostSharing: the first row is the header and column types are inferred
df_costShare = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("raw_data/BenefitsCostSharing.csv")
)

In [None]:
# Map every column name of df_costShare to its zero-based position
cost_share_names = df_costShare.columns
column_positions = dict(zip(cost_share_names, range(len(cost_share_names))))
print(column_positions)

In [None]:
# Keep the columns at positions 0-7 and 9-30 (positions 8 and 31 onwards are left out)
all_columns = df_costShare.columns
select_bnf_cols = all_columns[:8] + all_columns[9:31]
df_cleanbnf = df_costShare.select(*select_bnf_cols)
df_cleanbnf.printSchema()

In [None]:
#Linked benefits to dentalcare

In [None]:
# General examination: routineexam is 1 for a covered benefit whose name
# matches the pattern, null otherwise
routine_exam_pattern = "Prophylaxis|Oral Exam|Cleaning|Dental Check-Up for Children"
is_covered_routine_exam = F.col("BenefitName").rlike(routine_exam_pattern) & (F.col("IsCovered") == "Covered")
df_cleanbnf = df_cleanbnf.withColumn("routineexam", F.when(is_covered_routine_exam, 1).otherwise(None))

# Summary table: flagged rows per benefit name
general_exams = (
    df_cleanbnf
    .where(F.col("routineexam") > 0)
    .groupBy("BenefitName")
    .count()
    .withColumnRenamed("count", "Covering Plans")
)
general_exams.show()

In [None]:
# Basic dental care: basicdental is 1 for covered basic/routine benefits,
# 0 for covered "Non-Routine" benefits, null for everything else
benefit_is_covered = F.col("IsCovered") == "Covered"
is_basic_benefit = F.col("BenefitName").rlike("Basic Dental Care|Routine Dental Services")
is_non_routine_benefit = F.col("BenefitName").rlike("Non-Routine")
basic_dental_flag = (
    F.when(is_basic_benefit & benefit_is_covered, 1)
    .when(is_non_routine_benefit & benefit_is_covered, 0)
    .otherwise(None)
)
df_cleanbnf = df_cleanbnf.withColumn("basicdental", basic_dental_flag)

# Summary table: flagged rows per benefit name
basic_dental = (
    df_cleanbnf
    .where(F.col("basicdental") > 0)
    .groupBy("BenefitName")
    .count()
    .withColumnRenamed("count", "Covering Plans")
)
basic_dental.show()


In [None]:
# Basic dental care for adults: basicdental_adult is 1 for a covered benefit
# whose name contains "Basic Dental Care - Adult", 0 otherwise.
# The flag is derived from the row alone, so no pre-existing column is needed
# and re-running the cell gives the same result.
is_covered_adult_basic = (
    F.col("BenefitName").like("%Basic Dental Care - Adult%")
    & (F.col("IsCovered") == "Covered")
)
df_cleanbnf = df_cleanbnf.withColumn(
    "basicdental_adult",
    F.when(is_covered_adult_basic, 1).otherwise(0),
)

# Summary table: flagged rows per benefit name. It is built from the adult
# flag only (not from the basic_dental summary of another cell).
basicdental_adult = (
    df_cleanbnf
    .filter(F.col("basicdental_adult") > 0)
    .groupBy("BenefitName")
    .count()
    .withColumnRenamed("count", "Covering Plans")
)
basicdental_adult.show()

In [None]:
# Major dental care: majordental is 1 for a covered benefit whose name matches
# the pattern, null otherwise
major_dental_pattern = "Major Dental Care|Non-Routine"
is_covered_major = F.col("BenefitName").rlike(major_dental_pattern) & (F.col("IsCovered") == "Covered")
df_cleanbnf = df_cleanbnf.withColumn("majordental", F.when(is_covered_major, 1).otherwise(None))

# Summary table: flagged rows per benefit name
major_dental = (
    df_cleanbnf
    .where(F.col("majordental") > 0)
    .groupBy("BenefitName")
    .count()
    .withColumnRenamed("count", "Covering Plans")
)
major_dental.show()

In [None]:
# X-rays: xrays is 1 for a covered benefit whose name matches the pattern,
# 0 for any remaining "X-rays and Diagnostic Imaging" row, null otherwise
is_covered_xray = F.col("BenefitName").rlike("X-ray|X-Ray|X ray|X Ray") & (F.col("IsCovered") == "Covered")
is_diagnostic_imaging = F.col("BenefitName") == "X-rays and Diagnostic Imaging"
xray_flag = (
    F.when(is_covered_xray, 1)
    .when(is_diagnostic_imaging, 0)
    .otherwise(None)
)
df_cleanbnf = df_cleanbnf.withColumn("xrays", xray_flag)

# Summary table: flagged rows per benefit name
xrays = (
    df_cleanbnf
    .where(F.col("xrays") > 0)
    .groupBy("BenefitName")
    .count()
    .withColumnRenamed("count", "Covering Plans")
)
xrays.show()

In [None]:
# Fluoride treatment: fluoride is 1 for a covered benefit whose name matches
# the pattern (including the "Flouride" spelling), null otherwise
fluoride_pattern = 'Flouride|Fluoride|fluoride'
is_covered_fluoride = F.col('BenefitName').rlike(fluoride_pattern) & (F.col('IsCovered') == 'Covered')
df_cleanbnf = df_cleanbnf.withColumn('fluoride', F.when(is_covered_fluoride, 1).otherwise(None))

# Summary table: flagged rows per benefit name
fluoride = (
    df_cleanbnf
    .where(F.col("fluoride") > 0)
    .groupBy("BenefitName")
    .count()
    .withColumnRenamed("count", "Covering Plans")
)
fluoride.show()

In [None]:
# Extractions: extract is 1 for a covered benefit whose name matches the
# pattern, except 'Breast Implant Removal'; null otherwise
extraction_pattern = 'Extraction|extraction|removal|Removal'
is_covered_extraction = (
    F.col('BenefitName').rlike(extraction_pattern)
    & (F.col('IsCovered') == 'Covered')
    & (F.col('BenefitName') != 'Breast Implant Removal')
)
df_cleanbnf = df_cleanbnf.withColumn('extract', F.when(is_covered_extraction, 1).otherwise(None))

# Summary table: flagged rows per benefit name
extract = (
    df_cleanbnf
    .where(F.col("extract") > 0)
    .groupBy("BenefitName")
    .count()
    .withColumnRenamed("count", "Covering Plans")
)
extract.show()

In [None]:
# Root canals: rootcanal is 1 for a covered benefit whose name matches the
# pattern, null otherwise
root_canal_pattern = 'Root Canal|root canal|Root canal|Endodonti'
is_covered_root_canal = F.col('BenefitName').rlike(root_canal_pattern) & (F.col('IsCovered') == 'Covered')
df_cleanbnf = df_cleanbnf.withColumn('rootcanal', F.when(is_covered_root_canal, 1).otherwise(None))

# Summary table: flagged rows per benefit name
rootcanal = (
    df_cleanbnf
    .where(F.col('rootcanal') > 0)
    .groupBy('BenefitName')
    .count()
    .withColumnRenamed('count', 'Covering Plans')
)
rootcanal.show()

In [None]:
# Sealants: sealant is 1 for a covered benefit whose name matches the pattern,
# null otherwise
sealant_pattern = 'Sealant|sealant'
is_covered_sealant = F.col('BenefitName').rlike(sealant_pattern) & (F.col('IsCovered') == 'Covered')
df_cleanbnf = df_cleanbnf.withColumn('sealant', F.when(is_covered_sealant, 1).otherwise(None))

# Summary table: flagged rows per benefit name
sealant = (
    df_cleanbnf
    .where(F.col('sealant') > 0)
    .groupBy('BenefitName')
    .count()
    .withColumnRenamed('count', 'Covering Plans')
)
sealant.show()

In [None]:
# Fillings: fillings is 1 for a covered benefit whose name matches the pattern,
# null otherwise
fillings_pattern = 'restorative|Restorative|Amalgam|Filling'
is_covered_filling = F.col('BenefitName').rlike(fillings_pattern) & (F.col('IsCovered') == 'Covered')
df_cleanbnf = df_cleanbnf.withColumn('fillings', F.when(is_covered_filling, 1).otherwise(None))

# Summary table: flagged rows per benefit name
fillings = (
    df_cleanbnf
    .where(F.col('fillings') > 0)
    .groupBy('BenefitName')
    .count()
    .withColumnRenamed('count', 'Covering Plans')
)
fillings.show()

In [None]:
# Periodontics: periodontics is 1 for a covered benefit whose name matches the
# pattern, null otherwise
periodontics_pattern = 'Periodont|periodont|Scaling|Planing'
is_covered_periodontics = F.col('BenefitName').rlike(periodontics_pattern) & (F.col('IsCovered') == 'Covered')
df_cleanbnf = df_cleanbnf.withColumn('periodontics', F.when(is_covered_periodontics, 1).otherwise(None))

# Summary table: flagged rows per benefit name
periodontics = (
    df_cleanbnf
    .where(F.col('periodontics') > 0)
    .groupBy('BenefitName')
    .count()
    .withColumnRenamed('count', 'Covering Plans')
)
periodontics.show()

In [None]:
# Orthodontia: orthodontia is 1 for a covered benefit whose name matches the
# pattern, null otherwise
orthodontia_pattern = 'Orthognatic|orthodont|Orthodont|Occlusal|Ocalusal|occlusal'
is_covered_orthodontia = F.col('BenefitName').rlike(orthodontia_pattern) & (F.col('IsCovered') == 'Covered')
df_cleanbnf = df_cleanbnf.withColumn('orthodontia', F.when(is_covered_orthodontia, 1).otherwise(None))

# Summary table: flagged rows per benefit name
orthodontia = (
    df_cleanbnf
    .where(F.col('orthodontia') > 0)
    .groupBy('BenefitName')
    .count()
    .withColumnRenamed('count', 'Covering Plans')
)
orthodontia.show()

In [None]:
# Dentures or partials: denture is 1 for a covered benefit whose name matches
# the pattern, null otherwise
denture_pattern = 'Denture|denture|Bridge|Dentiures'
is_covered_denture = F.col('BenefitName').rlike(denture_pattern) & (F.col('IsCovered') == 'Covered')
df_cleanbnf = df_cleanbnf.withColumn('denture', F.when(is_covered_denture, 1).otherwise(None))

# Summary table: flagged rows per benefit name
denture = (
    df_cleanbnf
    .where(F.col('denture') > 0)
    .groupBy('BenefitName')
    .count()
    .withColumnRenamed('count', 'Covering Plans')
)
denture.show()

In [None]:
# Specialist gum procedures: gums is 1 for a covered benefit whose name
# contains 'Gingivec', null otherwise
is_covered_gum_procedure = F.col('BenefitName').rlike('Gingivec') & (F.col('IsCovered') == 'Covered')
df_cleanbnf = df_cleanbnf.withColumn('gums', F.when(is_covered_gum_procedure, 1).otherwise(None))

# Summary table: flagged rows per benefit name
gums = (
    df_cleanbnf
    .where(F.col('gums') > 0)
    .groupBy('BenefitName')
    .count()
    .withColumnRenamed('count', 'Covering Plans')
)
gums.show()

In [None]:
# Dental anesthesia: anesthesia is 1 for a covered benefit whose name contains
# 'Dental Anesthesia', null otherwise
is_covered_anesthesia = F.col('BenefitName').rlike('Dental Anesthesia') & (F.col('IsCovered') == 'Covered')
df_cleanbnf = df_cleanbnf.withColumn('anesthesia', F.when(is_covered_anesthesia, 1).otherwise(None))

# Summary table: flagged rows per benefit name
anesthesia = (
    df_cleanbnf
    .where(F.col('anesthesia') > 0)
    .groupBy('BenefitName')
    .count()
    .withColumnRenamed('count', 'Covering Plans')
)
anesthesia.show()

In [None]:
# Crowns: crown is 1 for a covered benefit whose name contains 'Crown',
# null otherwise
is_covered_crown = F.col('BenefitName').rlike('Crown') & (F.col('IsCovered') == 'Covered')
df_cleanbnf = df_cleanbnf.withColumn('crown', F.when(is_covered_crown, 1).otherwise(None))

# Summary table: flagged rows per benefit name
crown = (
    df_cleanbnf
    .where(F.col('crown') > 0)
    .groupBy('BenefitName')
    .count()
    .withColumnRenamed('count', 'Covering Plans')
)
crown.show()

In [None]:
# Cosmetic dentistry: cosmetic is 1 for a covered benefit whose name contains
# 'Cosmetic Ortho', null otherwise
is_covered_cosmetic = F.col('BenefitName').rlike('Cosmetic Ortho') & (F.col('IsCovered') == 'Covered')
df_cleanbnf = df_cleanbnf.withColumn('cosmetic', F.when(is_covered_cosmetic, 1).otherwise(None))

# Summary table: flagged rows per benefit name
cosmetic = (
    df_cleanbnf
    .where(F.col('cosmetic') > 0)
    .groupBy('BenefitName')
    .count()
    .withColumnRenamed('count', 'Covering Plans')
)
cosmetic.show()

In [None]:
# Cosmetic dentistry (repeated cell)
# NOTE(review): this cell recomputes the same 'cosmetic' flag and summary as
# the cell before it; it overwrites them with identical values and could be removed.
cosmetic_name_match = F.col('BenefitName').rlike('Cosmetic Ortho')
cosmetic_is_covered = F.col('IsCovered') == 'Covered'
df_cleanbnf = df_cleanbnf.withColumn(
    'cosmetic',
    F.when(cosmetic_name_match & cosmetic_is_covered, 1).otherwise(None),
)

# Summary table: flagged rows per benefit name
cosmetic = (
    df_cleanbnf
    .where(F.col('cosmetic') > 0)
    .groupBy('BenefitName')
    .count()
    .withColumnRenamed('count', 'Covering Plans')
)
cosmetic.show()

In [None]:
# Dental surgery: surgery is 1 for a covered benefit whose name matches the
# pattern, null otherwise
surgery_pattern = 'Oral Surgery|Orthognatic Treatment/Surgery|Osseous Surgery'
is_covered_surgery = F.col('BenefitName').rlike(surgery_pattern) & (F.col('IsCovered') == 'Covered')
df_cleanbnf = df_cleanbnf.withColumn('surgery', F.when(is_covered_surgery, 1).otherwise(None))

# Summary table: flagged rows per benefit name
surgery = (
    df_cleanbnf
    .where(F.col('surgery') > 0)
    .groupBy('BenefitName')
    .count()
    .withColumnRenamed('count', 'Covering Plans')
)
surgery.show()

In [None]:
#Dental benefits summary

In [None]:
# Sum every dental flag per plan and issuer; each total keeps its flag's name
dental_flag_columns = [
    'xrays', 'fluoride', 'majordental', 'extract', 'rootcanal', 'sealant',
    'routineexam', 'basicdental', 'basicdental_adult', 'fillings',
    'periodontics', 'orthodontia', 'denture', 'gums', 'anesthesia',
    'crown', 'cosmetic', 'surgery',
]
flag_totals = [F.sum(flag_name).alias(flag_name) for flag_name in dental_flag_columns]
ben_summary = df_cleanbnf.groupBy('PlanId', 'IssuerId').agg(*flag_totals)
ben_summary.show(5)

In [None]:
#The amount a patient pays out of pocket for a visit to the dentist.
#If it's a flat dollar amount, that's a copay. 
#If it's a percentage of the charges, it's coinsurance. Together, this is "cost sharing" in insurance parlance.

In [None]:
# Keep only the benefit rows whose PlanId appears in ben_summary, then drop
# exact duplicate rows.
summarised_plan_ids = ben_summary.select("PlanId")
ben_costs = df_cleanbnf.join(summarised_plan_ids, on="PlanId", how="inner").dropDuplicates()

In [None]:
# Normalise each free-text copay column in place and derive a numeric
# "<column>_num" companion next to it.
copay_qualifiers = ("Copay before deductible", "Copay after deductible", "Copay per Stay", "Copay per Day")
for copay_col in ("CopayInnTier1", "CopayInnTier2", "CopayOutofNet"):
    # Build one nested SQL expression: strip each qualifier phrase in turn ...
    cleaned_sql = copay_col
    for phrase in copay_qualifiers:
        cleaned_sql = f"TRIM(REPLACE({cleaned_sql}, '{phrase}', ''))"
    # ... then remove commas and dollar signs.
    cleaned_sql = f"TRIM(REPLACE(REPLACE({cleaned_sql}, ',', ''), '$', ''))"
    ben_costs = ben_costs.withColumn(copay_col, F.expr(cleaned_sql))

    # "No Charge", empty strings and "0%" count as zero; everything else is
    # cast to double (text that cannot be cast becomes null).
    cleaned = F.col(copay_col)
    counts_as_zero = cleaned.contains("No Charge") | (cleaned == "") | (cleaned == "0%")
    ben_costs = ben_costs.withColumn(
        copay_col + "_num",
        F.when(counts_as_zero, 0).otherwise(cleaned.cast(DoubleType())),
    )

In [None]:
# Descriptive statistics for the three numeric copay columns.
numeric_copay_columns = ["CopayInnTier1_num", "CopayInnTier2_num", "CopayOutofNet_num"]
ben_costs.select(*numeric_copay_columns).summary().show()

In [None]:
# Histogram of CoinsInnTier1_num, restricted to values strictly between 0 and 1000.
coins_inn_tier1 = F.col("CoinsInnTier1_num")
ben_costs1 = ben_costs.filter((coins_inn_tier1 > 0) & (coins_inn_tier1 < 1000))
ben_costs1_pd = ben_costs1.select("CoinsInnTier1_num").toPandas()
ben_costs1_pd.plot.hist(bins=range(0, 1000, 5), edgecolor='black')

In [None]:
# Histogram of CoinsInnTier2_num, restricted to values strictly between 0 and 1000.
coins_inn_tier2 = F.col("CoinsInnTier2_num")
ben_costs1 = ben_costs.filter((coins_inn_tier2 > 0) & (coins_inn_tier2 < 1000))
ben_costs1_pd = ben_costs1.select("CoinsInnTier2_num").toPandas()
ben_costs1_pd.plot.hist(bins=range(0, 1000, 5), edgecolor='black')

In [None]:
# Histogram of CoinsOutofNet_num, restricted to values strictly between 0 and 1000.
coins_out_of_net = F.col("CoinsOutofNet_num")
ben_costs1 = ben_costs.filter((coins_out_of_net > 0) & (coins_out_of_net < 1000))
ben_costs1_pd = ben_costs1.select("CoinsOutofNet_num").toPandas()
ben_costs1_pd.plot.hist(bins=range(0, 1000, 5), edgecolor='black')

In [None]:
# Histogram of CopayInnTier1_num, restricted to values strictly between 0 and 1000.
copay_inn_tier1 = F.col("CopayInnTier1_num")
ben_costs1 = ben_costs.filter((copay_inn_tier1 > 0) & (copay_inn_tier1 < 1000))
ben_costs1_pd = ben_costs1.select("CopayInnTier1_num").toPandas()
ben_costs1_pd.plot.hist(bins=range(0, 1000, 5), edgecolor='black')

In [None]:
# Histogram of CopayInnTier2_num, restricted to values strictly between 0 and 1000.
copay_inn_tier2 = F.col("CopayInnTier2_num")
ben_costs1 = ben_costs.filter((copay_inn_tier2 > 0) & (copay_inn_tier2 < 1000))
ben_costs1_pd = ben_costs1.select("CopayInnTier2_num").toPandas()
ben_costs1_pd.plot.hist(bins=range(0, 1000, 5), edgecolor='black')

In [None]:
# Histogram of CopayOutofNet_num, restricted to values strictly between 0 and 1000.
copay_out_of_net = F.col("CopayOutofNet_num")
ben_costs1 = ben_costs.filter((copay_out_of_net > 0) & (copay_out_of_net < 1000))
ben_costs1_pd = ben_costs1.select("CopayOutofNet_num").toPandas()
ben_costs1_pd.plot.hist(bins=range(0, 1000, 5), edgecolor='black')

In [None]:
# Cost-sharing category flags on ben_costs.
# Each flag is 1 when the benefit name matches the category's pattern AND the
# benefit is marked as covered, and 0 otherwise.
benefit_is_covered = F.col("IsCovered") == "Covered"
category_patterns = {
    "basicdental": "Basic Dental Care|Routine Dental Services",
    "basicdental_adult": "Basic Dental Care|Adult",
    "majordental": "Major Dental Care",
    "fillings": "restorative|Restorative|Amalgam|Filling",
}
for flag_name, name_pattern in category_patterns.items():
    matches_category = F.col("BenefitName").rlike(name_pattern) & benefit_is_covered
    ben_costs = ben_costs.withColumn(flag_name, F.when(matches_category, 1).otherwise(0))

# Extract: keep the existing flag, except that "Breast Implant Removal" rows
# are forced to 0.
is_breast_implant_removal = F.col("BenefitName") == "Breast Implant Removal"
ben_costs = ben_costs.withColumn(
    "extract",
    F.when(is_breast_implant_removal, 0).otherwise(F.col("extract")),
)

In [None]:
# Define the reusable "flag equals 1" conditions, one per cost-sharing category.
(
    condition_basicdental,
    condition_basicdental_adult,
    condition_majordental,
    condition_fillings,
    condition_extract,
) = (
    F.col(flag_name) == 1
    for flag_name in ("basicdental", "basicdental_adult", "majordental", "fillings", "extract")
)

# Aggregation expressions are collected here by the helper functions.
agg_exprs = list()

In [None]:
def add_agg_for_copay(condition, alias_prefix):
    """Append three conditional copay sums to the module-level ``agg_exprs``.

    For each numeric copay column, rows where ``condition`` holds contribute
    their copay value and all other rows contribute 0. The resulting columns
    are named ``<alias_prefix>_copay_inn_1``, ``<alias_prefix>_copay_inn_2``
    and ``<alias_prefix>_copay_oon``.
    """
    copay_columns = (
        ("CopayInnTier1_num", "copay_inn_1"),
        ("CopayInnTier2_num", "copay_inn_2"),
        ("CopayOutofNet_num", "copay_oon"),
    )
    for source_column, alias_suffix in copay_columns:
        conditional_value = F.when(condition, F.col(source_column)).otherwise(0)
        agg_exprs.append(F.sum(conditional_value).alias(f"{alias_prefix}_{alias_suffix}"))

def add_agg_for_coinsurance(condition, alias_prefix):
    """Append three conditional coinsurance sums to the module-level ``agg_exprs``.

    For each numeric coinsurance column, rows where ``condition`` holds
    contribute their coinsurance value and all other rows contribute 0. The
    resulting columns are named ``<alias_prefix>_coin_inn_1``,
    ``<alias_prefix>_coin_inn_2`` and ``<alias_prefix>_coin_oon``.
    """
    coinsurance_columns = (
        ("CoinsINNTier1_num", "coin_inn_1"),
        ("CoinsINNTier2_num", "coin_inn_2"),
        ("CoinsOutofNet_num", "coin_oon"),
    )
    for source_column, alias_suffix in coinsurance_columns:
        conditional_value = F.when(condition, F.col(source_column)).otherwise(0)
        agg_exprs.append(F.sum(conditional_value).alias(f"{alias_prefix}_{alias_suffix}"))

In [None]:
# Build the aggregation expressions with the helper functions.
# The helpers append to the shared agg_exprs list, so empty it first:
# otherwise re-running this cell would add a second copy of every expression
# and ben_cost_summary would end up with duplicated column names.
agg_exprs.clear()

cost_categories = [
    (condition_basicdental, "basic"),
    (condition_basicdental_adult, "basic_a"),
    (condition_majordental, "majordental"),
    (condition_fillings, "fillings"),
    (condition_extract, "extract"),
]

# All copay sums first, then all coinsurance sums (same column order as before).
for category_condition, category_prefix in cost_categories:
    add_agg_for_copay(category_condition, category_prefix)

for category_condition, category_prefix in cost_categories:
    add_agg_for_coinsurance(category_condition, category_prefix)

# Grouping and aggregation: one row per (PlanId, IssuerId).
ben_cost_summary = ben_costs.groupBy("PlanId", "IssuerId").agg(*agg_exprs)

In [None]:
# Attach the benefit-flag summary and then the cost-sharing summary to the
# plan attributes. Both joins are inner joins on (PlanId, IssuerId).
dental_data = (
    df_planAttributes
    .join(ben_summary, on=["PlanId", "IssuerId"], how="inner")
    .join(ben_cost_summary, on=["PlanId", "IssuerId"], how="inner")
)

print(dental_data.columns)

In [None]:
#Create Dental Data file

In [None]:
# Create 'plan_id_short', the key later compared with Rate.csv's PlanId.
# Spark's substring(str, pos, len) takes a 1-based start and a LENGTH; it is
# not a Python slice, and the previous negative length produced an empty
# string for every row, so no rate could ever be matched.
# NOTE(review): assumes PlanId here is a 14-character plan id followed by a
# variant suffix such as "-00", and that Rate.csv carries the 14-character
# form -- confirm against the raw data.
df_planAttributes_filtered = df_planAttributes.withColumn(
    "plan_id_short",
    F.substring(F.col("PlanId"), 1, 14),
)

# Define the join keys
join_keys = ["PlanId", "IssuerId"]

# Keep only the plans that have a benefit summary (inner join on the keys)
dental_data = df_planAttributes_filtered.join(ben_summary, on=join_keys, how="inner")

# Write the merged data as CSV with a header row, replacing earlier output
output_path = "output_files/dental_data.csv"
dental_data.write.csv(output_path, header=True, mode="overwrite")

In [None]:
#Rates are the monthly payments regardless of whether you go see your dentist or not. 

In [None]:
# Load the rate table with a header row and inferred column types.
rate = spark.read.options(header=True, inferSchema=True).csv("raw_data/Rate.csv")
rate.printSchema()

In [None]:
from pyspark.sql.window import Window

# Keep only rates strictly between 0 and 1000.
rate = rate.filter((rate.IndividualRate < 1000) & (rate.IndividualRate > 0))

# Load dental_data into a Spark DataFrame, adjust path as needed
dental_data = spark.read.csv("output_files/dental_data.csv", header=True, inferSchema=True)

# Tag each rate row with the plan's DentalOnlyPlan value.
# dental_data can hold several rows with the same plan_id_short, so join
# against the distinct (plan_id_short, DentalOnlyPlan) pairs; joining the raw
# selection would repeat every rate row once per matching dental_data row.
plan_lookup = dental_data.select("plan_id_short", "DentalOnlyPlan").distinct()
rate = rate.join(plan_lookup, rate.PlanId == plan_lookup.plan_id_short, how='left')

# Map each age-group label to the output column prefix used for its mean rate.
age_groups = {
    "0-20":"mean_rate_kids",
    "21-29":"mean_rate_20s",
    "30-39":"mean_rate_30s",
    "40-49":"mean_rate_40s",
    "50-64":"mean_rate_5064",
    "65 and over":"mean_rate_65up",
    "Family Option":"mean_rate_family"
}

In [None]:
# One window covering every row of a plan; it is the same for all age groups.
windowSpec = (
    Window.partitionBy("PlanId")
    .orderBy("Age")
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

for age_label, column_stem in age_groups.items():
    # "lo-hi" labels give a lower and an upper bound; any other label is used
    # as both bounds.
    # NOTE(review): the bounds are strings, so this presumably relies on Age
    # being a string column compared lexicographically -- confirm.
    if "-" in age_label:
        age_from, age_to = age_label.split("-")
    else:
        age_from = age_to = age_label
    in_age_group = (F.col("Age") >= age_from) & (F.col("Age") <= age_to)

    # "_do" = DentalOnlyPlan == "Yes", "_dm" = DentalOnlyPlan == "No".
    for plan_flag, column_suffix in (("Yes", "_do"), ("No", "_dm")):
        matching_rate = F.when(
            in_age_group & (F.col("DentalOnlyPlan") == plan_flag),
            F.col("IndividualRate"),
        )
        rate = rate.withColumn(column_stem + column_suffix, F.mean(matching_rate).over(windowSpec))

In [None]:
# Per-plan mean IndividualTobaccoRate, computed separately for
# DentalOnlyPlan == "Yes" (the _do column) and == "No" (the _dm column).
windowSpec_do = (
    Window.partitionBy("PlanId")
    .orderBy("DentalOnlyPlan")
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

for plan_flag, tobacco_column in (("Yes", "mean_rate_tobacco_do"), ("No", "mean_rate_tobacco_dm")):
    tobacco_rate = F.when(F.col("DentalOnlyPlan") == plan_flag, F.col("IndividualTobaccoRate"))
    rate = rate.withColumn(tobacco_column, F.mean(tobacco_rate).over(windowSpec_do))

In [None]:
# Collapse the per-row mean columns to one row per PlanId.
# The "_do" and "_dm" halves are built from the same band list so they stay
# parallel; the earlier hand-written list included mean_rate_family_do but
# left out mean_rate_family_dm.
rate_bands = ["kids", "20s", "30s", "40s", "5064", "65up", "family", "tobacco"]
rate_summary_columns = [
    f"mean_rate_{band}_{plan_kind}"
    for plan_kind in ("do", "dm")
    for band in rate_bands
]

rate_summary = rate.groupBy("PlanId").agg(
    *[F.mean(column_name).alias(column_name) for column_name in rate_summary_columns]
)

rate_summary.printSchema()

In [None]:
# Load the network table with a header row and inferred column types.
network = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("raw_data/Network.csv")
)

In [None]:
# Merge dental_data with network
dental_data = dental_data.join(network, on=["BusinessYear", "StateCode", "IssuerId", "SourceName", "NetworkId"], how="left")

# Rebuild plan_id_short as the key that is matched against rate_summary.PlanId.
# The previous 4-character prefix could not equal a full plan id, so the rate
# columns never matched.
# NOTE(review): assumes Rate.csv plan ids are the first 14 characters of
# dental_data.PlanId -- confirm against the raw data.
dental_data = dental_data.withColumn("plan_id_short", dental_data["PlanId"].substr(1, 14))

# Merge dental_data with rate_summary. rate_summary's key is renamed so the
# join is on one shared column and the result does not carry a second,
# ambiguous "PlanId" column.
rate_by_short_id = rate_summary.withColumnRenamed("PlanId", "plan_id_short")
dental_data = dental_data.join(rate_by_short_id, on="plan_id_short", how="left")


In [None]:
# Deduplicate the merged DataFrame and cache the deduplicated result.
# The cells below all query this frame; caching before dropDuplicates()
# marked the pre-dedup frame for caching instead of the one they query.
dental_data = dental_data.dropDuplicates().cache()

# Release any cached copies of the inputs.
# NOTE(review): no visible cell caches these two frames, in which case these
# calls presumably have nothing to release -- confirm whether they are needed.
rate_summary.unpersist()
network.unpersist()

dental_data.printSchema()

In [None]:
# How many plans offer dental coverage of some sort?
# Counted as the number of distinct plan_id_short values.
distinct_short_ids = dental_data.select("plan_id_short").distinct()
unique_plan_count = distinct_short_ids.count()
print("Number of plans offering dental coverage of some sort:", unique_plan_count)

In [None]:
# How many providers do you have to choose from for dental?
# Counted as the number of distinct IssuerId values.
distinct_issuers = dental_data.select("IssuerId").distinct()
unique_provider_count = distinct_issuers.count()
print("Number of dental providers to choose from:", unique_provider_count)

In [None]:
# Where is dental available?
# Number of dental_data rows per state.
rows_by_state = dental_data.groupBy("StateCode")
state_counts = rows_by_state.count()
state_counts.show()

In [None]:
# How many are dental-only?
# Counts the dental_data rows whose DentalOnlyPlan value is "Yes".
dental_only_rows = dental_data.where(F.col("DentalOnlyPlan") == "Yes")
dental_only_count = dental_only_rows.count()

print("Number of dental-only plans:", dental_only_count)

In [None]:
# Stop the Spark session created at the top of the notebook.
spark.stop()