May need to run this notebook via `jupyter notebook --NotbookApp.iopub_Data_Rate_Limit=1e10`

# Data Loading to BigQuery

This notebook covers the setup of taking all of the files that we've discovered around Price Transparency and loading them into a standard format. 

We first need to do one, and then we will do the rest. See https://github.com/pauldria/ncssm-2022-jterm-price-transparency#data for more.

In [1]:
from tools import tools

from google.cloud import bigquery

import datetime
import json
import os
import pandas as pd
import time
import urllib

pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)

In [2]:
%env GOOGLE_APPLICATION_CREDENTIALS=/home/raff/.ssh/ncssm-price-transparency-d58392a32442.json

env: GOOGLE_APPLICATION_CREDENTIALS=/home/raff/.ssh/ncssm-price-transparency-d58392a32442.json


In [3]:
with open("config.json", "r") as f:
    config = json.load(f)

## Need to debug? Set `debug = True` here. Will only process 1000 lines for expediency.

In [4]:
debug = False
debug_check = 13

nrows = None if debug else None

idx_start = debug_check   if debug else 0
idx_end   = debug_check+1 if debug else len(config["data"])
num_entries = idx_end - idx_start

if debug:
    print(f"Checking only {debug_check}")
    print(config["data"][debug_check])
else:
    print(f"Loading in all hospitals")

Loading in all hospitals


In [None]:
date_obtained = datetime.datetime.now().strftime("%Y-%m-%d")

for i in range(14, idx_end):
    entry = config["data"][i]
    
    identifier    = i
    hospital_name = entry["hospital_name"]
    filepath      = entry["data_url_local"] if "data_url_local" in entry else entry["data_url"]
    is_local      = "data_url_local" in entry
    filetype      = "csv"
    if "filetype" in entry:
        filetype = entry["filetype"]
    else:
        if filepath.endswith("json"):
            filetype = "json"
        elif filepath.endswith("xlsx"):
            filetype = "xlsx"
    
    fixed_start = entry["idx_column_fixed_start"]
    fixed_end = entry["idx_column_fixed_end"]
    
    transformed_columns = entry["transformed_columns"]
    
    skiprows      = entry["skiprows"]
    date_provided = entry["date_provided"]
    gross_charge_name   = entry["gross_charge_name"] if "gross_charge_name" in entry else None
    self_pay_name       = entry["self_pay_name"]     if "self_pay_name" in entry else None
    min_name = entry["min_name"] if "min_name" in entry else None
    max_name = entry["max_name"] if "min_name" in entry else None
    min_inpatient_name  = entry["min_inpatient_name"] if "min_inpatient_name" in entry else None
    max_inpatient_name  = entry["max_inpatient_name"] if "max_inpatient_name" in entry else None
    min_outpatient_name = entry["min_outpatient_name"] if "min_outpatient_name" in entry else None
    max_outpatient_name = entry["max_outpatient_name"] if "max_outpatient_name" in entry else None
    
    print(f"Processing {hospital_name} ({identifier} of {num_entries})")
    
    checkpoint_0   = time.time()

    df             = tools.read(filepath, skiprows, nrows, filetype = filetype, is_local = is_local)
    
    checkpoint_1   = time.time()
    time_taken    = "{:.2f}".format(checkpoint_1 - checkpoint_0)
    total_elapsed = "{:.2f}".format(checkpoint_1 - checkpoint_0)
    print(f"File read complete.  Shape: {df.shape}. Time taken: {time_taken}. Total time elapsed: {total_elapsed}")
    
    df_processed   = tools.process(df)
    
    checkpoint_2   = time.time()
    time_taken    = "{:.2f}".format(checkpoint_2 - checkpoint_1)
    total_elapsed = "{:.2f}".format(checkpoint_2 - checkpoint_0)
    print(f"Processing complete. Shape: {df_processed.shape}. Time taken: {time_taken}. Total time elapsed: {total_elapsed}")
    
    df_transformed = tools.transform(df_processed, fixed_start = fixed_start, fixed_end = fixed_end)    
    df_transformed.columns = transformed_columns
    
    checkpoint_3   = time.time()
    time_taken    = "{:.2f}".format(checkpoint_3 - checkpoint_2)
    total_elapsed = "{:.2f}".format(checkpoint_3 - checkpoint_0)
    print(f"Transform complete.  Shape: {df_transformed.shape}. Time taken: {time_taken}. Total time elapsed: {total_elapsed}")
    
    df_standardized = tools.standardize(df_transformed, 
                                        cols = ["code", "description", "payer", "cost"],
                                        id = identifier,
                                        date_obtained = date_obtained,
                                        date_provided = date_provided,
                                        hospital_name = hospital_name,
                                        gross_charge_name = gross_charge_name,
                                        self_pay_name = self_pay_name,
                                        min_name = min_name,
                                        max_name = max_name,
                                        min_inpatient_name = min_inpatient_name,
                                        max_inpatient_name = max_inpatient_name,
                                        min_outpatient_name = min_outpatient_name,
                                        max_outpatient_name = max_outpatient_name)
    
    checkpoint_4   = time.time()
    time_taken    = "{:.2f}".format(checkpoint_4 - checkpoint_3)
    total_elapsed = "{:.2f}".format(checkpoint_4 - checkpoint_0)
    print(f"Standardization complete. Shape: {df_standardized.shape}. Time taken: {time_taken}. Total time elapsed: {total_elapsed}")
    
    if not debug:
        job = tools.send_to_bigquery(df_standardized,
                                     destination_table = f"ncssm-price-transparency.hospital_data.hospital_{identifier}",
                                     cols_int64 = ["id"],
                                     cols_date  = ["date_obtained", "date_provided"])

        checkpoint_5   = time.time()
        time_taken    = "{:.2f}".format(checkpoint_5 - checkpoint_4)
        total_elapsed = "{:.2f}".format(checkpoint_5 - checkpoint_0)
        print(f"BQ load complete. Time taken: {time_taken}. Total time elapsed: {total_elapsed}")
    
    del df
    del df_processed
    del df_transformed
    del df_standardized

