# <font color='orange'>Step 2: Data Understanding</font> 

In [1]:
# Pandas DF operation
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from numpy import array
#pyspark
import pyspark
from pyspark.sql import SparkSession
import warnings
warnings.simplefilter(action='ignore')
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, StringType, DoubleType, ShortType, DecimalType
import pyspark.sql.functions as func
from pyspark.sql.functions import isnull
from pyspark.sql.functions import isnan, when, count, col, round
from pyspark.sql.functions import mean
from pyspark.sql.types import Row
import matplotlib.pyplot as plt
from pyspark.sql.functions import udf



# Modeling + Evaluation
from pyspark.ml.feature import VectorAssembler, VectorIndexer, OneHotEncoder, StringIndexer
from pyspark.sql.functions import when
from pyspark.sql import functions as F
from pyspark.sql.functions import avg
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder 
from sklearn.metrics import roc_curve, auc
from sklearn.metrics import log_loss
from pyspark.sql import Window
from pyspark.sql.functions import rank,sum,col
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorSlicer

In [2]:
# # Findspark can add a startup file to the current IPython profile so that the environment variables will
# # be properly set and pyspark will be imported upon IPython startup
# import findspark
# findspark.init('/home/ubuntu/spark-3.2.1-bin-hadoop2.7')

In [3]:
# Build a sparksession and build a unique app name
spark=SparkSession.builder.appName('iteration-4: prediction_hospital_readmission_rate ').getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/10/11 21:37:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/10/11 21:37:25 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

## <font color='grey'> 2.1: Collect Initial Data</font> 

In [5]:
# Read the data from csv file into a dataframe
diabetic_data=spark.read.option("header","true").csv('dataset_diabetes/diabetic_data.csv')

AnalysisException: Path does not exist: file:/home/ubuntu/Iteration_4:HospitalReadmissionPrediction/Uoa-infosys722/Code/dataset_diabetes/diabetic_data.csv

## <font color='grey'> 2.2: Data Description</font> 

In [None]:
diabetic_data.show()

In [None]:
# diabetic_data.toPandas().head()

## <font color='grey'> 2.3: Explore the Data</font> 

In [None]:
# Check number of rows and columns in the data
row=diabetic_data.count()
cols=len(diabetic_data.columns)
print('Total number of columns are - ', cols)
print('\nTotal number of records are - ', row)

In [None]:
diabetic_data.select('readmitted').groupBy('readmitted').count().sort(col('count').asc()).show()

In [None]:
diabetic_data.printSchema()

In [None]:
# Let's import in the relevant types.
from pyspark.sql.types import (StructField,StringType,IntegerType,StructType,FloatType)


In [None]:
# Then create a variable with the correct structure.
data_schema = [StructField('encounter_id',IntegerType(),True),
              StructField('patient_nbr',IntegerType(),True),
              StructField('race',StringType(),True),
              StructField('gender',StringType(),True),
              StructField('age',StringType(),True),
              StructField('weight',StringType(),True),
              StructField('admission_type_id',IntegerType(),True),
              StructField('discharge_disposition_id',IntegerType(),True),
              StructField('admission_source_id',IntegerType(),True),
              StructField('time_in_hospital',IntegerType(),True),
              StructField('payer_code',StringType(),True),
              StructField('medical_specialty',StringType(),True),
              StructField('num_lab_procedures',IntegerType(),True),
              StructField('num_procedures',IntegerType(),True),
              StructField('num_medications',IntegerType(),True),
              StructField('number_outpatient',IntegerType(),True),
              StructField('number_emergency',IntegerType(),True),
              StructField('number_inpatient',IntegerType(),True),
              StructField('diag_1',StringType(),True),
              StructField('diag_2',StringType(),True),
              StructField('diag_3',StringType(),True),
              StructField('number_diagnoses',IntegerType(),True),
              StructField('max_glu_serum',StringType(),True),
              StructField('A1Cresult',StringType(),True),
              StructField('metformin',StringType(),True),
              StructField('repaglinide',StringType(),True),
              StructField('nateglinide',StringType(),True),
              StructField('chlorpropamide',StringType(),True),
              StructField('glimepiride',StringType(),True),
              StructField('acetohexamide',StringType(),True),
              StructField('glipizide',StringType(),True),
              StructField('glyburide',StringType(),True),
              StructField('tolbutamide',StringType(),True),
              StructField('pioglitazone',StringType(),True),
              StructField('rosiglitazone',StringType(),True),
              StructField('acarbose',StringType(),True),
              StructField('miglitol',StringType(),True),
              StructField('troglitazone',StringType(),True),
              StructField('tolazamide',StringType(),True),
              StructField('examide',StringType(),True),
              StructField('citoglipton',StringType(),True),
              StructField('insulin',StringType(),True),
              StructField('glyburide-metformin',StringType(),True),
              StructField('glipizide-metformin',StringType(),True),
              StructField('glimepiride-pioglitazone',StringType(),True),
              StructField('metformin-rosiglitazone',StringType(),True),
              StructField('metformin-pioglitazone',StringType(),True),
              StructField('change',StringType(),True),
              StructField('diabetesMed',StringType(),True),
              StructField('readmitted',StringType(),True)]

final_struct = StructType(fields=data_schema)

In [None]:
diabetic_data=spark.read.option("header","true").csv('dataset_diabetes/diabetic_data.csv',schema=final_struct)

In [None]:
diabetic_data.printSchema()

In [None]:
# Separate the column names into two lists by Spark dtype:
# 'int' columns (treated as continuous) and 'string' columns (treated as categorical)
numeric_columns = [name for name, dtype in diabetic_data.dtypes if dtype == 'int']
categorical_data = [name for name, dtype in diabetic_data.dtypes if dtype == 'string']

In [None]:
# n=[]
# for col in diabetic_data.dtypes:
#     if col[1]=='int':
#         n.append(col[0])
# n

    

## Pyspark SQL

In [None]:
# First, we have to register the DataFrame as a SQL temporary view.
diabetic_data.createOrReplaceTempView('diabetic_data')

# After that, we can use the SQL programming language for queries. 
results = spark.sql("SELECT * FROM diabetic_data")
results.show()

In [None]:
spark.sql("select distinct race from diabetic_data").show()

In [None]:
spark.sql("select distinct gender from diabetic_data").show()

In [None]:
def countplot(df):
    """Draw one seaborn count plot per categorical column of ``df``.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Frame to plot. It must contain every column named in the
        notebook-level ``categorical_data`` list.
    """
    for column_name in categorical_data:
        plt.figure(figsize=(15,5))
        # Plot the frame that was passed in, and collect only the column being drawn.
        sns.countplot(data=df.select(column_name).toPandas(), x=column_name)
        plt.show()
countplot(diabetic_data)
        

In [None]:
# sns.countplot(data=(diabetic_data.select('readmitted').toPandas()), x='readmitted')

In [None]:
from pyspark.sql.functions import when
# Build a separate frame (df3) in which the target is recoded:
# "NO" -> 0, ">30" -> 1, "<30" -> 2; any other value is passed through unchanged.
df3 = diabetic_data.withColumn("readmitted", when(diabetic_data.readmitted == "NO",0) \
      .when(diabetic_data.readmitted == ">30",1).when(diabetic_data.readmitted == "<30",2) \
      .otherwise(diabetic_data.readmitted))
# Cast the recoded readmitted column to integer in the new data frame;
# this copy is meant for visualization only.
# NOTE(review): the barplot cell below is called with diabetic_data, not df3 - confirm which frame was intended.
df3=df3.withColumn("readmitted",df3.readmitted.cast(IntegerType()))


In [None]:

def barplot(df):
    """Draw a bar plot of every numeric column against ``readmitted``.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Frame to plot. It must contain a ``readmitted`` column and every
        column named in the notebook-level ``numeric_columns`` list.
    """
    # Collect once; converting inside the loop would re-run the Spark job
    # and re-transfer the whole frame for every column.
    plot_frame = df.toPandas()
    for column_name in numeric_columns:
        plt.figure(figsize=(15,5))
        sns.barplot(data=plot_frame, y=column_name, x='readmitted')
        plt.show()
barplot(diabetic_data)

In [None]:
def histogram(df):
    """Draw a 15-bin histogram for every numeric column of ``df``.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Frame to plot. It must contain every column named in the
        notebook-level ``numeric_columns`` list.
    """
    for column_name in numeric_columns:
        plt.figure(figsize=(15,5))
        # Plot the frame that was passed in, and collect only the column being drawn.
        plt.hist(data=df.select(column_name).toPandas(), x=column_name, bins=15, color='b')
        plt.title(column_name)
        plt.show()

In [None]:
histogram(diabetic_data)

In [None]:

# sns.barplot(data=(diabetic_data.select('number_emergency','readmitted').filter("number_emergency<=1").toPandas()), y='number_emergency',x='readmitted')

In [None]:

# sns.barplot(data=(diabetic_data.select('number_emergency','readmitted').filter("number_emergency>1").toPandas()), y='number_emergency',x='readmitted')

## <font color='grey'> 2.4: Data Quality</font> 

### Check Missing Values

In [None]:
# Count, per column, the values that look like missing data: the text markers
# 'NaN' / 'NULL', empty strings, SQL NULLs and floating-point NaN.
# The 'NaN' marker is spelled the same way as in the later missing-value checks.
from pyspark.sql.functions import col,isnan,when,count
df1 = diabetic_data.select([
    count(when(col(c).contains('NaN')
               | col(c).contains('NULL')
               | (col(c) == '')
               | col(c).isNull()
               | isnan(c), c)).alias(c)
    for c in diabetic_data.columns])
df1.show()

# The data looks clean here, but missing values are actually encoded with a "?" sign

In [None]:
# check how many columns has contained "?" sign with how many values
from pyspark.sql.functions import col,isnan,when,count
df2 = diabetic_data.select([count(when(col(c).contains('?') | \
                            isnan(c), c 
                           )).alias(c)
                    for c in diabetic_data.columns])
df2.show()

In [None]:
# # Now We will check the missing values in the data
# # Using List comprehension, we will check the mising values in the data
# df3 = diabetic_data.select([count(when(col(c).contains('Nan') | \
#                             col(c).contains('NULL') | \
#                             (col(c) == '' ) | (col(c) == '?' )| \
#                             col(c).isNull() | \
#                             isnan(c), c 
#                            )).alias(c)
#                     for c in diabetic_data.columns])
# df3.show()

