In [4]:
!pip install numpy
!pip install panda


Collecting panda
  Downloading panda-0.3.1.tar.gz (5.8 kB)
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Building wheels for collected packages: panda
  Building wheel for panda (setup.py): started
  Building wheel for panda (setup.py): finished with status 'done'
  Created wheel for panda: filename=panda-0.3.1-py3-none-any.whl size=7296 sha256=a527e01aa05e319bdd1a0fcfae3f07ead2458574dfeb465f33a8dec9f866e7b4
  Stored in directory: c:\users\rising phoenix\appdata\local\pip\cache\wheels\df\5c\39\36f8dae25a1e88d6ec4411dec4a143781e64fdff6897758eec
Successfully built panda
Installing collected packages: panda
Successfully installed panda-0.3.1


  DEPRECATION: Building 'panda' using the legacy setup.py bdist_wheel mechanism, which will be removed in a future version. pip 25.3 will enforce this behaviour change. A possible replacement is to use the standardized build interface by setting the `--use-pep517` option, (possibly combined with `--no-build-isolation`), or adding a `pyproject.toml` file to the source tree of 'panda'. Discussion can be found at https://github.com/pypa/pip/issues/6334


In [6]:
#import 
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count, isnull, floor, skewness, kurtosis
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier, LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.stat import Correlation

In [10]:
# Start a Spark session for this notebook, or attach to one that already exists.
session_builder = SparkSession.builder.appName("Fraudscript")
spark = session_builder.getOrCreate()

In [8]:
# 1. DATA ANALYSIS (15 Marks)

In [None]:
# Read the CSV: the first row supplies the column names and column types are inferred.
csv_reader = spark.read.option("header", True).option("inferSchema", True)
df = csv_reader.csv("data/diabetes.csv")
df.show()

In [None]:
# Structure of the data: column types, summary statistics and overall dimensions.
df.printSchema()
summary_stats = df.describe()
summary_stats.show()

row_total = df.count()
column_total = len(df.columns)
print("Rows:", row_total)
print("Columns:", column_total)


In [None]:
# Count the null entries in every column (one output column per input column).
null_counts = []
for column_name in df.columns:
    # when(...) yields a non-null value only on rows where the column is null,
    # and count() tallies non-null values, so the result is the number of nulls.
    flagged = when(isnull(column_name), column_name)
    null_counts.append(count(flagged).alias(column_name))
df.select(null_counts).show()

In [None]:
# Value counts: number of rows for each value of the label column.
# "Outcome" is the label column used by every model cell in this notebook;
# grouping by the template placeholder "target_column" fails because no such column exists.
df.groupBy("Outcome").count().show()


In [None]:
# Skewness and kurtosis
from pyspark.sql.functions import log1p

numeric_columns = [
    'Pregnancies',
    'Glucose',
    'BloodPressure',
    'SkinThickness',
    'Insulin',
    'BMI',
    'DiabetesPedigreeFunction',
    'Age'
]

# One row of skewness values, one column per feature.
df.select([skewness(c).alias(c + "_skew") for c in numeric_columns]).show()
# One row of kurtosis values. Aliased "_kurt" so the output is not mislabelled as skewness.
df.select([kurtosis(c).alias(c + "_kurt") for c in numeric_columns]).show()

# Rule of thumb from the original notes:
#   - skewness outside the range [-1, 1] -> consider a log transform
#   - kurtosis: < 0 low, about 0 normal, > 3 high -> consider a log transform
# log1p(x) = log(1 + x), so zero values stay defined.
# The transform is applied once; "Insulin" itself is left untouched.
df = df.withColumn("log_Insulin", log1p("Insulin"))

In [None]:
# Correlation matrix

import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation

# Numeric columns
numeric_cols = [
    'Pregnancies',
    'Glucose',
    'BloodPressure',
    'SkinThickness',
    'Insulin',
    'BMI',
    'DiabetesPedigreeFunction',
    'Age'
]

# Assemble numeric columns into a features vector (Correlation.corr reads a vector column)
vec_assembler = VectorAssembler(inputCols=numeric_cols, outputCol="features")
df_vector = vec_assembler.transform(df).select("features")

# Calculate correlation matrix; corr() returns a one-row DataFrame holding the matrix
correlation_matrix = Correlation.corr(df_vector, "features").head()[0]
corr_array = correlation_matrix.toArray()  # Convert to numpy array

# Convert to Pandas DataFrame so rows and columns carry the feature names
df_corr = pd.DataFrame(data=corr_array, columns=numeric_cols, index=numeric_cols)

# Show the result: previously the matrix was computed but never displayed.
fig, ax = plt.subplots(figsize=(10, 8))
sns.heatmap(df_corr, annot=True, fmt=".2f", cmap="coolwarm", vmin=-1, vmax=1, ax=ax)
ax.set_title("Correlation between numeric features")
plt.show()


In [None]:
# Encode the target as a numeric "label" column.
# Only needed when the target holds strings such as yes/no instead of 0/1 values.
# The target column in this data is "Outcome"; the placeholder "target_column" does not exist.
# The guard keeps the cell re-runnable: indexing again would fail because "label" already exists.
if "label" not in df.columns:
    indexer = StringIndexer(inputCol="Outcome", outputCol="label")
    df = indexer.fit(df).transform(df)


