<a href="https://colab.research.google.com/github/strucker-eth/Arrow-funtions-Js-Solidity/blob/main/PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **1. Install spark**

In [None]:
# Install a headless Java 8 JDK (the JAVA_HOME set later in this notebook points at it).
# -qq and the redirect to /dev/null keep apt's output out of the notebook.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null


In [None]:
#Install Pyspark
!wget -q https://dlcdn.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz

# Unzip the file
!tar xf spark-3.3.2-bin-hadoop3.tgz


In [None]:
import os

# Tell Spark where the JDK and the unpacked Spark distribution live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.3.2-bin-hadoop3",
})


In [None]:
!pip install findspark


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import findspark

# Make the unpacked Spark distribution importable as `pyspark`.
spark_dir = '/content/spark-3.3.2-bin-hadoop3'
findspark.init(spark_dir)

from pyspark.sql import SparkSession

# Local-mode session using every available core; reuses a session if one exists.
session_builder = SparkSession.builder.master("local[*]")
spark = session_builder.getOrCreate()


In [None]:
'''
load models
'''
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer, IndexToString
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics


# **2. Load dataset**

In [None]:
'''
load models
'''
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer, IndexToString
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics


In [None]:
import pandas as pd

from pyspark.sql.functions import col

In [None]:
# The CSV lives on Google Drive, so Drive must be mounted before it is read;
# on a fresh runtime the read would otherwise fail. Calling drive.mount again
# when Drive is already mounted only prints a notice.
from google.colab import drive
drive.mount('/content/drive')

data = pd.read_csv("/content/drive/MyDrive/Colab Notebooks/dataset.csv")

In [None]:
# Mount Google Drive at /content/drive so files under MyDrive are readable.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Show the dtype pandas inferred for each column.
data.dtypes

age              int64
bloodpressure    int64
diabetes         int64
dtype: object

In [None]:
from sklearn.preprocessing import LabelEncoder

# Encode the `age` column as integer class ids and store them in a new
# `indexedLabel` column on `data`.
# NOTE(review): `data` is reloaded from CSV in a later cell, which discards
# this column — confirm this step is still needed.
label_encoder = LabelEncoder()
data['indexedLabel'] = label_encoder.fit_transform(data['age'])


In [None]:
# 70/30 hold-out split: sample the training rows with a fixed seed, then keep
# every row whose index label was not sampled as the test set.
train_data = data.sample(frac=0.7, random_state=42)
in_train = data.index.isin(train_data.index)
test_data = data[~in_train]


# **2. Decision Tree**
Run the code below and answer question 1.

reference:

model:
https://spark.apache.org/docs/latest/mllib-decision-tree.html

evaluation:
https://spark.apache.org/docs/latest/mllib-evaluation-metrics.html#multiclass-classification

## **Model**

In [None]:
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import train_test_split
import pandas as pd
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [None]:
# Reload the dataset from a path relative to the working directory; this rebinds `data`.
# NOTE(review): other cells read the CSV from Google Drive or from /content —
# confirm all of these paths refer to the same file.
data = pd.read_csv("dataset.csv")

In [None]:
# Hold out 30% of the rows for testing; random_state fixes the shuffle.
train_data, test_data = train_test_split(data, test_size=0.3, random_state=42)

In [None]:
# Standard-scale the numeric columns before they reach the classifier.
numeric_features = ["age", "bloodpressure", "diabetes"]
scaling_step = ("num", StandardScaler(), numeric_features)
ct = ColumnTransformer(transformers=[scaling_step])


In [None]:
# scikit-learn decision tree with a fixed random_state.
dt = DecisionTreeClassifier(random_state=42)

In [None]:
# Create the pipeline.
# ColumnTransformer and DecisionTreeClassifier are scikit-learn estimators, so
# they must be chained with scikit-learn's Pipeline. The name `Pipeline` in this
# notebook is pyspark.ml.Pipeline, which has no `steps` argument (hence the
# TypeError). Import under an alias so the Spark Pipeline stays usable.
from sklearn.pipeline import Pipeline as SkPipeline

dt_pipeline = SkPipeline(steps=[("preprocessor", ct), ("classifier", dt)])

TypeError: ignored

In [None]:
# List the column names of the training frame.
print(train_data.columns)


Index(['age', 'bloodpressure', 'diabetes'], dtype='object')