In [None]:
# # numeric_cols=diabetic_data.
# diabetic_data.race.show()

In [None]:
# diabetic_data.select('time_in_hospital').approxQuantile(probabilities=[0.25],relativeError=0)

In [None]:
# q1=diabetic_data.approxQuantile('time_in_hospital',[0.25],relativeError=0)

In [None]:
# IQR=q3[0]-q1[0]
# IQR

## <font color='grey'> 2.4: Data Quality (continued): Outliers</font> 

In [None]:
# from pyspark.sql import functions as f
# def find_outliers(df):

#     # Identifying the numerical columns in a spark dataframe
#     numeric_columns = [column[0] for column in df.dtypes if column[1]=='int']

#     # Using the `for` loop to create new columns by identifying the outliers for each feature
#     for column in numeric_columns:

#         less_Q1 = 'less_Q1_{}'.format(column)
#         more_Q3 = 'more_Q3_{}'.format(column)
#         Q1 = 'Q1_{}'.format(column)
#         Q3 = 'Q3_{}'.format(column)

#         # Q1 : First Quartile ., Q3 : Third Quartile
#         Q1 = df.approxQuantile(column,[0.25],relativeError=0)
#         Q3 = df.approxQuantile(column,[0.75],relativeError=0)
        
#         # IQR : Inter Quantile Range
#         # We need to define the index [0], as Q1 & Q3 are a set of lists., to perform a mathematical operation
#         # Q1 & Q3 are defined seperately so as to have a clear indication on First Quantile & 3rd Quantile
#         IQR = Q3[0] - Q1[0]
        
#         #selecting the data, with -1.5*IQR to + 1.5*IQR., where param = 1.5 default value
#         less_Q1 =  Q1[0] - 1.5*IQR
#         more_Q3 =  Q3[0] + 1.5*IQR
        
#         isOutlierCol = 'is_outlier_{}'.format(column)
        
#         df_1=df.filter((df[col]<=less_Q1) | (df[col]>=more_Q3) )
# #         df_2=df.filter(df[col]>=more_Q3)
# # #         df_1=df[(df.select(col)<=lower)]
# # #         df_2=df[(df.select(col)>=upper)]
# #         df=unionAll([df_1,df_2])
# #         df = df.withColumn(isOutlierCol,f.when((df[column] > more_Q3) | (df[column] < less_Q1), 1).otherwise(0))
    

# #     # Selecting the specific columns which we have added above, to check if there are any outliers
# #     selected_columns = [column for column in df.columns if column.startswith("is_outlier")]

# #     # Adding all the outlier columns into a new colum "total_outliers", to see the total number of outliers
# #     df = df.withColumn('total_outliers',sum(df[column] for column in selected_columns))

# #     # Dropping the extra columns created above, just to create nice dataframe., without extra columns
# #     df = df.drop(*[column for column in df.columns if column.startswith("is_outlier")])

#     return df_1

In [None]:
# # dataframe=diabetic_data.select('total_outliers')
# x=find_outliers(diabetic_data)
# x.count()

In [None]:
# import functools
 
# # explicit function
# def unionAll(dfs):
#     return functools.reduce(lambda df1, df2: df1.union(df2.select(df1.columns)), dfs)



In [None]:
# Finding outliers using the IQR rule
def finding_outlier(df,col):
    """Return the values of column ``col`` that fall on or outside the 1.5*IQR fences.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Frame to inspect.
    col : str
        Name of a numeric column of ``df``.

    Returns
    -------
    pyspark.sql.DataFrame
        Single-column frame (``col``) holding every value that is
        ``<= Q1 - 1.5*IQR`` or ``>= Q3 + 1.5*IQR``.
    """
    # One call returns both quartiles; relativeError=0 asks for exact quantiles.
    q1, q3 = df.approxQuantile(col, [0.25, 0.75], relativeError=0)
    iqr = q3 - q1
    lower = q1 - 1.5*iqr
    upper = q3 + 1.5*iqr
    # The fences are inclusive here, which complements the strict comparison
    # used when outliers are removed.
    outlier_rows = df.filter((df[col]<=lower) | (df[col]>=upper))
    return outlier_rows.select(col)

In [None]:
finding_outlier(diabetic_data,'num_medications').groupby('num_medications').count().withColumnRenamed('count','outliers').sort(('num_medications')).show()

In [None]:
# finding_outlier(diabetic_data,'num_medications').groupby('num_medications').count().withColumnRenamed('count','outliers').sort(('num_medications')).show()

In [None]:
def outlier_count():
    """Print each name in ``numeric_columns`` followed by the number of rows
    of ``diabetic_data`` that ``finding_outlier`` returns for that column.

    Both notebook-level names are read when the function is called, so a
    later call reflects their current values.
    """
    for column_name in numeric_columns:
        n_flagged = finding_outlier(diabetic_data, column_name).count()
        print(column_name, n_flagged)
outlier_count()

In [None]:
#checked the statistics
diabetic_data.toPandas().describe()

In [None]:
sns.boxplot(data=(diabetic_data.select('num_medications').toPandas()),x='num_medications')

In [None]:
def boxplot(df):
    """Draw one box plot per numeric column of ``df``.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Frame to plot. It must contain every column named in the
        notebook-level ``numeric_columns`` list.
    """
    for column_name in numeric_columns:
        plt.figure(figsize=(15,5))
        # Plot the frame that was passed in, and collect only the column being drawn.
        sns.boxplot(data=df.select(column_name).toPandas(), x=column_name)
        plt.show()
boxplot(diabetic_data)

# <font color='orange'>Step 3: Data Preparation</font> 

## <font color='grey'> 3.1: Selecting the Data </font> 

### <font color='blue'> 3.1.1: Selecting items(rows) </font> 

In [None]:
diabetic_data.filter((diabetic_data.readmitted=='<30')).count()

In [None]:
diabetic_data.select('gender').toPandas().value_counts()

In [None]:
diabetic_data.filter((diabetic_data.gender=="Unknown/Invalid") & (diabetic_data.readmitted=='<30')).count()

In [None]:
diabetic_data=diabetic_data.filter(diabetic_data.gender!='Unknown/Invalid')
diabetic_data.select('gender').toPandas().value_counts()

In [None]:
diabetic_data.select('race').toPandas().value_counts()

In [None]:
#  Filter Based on List Values
# li=['Caucasian','AfricanAmerican','Hispanic','Other','Asian']
# diabetic_data.filter((diabetic_data.race.isin(li)==False)).count()

In [None]:
# df.filter(df.state.isin(li)==False).show()

In [None]:
# df_stroke[df_stroke['bmi'].isna() & df_stroke['stroke']==1]['stroke'].count()

In [None]:
# total 188 values of "?" in race column are contributing towards the target variable
diabetic_data.filter((diabetic_data.race=="?") & (diabetic_data.readmitted=='<30')).count()


In [None]:
diabetic_data=diabetic_data.filter(diabetic_data.race!='?')
diabetic_data.select('race').toPandas().value_counts()

In [None]:
# spark.sql("update diabetic_data set readmitted=0 where readmitted='NO' ").show()
# spark.sql("select readmitted from diabetic_data").show()

In [None]:
# spark.sql("update diabetic_data set readmitted=0 where readmitted='>30' ").show()

In [None]:
# Update the readmitted column: collapse the three labels into a binary target,
# 1 for "<30" and 0 for both "NO" and ">30".
# There is no otherwise() branch, so any other value becomes null.
diabetic_data=diabetic_data.withColumn("readmitted", when(col("readmitted") == "NO",0)
      .when(col("readmitted") == "<30",1).when(col("readmitted") == ">30",0))

In [None]:
diabetic_data.select('readmitted').groupBy('readmitted').count().show()

### <font color='blue'> 3.1.2: Selecting Attributes(Columns) </font> 

In [None]:
diabetic_data.show()

In [None]:
# encounter_id & patient_nbr are just for the records purpose only. So I am deleting these attributes
# diag_1, diag_2, diag_3 have almost similar values. Hence these are correlated,we will use only the diag_1 feature.
diabetic_data=diabetic_data.drop("encounter_id","patient_nbr","diag_2","diag_3")
diabetic_data.show()

In [None]:
diabetic_data.show()

In [None]:
# Check number of rows and columns in the data after removing some attributes and records
row=diabetic_data.count()
cols=len(diabetic_data.columns)
print('Total number of columns are - ', cols)
print('\nTotal number of records are - ', row)

In [None]:
# update the numeric columns now
numeric_columns = [column[0] for column in diabetic_data.dtypes if column[1]=='int']

## <font color='grey'> 3.2: Clean the Data</font> 

### <font color='blue'> 3.2.1: Clean Missing Values </font> 

In [None]:

"""
As We can see in step 2.3, there is no missing values in the data, but I have found the missing values are 
imputed with "?" sign. So, First of all we will replace the "?" sign with none value. and then we will check
the missing values
"""
print(" IMPORTANT 👆👆")


In [None]:
# Replace the "?" placeholder with an empty string so the missing-value checks below can count it.
# Only columns still present in the frame are targeted (diag_2 and diag_3 were dropped in 3.1.2),
# so the call does not depend on how Spark treats unknown names in `subset`.
placeholder_columns = ['race','weight','payer_code','medical_specialty','diag_1','diag_2','diag_3']
diabetic_data = diabetic_data.replace(
    {'?':''},
    subset=[name for name in placeholder_columns if name in diabetic_data.columns])

In [None]:
# Let's Check is there any values left with "?"
diabetic_data.select([count(when(col(c).contains('?') | \
                            isnan(c), c 
                           )).alias(c)
                    for c in diabetic_data.columns]).show()

In [None]:

# diabetic_data.select([count(when((col(c)=='') | \
#                             isnan(c), c 
#                            )).alias(c)
#                     for c in diabetic_data.columns]).show()

In [None]:
# Now check the missing values in the dataset 
diabetic_data.select([count(when(col(c).contains('NaN') | \
                            col(c).contains('NULL') | \
                            (col(c) == '' ) | \
                            col(c).isNull() | \
                            isnan(c), c 
                           )).alias(c)
                    for c in diabetic_data.columns]).show()


