## II -Fetch Distinct Values from MongoDB
#### 1) Importing libraries

In [583]:
import pandas as pd
from pymongo import MongoClient
import psycopg2
from sqlalchemy import create_engine
import numpy as np
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import KNNImputer


print("Imported all the required libraries")


Imported all the required libraries


#### 2) Connect/Initialize the MongoDB hosted on  "cyclades.okeanos-global.grnet.gr"

In [3]:
db_cl = MongoClient('83.212.82.56', 27017)
db = db_cl.DAP_ProjectDB
print("MongoDB instance initialized!")


MongoDB instance initialized!


#### 3) HRRP Collection : Identifing distinct measure names to pivot data from JSON.

In [20]:
# collection hrrp instance
collection_hrrp = db.hrrp
#collection hacrp instance
collection_hacrp = db.hacrp
#collection hvbp instance
collection_hvbp = db.hvbp

# fetching distinct types of measure name
hrrp_measure_list_orig = db['hrrp'].distinct('measure_name')
print(f"List of measure names in HRRP : \n{hrrp_measure_list_orig}")
# replacing "-" with "_" in measure names
hrrp_measure_list = [w.replace('-', '_') for w in hrrp_measure_list_orig]
print(f"\nList of measure names in HRRP  after replacing '-' with '_' : \n{hrrp_measure_list}")

List of measure names in HRRP : 
[]

List of measure names in HRRP  after replacing '-' with '_' : 
[]


#### 6) Creating the PostgreSQL database (dap_medicare) hosted on the "cyclades.okeanos-global.grnet.gr"

In [5]:
try:
    dbConnection = psycopg2.connect(
    user = "dap",
    password = "dap",
    host = "83.212.82.56",
    port = "5432",
    database = "postgres")
    dbConnection.set_isolation_level(0) # AUTOCOMMIT
    dbCursor = dbConnection.cursor()

    # fetching the list of databases/schema present in the postgres instance
    dbCursor.execute("SELECT datname from pg_database")
    rows = dbCursor.fetchall()
    dbNames = []
    for row in rows:
        dbNames.append(row[0])
    print(f"\nAll databases present previously :\n{dbNames}")

    # checking if the database already exists/ if so then dropping the same
    if("dap_medicare" in dbNames) :
        try:
            dbCursor.execute("DROP DATABASE dap_medicare")
            print("dap_medicare schema dropped!")
        except:
            print("Error while dropping the database!")

    # checking the database schema names after dropping
    try:
        dbCursor.execute("SELECT datname from pg_database")
    except:
        print("Error while fetching database names")
    rows = dbCursor.fetchall()
    dbNames = []
    for row in rows:
        dbNames.append(row[0])
    print(f"\nAll databases present after check :\n{dbNames}")

    # Creating new schema dap_medicare
    try:
        dbCursor.execute('CREATE DATABASE dap_medicare;')
        print("\nCreated a new db schema 'dap_medicare'")
    except:
        print("Error while creating dap_medicare database!")
    dbCursor.close()
except (psycopg2.Error) as dbError :
    print("Error while connecting to PostgreSQL", dbError)
except Exception as exc :
    print("Error while creating the database schema in PostgreSQL", exc)
finally:
    if(dbConnection): dbConnection.close()


All databases present previously :
['postgres', 'template1', 'template0', 'dap_medicare']
dap_medicare schema dropped!

All databases present after check :
['postgres', 'template1', 'template0']

Created a new db schema 'dap_medicare'


#### 7) Creating strings to create dynamic tables with base columns for different collections

