In [2]:
import pandas as pd
import numpy as np
import requests
import json
import os
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy import text
import time
from tqdm.notebook import tqdm
from io import StringIO
from google.cloud import storage
import redshift_connector
import psycopg2

In [3]:
# Set up the Redshift connection.
# Connection settings come from environment variables so that credentials are
# never stored in the notebook. The non-secret settings fall back to this
# project's defaults when the corresponding variable is unset.
redshift_endpoint = os.environ.get(
    'REDSHIFT_ENDPOINT',
    'different-states-hospital-price.381492032483.us-east-1.redshift-serverless.amazonaws.com',
)
redshift_user = os.environ.get('REDSHIFT_USER', 'admin')
port = int(os.environ.get('REDSHIFT_PORT', 5439))
dbname = os.environ.get('REDSHIFT_DBNAME', 'different-states-hospital-price')

# The password deliberately has no default: fail fast with a clear message
# instead of attempting a connection with an empty secret.
redshift_password = os.environ.get('REDSHIFT_PASSWORD')
if not redshift_password:
    raise RuntimeError(
        "Set the REDSHIFT_PASSWORD environment variable before running this cell."
    )

# URL.create escapes special characters (e.g. '@', ':', '/') in the user name
# and password; %-formatting them straight into a URL string does not, and
# such characters would corrupt the connection URL.
engine_url = sqlalchemy.engine.URL.create(
    drivername='postgresql+psycopg2',
    username=redshift_user,
    password=redshift_password,
    host=redshift_endpoint,
    port=port,
    database=dbname,
)
# Kept for backward compatibility with code that expects the URL as a string.
# It contains the password, so do not print or display it.
engine_string = engine_url.render_as_string(hide_password=False)
engine = create_engine(engine_url)

In [4]:
# Smoke-test query used to confirm that the Redshift connection works.
# The original statement ("SELECT top 10 * ;") had no FROM clause and is not a
# valid query. information_schema.tables is used as the target because it does
# not depend on any project table having been loaded yet.
# NOTE(review): replace it with the intended project table if one was meant.
sql = """
SELECT top 10 *
FROM information_schema.tables
;
"""

In [5]:
# Execute the query through the SQLAlchemy engine and collect the result rows
# in a DataFrame. text() wraps the raw SQL string as a SQLAlchemy clause.
df = pd.read_sql_query(
    sql=text(sql),
    con=engine,
)

OperationalError: (psycopg2.OperationalError) connection to server at "different-states-hospital-price.381492032483.us-east-1.redshift-serverless.amazonaws.com" (52.204.109.57), port 5439 failed: Connection timed out (0x0000274C/10060)
	Is the server running on that host and accepting TCP/IP connections?