Processing Wake - NC Baptist (14 of 25)
File read complete.  Shape: (337638, 58). Time taken: 5.22. Total time elapsed: 5.22
Processing complete. Shape: (337638, 58). Time taken: 2.39. Total time elapsed: 7.62
Transform complete.  Shape: (17557176, 8). Time taken: 7.76. Total time elapsed: 15.38


In [6]:
df_standardized.head()

Unnamed: 0,id,date_obtained,date_provided,hospital_name,code,code_type,description,payer,cost
0,22,2022-01-02,1900-01-01,Novant - Presbyterian,*,,HC IP PRIVATE,_GROSS_CHARGE,1472
1,22,2022-01-02,1900-01-01,Novant - Presbyterian,*,,HC IP L&D,_GROSS_CHARGE,1472
2,22,2022-01-02,1900-01-01,Novant - Presbyterian,*,,HC BEH MED PRIVATE,_GROSS_CHARGE,2248
3,22,2022-01-02,1900-01-01,Novant - Presbyterian,*,,HC IP ONCOLOGY,_GROSS_CHARGE,2296
4,22,2022-01-02,1900-01-01,Novant - Presbyterian,*,,HC IP TRANSPLANT AND SURGICAL ADMISSION,_GROSS_CHARGE,2757


In [7]:
df_standardized["code"].value_counts()

         483951
*         49335
G0480      2262
Q9969       975
87798       936
          ...  
77307        39
494          39
39402        39
46924        39
099          39
Name: code, Length: 5636, dtype: int64

In [5]:
print(df.shape)
print(df_processed.shape)
print(df_transformed.shape)
print(df_standardized.shape)

(113385, 115)
(113385, 115)
(12585735, 6)
(12585735, 9)


In [6]:
df_standardized.head()

