In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

In [None]:
spark = SparkSession.builder.master("local[1]") \
                    .appName('SparkByExamples.com') \
                    .getOrCreate()

In [None]:
data = spark.read.option("header",True) \
     .csv("/content/diabetes.csv")

In [None]:
data.printSchema()

root
 |-- Pregnancies: string (nullable = true)
 |-- Glucose: string (nullable = true)
 |-- BloodPressure: string (nullable = true)
 |-- SkinThickness: string (nullable = true)
 |-- Insulin: string (nullable = true)
 |-- BMI: string (nullable = true)
 |-- DiabetesPedigreeFunction: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Outcome: string (nullable = true)



In [None]:
from pyspark.sql.functions import col, cast

In [None]:
data.schema

StructType([StructField('Pregnancies', StringType(), True), StructField('Glucose', StringType(), True), StructField('BloodPressure', StringType(), True), StructField('SkinThickness', StringType(), True), StructField('Insulin', StringType(), True), StructField('BMI', StringType(), True), StructField('DiabetesPedigreeFunction', StringType(), True), StructField('Age', StringType(), True), StructField('Outcome', StringType(), True)])

In [None]:
columns_to_convert =list(data.columns)
print(columns_to_convert)

['Pregnancies', 'Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI', 'DiabetesPedigreeFunction', 'Age', 'Outcome']


In [None]:
for column in columns_to_convert:
    data = data.withColumn(column, col(column).cast('float'))

In [None]:
data.printSchema()

root
 |-- Pregnancies: float (nullable = true)
 |-- Glucose: float (nullable = true)
 |-- BloodPressure: float (nullable = true)
 |-- SkinThickness: float (nullable = true)
 |-- Insulin: float (nullable = true)
 |-- BMI: float (nullable = true)
 |-- DiabetesPedigreeFunction: float (nullable = true)
 |-- Age: float (nullable = true)
 |-- Outcome: float (nullable = true)



In [None]:
to_median = list(data.columns)
to_median.remove("Pregnancies")
to_median.remove("Outcome")
print(to_median)

['Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI', 'DiabetesPedigreeFunction', 'Age']


In [None]:
from pyspark.sql.functions import when, col
for column in to_median:
    data = data.withColumn(column, when(col(column) == 0, None).otherwise(col(column)))
data.show(10)

+-----------+-------+-------------+-------------+-------+----+------------------------+----+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction| Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+----+-------+
|        6.0|  148.0|         72.0|         35.0|   NULL|33.6|                   0.627|50.0|    1.0|
|        1.0|   85.0|         66.0|         29.0|   NULL|26.6|                   0.351|31.0|    0.0|
|        8.0|  183.0|         64.0|         NULL|   NULL|23.3|                   0.672|32.0|    1.0|
|        1.0|   89.0|         66.0|         23.0|   94.0|28.1|                   0.167|21.0|    0.0|
|        0.0|  137.0|         40.0|         35.0|  168.0|43.1|                   2.288|33.0|    1.0|
|        5.0|  116.0|         74.0|         NULL|   NULL|25.6|                   0.201|30.0|    0.0|
|        3.0|   78.0|         50.0|         32.0|   88.0|31.0|                   0.248|26.0

In [None]:
from pyspark.sql.functions import col, when, isnull, lit, median
for column in to_median:
    median_val = data.approxQuantile(column, [0.5], 0.25)[0]
    data = data.withColumn(column, when(col(column).isNull(), lit(median_val)).otherwise(col(column)))
data.show(50)

+-----------+-------+-------------+-------------+-------+------------------+------------------------+----+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin|               BMI|DiabetesPedigreeFunction| Age|Outcome|
+-----------+-------+-------------+-------------+-------+------------------+------------------------+----+-------+
|        6.0|  148.0|         72.0|         35.0|   77.0|33.599998474121094|      0.6269999742507935|50.0|    1.0|
|        1.0|   85.0|         66.0|         29.0|   77.0|26.600000381469727|     0.35100001096725464|31.0|    0.0|
|        8.0|  183.0|         64.0|         22.0|   77.0|23.299999237060547|       0.671999990940094|32.0|    1.0|
|        1.0|   89.0|         66.0|         23.0|   94.0|28.100000381469727|     0.16699999570846558|21.0|    0.0|
|        0.0|  137.0|         40.0|         35.0|  168.0|43.099998474121094|      2.2880001068115234|33.0|    1.0|
|        5.0|  116.0|         74.0|         22.0|   77.0|25.600000381469727|    