In [None]:
# print(round((diabetic_data.filter(diabetic_data.weight=='').count()*100)/(diabetic_data.select('weight').count()),2))

In [None]:
# from pyspark.sql.functions import col
# for cols in diabetic_data.columns:
#     print(round((diabetic_data.filter(" cols =='' ").count()*100)\
#                 /(diabetic_data.select(cols).count()),2))
    

In [None]:
# column weight, payer code, & Medical speciality has Approximately more than 40% missing records
# I will drop these columns from the dataset
diabetic_data=diabetic_data.drop('weight','payer_code','medical_specialty')
# diabetic_data.show()

In [None]:
# remove the missing values in the data
diabetic_data=diabetic_data.filter(diabetic_data.diag_1!='')
# diabetic_data=diabetic_data.filter(diabetic_data.diag_2!='')
# diabetic_data=diabetic_data.filter(diabetic_data.diag_3!='')

In [None]:
# Now check again the missing values in the dataset 
diabetic_data.select([count(when(col(c).contains('NaN') | \
                            col(c).contains('NULL') | \
                            (col(c) == '' ) | \
                            col(c).isNull() | \
                            isnan(c), c 
                           )).alias(c)
                    for c in diabetic_data.columns]).show()

# Now there is no missing data in the dataset

### <font color='blue'> 3.2.2: Clean Outliers </font> 

In [None]:
# After cleaning the missing value, let's find out the total number of outliers
# I have made a function to count the outliers. let's run this
outlier_count()

In [None]:
# # Finding Outlier using IQR
# def finding_outlier(df,col):
# #     for col in numeric_columns:
#     df.select(col).toPandas().astype(int)
#     q1=df.approxQuantile(col,[0.25],relativeError=0)
#     q3=df.approxQuantile(col,[0.75],relativeError=0)
#     IQR=q3[0]-q1[0]
#     upper=q3[0]+1.5*IQR
#     lower=q1[0]-1.5*IQR
#     df_1=df.filter((df[col]<=lower) | (df[col]>=upper) )
# #         df_1=df[(df.select(col)<=lower)]
# #         df_2=df[(df.select(col)>=upper)]
# #     df=unionAll([df_1,df_2])

#     return df_1.select(col)

In [None]:
diabetic_data.toPandas().describe()

In [None]:
def outliers_removed(df,colm):
    """Return ``df`` restricted to rows whose ``colm`` value lies strictly inside the 1.5*IQR fences.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Frame to filter.
    colm : str
        Name of a numeric column of ``df``.

    Returns
    -------
    pyspark.sql.DataFrame
        Rows of ``df`` with ``Q1 - 1.5*IQR < colm < Q3 + 1.5*IQR``.
        Values equal to a fence are removed.
    """
    # One call returns both quartiles; relativeError=0 asks for exact quantiles.
    q1, q3 = df.approxQuantile(colm, [0.25, 0.75], relativeError=0)
    iqr = q3 - q1
    lower = q1 - 1.5*iqr
    upper = q3 + 1.5*iqr
    return df.filter((df[colm]>lower) & (df[colm]<upper))

In [None]:
#diabetic_data=
diabetic_data=outliers_removed(diabetic_data,'admission_type_id')

In [None]:
diabetic_data=outliers_removed(diabetic_data,'discharge_disposition_id')

In [None]:
diabetic_data=outliers_removed(diabetic_data,'admission_source_id')

In [None]:
diabetic_data=outliers_removed(diabetic_data,'time_in_hospital')

In [None]:
diabetic_data=outliers_removed(diabetic_data,'num_lab_procedures')

In [None]:
diabetic_data=outliers_removed(diabetic_data,'num_procedures')

In [None]:
diabetic_data=outliers_removed(diabetic_data,'num_medications')

In [None]:
diabetic_data=outliers_removed(diabetic_data,'number_diagnoses')

In [None]:
diabetic_data.toPandas().describe()

In [None]:
#diabetic_data=
diabetic_data=outliers_removed(diabetic_data,'admission_type_id')

In [None]:
continuous=['admission_type_id',
 'discharge_disposition_id',
 'admission_source_id',
 'time_in_hospital',
 'num_lab_procedures',
 'num_procedures',
 'num_medications',
 'number_diagnoses']

for cols in continuous:
    plt.figure(figsize=(15,5))
    sns.boxplot(data=(diabetic_data.select(cols).toPandas()),x=cols)
    plt.show()

In [None]:
diabetic_data.toPandas().describe()

In [None]:
# Check number of rows and columns in the data after Cleaning the data
row=diabetic_data.count()
cols=len(diabetic_data.columns)
print('Total number of columns are - ', cols)
print('\nTotal number of records are - ', row)

In [None]:
for cols in continuous:
    # displot is figure-level: it creates its own figure, so a preceding plt.figure()
    # would only leave an empty figure behind. Size it through height/aspect instead
    # (5 in tall x aspect 3 = 15 in wide, the size used by the other plots).
    sns.displot(data=(diabetic_data.select(cols).toPandas()),x=cols, height=5, aspect=3)
    plt.show()

In [None]:
# Box plot of each continuous variable, split by the target attribute `readmitted`
# Collect once; converting inside the loop would re-run the Spark job for every column.
readmitted_pdf = diabetic_data.toPandas()
for cols in continuous:
    plt.figure(figsize=(10,8))

    sns.boxplot(x='readmitted',y=cols,hue='readmitted', data=readmitted_pdf)
    # The hue variable is `readmitted`, so the legend is titled accordingly.
    plt.legend(loc='upper left',title='Readmitted')

    plt.show()

## <font color='grey'> 3.3: Constructing New Data</font> 

In [None]:
sns.countplot(data=(diabetic_data.select('age').toPandas()),x='age')

In [None]:
diabetic_data.select('age').groupBy('age').count().sort('age').show()

In [None]:
#create a new column based on the the previous column
diabetic_data=diabetic_data.withColumn("age_category",col("age"))


In [None]:
# replacing the values in the reconstructing column
diabetic_data=diabetic_data.replace({'[0-10)':'Minor','[10-20)':'Minor','[20-30)':'Young_adult',
               '[30-40)':'Young_adult', '[40-50)':'Middle_aged','[50-60)':'Middle_aged',
               '[60-70)':'Older_adult','[70-80)':'Older_adult',
               '[80-90)':'Elderly','[90-100)':'Elderly'},subset=['age_category'])
# Check the total values in each category
diabetic_data.select('age_category').groupBy('age_category').count().sort('age_category').show()

In [None]:
# THe comparision of the previous and the current 
sns.countplot(data=(diabetic_data.select('age','readmitted').toPandas()),x='age',hue='readmitted')
plt.show()
sns.countplot(data=(diabetic_data.select('age_category','readmitted').toPandas()),x='age_category',hue='readmitted')
plt.show()

In [None]:
#Drop the previous column after reconstructing the new column
diabetic_data=diabetic_data.drop('age')
diabetic_data.columns

## <font color='grey'> 3.4: Integrating Data</font> 

In [None]:
# Split the data into its first 25,000 rows (limited_df) and the remainder (rest_df).
# .cache() is advised for consistency, because without it limited_df and rest_df can have overlapping rows
limited_df = diabetic_data.limit(25000).cache()
# exceptAll (SQL EXCEPT ALL) preserves duplicate rows. subtract (EXCEPT DISTINCT) would
# de-duplicate the remainder and also drop every remaining row equal to one in limited_df,
# so the union built below would no longer contain all of the original rows.
rest_df = diabetic_data.exceptAll(limited_df)

In [None]:
print(limited_df.count())
# limited_df.show(1)

In [None]:
print(rest_df.count())
# rest_df.show(1)

In [None]:
import functools
 
# explicit function
def unionAll(dfs):
    return functools.reduce(lambda df1, df2: df1.union(df2.select(df1.columns)), dfs)
 
 
diabetic_data = unionAll([limited_df,rest_df])
diabetic_data.count()

## <font color='grey'> 3.5: Data Formatting</font> 

In [None]:
diabetic_data.printSchema()

In [None]:
# Cast the readmitted target column to boolean type, then print the schema to confirm the change
from pyspark.sql.types import BooleanType
diabetic_data = diabetic_data.withColumn("readmitted", 
                                  diabetic_data["readmitted"]
                                  .cast(BooleanType()))
diabetic_data.printSchema()

# <font color='orange'>Step 4: Data Transformation</font> 

## <font color='grey'> 4.1: Data Reduction</font> 

In [None]:
from pyspark.ml.feature import StringIndexer


In [None]:
a=StringIndexer(inputCol="race", outputCol="Race")
b=StringIndexer(inputCol="gender", outputCol="Gender")
c=StringIndexer(inputCol="max_glu_serum", outputCol="Max_glu_serum")
d=StringIndexer(inputCol="A1Cresult", outputCol="AA1Cresult")
e=StringIndexer(inputCol="metformin", outputCol="Metformin")
f=StringIndexer(inputCol="repaglinide", outputCol="Repaglinide")
g=StringIndexer(inputCol="nateglinide", outputCol="Nateglinide")
h=StringIndexer(inputCol="chlorpropamide", outputCol="Chlorpropamide")
i=StringIndexer(inputCol="glimepiride", outputCol="Glimepiride")
j=StringIndexer(inputCol="acetohexamide", outputCol="Acetohexamide")
k=StringIndexer(inputCol="glipizide", outputCol="Glipizide")
l=StringIndexer(inputCol="glyburide", outputCol="Glyburide")
m=StringIndexer(inputCol="tolbutamide", outputCol="Tolbutamide")
n=StringIndexer(inputCol="pioglitazone", outputCol="Pioglitazone")
o=StringIndexer(inputCol="rosiglitazone", outputCol="Rosiglitazone")
p=StringIndexer(inputCol="acarbose", outputCol="Acarbose")
q=StringIndexer(inputCol="miglitol", outputCol="Miglitol")
r=StringIndexer(inputCol="troglitazone", outputCol="Troglitazone")
s=StringIndexer(inputCol="tolazamide", outputCol="Tolazamide")
t=StringIndexer(inputCol="examide", outputCol="Examide")
u=StringIndexer(inputCol="citoglipton", outputCol="Citoglipton")
v=StringIndexer(inputCol="insulin", outputCol="Insulin")
w=StringIndexer(inputCol="glyburide-metformin", outputCol="Glyburide_metformin")
x=StringIndexer(inputCol="glipizide-metformin", outputCol="Glipizide_metformin")
y=StringIndexer(inputCol="glimepiride-pioglitazone", outputCol="Glimepiride_pioglitazone")
z=StringIndexer(inputCol="metformin-rosiglitazone", outputCol="Metformin_rosiglitazone")
a1=StringIndexer(inputCol="metformin-pioglitazone", outputCol="Metformin_pioglitazone")
a2=StringIndexer(inputCol="change", outputCol="Change")
a3=StringIndexer(inputCol="diabetesMed", outputCol="DiabetesMed")
a4=StringIndexer(inputCol="age_category", outputCol="Age_category")
a5=StringIndexer(inputCol="diag_1", outputCol="Diag_1")


