## Python Processing for CHIR Data ##

In [1]:
# set-up steps
import pandas as pd
import numpy as np
import altair as alt
import seaborn as sns
import matplotlib.pyplot as plt
import warnings
import time
import os

# NOTE(review): this silences every warning category for the rest of the kernel
# session, so warnings raised by the cells below are never shown.
warnings.filterwarnings("ignore")

# data loading
# The path is relative to the working directory the notebook is run from.

CHIR_df = pd.read_csv("CHIR_data.csv")

In [2]:
# setting up a data check for potential duplicates

# Typology for duplicates (pending confirmation)
# Despite the name this is a set literal, not a list. It enumerates the labels
# written to the 'duplicate_flagger' column by flag_duplicates below; it is not
# referenced by any other code in the cells shown here.
dupe_code_list = {"cross_state_duplicate", "document_type_duplicate", "non_duplicate", "payer_duplicate","true_duplicate"}

# Converting the policy identifier column to a more useful format (numeric to string)
# This overwrites the column on CHIR_df itself, so every later cell sees the string form.
# NOTE(review): with a plain str cast, missing identifiers are presumably turned
# into the literal text 'nan' and would then compare equal to each other in the
# duplicate checks — confirm the column has no missing values.
CHIR_df['Policy Identifier'] = CHIR_df['Policy Identifier'].astype(str)

# Creating a duplicate flagger function for this data stream
# This flags duplicates in the DataFrame based on Policy Identifier, States, Document Type, and Payer.

def flag_duplicates(CHIR_df):
    """Label each row with the kind of duplication it takes part in.

    The input is sorted by 'Policy Identifier', 'States', 'Document Type' and
    'Payer' (the input frame itself is not modified) and a 'duplicate_flagger'
    column is added. Labels are applied in the order below and a row keeps the
    first one that applies:

    1. "cross_state_duplicate"   - its Policy Identifier occurs with more than
       one distinct value of States.
    2. "document_type_duplicate" - another row shares its Policy Identifier and
       States, but no other row also shares its Document Type.
    3. "payer_duplicate"         - another row shares its Policy Identifier and
       States, but no other row also shares its Payer.
    4. "true_duplicate"          - another row matches on all four columns.

    Rows matching none of these stay "non_duplicate".

    Parameters
    ----------
    CHIR_df : pd.DataFrame
        Must contain the four columns named above.

    Returns
    -------
    pd.DataFrame
        A sorted copy of the input with the extra 'duplicate_flagger' column.
    """
    id_col = "Policy Identifier"
    state_col = "States"
    doc_col = "Document Type"
    payer_col = "Payer"

    # Sort the DataFrame to ensure consistent grouping for duplicates
    CHIR_df_sorted = CHIR_df.sort_values(by=[id_col, state_col, doc_col, payer_col])

    # Initialize the 'duplicate_flagger' column with a default value
    CHIR_df_sorted["duplicate_flagger"] = "non_duplicate"

    def _label_unflagged(mask, label):
        # Masks are plain boolean arrays, so assignment is positional and stays
        # correct even when the frame's index contains repeated labels.
        still_unflagged = (CHIR_df_sorted["duplicate_flagger"] == "non_duplicate").to_numpy()
        CHIR_df_sorted.loc[mask & still_unflagged, "duplicate_flagger"] = label

    # Same 'Policy Identifier' but different 'States'.
    states_per_policy = CHIR_df_sorted.groupby(id_col)[state_col].transform("nunique")
    _label_unflagged((states_per_policy > 1).to_numpy(), "cross_state_duplicate")

    # Rows that share 'Policy Identifier' and 'States' with at least one other row.
    same_policy_and_state = CHIR_df_sorted.duplicated(
        subset=[id_col, state_col], keep=False
    ).to_numpy()

    # Same 'Policy Identifier' and 'States' but different 'Document Type'.
    repeats_doc_type = CHIR_df_sorted.duplicated(
        subset=[id_col, state_col, doc_col], keep=False
    ).to_numpy()
    _label_unflagged(same_policy_and_state & ~repeats_doc_type, "document_type_duplicate")

    # Same 'Policy Identifier' and 'States' but different 'Payer'.
    repeats_payer = CHIR_df_sorted.duplicated(
        subset=[id_col, state_col, payer_col], keep=False
    ).to_numpy()
    _label_unflagged(same_policy_and_state & ~repeats_payer, "payer_duplicate")

    # True duplicates: all four factors are identical.
    fully_identical = CHIR_df_sorted.duplicated(
        subset=[id_col, state_col, doc_col, payer_col], keep=False
    ).to_numpy()
    _label_unflagged(fully_identical, "true_duplicate")

    return CHIR_df_sorted

