## ETL Project: USA HOSPITALS
#### Dependencies and Setup

In [None]:
import requests
import psycopg2
import pandas as pd
import config as creds
import sqlalchemy as sqlalchemy_package

#### Extracting Data via API interaction 
* ESRI Dataset: Definitive Healthcare: USA Hospital Beds

In [2]:
# Query URL for the ArcGIS FeatureServer layer (layer 0) of the
# "Definitive Healthcare USA Hospital Beds" service.
# Query string: where=1%3D1 (URL-encoded "1=1", i.e. no row filter),
# outFields=* (all attributes), outSR=4326 (WGS84 coordinates), f=json.
url = (
    "https://services7.arcgis.com/LXCny1HyhQCUSueu/arcgis/rest/services/"
    "Definitive_Healthcare_USA_Hospital_Beds/FeatureServer/0/query"
    "?where=1%3D1&outFields=*&outSR=4326&f=json"
)

In [None]:
# Get Hospitals JSON data
# A timeout stops the cell from hanging forever on a stalled connection, and
# raise_for_status() turns HTTP errors into an exception instead of a
# confusing JSON/KeyError further down.
http_response = requests.get(url, timeout=60)
http_response.raise_for_status()
response = http_response.json()

# ArcGIS REST services can report a failure inside an otherwise successful
# response as a top-level "error" object, so check for it explicitly.
if 'error' in response:
    raise RuntimeError(f"ArcGIS query failed: {response['error']}")

# The service flags a truncated result set with "exceededTransferLimit";
# warn so a partial extract is not silently loaded into the database.
if response.get('exceededTransferLimit'):
    print('Warning: the service returned only part of the dataset '
          '(exceededTransferLimit is set); paging is required for a full extract.')

hospitals_data = response['features']
# Preview the attributes of the second feature
hospitals_data[1]['attributes']

In [None]:
# Create Lists of Data Dictionaries
# Every feature is split into four row dictionaries (hospital, beds,
# location, geometry) that share a synthetic id: "<1-based position>-<FIPS>".
hospitals = []
hospitals_beds = []
hospitals_location = []
hospitals_geometry = []
for number, record in enumerate(hospitals_data, start=1):
    attributes = record['attributes']
    # NOTE(review): a feature without coordinates may come back with no
    # "geometry" entry; fall back to an empty dict so lat/lng become None
    # instead of aborting the whole extract.
    geometry = record.get('geometry') or {}

    hospital_id = str(number) + "-" + attributes['FIPS']

    # create hospitals dataset
    hospitals.append({
        'id': hospital_id,
        'hospital_name': attributes['HOSPITAL_NAME'],
    })

    # create hospitals beds dataset
    hospitals_beds.append({
        'id': hospital_id,
        'licensed_beds': attributes['NUM_LICENSED_BEDS'],
        'icu_beds': attributes['NUM_ICU_BEDS'],
        'ventilator_usage': attributes['AVG_VENTILATOR_USAGE'],
    })

    # create hospitals location dataset
    hospitals_location.append({
        'id': hospital_id,
        'hospital_type': attributes['HOSPITAL_TYPE'],
        'hospital_hq_address': attributes['HQ_ADDRESS'],
        'hospital_hq_city': attributes['HQ_CITY'],
        'hospital_hq_state': attributes['HQ_STATE'],
        'zip_code': attributes['HQ_ZIP_CODE'],
        'county_name': attributes['COUNTY_NAME'],
        'state_name': attributes['STATE_NAME'],
    })

    # create hospitals geometry
    # For a point geometry x is the horizontal coordinate (longitude) and
    # y is the vertical coordinate (latitude); the query asks for
    # outSR=4326, so both are in degrees.
    hospitals_geometry.append({
        'id': hospital_id,
        'lat': geometry.get('y'),
        'lng': geometry.get('x'),
    })

#### Data Transformation
* Converting lists of dictionaries into DataFrames

In [None]:
# Build the geometry DataFrame (lat/lng per hospital), indexed by hospital id
hospitals_geometry_pd = pd.DataFrame(hospitals_geometry).set_index("id")
hospitals_geometry_pd.head()

In [None]:
# Build the location DataFrame (type, HQ address, zip, county, state), indexed by hospital id
hospitals_location_pd = pd.DataFrame(hospitals_location).set_index("id")
hospitals_location_pd.head()

In [None]:
# Build the beds DataFrame (licensed beds, ICU beds, ventilator usage), indexed by hospital id
hospitals_beds_pd = pd.DataFrame(hospitals_beds).set_index("id")
hospitals_beds_pd.head()