In [None]:
# Fit and apply the StringIndexers one after another: every step fits its indexer on,
# and transforms, the frame produced by the previous step. The final frame is `bd`.
abc=diabetic_data
aa=a.fit(abc).transform(abc)
ab=b.fit(aa).transform(aa)
ac=c.fit(ab).transform(ab)
ad=d.fit(ac).transform(ac)
ae=e.fit(ad).transform(ad)
af=f.fit(ae).transform(ae)
ag=g.fit(af).transform(af)
ah=h.fit(ag).transform(ag)
ai=i.fit(ah).transform(ah)
aj=j.fit(ai).transform(ai)
ak=k.fit(aj).transform(aj)
al=l.fit(ak).transform(ak)
am=m.fit(al).transform(al)
an=n.fit(am).transform(am)
ao=o.fit(an).transform(an)
ap=p.fit(ao).transform(ao)
aq=q.fit(ap).transform(ap)
ar=r.fit(aq).transform(aq)
ast=s.fit(ar).transform(ar)
at=t.fit(ast).transform(ast)
au=u.fit(at).transform(at)
av=v.fit(au).transform(au)
aw=w.fit(av).transform(av)
ax=x.fit(aw).transform(aw)
ay=y.fit(ax).transform(ax)
# NOTE(review): indexer `z` (metformin-rosiglitazone -> Metformin_rosiglitazone) is never
# fitted or applied in this chain - confirm whether that omission is intentional.
az=a1.fit(ay).transform(ay)
ba=a2.fit(az).transform(az)
bb=a3.fit(ba).transform(ba)
bc=a4.fit(bb).transform(bb)
bd=a5.fit(bc).transform(bc)
# be=a.fit(bd).transform(bd)


In [None]:
len(bd.columns)

In [None]:
bcd=bd.select('Race', 'Gender', 'admission_type_id', 'discharge_disposition_id', 'admission_source_id', 'time_in_hospital', 'num_lab_procedures', 'num_procedures', 'num_medications', 'number_outpatient', 'number_emergency', 'number_inpatient', 'Diag_1', 'number_diagnoses', 'Max_glu_serum', 'A1Cresult', 'Metformin', 'Repaglinide', 'Nateglinide', 'Chlorpropamide', 'Glimepiride', 'Acetohexamide', 'Glipizide', 'Glyburide', 'Tolbutamide', 'Pioglitazone', 'Rosiglitazone', 'Acarbose', 'Miglitol', 'Troglitazone', 'Tolazamide', 'Examide', 'Citoglipton', 'Insulin', 'glyburide-metformin', 'glipizide-metformin', 'glimepiride-pioglitazone', 'metformin-rosiglitazone', 'metformin-pioglitazone', 'Change', 'DiabetesMed',  'Age_category', 'AA1Cresult', 'Glyburide_metformin', 'Glipizide_metformin', 'Glimepiride_pioglitazone', 'Metformin_pioglitazone','readmitted')
len(bcd.columns)

In [None]:
bcd=bcd.drop( 'A1Cresult',  'glyburide-metformin', 'glipizide-metformin', 'glimepiride-pioglitazone', 'metformin-rosiglitazone', 'metformin-pioglitazone')
bcd.columns
len(bcd.columns)

In [None]:
diabetic_data=bcd

In [None]:
diabetic_data.select('citoglipton').toPandas().value_counts()

In [None]:
diabetic_data.select('examide').toPandas().value_counts()

In [None]:
# Columns 'examide' and 'citoglipton' each have only one distinct value (see the value counts above),
# so we remove them to avoid bias, then check the remaining number of columns.
diabetic_data=diabetic_data.drop('examide','citoglipton')
len(diabetic_data.columns)

## <font color='grey'> 4.2: Data Projection</font> 

In [None]:
n_col=diabetic_data.columns
n_col.remove("readmitted")

# Let us import the vector assembler
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=n_col,outputCol="features")

# Now let us use the transform method to transform our dataset
diabetic_data_1=assembler.transform(diabetic_data)
diabetic_data_1.select("features").show(truncate=False)

In [None]:
from pyspark.ml.feature import StandardScaler
standardscaler=StandardScaler().setInputCol("features").setOutputCol("Scaled_features")
diabetic_data_1=standardscaler.fit(diabetic_data_1).transform(diabetic_data_1)
diabetic_data_1.select("features","Scaled_features").show(5)

In [None]:
diabetic_data_1 = diabetic_data_1.withColumn("readmitted", 
                                  diabetic_data_1["readmitted"]
                                  .cast('int'))

In [None]:
dataset_size=float(diabetic_data_1.select("readmitted").count())
numPositives=diabetic_data_1.select("readmitted").where('readmitted == 1').count()
per_ones=(float(numPositives)/float(dataset_size))*100
numNegatives=float(dataset_size-numPositives)
print('The number of ones are {}'.format(numPositives))
print('Percentage of ones are {}'.format(per_ones))

In [None]:

# Implement oversampling method: replicate the minority class (readmitted == 1)
# so that its size approaches that of the majority class (readmitted == 0).
import pyspark.sql.functions as F

# calculate ratio: integer part of (majority row count / minority row count)
major_df = diabetic_data_1.filter(diabetic_data_1.readmitted == 0)
minor_df = diabetic_data_1.filter(diabetic_data_1.readmitted == 1)
ratio = int(major_df.count()/minor_df.count())
print("ratio: {}".format(ratio))
# NOTE(review): `a` was bound to the race StringIndexer earlier in the notebook; this rebinds it.
a = range(ratio)

# duplicate the minority rows: exploding an array of `ratio` literals emits `ratio`
# copies of every minority row; the helper column "dummy" is dropped again afterwards
oversampled_df = minor_df.withColumn("dummy", F.explode(F.array([F.lit(x) for x in a]))).drop('dummy')

# combine both oversampled minority rows and previous majority rows
combined_df = major_df.unionAll(oversampled_df)

In [None]:
# Class balance of the label after oversampling.
label_only = combined_df.select("readmitted")
dataset_size = float(label_only.count())
numPositives = label_only.where('readmitted == 1').count()
numNegatives = float(dataset_size - numPositives)
per_ones = (float(numPositives) / float(dataset_size)) * 100
print('The number of ones are {}'.format(numPositives))
print('Percentage of ones are {}'.format(per_ones))

In [None]:
# Row count per label value in the oversampled dataset (collected to pandas).
combined_df.select('readmitted').toPandas().value_counts()

# <font color='orange'>Step 5: Data Mining Method</font> 

5.1 - Match and discuss DM methods within the context of the DM objectives.

5.2 - Select the appropriate DM method(s) in a logical manner. The selected DM method must be in line with the data mining goal/success criteria.

## <font color='grey'> 5.1: Match and Discuss DM Methods</font> 

## <font color='grey'> 5.2: Select Appropriate DM Methods</font> 

#  <font color='orange'>Step 6: Data Mining Algorithm Selection</font> 

6.1 - Conduct exploratory analysis of DM algorithms within the context of the DM objectives. Then, discuss the analysis.

6.2 - Select algorithm(s) in a logical manner based on the exploratory analysis and discussion.

6.3 - Model(s) must be selected/built, and the appropriate algorithm/model parameter(s) must be selected.

## <font color='grey'> 6.1: Exploratory analysis of DM algorithms</font> 

In [None]:
# 70/30 train/test split of the scaled features and label, with a fixed seed.
# NOTE(review): `combined_df` already contains the duplicated minority rows, so
# identical rows can appear on both sides of this split — confirm this is acceptable.
train, test = (combined_df.select('Scaled_features','readmitted')).randomSplit([0.70, 0.30], seed = 12345)

### <font color='skyblue'> 6.1.1:Logistic Regression Algorithm</font> 

In [None]:
# Logistic regression on the scaled feature vector
# (LogisticRegression / BinaryClassificationEvaluator are imported at the top of the notebook).
lr = LogisticRegression(
    featuresCol="Scaled_features",
    labelCol="readmitted",
    maxIter=10,
)
model = lr.fit(train)

# Score both splits with the fitted model.
predict_train, predict_test = (model.transform(split) for split in (train, test))

# Preview the test predictions, then report the evaluator score on each split.
evaluator = BinaryClassificationEvaluator(labelCol='readmitted', rawPredictionCol='rawPrediction')
predict_test.select("readmitted","rawPrediction","prediction","probability").show(20)
train_auc = evaluator.evaluate(predict_train)
test_auc = evaluator.evaluate(predict_test)
print("The area under ROC for train set is {}".format(train_auc))
print("The area under ROC for test set is {}".format(test_auc))

In [None]:
# Confusion matrix for logistic regression on the test split:
# one row per predicted class, one column per actual `readmitted` value.
cm_lr_result = predict_test.crosstab("prediction", "readmitted").toPandas()
cm_lr_result

In [None]:
# Accuracy, sensitivity, specificity and precision for logistic regression.
#
# Cells are looked up by (predicted class, actual label) rather than by row
# position: crosstab does not document any row order, so positional indexing
# can silently swap TP/FP with FN/TN.
lr_cm = cm_lr_result.set_index(cm_lr_result.columns[0])   # first column holds the predicted class
lr_cm.index = lr_cm.index.astype(float)                   # e.g. "1.0" -> 1.0
# Guarantee both classes exist on both axes (a class that was never predicted counts as 0).
lr_cm = lr_cm.reindex(index=[0.0, 1.0], columns=["0", "1"], fill_value=0)

