===========================================


Gebil Jibul


Description: This notebook first trains a logistic-regression classifier with PySpark on a sample LIBSVM data set, and then uses Keras and TensorFlow to build a deep learning model that predicts the probability of heart disease from several features of patient data.

=========================================== 

# Classification in PySpark and Keras

In [1]:
from pyspark.sql import SparkSession

# Obtain the notebook's Spark session: getOrCreate() is called on a builder
# configured with the application name "DSC 10".
spark = (
    SparkSession.builder
    .appName("DSC 10")
    .getOrCreate()
)

# Relative path of the LIBSVM-format sample data loaded by the next cell.
sample_libsvm_data_path = 'data/sample_libsvm_data.txt'

In [2]:
from pyspark.ml.classification import LogisticRegression

# Read the sample data set through Spark's 'libsvm' data source.
training = spark.read.format('libsvm').load(sample_libsvm_data_path)

# --- Logistic regression without an explicit `family` argument -------------
lr = LogisticRegression(
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)
lrModel = lr.fit(training)

# Report the fitted coefficients and intercept.
lr_coefficients = str(lrModel.coefficients)
print('Coefficients: ' + lr_coefficients)
lr_intercept = str(lrModel.intercept)
print('Intercept: ' + lr_intercept)

# --- Same hyper-parameters, but with the multinomial family ----------------
# (the multinomial family can also be used for binary classification)
mlr = LogisticRegression(
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
    family='multinomial',
)
mlrModel = mlr.fit(training)

# Report the coefficient matrix and intercept vector of the multinomial model.
mlr_coefficients = str(mlrModel.coefficientMatrix)
print('Multinomial coefficients: ' + mlr_coefficients)
mlr_intercepts = str(mlrModel.interceptVector)
print('Multinomial intercepts: ' + mlr_intercepts)

Coefficients: (692,[272,300,323,350,351,378,379,405,406,407,428,433,434,435,455,456,461,462,483,484,489,490,496,511,512,517,539,540,568],[-7.520689871384157e-05,-8.11577314684704e-05,3.814692771846389e-05,0.0003776490540424341,0.0003405148366194407,0.0005514455157343111,0.00040853861160969167,0.00041974673327494573,0.0008119171358670032,0.0005027708372668752,-2.392926040660149e-05,0.0005745048020902299,0.000903754642680371,7.818229700243959e-05,-2.17875519529124e-05,-3.402165821789581e-05,0.0004966517360637634,0.0008190557828370371,-8.017982139522661e-05,-2.743169403783574e-05,0.00048108322262389896,0.00048408017626778744,-8.926472920010679e-06,-0.0003414881233042728,-8.950592574121448e-05,0.0004864546911689218,-8.478698005186158e-05,-0.0004234783215831764,-7.296535777631296e-05])
Intercept: -0.5991460286401442
Multinomial coefficients: 2 X 692 CSRMatrix
(0,272) 0.0001
(0,300) 0.0001
(0,350) -0.0002
(0,351) -0.0001
(0,378) -0.0003
(0,379) -0.0002
(0,405) -0.0002
(0,406) -0.0004
(0,407)

In [4]:
# Pull the training summary off the logistic-regression model that was
# fitted in the earlier cell.
trainingSummary = lrModel.summary

# Print the objective value recorded for each iteration.
objectiveHistory = trainingSummary.objectiveHistory
print('objectiveHistory:')
for iteration_objective in objectiveHistory:
    print(iteration_objective)

# Show the receiver-operating characteristic DataFrame and the area under it.
roc_df = trainingSummary.roc
roc_df.show()
print('areaUnderROC: ' + str(trainingSummary.areaUnderROC))

# Look up the threshold whose F-Measure is the largest one reported.
fMeasure = trainingSummary.fMeasureByThreshold
max_column = 'max(F-Measure)'
maxFMeasure = fMeasure.groupBy().max('F-Measure').select(max_column).head()
best_rows = fMeasure.where(fMeasure['F-Measure'] == maxFMeasure[max_column])
bestThreshold = best_rows.select('threshold').head()['threshold']

# Store that threshold on the `lr` estimator. Note that this call targets the
# estimator, not the already-fitted `lrModel`. Because it is the last
# expression of the cell, its return value is what the notebook displays.
lr.setThreshold(bestThreshold)

objectiveHistory:
0.6833149135741672
0.6661906127558116
0.6207433672479604
0.613154125312387
0.6059149689952394
0.5923656241678249
0.5898233082838019
0.5868012627420284
0.5844432058719141
0.5830790068041745
0.5807015754032354
objectiveHistory:
0.6833149135741672
0.6661906127558116
0.6207433672479604
0.613154125312387
0.6059149689952394
0.5923656241678249
0.5898233082838019
0.5868012627420284
0.5844432058719141
0.5830790068041745
0.5807015754032354