In [None]:
# Build the hospital master DataFrame (hospital names), indexed by hospital id
hospitals_pd = pd.DataFrame(hospitals).set_index("id")
hospitals_pd

In [None]:
# Count the distinct hospital names; the array of unique names is kept in
# hospitals_pd_list and its length is shown as the cell output.
name_column = hospitals_pd['hospital_name']
hospitals_pd_list = name_column.unique()
len(hospitals_pd_list)

### Create database connection

In [None]:
# Create PostgreSQL database connection using credentials from the config module
from urllib.parse import quote

# "postgresql://" is the dialect name SQLAlchemy expects; the legacy
# "postgres://" alias is rejected by SQLAlchemy 1.4 and later.
# User and password are percent-encoded so characters such as "@", ":" or "/"
# cannot corrupt the URL.
database_url = (
    f"postgresql://{quote(creds.PGUSER, safe='')}:{quote(creds.PGPASSWORD, safe='')}"
    f"@{creds.PGHOST}:5432/{creds.PGDATABASE}"
)
# The "options" connect argument sets search_path to the project schema.
engine = sqlalchemy_package.create_engine(
    database_url,
    connect_args={'options': '-csearch_path={}'.format(creds.DBSCHEMA)},
)

In [None]:
# Confirm tables
# Engine.table_names() was deprecated in SQLAlchemy 1.4 and removed in 2.0;
# the inspector is the supported way to list tables.
sqlalchemy_package.inspect(engine).get_table_names()

### Load DataFrames into database

In [None]:
# Append the hospital master list to the us_hospitals table (the id index is written as a column)
hospitals_pd.to_sql(
    'us_hospitals',
    engine,
    index=True,
    if_exists='append',
)

In [None]:
# Append the beds data to the hospitals_beds table (the id index is written as a column)
hospitals_beds_pd.to_sql(
    'hospitals_beds',
    engine,
    index=True,
    if_exists='append',
)

In [None]:
# Append the location data to the hospitals_location table (the id index is written as a column)
hospitals_location_pd.to_sql(
    'hospitals_location',
    engine,
    index=True,
    if_exists='append',
)

In [None]:
# Append the geometry data to the hospitals_geometry table (the id index is written as a column)
hospitals_geometry_pd.to_sql(
    'hospitals_geometry',
    engine,
    index=True,
    if_exists='append',
)

### Initial Query Analysis

In [6]:
def load_data(schema='public', table='us_hospitals', query=None, connection=None):
    """Run a SQL query and return the result as a DataFrame.

    Args:
        schema: Schema used for the default ``SELECT *`` statement.
        table: Table used for the default ``SELECT *`` statement.
        query: Full SQL text to run. When given, ``schema`` and ``table``
            are ignored.
        connection: DB-API connection (or anything ``pd.read_sql`` accepts).
            Defaults to the notebook-level ``conn``.

    Returns:
        pandas.DataFrame holding the query result.

    The executed statement and the resulting shape are printed.
    """
    if query is None:
        # schema/table are interpolated as identifiers; pass only trusted names.
        sql_command = "SELECT * FROM {}.{};".format(str(schema), str(table))
    else:
        sql_command = query
    if connection is None:
        # Resolved at call time so the cell defining `conn` may run later.
        connection = conn
    print('Query:', sql_command)
    # Load the data
    data = pd.read_sql(sql_command, connection)
    print('Data shape:', data.shape)
    return data

In [None]:
# Set up a connection to the postgres server via database driver
# Keyword arguments are used instead of a hand-concatenated DSN string so a
# password containing spaces or quotes cannot break the connection string.
conn = psycopg2.connect(
    host=creds.PGHOST,
    port="5432",
    dbname=creds.PGDATABASE,
    user=creds.PGUSER,
    password=creds.PGPASSWORD,
)

In [None]:
# Query California hospitals: join the hospital list with its beds and
# location tables on id, then index the result by id and order it by name.
hospitals_request = """
    SELECT h.id, h.hospital_name, l.state_name, l.county_name, l.zip_code, l.hospital_type, b.licensed_beds
    FROM us_hospitals AS h
    INNER JOIN hospitals_beds AS b USING (id)
    INNER JOIN hospitals_location AS l USING (id)
    WHERE l.state_name = 'California'
    ORDER BY b.licensed_beds DESC;
    """
hosp_request_result = (
    load_data(query=hospitals_request)
    .set_index("id")
    .sort_values('hospital_name')
)
hosp_request_result

### Adding New Datasets for Analysis (Secondary ETL Process)
* Connection as a GROUPUSER with a GROUPPASSWORD 