TP = lr_cm.loc[1.0, "1"]   # predicted 1, actual 1
FP = lr_cm.loc[1.0, "0"]   # predicted 1, actual 0
TN = lr_cm.loc[0.0, "0"]   # predicted 0, actual 0
FN = lr_cm.loc[0.0, "1"]   # predicted 0, actual 1

Accuracy = (TP + TN) / (TP + FP + TN + FN)
Sensitivity = TP / (TP + FN)
Specificity = TN / (TN + FP)
Precision = TP / (TP + FP)

print("Accuracy = %0.2f" % Accuracy)
print("Sensitivity/Recall = %0.2f" % Sensitivity)
print("Specificity = %0.2f" % Specificity)
print("Precision = %0.2f" % Precision)

In [None]:
# ROC curve taken from the fitted model's summary (`plt` is imported at the top).
# NOTE(review): per the Spark docs `model.summary` is the training summary, so
# this looks like the training-set ROC rather than the test-set ROC — confirm.
lr_roc = model.summary.roc.toPandas()
plt.plot(lr_roc['FPR'], lr_roc['TPR'])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()
model.summary.areaUnderROC

### <font color='skyblue'> 6.1.2:Random Forest Algorithm</font> 

In [None]:
# Random forest on the scaled feature vector: fit on the training split, then
# score both splits. (RandomForestClassifier / BinaryClassificationEvaluator are
# imported at the top of the notebook.)
rf = RandomForestClassifier(featuresCol='Scaled_features', labelCol='readmitted')
rf_model = rf.fit(train)
predict_train_rf = rf_model.transform(train)
predict_test_rf = rf_model.transform(test)
predict_test_rf.select("readmitted", "prediction").show(10)

# `predictions` is kept as an alias of the scored test split.
predictions = predict_test_rf

# The evaluator must NOT be bound to the name `auc`: that would shadow
# sklearn.metrics.auc (imported at the top) and make every later
# `auc(fpr, tpr)` call fail with "object is not callable".
rf_auc_evaluator = BinaryClassificationEvaluator().setLabelCol('readmitted')
print('AUC of the model:' + str(rf_auc_evaluator.evaluate(predictions)))

In [None]:
# Evaluator score of the random forest on both splits
# (BinaryClassificationEvaluator is imported at the top of the notebook).
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='readmitted')

# Preview the random-forest test predictions (not the logistic-regression
# `predict_test` frame, which this cell previously displayed by mistake).
predict_test_rf.select("readmitted", "rawPrediction", "prediction", "probability").show(20)
print("The area under ROC for train set is {}".format(evaluator.evaluate(predict_train_rf)))
print("The area under ROC for test set is {}".format(evaluator.evaluate(predict_test_rf)))

In [None]:
# ROC curve for the random forest on the test split.
#
# Each row is scored with P(readmitted = 1) (element 1 of the probability
# vector) and keeps its label unchanged, so the curve describes detection of the
# positive class. The area under the curve is the same as when scoring class 0
# against the inverted label; only the curve's orientation differs.
PredAndLabels = predict_test_rf.select("probability", "readmitted")
PredAndLabels_collect = PredAndLabels.collect()
PredAndLabels_list = [(float(row[0][1]), float(row[1])) for row in PredAndLabels_collect]
# Use the session's own SparkContext instead of relying on a bare `sc` global.
PredAndLabels = spark.sparkContext.parallelize(PredAndLabels_list)

metrics = BinaryClassificationMetrics(PredAndLabels)

# Area under ROC
print("Random Forest Area Under ROC")
print("Area under ROC = %.2f" % metrics.areaUnderROC)

# Visualization: recompute the curve points locally with scikit-learn.
y_score = [score for score, _ in PredAndLabels_list]
y_test = [label for _, label in PredAndLabels_list]

fpr, tpr, _ = roc_curve(y_test, y_score)
roc_auc = auc(fpr, tpr)  # sklearn.metrics.auc, imported at the top of the notebook

fig, ax = plt.subplots(figsize=(5, 4))
ax.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
ax.plot([0, 1], [0, 1], 'k--')  # chance diagonal
ax.set_xlim([0.0, 1.0])
ax.set_ylim([0.0, 1.05])
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('ROC Curve - Random Forest')
ax.legend(loc="lower right")
plt.show()

In [None]:
# Confusion matrix for the random forest on the test split:
# one row per predicted class, one column per actual `readmitted` value.
cm_rf_result = predict_test_rf.crosstab("prediction", "readmitted").toPandas()
cm_rf_result

In [None]:
# Accuracy, sensitivity, specificity and precision for the random forest.
#
# Cells are looked up by (predicted class, actual label) rather than by row
# position: crosstab does not document any row order, so positional indexing
# can silently swap TP/FP with FN/TN.
rf_cm = cm_rf_result.set_index(cm_rf_result.columns[0])   # first column holds the predicted class
rf_cm.index = rf_cm.index.astype(float)                   # e.g. "1.0" -> 1.0
# Guarantee both classes exist on both axes (a class that was never predicted counts as 0).
rf_cm = rf_cm.reindex(index=[0.0, 1.0], columns=["0", "1"], fill_value=0)

TP = rf_cm.loc[1.0, "1"]   # predicted 1, actual 1
FP = rf_cm.loc[1.0, "0"]   # predicted 1, actual 0
TN = rf_cm.loc[0.0, "0"]   # predicted 0, actual 0
FN = rf_cm.loc[0.0, "1"]   # predicted 0, actual 1

Accuracy = (TP + TN) / (TP + FP + TN + FN)
Sensitivity = TP / (TP + FN)
Specificity = TN / (TN + FP)
Precision = TP / (TP + FP)

print("Accuracy = %0.2f" % Accuracy)
print("Sensitivity/Recall = %0.2f" % Sensitivity)
print("Specificity = %0.2f" % Specificity)
print("Precision = %0.2f" % Precision)

### <font color='skyblue'> 6.1.3: Decision Tree Algorithm</font> 

In [None]:
# Decision tree (depth limited to 3): fit on `train`, score `test`.
dt = DecisionTreeClassifier(
    labelCol='readmitted',
    featuresCol='Scaled_features',
    maxDepth=3,
)
dt_model = dt.fit(train)
dt_result = dt_model.transform(test)

# Preview the label next to the predicted class and class probabilities.
dt_result.select('readmitted', 'prediction', 'probability').show(5)

# Two evaluators: AUC from the probability column, accuracy from the hard prediction.
dt_eval = BinaryClassificationEvaluator(labelCol="readmitted", rawPredictionCol="probability")
dt_eval2 = MulticlassClassificationEvaluator(labelCol="readmitted", predictionCol="prediction")
dt_ACC = dt_eval2.evaluate(dt_result, {dt_eval2.metricName: "accuracy"})
dt_AUC = dt_eval.evaluate(dt_result)

print("Decision Tree Performance Measure")
print("Accuracy = %0.2f" % dt_ACC)
print("AUC = %.2f" % dt_AUC)

# ROC curve for the decision tree on the test split.
#
# Each row is scored with P(readmitted = 1) (element 1 of the probability
# vector) and keeps its label unchanged, so the curve describes detection of the
# positive class. The area under the curve is the same as when scoring class 0
# against the inverted label; only the curve's orientation differs.
PredAndLabels = dt_result.select("probability", "readmitted")
PredAndLabels_collect = PredAndLabels.collect()
PredAndLabels_list = [(float(row[0][1]), float(row[1])) for row in PredAndLabels_collect]
# Use the session's own SparkContext instead of relying on a bare `sc` global.
PredAndLabels = spark.sparkContext.parallelize(PredAndLabels_list)

metrics = BinaryClassificationMetrics(PredAndLabels)

# Area under ROC
print("Decision Tree Area Under ROC")
print("Area under ROC = %.2f" % metrics.areaUnderROC)

# Visualization: recompute the curve points locally with scikit-learn.
y_score = [score for score, _ in PredAndLabels_list]
y_test = [label for _, label in PredAndLabels_list]

fpr, tpr, _ = roc_curve(y_test, y_score)
roc_auc = auc(fpr, tpr)  # sklearn.metrics.auc, imported at the top of the notebook

fig, ax = plt.subplots(figsize=(5, 4))
ax.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
ax.plot([0, 1], [0, 1], 'k--')  # chance diagonal
ax.set_xlim([0.0, 1.0])
ax.set_ylim([0.0, 1.05])
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('ROC Curve - Decision Tree')
ax.legend(loc="lower right")
plt.show()

# Confusion matrix: one row per predicted class, one column per actual label.
cm_dt_result = dt_result.crosstab("prediction", "readmitted").toPandas()
# A bare expression in the middle of a cell is not rendered, so display explicitly.
display(cm_dt_result)

# Accuracy, sensitivity, specificity and precision.
# Cells are looked up by (predicted class, actual label) rather than by row
# position: crosstab does not document any row order, so positional indexing
# can silently swap TP/FP with FN/TN.
dt_cm = cm_dt_result.set_index(cm_dt_result.columns[0])   # first column holds the predicted class
dt_cm.index = dt_cm.index.astype(float)                   # e.g. "1.0" -> 1.0
# Guarantee both classes exist on both axes (a class that was never predicted counts as 0).
dt_cm = dt_cm.reindex(index=[0.0, 1.0], columns=["0", "1"], fill_value=0)

TP = dt_cm.loc[1.0, "1"]   # predicted 1, actual 1
FP = dt_cm.loc[1.0, "0"]   # predicted 1, actual 0
TN = dt_cm.loc[0.0, "0"]   # predicted 0, actual 0
FN = dt_cm.loc[0.0, "1"]   # predicted 0, actual 1

Accuracy = (TP + TN) / (TP + FP + TN + FN)
Sensitivity = TP / (TP + FN)
Specificity = TN / (TN + FP)
Precision = TP / (TP + FP)

print("Accuracy = %0.2f" % Accuracy)
print("Sensitivity = %0.2f" % Sensitivity)
print("Specificity = %0.2f" % Specificity)
print("Precision = %0.2f" % Precision)

# Gini coefficient derived from the AUC: Gini = 2 * AUC - 1
AUC = dt_AUC
Gini_dt = (2 * AUC - 1)
print("AUC=%.2f" % AUC)
print("GINI ~=%.2f" % Gini_dt)

