## Project Hephaestus - Transparency in Coverage

### Assumptions
 - Transparency in coverage files were found manually through different insurance providers websites
 - We have taken PPO in_network data for below insuarance providers
   - Wellmark
   - UMC (UHC, United Health Care)
   - HealthPlans
 - Once the files were downloaded, the below code instructions were run manually for each insurance provider file.
   - This was done due to the fact that the format of the files were different and many times, the data was wrapped as a string array or a string in some of the string columns.
 - The data transformation followed three-phased approach of Bronze, Silver, and Gold Lake model where the data was first written to a Bronze table, then transformed to Silver and then curated to a Gold table which ultimately was used for analysis.
 - Due to the fact that the data is very large, we have considered some common used procedure to do our analysis but made all available to be searched for our stakeholders.
 - The analysis is all based on the the data available by the insurance provider.
 - The data can be a few months old as was available on insurance provider websites.

Import necessary libraries

In [0]:
import json
import os
import gzip, zipfile
import requests
import pandas as pd
import shutil
import numpy as np
import itertools
import shutil
from pyspark.sql.functions import explode

Create spark session

In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

If Zip file, extract json file

In [0]:
with zipfile.ZipFile('/dbfs/FileStore/Akshay/IowaHealthData/HealthPartners/2023_03_01_HealthPartners_Iowa_Specialty_in_network_rates.zip'
, 'r') as zip_ref:
    zip_ref.extractall('/dbfs/FileStore/Akshay/IowaHealthData/HealthPartners/json_files')

If gzip file, extract json file

In [0]:
with gzip.open(
    "/dbfs/FileStore/Akshay/IowaHealthData/Aetna/03-06/2023_02_09_Aetna_Health_of_Iowa_Inc_json.gz",
    "rb",
) as f_in:
    with open(
        "/dbfs/FileStore/Akshay/IowaHealthData/Aetna/03-06/2023_02_09_Aetna_Health_of_Iowa_Inc.json",
        "wb",
    ) as f_out:
        shutil.copyfileobj(f_in, f_out)

List files to ensure the json file is there

In [0]:
%fs
ls /FileStore/Akshay/IowaHealthData/HealthPartners/json_files


Create database `health_plans`

In [0]:
db_name = "health_plans"
spark.sql(f"CREATE DATABASE IF NOT EXISTS {db_name}")

In [0]:
def load_file(file_path):
    data = {}
    with open(file_path, "r") as f:
        data = json.loads(f.read())
    return data

In [0]:
def flatten_data(data):
    in_network = pd.json_normalize(
        data["in_network"],
        record_path=["negotiated_rates", "negotiated_prices"],
        meta=[
            "negotiation_arrangement",
            "name",
            "billing_code_type",
            "billing_code_type_version",
            "billing_code",
            "description",
            ["negotiated_rates", "provider_references"],
        ],
    )
    #   in_network = validate_and_convert_provider_references(in_network)
    in_network = in_network.drop("billing_code_modifier", axis=1)
    in_network["reporting_entity_name"] = data["reporting_entity_name"]
    in_network["reporting_entity_type"] = data["reporting_entity_type"]
    in_network["last_updated_on"] = data["last_updated_on"]
    in_network["version"] = data["version"]
    in_network = in_network.drop("service_code", axis=1)
    print("flattened the data")
    return in_network

In [0]:
def validate_and_convert_provider_references(in_network):
    changes = ["negotiated_rates.provider_references"]
    cols = in_network.columns.difference(changes).tolist()
    in_network = (
        in_network.groupby(cols)[changes]
        .agg(list)
        .reset_index()
        .reindex(in_network.columns, axis=1)
    )
    provider_references_new = []
    for ref in in_network["negotiated_rates.provider_references"]:
        provider_references_new.append(list(itertools.chain.from_iterable(ref)))
    in_network["negotiated_rates.provider_references"] = provider_references_new
    in_network["negotiated_rates.provider_references"]
    print("validate_and_convert_provider_references")
    return in_network