In [None]:
# Fit the preprocessing + decision-tree pipeline, using the `age` column as the target.
# NOTE(review): the preprocessor's column list also contains "age", so the target
# is among the model inputs — confirm this is intended.
dt_pipeline.fit(train_data, train_data["age"])

In [None]:
# Apply a fitted model to the test split and preview the first five rows.
# NOTE(review): no cell above this one assigns `model`, and the evaluation cells
# below read `dt_predictions`, not `predictions` — confirm which fitted model
# and variable name are intended here.
predictions = model.transform(test_data)
predictions.show(5)

+---+-------------+--------+---------------+--------------------+--------------------+----------+
|age|bloodpressure|diabetes|       features|       rawPrediction|         probability|prediction|
+---+-------------+--------+---------------+--------------------+--------------------+----------+
| 45|           63|       1|[45.0,63.0,1.0]|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|      45.0|
| 45|           80|       0|[45.0,80.0,0.0]|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|      45.0|
+---+-------------+--------+---------------+--------------------+--------------------+----------+



In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics

# Build (prediction, label) pairs from the decision-tree predictions. This cell
# belongs to the Decision Tree section, whose evaluators all read
# `dt_predictions`, so it must not use the random-forest frame. Rows are mapped
# to plain tuples, matching the random-forest confusion-matrix cell.
dt_preds_rdd = dt_predictions.select("prediction", "indexedLabel").rdd.map(tuple)

# Instantiate metrics object
dt_metrics = MulticlassMetrics(dt_preds_rdd)

# Confusion matrix. Stored under a section-specific name so it does not reuse
# the name of scikit-learn's `confusion_matrix` function imported later on.
dt_confusion_matrix = dt_metrics.confusionMatrix().toArray()
print("Confusion Matrix:\n", dt_confusion_matrix)



Confusion Matrix:
 [[20.  0.  1.]
 [ 0. 21.  0.]
 [ 2.  0. 19.]]


## **Model Evaluation**
Complete the code for the F1 and recall parts, then run it and answer question 1.

Accuracy

In [None]:
# Accuracy of the decision-tree predictions against the indexed labels.
acc_evaluator_dt = MulticlassClassificationEvaluator(
    labelCol="indexedLabel",
    predictionCol="prediction",
    metricName="accuracy",
)
acc_dt = acc_evaluator_dt.evaluate(dt_predictions)
print(f"accuracy:{acc_dt}")

accuracy:0.9523809523809523


Precision

In [None]:
# Weighted precision across all classes. "precisionByLabel" scores only a single
# class (metricLabel, 0.0 by default), which is not comparable with the weighted
# precision reported for the random forest.
pr_evaluator_dt = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="weightedPrecision")
precision_dt = pr_evaluator_dt.evaluate(dt_predictions)
print("precision:"+str(precision_dt))

precision:0.9090909090909091


F1_score

In [None]:
# F1 (metricName="f1") of the decision-tree predictions. The result is stored as
# `f1_dt` (matching acc_dt / precision_dt / recall_dt) so it does not shadow
# sklearn.metrics.f1_score, which this notebook imports under the name `f1_score`.
f1_evaluator_dt = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="f1")
f1_dt = f1_evaluator_dt.evaluate(dt_predictions)

print("F1 score = %g" % f1_dt)

F1 score = 0.952354


Recall

In [None]:
# Weighted recall of the decision-tree predictions.
rec_evaluator_dt = MulticlassClassificationEvaluator(
    labelCol="indexedLabel",
    predictionCol="prediction",
    metricName="weightedRecall",
)
recall_dt = rec_evaluator_dt.evaluate(dt_predictions)
print("Recall score = %g" % (recall_dt,))

Recall score = 0.952381


## **3. Random forest Model**

## **Model**

In [None]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import pandas as pd
from pyspark.mllib.evaluation import MulticlassMetrics


In [None]:
# Load the data from pandas dataframe
pdf = pd.read_csv("/content/dataset.csv")
df = spark.createDataFrame(pdf)

In [None]:
# Define the features and label column names
feature_cols = ['age', 'bloodpressure', 'diabetes']
label_col = 'label'


In [None]:
# Convert the features and label column into a vector using VectorAssembler
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

In [None]:
# Debugging code: print the schema of the training data
train_data.printSchema()