# Running the function on the CHIR data
dupecheck_CHIR_df = flag_duplicates(CHIR_df.copy())

# Exporting the modified version
output_path = "derived_data/CHIR_processed_for_Airtable.csv"
# to_csv does not create missing folders, so make sure the target directory exists
os.makedirs(os.path.dirname(output_path), exist_ok=True)
dupecheck_CHIR_df.to_csv(output_path, index= False)

print(f"CHIR Processed Data successfully exported to: {output_path}")

CHIR Processed Data successfully exported to: derived_data/CHIR_processed_for_Airtable.csv


In [3]:
# CGM grouping exercise
# This categorizes each row's CGM coverage by whether it mentions Dexcom and/or FreeStyle products


def categorize_cgm_coverage(df: pd.DataFrame, column_name: str) -> pd.DataFrame:
    """Add a 'CGM_grouping' column describing Dexcom / FreeStyle coverage.

    Each value of ``column_name`` is matched case-insensitively against the
    words "dexcom" and "freestyle". Missing or blank values are labelled
    "No coverage"; non-blank values mentioning neither brand are labelled
    "Others covered".

    The column is written onto ``df`` itself, and that same frame is returned.
    """

    def _coverage_label(raw_value) -> str:
        # Missing or whitespace-only cells mean nothing is covered
        if pd.isna(raw_value) or not str(raw_value).strip():
            return "No coverage"

        # Neither brand name contains a comma, so searching the whole lowercased
        # cell gives the same answer as searching each comma-separated product.
        lowered = str(raw_value).lower()
        mentions_dexcom = "dexcom" in lowered
        mentions_freestyle = "freestyle" in lowered

        if mentions_dexcom:
            return "both Dexcom and FreeStyle" if mentions_freestyle else "Dexcom only"
        if mentions_freestyle:
            return "FreeStyle only"
        # Something is listed, but it is neither Dexcom nor FreeStyle
        return "Others covered"

    # A plain list is assigned so labels are matched to rows by position
    df['CGM_grouping'] = [_coverage_label(value) for value in df[column_name]]

    return df

# test run
cat_CHIR_df = categorize_cgm_coverage(dupecheck_CHIR_df.copy(), 'CGMs Covered')

# Exporting the modified version
output_path = "derived_data/cat_CHIR_processed_for_Airtable.csv"
# to_csv does not create missing folders, so make sure the target directory exists
os.makedirs(os.path.dirname(output_path), exist_ok=True)
cat_CHIR_df.to_csv(output_path, index= False)

print(f"CHIR Categorized CGM Data successfully exported to: {output_path}")

CHIR Categorized CGM Data successfully exported to: derived_data/cat_CHIR_processed_for_Airtable.csv


In [4]:
# grouping for implanted