(Background on this error at: http://sqlalche.me/e/14/e3q8)

In [25]:
# Load cpt_hcpcs.json into a DataFrame.
# Point the Google Cloud client libraries at the service-account key file.
service_account_key_path = 'different-state-hospital-price-fd662d2f48c2.json'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = service_account_key_path

# Location of the JSON file in the GCS bucket.
bucket_path = 'gs://different-state-hospital-prices/cpt_hcpcs.json'

# lines=True: the file holds one JSON record per line.
df_cpt_hcpcs = pd.read_json(bucket_path, lines=True)

# DataFrame.info() prints its report itself and returns None, so wrapping it
# in print() only appends a stray "None" line to the output.
df_cpt_hcpcs.info()
df_cpt_hcpcs.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3287818 entries, 0 to 3287817
Data columns (total 3 columns):
 #   Column             Dtype 
---  ------             ----- 
 0   code               object
 1   short_description  object
 2   long_description   object
dtypes: object(3)
memory usage: 75.3+ MB
None


Unnamed: 0,code,short_description,long_description
0,00000A,DVC REVASC 6X20MM 200CM,
1,00001U,RBC DNA HEA 35 AG PLA,
2,"00001U,1",RBC DNA HEA 35 AG PLA,
3,00013,PT INDIVIDUAL GYM,
4,0001A,HC ADM PFIZER SARSCOV2 30MCG/0.3ML 1ST,


In [26]:
# cpt_hcpcs transformation: cast every column to pandas' string dtype, replace
# missing values with the literal text 'None', then drop rows that are
# duplicated across all columns.
df_cpt_hcpcs = (
    df_cpt_hcpcs
    .astype({
        'code': 'string',
        'short_description': 'string',
        'long_description': 'string',
    })
    .fillna('None')
    .drop_duplicates()
)

# DataFrame.info() prints its report itself and returns None, so it is called
# directly instead of through print() (which would also print "None").
df_cpt_hcpcs.info()
df_cpt_hcpcs.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3287818 entries, 0 to 3287817
Data columns (total 3 columns):
 #   Column             Dtype 
---  ------             ----- 
 0   code               string
 1   short_description  string
 2   long_description   string
dtypes: string(3)
memory usage: 75.3 MB
None


Unnamed: 0,code,short_description,long_description
0,00000A,DVC REVASC 6X20MM 200CM,
1,00001U,RBC DNA HEA 35 AG PLA,
2,"00001U,1",RBC DNA HEA 35 AG PLA,
3,00013,PT INDIVIDUAL GYM,
4,0001A,HC ADM PFIZER SARSCOV2 30MCG/0.3ML 1ST,


In [27]:
# Load hospitals.json into a DataFrame (lines=True: one JSON record per line).
bucket_path = 'gs://different-state-hospital-prices/hospitals.json'
df_hospitals = pd.read_json(bucket_path, lines=True)

# DataFrame.info() prints its report itself and returns None, so it is called
# directly instead of through print() (which would also print "None").
df_hospitals.info()
df_hospitals.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1400 entries, 0 to 1399
Data columns (total 8 columns):
 #   Column          Non-Null Count  Dtype  
---  ------          --------------  -----  
 0   npi_number      1400 non-null   object 
 1   name            1400 non-null   object 
 2   url             1400 non-null   object 
 3   street_address  1351 non-null   object 
 4   city            1379 non-null   object 
 5   state           1377 non-null   object 
 6   zip_code        1357 non-null   object 
 7   publish_date    610 non-null    float64
dtypes: float64(1), object(7)
memory usage: 87.6+ KB
None


Unnamed: 0,npi_number,name,url,street_address,city,state,zip_code,publish_date
0,1003139775.0,HCA Virginia,https://hcavirginia.com/about/legal/pricing-tr...,901 E. Cary St Suite 210,Richmond,VA,,1609459000000.0
1,1003260480.0,Brookwood Baptist Medical Center,https://www.brookwoodbaptisthealth.com/docs/gl...,2010 Brookwood Medical Center Dr.,Birmingham,AL,35209,
2,1003281452.0,Henderson Hospital,https://uhsfilecdn.eskycity.net/ac/henderson-h...,1050 West Galleria Drive,Henderson,NV,89011,1609459000000.0
3,1003362997.0,CHI Health St. Elizabeth,https://www.chihealth.com/content/dam/chi-heal...,555 S. 70Th St.,Lincoln,NE,68510,1609459000000.0
4,1003389206.0,Merrill pioneer hospital,https://www.avera.org/app/files/public/79147/m...,"1100 S 10th Ave, Ste 100",Rock Rapids,IA,51246-2020,


In [29]:
# Hospitals transformation.
# Drop the unnecessary column. errors='ignore' keeps this cell re-runnable:
# without it a second run raises KeyError because the column is already gone.
df_hospitals = df_hospitals.drop(columns=['publish_date'], errors='ignore')

# Unify npi_number: keep digits only, require at least 10 of them and keep the
# first 10. A trailing '.0' (float formatting) is removed first; otherwise its
# '0' is counted as a digit and a 9-digit value such as '123456789.0' would be
# accepted as the 10-digit '1234567890'.
npi_digits = (
    df_hospitals['npi_number']
    .astype(str)
    .str.replace(r'\.0$', '', regex=True)
    .str.replace(r'\D', '', regex=True)
)
df_hospitals['npi_number'] = (
    npi_digits.where(npi_digits.str.len() >= 10).str[:10].astype('string')
)

# Unify zip_code: strip a float-style '.0' suffix. The pattern is anchored and
# regex=True is explicit because the bare '.0' pattern behaves differently
# across pandas versions: as a regex (default before pandas 2.0) it means
# "any character followed by 0" and turns '35209' into '359'; as a literal it
# can still match in the middle of a value.
df_hospitals['zip_code'] = (
    df_hospitals['zip_code']
    .astype('string')
    .str.replace(r'\.0$', '', regex=True)
)

# Reference data for valid states (the 50 US state abbreviations).
valid_states = [
    'AL', 'AK', 'AZ', 'AR', 'CA', 'CO', 'CT', 'DE', 'FL', 'GA',
    'HI', 'ID', 'IL', 'IN', 'IA', 'KS', 'KY', 'LA', 'ME', 'MD',
    'MA', 'MI', 'MN', 'MS', 'MO', 'MT', 'NE', 'NV', 'NH', 'NJ',
    'NM', 'NY', 'NC', 'ND', 'OH', 'OK', 'OR', 'PA', 'RI', 'SC',
    'SD', 'TN', 'TX', 'UT', 'VT', 'VA', 'WA', 'WV', 'WI', 'WY',
]

# Validates standard 5-digit or ZIP+4 formats.
zip_code_pattern = r'^\d{5}(-\d{4})?$'

# Keep only rows with a valid state and a well-formed zip code. na=False makes
# a missing zip code an explicit "no match" instead of leaving NA in the mask.
has_valid_state = df_hospitals['state'].isin(valid_states)
has_valid_zip = df_hospitals['zip_code'].str.match(zip_code_pattern, na=False)
df_hospitals = df_hospitals[has_valid_state & has_valid_zip]

# Convert the remaining columns to the string dtype.
df_hospitals = df_hospitals.astype({
    'name': 'string',
    'url': 'string',
    'street_address': 'string',
    'city': 'string',
    'state': 'string',
})

# npi_number is the key of this table. Rows without a usable NPI are dropped
# before the fill below; otherwise they would all receive the text 'None' and
# the de-duplication would keep one arbitrary hospital under that fake key.
df_hospitals = df_hospitals.dropna(subset=['npi_number'])

# Replace the remaining missing values with the literal text 'None'.
df_hospitals = df_hospitals.fillna('None')

# One row per NPI (the first occurrence is kept).
df_hospitals = df_hospitals.drop_duplicates(subset=['npi_number'])

# DataFrame.info() prints its report itself and returns None, so it is called
# directly instead of through print() (which would also print "None").
df_hospitals.info()
df_hospitals.head()

<class 'pandas.core.frame.DataFrame'>
Index: 1226 entries, 1 to 1399
Data columns (total 7 columns):
 #   Column          Non-Null Count  Dtype 
---  ------          --------------  ----- 
 0   npi_number      1226 non-null   string
 1   name            1226 non-null   string
 2   url             1226 non-null   string
 3   street_address  1226 non-null   string
 4   city            1226 non-null   string
 5   state           1226 non-null   string
 6   zip_code        1226 non-null   string
dtypes: string(7)
memory usage: 76.6 KB
None


Unnamed: 0,npi_number,name,url,street_address,city,state,zip_code
1,1003260480,Brookwood Baptist Medical Center,https://www.brookwoodbaptisthealth.com/docs/gl...,2010 Brookwood Medical Center Dr.,Birmingham,AL,35209
2,1003281452,Henderson Hospital,https://uhsfilecdn.eskycity.net/ac/henderson-h...,1050 West Galleria Drive,Henderson,NV,89011
3,1003362997,CHI Health St. Elizabeth,https://www.chihealth.com/content/dam/chi-heal...,555 S. 70Th St.,Lincoln,NE,68510
4,1003389206,Merrill pioneer hospital,https://www.avera.org/app/files/public/79147/m...,"1100 S 10th Ave, Ste 100",Rock Rapids,IA,51246-2020
5,1003811290,Providence Health,http://www.yourprovidencehealth.com/Content/Up...,2435 Forest Drive,Columbia,SC,29204


In [30]:
# Load prices.json into a DataFrame, cleaning it chunk by chunk.
bucket_path = 'gs://different-state-hospital-prices/prices.json'

# Cleaned chunks are collected in a list and concatenated once at the end.
# Re-concatenating the growing frame inside the loop copies every previously
# loaded row on each iteration (quadratic in the number of chunks).
price_chunks = []
rows_loaded = 0

# With chunksize set, read_json returns an iterable reader; the with-block
# closes it when reading finishes or fails.
with pd.read_json(bucket_path, lines=True, chunksize=10000) as json_reader:
    for chunk in json_reader:
        # Clean 'npi_number': digits only, at least 10 of them, first 10 kept.
        # A trailing '.0' (float formatting) is removed first so that its '0'
        # is not counted as a digit of the identifier.
        npi_digits = (
            chunk['npi_number']
            .astype(str)
            .str.replace(r'\.0$', '', regex=True)
            .str.replace(r'\D', '', regex=True)
        )
        chunk['npi_number'] = (
            npi_digits.where(npi_digits.str.len() >= 10).str[:10].astype('string')
        )

        # Change dtype of the text columns.
        chunk = chunk.astype({
            'code': 'string',
            'payer': 'string',
        })

        price_chunks.append(chunk)

        # Progress: running total of rows read so far.
        rows_loaded += len(chunk)
        print(rows_loaded)

# pd.concat raises on an empty list, so an empty source yields an empty frame.
if price_chunks:
    df_prices = pd.concat(price_chunks, ignore_index=True)
else:
    df_prices = pd.DataFrame()

# Fill missing values in the text columns only. Filling the whole frame with
# 'none' would also write that text into the numeric 'price' column and turn
# it into an object column; missing prices stay NaN instead.
df_prices = df_prices.fillna({'code': 'none', 'npi_number': 'none', 'payer': 'none'})

df_prices = df_prices.drop_duplicates()

print(df_prices.head())
# DataFrame.info() prints its report itself and returns None, so it is called
# directly instead of through print() (which would also print "None").
df_prices.info()

10000
     code  npi_number payer     price
0  00000A  1053358010  CASH  75047.00
1  00000A  1336186394  CASH  75047.00
2  00001U  1003139775  CASH    457.23
3  00001U  1053824292  CASH    972.00
4  00001U  1417901406  CASH    296.00
<class 'pandas.core.frame.DataFrame'>
Index: 9971 entries, 0 to 9999
Data columns (total 4 columns):
 #   Column      Non-Null Count  Dtype  
---  ------      --------------  -----  
 0   code        9971 non-null   string 
 1   npi_number  9971 non-null   string 
 2   payer       9971 non-null   string 
 3   price       9971 non-null   float64
dtypes: float64(1), string(3)
memory usage: 389.5 KB
None


In [31]:
# Report whether any npi_number value occurs more than once in df_hospitals.
# keep=False flags every member of a duplicated group, so the same mask can be
# used both for the check and for listing the offending rows.
npi_dupe_mask = df_hospitals['npi_number'].duplicated(keep=False)

if not npi_dupe_mask.any():
    print("No duplicates found in 'npi_number'.")
else:
    print("Duplicates found in 'npi_number'.")
    # Show every row that shares its npi_number with another row.
    print(df_hospitals[npi_dupe_mask])

No duplicates found in 'npi_number'.


In [32]:
# Prepare for load.
# Add surrogate keys to each dimension table (1-based, in current row order).
df_cpt_hcpcs['cpt_id'] = np.arange(1, len(df_cpt_hcpcs) + 1)
df_hospitals['hospital_id'] = np.arange(1, len(df_hospitals) + 1)

# Lookup tables for the foreign keys. Series.map needs a uniquely indexed
# mapper, so a repeated code resolves to its first occurrence instead of
# raising.
cpt_id_by_code = (
    df_cpt_hcpcs.drop_duplicates(subset='code').set_index('code')['cpt_id']
)
hospital_id_by_npi = df_hospitals.set_index('npi_number')['hospital_id']

# Map foreign keys into the fact table. Prices whose code or NPI has no row in
# the dimension table (e.g. a hospital removed by the state/zip filter) map to
# a missing value. The nullable 'Int64' dtype keeps the ids integral in that
# case; a plain map would silently turn the whole column into floats.
df_prices['cpt_id'] = df_prices['code'].map(cpt_id_by_code).astype('Int64')
df_prices['hospital_id'] = (
    df_prices['npi_number'].map(hospital_id_by_npi).astype('Int64')
)

# Make unresolved foreign keys visible instead of loading them unnoticed.
unmatched_keys = df_prices[['cpt_id', 'hospital_id']].isna().sum()
if unmatched_keys.any():
    print("Price rows without a matching dimension row:")
    print(unmatched_keys[unmatched_keys > 0])

# Create the list of unique payers and map payer_id into the fact table.
unique_payers = pd.DataFrame(df_prices['payer'].unique(), columns=['payer'])
unique_payers['payer_id'] = np.arange(1, len(unique_payers) + 1)
df_prices['payer_id'] = df_prices['payer'].map(unique_payers.set_index('payer')['payer_id'])

# Prepare final DataFrames for the new schema.
dim_cpt_hcpcs = df_cpt_hcpcs[['cpt_id', 'code', 'short_description', 'long_description']]
dim_hospitals = df_hospitals[['hospital_id', 'npi_number', 'name', 'url', 'street_address', 'city', 'state', 'zip_code']]
dim_payer = unique_payers[['payer_id', 'payer']]
fact_prices = df_prices[['cpt_id', 'hospital_id', 'payer_id', 'price']]

# DataFrame.info() prints its report itself and returns None, so it is called
# directly instead of through print() (which would also print "None").
dim_cpt_hcpcs.info()
dim_hospitals.info()
dim_payer.info()
fact_prices.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3287818 entries, 0 to 3287817
Data columns (total 4 columns):
 #   Column             Dtype 
---  ------             ----- 
 0   cpt_id             int32 
 1   code               string
 2   short_description  string
 3   long_description   string
dtypes: int32(1), string(3)
memory usage: 87.8 MB
None
<class 'pandas.core.frame.DataFrame'>
Index: 1226 entries, 1 to 1399
Data columns (total 8 columns):
 #   Column          Non-Null Count  Dtype 
---  ------          --------------  ----- 
 0   hospital_id     1226 non-null   int32 
 1   npi_number      1226 non-null   string
 2   name            1226 non-null   string
 3   url             1226 non-null   string
 4   street_address  1226 non-null   string
 5   city            1226 non-null   string
 6   state           1226 non-null   string
 7   zip_code        1226 non-null   string
dtypes: int32(1), string(7)
memory usage: 81.4 KB
None
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1