In [None]:
# Model Building

In [None]:

# Summary statistics of df, printed without truncating long values.
df.describe().show(truncate=False)

In [None]:
# Column names currently on df, including any columns added by earlier cells.
df.columns

In [None]:
# Pack the eight predictor columns into the single "features" vector column
# that the classifiers below are configured to read.
feature_cols = [
    'Pregnancies',
    'Glucose',
    'BloodPressure',
    'SkinThickness',
    'Insulin',
    'BMI',
    'DiabetesPedigreeFunction',
    'Age',
]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Preview the assembled rows without truncating the vector column.
assembled_preview = assembler.transform(df)
assembled_preview.show(truncate=False)

In [None]:
# Keep only what the models consume: the assembled feature vector and the label.
assembled_df = assembler.transform(df)
model_input_df = assembled_df.select("features", "Outcome")
model_input_df.show(truncate=False)

In [None]:
# 80/20 random split with a fixed seed.
split_frames = model_input_df.randomSplit([0.8, 0.2], seed=100)
train_data, test_data = split_frames

In [None]:
# Number of rows in the training split.
train_data.count()

In [None]:
# Number of rows in the test split.
test_data.count()

In [None]:
# Logistic Regression

# Fit on the training split, then score both splits with the fitted model.
lr = LogisticRegression(featuresCol="features", labelCol="Outcome")
lr_model = lr.fit(train_data)
lr_train_predictions, lr_test_predictions = (
    lr_model.transform(split) for split in (train_data, test_data)
)

In [None]:
# Display the fitted logistic regression model object.
lr_model

In [None]:
# Preview the logistic regression predictions on the training split.
lr_train_predictions.show()

In [None]:
# Decision Tree

# A shallow tree (depth capped at 3), fitted on the training split and applied to both splits.
dt = DecisionTreeClassifier(maxDepth=3, featuresCol="features", labelCol="Outcome")
dt_model = dt.fit(train_data)
dt_train_predictions, dt_test_predictions = map(dt_model.transform, (train_data, test_data))

In [None]:
# RandomForest

# 20 trees, each at most 5 levels deep.
# A random forest is a stochastic estimator, so the seed is fixed explicitly
# (same value as the train/test split) to make the fitted model repeatable from run to run.
rf = RandomForestClassifier(labelCol="Outcome", featuresCol="features", numTrees=20, maxDepth=5, seed=100)
rf_model = rf.fit(train_data)

# Score both splits with the fitted forest.
rf_train_predictions = rf_model.transform(train_data)
rf_test_predictions = rf_model.transform(test_data)

In [None]:
# Model Evaluation

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

