In [2]:
!pip install pyspark
!pip install findspark

Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable


In [3]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from imblearn.over_sampling import RandomOverSampler
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import TrainValidationSplit
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkContext
spark = SparkSession.builder.appName('BigDataProject').getOrCreate()
sc = spark.sparkContext

In [5]:
# read data
df = pd.read_csv('../Data/data_cleaned.csv')
df.head()

############# checking the accuracy of the model before oversampling ###########
X_train, X_test, y_train, y_test = train_test_split(df.drop('HeartDisease', axis=1), df['HeartDisease'], test_size=0.2, random_state=42)
# create a classifier
clf = RandomForestClassifier(n_estimators=100, max_depth=2, random_state=0)
# fit the classifier to the training data
clf.fit(X_train, y_train)
# predict the test data
y_pred = clf.predict(X_test)
# show the accuracy score
print('Accuracy score before oversampling: ', accuracy_score(y_test, y_pred))
# show the confusion matrix
print('Confusion matrix before oversampling: \n', confusion_matrix(y_test, y_pred))
# show the classification report
print('Classification report before oversampling: \n', classification_report(y_test, y_pred))


############# checking the accuracy of the model after oversampling ############
ros = RandomOverSampler(random_state=42)
X_resampled, y_resampled = ros.fit_resample(X_train, y_train)
# show the shape of the resampled data
print(X_resampled.shape)
print(y_resampled.shape)
# use the resampled data to train the classifier
clf.fit(X_resampled, y_resampled)
# predict the test data
y_pred = clf.predict(X_test)
# show the accuracy score
print('Accuracy score after oversampling: ', accuracy_score(y_test, y_pred))
# show the confusion matrix
print('Confusion matrix after oversampling: \n', confusion_matrix(y_test, y_pred))
# show the classification report
print('Classification report after oversampling: \n', classification_report(y_test, y_pred))


############# combining the features in one column ############
train_df = pd.concat([X_resampled, y_resampled], axis=1)
test_df = pd.concat([X_test, y_test], axis=1)
df_resampled = pd.concat([train_df, test_df], ignore_index=True)
df_sk = spark.createDataFrame(df_resampled)
df_rdd = df_sk.rdd

train_data, test_data = df_rdd.randomSplit([0.8, 0.2], seed=42)


Accuracy score before oversampling:  0.9125689895089042
Confusion matrix before oversampling: 
 [[58367     0]
 [ 5592     0]]
Classification report before oversampling: 
               precision    recall  f1-score   support

           0       0.91      1.00      0.95     58367
           1       0.00      0.00      0.00      5592

    accuracy                           0.91     63959
   macro avg       0.46      0.50      0.48     63959
weighted avg       0.83      0.91      0.87     63959



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


(468110, 17)
(468110,)
Accuracy score after oversampling:  0.7250113353867321
Confusion matrix after oversampling: 
 [[42175 16192]
 [ 1396  4196]]
Classification report after oversampling: 
               precision    recall  f1-score   support

           0       0.97      0.72      0.83     58367
           1       0.21      0.75      0.32      5592

    accuracy                           0.73     63959
   macro avg       0.59      0.74      0.58     63959
weighted avg       0.90      0.73      0.78     63959



In [6]:
print(train_data.count())
print(train_data.take(20))

425793
[Row(BMI=23.33, Smoking=1, AlcoholDrinking=0, Stroke=0, PhysicalHealth=0.0, MentalHealth=0.0, DiffWalking=0, Sex=1, AgeCategory=11, Race=0, Diabetic=0, PhysicalActivity=1, GenHealth=3, SleepTime=7.0, Asthma=0, KidneyDisease=0, SkinCancer=0, HeartDisease=0), Row(BMI=27.46, Smoking=1, AlcoholDrinking=0, Stroke=1, PhysicalHealth=30.0, MentalHealth=0.0, DiffWalking=0, Sex=1, AgeCategory=7, Race=0, Diabetic=0, PhysicalActivity=1, GenHealth=2, SleepTime=6.0, Asthma=0, KidneyDisease=0, SkinCancer=0, HeartDisease=1), Row(BMI=32.69, Smoking=0, AlcoholDrinking=0, Stroke=0, PhysicalHealth=2.0, MentalHealth=2.0, DiffWalking=0, Sex=1, AgeCategory=6, Race=5, Diabetic=0, PhysicalActivity=0, GenHealth=3, SleepTime=8.0, Asthma=0, KidneyDisease=0, SkinCancer=0, HeartDisease=0), Row(BMI=31.32, Smoking=0, AlcoholDrinking=0, Stroke=0, PhysicalHealth=0.0, MentalHealth=0.0, DiffWalking=0, Sex=0, AgeCategory=1, Race=0, Diabetic=0, PhysicalActivity=1, GenHealth=4, SleepTime=8.0, Asthma=0, KidneyDisease=

# Models

In [7]:
from pyspark.ml.classification import NaiveBayes
from pyspark.sql.functions import col
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

## -----------------Naive Bayes-----------------


In [8]:
# visualization function for the purpose of analysis
def count_values(column_index, specified_value):    
    # Filter the RDD based on the specified value and calculate the count
    count = train_data.filter(lambda row: row[column_index] == specified_value).count()
    # Print the count
    print("Count of value '{}' : {}".format(specified_value, count))
  
count_values(0, 32.69)

Count of value '32.69' : 492


In [9]:
# Count the total number of records
total_count = train_data.count()
# Get the header names
headers = df_sk.columns

# MapReduce Phase 1: Calculate the prior probabilities
prior_probs = {}
for i,header in enumerate(headers):
    header_counts = train_data.map(lambda x: ((header, x[i]), 1))
    header_total_counts = header_counts.reduceByKey(lambda x, y: x + y)  # ((header, attribute), total count)
    prior_probs[header] = header_total_counts.collectAsMap()
print(f'Prior Probabilites: {prior_probs}')

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 16.0 failed 1 times, most recent failure: Lost task 1.0 in stage 16.0 (TID 182) (DESKTOP-GGTUHE3 executor driver): java.net.SocketException: Connection reset
	at java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:323)
	at java.base/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:350)
	at java.base/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:803)
	at java.base/java.net.Socket$SocketInputStream.read(Socket.java:966)
	at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:244)
	at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:343)
	at java.base/java.io.DataInputStream.readFully(DataInputStream.java:201)
	at java.base/java.io.DataInputStream.readFully(DataInputStream.java:172)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:760)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:749)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:514)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:89)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:80)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:80)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:320)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:734)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:440)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2088)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:274)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2328)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1019)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1018)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.net.SocketException: Connection reset
	at java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:323)
	at java.base/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:350)
	at java.base/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:803)
	at java.base/java.net.Socket$SocketInputStream.read(Socket.java:966)
	at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:244)
	at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:343)
	at java.base/java.io.DataInputStream.readFully(DataInputStream.java:201)
	at java.base/java.io.DataInputStream.readFully(DataInputStream.java:172)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:760)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:749)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:514)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:89)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:80)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:80)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:320)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:734)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:440)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2088)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:274)