# writing a dictionary based on the table
# Maps a device name to "implanted", "non-implanted" or "unknown".
# categorize_implanted_type (below) reads this dictionary by name and walks it
# in insertion order, so the order of the entries matters for partial matches.
# NOTE(review): the source table is not identified in this notebook — record where it came from.
device_implant_dict = {
    "Eversense E3 rtCGM": "implanted",
    "Eversense 365": "implanted",
    "Medtronic Guardian 3 rtCGM": "non-implanted",
    "Medtronic Guardian 4 rtCGM": "non-implanted",
    "Medtronic Simplera rtCGM": "non-implanted",
    "Medtronic Simplera Sync rtCGM": "non-implanted",
    "Dexcom G6 rtCGM": "non-implanted",
    "Dexcom G7 rtCGM": "non-implanted",
    "Abbott FreeStyle Libre 2 Plus rtCGM": "non-implanted",
    "Abbott FreeStyle Libre 3 Plus rtCGM": "non-implanted",
    "Abbott FreeStyle Libre 2 isCGM": "non-implanted",
    "Abbott FreeStyle Libre 14 isCGM": "non-implanted",
    "Unspecified - transcutaneous rtCGM": "non-implanted",
    "Unspecified - transcutaneous isCGM": "non-implanted",
    "Unspecified - implantable rtCGM": "implanted",
    "Unspecified": "unknown"
}

# keep only the rows that have a value in 'CHIR Review Fields Last Modified';
# .copy() gives an independent frame so the columns added below do not touch cat_CHIR_df
implanted_CHIR_df = cat_CHIR_df.dropna(subset=['CHIR Review Fields Last Modified']).copy()

# building a function that loops over the data and labels each row's devices as implanted and/or non-implanted
def _resolve_implant_status(product: str, device_map: dict):
    """Return the status mapped to ``product``, or None when nothing matches.

    An exact key match wins. Only when there is none does the lookup fall back
    to a partial match (mapped name inside the product text, or vice versa),
    walking the mapping in insertion order.
    """
    if product in device_map:
        return device_map[product]
    for mapped_device_name, status in device_map.items():
        if mapped_device_name in product or product in mapped_device_name:
            return status
    return None


def categorize_implanted_type(df: pd.DataFrame, column_name: str, device_map: dict = None) -> pd.DataFrame:
    """Add an 'Implanted_Type' column summarising the devices listed in each cell.

    Each cell of ``column_name`` is treated as a comma-separated list of device
    names. Every name is looked up in ``device_map`` and the cell is labelled:

    - "no devices"                        - the cell is missing or blank
    - "both implanted and non-implanted"  - at least one device of each kind
    - "implanted only" / "non-implanted only"
    - "unknown device type"               - no device resolved to either kind

    Parameters
    ----------
    df : pd.DataFrame
        Frame to label. The new column is written onto this frame.
    column_name : str
        Column holding the comma-separated device names.
    device_map : dict, optional
        Device name -> "implanted" / "non-implanted" / other status. When
        omitted, the notebook-level ``device_implant_dict`` is used.

    Returns
    -------
    pd.DataFrame
        The same ``df`` object, with 'Implanted_Type' added.
    """
    if device_map is None:
        device_map = device_implant_dict

    # create an empty list to store the labels for the new column
    implanted_type_labels = []

    # iterate through each cell in the specified column
    for cell_value in df[column_name]:
        # handling missing or empty values
        if pd.isna(cell_value) or str(cell_value).strip() == '':
            implanted_type_labels.append("no devices")
            continue

        has_implanted_device = False
        has_non_implanted_device = False

        # separating multi-device coverage by commas
        for fragment in str(cell_value).split(','):
            product = fragment.strip()
            # A trailing or doubled comma yields an empty fragment. An empty
            # string is a substring of every mapped name, so it must be skipped
            # or it would match the first dictionary entry.
            if not product:
                continue

            status = _resolve_implant_status(product, device_map)
            if status == "implanted":
                has_implanted_device = True
            elif status == "non-implanted":
                has_non_implanted_device = True

        # applying the labeling logic based on the flags after checking all products in the cell
        if has_implanted_device and has_non_implanted_device:
            implanted_type_labels.append("both implanted and non-implanted")
        elif has_implanted_device:
            implanted_type_labels.append("implanted only")
        elif has_non_implanted_device:
            implanted_type_labels.append("non-implanted only")
        else:
            implanted_type_labels.append("unknown device type")

    # assign the generated labels to the new 'Implanted_Type' column
    df['Implanted_Type'] = implanted_type_labels

    return df