### <font color='skyblue'> 6.1.4:Gradient Boosting Algorithm</font> 

In [None]:
# Gradient-boosted trees (10 boosting iterations): fit on `train`, score `test`.
gbt = GBTClassifier(
    labelCol="readmitted",
    featuresCol="Scaled_features",
    maxIter=10,
)
gbt_model = gbt.fit(train)
gbt_result = gbt_model.transform(test)

# Preview the label next to the predicted class and class probabilities.
gbt_result.select('readmitted', 'prediction', 'probability').show(5)

# Two evaluators: AUC from the probability column, accuracy from the hard prediction.
gbt_eval = BinaryClassificationEvaluator(labelCol="readmitted", rawPredictionCol="probability")
gbt_eval2 = MulticlassClassificationEvaluator(labelCol="readmitted", predictionCol="prediction")
gbt_ACC = gbt_eval2.evaluate(gbt_result, {gbt_eval2.metricName: "accuracy"})
gbt_AUC = gbt_eval.evaluate(gbt_result)

print("Gradient Boosted Tree Performance Measure")
print("Accuracy = %0.2f" % gbt_ACC)
print("AUC = %.2f" % gbt_AUC)

# ROC curve for gradient boosting on the test split.
#
# Each row is scored with P(readmitted = 1) (element 1 of the probability
# vector) and keeps its label unchanged, so the curve describes detection of the
# positive class. The area under the curve is the same as when scoring class 0
# against the inverted label; only the curve's orientation differs.
PredAndLabels = gbt_result.select("probability", "readmitted")
PredAndLabels_collect = PredAndLabels.collect()
PredAndLabels_list = [(float(row[0][1]), float(row[1])) for row in PredAndLabels_collect]
# Use the session's own SparkContext instead of relying on a bare `sc` global.
PredAndLabels = spark.sparkContext.parallelize(PredAndLabels_list)

metrics = BinaryClassificationMetrics(PredAndLabels)

# Area under ROC
print("Gradient Boosting Area Under ROC")
print("Area under ROC = %.2f" % metrics.areaUnderROC)

# Visualization: recompute the curve points locally with scikit-learn.
y_score = [score for score, _ in PredAndLabels_list]
y_test = [label for _, label in PredAndLabels_list]

fpr, tpr, _ = roc_curve(y_test, y_score)
roc_auc = auc(fpr, tpr)  # sklearn.metrics.auc, imported at the top of the notebook

fig, ax = plt.subplots(figsize=(5, 4))
ax.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
ax.plot([0, 1], [0, 1], 'k--')  # chance diagonal
ax.set_xlim([0.0, 1.0])
ax.set_ylim([0.0, 1.05])
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('ROC Curve - Gradient Boosting')
ax.legend(loc="lower right")
plt.show()

# Confusion matrix: one row per predicted class, one column per actual label.
cm_gbt_result = gbt_result.crosstab("prediction", "readmitted").toPandas()
# A bare expression in the middle of a cell is not rendered, so display explicitly.
display(cm_gbt_result)

# Accuracy, sensitivity, specificity and precision.
# Cells are looked up by (predicted class, actual label) rather than by row
# position: crosstab does not document any row order, so positional indexing
# can silently swap TP/FP with FN/TN.
gbt_cm = cm_gbt_result.set_index(cm_gbt_result.columns[0])   # first column holds the predicted class
gbt_cm.index = gbt_cm.index.astype(float)                    # e.g. "1.0" -> 1.0
# Guarantee both classes exist on both axes (a class that was never predicted counts as 0).
gbt_cm = gbt_cm.reindex(index=[0.0, 1.0], columns=["0", "1"], fill_value=0)

TP = gbt_cm.loc[1.0, "1"]   # predicted 1, actual 1
FP = gbt_cm.loc[1.0, "0"]   # predicted 1, actual 0
TN = gbt_cm.loc[0.0, "0"]   # predicted 0, actual 0
FN = gbt_cm.loc[0.0, "1"]   # predicted 0, actual 1

Accuracy = (TP + TN) / (TP + FP + TN + FN)
Sensitivity = TP / (TP + FN)
Specificity = TN / (TN + FP)
Precision = TP / (TP + FP)

print("Accuracy = %0.2f" % Accuracy)
print("Sensitivity = %0.2f" % Sensitivity)
print("Specificity = %0.2f" % Specificity)
print("Precision = %0.2f" % Precision)

# Gini coefficient derived from the AUC: Gini = 2 * AUC - 1
AUC = gbt_AUC
Gini_gbt = (2 * AUC - 1)

print("AUC=%.2f" % AUC)
print("GINI ~=%.2f" % Gini_gbt)


## <font color='grey'> 6.2: Select algorithm(s) in a logical manner</font> 

## <font color='grey'> 6.3: Model Selection</font> 

#  <font color='orange'>Step 7: Data Mining </font> 

## <font color='grey'> 7.1: Create Logical Test</font> 

## <font color='grey'> 7.2: Conducting Data Mining (Model running!)</font> 

## <font color='grey'> 7.3: Searching for Patterns</font> 

# <font color='orange'>Step 8: Interpretation</font> 

## <font color='grey'> 8.1: Study and discuss the mined patterns</font> 

## <font color='grey'> 8.2: Visualize the data, results, models and patterns</font> 

## <font color='grey'> 8.3: Interpret the data, results, models and patterns</font> 

## <font color='grey'> 8.4: Assess and evaluate the data, results, models and patterns</font> 

## <font color='grey'> 8.5: Multiple Iterations</font> 

### <font color='skyblue'> 8.5.1: First Iteration</font> 

In [None]:
# 80/20 train/test split of the scaled features and label, with a fixed seed.
# This rebinds `train` and `test`; every later cell uses these until the next split.
train, test = (combined_df.select('Scaled_features','readmitted')).randomSplit([0.80, 0.20], seed = 12345)

In [None]:
# Gradient-boosted trees (10 boosting iterations): fit on `train`, score `test`.
# `train`/`test` are whatever the most recently executed split cell produced.
gbt = GBTClassifier(
    labelCol="readmitted",
    featuresCol="Scaled_features",
    maxIter=10,
)
gbt_model = gbt.fit(train)
gbt_result = gbt_model.transform(test)

# Preview the label next to the predicted class and class probabilities.
gbt_result.select('readmitted', 'prediction', 'probability').show(5)

# Two evaluators: AUC from the probability column, accuracy from the hard prediction.
gbt_eval = BinaryClassificationEvaluator(labelCol="readmitted", rawPredictionCol="probability")
gbt_eval2 = MulticlassClassificationEvaluator(labelCol="readmitted", predictionCol="prediction")
gbt_ACC = gbt_eval2.evaluate(gbt_result, {gbt_eval2.metricName: "accuracy"})
gbt_AUC = gbt_eval.evaluate(gbt_result)

print("Gradient Boosted Tree Performance Measure")
print("Accuracy = %0.2f" % gbt_ACC)
print("AUC = %.2f" % gbt_AUC)

# ROC curve for gradient boosting on the test split.
#
# Each row is scored with P(readmitted = 1) (element 1 of the probability
# vector) and keeps its label unchanged, so the curve describes detection of the
# positive class. The area under the curve is the same as when scoring class 0
# against the inverted label; only the curve's orientation differs.
PredAndLabels = gbt_result.select("probability", "readmitted")
PredAndLabels_collect = PredAndLabels.collect()
PredAndLabels_list = [(float(row[0][1]), float(row[1])) for row in PredAndLabels_collect]
# Use the session's own SparkContext instead of relying on a bare `sc` global.
PredAndLabels = spark.sparkContext.parallelize(PredAndLabels_list)

metrics = BinaryClassificationMetrics(PredAndLabels)

# Area under ROC
print("Gradient Boosting Area Under ROC")
print("Area under ROC = %.2f" % metrics.areaUnderROC)

# Visualization: recompute the curve points locally with scikit-learn.
y_score = [score for score, _ in PredAndLabels_list]
y_test = [label for _, label in PredAndLabels_list]

fpr, tpr, _ = roc_curve(y_test, y_score)
roc_auc = auc(fpr, tpr)  # sklearn.metrics.auc, imported at the top of the notebook

fig, ax = plt.subplots(figsize=(5, 4))
ax.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
ax.plot([0, 1], [0, 1], 'k--')  # chance diagonal
ax.set_xlim([0.0, 1.0])
ax.set_ylim([0.0, 1.05])
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('ROC Curve - Gradient Boosting')
ax.legend(loc="lower right")
plt.show()

# Confusion matrix: one row per predicted class, one column per actual label.
cm_gbt_result = gbt_result.crosstab("prediction", "readmitted").toPandas()
# A bare expression in the middle of a cell is not rendered, so display explicitly.
display(cm_gbt_result)

# Accuracy, sensitivity, specificity and precision.
# Cells are looked up by (predicted class, actual label) rather than by row
# position: crosstab does not document any row order, so positional indexing
# can silently swap TP/FP with FN/TN.
gbt_cm = cm_gbt_result.set_index(cm_gbt_result.columns[0])   # first column holds the predicted class
gbt_cm.index = gbt_cm.index.astype(float)                    # e.g. "1.0" -> 1.0
# Guarantee both classes exist on both axes (a class that was never predicted counts as 0).
gbt_cm = gbt_cm.reindex(index=[0.0, 1.0], columns=["0", "1"], fill_value=0)

TP = gbt_cm.loc[1.0, "1"]   # predicted 1, actual 1
FP = gbt_cm.loc[1.0, "0"]   # predicted 1, actual 0
TN = gbt_cm.loc[0.0, "0"]   # predicted 0, actual 0
FN = gbt_cm.loc[0.0, "1"]   # predicted 0, actual 1

Accuracy = (TP + TN) / (TP + FP + TN + FN)
Sensitivity = TP / (TP + FN)
Specificity = TN / (TN + FP)
Precision = TP / (TP + FP)

print("Accuracy = %0.2f" % Accuracy)
print("Sensitivity = %0.2f" % Sensitivity)
print("Specificity = %0.2f" % Specificity)
print("Precision = %0.2f" % Precision)

# Gini coefficient derived from the AUC: Gini = 2 * AUC - 1
AUC = gbt_AUC
Gini_gbt = (2 * AUC - 1)

print("AUC=%.2f" % AUC)
print("GINI ~=%.2f" % Gini_gbt)