In [0]:
def write_df_to_delta(data, delta_path):
    print("starting to create dataframe for in_network_data")
    df = spark.createDataFrame(data)
    df = df.withColumn(
        "provider_references", df["`negotiated_rates.provider_references`"]
    )
    df = df.drop("negotiated_rates.provider_references")
    df.write.mode("append").option("mergeSchema", "true").format("delta").save(
        delta_path
    )
    print("successfully wrote dataframe for in_network_data")

In [0]:
def process_provider_references(data):
    provider_references = pd.json_normalize(
        data["provider_references"],
        record_path="provider_groups",
        meta=["provider_group_id", ["provider_groups"]],
    )
    provider_references["reporting_entity_name"] = data["reporting_entity_name"]
    provider_references["reporting_entity_type"] = data["reporting_entity_type"]
    provider_references["last_updated_on"] = data["last_updated_on"]
    provider_references["version"] = data["version"]
    provider_references = provider_references.drop("provider_groups", axis=1)

    provider_referencesDF = spark.createDataFrame(provider_references)
    provider_referencesDF.write.mode("append").option("mergeSchema", "true").format(
        "delta"
    ).save("/mnt/edl/raw/iowa_health_plans/health_partners/provider_references")

In [0]:
def process_in_network(data):
    in_network = flatten_data(data)
    write_df_to_delta(in_network, "/mnt/edl/raw/iowa_health_plans/health_partners/in_network_rates")

In [0]:
def process_in_network_wellmark(data):
    in_network = flatten_data(data)
    in_network = in_network.explode("negotiated_rates.provider_references")
    write_df_to_delta(in_network, "/mnt/edl/raw/iowa_health_plans/health_partners/in_network_rates")

Process json files one by one. This is due to the fact that the files have little different format.

In [0]:
files = [
    #  '/dbfs/FileStore/Akshay/IowaHealthData/2023_02_01_United_HealthCare_Services__Inc__Third_Party_Administrator_PPO___NDC_PPO_NDC_in_network_r.json',
    #  '/dbfs/FileStore/Akshay/IowaHealthData/Aetna/2023_02_09_4c4a38f7_b4bd_4ced_949c_fe2c14f25c87_Aetna_Health_of_Iowa_Inc__json' #,
    #  '/dbfs/FileStore/Akshay/IowaHealthData/Wellmark/2023_02_01_WMK_I_PLAN2_13B0_HealthSparq.json',
    #  '/dbfs/FileStore/Akshay/IowaHealthData/Wellmark/2023_02_01_WMK_I_PLAN4_13E0_HealthSparq.json',
    #  '/dbfs/FileStore/Akshay/IowaHealthData/UHC/2023_02_01_United_Healthcare_Services_Choice_Plus_POS_8_in_network_rates.json',
    #   '/dbfs/FileStore/Akshay/IowaHealthData/UHC/03-05-2023/2023_03_01_UMR_OPTIONS_PPO__B_58_in_network_rates_json',
    #     '/dbfs/FileStore/Akshay/IowaHealthData/Wellmark/03-05/2023_03_01_WMK_I_PLAN2_13B0_HealthSparq.json',
    #     '/dbfs/FileStore/Akshay/IowaHealthData/Wellmark/03-05/2023_03_01_WMK_I_PLAN2_13B0_HealthSparq__1_.json',
    #     '/dbfs/FileStore/Akshay/IowaHealthData/Wellmark/03-05/2023_03_01_WMK_I_PLAN2_13B0_HealthSparq__2_.json',
    #     '/dbfs/FileStore/Akshay/IowaHealthData/Wellmark/03-05/2023_03_01_WMK_I_PLAN2_13B0_HealthSparq__3_.json',
    #     '/dbfs/FileStore/Akshay/IowaHealthData/Wellmark/03-05/2023_03_01_WMK_I_PLAN2_13B0_HealthSparq__4_.json',
    #     '/dbfs/FileStore/Akshay/IowaHealthData/Wellmark/03-05/2023_03_01_WMK_I_PLAN2_13B0_HealthSparq__5_.json',
    #     '/dbfs/FileStore/Akshay/IowaHealthData/Wellmark/03-05/2023_03_01_WMK_I_PLAN2_13B0_HealthSparq__6_.json',
    #     '/dbfs/FileStore/Akshay/IowaHealthData/Wellmark/03-05/2023_03_01_WMK_I_PLAN2_13B0_HealthSparq__7_.json',
    #     '/dbfs/FileStore/Akshay/IowaHealthData/Wellmark/03-05/2023_03_01_WMK_I_PLAN2_13B0_HealthSparq__8_.json',
    #     '/dbfs/FileStore/Akshay/IowaHealthData/Wellmark/03-05/2023_03_01_WMK_I_PLAN2_13B0_HealthSparq__9_.json',
    #     "/dbfs/FileStore/Akshay/IowaHealthData/Aetna/03-06/2023_02_09_Aetna_Health_of_Iowa_Inc.json",
    #     '/dbfs/FileStore/Akshay/IowaHealthData/HealthPartners/json_files/2023-03-01_HealthPartners_Open-Access_in-network-rates.json',
          '/dbfs/FileStore/Akshay/IowaHealthData/HealthPartners/json_files/2023-03-01_HealthPartners_Iowa-Specialty_in-network-rates.json'
]
data = {}
for pos_json in files:
    print("loading file ", pos_json)
    data = load_file(pos_json)
  process_in_network(data)
  # process_in_network_wellmark(data) # If the file is Wellmark then run this method.
  process_provider_references(data)