+---+--------------------+
|FPR|                 TPR|
+---+--------------------+
|0.0|                 0.0|
|0.0|0.017543859649122806|
|0.0| 0.03508771929824561|
|0.0| 0.05263157894736842|
|0.0| 0.07017543859649122|
|0.0| 0.08771929824561403|
|0.0| 0.10526315789473684|
|0.0| 0.12280701754385964|
|0.0| 0.14035087719298245|
|0.0| 0.15789473684210525|
|0.0| 0.17543859649122806|
|0.0| 0.19298245614035087|
|0.0| 0.21052631578947367|
|0.0| 0.22807017543859648|
|0.0| 0.24561403508771928|
|0.0|  0.2631578947368421|
|0.0|  0.2807017543859649|
|0.0|  0.2982456140350877|
|0.0|  0.3157894736842105|
|0.0|  0.3333333333333333|
+---+--------------------+
only showing top 20 rows

areaUnderROC: 1.0
+---+--------------------+
|FPR|                 TPR|
+---+--------------------+
|0.0|                 0.0|
|0.0|0.017543859649122806|
|0.0| 0.03508771929824561|
|0.0| 0.05263157894736842|
|0.0| 0.07017543859649122|
|0.0| 0.08771929824561403|
|0.0| 0.10526315789473684|
|0.0| 0.12280701754385964|
|0.0| 0.140

LogisticRegression_f8eaac902b63

LogisticRegression_f8eaac902b63

In [14]:
import tensorflow as tf
import numpy as np
import pandas as pd
from tensorflow import keras
from tensorflow.keras import layers

# Heart-disease CSV used for the Keras model. It is fetched over HTTPS
# (instead of plain HTTP) so the training data cannot be altered in transit.
file_url = 'https://storage.googleapis.com/download.tensorflow.org/data/heart.csv'
df = pd.read_csv(file_url)



In [15]:
# Display the dimensions of the loaded table as (rows, columns).
df.shape

(303, 14)

In [16]:
# Hold out a random 20% of the rows (fixed seed 1337) for validation; every
# row that was not drawn for validation stays in the training frame.
val_df = df.sample(random_state=1337, frac=0.2)
train_df = df.drop(index=val_df.index)

print(f'Train, Val sizes: {len(train_df), len(val_df)}')

Train, Val sizes: (242, 61)


In [20]:
def df_to_ds(df):
    """Build a shuffled ``tf.data.Dataset`` of (features, label) pairs.

    The input frame is copied first, so the caller's DataFrame keeps its
    'target' column.

    Args:
        df: DataFrame containing a 'target' column, which is used as the
            label; all remaining columns form the feature dictionary.

    Returns:
        The dataset created by ``from_tensor_slices`` and then passed through
        ``shuffle`` with a buffer as large as the number of rows.
    """
    features = df.copy()
    target = features.pop('target')

    # dict(features) maps every remaining column name to its column of values
    feature_columns = dict(features)
    dataset = tf.data.Dataset.from_tensor_slices((feature_columns, target))
    return dataset.shuffle(buffer_size=len(features))

# Convert the training and validation frames into shuffled TF datasets
train_ds, val_ds = df_to_ds(train_df), df_to_ds(val_df)

In [21]:
# Batch the datasets (batching adds a leading batch dimension to each element).
# The batched datasets are rebuilt from the DataFrames instead of from
# `train_ds` / `val_ds` themselves, so re-running this cell can never batch an
# already-batched dataset and add a second, unwanted dimension.
BATCH_SIZE = 32
train_ds = df_to_ds(train_df).batch(BATCH_SIZE)
val_ds = df_to_ds(val_df).batch(BATCH_SIZE)

In [22]:
from tensorflow.keras.layers import IntegerLookup
from tensorflow.keras.layers import Normalization
from tensorflow.keras.layers import StringLookup


def encode_numerical_feature(feature, name, ds):
    """Normalize one quantitative feature with statistics learned from `ds`.

    Args:
        feature: Model input (e.g. a ``keras.Input``) to be normalized.
        name: Key of the feature inside each element's feature dictionary.
        ds: Dataset whose elements are ``(features, label)`` pairs; only
            ``features[name]`` is read.

    Returns:
        The result of calling the adapted ``Normalization`` layer on `feature`.
    """
    def _select_column(features, _label):
        # Keep only the requested feature; the label is ignored
        return features[name]

    def _add_trailing_axis(values):
        return tf.expand_dims(values, -1)

    # Dataset that yields nothing but this feature, with an extra last axis
    column_ds = ds.map(_select_column).map(_add_trailing_axis)

    # Learn the statistics of the data, then normalize the input feature
    scaler = Normalization()
    scaler.adapt(column_ds)
    return scaler(feature)


