Author: Marissa Munoz-Ruiz <br>
SLU Capstone: HDS 5960 <br>
Databricks Notebook: SLU_Capstone_Preprocessing  

---
##### Goal of Script: Import & Preprocess CMS Cost Report Data for ML Models 

* Use UUIDs csv file retrieved from CMS website (see SLU_Capstone_Selenium) to import data
* Perform data cleaning and feature selection to produce dataframe for Machine Learning

In [0]:
#See https://data.cms.gov/provider-compliance/cost-report/hospital-provider-cost-report#details and 
#"https://data.cms.gov/provider-compliance/cost-report/hospital-provider-cost-report/api-docs for information related to the data

#https://scrapfly.io/blog/web-scraping-with-selenium-and-python/
from pyspark.sql.functions import concat, col, lit

version_dir = "/FileStore/tables/URL_Versions.csv"
version_df = spark.read.format("csv").options(header='True',delimiter=',').load(version_dir)

api_url = 'https://data.cms.gov/data-api/v1/dataset/'
uuid_df = (version_df
           .withColumnRenamed('version','uuid')
           .withColumn('url',concat(lit(api_url),col('uuid'),lit('/data.json')))
          )

uuid_df.display()

year,uuid,url
2019,6ebd03b1-ff48-4994-94ed-0a54e90c1bd6,https://data.cms.gov/data-api/v1/dataset/6ebd03b1-ff48-4994-94ed-0a54e90c1bd6/data.json
2018,90869abf-c649-4d65-84b3-4d6a1b568b69,https://data.cms.gov/data-api/v1/dataset/90869abf-c649-4d65-84b3-4d6a1b568b69/data.json
2017,b2a1e8c3-62c3-4c47-94b2-5fa16b122a4d,https://data.cms.gov/data-api/v1/dataset/b2a1e8c3-62c3-4c47-94b2-5fa16b122a4d/data.json
2016,2981f550-653f-46a6-a5a5-06a3408eb245,https://data.cms.gov/data-api/v1/dataset/2981f550-653f-46a6-a5a5-06a3408eb245/data.json
2015,73e66edc-0b70-4e88-b1af-7d2b98a243f5,https://data.cms.gov/data-api/v1/dataset/73e66edc-0b70-4e88-b1af-7d2b98a243f5/data.json
2014,9855c8b2-1514-47f8-a3ed-c34c2d41eed3,https://data.cms.gov/data-api/v1/dataset/9855c8b2-1514-47f8-a3ed-c34c2d41eed3/data.json
2013,7d1090ac-9d79-47d6-b42d-533a1f3edd7a,https://data.cms.gov/data-api/v1/dataset/7d1090ac-9d79-47d6-b42d-533a1f3edd7a/data.json
2012,9f3eee40-cdbc-4082-af3f-30e1807399b9,https://data.cms.gov/data-api/v1/dataset/9f3eee40-cdbc-4082-af3f-30e1807399b9/data.json
2011,db36eabb-2344-4053-a579-6fa48602bb29,https://data.cms.gov/data-api/v1/dataset/db36eabb-2344-4053-a579-6fa48602bb29/data.json


In [0]:
from pyspark.sql import SparkSession, DataFrame
from urllib.request import urlopen
from functools import reduce
import json

# Use pyspark to read json from url
# https://stackoverflow.com/questions/13921910/python-urllib2-receive-json-response-from-url
def json_to_DF(url):
    spark = SparkSession.builder.getOrCreate()
    response = urlopen(url)
    jsonData = json.load(response)   
    rdd = spark.sparkContext.parallelize(jsonData)
    df = spark.read.json(rdd)
    return df


In [0]:
from collections import Counter

# Create list of url & year from UUID dataframe
uuid_ls = uuid_df.rdd.map(lambda x: x['url']).collect()
year_ls = uuid_df.rdd.map(lambda x: x['year']).collect()

cols_ls = []
cols_len = []

# Import data & get column names 
for url in uuid_ls:
    col_names = json_to_DF(url).columns
    cols_ls.append(col_names)
    cols_len.append(len(col_names))

# Get occurrence of each column name
cols_flat = [element for innerList in cols_ls for element in innerList]
cols_occurrence = Counter(cols_flat)

# Get columns occurring across all dataset years 
common_cols = [k for k,v in cols_occurrence.items() if v == len(cols_len)]

In [0]:
df_ls = []

for ind, uuid in enumerate(uuid_ls):
    df = json_to_DF(uuid)
    df = df.select(*common_cols).withColumn("data_year", lit(year_ls[ind]))
    df_ls.append(df)
    
data_df = reduce(DataFrame.unionAll, df_ls)
data_df = data_df.toDF(*[c.lower() for c in data_df.columns]) #make column names lowercase

data_rows = data_df.count()
data_cols = len(data_df.columns)

print("Number of rows: {}".format(data_df.count()))
print("Number of cols: {}".format(len(data_df.columns)))

