In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import types as tp
from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.functions import when, col

# Create Spark session
spark = SparkSession.builder.appName("DiabetesPipeline").getOrCreate()

# Define the schema before reading: the CSV reader below is given this
# object, so it has to exist first. Every field is declared nullable.
my_schema = tp.StructType([
    tp.StructField('Pregnancies', tp.IntegerType(), True),
    tp.StructField('Glucose', tp.IntegerType(), True),
    tp.StructField('BloodPressure', tp.IntegerType(), True),
    tp.StructField('SkinThickness', tp.IntegerType(), True),
    tp.StructField('Insulin', tp.IntegerType(), True),
    tp.StructField('BMI', tp.FloatType(), True),
    tp.StructField('DiabetesPedigreeFunction', tp.FloatType(), True),
    tp.StructField('Age', tp.IntegerType(), True),
    tp.StructField('Outcome', tp.IntegerType(), True),
])

# Read the CSV file (first line is the header) using the explicit schema
my_data = spark.read.csv('/content/drive/MyDrive/diabetes.csv', header=True, schema=my_schema)

# Show the schema and a preview of the loaded rows
my_data.printSchema()
my_data.show()

# Replace zero with null for filling missing values
def replace_zero_with_null(df, columns=None):
    """Return ``df`` with zeros turned into nulls in the selected columns.

    Zeros are treated as placeholders for missing values; converting them to
    nulls lets a later imputation step fill them in.

    Args:
        df: Spark DataFrame to clean.
        columns: Names of the columns to clean. When omitted, every column
            except the first and the last one is cleaned.

    Returns:
        A DataFrame with the same column names in the same order. In the
        cleaned columns each value equal to 0 becomes null; all other columns
        pass through unchanged.
    """
    # Read the column list once instead of on every loop iteration.
    all_columns = df.columns
    if columns is None:
        targets = set(all_columns[1:-1])
    else:
        targets = set(columns)

    projected = [
        when(col(name) == 0, None).otherwise(col(name)).alias(name)
        if name in targets
        else col(name)
        for name in all_columns
    ]
    # A single select keeps the query plan flat; chaining one withColumn call
    # per column adds a projection to the plan for every column touched.
    return df.select(*projected)

# Turn the zero placeholders into nulls, then preview the result.
my_data = replace_zero_with_null(my_data)
my_data.show()

# Impute the nulls using the "median" strategy. Every column except the last
# one (the outcome column) is imputed, and the output columns reuse the input
# names so the filled values overwrite the originals.
imputer = Imputer(
    strategy="median",
    inputCols=my_data.columns[:-1],
    outputCols=my_data.columns[:-1],
)

# Fit the imputer on the cleaned data, then apply it to that same data.
imputer_model = imputer.fit(my_data)
my_data1 = imputer_model.transform(my_data)

# Pack the eight predictor columns into one vector column called 'features',
# which is the column the classifier below is configured to read.
feature_columns = [
    'Pregnancies', 'Glucose', 'BloodPressure', 'SkinThickness',
    'Insulin', 'BMI', 'DiabetesPedigreeFunction', 'Age',
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

# Append the assembled vector column to the imputed data.
final_data = assembler.transform(my_data1)

# Split data for train and test (weights 0.8 / 0.2). The fixed seed makes the
# random split repeatable, so reruns train and score on the same partition
# instead of producing a different model each time.
train_data, test_data = final_data.randomSplit([0.8, 0.2], seed=42)

# Train the model: logistic regression on the assembled 'features' vector,
# predicting the 'Outcome' label, capped at 10 optimizer iterations.
lr = LogisticRegression(featuresCol='features', labelCol='Outcome', maxIter=10)
model = lr.fit(train_data)

# Test model: score the held-out rows and preview the first five predictions
prediction = model.transform(test_data)
prediction.show(5)

# Create a pipeline that chains imputation, feature assembly and the
# classifier, so new rows receive the same preprocessing as the training rows.
pipeline = Pipeline(stages=[imputer, assembler, lr])

# Fit on my_data (zeros already converted to nulls, nothing imputed yet)
# rather than on my_data1: the pipeline's own Imputer stage should learn its
# fill values from the observed data, not from a frame that was already filled.
pipeline_model = pipeline.fit(my_data)

# Create new data for prediction
new_data = spark.createDataFrame(
    [(1, 166, 72, 15, 17, 33.6, 0.627, 50, 0)],
    ['Pregnancies', 'Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI', 'DiabetesPedigreeFunction', 'Age', 'Outcome']
)

# Apply the same zero -> null convention used on the training data; without
# it a placeholder zero in a new row would be scored as a real measurement
# because the Imputer stage only fills nulls.
new_data = replace_zero_with_null(new_data)

# Predict new data through the pipeline
predictions_new_data = pipeline_model.transform(new_data)
predictions_new_data.show()