# test run with our copied data
# The function writes 'Implanted_Type' onto the frame it is given and returns
# that same frame, so this re-binds the name to the now-labelled copy.
implanted_CHIR_df = categorize_implanted_type(implanted_CHIR_df, 'CGMs Covered')

# supplementing the data to include a count of the number of devices
def count_cell_items(df: pd.DataFrame, column_name: str, new_column_name: str) -> pd.DataFrame:
    """Count the comma-separated entries in each cell of ``column_name``.

    Missing and blank cells count as 0, and empty fragments (for example from a
    trailing comma) are ignored. The counts are written to ``new_column_name``
    on ``df`` itself, and that same frame is returned.
    """

    def _entry_count(raw_value) -> int:
        if pd.isna(raw_value):
            return 0
        # A blank cell splits into fragments that are all empty after
        # stripping, so it naturally counts as zero here as well.
        return sum(1 for piece in str(raw_value).split(',') if piece.strip())

    df[new_column_name] = [_entry_count(value) for value in df[column_name]]
    return df

implanted_CHIR_df = count_cell_items(implanted_CHIR_df, 'CGMs Covered', 'CGM_count')

# exporting the modified version
new_output_path = "derived_data/counted_and_implanted_CHIR_processed_for_Airtable.csv"
# to_csv does not create missing folders, so make sure the target directory exists
os.makedirs(os.path.dirname(new_output_path), exist_ok=True)
implanted_CHIR_df.to_csv(new_output_path, index= False)

print(f"CHIR Implanted/non-implanted & counted CGM Data successfully exported to: {new_output_path}")

CHIR Implanted/non-implanted & counted CGM Data successfully exported to: derived_data/counted_and_implanted_CHIR_processed_for_Airtable.csv


In [None]:
# Webscraping and data organization -- pulling the CHIR documents
# adding packages for scraping
import time
import random
import requests
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC


def scrape_chir_pdfs(df, username, password,
                     download_folder="CHIR Policy Reporter Downloads",
                     request_timeout=60):
    """Log in to the Policy Reporter portal and download the PDF behind each row.

    For every row of ``df`` the link in 'Link to Policy/Formulary PDF' is opened
    in a headless Chrome session, the URL the browser ends up on is fetched with
    ``requests`` using the browser's cookies, and the response is saved as
    ``<Document Title>_<airtable_uuid>.pdf`` inside ``download_folder``.
    Files that already exist are skipped, and responses whose Content-Type is
    not a PDF are reported and skipped. A failure on one row is printed and the
    loop carries on with the next row.

    Parameters
    ----------
    df : pd.DataFrame
        Needs the columns 'Link to Policy/Formulary PDF', 'Document Title' and
        'airtable_uuid'.
    username, password : str
        Portal credentials, entered on the two-step login form.
    download_folder : str, optional
        Destination directory; created if missing.
    request_timeout : float, optional
        Seconds to wait on the download request before giving up on that row.

    Returns
    -------
    None
    """
    # organizing the destination folder
    if not os.path.exists(download_folder):
        os.makedirs(download_folder, exist_ok=True)
        print(f"Created new folder: {os.path.abspath(download_folder)}")

    # need to use a headless browser to overcome the two step login process
    options = webdriver.ChromeOptions()
    options.add_argument("--headless")
    options.add_argument("--no-sandbox")
    options.add_argument("--disable-dev-shm-usage")

    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service, options=options)

    # characters that are replaced with "_" when building the file name
    unsafe_chars = str.maketrans({char: "_" for char in '/\\:?|'})

    # Everything after the browser starts runs under try/finally so the Chrome
    # process is shut down on every exit path, including unexpected errors.
    try:
        try:
            print("Navigating to login page with Selenium...")
            driver.get("https://portal.policyreporter.com/login")

            # Wait for username field and submit
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.NAME, "username"))
            ).send_keys(username)
            driver.find_element(By.XPATH, "//input[@type='submit' and @value='Next']").click()

            # Wait for password field and submit
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.NAME, "password"))
            ).send_keys(password)
            driver.find_element(By.XPATH, "//button[contains(text(), 'Submit')]").click()

            # NOTE(review): this only waits for a <body> element, which is not
            # by itself evidence that the login was accepted.
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.TAG_NAME, "body"))
            )
            print("Login successful with Selenium.")

        except Exception as e:
            print(f"Selenium login failed: {e}")
            return

        # scraping loop
        for _, row in df.iterrows():
            # based on the CHIR data frame set up earlier
            dynamic_url = row['Link to Policy/Formulary PDF']
            name = row['Document Title']
            unique_id = row['airtable_uuid']

            # cleaning the filename to remove invalid characters and creating a
            # destination file name based on the UUID
            filename = f"{name}_{unique_id}.pdf".translate(unsafe_chars)
            filepath = os.path.join(download_folder, filename)
            # Data is streamed to a temporary name and renamed once complete, so
            # an interrupted download never leaves a truncated .pdf that the
            # "already exists" check below would skip on the next run.
            partial_path = filepath + ".part"

            # reducing redundancy
            if os.path.exists(filepath):
                print(f"File already exists, skipping: {filename}")
                continue

            try:
                # first, circumventing the dynamic URL gatekeeping permissions checker
                print(f"Retrieving PDF link from: {dynamic_url}")
                driver.get(dynamic_url)

                # saving the new URL after the redirecting
                final_pdf_url = driver.current_url

                # Now, use requests with the cookies from the driver to download
                with requests.Session() as s:
                    for cookie in driver.get_cookies():
                        s.cookies.set(cookie['name'], cookie['value'])

                    # finally, we download that pdf! (the timeout stops one
                    # stalled request from hanging the whole run)
                    with s.get(final_pdf_url, stream=True, timeout=request_timeout) as response_pdf:
                        response_pdf.raise_for_status()

                        # confirming that we are pulling the pdf and not the HTML file
                        content_type = response_pdf.headers.get('Content-Type', '')
                        if 'application/pdf' not in content_type:
                            print(f"❗ Warning: Expected a PDF but received a different content type ({content_type}) from {final_pdf_url}. Skipping file.")
                            continue

                        with open(partial_path, 'wb') as f:
                            for chunk in response_pdf.iter_content(chunk_size=8192):
                                f.write(chunk)

                os.replace(partial_path, filepath)
                print(f"✅ Successfully downloaded: {filename}")

                # adding a snooze to circumvent the usual anti-download blockers
                sleep_time = random.uniform(5, 17)
                time.sleep(sleep_time)

            except Exception as e:
                # drop whatever was written before the failure
                if os.path.exists(partial_path):
                    os.remove(partial_path)
                print(f"❗ Failed to process URL {dynamic_url}: {e}")
    finally:
        driver.quit()

    print("All downloads complete.")

# running the code
# NOTE(review): only rows from position 2450 onward are processed — presumably
# to resume an earlier, interrupted run; confirm the offset before reusing it.
reboot_CHIR_df = CHIR_df.iloc[2450:]

# Credentials are never written into the notebook: they are taken from the
# environment when set, otherwise asked for interactively (password hidden).
import getpass

portal_username = os.environ.get("CHIR_PORTAL_USERNAME") or input("Policy Reporter username: ")
portal_password = os.environ.get("CHIR_PORTAL_PASSWORD") or getpass.getpass("Policy Reporter password: ")