### <font color='skyblue'> 8.5.2: Second Iteration</font> 

In [None]:
# 85/15 train/test split of the scaled features and label, with a fixed seed.
# This rebinds `train` and `test`; the later iteration cells that do not split
# again keep using this split.
train, test = (combined_df.select('Scaled_features','readmitted')).randomSplit([0.85, 0.15], seed = 12345)

In [None]:
# Gradient-boosted trees (10 boosting iterations): fit on `train`, score `test`.
# `train`/`test` are whatever the most recently executed split cell produced.
gbt = GBTClassifier(
    labelCol="readmitted",
    featuresCol="Scaled_features",
    maxIter=10,
)
gbt_model = gbt.fit(train)
gbt_result = gbt_model.transform(test)

# Preview the label next to the predicted class and class probabilities.
gbt_result.select('readmitted', 'prediction', 'probability').show(5)

# Two evaluators: AUC from the probability column, accuracy from the hard prediction.
gbt_eval = BinaryClassificationEvaluator(labelCol="readmitted", rawPredictionCol="probability")
gbt_eval2 = MulticlassClassificationEvaluator(labelCol="readmitted", predictionCol="prediction")
gbt_ACC = gbt_eval2.evaluate(gbt_result, {gbt_eval2.metricName: "accuracy"})
gbt_AUC = gbt_eval.evaluate(gbt_result)

print("Gradient Boosted Tree Performance Measure")
print("Accuracy = %0.2f" % gbt_ACC)
print("AUC = %.2f" % gbt_AUC)

# ROC curve for gradient boosting on the test split.
#
# Each row is scored with P(readmitted = 1) (element 1 of the probability
# vector) and keeps its label unchanged, so the curve describes detection of the
# positive class. The area under the curve is the same as when scoring class 0
# against the inverted label; only the curve's orientation differs.
PredAndLabels = gbt_result.select("probability", "readmitted")
PredAndLabels_collect = PredAndLabels.collect()
PredAndLabels_list = [(float(row[0][1]), float(row[1])) for row in PredAndLabels_collect]
# Use the session's own SparkContext instead of relying on a bare `sc` global.
PredAndLabels = spark.sparkContext.parallelize(PredAndLabels_list)

metrics = BinaryClassificationMetrics(PredAndLabels)

# Area under ROC
print("Gradient Boosting Area Under ROC")
print("Area under ROC = %.2f" % metrics.areaUnderROC)

# Visualization: recompute the curve points locally with scikit-learn.
y_score = [score for score, _ in PredAndLabels_list]
y_test = [label for _, label in PredAndLabels_list]

fpr, tpr, _ = roc_curve(y_test, y_score)
roc_auc = auc(fpr, tpr)  # sklearn.metrics.auc, imported at the top of the notebook

fig, ax = plt.subplots(figsize=(5, 4))
ax.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
ax.plot([0, 1], [0, 1], 'k--')  # chance diagonal
ax.set_xlim([0.0, 1.0])
ax.set_ylim([0.0, 1.05])
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('ROC Curve - Gradient Boosting')
ax.legend(loc="lower right")
plt.show()

# Confusion matrix: one row per predicted class, one column per actual label.
cm_gbt_result = gbt_result.crosstab("prediction", "readmitted").toPandas()
# A bare expression in the middle of a cell is not rendered, so display explicitly.
display(cm_gbt_result)

# Accuracy, sensitivity, specificity and precision.
# Cells are looked up by (predicted class, actual label) rather than by row
# position: crosstab does not document any row order, so positional indexing
# can silently swap TP/FP with FN/TN.
gbt_cm = cm_gbt_result.set_index(cm_gbt_result.columns[0])   # first column holds the predicted class
gbt_cm.index = gbt_cm.index.astype(float)                    # e.g. "1.0" -> 1.0
# Guarantee both classes exist on both axes (a class that was never predicted counts as 0).
gbt_cm = gbt_cm.reindex(index=[0.0, 1.0], columns=["0", "1"], fill_value=0)

TP = gbt_cm.loc[1.0, "1"]   # predicted 1, actual 1
FP = gbt_cm.loc[1.0, "0"]   # predicted 1, actual 0
TN = gbt_cm.loc[0.0, "0"]   # predicted 0, actual 0
FN = gbt_cm.loc[0.0, "1"]   # predicted 0, actual 1

Accuracy = (TP + TN) / (TP + FP + TN + FN)
Sensitivity = TP / (TP + FN)
Specificity = TN / (TN + FP)
Precision = TP / (TP + FP)

print("Accuracy = %0.2f" % Accuracy)
print("Sensitivity = %0.2f" % Sensitivity)
print("Specificity = %0.2f" % Specificity)
print("Precision = %0.2f" % Precision)

# Gini coefficient derived from the AUC: Gini = 2 * AUC - 1
AUC = gbt_AUC
Gini_gbt = (2 * AUC - 1)

print("AUC=%.2f" % AUC)
print("GINI ~=%.2f" % Gini_gbt)


### <font color='skyblue'> 8.5.3:Third Iteration</font> 

In [None]:
# Gradient boosting with a hand-picked tree depth (maxDepth=12, 10 iterations).
# A ParamGridBuilder / CrossValidator search (maxIter=[10], maxDepth=[6, 7, 10],
# 3 folds) was sketched for this step but is disabled; the depth is fixed directly.
# `train`/`test` are whatever the most recently executed split cell produced.
gbt_hyper = GBTClassifier(
    labelCol="readmitted",
    featuresCol="Scaled_features",
    maxDepth=12,
    maxIter=10,
)
gbt_model_hyper = gbt_hyper.fit(train)
gbt_result_hyper = gbt_model_hyper.transform(test)

# Preview the label next to the predicted class and class probabilities.
gbt_result_hyper.select('readmitted', 'prediction', 'probability').show(5)

# Two evaluators: AUC from the probability column, accuracy from the hard prediction.
gbt_eval_hyper = BinaryClassificationEvaluator(labelCol="readmitted", rawPredictionCol="probability")
gbt_eval_hyper2 = MulticlassClassificationEvaluator(labelCol="readmitted", predictionCol="prediction")
gbt_hyper_ACC = gbt_eval_hyper2.evaluate(gbt_result_hyper, {gbt_eval_hyper2.metricName: "accuracy"})
gbt_hyper_AUC = gbt_eval_hyper.evaluate(gbt_result_hyper)

print("Gradient Boosted Tree Performance Measure")
print("Accuracy = %0.2f" % gbt_hyper_ACC)
print("AUC = %.2f" % gbt_hyper_AUC)

# ROC curve for the depth-tuned gradient boosting model on the test split.
#
# Each row is scored with P(readmitted = 1) (element 1 of the probability
# vector) and keeps its label unchanged, so the curve describes detection of the
# positive class. The area under the curve is the same as when scoring class 0
# against the inverted label; only the curve's orientation differs.
PredAndLabels = gbt_result_hyper.select("probability", "readmitted")
PredAndLabels_collect = PredAndLabels.collect()
PredAndLabels_list = [(float(row[0][1]), float(row[1])) for row in PredAndLabels_collect]
# Use the session's own SparkContext instead of relying on a bare `sc` global.
PredAndLabels = spark.sparkContext.parallelize(PredAndLabels_list)

metrics = BinaryClassificationMetrics(PredAndLabels)

# Area under ROC
print("Gradient Boosting Area Under ROC")
print("Area under ROC = %.2f" % metrics.areaUnderROC)

# Visualization: recompute the curve points locally with scikit-learn.
y_score = [score for score, _ in PredAndLabels_list]
y_test = [label for _, label in PredAndLabels_list]

fpr, tpr, _ = roc_curve(y_test, y_score)
roc_auc = auc(fpr, tpr)  # sklearn.metrics.auc, imported at the top of the notebook

fig, ax = plt.subplots(figsize=(5, 4))
ax.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
ax.plot([0, 1], [0, 1], 'k--')  # chance diagonal
ax.set_xlim([0.0, 1.0])
ax.set_ylim([0.0, 1.05])
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('ROC Curve - Gradient Boosting')
ax.legend(loc="lower right")
plt.show()

# Confusion matrix: one row per predicted class, one column per actual label.
cm_gbt_result_hyper = gbt_result_hyper.crosstab("prediction", "readmitted").toPandas()
# A bare expression in the middle of a cell is not rendered, so display explicitly.
display(cm_gbt_result_hyper)

# Accuracy, sensitivity, specificity and precision.
# Cells are looked up by (predicted class, actual label) rather than by row
# position: crosstab does not document any row order, so positional indexing
# can silently swap TP/FP with FN/TN.
gbt_hyper_cm = cm_gbt_result_hyper.set_index(cm_gbt_result_hyper.columns[0])   # predicted class
gbt_hyper_cm.index = gbt_hyper_cm.index.astype(float)                          # e.g. "1.0" -> 1.0
# Guarantee both classes exist on both axes (a class that was never predicted counts as 0).
gbt_hyper_cm = gbt_hyper_cm.reindex(index=[0.0, 1.0], columns=["0", "1"], fill_value=0)

TP = gbt_hyper_cm.loc[1.0, "1"]   # predicted 1, actual 1
FP = gbt_hyper_cm.loc[1.0, "0"]   # predicted 1, actual 0
TN = gbt_hyper_cm.loc[0.0, "0"]   # predicted 0, actual 0
FN = gbt_hyper_cm.loc[0.0, "1"]   # predicted 0, actual 1

Accuracy = (TP + TN) / (TP + FP + TN + FN)
Sensitivity = TP / (TP + FN)
Specificity = TN / (TN + FP)
Precision = TP / (TP + FP)

print("Accuracy = %0.2f" % Accuracy)
print("Sensitivity = %0.2f" % Sensitivity)
print("Specificity = %0.2f" % Specificity)
print("Precision = %0.2f" % Precision)

# Gini coefficient derived from the AUC: Gini = 2 * AUC - 1
AUC = gbt_hyper_AUC
Gini_gbt_hyper = (2 * AUC - 1)

print("AUC=%.2f" % AUC)
print("GINI ~=%.2f" % Gini_gbt_hyper)


### <font color='skyblue'> 8.5.4: Fourth Iteration</font> 