In [9]:
createStringHRRP = """
DROP TABLE IF EXISTS "{tbl_Name}" CASCADE ;
CREATE TABLE "{tbl_Name}"(
date_time timestamp,
hospital_name VARCHAR(100),
provider_id integer PRIMARY KEY,
state VARCHAR(2),
measure_name VARCHAR,
number_of_discharges integer,
excess_readmission_ratio float,
predicted_readmission_rate float,
expected_readmission_rate float,
number_of_readmissions integer,
start_date timestamp,
end_date timestamp
);
"""
createStringHACRP = """
DROP TABLE IF EXISTS hacrp CASCADE ;
CREATE TABLE hacrp(
hospital_name VARCHAR(100),
provider_id integer PRIMARY KEY,
state VARCHAR(2),
fiscal_year integer,
psi_90__start_date timestamp,
psi_90_end_date timestamp,
psi_90_w_z_score float,
clabsi_w_z_score float,
cauti_w_z_score float,
ssi_w_z_score float,
mrsa_w_z_score float,
cdi_w_z_score float,
hai_measures_start_date timestamp,
hai_measures_end_date timestamp,
total_hac_score float,
payment_reduction VARCHAR
);
"""
createStringHVBP = """
DROP TABLE IF EXISTS hvbp CASCADE ;
CREATE TABLE hvbp(
hospital_name VARCHAR(100),
provider_number integer PRIMARY KEY,
address VARCHAR,
city VARCHAR,
state VARCHAR(2),
zip_code integer,
county_name VARCHAR,
mort_30_ami_achievement_threshold float,
mort_30_ami_benchmark float,
mort_30_ami_baseline_rate float,
mort_30_ami_performance_rate float,
mort_30_ami_achievement_points integer,
mort_30_ami_improvement_points integer,
mort_30_ami_measure_score integer,
mort_30_hf_achievement_threshold float,
mort_30_hf_benchmark float,
mort_30_hf_baseline_rate float,
mort_30_hf_performance_rate float,
mort_30_hf_achievement_points integer,
mort_30_hf_improvement_points integer,
mort_30_hf_measure_score integer,
mort_30_pn_achievement_threshold float,
mort_30_pn_benchmark float,
mort_30_pn_baseline_rate float,
mort_30_pn_performance_rate float,
mort_30_pn_achievement_points integer,
mort_30_pn_improvement_points integer,
mort_30_pn_measure_score integer,
comp_hip_knee_achievement_threshold float,
comp_hip_knee_benchmark float,
comp_hip_knee_baseline_rate float,
comp_hip_knee_performance_rate float,
comp_hip_knee_achievement_points integer,
comp_hip_knee_improvement_points integer,
comp_hip_knee_measure_score integer,
lat float,
long float);
"""
#print(f"Dynamic create table String HRRP : \n{createStringHRRP}")

#### 8) Creating tables into postgreSQL database

In [12]:
try:
    dbConnection = psycopg2.connect(
    user = "dap",
    password = "dap",
    host = "83.212.82.56",
    port = "5432",
    database = "dap_medicare")
    dbConnection.set_isolation_level(0) # AUTOCOMMIT
    dbCursor = dbConnection.cursor()

    # creating tables for all collections
    for i in range(len(hrrp_measure_list)):
        dbCursor.execute(createStringHRRP.format(tbl_Name = hrrp_measure_list[i]))
        print(f"Table {hrrp_measure_list[i]} created")
    dbCursor.execute(createStringHACRP)
    print(f"Table hacrp created")
    dbCursor.execute(createStringHVBP)
    print(f"Table hvbp created")
    dbCursor.close()
    print(f"\nSuccessfully created {len(hrrp_measure_list)} tables for HRRP collection")
    print(f"\nSuccessfully created 1 table for HACRP collection")
    print(f"\nSuccessfully created 1 table for HVBP collection")
except (Exception , psycopg2.Error) as dbError :
    print ("Error while table creation in PostgreSQL : \n", dbError)
finally:
    if(dbConnection): dbConnection.close()


Table READM_30_AMI_HRRP created
Table READM_30_CABG_HRRP created
Table READM_30_COPD_HRRP created
Table READM_30_HF_HRRP created
Table READM_30_HIP_KNEE_HRRP created
Table READM_30_PN_HRRP created
Table hacrp created
Table hvbp created