Number of rows: 55789
Number of cols: 116


In [0]:
from pyspark.sql.functions import when
from pyspark.sql.types import StringType

# convert empty rows to null
data_new = data_df.select([when(col(c)=="",None).otherwise(col(c)).alias(c) for c in data_df.columns])

# list of irrevalent numerical columns         
num_cols = [ 
             'hospital name',
             'street address',
             'city',
             'state code',
             'zip code',
             'county',
             'medicare cbsa number',
             'fiscal year begin date',
             'fiscal year end date',
             'total days title v',
             'total days title xviii',
             'total days title xix',
             'number of beds',
             'total discharges title v',
             'total discharges title xviii',
             'total discharges title xix',
             'total days title v + total for all subproviders',
             'total days title xviii + total for all subproviders',
             'total days title xix + total for all subproviders',
             'number of beds + total for all subproviders',
             'total bed days available + total for all subproviders',
             'total discharges title v + total for all subproviders',
             'total discharges title xviii + total for all subproviders',
             'total discharges title xix + total for all subproviders',
             'total discharges (v + xviii + xix + unknown)',
             'total days (v + xviii + xix + unknown)',
             'hospital total days title v for adults & peds',
             'hospital total days title xviii for adults & peds',
             'hospital total days title xix for adults & peds',
             'hospital number of beds for adults & peds',
             'hospital total discharges title v for adults & peds',
             'hospital total discharges title xviii For adults & peds',
             'hospital total discharges title xix for adults & peds',
             'cost of charity care',
             'inpatient Total Charges',
             'combined Outpatient + Inpatient Total Charges',
             'other current assets',
             'total current assets',
             'total fixed assets',
             'other assests',
             'total other assets',
             'other current liabilities',
             'total current liabilities',
             'Other long term liabilities',
             'total long term liabilities',
             'general fund assets',
             'general fund liabilities',
             'total liabilities and fund balances',
             'allowable dsh percentage',
             'drg amounts other than outlier payments',
             'drg amounts before October 1',
             'drg amounts after October 1',
             'outlier payments for discharges',
             'outpatient revenue',
             'inpatient revenue',
             'total costs',
             'total charges',
             'net patient revenue',
             'less total operating expenses',
             'total other income',
             'total income',
             'total other expenses',
             'rpt_rec_num',
             'data_year'
            ]

#list of factor/categorical columns 
factor_cols = ['ccn facility type', 
               'provider ccn', 
               'rural versus urban',
               'provider type',
               'type of control'
              ]


# drop irrelevant columns and rows
data_clean = (data_new
              .drop(*num_cols, *factor_cols)
              .filter(col('gross revenue').isNotNull())
             )

clean_rows = data_clean.count()
clean_cols = len(data_clean.columns)

print("Number of rows: {}".format(clean_rows))
print("Number of cols: {}".format(clean_cols))

Number of rows: 53508
Number of cols: 60


In [0]:
from pyspark.sql.functions import isnull, count, round
import pandas as pd

#check how many non-null values there are in each column
null_perc = data_clean.select([count(when(isnull(c), c)).alias(c) for c in data_clean.columns])

#convert pyspark to pdf and transform columns
null_perc_pdf = null_perc.toPandas()
null_perc_pdf = pd.melt(null_perc_pdf)
null_perc_pdf.columns = ["column","count"]

#get perc of null values 
null_perc_melt = spark.createDataFrame(null_perc_pdf)
total_rows = data_clean.count()

null_perc_df = (null_perc_melt
                .withColumn('perc',round((col('count')/total_rows)*100,2))
                .orderBy(col('perc').desc())
               )

null_perc_df.display()




column,count,perc
unsecured loans,51093,95.49
notes receivable,50577,94.52
wage-related costs (rhc/fqhc),50000,93.44
health information technology designated assets,48531,90.7
wage related costs for interns and residents,47023,87.88
deferred income,46511,86.92
mortgage payable,45548,85.12
net revenue from stand-alone schip,44518,83.2
stand-alone schip charges,44405,82.99
minor equipment depreciable,44298,82.79


In [0]:
#find columns that have 50% or more null values within the column
null_cols = null_perc_df.filter(col('perc') > 25)
null_cols_ls=null_cols.rdd.map(lambda x: x['column']).collect()

data_clean_nulls = data_clean.drop(*null_cols_ls)

data_clean_nulls_rows = data_clean_nulls.count()
data_clean_nulls_cols = len(data_clean_nulls.columns)

print("Number of rows: {}".format(data_clean_nulls_rows))
print("Number of cols: {}".format(data_clean_nulls_cols))

Number of rows: 53508
Number of cols: 32