In [None]:
# Evaluation
# Metric evaluators shared by the per-model metric cells; all compare "Outcome" with "prediction".
evaluator_acc = MulticlassClassificationEvaluator(labelCol="Outcome", predictionCol="prediction", metricName="accuracy")
# Note: the "f1" metric of this evaluator is the label-weighted F1, not the F1 of a single class.
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="Outcome", predictionCol="prediction", metricName="f1")
# precisionByLabel / recallByLabel score ONE class, selected by metricLabel (default 0.0).
# Point them at class 1 so "Precision" and "Recall" describe the positive class rather than class 0.
# NOTE(review): assumes Outcome == 1 is the class of interest - confirm against the data dictionary.
evaluator_prec = MulticlassClassificationEvaluator(labelCol="Outcome", predictionCol="prediction", metricName="precisionByLabel", metricLabel=1.0)
evaluator_rec = MulticlassClassificationEvaluator(labelCol="Outcome", predictionCol="prediction", metricName="recallByLabel", metricLabel=1.0)
# Area under the ROC curve, computed from the models' rawPrediction column.
evaluator_auc = BinaryClassificationEvaluator(labelCol="Outcome", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

In [None]:
# Decision Tree metrics: training split first, then test split.
# Each group prints four percentages (rounded to 2 decimals) and then a
# label-vs-prediction count table.
dt_train_report = (
    ("Decision Tree Train  - Accuracy:", evaluator_acc),
    ("Decision Tree Train- F1 Score:", evaluator_f1),
    ("Decision Tree Train- Precision:", evaluator_prec),
    ("Decision Tree Train- Recall:", evaluator_rec),
)
for heading, metric_evaluator in dt_train_report:
    print(heading, round(metric_evaluator.evaluate(dt_train_predictions)*100,2))
dt_train_predictions.groupBy("outcome", "prediction").count().show()

dt_test_report = (
    ("Decision Tree Test- Accuracy:", evaluator_acc),
    ("Decision Tree Test - F1 Score:", evaluator_f1),
    ("Decision Tree Test- Precision:", evaluator_prec),
    ("Decision Tree Test- Recall:", evaluator_rec),
)
for heading, metric_evaluator in dt_test_report:
    print(heading, round(metric_evaluator.evaluate(dt_test_predictions)*100,2))
dt_test_predictions.groupBy("outcome", "prediction").count().show()

In [None]:
# Random Forest metrics - same layout as the Decision Tree cell; make sure all values are reported as needed.

rf_train_accuracy = evaluator_acc.evaluate(rf_train_predictions)
print("RandomForest  Train  - Accuracy:", round(rf_train_accuracy*100,2))
# TODO: still to add for the training split - F1 Score (evaluator_f1),
#       Precision (evaluator_prec) and Recall (evaluator_rec).
# TODO: fill in the same metrics for the test split (rf_test_predictions).
rf_train_predictions.groupBy("outcome", "prediction").count().show()


In [None]:
# Logistic Regression metrics.
# The predictions are held in lr_train_predictions / lr_test_predictions (created in the
# Logistic Regression cell); the name `lr_pred` is never defined and raised NameError.
print("Logistic Regression Train - Accuracy:", evaluator_acc.evaluate(lr_train_predictions))
print("Logistic Regression Test - Accuracy:", evaluator_acc.evaluate(lr_test_predictions))

In [None]:
# HyperParameter Tuning

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [None]:
# Decision Tree classifier: cross-validated search over maxDepth, scored by area under the ROC curve.
dt = DecisionTreeClassifier(labelCol="Outcome", featuresCol="features")

# Parameter grid for tuning: candidate depths 4..9, passed as a concrete list
dt_grid = ParamGridBuilder().addGrid(dt.maxDepth, list(range(4, 10))).build()

# Evaluator used to rank the candidates
evaluator = BinaryClassificationEvaluator(labelCol="Outcome", metricName="areaUnderROC")

# Set up CrossValidator (5 folds, fixed seed for repeatable fold assignment)
dt_cv = CrossValidator(estimator=dt, estimatorParamMaps=dt_grid, evaluator=evaluator, numFolds=5, seed=50)

# Fit model with cross-validation
dt_cv_model = dt_cv.fit(train_data)

# Get best model and print best params
best_dt_model = dt_cv_model.bestModel
print("Best Decision Tree Parameters:")
print("maxDepth:", best_dt_model.getOrDefault("maxDepth"))

# Predict and evaluate on test data.
# Stored under its own name: reusing `dt_test_predictions` would overwrite the baseline
# tree's predictions, and re-running the Decision Tree metrics cell would then silently
# report the tuned model instead.
dt_cv_test_predictions = best_dt_model.transform(test_data)
auc = evaluator.evaluate(dt_cv_test_predictions)
print(f"Test AUC: {auc:.4f}")



In [None]:
# Random forest: cross-validated grid search over numTrees and maxDepth.
# Seeds are fixed on both the estimator and the cross-validator so the search is
# repeatable from run to run (same seed value as the Decision Tree search).
rf = RandomForestClassifier(labelCol="Outcome", featuresCol="features", seed=50)

# 2 x 5 = 10 candidate settings
rf_grid = (ParamGridBuilder()
           .addGrid(rf.numTrees, [50, 100])
           .addGrid(rf.maxDepth, [2, 3, 4, 5, 10])
           .build())

# Defined here so the cell does not depend on another tuning cell having run first.
evaluator = BinaryClassificationEvaluator(labelCol="Outcome", metricName="areaUnderROC")

rf_cv = CrossValidator(estimator=rf,
                       estimatorParamMaps=rf_grid,
                       evaluator=evaluator,
                       numFolds=3,
                       seed=50)

rf_cv_model = rf_cv.fit(train_data)

# Both names are kept because the original cell defined both; they refer to the same model.
best_rf_model = rf_cv_model.bestModel
rf_best_model = best_rf_model

print("  Best Random Forest Parameters:")
print("  numTrees:", best_rf_model.getOrDefault("numTrees"))
print("  maxDepth:", best_rf_model.getOrDefault("maxDepth"))

# Score the held-out split and report it, matching the other tuning cells
# (the predictions were previously computed but never evaluated).
rf_cv_predictions = rf_cv_model.transform(test_data)
rf_cv_auc = evaluator.evaluate(rf_cv_predictions)
print(f"Test AUC: {rf_cv_auc:.4f}")


In [None]:
# Logistic regression: cross-validated grid search over regParam and elasticNetParam.
lr = LogisticRegression(labelCol="Outcome", featuresCol="features")

# 2 x 2 = 4 candidate settings
paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.01, 0.1])\
    .addGrid(lr.elasticNetParam, [0.0, 0.5])\
    .build()

evaluator = BinaryClassificationEvaluator(labelCol="Outcome", metricName="areaUnderROC")

# Fixed seed so the fold assignment is repeatable (same value as the Decision Tree search).
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=3, seed=50)

cv_model = cv.fit(train_data)
best_model = cv_model.bestModel

# Report which setting won, as the other tuning cells do.
print("Best Logistic Regression Parameters:")
print("regParam:", best_model.getOrDefault("regParam"))
print("elasticNetParam:", best_model.getOrDefault("elasticNetParam"))

# Score the held-out split with the best model.
predictions = best_model.transform(test_data)

print("Test AUC:", evaluator.evaluate(predictions))

In [None]:
# MapReduce - TODO: this section has not been written yet.