Successfully created 6 tables for HRRP collection

Successfully created 1 table for HACRP collection

Successfully created 1 table for HVBP collection


### Function to clean the DataFrames

In [614]:
def df_imputer(df_import):
    imputer = KNNImputer(n_neighbors=5)
    df_import2 = pd.DataFrame(imputer.fit_transform(df_import))
    df_import2.columns = df_import.columns
    return df_import2

### Edit Scores and Points in HVBP Table

In [606]:
def df_extract(df_import):
    for column, data in df_import.filter(regex='points$|score$').iteritems():
        df_import[column].replace(to_replace = '\W.*', value= ' ', regex=True, inplace=True)
    return df_import

### Separate Coordinates

In [615]:
def df_coord(df_new):
    lat = []
    lon = []
    for x in df_new['geocoded_column']:
        if isinstance(x,dict):
            lat.append(x['coordinates'][0])
            lon.append(x['coordinates'][1])
        else:
            lat.append(np.nan)
            lon.append(np.nan)
    df_new['latitude']=lat
    df_new['longitude']=lon
    return df_new

#### 9) Inserting data to tables into postgreSQL database


In [544]:
engine = create_engine('postgresql://dap:dap@83.212.82.56:5432/dap_medicare', echo=True)

# dynamically inserting data in measure tables for hrrp
for i in range(len(hrrp_measure_list)):
    cursor = collection_hrrp.find({"measure_name": hrrp_measure_list_orig[i]}, { "_id" : 0, "footnote" : 0}) #Fetch Data from MongoDB and filter with field 'measure_name'
    df = pd.DataFrame(list(cursor))
    df.replace("N/A", np.nan, inplace = True)
    df.replace("Too Few to Report", np.nan, inplace = True)
    df_cleaned = df_imputer(df.iloc[:, 4:-2])
    df.iloc[:, 4:-2]=df_cleaned
    df.to_sql(hrrp_measure_list[i], engine, if_exists = 'append', chunksize = 100, index= False)
    print(f"Insertion completed in {hrrp_measure_list[i]} tables")

ValueError: at least one array or dtype is required

### 10) Inserting Data to HACRP table

In [None]:
#inserting data in hacrp table
cursor = collection_hacrp.find({},{"_id" : 0, "footnote" : 0, "payment_reduction":0, "clabsi_footnote":0, "ssi_footnote":0, "mrsa_footnote":0, "cauti_footnote":0, "cdi_footnote psi_90_footnote":0, "total_hac_footnote":0, "payment_reduction_footnote":0,"cdi_footnote":0,"psi_90_footnote":0
})
df = pd.DataFrame(list(cursor)) 
df.replace("N/A", np.nan, inplace = True)
df.replace("Too Few to Report", np.nan, inplace = True)
df_cleaned = df_imputer(df.iloc[:, 6:12])
df.iloc[:, 6:12]=df_cleaned
df_cleaned = df_imputer(df[['total_hac_score']])
df[['total_hac_score']]=df_cleaned
df.to_sql(hacrp, engine, if_exists = 'append', chunksize = 100, index= False)
print(f"Insertion completed in hacrp table")

### 11) Inserting data into HVBP table

In [None]:
#inserting data in hvbp table
cursor = collection_hvbp.find({},{"_id" : 0, "footnote" : 0})
df = pd.DataFrame(list(cursor))
df.replace("Not Available", -99999, inplace = True)
df = df_extract(df)
df.replace(-99999, np.nan, inplace = True)
df.replace('0.896948(23)', 0.896948, inplace=True)
df.iloc[:,7:-4]=df.iloc[:,7:-4].astype(float)
df_cleaned = df_imputer(df.iloc[:,7:-4])
df.iloc[:,7:-4]=df_cleaned
df = df_coord(df)
df = df.drop(df.iloc[:,35:39], axis=1)
df.to_sql(hvbp, engine, if_exists = 'append', chunksize = 100, index= False)
print(f"Insertion completed in hvbp table")