root
 |-- age: long (nullable = true)
 |-- bloodpressure: long (nullable = true)
 |-- diabetes: long (nullable = true)



In [None]:
# Chain the vector assembler and the classifier into one Spark ML pipeline.
# NOTE(review): `rf` is not assigned in any visible cell (RandomForestClassifier
# is imported but never instantiated) — confirm where it should be created.
pipeline = Pipeline(stages=[assembler, rf])


In [None]:
# Randomly split the Spark DataFrame with weights 0.7 / 0.3 and a fixed seed.
# This rebinds `train_data` / `test_data`, which earlier cells bound to pandas frames.
train_data, test_data = df.randomSplit([0.7, 0.3], seed=123)

In [None]:
# Fit the pipeline on the training data
model = pipeline.fit(train_data)

In [None]:
# Show the fitted classifier. The fitted pipeline is bound to `model` (no cell
# defines `rf_model`), and with stages [assembler, rf] index 2 is out of range,
# so take the last stage instead.
print(model.stages[-1])

RandomForestClassificationModel: uid=RandomForestClassifier_f5d0e2a8eab3, numTrees=9, numClasses=3, numFeatures=4


In [None]:
# Score the held-out split with the fitted pipeline. The cells below all read
# `rf_predictions`, which no other cell defines.
# NOTE(review): the evaluators below expect an "indexedLabel" column — confirm
# the pipeline/data provide it.
rf_predictions = model.transform(test_data)
rf_predictions.show(5)