In [None]:
# Gradient boosting with a hand-picked tree depth (maxDepth=10, 10 iterations).
# A ParamGridBuilder / CrossValidator search (maxIter=[10], maxDepth=[6, 7, 10],
# 3 folds) was sketched for this step but is disabled; the depth is fixed directly.
# `train`/`test` are whatever the most recently executed split cell produced.
gbt_hyper = GBTClassifier(
    labelCol="readmitted",
    featuresCol="Scaled_features",
    maxDepth=10,
    maxIter=10,
)
gbt_model_hyper = gbt_hyper.fit(train)
gbt_result_hyper = gbt_model_hyper.transform(test)

# Preview the label next to the predicted class and class probabilities.
gbt_result_hyper.select('readmitted', 'prediction', 'probability').show(5)

# Two evaluators: AUC from the probability column, accuracy from the hard prediction.
gbt_eval_hyper = BinaryClassificationEvaluator(labelCol="readmitted", rawPredictionCol="probability")
gbt_eval_hyper2 = MulticlassClassificationEvaluator(labelCol="readmitted", predictionCol="prediction")
gbt_hyper_ACC = gbt_eval_hyper2.evaluate(gbt_result_hyper, {gbt_eval_hyper2.metricName: "accuracy"})
gbt_hyper_AUC = gbt_eval_hyper.evaluate(gbt_result_hyper)

print("Gradient Boosted Tree Performance Measure")
print("Accuracy = %0.2f" % gbt_hyper_ACC)
print("AUC = %.2f" % gbt_hyper_AUC)

# ROC curve for the depth-tuned gradient boosting model on the test split.
#
# Each row is scored with P(readmitted = 1) (element 1 of the probability
# vector) and keeps its label unchanged, so the curve describes detection of the
# positive class. The area under the curve is the same as when scoring class 0
# against the inverted label; only the curve's orientation differs.
PredAndLabels = gbt_result_hyper.select("probability", "readmitted")
PredAndLabels_collect = PredAndLabels.collect()
PredAndLabels_list = [(float(row[0][1]), float(row[1])) for row in PredAndLabels_collect]
# Use the session's own SparkContext instead of relying on a bare `sc` global.
PredAndLabels = spark.sparkContext.parallelize(PredAndLabels_list)

metrics = BinaryClassificationMetrics(PredAndLabels)

# Area under ROC
print("Gradient Boosting Area Under ROC")
print("Area under ROC = %.2f" % metrics.areaUnderROC)

# Visualization: recompute the curve points locally with scikit-learn.
y_score = [score for score, _ in PredAndLabels_list]
y_test = [label for _, label in PredAndLabels_list]

fpr, tpr, _ = roc_curve(y_test, y_score)
roc_auc = auc(fpr, tpr)  # sklearn.metrics.auc, imported at the top of the notebook

fig, ax = plt.subplots(figsize=(5, 4))
ax.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
ax.plot([0, 1], [0, 1], 'k--')  # chance diagonal
ax.set_xlim([0.0, 1.0])
ax.set_ylim([0.0, 1.05])
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('ROC Curve - Gradient Boosting')
ax.legend(loc="lower right")
plt.show()

# Confusion matrix: one row per predicted class, one column per actual label.
cm_gbt_result_hyper = gbt_result_hyper.crosstab("prediction", "readmitted").toPandas()
# A bare expression in the middle of a cell is not rendered, so display explicitly.
display(cm_gbt_result_hyper)

# Accuracy, sensitivity, specificity and precision.
# Cells are looked up by (predicted class, actual label) rather than by row
# position: crosstab does not document any row order, so positional indexing
# can silently swap TP/FP with FN/TN.
gbt_hyper_cm = cm_gbt_result_hyper.set_index(cm_gbt_result_hyper.columns[0])   # predicted class
gbt_hyper_cm.index = gbt_hyper_cm.index.astype(float)                          # e.g. "1.0" -> 1.0
# Guarantee both classes exist on both axes (a class that was never predicted counts as 0).
gbt_hyper_cm = gbt_hyper_cm.reindex(index=[0.0, 1.0], columns=["0", "1"], fill_value=0)

TP = gbt_hyper_cm.loc[1.0, "1"]   # predicted 1, actual 1
FP = gbt_hyper_cm.loc[1.0, "0"]   # predicted 1, actual 0
TN = gbt_hyper_cm.loc[0.0, "0"]   # predicted 0, actual 0
FN = gbt_hyper_cm.loc[0.0, "1"]   # predicted 0, actual 1

Accuracy = (TP + TN) / (TP + FP + TN + FN)
Sensitivity = TP / (TP + FN)
Specificity = TN / (TN + FP)
Precision = TP / (TP + FP)

print("Accuracy = %0.2f" % Accuracy)
print("Sensitivity = %0.2f" % Sensitivity)
print("Specificity = %0.2f" % Specificity)
print("Precision = %0.2f" % Precision)

# Gini coefficient derived from the AUC: Gini = 2 * AUC - 1
AUC = gbt_hyper_AUC
Gini_gbt_hyper = (2 * AUC - 1)

print("AUC=%.2f" % AUC)
print("GINI ~=%.2f" % Gini_gbt_hyper)


### <font color='skyblue'> 8.5.5 : Fifth Iteration</font> 

In [None]:
# Gradient boosting with a hand-picked tree depth (maxDepth=6, 10 iterations).
# A ParamGridBuilder / CrossValidator search (maxIter=[10], maxDepth=[6, 7, 10],
# 3 folds) was sketched for this step but is disabled; the depth is fixed directly.
# `train`/`test` are whatever the most recently executed split cell produced.
gbt_hyper = GBTClassifier(
    labelCol="readmitted",
    featuresCol="Scaled_features",
    maxDepth=6,
    maxIter=10,
)
gbt_model_hyper = gbt_hyper.fit(train)
gbt_result_hyper = gbt_model_hyper.transform(test)

# Preview the label next to the predicted class and class probabilities.
gbt_result_hyper.select('readmitted', 'prediction', 'probability').show(5)

# Two evaluators: AUC from the probability column, accuracy from the hard prediction.
gbt_eval_hyper = BinaryClassificationEvaluator(labelCol="readmitted", rawPredictionCol="probability")
gbt_eval_hyper2 = MulticlassClassificationEvaluator(labelCol="readmitted", predictionCol="prediction")
gbt_hyper_ACC = gbt_eval_hyper2.evaluate(gbt_result_hyper, {gbt_eval_hyper2.metricName: "accuracy"})
gbt_hyper_AUC = gbt_eval_hyper.evaluate(gbt_result_hyper)

print("Gradient Boosted Tree Performance Measure")
print("Accuracy = %0.2f" % gbt_hyper_ACC)
print("AUC = %.2f" % gbt_hyper_AUC)

# ROC curve for the depth-tuned gradient boosting model on the test split.
#
# Each row is scored with P(readmitted = 1) (element 1 of the probability
# vector) and keeps its label unchanged, so the curve describes detection of the
# positive class. The area under the curve is the same as when scoring class 0
# against the inverted label; only the curve's orientation differs.
PredAndLabels = gbt_result_hyper.select("probability", "readmitted")
PredAndLabels_collect = PredAndLabels.collect()
PredAndLabels_list = [(float(row[0][1]), float(row[1])) for row in PredAndLabels_collect]
# Use the session's own SparkContext instead of relying on a bare `sc` global.
PredAndLabels = spark.sparkContext.parallelize(PredAndLabels_list)

metrics = BinaryClassificationMetrics(PredAndLabels)

# Area under ROC
print("Gradient Boosting Area Under ROC")
print("Area under ROC = %.2f" % metrics.areaUnderROC)

# Visualization: recompute the curve points locally with scikit-learn.
y_score = [score for score, _ in PredAndLabels_list]
y_test = [label for _, label in PredAndLabels_list]

fpr, tpr, _ = roc_curve(y_test, y_score)
roc_auc = auc(fpr, tpr)  # sklearn.metrics.auc, imported at the top of the notebook

fig, ax = plt.subplots(figsize=(5, 4))
ax.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
ax.plot([0, 1], [0, 1], 'k--')  # chance diagonal
ax.set_xlim([0.0, 1.0])
ax.set_ylim([0.0, 1.05])
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('ROC Curve - Gradient Boosting')
ax.legend(loc="lower right")
plt.show()

# Confusion matrix: one row per predicted class, one column per actual label.
cm_gbt_result_hyper = gbt_result_hyper.crosstab("prediction", "readmitted").toPandas()
# A bare expression in the middle of a cell is not rendered, so display explicitly.
display(cm_gbt_result_hyper)

# Accuracy, sensitivity, specificity and precision.
# Cells are looked up by (predicted class, actual label) rather than by row
# position: crosstab does not document any row order, so positional indexing
# can silently swap TP/FP with FN/TN.
gbt_hyper_cm = cm_gbt_result_hyper.set_index(cm_gbt_result_hyper.columns[0])   # predicted class
gbt_hyper_cm.index = gbt_hyper_cm.index.astype(float)                          # e.g. "1.0" -> 1.0
# Guarantee both classes exist on both axes (a class that was never predicted counts as 0).
gbt_hyper_cm = gbt_hyper_cm.reindex(index=[0.0, 1.0], columns=["0", "1"], fill_value=0)

TP = gbt_hyper_cm.loc[1.0, "1"]   # predicted 1, actual 1
FP = gbt_hyper_cm.loc[1.0, "0"]   # predicted 1, actual 0
TN = gbt_hyper_cm.loc[0.0, "0"]   # predicted 0, actual 0
FN = gbt_hyper_cm.loc[0.0, "1"]   # predicted 0, actual 1

Accuracy = (TP + TN) / (TP + FP + TN + FN)
Sensitivity = TP / (TP + FN)
Specificity = TN / (TN + FP)
Precision = TP / (TP + FP)

print("Accuracy = %0.2f" % Accuracy)
print("Sensitivity = %0.2f" % Sensitivity)
print("Specificity = %0.2f" % Specificity)
print("Precision = %0.2f" % Precision)

# Gini coefficient derived from the AUC: Gini = 2 * AUC - 1
AUC = gbt_hyper_AUC
Gini_gbt_hyper = (2 * AUC - 1)

print("AUC=%.2f" % AUC)
print("GINI ~=%.2f" % Gini_gbt_hyper)