scrape_chir_pdfs(reboot_CHIR_df, portal_username, portal_password)

In [6]:
# Airtable mass upload of the data scraped above
# adding in pyairtable
from pyairtable import Table
from pyairtable.formulas import match

# building an uploader function
def pdf_to_airtable_uploader(folder_path, PAT, base_id, table_id, field_id):
    """Attach every PDF in ``folder_path`` to its Airtable record.

    The record ID is taken from the file name: the text after the last
    underscore, with '.pdf' removed. Each file is uploaded into ``field_id`` of
    that record. A failure on one file is printed and the remaining files are
    still attempted; a summary count of successful uploads is printed at the end.

    Parameters
    ----------
    folder_path : str
        Directory containing the downloaded PDFs.
    PAT, base_id, table_id : str
        Values passed to ``Table`` to open the destination table.
    field_id : str
        Attachment field that receives the file.
    """
    airtable_table = Table(PAT, base_id, table_id)
    # tally of successful uploads, reported once the loop finishes
    uploaded_count = 0

    for entry in os.listdir(folder_path):
        # anything that is not a PDF is ignored
        if not entry.endswith(".pdf"):
            continue

        try:
            # the record ID is the last underscore-separated piece of the name
            trailing_piece = entry.rsplit('_', 1)[-1]
            record_id = trailing_piece.replace('.pdf', '')
            pdf_path = os.path.join(folder_path, entry)
            print(f"Uploading '{entry}' to record {record_id}...")

            # deposit the document in the chosen field of the matching record
            airtable_table.upload_attachment(record_id, field_id, pdf_path)

            print(f"✅ Successfully uploaded '{entry}'.")
            uploaded_count += 1

        except Exception as e:
            print(f"An error occurred while processing {entry}: {e}")

    print(f"\nCompleted! Processed {uploaded_count} files.")


# Airtable access data for this project
# The token and IDs are read from environment variables so that no secret is
# stored in the notebook; a missing variable raises KeyError before any upload.
folder = "CHIR Policy Reporter Downloads"
airtable_personal_access_token = os.environ["AIRTABLE_PAT"]
airtable_base_id = os.environ["AIRTABLE_BASE_ID"]
airtable_table_id = os.environ["AIRTABLE_TABLE_ID"]
airtable_field_id = os.environ["AIRTABLE_FIELD_ID"]

pdf_to_airtable_uploader(folder, airtable_personal_access_token, airtable_base_id, airtable_table_id, airtable_field_id)

Uploading 'Partial Hospitalization Program and Intensive Outpatient Program Services (CA Commercial)_recGiO5jn6apYiKo4.pdf' to record recGiO5jn6apYiKo4...
✅ Successfully uploaded 'Partial Hospitalization Program and Intensive Outpatient Program Services (CA Commercial)_recGiO5jn6apYiKo4.pdf'.
Uploading 'Ambulatory or Outpatient Surgery Center Procedures_rec6hWizOZMoCizxL.pdf' to record rec6hWizOZMoCizxL...
✅ Successfully uploaded 'Ambulatory or Outpatient Surgery Center Procedures_rec6hWizOZMoCizxL.pdf'.
Uploading 'Clinical Criteria (Colorado)_recwXGygv1b5pDPra.pdf' to record recwXGygv1b5pDPra...
✅ Successfully uploaded 'Clinical Criteria (Colorado)_recwXGygv1b5pDPra.pdf'.
Uploading 'Anthem Blue Cross and Blue Shield Provider Manual (Kentucky)_recSjJXsDen9Vy51U.pdf' to record recSjJXsDen9Vy51U...
✅ Successfully uploaded 'Anthem Blue Cross and Blue Shield Provider Manual (Kentucky)_recSjJXsDen9Vy51U.pdf'.
Uploading 'Implants_recfxLHxw0rnagUpU.pdf' to record recfxLHxw0rnagUpU...
✅ Succes