Unnamed: 0,id,date_obtained,date_provided,hospital_name,code,code_type,description,payer,cost
0,0,2021-11-28,2021-09-28,Alamance Regional Medical Center,MS001,,Heart Transplant Or Implant Of Heart Assist Sy...,_GROSS_CHARGE,"$490,771.98"
1,0,2021-11-28,2021-09-28,Alamance Regional Medical Center,MS002,,Heart Transplant Or Implant Of Heart Assist Sy...,_GROSS_CHARGE,"$540,615.96"
2,0,2021-11-28,2021-09-28,Alamance Regional Medical Center,MS003,,Ecmo Or Tracheostomy With Mv >96 Hours Or Prin...,_GROSS_CHARGE,"$256,535.52"
3,0,2021-11-28,2021-09-28,Alamance Regional Medical Center,MS004,,Tracheostomy With Mv >96 Hours Or Principal Di...,_GROSS_CHARGE,"$256,923.78"
4,0,2021-11-28,2021-09-28,Alamance Regional Medical Center,MS011,,"Tracheostomy For Face, Mouth And Neck Diagnose...",_GROSS_CHARGE,"$58,037.92"


In [7]:
df_standardized.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 12585735 entries, 0 to 12585734
Data columns (total 9 columns):
 #   Column         Dtype         
---  ------         -----         
 0   id             int64         
 1   date_obtained  datetime64[ns]
 2   date_provided  datetime64[ns]
 3   hospital_name  object        
 4   code           object        
 5   code_type      object        
 6   description    object        
 7   payer          object        
 8   cost           object        
dtypes: datetime64[ns](2), int64(1), object(6)
memory usage: 864.2+ MB


In [8]:
client = bigquery.Client()

In [9]:
schema = []
for c in df_standardized:
    bigquery_type = bigquery.enums.SqlTypeNames.STRING
    if c in ["id"]:
        bigquery_type = bigquery.enums.SqlTypeNames.INT64
    if c in ["date_obtained", "date_provided"]:
        bigquery_type = bigquery.enums.SqlTypeNames.DATE
    schema.append(bigquery.SchemaField(c, bigquery_type))
schema

[SchemaField('id', 'INTEGER', 'NULLABLE', None, (), None),
 SchemaField('date_obtained', 'DATE', 'NULLABLE', None, (), None),
 SchemaField('date_provided', 'DATE', 'NULLABLE', None, (), None),
 SchemaField('hospital_name', 'STRING', 'NULLABLE', None, (), None),
 SchemaField('code', 'STRING', 'NULLABLE', None, (), None),
 SchemaField('code_type', 'STRING', 'NULLABLE', None, (), None),
 SchemaField('description', 'STRING', 'NULLABLE', None, (), None),
 SchemaField('payer', 'STRING', 'NULLABLE', None, (), None),
 SchemaField('cost', 'STRING', 'NULLABLE', None, (), None)]

In [10]:
job_config = bigquery.LoadJobConfig(
    schema            = schema,
    write_disposition = "WRITE_TRUNCATE",
)

In [11]:
job = client.load_table_from_dataframe(
    df_standardized, "ncssm-price-transparency.hospital_data.hospital_0", job_config=job_config
)

In [12]:
job.result()  # Wait for the job to complete.

LoadJob<project=ncssm-price-transparency, location=US, id=2e61d15d-8695-46d5-96c6-187f3660daa3>

# Test Bed - CharMeck has a silly way of dealing with their data

`TabName` separates out the different styles of costs and they use both Payer and Payor. Ugh.

In [3]:
df = pd.read_json("/home/raff/56-0529945_AtriumHealthAnson_StandardCharges.json")

In [4]:
df.head()