Load NPI data from csv file

In [0]:
def load_and_process_npi_data(file_path):
    npi_data = pd.read_csv(file_path, header=0)
    npi_data.rename(
        columns={
            "Provider Organization Name (Legal Business Name)": "Organization_Name",
            "Provider Last Name (Legal Name)": "Last_Name",
            "Provider First Name": "First_Name",
            "Provider First Line Business Practice Location Address": "Practice_Location_Address",
            "Provider Second Line Business Practice Location Address": "Second_Line_Practice_Location_Address",
            "Provider Business Practice Location Address City Name": "Practice_Location_Address_City",
            "Provider Business Practice Location Address State Name": "Practice_Location_Address_State",
            "Provider Business Practice Location Address Postal Code": "Practice_Location_Address_Postal_Code",
        },
        inplace=True,
    )
    npi_data = npi_data.drop("Unnamed: 0", axis=1)
    print("starting to create dataframe for npi")
    df = spark.createDataFrame(npi_data)
    df.withColumnRenamed(
        "Provider Organization Name (Legal Business Name)", "Organization_Name"
    )
    df.withColumnRenamed("Provider Last Name (Legal Name)", "Last_Name")
    df.withColumnRenamed("Provider First Name", "First_Name")
    df.withColumnRenamed(
        "Provider First Line Business Practice Location Address",
        "Practice_Location_Address",
    )
    df.withColumnRenamed(
        "Provider Second Line Business Practice Location Address",
        "Second_Line_Practice_Location_Address",
    )
    df.withColumnRenamed(
        "Provider Business Practice Location Address City Name",
        "Practice_Location_Address_City",
    )
    df.withColumnRenamed(
        "Provider Business Practice Location Address State Name",
        "Practice_Location_Address_State",
    )
    df.withColumnRenamed(
        "Provider Business Practice Location Address Postal Code",
        "Practice_Location_Address_Postal_Code",
    )
    df.write.mode("overwrite").option("mergeSchema", "true").format("delta").save(
        "/mnt/edl/raw/iowa_health_plans/npi"
    )
    print("successfully wrote dataframe for npi")

In [0]:
load_and_process_npi_data('/dbfs/FileStore/Akshay/NPI/Iowa/Iowa_NPI.csv')

Create Bronze tables on top of the processed data if table does not exist.

In [0]:
%sql
CREATE TABLE IF NOT EXISTS health_plans.npi USING DELTA LOCATION '/mnt/edl/raw/iowa_health_plans/npi';

