In [9]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
spark = SparkSession.builder.getOrCreate()
import pandas as pd
import pyspark.sql.types as tp
from pyspark.ml.feature import Imputer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

In [10]:
#read csv file
my_data=spark.read.csv('/content/diabetes.csv',header=True)
my_data.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|      0|
|          3|     78|           50|           32|     88|  31|                   0.248| 26|      1|


In [11]:
#see the schema
my_data.printSchema()

root
 |-- Pregnancies: string (nullable = true)
 |-- Glucose: string (nullable = true)
 |-- BloodPressure: string (nullable = true)
 |-- SkinThickness: string (nullable = true)
 |-- Insulin: string (nullable = true)
 |-- BMI: string (nullable = true)
 |-- DiabetesPedigreeFunction: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Outcome: string (nullable = true)



In [12]:
#define schema
my_schema=tp.StructType(
    [
        tp.StructField('Pregnancies',tp.IntegerType(),True),
        tp.StructField('Glucose',tp.IntegerType(),True),
        tp.StructField('BloodPressure',tp.IntegerType(),True),
        tp.StructField('SkinThickness',tp.IntegerType(),True),
        tp.StructField('Insulin',tp.IntegerType(),True),
        tp.StructField('BMI',tp.FloatType(),True),
        tp.StructField('DiabetesPedigreeFunction',tp.FloatType(),True),
        tp.StructField('Age',tp.IntegerType(),True),
        tp.StructField('Outcome',tp.IntegerType(),True)
    ]
)

In [13]:
my_data=spark.read.csv('/content/diabetes.csv',header=True,schema=my_schema)
my_data.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: float (nullable = true)
 |-- DiabetesPedigreeFunction: float (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)



In [14]:
#get dimension of data
(my_data.count(),len(my_data.columns))

(768, 9)

In [15]:
my_data.head()

Row(Pregnancies=6, Glucose=148, BloodPressure=72, SkinThickness=35, Insulin=0, BMI=33.599998474121094, DiabetesPedigreeFunction=0.6269999742507935, Age=50, Outcome=1)

In [16]:
#Replace zero with null for filling missing values
from pyspark.sql.functions import when, col

def replace_zero_with_null(df):
    for i, column_name in enumerate(df.columns):
        # Skip first and last column
        if i == 0 or i == len(df.columns) - 1:
            continue
        df = df.withColumn(column_name, when(col(column_name) == 0, None).otherwise(col(column_name)))
    return df

my_data = replace_zero_with_null(my_data)
my_data.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|   NULL|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|   NULL|26.6|                   0.351| 31|      0|
|          8|    183|           64|         NULL|   NULL|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
|          5|    116|           74|         NULL|   NULL|25.6|                   0.201| 30|      0|
|          3|     78|           50|           32|     88|31.0|                   0.248| 26|      1|


In [17]:
#impute values in null place in
imputer = Imputer(
    inputCols=my_data.columns,
    outputCols=my_data.columns
    ).setStrategy("median")


my_data1 = imputer.fit(my_data).transform(my_data)

In [18]:
my_data1.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|    125|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|    125|26.6|                   0.351| 31|      0|
|          8|    183|           64|           29|    125|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
|          5|    116|           74|           29|    125|25.6|                   0.201| 30|      0|
|          3|     78|           50|           32|     88|31.0|                   0.248| 26|      1|


In [19]:
# specify the input and output columns of the vector assembler
assembler = VectorAssembler(inputCols=['Pregnancies',
                                       'Glucose',
                                       'BloodPressure',
                                       'SkinThickness',
                                       'Insulin', 'BMI',
                                       'DiabetesPedigreeFunction',
                                       'Age'],
                             outputCol='features')

final_data = assembler.transform(my_data1)
final_data.select('features', 'Outcome').show()