In [None]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["Pregnancies", "Glucose", "BloodPressure", "SkinThickness", "Insulin", "DiabetesPedigreeFunction", "Outcome" ], outputCol="features")
final_data = assembler.transform(data)
final_data.show(10)


+-----------+-------+-------------+-------------+-------+------------------+------------------------+----+-------+--------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin|               BMI|DiabetesPedigreeFunction| Age|Outcome|            features|
+-----------+-------+-------------+-------------+-------+------------------+------------------------+----+-------+--------------------+
|        6.0|  148.0|         72.0|         35.0|   77.0|33.599998474121094|      0.6269999742507935|50.0|    1.0|[6.0,148.0,72.0,3...|
|        1.0|   85.0|         66.0|         29.0|   77.0|26.600000381469727|     0.35100001096725464|31.0|    0.0|[1.0,85.0,66.0,29...|
|        8.0|  183.0|         64.0|         22.0|   77.0|23.299999237060547|       0.671999990940094|32.0|    1.0|[8.0,183.0,64.0,2...|
|        1.0|   89.0|         66.0|         23.0|   94.0|28.100000381469727|     0.16699999570846558|21.0|    0.0|[1.0,89.0,66.0,23...|
|        0.0|  137.0|         40.0|         35.0

In [None]:
print(final_data["features", "Outcome"])

DataFrame[features: vector, Outcome: float]


In [None]:
final_data1 = final_data.select("features", "Outcome")
final_data1.show(10)

+--------------------+-------+
|            features|Outcome|
+--------------------+-------+
|[6.0,148.0,72.0,3...|    1.0|
|[1.0,85.0,66.0,29...|    0.0|
|[8.0,183.0,64.0,2...|    1.0|
|[1.0,89.0,66.0,23...|    0.0|
|[0.0,137.0,40.0,3...|    1.0|
|[5.0,116.0,74.0,2...|    0.0|
|[3.0,78.0,50.0,32...|    1.0|
|[10.0,115.0,64.0,...|    0.0|
|[2.0,197.0,70.0,4...|    1.0|
|[8.0,125.0,96.0,2...|    1.0|
+--------------------+-------+
only showing top 10 rows



In [None]:
train_data, test_data = final_data1.randomSplit([0.8, 0.2])
train_data.show()
test_data.show()

+--------------------+-------+
|            features|Outcome|
+--------------------+-------+
|[0.0,57.0,60.0,22...|    0.0|
|[0.0,67.0,76.0,22...|    0.0|
|[0.0,73.0,64.0,22...|    0.0|
|[0.0,74.0,52.0,10...|    0.0|
|[0.0,78.0,88.0,29...|    0.0|
|[0.0,84.0,64.0,22...|    0.0|
|[0.0,84.0,82.0,31...|    0.0|
|[0.0,86.0,68.0,32...|    0.0|
|[0.0,91.0,80.0,22...|    0.0|
|[0.0,93.0,60.0,22...|    0.0|
|[0.0,93.0,100.0,3...|    0.0|
|[0.0,94.0,64.0,22...|    0.0|
|[0.0,95.0,64.0,39...|    0.0|
|[0.0,95.0,80.0,45...|    0.0|
|[0.0,95.0,85.0,25...|    1.0|
|[0.0,97.0,64.0,36...|    0.0|
|[0.0,99.0,64.0,22...|    0.0|
|[0.0,100.0,70.0,2...|    0.0|
|[0.0,100.0,88.0,6...|    0.0|
|[0.0,101.0,62.0,2...|    0.0|
+--------------------+-------+
only showing top 20 rows

+--------------------+-------+
|            features|Outcome|
+--------------------+-------+
|[0.0,91.0,68.0,32...|    0.0|
|[0.0,93.0,60.0,25...|    0.0|
|[0.0,94.0,70.0,27...|    0.0|
|[0.0,98.0,82.0,15...|    0.0|
|[0.0,101.0,7

In [None]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol="features", labelCol="Outcome")
model = lr.fit(train_data)
predictions = model.transform(test_data)

In [None]:
predictions.show()

+--------------------+-------+--------------------+--------------------+----------+
|            features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|[0.0,57.0,60.0,22...|    0.0|[20.1454433160567...|[0.99999999821784...|       0.0|
|[0.0,73.0,64.0,22...|    0.0|[20.0034414858235...|[0.99999999794592...|       0.0|
|[0.0,74.0,52.0,10...|    0.0|[20.1961035839516...|[0.99999999830588...|       0.0|
|[0.0,78.0,88.0,29...|    0.0|[19.7244152584883...|[0.99999999728484...|       0.0|
|[0.0,84.0,64.0,22...|    0.0|[19.7870030219240...|[0.99999999744956...|       0.0|
|[0.0,91.0,68.0,32...|    0.0|[19.6066614764615...|[0.99999999694553...|       0.0|
|[0.0,91.0,80.0,22...|    0.0|[19.6082788137300...|[0.99999999695047...|       0.0|
|[0.0,93.0,60.0,22...|    0.0|[19.7490784890976...|[0.99999999735098...|       0.0|
|[0.0,97.0,64.0,36...|    0.0|[19.4174204950032...|[0.99999999630919...|    

In [None]:
train_data.show()

+--------------------+-------+
|            features|Outcome|
+--------------------+-------+
|[0.0,57.0,60.0,22...|    0.0|
|[0.0,67.0,76.0,22...|    0.0|
|[0.0,74.0,52.0,10...|    0.0|
|[0.0,78.0,88.0,29...|    0.0|
|[0.0,84.0,64.0,22...|    0.0|
|[0.0,86.0,68.0,32...|    0.0|
|[0.0,91.0,68.0,32...|    0.0|
|[0.0,93.0,60.0,22...|    0.0|
|[0.0,93.0,60.0,25...|    0.0|
|[0.0,93.0,100.0,3...|    0.0|
|[0.0,94.0,64.0,22...|    0.0|
|[0.0,94.0,70.0,27...|    0.0|
|[0.0,95.0,64.0,39...|    0.0|
|[0.0,95.0,80.0,45...|    0.0|
|[0.0,95.0,85.0,25...|    1.0|
|[0.0,97.0,64.0,36...|    0.0|
|[0.0,98.0,82.0,15...|    0.0|
|[0.0,99.0,64.0,22...|    0.0|
|[0.0,100.0,70.0,2...|    0.0|
|[0.0,100.0,88.0,6...|    0.0|
+--------------------+-------+
only showing top 20 rows



In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator, MulticlassClassificationEvaluator
Bevaluator = BinaryClassificationEvaluator()
Bevaluator.setLabelCol("Outcome")
Bevaluator.evaluate(predictions)


1.0

In [None]:
Mevaluator = MulticlassClassificationEvaluator()
Mevaluator.setLabelCol("Outcome")
Mevaluator.setPredictionCol("prediction")
Mevaluator.evaluate(predictions)

1.0

In [None]:
from pyspark.ml.classification import NaiveBayes

In [None]:
nb = NaiveBayes(featuresCol="features", labelCol="Outcome", modelType="multinomial")

In [None]:
nbmodel = nb.fit(train_data)
predictions = nbmodel.transform(test_data)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="Outcome", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)

Accuracy: 0.7972972972972973


In [None]:
from pyspark.mllib.classification import PerceptronModel, PerceptronWithSGD
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors
perceptron = PerceptronClassifier(featuresCol="features", labelCol="Outcome", maxIter=10)
perpmodel = perceptron.fit(train_data)

predictions = perpmodel.transform(test_data)
print("Prediction:", prediction)

ImportError: cannot import name 'PerceptronModel' from 'pyspark.mllib.classification' (/usr/local/lib/python3.10/dist-packages/pyspark/mllib/classification.py)

In [None]:
from pyspark.ml.classification import LinearSVC
svm = LinearSVC(featuresCol="features", labelCol="Outcome", maxIter=10, regParam=0.1)
svmmodel = svm.fit(train_data)
predictions = svmmodel.transform(test_data)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="Outcome", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)

Accuracy: 1.0


In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(featuresCol="features", labelCol="Outcome", maxDepth=5, impurity="gini")
model = dt.fit(train_data)

predictions = model.transform(test_data)


evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="Outcome", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)

Accuracy: 1.0