In [None]:
# MapReduce Phase 2: Calculate the conditional probabilities
cond_probs = {}
class_header = "HeartDisease"
for i,header in enumerate(headers):
    if header != "HeartDisease":
        header_counts = train_data.map(lambda x: ((x[i], x[17]), 1))  # ((header, attribute), count)
        header_total_counts = header_counts.reduceByKey(lambda x, y: x + y)  # ((header, attribute), total count)
        cond_probs[header] = header_total_counts.map(lambda x: ((x[0][0], x[0][1]), x[1] / prior_probs[class_header][(class_header,x[0][1])]))

In [None]:
# for header, probabilities in cond_probs.items():
#     print(f'Conditional Probabilities for : {header}')
#     print(probabilities.collect())
#     data_as_list = probabilities.collect()
#     class_value = 1
#     feature_value = 225
#     target_pair = (feature_value, class_value)
#     target_value = [value for pair, value in data_as_list if pair == target_pair]
#     print(target_value[0] if len(target_value) else "0")  # Access the first element (assuming there's only one matching element)
#     print()


In [None]:
# Precompute and collect the conditional probabilities as a dictionary
collected_cond_probs = {header: dict(cond_probs[header].collect()) for header in headers if header != "HeartDisease"}
rec = test_data.take(1)
print(rec[0])
test_examples_count = int(test_data.count())
records = test_data.take(test_examples_count)

Row(BMI=32.73, Smoking=1, AlcoholDrinking=0, Stroke=0, PhysicalHealth=0.0, MentalHealth=5.0, DiffWalking=0, Sex=1, AgeCategory=2, Race=0, Diabetic=0, PhysicalActivity=0, GenHealth=2, SleepTime=8.0, Asthma=0, KidneyDisease=0, SkinCancer=0, HeartDisease=0)


In [None]:
def calculate_posterior(attributes, class_value):
    posterior_probs = {header: collected_cond_probs[header].get((attributes[i], class_value), 0) for i, header in enumerate(headers) if header != "HeartDisease"}
    return posterior_probs

print(f'Class 0 :\n{calculate_posterior(rec[0],0)}')

Class 0 :
{'BMI': 8.555271330430245e-05, 'Smoking': 0.3960748415135986, 'AlcoholDrinking': 0.9295430629582417, 'Stroke': 0.9737738157365661, 'PhysicalHealth': 0.7258634407590236, 'MentalHealth': 0.04505633646171088, 'DiffWalking': 0.8825746233541797, 'Sex': 0.46419191184648423, 'AgeCategory': 0.06343305927947505, 'Race': 0.762338840076313, 'Diabetic': 0.8619307536338515, 'PhysicalActivity': 0.2118413510484485, 'GenHealth': 0.2856177333664137, 'SleepTime': 0.30511947436412945, 'Asthma': 0.8700283179481038, 'KidneyDisease': 0.9715622780976498, 'SkinCancer': 0.9153883665420449}


In [None]:
# Function to classify a record
def classify(record):
    actual_class = record[17]
    class0_probs = calculate_posterior(record, 0)
    class1_probs = calculate_posterior(record, 1)
    max_prob = 0
    predicted_class = None
    for heart_disease, prior_prob in prior_probs["HeartDisease"].items():
        prob = prior_prob / total_count
        posterior_probs = class0_probs if heart_disease[1] == 0 else class1_probs
        for posterior_prob in posterior_probs.values():
            prob *= posterior_prob
        if prob > max_prob:
            max_prob = prob
            predicted_class = heart_disease
    final_class = predicted_class[1] if predicted_class else 0
    return final_class, actual_class

predictions = []
actual = []
for i in range(test_examples_count):
    predictions.append(classify(records[i])[0])
    actual.append(classify(records[i])[1])

# Print the predictions
print(predictions)
print(actual)


[0, 0, 0, 0, 1, 0, 0, 1, 0, 1, 0, 0, 0, 0, 1, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 1, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 1, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 0, 1, 0, 1, 1, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 1, 1, 0, 1, 1, 0, 0, 0, 0, 1, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 

In [None]:
print(f'Classification report :\n{classification_report(actual, predictions)}')

Classification report :
              precision    recall  f1-score   support

           0       0.76      0.80      0.78     58648
           1       0.74      0.69      0.71     47928

    accuracy                           0.75    106576
   macro avg       0.75      0.75      0.75    106576
weighted avg       0.75      0.75      0.75    106576

