In [204]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer, Imputer, MinMaxScaler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml import Pipeline

In [205]:
# Build (or reuse) the Spark session that every later cell reads from.
# The __main__ guard is retained from the original cell.
if __name__ == "__main__":
    spark = (
        SparkSession.builder
        .appName("DiseaseTypewithSpark")
        .getOrCreate()
    )

In [206]:
# Load the raw CSV; the first row supplies the column names. No schema is
# inferred, so every column arrives as a string.
data = spark.read.option("header", True).csv("Disease_Prediction.csv")

In [207]:
# Show the DataFrame's schema summary (column names and types).
display(data)

DataFrame[feature_1: string, feature_2: string, feature_3: string, feature_4: string, Disease_Type: string]

In [208]:
# Preview the first rows of the raw data.
data.show()

+---------+---------+---------+---------+------------+
|feature_1|feature_2|feature_3|feature_4|Disease_Type|
+---------+---------+---------+---------+------------+
|   3.6216|   8.6661|  -2.8073| -0.44699|           0|
|   4.5459|   8.1674|  -2.4586|  -1.4621|           0|
|    3.866|  -2.6383|   1.9242|  0.10645|           0|
|   3.4566|   9.5228|  -4.0112|  -3.5944|           0|
|  0.32924|  -4.4552|   4.5718|  -0.9888|           0|
|       na|   9.6718|  -3.9606|  -3.1625|           0|
|   3.5912|   3.0129|  0.72888|  0.56421|           0|
|   2.0922|    -6.81|   8.4636| -0.60216|           0|
|   3.2032|   5.7588| -0.75345| -0.61251|           0|
|   1.5356|   9.1772|  -2.2718| -0.73535|           0|
|   1.2247|   8.7779|  -2.2135| -0.80647|           0|
|   3.9899|  -2.7066|   2.3946|  0.86291|           0|
|   1.8993|   7.6625|  0.15394|  -3.1108|           0|
|  -1.5768|   10.843|   2.5462|  -2.9362|           0|
|    3.404|   8.7261|  -2.9915| -0.57242|           0|
|   4.6765

In [209]:
# Cast every column (features and label) from string to double in a single
# projection. Entries that cannot be parsed as numbers, such as the literal
# "na" seen in feature_1, become NULL and are filled in by the imputation step.
#
# A single select() is used instead of calling withColumn() once per column:
# each withColumn() adds another projection to the query plan, and the loop
# variable would also leak into the notebook's global namespace.
data = data.select(
    [data[column_name].cast('double').alias(column_name) for column_name in data.columns]
)

In [210]:
# Feature columns: every column except the last one (Disease_Type, the label).
input_columns=data.columns[:-1]

In [211]:
# Names for the imputed copies of the feature columns: f_1, f_2, ... with one
# name per entry in input_columns.
imputed_columns = [f'f_{position}' for position, _ in enumerate(input_columns, start=1)]

In [212]:
# Fill missing feature values with the per-column mean, writing the results to
# new f_* columns so the original feature columns are left untouched.
# NOTE(review): missingValue=None is passed explicitly; confirm it does not
# change which values the Imputer treats as missing.
imputer = Imputer(
    strategy='mean',
    missingValue=None,
    inputCols=input_columns,
    outputCols=imputed_columns,
)
model = imputer.fit(data)
impute_data = model.transform(data)

In [213]:
# Preview the data with the imputed f_* columns appended.
impute_data.show()

+---------+---------+---------+---------+------------+------------------+------------------+--------+--------+
|feature_1|feature_2|feature_3|feature_4|Disease_Type|               f_1|               f_2|     f_3|     f_4|
+---------+---------+---------+---------+------------+------------------+------------------+--------+--------+
|   3.6216|   8.6661|  -2.8073| -0.44699|         0.0|            3.6216|            8.6661| -2.8073|-0.44699|
|   4.5459|   8.1674|  -2.4586|  -1.4621|         0.0|            4.5459|            8.1674| -2.4586| -1.4621|
|    3.866|  -2.6383|   1.9242|  0.10645|         0.0|             3.866|           -2.6383|  1.9242| 0.10645|
|   3.4566|   9.5228|  -4.0112|  -3.5944|         0.0|            3.4566|            9.5228| -4.0112| -3.5944|
|  0.32924|  -4.4552|   4.5718|  -0.9888|         0.0|           0.32924|           -4.4552|  4.5718| -0.9888|
|     NULL|   9.6718|  -3.9606|  -3.1625|         0.0|0.4308653338439095|            9.6718| -3.9606| -3.1625|
|

