# Import Libraries

In [None]:
# Installing required packages
!pip install pyspark
!pip install findspark

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession,SQLContext
import os #important without this wont work
os.environ['PYSPARK_PYTHON'] = 'D:\\Python311\\python.exe'
os.environ['PYSPARK_DRIVER_PYTHON'] = 'D:\\Python311\\python.exe'
os.environ['JAVA_HOME'] = 'D:\\Java\\jdk1.8.0_202\\'

# Create Spark Context

In [None]:
spark = SparkSession \
    .builder \
    .appName("ML_Classifications_example1") \
    .getOrCreate()



In [None]:
sc = spark.sparkContext
sqlContext = SQLContext(sc)

In [None]:
# Upload files (Only in colabs)

In [None]:
# from google.colab import files


# uploaded = files.upload()


In [None]:
file='\\Users\\kimil\\OneDrive\\Desktop\\MUIC_work\\BigData\\BigData\\SparkML\\data\\drybeans.csv'
df = spark.read.csv(file,header='true',inferSchema=True)

In [None]:
df.columns

In [None]:
df.printSchema()

In [None]:
df.describe().toPandas().transpose()

In [None]:
df.select(["Area","Perimeter","Solidity","roundness","Compactness","Class"]).show(5)

In [None]:
df.groupBy('Class').count().orderBy('count').show()

In [None]:
# Convert Class column from string to numerical values
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="Class", outputCol="label") #addition to the regression
df = indexer.fit(df).transform(df)

In [None]:
df.groupBy('label').count().orderBy('count').show()

#  Classification

In [None]:
from pyspark.sql import DataFrameNaFunctions
from pyspark.ml import Pipeline
from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer

In [None]:
featureColumns =df.columns[:-2]

In [None]:
assembler = VectorAssembler(inputCols=featureColumns, outputCol="features")
df_assembled = assembler.transform(df)

In [None]:
df_assembled.show(10)

In [None]:
(trainingData, testData) = df_assembled.randomSplit([0.8,0.2], seed = 13234 )

In [None]:
trainingData.count(),testData.count()

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier,LogisticRegression
lr = LogisticRegression(featuresCol="features", labelCol="label")
model = lr.fit(trainingData)

In [None]:
#dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=5,minInstancesPerNode=20, impurity="gini")
#pipeline = Pipeline(stages=[dt])
#model = pipeline.fit(trainingData)

In [None]:
predictions = model.transform(testData)

In [None]:
predictions.show()

In [None]:
predictions.select("features","rawprediction","probability","prediction", "label").show(10)

In [None]:
prediction_save=predictions.select("features","rawprediction","probability","prediction", "label").show()

In [None]:
predictions.select("prediction", "label").write.save(path="predictions",
                                                     format="com.databricks.spark.csv",
                                                     header='true')

In [None]:

prediction_save=predictions.select("prediction", "label").rdd.map(tuple)
prediction_save

# Evaluations

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

In [None]:
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")

In [None]:
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g " % (accuracy))

In [None]:
# Evaluate model performance
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy =", accuracy)


In [None]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")
recall = evaluator.evaluate(predictions)
print("Recall =", recall)


In [None]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
f1_score = evaluator.evaluate(predictions)
print("F1 score = ", f1_score)


In [None]:
metrics = MulticlassMetrics(prediction_save)

In [None]:
metrics.confusionMatrix().toArray().transpose()


# Try with fewer features/ Try with Decision Tree

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession,SQLContext
import os #important without this wont work
os.environ['PYSPARK_PYTHON'] = 'D:\\Python311\\python.exe'
os.environ['PYSPARK_DRIVER_PYTHON'] = 'D:\\Python311\\python.exe'
os.environ['JAVA_HOME'] = 'D:\\Java\\jdk1.8.0_202\\'
spark = SparkSession \
    .builder \
    .appName("ML_Classifications_example1") \
    .getOrCreate()

sc = spark.sparkContext
sqlContext = SQLContext(sc)

In [None]:
sc.stop()

# Exercise Diabates data

In [None]:
file='\\Users\\kimil\\OneDrive\\Desktop\\MUIC_work\\BigData\\BigData\\SparkML\\data\\diabetes.csv'
df = spark.read.csv(file,header='true',inferSchema=True)

In [None]:
df.show(5)
df.printSchema()

In [None]:
df.describe().toPandas().transpose()

In [None]:
df.select(["Pregnancies","Glucose","BloodPressure","SkinThickness","Insulin","BMI","DiabetesPedigreeFunction","Age","Outcome"]).show(5)

In [None]:
from pyspark.sql import DataFrameNaFunctions
from pyspark.ml import Pipeline
from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer

In [None]:
featureColumns =df.columns[:-2]
assembler = VectorAssembler(inputCols=featureColumns, outputCol="features")
df_assembled = assembler.transform(df)
df_assembled = df_assembled.withColumnRenamed("Outcome","label")
df_assembled.show(10)