Unnamed: 0,Procedure,Code Type,Code,Rev Code,Procedure Description,Min /Max,Inpatient Gross Charge,Inpatient Negotiated Charge,Outpatient Gross Charge,Outpatient Negotiated Charge,TabName,Quantity,Payer,Plan(s),Inpatient Discounted Charge,Outpatient Discounted Charge,Plan,Unnamed: 18,Product,Gross Charge - Facility,Negotiated Charge - Facility,Gross Charge - Non-Facility,Negotiated Charge - Non-Facility,Procedure External ID,Procedure External ID RECORD NAME,Procedure Modifier,Price,Discounted Cash Price,Service Description,Codes,Methodology,Gross Charge,Aetna Commercial,Aetna Medicare,BCBS NC Commercial,BCBS NC Medicare,Cigna Commercial,Cigna Medicare,United Commercial,Veteran's Administration Community Care Network,Medcost Commercial,BCBS SC Individual,Min Negotiated Charge,Max Negotiated Charge
0,1130000000,EAP,,0113 - ROOM & BOARD-PRIVATE (ONE BED)-PEDIATRIC,Hc Room Private Pediatric,MIN,1708.0,789.1,,,Hosp Deidentified Payor MinMax,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
1,2060000001,EAP,,0206 - INTENSIVE CARE-INTERMEDIATE ICU,Hc Pic Progressive,MIN,3697.0,1708.01,,,Hosp Deidentified Payor MinMax,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
2,2500000026,EAP,,0250 - PHARMACY-GENERAL,Hc India Ink,MAX,111.85,111.85,111.85,111.85,Hosp Deidentified Payor MinMax,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
3,2500000029,EAP,,0272 - MEDICAL/SURGICAL SUPPLIES AND DEVICES-S...,Hc Kit 2.0 Fibrin Seal Crosseal 1929,MAX,723.15,723.15,723.15,723.15,Hosp Deidentified Payor MinMax,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
4,2500000031,EAP,,0250 - PHARMACY-GENERAL,Hc Kit 5.0 Lap Fibrin Seal Cros 1937,MIN,1541.8,712.31,1541.8,516.35,Hosp Deidentified Payor MinMax,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,


In [6]:
df.loc[df["Procedure"] == "1130000000"].head()

Unnamed: 0,Procedure,Code Type,Code,Rev Code,Procedure Description,Min /Max,Inpatient Gross Charge,Inpatient Negotiated Charge,Outpatient Gross Charge,Outpatient Negotiated Charge,TabName,Quantity,Payer,Plan(s),Inpatient Discounted Charge,Outpatient Discounted Charge,Plan,Unnamed: 18,Product,Gross Charge - Facility,Negotiated Charge - Facility,Gross Charge - Non-Facility,Negotiated Charge - Non-Facility,Procedure External ID,Procedure External ID RECORD NAME,Procedure Modifier,Price,Discounted Cash Price,Service Description,Codes,Methodology,Gross Charge,Aetna Commercial,Aetna Medicare,BCBS NC Commercial,BCBS NC Medicare,Cigna Commercial,Cigna Medicare,United Commercial,Veteran's Administration Community Care Network,Medcost Commercial,BCBS SC Individual,Min Negotiated Charge,Max Negotiated Charge
0,1130000000,EAP,,0113 - ROOM & BOARD-PRIVATE (ONE BED)-PEDIATRIC,Hc Room Private Pediatric,MIN,1708.0,789.1,,,Hosp Deidentified Payor MinMax,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
672,1130000000,EAP,,0113 - ROOM & BOARD-PRIVATE (ONE BED)-PEDIATRIC,Hc Room Private Pediatric,MAX,1708.0,1708.0,,,Hosp Deidentified Payor MinMax,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
71831,1130000000,EAP,,0113 - ROOM & BOARD-PRIVATE (ONE BED)-PEDIATRIC,Hc Room Private Pediatric,,1708.0,,1708.0,,Hosp Discounted Cash Price,1.0,Self Pay,Discounted Cash Price,854.0,854.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,
649080,1130000000,EAP,,0113 - ROOM & BOARD-PRIVATE (ONE BED)-PEDIATRIC,Hc Room Private Pediatric,,1708.0,1035.05,,,Hosp Std. Charges_All Payors,1.0,AETNA [100],,,,AETNA OP CHC PPO [10000302],,,,,,,,,,,,,,,,,,,,,,,,,,,
649086,1130000000,EAP,,0113 - ROOM & BOARD-PRIVATE (ONE BED)-PEDIATRIC,Hc Room Private Pediatric,,1708.0,1292.96,,,Hosp Std. Charges_All Payors,1.0,AETNA [100],,,,AETNA WHOLE HEALTH - ATRIUM HEALTH [10001903],,,,,,,,,,,,,,,,,,,,,,,,,,,


In [10]:
df.columns