In [214]:
# Pack the imputed feature columns into one vector column. With
# handleInvalid='error' the transform fails rather than silently passing
# invalid values through.
a = VectorAssembler(
    inputCols=imputed_columns,
    outputCol='assembled_features',
    handleInvalid='error',
)
a_data = a.transform(impute_data)

In [215]:
# Rescale each assembled feature to the [0, 1] range and store the result in
# the 'features' column.
# NOTE(review): the scaler is fitted on the full dataset, before the train/test
# split performed later in this notebook, so test rows influence the scaling.
# Consider fitting on the training split only.
s = MinMaxScaler(
    min=0.0,
    max=1.0,
    inputCol='assembled_features',
    outputCol='features',
)
scaler_model = s.fit(a_data)
s_data = scaler_model.transform(a_data)

In [216]:
# Show the schema after assembling and scaling the features.
display(s_data)

DataFrame[feature_1: double, feature_2: double, feature_3: double, feature_4: double, Disease_Type: double, f_1: double, f_2: double, f_3: double, f_4: double, assembled_features: vector, features: vector]

In [217]:
# Rename the target column to 'label', the column name the classifier and
# evaluators below are configured with.
s_data = s_data.withColumnRenamed('Disease_Type','label')

In [218]:
# Confirm the rename: the schema should now list 'label' in place of Disease_Type.
display(s_data)

DataFrame[feature_1: double, feature_2: double, feature_3: double, feature_4: double, label: double, f_1: double, f_2: double, f_3: double, f_4: double, assembled_features: vector, features: vector]