+--------------------+-------+
|            features|Outcome|
+--------------------+-------+
|[6.0,148.0,72.0,3...|      1|
|[1.0,85.0,66.0,29...|      0|
|[8.0,183.0,64.0,2...|      1|
|[1.0,89.0,66.0,23...|      0|
|[0.0,137.0,40.0,3...|      1|
|[5.0,116.0,74.0,2...|      0|
|[3.0,78.0,50.0,32...|      1|
|[10.0,115.0,72.0,...|      0|
|[2.0,197.0,70.0,4...|      1|
|[8.0,125.0,96.0,2...|      1|
|[4.0,110.0,92.0,2...|      0|
|[10.0,168.0,74.0,...|      1|
|[10.0,139.0,80.0,...|      0|
|[1.0,189.0,60.0,2...|      1|
|[5.0,166.0,72.0,1...|      1|
|[7.0,100.0,72.0,2...|      1|
|[0.0,118.0,84.0,4...|      1|
|[7.0,107.0,74.0,2...|      1|
|[1.0,103.0,30.0,3...|      0|
|[1.0,115.0,70.0,3...|      1|
+--------------------+-------+
only showing top 20 rows



In [20]:
#split data for test and train
train_data,test_data=final_data.randomSplit([0.8,0.2])

In [21]:
#train the model
lr=LogisticRegression(featuresCol='features',labelCol='Outcome', maxIter=10)
model=lr.fit(train_data)

In [22]:
#test model
prediction=model.transform(test_data)
prediction.show(5)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+--------------------+--------------------+----------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|            features|       rawPrediction|         probability|prediction|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+--------------------+--------------------+----------+
|          0|     67|           76|           29|    125|45.3|                   0.194| 46|      0|[0.0,67.0,76.0,29...|[2.05328161503477...|[0.88627878792583...|       0.0|
|          0|     84|           64|           22|     66|35.8|                   0.545| 21|      0|[0.0,84.0,64.0,22...|[2.35893397349947...|[0.91364173254214...|       0.0|
|          0|     93|           60|           25|     92|28.7|                   0.532| 22|      0|[0.0,93.0,60.0,25...|[2.7403358

In [26]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer # Assuming 'imputer' should be an Imputer
from pyspark.ml.feature import SQLTransformer # Assuming 'replace_zero_with_null' uses SQL

# Assuming replace_zero_with_null logic: Replace 0 with NULL in specified columns
replace_zero_with_null = SQLTransformer(
    statement="""
    SELECT
        CASE WHEN Pregnancies = 0 THEN NULL ELSE Pregnancies END AS Pregnancies,
        CASE WHEN Glucose = 0 THEN NULL ELSE Glucose END AS Glucose,
        CASE WHEN BloodPressure = 0 THEN NULL ELSE BloodPressure END AS BloodPressure,
        CASE WHEN SkinThickness = 0 THEN NULL ELSE SkinThickness END AS SkinThickness,
        CASE WHEN Insulin = 0 THEN NULL ELSE Insulin END AS Insulin,
        CASE WHEN BMI = 0 THEN NULL ELSE BMI END AS BMI,
        DiabetesPedigreeFunction,
        Age,
        Outcome
    FROM __THIS__
    """
)

# Assuming imputer logic: Impute missing values with the mean
imputer = Imputer(
    inputCols=['Pregnancies', 'Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI'],
    outputCols=['Pregnancies', 'Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI'],
    strategy='mean'
)


pipeline = Pipeline(stages=[replace_zero_with_null, imputer, assembler, lr])
pipeline_model = pipeline.fit(my_data1)
pred = pipeline_model.transform(my_data1)
pred.show(5)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+--------------------+--------------------+----------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|            features|       rawPrediction|         probability|prediction|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+--------------------+--------------------+----------+
|          6|    148|           72|           35|    125|33.6|                   0.627| 50|      1|[6.0,148.0,72.0,3...|[-0.8806900636527...|[0.29303480117503...|       1.0|
|          1|     85|           66|           29|    125|26.6|                   0.351| 31|      0|[1.0,85.0,66.0,29...|[3.31433489007416...|[0.96491732110206...|       0.0|
|          8|    183|           64|           29|    125|23.3|                   0.672| 32|      1|[8.0,183.0,64.0,2...|[-1.396523

In [27]:
#create a pipeline
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[replace_zero_with_null,imputer, assembler, lr])
pipeline_model = pipeline.fit(my_data1)
pred = pipeline_model.transform(my_data1)
pred.show(5)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+--------------------+--------------------+----------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|            features|       rawPrediction|         probability|prediction|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+--------------------+--------------------+----------+
|          6|    148|           72|           35|    125|33.6|                   0.627| 50|      1|[6.0,148.0,72.0,3...|[-0.8806900636527...|[0.29303480117503...|       1.0|
|          1|     85|           66|           29|    125|26.6|                   0.351| 31|      0|[1.0,85.0,66.0,29...|[3.31433489007416...|[0.96491732110206...|       0.0|
|          8|    183|           64|           29|    125|23.3|                   0.672| 32|      1|[8.0,183.0,64.0,2...|[-1.396523

In [28]:
#create new data for prediction with outcome column
Pregnancies=1
Glucose=166
BloodPressure=72
SkinThickness=15
Insulin=17
BMI=33.6
DiabetesPedigreeFunction=0.627
Age=50
# Add a placeholder value for 'Outcome' (you'll need to determine the appropriate value based on your problem)
Outcome = 0
new_data=spark.createDataFrame([(Pregnancies,Glucose,BloodPressure,SkinThickness,Insulin,BMI,DiabetesPedigreeFunction,Age, Outcome)],
                               ['Pregnancies','Glucose','BloodPressure','SkinThickness','Insulin','BMI','DiabetesPedigreeFunction','Age', 'Outcome'])
new_data.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          1|    166|           72|           15|     17|33.6|                   0.627| 50|      0|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+



In [29]:
#predicte new data through pipline
pipeline_model.transform(new_data).show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+--------------------+--------------------+----------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|            features|       rawPrediction|         probability|prediction|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+--------------------+--------------------+----------+
|          1|    166|           72|           15|     17|33.6|                   0.627| 50|      0|[1.0,166.0,72.0,1...|[-0.8911289512497...|[0.29087690648059...|       1.0|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+--------------------+--------------------+----------+