Index(['Procedure', 'Code Type', 'Code', 'Rev Code', 'Procedure Description',
       'Min /Max', ' Inpatient Gross Charge ', ' Inpatient Negotiated Charge ',
       ' Outpatient Gross Charge ', ' Outpatient Negotiated Charge ',
       'TabName', 'Quantity', 'Payer', 'Plan(s)',
       ' Inpatient Discounted Charge ', ' Outpatient Discounted Charge ',
       'Plan ', '', 'Product', ' Gross Charge - Facility ',
       ' Negotiated Charge - Facility ', ' Gross Charge - Non-Facility ',
       ' Negotiated Charge - Non-Facility ', 'Procedure External ID',
       'Procedure External ID RECORD NAME', 'Procedure Modifier', 'Price',
       'Discounted Cash Price', 'Service Description', 'Codes', 'Methodology',
       'Gross Charge', 'Aetna Commercial', 'Aetna Medicare',
       'BCBS NC Commercial ', 'BCBS NC Medicare', 'Cigna Commercial ',
       'Cigna Medicare', 'United Commercial ',
       'Veteran's Administration Community Care Network',
       'Medcost Commercial ', 'BCBS SC Individual ', 

# Test Bed - Wake


In [8]:
df = pd.read_csv("/home/raff/Lexington-Medical-Center-Transparency-12312020-FINAL.csv", dtype = str)

In [6]:
df = tools.read("/home/raff/wake-davie.csv", skiprows = 0, nrows = None)

In [7]:
df.head()

Unnamed: 0,Inpatient/Outpatient,DRG,Revenue Code,CPT,NDC,Procedure Description,Gross Charge,Discounted Cash Price,Minimum Negotiated Charge,Maximum Negotiated Charge,Aetna Managed Care,Aetna Medicare,Aetna Wholehealth,"BCBS\n(PPO, State Health, Federal Employees, Blue Select)",Blue Local Group,Blue Local Individual,Blue Medicare,Blue Value,Cigna Healthsprings,Cigna Managed Care,Coventry Wellpath,Coventry Firsthealth,Healthgram,Health Team Advantage,Humana Choicecare,Humana Medicare,Medcost,Medcost Ultra,UHC Managed Care,UHC Medicare,Wellcare Medicare
0,Inpatient,42,,,,"PERIPHERAL, CRANIAL NERVE AND OTHER NERVOUS SY...",49303,24652,12068,46838,22827,12068,18193,20559,17108,12770,12068,18637,12068,20757,27610,46838,34512,12068,44373,12189,24947,23715,21003,12068,12309
1,Inpatient,56,,,,DEGENERATIVE NERVOUS SYSTEM DISORDERS WITH MCC,45877,22939,11882,43583,21241,14004,16929,19131,15919,11882,14004,17342,14004,19314,25691,43583,32114,14004,41289,14144,23214,22067,19544,14004,14284
2,Inpatient,57,,,,DEGENERATIVE NERVOUS SYSTEM DISORDERS WITHOUT MCC,26113,13057,6763,24808,12090,8094,9636,10889,9061,6763,8094,9871,8094,10994,14623,24808,18279,8094,23502,8175,13213,12560,11124,8094,8256
3,Inpatient,64,,,,INTRACRANIAL HEMORRHAGE OR CEREBRAL INFARCTION...,36262,18131,9392,34449,16789,12248,13381,15121,12583,9392,12248,13707,12248,15266,20307,34449,25383,12248,32635,12370,18348,17442,15447,12248,12493
4,Inpatient,65,,,,INTRACRANIAL HEMORRHAGE OR CEREBRAL INFARCTION...,24652,12326,6385,23419,11414,6519,9097,10280,8554,6385,6519,9318,6519,10379,13805,23419,17256,6519,22187,6584,12474,11858,10502,6519,6649


In [9]:
df2 = tools.transform(tools.process(df), fixed_start = 0, fixed_end = 6)

In [22]:
df2["NDC"].value_counts()

                 7127175
00409-6102-02        575
00338-0049-04        550
00409-9093-32        475
00143-9587-01        425
                  ...   
00270-1411-25         25
13533-0634-02         25
68455-0107-64         25
60258-0160-01         25
08884-4301-00         25
Name: NDC, Length: 918, dtype: int64