In [0]:
#remove spaces from column names 
model_data = data_clean_nulls.select([col(c).alias(c.replace('+', '')
                                                    .replace(':', '')
                                                    .replace("'", '')
                                                    .replace(',', '')
                                                    .replace('-', '')
                                                    .replace('/', '')
                                                    .replace('(','')
                                                    .replace(')','')
                                                    .replace('(','')
                                                    .replace(')','')
                                                    .replace(' ','_')
                                                    .replace('__','_')) for c in data_clean_nulls.columns])   

#convert pyspark dataframe into pandas dataframe
model_data_pdf = model_data.toPandas()

In [0]:
#https://machinelearningmastery.com/feature-selection-for-regression-data/
#https://www.linkedin.com/pulse/feature-selection-using-filter-methods-runa-veigas

In [0]:
import numpy as np
from patsy import dmatrices
from sklearn.model_selection import train_test_split
from sklearn.feature_selection import SelectKBest
from sklearn.feature_selection import f_regression
from matplotlib import pyplot

#patsy handles one-hot encoding 
## Create formula for all variables in model
vars_remove = ['gross_revenue']
vars_left = set(model_data_pdf.columns) - set(vars_remove)
formula = "gross_revenue ~ " + " + ".join(vars_left)

## Use Patsy to create model matrices
Y,X = dmatrices(formula,model_data_pdf,return_type='dataframe')

## Split Data into training and sample
X_train, X_test, Y_train, Y_test = train_test_split(X,
                                                    np.ravel(Y), # prevents dimensionality error later!
                                                    test_size=0.20,
                                                    random_state=30)

print('Train', X_train.shape, Y_train.shape)
print('Test', X_test.shape, Y_test.shape)

Train (18248, 32) (18248,)
Test (4563, 32) (4563,)


In [0]:
formula

Out[11]: 'gross_revenue ~ total_bad_debt_expense + medicaid_charges + cost_to_charge_ratio + buildings + major_movable_equipment + salaries_wages_and_fees_payable + total_unreimbursed_and_uncompensated_care + net_income + cash_on_hand_and_in_banks + total_assets + cost_of_uncompensated_care + depreciation_cost + inventory + other_assets + total_fund_balances + total_salaries_from_worksheet_a + less_total_operating_expense + total_days_v_xviii_xix_unknown_total_for_all_subproviders + accounts_payable + total_bed_days_available + less_contractual_allowance_and_discounts_on_patients_accounts + net_revenue_from_medicaid + net_income_from_service_to_patients + outpatient_total_charges + overhead_nonsalary_costs + prepaid_expenses + total_discharges_v_xviii_xix_unknown_total_for_all_subproviders + fte_employees_on_payroll + accounts_receivable + general_fund_balance + total_liabilities'

In [0]:
# feature selection function
def select_features(X_train, Y_train, X_test):
	# configure to select all features
	fs = SelectKBest(score_func=f_regression, k='all')
	# learn relationship from training data
	fs.fit(X_train, Y_train)
	# transform train input data
	X_train_fs = fs.transform(X_train)
	# transform test input data
	X_test_fs = fs.transform(X_test)
	return X_train_fs, X_test_fs, fs

# apply feature selection function 
X_train_fs, X_test_fs, fs = select_features(X_train, Y_train, X_test)

# what are scores for the features
feature_ls =[]

for i in range(len(fs.scores_)):
    feature_ls.append([X.columns[i],fs.scores_[i]])
    
feature_pdf = pd.DataFrame(feature_ls, columns = ['feature','score'])
feature_desc_pdf = feature_pdf.sort_values(by=['score'],ascending=False).reset_index().drop('index',axis=1)
feature_desc_pdf

  corr /= X_norms
  F = corr ** 2 / (1 - corr ** 2) * degrees_of_freedom


Unnamed: 0,feature,score
0,less_contractual_allowance_and_discounts_on_pa...,872841.844026
1,outpatient_total_charges,294390.117277
2,overhead_nonsalary_costs,73356.805913
3,less_total_operating_expense,71238.323771
4,total_salaries_from_worksheet_a,50450.571723
5,medicaid_charges,45115.986847
6,total_bed_days_available,44753.895495
7,total_discharges_v_xviii_xix_unknown_total_for...,44139.083608
8,depreciation_cost,34046.954648
9,inventory,30812.506232


In [0]:
avg_score = feature_desc_pdf['score'].mean()
top_features = feature_desc_pdf[feature_desc_pdf['score'] > avg_score]
#top_features
#top_features.plot(kind = 'bar', title='Top Features')

In [0]:
top_feature_ls = top_features['feature'].values.tolist()
final_vars = top_feature_ls + ['gross_revenue']
MLdata_pdf = model_data_pdf[final_vars]
#MLdata_pdf

In [0]:
MLdata_pdf[['gross_revenue']].describe()

Unnamed: 0,gross_revenue
count,53508.0
mean,540965200.0
std,1082218000.0
min,-177031900.0
25%,38115930.0
50%,127172800.0
75%,591822500.0
max,29390140000.0