def encode_categorical_feature(feature, name, ds, is_string):
    """Encode one qualitative feature with a lookup layer adapted on `ds`.

    Args:
        feature: Model input (e.g. a ``keras.Input``) to be encoded.
        name: Key of the feature inside each element's feature dictionary.
        ds: Dataset whose elements are ``(features, label)`` pairs; only
            ``features[name]`` is read.
        is_string: Truthy to use ``StringLookup``, falsy to use
            ``IntegerLookup``.

    Returns:
        The result of calling the adapted lookup layer on `feature`.
    """
    # The lookup layer is created with output_mode='binary'
    # NOTE(review): presumably this yields a multi-hot style encoding rather
    # than raw integer indices - confirm against the installed Keras version.
    if is_string:
        lookup_layer = StringLookup(output_mode='binary')
    else:
        lookup_layer = IntegerLookup(output_mode='binary')

    def _select_column(features, _label):
        # Keep only the requested feature; the label is ignored
        return features[name]

    def _add_trailing_axis(values):
        return tf.expand_dims(values, -1)

    # Dataset that yields nothing but this feature, with an extra last axis
    column_ds = ds.map(_select_column).map(_add_trailing_axis)

    # Learn the set of possible values, then encode the input feature
    lookup_layer.adapt(column_ds)
    return lookup_layer(feature)

In [45]:
# Groups the feature column names by the kind of encoding they receive.
# The key spelling 'continious' is intentional here: the model-building cell
# looks the groups up with exactly these keys.
data_cats = dict(
    continious=['age', 'trestbps', 'chol', 'thalach', 'oldpeak', 'slope'],
    discrete=['sex', 'cp', 'fbs', 'restecg', 'exang', 'ca'],
    string=['thal'],
)

In [68]:

# One row per feature group, processed in this order:
#   (key in data_cats, extra keras.Input options, encoder for that group)
# Continuous features are normalized; discrete and string features go through
# the categorical encoder (is_string=False / True respectively).
feature_groups = (
    ('continious', {},
     lambda inp, col: encode_numerical_feature(inp, col, train_ds)),
    ('discrete', {'dtype': 'int64'},
     lambda inp, col: encode_categorical_feature(inp, col, train_ds, False)),
    ('string', {'dtype': 'string'},
     lambda inp, col: encode_categorical_feature(inp, col, train_ds, True)),
)

all_inputs = []
all_features = []

# Create one named Input per column and encode it with the group's encoder
for group_key, input_options, encode in feature_groups:
    for label in data_cats.get(group_key):
        feat_input = keras.Input(shape=(1,), name=label, **input_options)
        feature = encode(feat_input, label)

        all_inputs.append(feat_input)
        all_features.append(feature)

# Join every encoded feature into a single tensor
all_features = layers.concatenate(all_features)

# Classifier head: Dense(32, relu) -> Dropout(0.5) -> Dense(1, sigmoid)
x = layers.Dense(32, activation='relu')(all_features)
x = layers.Dropout(0.5)(x)
output = layers.Dense(1, activation='sigmoid')(x)

model = keras.Model(all_inputs, output)
model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy'],
)

In [69]:
# Train for 50 epochs on the training dataset, passing the validation dataset
# as `validation_data`. The returned History object is the cell's output.
model.fit(x=train_ds, validation_data=val_ds, epochs=50)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


<keras.callbacks.History at 0x22449473430>

In [71]:
# Feature values for a single patient, keyed by the model's input names
sample = dict(
    age=60,
    sex=1,
    cp=1,
    trestbps=145,
    chol=233,
    fbs=1,
    restecg=2,
    thalach=150,
    exang=0,
    oldpeak=2.3,
    slope=3,
    ca=0,
    thal='fixed',
)

# Wrap every value in a one-element list and convert it to a tensor, so each
# model input receives a batch containing exactly this one patient
input_dict = dict(
    (feature_name, tf.convert_to_tensor([feature_value]))
    for feature_name, feature_value in sample.items()
)
preds = model.predict(input_dict)

# preds[0][0] is the model's sigmoid output for the single sample
print(
    f'This particular patient had a {100 * preds[0][0]} percent probability '
    'of having a heart disease, as evaluated by the deep learning model.'
)

This particular patient had a 23.665252327919006 percent probability of having a heart disease, as evaluated by the deep learning model.