In [None]:
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
df_label_0 = df_assembled.filter(df_assembled['label'] == 0).select('Age').toPandas()
df_label_1 = df_assembled.filter(df_assembled['label'] == 1).select('Age').toPandas()
df_label_0_insu = df_assembled.filter(df_assembled['label'] == 0).select('Insulin').toPandas()
df_label_1_insu = df_assembled.filter(df_assembled['label'] == 1).select('Insulin').toPandas()
df_label_0_glu = df_assembled.filter(df_assembled['label'] == 0).select('Glucose').toPandas()
df_label_1_glu = df_assembled.filter(df_assembled['label'] == 1).select('Glucose').toPandas()

fig, axes = plt.subplots(nrows=3, ncols=1, figsize=(12, 18))

plt.figure(figsize=(12, 6))
sns.histplot(df_label_0, x='Age', bins=10, kde=False, color='blue', label='People without diabetes', ax=axes[0])
sns.histplot(df_label_1, x='Age', bins=10, kde=False, color='orange', label='People with diabetes', ax=axes[0])
axes[0].set_xlabel('Age')
axes[0].set_ylabel('Frequency')
axes[0].set_title('Distribution of Age by diabetes(Label/Outcome)')
axes[0].legend()

sns.histplot(df_label_0_insu, x='Insulin', bins=10, kde=False, color='blue', label='People without diabetes', ax=axes[1])
sns.histplot(df_label_1_insu, x='Insulin', bins=10, kde=False, color='orange', label='People with diabetes', ax=axes[1])
axes[1].set_xlabel('Insulin')
axes[1].set_ylabel('Frequency')
axes[1].set_title('Distribution of Insulin by diabetes(Label/Outcome)')
axes[1].legend()

sns.histplot(df_label_0_glu, x='Glucose', bins=10, kde=False, color='blue', label='People without diabetes', ax=axes[2])
sns.histplot(df_label_1_glu, x='Glucose', bins=10, kde=False, color='orange', label='People with diabetes', ax=axes[2])
axes[2].set_xlabel('Glucose')
axes[2].set_ylabel('Frequency')
axes[2].set_title('Distribution of Glucose by diabetes(Label/Outcome)')
axes[2].legend()

plt.tight_layout()
plt.show()

In [None]:
(trainingData, testData) = df_assembled.randomSplit([0.8,0.2], seed = 13234 )
trainingData.count(),testData.count()

In [None]:
trainingData

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

# Assuming 'trainingData' and 'testData' are Spark DataFrames
# Convert to Pandas DataFrames for plotting (assuming they are small enough)
training_pd = trainingData.select('*').toPandas()
test_pd = testData.select('*').toPandas()

# Example: Plotting feature distribution
plt.figure(figsize=(12, 6))
sns.histplot(training_pd['label'], bins=10, kde=False, color='blue', label='Training Data')
sns.histplot(test_pd['label'], bins=10, kde=False, color='orange', label='Test Data')
plt.xlabel('label')
plt.ylabel('Frequency')
plt.title('Distribution of label in Training and Test Data')
plt.legend()
plt.show()


In [None]:
from pyspark.ml.classification import DecisionTreeClassifier,LogisticRegression
lr = LogisticRegression(featuresCol="features", labelCol="label")
model = lr.fit(trainingData)

In [None]:
predictions = model.transform(testData)
predictions.printSchema()

In [None]:
predictions.select("features","rawprediction","probability","prediction", "label").show(10)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g " % (accuracy))

In [None]:
# Evaluate model performance
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy =", accuracy)

In [None]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
f1_score = evaluator.evaluate(predictions)
print("F1 score = ", f1_score)

In [None]:
predictions.select("prediction", "label").write.save(path="diabete_predictions",
                                                     format="com.databricks.spark.csv",
                                                     header='true')

In [None]:
prediction_save=predictions.select("prediction", "label").rdd.map(tuple)
metrics = MulticlassMetrics(prediction_save)

In [None]:
from sklearn.metrics import classification_report

# Convert predictions DataFrame to Pandas DataFrame (if small enough) or use Spark's RDD operations
predictions_pd = predictions.select("prediction", "label").toPandas()

# Calculate custom metrics using scikit-learn
report = classification_report(predictions_pd['label'], predictions_pd['prediction'])
print(report)

In [None]:
metrics.confusionMatrix().toArray().transpose() 
#I somehow cannot do confusionMatrix, it works the first time however after that it unfortunately doesnt work anymore 
# (Colab works fine only local)

# HW Excercise ML PIPELINE

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession,SQLContext
import os #important without this wont work
os.environ['PYSPARK_PYTHON'] = 'D:\\Python311\\python.exe'
os.environ['PYSPARK_DRIVER_PYTHON'] = 'D:\\Python311\\python.exe'
os.environ['JAVA_HOME'] = 'D:\\Java\\jdk1.8.0_202\\'