In [3]:
# Set up a connection to the postgres server via database driver using group password
# Keyword arguments are used instead of a hand-concatenated DSN string so a
# password containing spaces or quotes cannot break the connection string.
conn = psycopg2.connect(
    host=creds.PGHOST,
    port="5432",
    dbname=creds.PGDATABASE,
    user=creds.GROUPUSER,
    password=creds.GROUPPASSWORD,
)

In [None]:
# Read Hospitals General Information
# Source column -> output column; only these columns are kept.
info_columns = {
    'Facility ID': 'facility_id',
    'Facility Name': 'hospital_name',
    'State': 'state_name',
    'ZIP Code': 'zip_code',
    'Hospital Ownership': 'ownership',
}
hospitals_info = (
    pd.read_csv('Resources/Hospital_General_Information.csv')
    .filter(items=list(info_columns))
    .rename(columns=info_columns)
    .set_index("facility_id")
    .sort_values('hospital_name')
)
hospitals_info.head()

In [None]:
# Read Hospitals Profitability Information
# Source column -> output column; only these columns are kept.
finance_columns = {
    'Facility Number': 'facility_num',
    'Facility Name': 'hospital_name',
    'Year': 'year',
    'County Name': 'county_name',
    'Income Statement Amount': 'total_income',
    'Amount per Adjusted Patient Day': 'adj_patient_income',
}
hospitals_finance = (
    pd.read_csv('Resources/hospital-profitability-2009-2013-.csv')
    .filter(items=list(finance_columns))
    .rename(columns=finance_columns)
    .set_index("facility_num")
    .sort_values('hospital_name')
)
hospitals_finance

In [None]:
# Read Hospitals Staff Information
# Source column -> output column; only these columns are kept.
staff_columns = {
    'Facility Number': 'facility_num',
    'Facility Name': 'hospital_name',
    'Year': 'year',
    'County Name': 'county_name',
    'Hours Type': 'hours_type',
    'Productive Hours': 'prod_hours',
    'Productive Hours per Adjusted Patient Day': 'adj_patient_hours',
}
hospitals_staff = (
    pd.read_csv('Resources/hospital-staffing-2009-2013-.csv')
    .filter(items=list(staff_columns))
    .rename(columns=staff_columns)
    .set_index("facility_num")
    .sort_values('hospital_name')
)
hospitals_staff

In [None]:
# Read Hospitals Insurance Info
# Source column -> output column; only these columns are kept.
insurance_columns = {
    'Provider Id': 'provider_id',
    'Provider Name': 'hospital_name',
    'Provider Zip Code': 'zip_code',
    'Average Covered Charges': 'avg_cov_charges',
    'Average Total Payments': 'avg_tot_payments',
}
hospitals_insuarance = (
    pd.read_csv('Resources/healthcare_census.csv')
    .filter(items=list(insurance_columns))
    .rename(columns=insurance_columns)
    .set_index("provider_id")
    .sort_values('hospital_name')
)
hospitals_insuarance.head()

In [None]:
# Confirm tables
# Engine.table_names() was deprecated in SQLAlchemy 1.4 and removed in 2.0;
# the inspector is the supported way to list tables.
sqlalchemy_package.inspect(engine).get_table_names()

#### Normalization
* NPI (National Provider Identifier) - unique identification number for health care providers
* Hospital Name
* Composite key: Zip code + address

In [None]:
# Check Hospitals List Inconsistency for California
# Counts how many hospital names from the staffing CSV also appear, after
# upper-casing, among the names returned by the California query.
esri_list = hosp_request_result['hospital_name'].unique().tolist()
esri_upper_list = [name.upper() for name in esri_list]
staff_list = hospitals_staff['hospital_name'].unique().tolist()
# A set gives constant-time membership checks; the list above is kept
# because its order is used when previewing the names.
esri_upper_names = set(esri_upper_list)
# NOTE(review): staffing names are compared as-is, which presumes they are
# already upper-case in the CSV - confirm.
hopsitals_list = [name for name in staff_list if name in esri_upper_names]
len(hopsitals_list)

In [None]:
# Preview the first 20 upper-cased hospital names from the California query result
esri_upper_list[:20]

In [None]:
# Preview the first 10 unique hospital names from the staffing dataset
staff_list[:10]

* Overall, normalization requires more time and resources

In [None]:
# Append the general hospital information to the hospitals_info table
# (the facility_id index is written as a column)
hospitals_info.to_sql(
    'hospitals_info',
    engine,
    index=True,
    if_exists='append',
)

In [None]:
# Load US hospitals insurance data (currently disabled)
# hospitals_insuarance.to_sql(name='hospitals_insuarance', con=engine, if_exists='append', index=True)