+-----+--------------------+------------+--------------------+-------------+--------------------+----------+
|label|            features|indexedLabel|     indexedFeatures|rawPrediction|         probability|prediction|
+-----+--------------------+------------+--------------------+-------------+--------------------+----------+
|  0.0|(4,[0,1,2,3],[-0....|         0.0|(4,[0,1,2,3],[-0....|[0.0,0.0,9.0]|       [0.0,0.0,1.0]|       2.0|
|  0.0|(4,[0,1,2,3],[-0....|         0.0|(4,[0,1,2,3],[-0....|[8.0,0.0,1.0]|[0.88888888888888...|       0.0|
|  0.0|(4,[0,1,2,3],[-0....|         0.0|(4,[0,1,2,3],[-0....|[9.0,0.0,0.0]|       [1.0,0.0,0.0]|       0.0|
|  0.0|(4,[0,1,2,3],[-1....|         0.0|(4,[0,1,2,3],[-1....|[6.0,0.0,3.0]|[0.66666666666666...|       0.0|
|  0.0|(4,[0,1,2,3],[0.1...|         0.0|(4,[0,1,2,3],[0.1...|[9.0,0.0,0.0]|       [1.0,0.0,0.0]|       0.0|
+-----+--------------------+------------+--------------------+-------------+--------------------+----------+
only showing top 5 

In [None]:
# Convert predictions to RDD
rf_preds_rdd = rf_predictions.select(['prediction','indexedLabel']).rdd.map(tuple)

# Instantiate metrics object
rf_metrics = MulticlassMetrics(rf_preds_rdd)

# Confusion matrix
print("Confusion Matrix:")
print(rf_metrics.confusionMatrix().toArray())

Confusion Matrix:
[[20.  0.  1.]
 [ 0. 21.  0.]
 [ 2.  0. 19.]]


## **Model Evaluation**
Complete the code for the precision and recall parts, then run it and answer question 2.

Accuracy

In [None]:
# Accuracy of the random-forest predictions against the indexed labels.
acc_evaluator_rf = MulticlassClassificationEvaluator(
    labelCol="indexedLabel",
    predictionCol="prediction",
    metricName="accuracy",
)
acc_rf = acc_evaluator_rf.evaluate(rf_predictions)
print(f"accurancy:{acc_rf}")

accurancy:0.9523809523809523


F1_score

In [None]:
# F1 (metricName="f1") of the random-forest predictions.
f_evaluator_rf = MulticlassClassificationEvaluator(
    labelCol="indexedLabel",
    predictionCol="prediction",
    metricName="f1",
)
f1_score_rf = f_evaluator_rf.evaluate(rf_predictions)
print(f"f1 score:{f1_score_rf}")

f1 score:0.9523539421440725


Precision

In [None]:
# Weighted precision of the random-forest predictions.
precision_evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="weightedPrecision")
precision = precision_evaluator.evaluate(rf_predictions)
print("Precision = %g" % (precision,))

Precision = 0.95303


Recall

In [None]:
# Weighted recall of the random-forest predictions.
rec_evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel",
    predictionCol="prediction",
    metricName="weightedRecall",
)
recall = rec_evaluator.evaluate(rf_predictions)
print("Recall = %g" % (recall,))

Recall = 0.952381


## **4. Naive Bayes Model**

In [None]:
from pyspark.ml.classification import NaiveBayes

In [None]:
import pandas as pd

In [None]:
from sklearn.naive_bayes import MultinomialNB

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
from pyspark.ml.feature import VectorAssembler

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import GaussianNB
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
from pyspark.ml.feature import VectorAssembler


In [None]:
# Reload the dataset from Google Drive as a pandas frame; this rebinds `data`.
# NOTE(review): requires Drive to be mounted at /content/drive beforehand.
data = pd.read_csv("/content/drive/MyDrive/Colab Notebooks/dataset.csv")

In [None]:
# Add numeric copies of the three source columns under the names _c0.._c2.
for alias, source_col in (("_c0", "age"), ("_c1", "bloodpressure"), ("_c2", "diabetes")):
    data[alias] = pd.to_numeric(data[source_col])

In [None]:
# Feature matrix and target vector, selected by column name. Slicing by
# position ("all but the last column" / "the last column") would put the
# `diabetes` column itself into X and use its numeric copy as y, leaking the
# target into the features. The columns chosen here match the ones the
# naive Bayes model below is fitted on.
X = data[['age', 'bloodpressure']].values
y = data['diabetes'].values


In [None]:
# 70/30 hold-out split: sample the training rows with a fixed seed, then keep
# every row whose index label was not sampled as the test set.
train_data = data.sample(frac=0.7, random_state=42)
sampled_mask = data.index.isin(train_data.index)
test_data = data[~sampled_mask]


In [None]:
# scikit-learn multinomial naive Bayes estimator.
# NOTE(review): `nb` is rebound in the next cell and is not used by the visible
# fit/predict cells — likely leftover.
nb = MultinomialNB()

In [None]:
# Spark ML multinomial naive Bayes estimator with smoothing 1.0.
# NOTE(review): this rebinds `nb`, and `nb` is not used by the visible
# fit/predict cells, which use GaussianNB — likely leftover.
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

In [None]:
# List the column names of the pandas frame.
print(data.columns)


Index(['age', 'bloodpressure', 'diabetes', '_c0', '_c1'], dtype='object')


In [None]:
# scikit-learn Gaussian naive Bayes classifier; this is the estimator the cells below fit and evaluate.
# NOTE(review): this rebinds `model`, the name an earlier cell uses for the fitted Spark pipeline.
model = GaussianNB()


In [None]:
# Train on the age and bloodpressure columns to predict the diabetes column.
model.fit(train_data[['age', 'bloodpressure']], train_data['diabetes'])

In [None]:
# Make predictions on the test data
predictions = model.predict(test_data[['age', 'bloodpressure']])

In [None]:
# Confusion Matrix
conf_matrix = confusion_matrix(test_data['diabetes'], predictions)
print("Confusion Matrix:")
print(conf_matrix)


Confusion Matrix:
[[2]]


## **Model Evaluation**
Complete the code for the precision and recall parts, then run it and answer question 2.

Accuracy

In [None]:
# Share of test rows whose predicted diabetes value matches the true one.
y_true = test_data['diabetes']
accuracy = accuracy_score(y_true, predictions)
print(f"Model accuracy: {accuracy}")


Model accuracy: 1.0


Precision

In [None]:
# Precision on the test rows; zero_division=1 makes an undefined score count as 1.
y_true = test_data['diabetes']
precision = precision_score(y_true, predictions, zero_division=1)
print(f"Model precision: {precision}")

Model precision: 1.0


F1_score

In [None]:
# F1 score on the test rows; zero_division=1 makes an undefined score count as 1.
y_true = test_data['diabetes']
f1 = f1_score(y_true, predictions, zero_division=1)
print(f"Model F1 score: {f1}")

Model F1 score: 1.0


Recall

In [None]:
# Recall on the test rows; zero_division=1 makes an undefined score count as 1.
y_true = test_data['diabetes']
recall = recall_score(y_true, predictions, zero_division=1)
print(f"Model recall: {recall}")

Model recall: 1.0