In [219]:
# Preview only the two columns used for modelling: the label and the scaled
# feature vector.
d= ['label','features']
s_data.select(*d).show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[0.76900388695382...|
|  0.0|[0.83565901535310...|
|  0.0|[0.78662859038429...|
|  0.0|[0.75710504871312...|
|  0.0|[0.53157807440740...|
|  0.0|[0.53890670112598...|
|  0.0|[0.76681161615068...|
|  0.0|[0.65871247358818...|
|  0.0|[0.73883131774224...|
|  0.0|[0.61857372592288...|
|  0.0|[0.59615343011055...|
|  0.0|[0.79556353619049...|
|  0.0|[0.64480164997223...|
|  0.0|[0.39412557961765...|
|  0.0|[0.75331184331032...|
|  0.0|[0.84507712610605...|
|  0.0|[0.70051705860718...|
|  0.0|[0.56578254692829...|
|  0.0|[0.61224931311251...|
|  0.0|[0.88587932414598...|
+-----+--------------------+
only showing top 20 rows



In [220]:
# NOTE(review): this cell repeats the previous label/features preview under a
# different variable name; it looks like a candidate for removal.
col = ['label','features']
s_data.select(*col).show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[0.76900388695382...|
|  0.0|[0.83565901535310...|
|  0.0|[0.78662859038429...|
|  0.0|[0.75710504871312...|
|  0.0|[0.53157807440740...|
|  0.0|[0.53890670112598...|
|  0.0|[0.76681161615068...|
|  0.0|[0.65871247358818...|
|  0.0|[0.73883131774224...|
|  0.0|[0.61857372592288...|
|  0.0|[0.59615343011055...|
|  0.0|[0.79556353619049...|
|  0.0|[0.64480164997223...|
|  0.0|[0.39412557961765...|
|  0.0|[0.75331184331032...|
|  0.0|[0.84507712610605...|
|  0.0|[0.70051705860718...|
|  0.0|[0.56578254692829...|
|  0.0|[0.61224931311251...|
|  0.0|[0.88587932414598...|
+-----+--------------------+
only showing top 20 rows



In [221]:
# 70/30 train/test split over the label and feature-vector columns, with a
# fixed seed so the split is repeatable.
train_df, test_df = (
    s_data
    .select('label', 'features')
    .randomSplit(weights=[0.7, 0.3], seed=1213)
)

In [222]:
# Preview the held-out test split.
test_df.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[0.19876107853954...|
|  0.0|[0.31320626816375...|
|  0.0|[0.32870360354513...|
|  0.0|[0.33488378801318...|
|  0.0|[0.36600826428401...|
|  0.0|[0.37781335410221...|
|  0.0|[0.38120993156365...|
|  0.0|[0.38276038624350...|
|  0.0|[0.38558726175280...|
|  0.0|[0.39912309167874...|
|  0.0|[0.39926732002105...|
|  0.0|[0.41408678219356...|
|  0.0|[0.41408678219356...|
|  0.0|[0.42568995233253...|
|  0.0|[0.42625244286754...|
|  0.0|[0.42711781292141...|
|  0.0|[0.44169352919542...|
|  0.0|[0.45695144552856...|
|  0.0|[0.47533911688986...|
|  0.0|[0.47912222630869...|
+-----+--------------------+
only showing top 20 rows



In [223]:
# Multilayer perceptron configuration. Layer sizes run from input to output:
# 4 inputs (one per feature column), one hidden layer of 16 units, 2 outputs.
# The seed is fixed so training is repeatable.
mlpc = MultilayerPerceptronClassifier(
    featuresCol='features',
    labelCol='label',
    layers=[4, 16, 2],
    maxIter=1000,
    blockSize=8,
    seed=7,
    solver='gd',
)

In [224]:
# Train the perceptron on the training split; b is the fitted model.
b= mlpc.fit(train_df)

In [225]:
# Evaluator that computes the F1 metric from the 'label' and 'prediction' columns.
evaluator = MulticlassClassificationEvaluator(
    labelCol='label',
    predictionCol='prediction',
    metricName='f1',
)
# Score the held-out test split with the fitted model.
predictions = b.transform(test_df)

In [226]:
# Show the schema of the scored test set (the transform adds rawPrediction,
# probability and prediction columns).
predictions

DataFrame[label: double, features: vector, rawPrediction: vector, probability: vector, prediction: double]

In [227]:
# F1 score of the fitted model on the test-set predictions.
# The evaluator must be given `predictions` (the frame produced by
# b.transform(test_df)). The previous name `prediction` is never defined in
# this notebook, so it raised NameError on a fresh kernel.
b_f1 = evaluator.evaluate(predictions)
b_f1

0.8309986610958963

In [228]:
# One multiclass evaluator per metric, all reading the same 'label' and
# 'prediction' columns.
evaluator_accuracy, evaluator_precision, evaluator_recall = (
    MulticlassClassificationEvaluator(
        labelCol='label',
        predictionCol='prediction',
        metricName=metric_name,
    )
    for metric_name in ('accuracy', 'weightedPrecision', 'weightedRecall')
)
# Area under the ROC curve is computed from the raw prediction vector instead
# of the predicted class.
evaluator_auc = BinaryClassificationEvaluator(
    labelCol='label',
    rawPredictionCol='rawPrediction',
    metricName='areaUnderROC',
)


In [229]:
# Apply each evaluator, in order, to the scored test set.
accuracy, precision, recall, auc = (
    metric_evaluator.evaluate(predictions)
    for metric_evaluator in (
        evaluator_accuracy,
        evaluator_precision,
        evaluator_recall,
        evaluator_auc,
    )
)

In [230]:
# Print the test-set metrics, one per line, in a fixed order.
for metric_label, metric_value in (
    ("Accuracy:", accuracy),
    ("Precision:", precision),
    ("Recall:", recall),
    ("F1 Score:", b_f1),
    ("Area Under ROC Curve:", auc),
):
    print(metric_label, metric_value)

Accuracy: 0.8345323741007195
Precision: 0.8402262463849458
Recall: 0.8345323741007193
F1 Score: 0.8309986610958963
Area Under ROC Curve: 0.9093738211995465