In [0]:
%sql
CREATE TABLE IF NOT EXISTS health_plans.in_network_rates USING DELTA LOCATION '/mnt/edl/raw/iowa_health_plans/in_network_rates';

In [0]:
%sql
CREATE TABLE IF NOT EXISTS health_plans.provider_references USING DELTA LOCATION '/mnt/edl/raw/iowa_health_plans/provider_references';

Create Silver table on top of the exploded data if table does not exist.

Explode the provider reference table by npi as npi_number

In [0]:
sql_query = "SELECT explode(npi) as npi_number, `tin.type`, `tin.value`, provider_group_id, reporting_entity_name, reporting_entity_type, last_updated_on, version from health_plans.provider_references"

In [0]:
provider_references_rates_exploded = spark.sql(sql_query)

In [0]:
provider_references_rates_exploded.write.mode("overwrite").partitionBy(
    "reporting_entity_name"
).option("overwriteSchema", "true").format("delta").save(
    "/mnt/edl/raw/iowa_health_plans/provider_references_exploded"
)

In [0]:
%sql
CREATE TABLE IF NOT EXISTS health_plans.provider_references_exploded USING DELTA LOCATION '/mnt/edl/raw/iowa_health_plans/provider_references_exploded';

This process needs to be repeated for each insurance provider
- UMR INC (UHC)
- HealthSparq (Wellmark)
- Merative on behalf of HealthPartners

Remove data which does not belong to IOWA providers for provider_references_exploded

In [0]:
ia_provider_references = spark.sql("select * from health_plans.provider_references_exploded pre where pre.npi_number in (select npi from health_plans.npi) and reporting_entity_name = 'Merative on behalf of HealthPartners');

In [0]:
ia_provider_references = ia_provider_references.dropDuplicates()

In [0]:
ia_provider_references.write.mode("overwrite").partitionBy(
    "reporting_entity_name"
).option("overwriteSchema", "true").format("delta").save(
    "/mnt/edl/raw/iowa_health_plans/ia_provider_references"
  
)

In [0]:
%sql
CREATE TABLE IF NOT EXISTS health_plans.ia_provider_references USING DELTA LOCATION '/mnt/edl/raw/iowa_health_plans/ia_provider_references';

In [0]:
%sql
show tables from health_plans

Remove data which does not belong to IOWA providers for in_network_rates

This process needs to be repeated for each insurance provider
- UMR INC (UHC)
- HealthSparq (Wellmark)
- Merative on behalf of HealthPartners

In [0]:
ia_inr_df = spark.sql("select * from health_plans.in_network_rates where reporting_entity_name = 'Merative on behalf of HealthPartners' and provider_references in ( select pre.provider_group_id from health_plans.provider_references_exploded pre where pre.npi_number in (select npi from health_plans.npi) and reporting_entity_name = 'Merative on behalf of HealthPartners')");

In [0]:
ia_inr_df.write.mode("append").partitionBy(
    "reporting_entity_name"
).option("mergeSchema", "true").format("delta").save(
    "/mnt/edl/raw/iowa_health_plans/ia_in_network_rates"
)

In [0]:
%sql
CREATE TABLE IF NOT EXISTS health_plans.ia_in_network_rates USING DELTA LOCATION '/mnt/edl/raw/iowa_health_plans/ia_in_network_rates';

Remove duplicates from ia_in_network_rates if found

In [0]:
ia_in_network_rates_df = spark.sql('select * from health_plans.ia_in_network_rates inr where inr.provider_references in (select ipr.provider_group_id from health_plans.ia_provider_references ipr where inr.reporting_entity_name = ipr.reporting_entity_name)')

In [0]:
ia_in_network_rates_df = ia_in_network_rates_df.dropDuplicates()

In [0]:
ia_in_network_rates_df.write.mode("overwrite").partitionBy(
    "reporting_entity_name"
).option("overwriteSchema", "true").format("delta").save(
    "/mnt/edl/raw/iowa_health_plans/ia_in_network_rates"
  
)