spark = SparkSession \
    .builder \
    .appName("ML_Classifications_Pipeline") \
    .getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)

In [50]:
rel_file='data/drybeans.csv'
file = os.path.join(os.getcwd(), rel_file)
df = spark.read.csv(file,header='true',inferSchema=True)

In [15]:
from pyspark.sql import DataFrameNaFunctions
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, StandardScaler
from pyspark.ml.classification import LogisticRegression,DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline

In [None]:
featureColumns =df.columns[:-2]
featureColumns

In [8]:
indexer = StringIndexer(inputCol="Class", outputCol="label")
assembler = VectorAssembler(inputCols=featureColumns, outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
lr = LogisticRegression(featuresCol="features", labelCol="label")

In [17]:
assembled = assembler.transform(df)

In [16]:
pipeline = Pipeline(stages=[indexer,assembler, scaler, lr])

In [18]:
(traindata, testData) = df.randomSplit([0.8,0.2], seed = 140140 )

In [31]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

paramGrid = ParamGridBuilder() \
    .addGrid(lr.fitIntercept, [False, True]) \
    .addGrid(lr.maxIter, [5, 10,20]) \
    .build()

In [42]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=4)  # use 3+ folds in practice

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(df)
cvModel.avgMetrics

[np.float64(0.10769119711936476),
 np.float64(0.11004629236083396),
 np.float64(0.8801365645336956),
 np.float64(0.861490263888053),
 np.float64(0.9126263520563774),
 np.float64(0.9138081614531232)]

In [43]:
predictionsCV = cvModel.transform(testData)
predictionsCV.show(5)

+-----+---------+---------------+---------------+------------+------------+----------+-------------+-----------+-----------+-----------+-----------+------------+------------+------------+------------+--------+-----+--------------------+--------------------+--------------------+--------------------+----------+
| Area|Perimeter|MajorAxisLength|MinorAxisLength|AspectRation|Eccentricity|ConvexArea|EquivDiameter|     Extent|   Solidity|  roundness|Compactness|ShapeFactor1|ShapeFactor2|ShapeFactor3|ShapeFactor4|   Class|label|            features|     scaled_features|       rawPrediction|         probability|prediction|
+-----+---------+---------------+---------------+------------+------------+----------+-------------+-----------+-----------+-----------+-----------+------------+------------+------------+------------+--------+-----+--------------------+--------------------+--------------------+--------------------+----------+
|20464|  528.408|     191.249312|    136.3684624| 1.402445321| 0.70

In [44]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
def evaluate(result):
    predictionAndLabels = result.select("prediction", "label")
    metrics = ["f1","precisionByLabel","recallByLabel","weightedPrecision","weightedRecall","accuracy"]
    for m in metrics:
        evaluator = MulticlassClassificationEvaluator(metricName=m)
        print(str(m) + ": " + str(evaluator.evaluate(predictionAndLabels)))
evaluate(predictionsCV)

f1: 0.9214791960308689
precisionByLabel: 0.9403409090909091
recallByLabel: 0.9118457300275482
weightedPrecision: 0.9217719212369953
weightedRecall: 0.9213813372520205
accuracy: 0.9213813372520205


In [19]:
model = pipeline.fit(traindata)

In [29]:
predictions = model.transform(testData)
predictions.show(5)

+-----+---------+---------------+---------------+------------+------------+----------+-------------+-----------+-----------+-----------+-----------+------------+------------+------------+------------+--------+-----+--------------------+--------------------+--------------------+--------------------+----------+
| Area|Perimeter|MajorAxisLength|MinorAxisLength|AspectRation|Eccentricity|ConvexArea|EquivDiameter|     Extent|   Solidity|  roundness|Compactness|ShapeFactor1|ShapeFactor2|ShapeFactor3|ShapeFactor4|   Class|label|            features|     scaled_features|       rawPrediction|         probability|prediction|
+-----+---------+---------------+---------------+------------+------------+----------+-------------+-----------+-----------+-----------+-----------+------------+------------+------------+------------+--------+-----+--------------------+--------------------+--------------------+--------------------+----------+
|20464|  528.408|     191.249312|    136.3684624| 1.402445321| 0.70

In [30]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
def evaluate(result):
    predictionAndLabels = result.select("prediction", "label")
    metrics = ["f1","precisionByLabel","recallByLabel","weightedPrecision","weightedRecall","accuracy"]
    for m in metrics:
        evaluator = MulticlassClassificationEvaluator(metricName=m)
        print(str(m) + ": " + str(evaluator.evaluate(predictionAndLabels)))

evaluate(predictions)
# evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
# accuracy = evaluator.evaluate(predictions)
# print("Accuracy = ", accuracy)

f1: 0.9265614804147871
precisionByLabel: 0.9367977528089888
recallByLabel: 0.918732782369146
weightedPrecision: 0.9267282615621796
weightedRecall: 0.9265246142542247
accuracy: 0.9265246142542248


In [51]:
sc.stop()