In [3]:
# Install the required packages if you haven't already (only needed once)
# !pip install pyspark

# Import necessary modules
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, DoubleType, StructType, StructField
from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import pyspark.sql.functions as F

# Obtain the Spark session for this job via the builder's getOrCreate()
spark = (
    SparkSession.builder
    .appName("DiabetesPredictionPipeline")
    .getOrCreate()
)

# Expected CSV layout as (column name, Spark type) pairs, in schema order
csv_columns = [
    ('Pregnancies', IntegerType()),
    ('Glucose', IntegerType()),
    ('BloodPressure', IntegerType()),
    ('SkinThickness', IntegerType()),
    ('Insulin', IntegerType()),
    ('BMI', DoubleType()),
    ('DiabetesPedigreeFunction', DoubleType()),
    ('Age', IntegerType()),
    ('Outcome', IntegerType()),
]

# Build the schema from the table above; every column is declared nullable
schema = StructType(
    [StructField(name, dtype, True) for name, dtype in csv_columns]
)

# Read the CSV with the explicit schema, treating the first line as a header
data = (
    spark.read
    .schema(schema)
    .option("header", True)
    .csv('C:/Users/Yashraj/Downloads/diabaties.csv')
)

# Measurements for which a recorded 0 is not a valid reading
zero_invalid_cols = ['Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI']

# Rebuild the frame in a single projection, keeping column order and names.
# For the columns listed above, `when(value != 0, value)` has no `otherwise`,
# so it yields null for a 0 (and for an already-null value) and the original
# value in every other case. All remaining columns pass through untouched.
data = data.select(*[
    F.when(F.col(name) != 0, F.col(name)).alias(name)
    if name in zero_invalid_cols
    else F.col(name)
    for name in data.columns
])

# Input columns used to predict the outcome
features = [
    'Pregnancies',
    'Glucose',
    'BloodPressure',
    'SkinThickness',
    'Insulin',
    'BMI',
    'DiabetesPedigreeFunction',
    'Age',
]

# Each feature gets a "<name>_imputed" counterpart written by the Imputer
imputed_features = [f"{name}_imputed" for name in features]

# Stage 1: fill missing values (the zeros that were replaced by nulls)
imputer = Imputer(inputCols=features, outputCols=imputed_features)

# Stage 2: pack the imputed columns into one vector column named 'features'
assembler = VectorAssembler(inputCols=imputed_features, outputCol='features')

# Stage 3: logistic regression on that vector, with 'Outcome' as the label
lr = LogisticRegression(
    maxIter=100,
    labelCol='Outcome',
    featuresCol='features',
)

# Chain the three stages so they are fitted and applied in this order
pipeline = Pipeline(stages=[imputer, assembler, lr])

# Train, score and report inside try/finally so the Spark session is always
# stopped, even when fitting, transforming or evaluating raises. Previously
# spark.stop() was only reached on success, leaving the session running
# after any failure.
try:
    # Split the dataset into training (70%) and test (30%) sets; the fixed
    # seed keeps the split reproducible for the same input data
    xtrain, xtest = data.randomSplit([0.7, 0.3], seed=42)

    # Fit every pipeline stage on the training data only
    model = pipeline.fit(xtrain)

    # Apply the fitted stages to the held-out rows; adds a 'prediction' column
    predictions = model.transform(xtest)

    # Accuracy: compares the 'prediction' column against the 'Outcome' label
    evaluator = MulticlassClassificationEvaluator(
        labelCol="Outcome",
        predictionCol="prediction",
        metricName="accuracy",
    )
    accuracy = evaluator.evaluate(predictions)

    # Output the accuracy
    print(f"Model Accuracy: {accuracy}")
finally:
    # Stop the Spark session whether or not the steps above succeeded
    spark.stop()

# NOTE(review): this section looks like a leftover scikit-learn variant of the
# PySpark workflow above and does not appear runnable as written:
#   * train_test_split, SimpleImputer, accuracy_score and y are not imported
#     or defined anywhere in the visible source;
#   * Pipeline and LogisticRegression are bound by the imports at the top of
#     this cell to the pyspark.ml classes, which the code above constructs
#     with stages= / maxIter=, not steps= / max_iter=;
#   * `data` is the Spark DataFrame loaded above, and spark.stop() has
#     already been called by the time this point is reached.
# TODO(review): confirm whether this belongs in a separate cell with its own
# imports and data loading, or should be removed.

# Split the dataset into training and test sets (70% train, 30% test)
X_train, X_test, y_train, y_test = train_test_split(data[features], y, test_size=0.3, random_state=42)

# Create a pipeline with imputation and logistic regression
pipeline = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),  # Step 1: Imputation
    ('classifier', LogisticRegression(max_iter=1000))  # Step 2: Logistic Regression
])

# Fit the pipeline model on the training data
pipeline.fit(X_train, y_train)

# Make predictions on the test set
y_pred = pipeline.predict(X_test)

# Evaluate the accuracy of the model
accuracy = accuracy_score(y_test, y_pred)
print(f"Model Accuracy: {accuracy}")


Model Accuracy: 0.7402597402597403



[notice] A new release of pip is available: 23.2.1 -> 24.2
[notice] To update, run: python.exe -m pip install --upgrade pip
