In [1]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.metrics import classification_report
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler
from sklearn.preprocessing import StandardScaler
from sklearn.tree import DecisionTreeClassifier
from sklearn.preprocessing import LabelEncoder
from sklearn.ensemble import BaggingClassifier
from sklearn.ensemble import AdaBoostClassifier
from sklearn.ensemble import RandomForestClassifier

This project analyzes traffic patterns using PySpark, leveraging 2,976 records with features like time, date, day of the week, vehicle counts (cars, bikes, buses, trucks), and traffic situations. The goal is to classify traffic conditions based on these inputs.

Use of Classification:
Predicting Traffic Congestion based on vehicle count and time.
Identifying Peak Hours using traffic trends.
Optimizing Transportation Planning by analyzing vehicle distribution.
Improving Public Transit Efficiency with bus and bike data.
This model aids city planners in managing traffic flow and reducing congestion.

In [2]:
Traffic = pd.read_csv("/Traffic.csv")

FileNotFoundError: [Errno 2] No such file or directory: '/Traffic.csv'

In [None]:
Traffic.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2976 entries, 0 to 2975
Data columns (total 9 columns):
 #   Column             Non-Null Count  Dtype 
---  ------             --------------  ----- 
 0   Time               2976 non-null   object
 1   Date               2976 non-null   int64 
 2   Day of the week    2976 non-null   object
 3   CarCount           2976 non-null   int64 
 4   BikeCount          2976 non-null   int64 
 5   BusCount           2976 non-null   int64 
 6   TruckCount         2976 non-null   int64 
 7   Total              2976 non-null   int64 
 8   Traffic Situation  2976 non-null   object
dtypes: int64(6), object(3)
memory usage: 209.4+ KB


In [None]:
Traffic.head(20)

Unnamed: 0,Time,Date,Day of the week,CarCount,BikeCount,BusCount,TruckCount,Total,Traffic Situation
0,12:00:00 AM,10,Tuesday,31,0,4,4,39,low
1,12:15:00 AM,10,Tuesday,49,0,3,3,55,low
2,12:30:00 AM,10,Tuesday,46,0,3,6,55,low
3,12:45:00 AM,10,Tuesday,51,0,2,5,58,low
4,1:00:00 AM,10,Tuesday,57,6,15,16,94,normal
5,1:15:00 AM,10,Tuesday,44,0,5,4,53,low
6,1:30:00 AM,10,Tuesday,37,0,1,4,42,low
7,1:45:00 AM,10,Tuesday,42,4,4,5,55,low
8,2:00:00 AM,10,Tuesday,51,0,9,7,67,low
9,2:15:00 AM,10,Tuesday,34,0,4,7,45,low


In [None]:
# Preprocessing
# Encode the target variable as it is multi class
le = LabelEncoder()
Traffic['Traffic Situation'] = le.fit_transform(Traffic['Traffic Situation'])

In [None]:
Traffic.head(30)

Unnamed: 0,Time,Date,Day of the week,CarCount,BikeCount,BusCount,TruckCount,Total,Traffic Situation
0,12:00:00 AM,10,Tuesday,31,0,4,4,39,2
1,12:15:00 AM,10,Tuesday,49,0,3,3,55,2
2,12:30:00 AM,10,Tuesday,46,0,3,6,55,2
3,12:45:00 AM,10,Tuesday,51,0,2,5,58,2
4,1:00:00 AM,10,Tuesday,57,6,15,16,94,3
5,1:15:00 AM,10,Tuesday,44,0,5,4,53,2
6,1:30:00 AM,10,Tuesday,37,0,1,4,42,2
7,1:45:00 AM,10,Tuesday,42,4,4,5,55,2
8,2:00:00 AM,10,Tuesday,51,0,9,7,67,2
9,2:15:00 AM,10,Tuesday,34,0,4,7,45,2


In [None]:
Traffic.drop(columns=['Time','Date','Day of the week','Total'], inplace=True)

In [None]:
Traffic.head(10)

Unnamed: 0,CarCount,BikeCount,BusCount,TruckCount,Traffic Situation
0,31,0,4,4,2
1,49,0,3,3,2
2,46,0,3,6,2
3,51,0,2,5,2
4,57,6,15,16,3
5,44,0,5,4,2
6,37,0,1,4,2
7,42,4,4,5,2
8,51,0,9,7,2
9,34,0,4,7,2


In [None]:
#Clean the data and determine predictors and outcome

predictors = ["CarCount", "BikeCount", "BusCount", "TruckCount"]

x = Traffic[predictors]
y = Traffic["Traffic Situation"]

In [None]:
#3. Split the data
train_x, valid_x, train_y, valid_y = train_test_split(x, y, test_size = 0.2, random_state = 0)

In [None]:
train_x.info()
train_y.info()
valid_x.info()
valid_y.info()

<class 'pandas.core.frame.DataFrame'>
Index: 2380 entries, 879 to 2732
Data columns (total 4 columns):
 #   Column      Non-Null Count  Dtype
---  ------      --------------  -----
 0   CarCount    2380 non-null   int64
 1   BikeCount   2380 non-null   int64
 2   BusCount    2380 non-null   int64
 3   TruckCount  2380 non-null   int64
dtypes: int64(4)
memory usage: 93.0 KB
<class 'pandas.core.series.Series'>
Index: 2380 entries, 879 to 2732
Series name: Traffic Situation
Non-Null Count  Dtype
--------------  -----
2380 non-null   int64
dtypes: int64(1)
memory usage: 37.2 KB
<class 'pandas.core.frame.DataFrame'>
Index: 596 entries, 2783 to 1790
Data columns (total 4 columns):
 #   Column      Non-Null Count  Dtype
---  ------      --------------  -----
 0   CarCount    596 non-null    int64
 1   BikeCount   596 non-null    int64
 2   BusCount    596 non-null    int64
 3   TruckCount  596 non-null    int64
dtypes: int64(4)
memory usage: 23.3 KB
<class 'pandas.core.series.Series'>
Index: 

In [None]:
pip install dmba



In [None]:
#baggingoriginal
BG = BaggingClassifier(DecisionTreeClassifier(random_state = 1),
                       n_estimators = 100, random_state = 1)
BG.fit(train_x, train_y)

predBG1 = BG.predict(valid_x)
print(classification_report(valid_y, predBG1))

              precision    recall  f1-score   support

           0       0.92      0.96      0.94       138
           1       0.88      0.85      0.86        60
           2       0.95      0.95      0.95        64
           3       0.98      0.97      0.97       334

    accuracy                           0.95       596
   macro avg       0.93      0.93      0.93       596
weighted avg       0.95      0.95      0.95       596



In [None]:
overs = SMOTE()
unders = RandomUnderSampler()
x_train_OS, y_train_OS = overs.fit_resample(train_x, train_y)
x_train_US, y_train_US = unders.fit_resample(train_x, train_y)


In [None]:
#baggingoversampled
BG2 = BaggingClassifier(DecisionTreeClassifier(random_state = 1),
                       n_estimators = 100, random_state = 1)
BG2.fit(x_train_OS, y_train_OS)

predBG2 = BG2.predict(valid_x)
print(classification_report(valid_y, predBG2))

              precision    recall  f1-score   support

           0       0.94      0.97      0.96       138
           1       0.81      0.93      0.87        60
           2       0.98      0.98      0.98        64
           3       0.99      0.95      0.97       334

    accuracy                           0.96       596
   macro avg       0.93      0.96      0.95       596
weighted avg       0.96      0.96      0.96       596



In [None]:
#baggingundersample
BG3 = BaggingClassifier(DecisionTreeClassifier(random_state = 1),
                       n_estimators = 100, random_state = 1)
BG3.fit(x_train_US, y_train_US)

predBG3 = BG3.predict(valid_x)
print(classification_report(valid_y, predBG3))

              precision    recall  f1-score   support

           0       0.94      0.95      0.94       138
           1       0.64      0.90      0.75        60
           2       0.88      1.00      0.93        64
           3       0.99      0.89      0.94       334

    accuracy                           0.91       596
   macro avg       0.86      0.93      0.89       596
weighted avg       0.93      0.91      0.92       596



In [None]:
#boostingoriginal
BT = AdaBoostClassifier(DecisionTreeClassifier(random_state=1),
                        n_estimators = 100, random_state=1)
BT.fit(train_x, train_y)

predBT1 = BT.predict(valid_x)
print(classification_report(valid_y,predBT1))


              precision    recall  f1-score   support

           0       0.93      0.93      0.93       138
           1       0.79      0.83      0.81        60
           2       0.91      0.97      0.94        64
           3       0.97      0.95      0.96       334

    accuracy                           0.94       596
   macro avg       0.90      0.92      0.91       596
weighted avg       0.94      0.94      0.94       596



In [None]:
#Boostingoversample
BT2 = AdaBoostClassifier(DecisionTreeClassifier(random_state=1),
                        n_estimators = 100, random_state=1)
BT2.fit(x_train_OS, y_train_OS)

predBT2 = BT2.predict(valid_x)
print(classification_report(valid_y,predBT2))

              precision    recall  f1-score   support

           0       0.94      0.93      0.94       138
           1       0.79      0.90      0.84        60
           2       0.98      0.97      0.98        64
           3       0.98      0.96      0.97       334

    accuracy                           0.95       596
   macro avg       0.92      0.94      0.93       596
weighted avg       0.95      0.95      0.95       596



In [None]:
#Boostingundersample
BT3 = AdaBoostClassifier(DecisionTreeClassifier(random_state=1),
                        n_estimators = 100, random_state=1)
BT3.fit(x_train_US, y_train_US)

predBT3 = BT3.predict(valid_x)
print(classification_report(valid_y,predBT3))

              precision    recall  f1-score   support

           0       0.94      0.90      0.92       138
           1       0.67      0.87      0.75        60
           2       0.88      1.00      0.93        64
           3       0.97      0.91      0.94       334

    accuracy                           0.91       596
   macro avg       0.86      0.92      0.89       596
weighted avg       0.92      0.91      0.92       596



In [None]:
#Random Forest original

RF = RandomForestClassifier(n_estimators = 500, random_state = 1)
RF.fit(train_x, train_y)

predRF = RF.predict(valid_x)
print(classification_report(valid_y, predRF))

              precision    recall  f1-score   support

           0       0.92      0.95      0.94       138
           1       0.85      0.78      0.82        60
           2       0.95      0.95      0.95        64
           3       0.96      0.97      0.97       334

    accuracy                           0.94       596
   macro avg       0.92      0.91      0.92       596
weighted avg       0.94      0.94      0.94       596



In [None]:
#RandomForestOverSampled

RF2 = RandomForestClassifier(n_estimators = 500, random_state = 1)
RF2.fit(x_train_OS, y_train_OS)

predRF2 = RF2.predict(valid_x)
print(classification_report(valid_y, predRF2))

              precision    recall  f1-score   support

           0       0.96      0.97      0.96       138
           1       0.84      0.97      0.90        60
           2       0.94      0.97      0.95        64
           3       0.99      0.95      0.97       334

    accuracy                           0.96       596
   macro avg       0.93      0.96      0.95       596
weighted avg       0.96      0.96      0.96       596



In [None]:
#RandomForestUnderSampled

RF3 = RandomForestClassifier(n_estimators = 500, random_state = 1)
RF3.fit(x_train_US, y_train_US)

predRF3 = RF3.predict(valid_x)
print(classification_report(valid_y, predRF3))

              precision    recall  f1-score   support

           0       0.92      0.95      0.94       138
           1       0.66      0.87      0.75        60
           2       0.88      1.00      0.93        64
           3       0.99      0.90      0.94       334

    accuracy                           0.92       596
   macro avg       0.86      0.93      0.89       596
weighted avg       0.93      0.92      0.92       596



In [None]:
!pip install pyspark



In [None]:
from google.colab import drive
drive.mount("/content/drive")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("R_Forest").getOrCreate()

In [None]:
path = "/content/drive/MyDrive/Traffic.csv"
df = spark.read.csv(path, header = True, inferSchema = True)

In [None]:
df.show()

+-----------+----+---------------+--------+---------+--------+----------+-----+-----------------+
|       Time|Date|Day of the week|CarCount|BikeCount|BusCount|TruckCount|Total|Traffic Situation|
+-----------+----+---------------+--------+---------+--------+----------+-----+-----------------+
|12:00:00 AM|  10|        Tuesday|      31|        0|       4|         4|   39|              low|
|12:15:00 AM|  10|        Tuesday|      49|        0|       3|         3|   55|              low|
|12:30:00 AM|  10|        Tuesday|      46|        0|       3|         6|   55|              low|
|12:45:00 AM|  10|        Tuesday|      51|        0|       2|         5|   58|              low|
| 1:00:00 AM|  10|        Tuesday|      57|        6|      15|        16|   94|           normal|
| 1:15:00 AM|  10|        Tuesday|      44|        0|       5|         4|   53|              low|
| 1:30:00 AM|  10|        Tuesday|      37|        0|       1|         4|   42|              low|
| 1:45:00 AM|  10|  

In [None]:
df2 = df.drop("Time","Date","Day of the week", "Total")
df2.show()



+--------+---------+--------+----------+-----------------+
|CarCount|BikeCount|BusCount|TruckCount|Traffic Situation|
+--------+---------+--------+----------+-----------------+
|      31|        0|       4|         4|              low|
|      49|        0|       3|         3|              low|
|      46|        0|       3|         6|              low|
|      51|        0|       2|         5|              low|
|      57|        6|      15|        16|           normal|
|      44|        0|       5|         4|              low|
|      37|        0|       1|         4|              low|
|      42|        4|       4|         5|              low|
|      51|        0|       9|         7|              low|
|      34|        0|       4|         7|              low|
|      45|        0|       1|         1|              low|
|      45|        0|       1|         3|              low|
|      50|        0|       3|         0|              low|
|      34|        0|       4|         4|              lo

In [None]:
from pyspark.sql import functions as fn

df2 = df2.withColumn("Traffic Situation", fn.when(fn.col("Traffic Situation") == "high",1)
                     .when(fn.col("Traffic Situation") == "low",2).when(fn.col("Traffic Situation") == "normal",3)
                     .otherwise(0))

df2.show()

+--------+---------+--------+----------+-----------------+
|CarCount|BikeCount|BusCount|TruckCount|Traffic Situation|
+--------+---------+--------+----------+-----------------+
|      31|        0|       4|         4|                2|
|      49|        0|       3|         3|                2|
|      46|        0|       3|         6|                2|
|      51|        0|       2|         5|                2|
|      57|        6|      15|        16|                3|
|      44|        0|       5|         4|                2|
|      37|        0|       1|         4|                2|
|      42|        4|       4|         5|                2|
|      51|        0|       9|         7|                2|
|      34|        0|       4|         7|                2|
|      45|        0|       1|         1|                2|
|      45|        0|       1|         3|                2|
|      50|        0|       3|         0|                2|
|      34|        0|       4|         4|                

In [None]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols = ["CarCount", "BikeCount", "BusCount", "TruckCount"], outputCol = "Features")

data = assembler.transform(df2)

In [None]:
data.show()

+--------+---------+--------+----------+-----------------+--------------------+
|CarCount|BikeCount|BusCount|TruckCount|Traffic Situation|            Features|
+--------+---------+--------+----------+-----------------+--------------------+
|      31|        0|       4|         4|                2|  [31.0,0.0,4.0,4.0]|
|      49|        0|       3|         3|                2|  [49.0,0.0,3.0,3.0]|
|      46|        0|       3|         6|                2|  [46.0,0.0,3.0,6.0]|
|      51|        0|       2|         5|                2|  [51.0,0.0,2.0,5.0]|
|      57|        6|      15|        16|                3|[57.0,6.0,15.0,16.0]|
|      44|        0|       5|         4|                2|  [44.0,0.0,5.0,4.0]|
|      37|        0|       1|         4|                2|  [37.0,0.0,1.0,4.0]|
|      42|        4|       4|         5|                2|  [42.0,4.0,4.0,5.0]|
|      51|        0|       9|         7|                2|  [51.0,0.0,9.0,7.0]|
|      34|        0|       4|         7|

In [None]:
data = data.withColumnRenamed("Traffic Situation", "label")
data = data["features", "label"]
data.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|  [31.0,0.0,4.0,4.0]|    2|
|  [49.0,0.0,3.0,3.0]|    2|
|  [46.0,0.0,3.0,6.0]|    2|
|  [51.0,0.0,2.0,5.0]|    2|
|[57.0,6.0,15.0,16.0]|    3|
|  [44.0,0.0,5.0,4.0]|    2|
|  [37.0,0.0,1.0,4.0]|    2|
|  [42.0,4.0,4.0,5.0]|    2|
|  [51.0,0.0,9.0,7.0]|    2|
|  [34.0,0.0,4.0,7.0]|    2|
|  [45.0,0.0,1.0,1.0]|    2|
|  [45.0,0.0,1.0,3.0]|    2|
|  [50.0,0.0,3.0,0.0]|    2|
|  [34.0,0.0,4.0,4.0]|    2|
|[129.0,22.0,42.0,...|    0|
|[144.0,16.0,49.0,...|    0|
|[111.0,28.0,20.0,...|    3|
|[67.0,11.0,10.0,1...|    3|
|[65.0,24.0,7.0,16.0]|    3|
|[94.0,27.0,7.0,16.0]|    3|
+--------------------+-----+
only showing top 20 rows



In [None]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
train, valid = data.randomSplit([0.8, 0.2])

In [None]:
print(data.describe().show())

print()

print(train.describe().show())

print()

print(valid.describe().show())

+-------+------------------+
|summary|             label|
+-------+------------------+
|  count|              2976|
|   mean|1.9946236559139785|
| stddev|1.2593067291765943|
|    min|                 0|
|    max|                 3|
+-------+------------------+

None

+-------+------------------+
|summary|             label|
+-------+------------------+
|  count|              2375|
|   mean|1.9856842105263157|
| stddev|1.2660949094677194|
|    min|                 0|
|    max|                 3|
+-------+------------------+

None

+-------+------------------+
|summary|             label|
+-------+------------------+
|  count|               601|
|   mean|2.0299500831946755|
| stddev|1.2325183558487725|
|    min|                 0|
|    max|                 3|
+-------+------------------+

None


In [None]:
rf = RandomForestClassifier(labelCol = "label", featuresCol = "features",
                            numTrees = 40, maxDepth = 10)

mod = rf.fit(train)

In [None]:
pred = mod.transform(valid)

In [None]:
pred.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[6.0,24.0,12.0,13.0]|    3|[0.0,1.0129032258...|[0.0,0.0253225806...|       3.0|
| [9.0,18.0,24.0,9.0]|    3|[0.01282051282051...|[3.20512820512820...|       3.0|
| [10.0,0.0,0.0,22.0]|    3|  [0.0,0.0,0.0,40.0]|   [0.0,0.0,0.0,1.0]|       3.0|
| [10.0,0.0,0.0,29.0]|    3|  [0.0,0.0,0.0,40.0]|   [0.0,0.0,0.0,1.0]|       3.0|
| [10.0,0.0,1.0,34.0]|    3|  [0.0,0.0,0.0,40.0]|   [0.0,0.0,0.0,1.0]|       3.0|
| [10.0,0.0,1.0,38.0]|    3|  [0.0,0.0,0.0,40.0]|   [0.0,0.0,0.0,1.0]|       3.0|
| [10.0,1.0,0.0,26.0]|    3|  [0.0,0.0,0.0,40.0]|   [0.0,0.0,0.0,1.0]|       3.0|
| [10.0,1.0,1.0,12.0]|    2|  [0.0,0.0,40.0,0.0]|   [0.0,0.0,1.0,0.0]|       2.0|
| [10.0,1.0,1.0,34.0]|    3|  [0.0,0.0,0.0,40.0]|   [0.0,0.0,0.0,1.0]|       3.0|
| [10.0,1.0,13.0

In [None]:
#true positives tp0,tp1,tp2,tp3
tp00 = pred.filter((pred["label"]==0) & (pred["prediction"]==0)).count()
tp11 = pred.filter((pred["label"]==1) & (pred["prediction"]==1)).count()
tp22 = pred.filter((pred["label"]==2) & (pred["prediction"]==2)).count()
tp33 = pred.filter((pred["label"]==3) & (pred["prediction"]==3)).count()

#predicted as 0 but actual is 1,2,3
#fp1, fp2, fp3,fn0,fn0,fn0
fp10 = pred.filter((pred["label"]==1) & (pred["prediction"]==0)).count()
fp20 = pred.filter((pred["label"]==2) & (pred["prediction"]==0)).count()
fp30 = pred.filter((pred["label"]==3) & (pred["prediction"]==0)).count()

#predicted as 1 but actual is 0, 2, 3
#fp0, fp2, fp3, fn1, fn1,fn1
fp01 = pred.filter((pred["label"]==0) & (pred["prediction"]==1)).count()
fp21 = pred.filter((pred["label"]==2) & (pred["prediction"]==1)).count()
fp31 = pred.filter((pred["label"]==3) & (pred["prediction"]==1)).count()

#predicted as 1 but actual is 0,2,3
#fp0, fp1, fp3, fn2, fn2, fn2, fn2
fp02 = pred.filter((pred["label"]==0) & (pred["prediction"]==2)).count()
fp12 = pred.filter((pred["label"]==1) & (pred["prediction"]==2)).count()
fp32 = pred.filter((pred["label"]==3) & (pred["prediction"]==2)).count()

#predicted as 3 but actual is 0,1,2
#fp0, fp1,fp2, fn3, fn3, fn3
fp03 = pred.filter((pred["label"]==0) & (pred["prediction"]==3)).count()
fp13 = pred.filter((pred["label"]==1) & (pred["prediction"]==3)).count()
fp23 = pred.filter((pred["label"]==2) & (pred["prediction"]==3)).count()

print("Confusion Matrix")
print("TP00:", tp00)
print("TP11:", tp11)
print("TP22:", tp22)
print("TP33:", tp33)
print("FP10:", fp10)
print("FP20:", fp20)
print("FP30:", fp30)
print("FP01:", fp01)
print("FP21:", fp21)
print("FP31:", fp31)
print("FP02:", fp02)
print("FP12:", fp12)
print("FP32:", fp32)
print("FP03:", fp03)
print("FP13:", fp13)
print("FP23:", fp23)

Confusion Matrix
TP00: 123
TP11: 62
TP22: 67
TP33: 330
FP10: 3
FP20: 0
FP30: 3
FP01: 1
FP21: 0
FP31: 6
FP02: 0
FP12: 0
FP32: 0
FP03: 2
FP13: 4
FP23: 0


In [None]:
# Calculate the accuracy for each class
acc = ((tp00+ tp11 + tp22+ tp33)/(tp00+ tp11 + tp22+ tp33+ fp10 +fp20+ fp30+ fp01+ fp21+ fp31+ fp02+ fp12+ fp32+ fp03+ fp13+ fp23)) * 100
print("Accuracy",round(acc,2))




Accuracy 96.84


In [None]:
# Calculate the precision for each class

prec0 = tp00 / (tp00 + fp01 + fp02 + fp03) * 100
prec1 = tp11 / (tp11 + fp10 + fp12 +fp13) * 100
prec2 = tp22 / (tp22 + fp20 + fp21 + fp23) * 100
prec3 = tp33 / (tp33 + fp30 + fp31 + fp32) * 100

In [None]:
# Calculate the recall for each class


rec0 = tp00 / (tp00 + fp10+ fp20+ fp30) * 100
rec1 = tp11 / (tp11 + fp01 + fp21 + fp31) * 100
rec2 = tp22 / (tp22 + fp02 + fp12 + fp32) * 100
rec3 = tp33 / (tp33 + fp03 + fp13 + fp23) * 100

In [None]:
# Print the results
print("For class 0")
print("Precision:", round(prec0, 2))
print("Recall:", round(rec0, 2))


print("")
print("For class 1")
print("Precision:", round(prec1, 2))
print("Recall:", round(rec1, 2))


print("")
print("For class 2")
print("Precision:", round(prec2, 2))
print("Recall:", round(rec2, 2))




print("")
print("For class 3")
print("Precision:", round(prec3, 2))
print("Recall:", round(rec3, 2))



For class 0
Precision: 97.62
Recall: 95.35

For class 1
Precision: 89.86
Recall: 89.86

For class 2
Precision: 100.0
Recall: 100.0

For class 3
Precision: 97.35
Recall: 98.21


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
import random

# Create a Spark session
spark = SparkSession.builder \
    .appName("Oversampling") \
    .getOrCreate()


# Calculate value counts of the column
value_counts = data.groupBy("label").count().orderBy("label")

# Show the value counts
value_counts.show()

+-----+-----+
|label|count|
+-----+-----+
|    0|  682|
|    1|  321|
|    2|  304|
|    3| 1669|
+-----+-----+



In [None]:
# Define minority and majority labels
minority_labels0 = [0]
minority_labels1 = [1]
minority_labels2 = [2] # Labels considered as minority
majority_labels = [3]  # Labels considered as majority


# Separate minority and majority classes
minority_class0 = data.filter(col("label").isin(minority_labels0))
minority_class1 = data.filter(col("label").isin(minority_labels1))
minority_class2 = data.filter(col("label").isin(minority_labels2))
majority_class = data.filter(col("label").isin(majority_labels))

# Calculate the number of synthetic samples to generate
num_minority0 = minority_class0.count()
num_minority1= minority_class1.count()
num_minority2= minority_class2.count()
num_majority = majority_class.count()


oversampling_ratio0 = num_majority / num_minority0
oversampling_ratio1 = num_majority / num_minority1
oversampling_ratio2 = num_majority / num_minority2

# Duplicate minority class samples to balance the dataset
oversampled_minority0 = minority_class0.unionAll(minority_class0.sample(True, oversampling_ratio0 - 1))
oversampled_minority1 = minority_class1.unionAll(minority_class1.sample(True, oversampling_ratio1 - 1))
oversampled_minority2 = minority_class2.unionAll(minority_class2.sample(True, oversampling_ratio2 - 1))

# Combine oversampled minority class with majority class
balanced_data = oversampled_minority0.unionAll(majority_class)
balanced_data= balanced_data.unionAll(oversampled_minority1)
balanced_data= balanced_data.unionAll(oversampled_minority2)

# Show the balanced dataset
balanced_data.show()

# Calculate value counts of the column
value_counts = balanced_data.groupBy("label").count().orderBy("label")

# Show the value counts
value_counts.show()



+--------------------+-----+
|            features|label|
+--------------------+-----+
|[129.0,22.0,42.0,...|    0|
|[144.0,16.0,49.0,...|    0|
|[129.0,22.0,42.0,...|    0|
|[144.0,16.0,49.0,...|    0|
|[120.0,27.0,46.0,...|    0|
|[102.0,39.0,47.0,...|    0|
|[145.0,23.0,27.0,...|    0|
|[114.0,37.0,21.0,...|    0|
|[122.0,28.0,33.0,...|    0|
|[114.0,39.0,30.0,...|    0|
|[150.0,36.0,25.0,...|    0|
|[107.0,10.0,49.0,...|    0|
|[124.0,24.0,22.0,...|    0|
|[110.0,17.0,23.0,...|    0|
|[141.0,31.0,36.0,...|    0|
|[120.0,25.0,28.0,...|    0|
|[137.0,23.0,44.0,...|    0|
|[124.0,11.0,30.0,...|    0|
|[145.0,25.0,49.0,...|    0|
|[122.0,37.0,23.0,...|    0|
+--------------------+-----+
only showing top 20 rows

+-----+-----+
|label|count|
+-----+-----+
|    0| 1664|
|    1| 1657|
|    2| 1660|
|    3| 1669|
+-----+-----+



In [None]:
# Split the balanced data into training and testing sets
train_data, test_data = balanced_data.randomSplit([0.8, 0.2])


# Check if SparkContext is initialized
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
assert sc is not None

train_data.head()



Row(features=DenseVector([96.0, 16.0, 39.0, 17.0]), label=0)

In [None]:

# Train a Random Forest model on the balanced dataset
rf = RandomForestClassifier(featuresCol="features", labelCol="label")
rf_model = rf.fit(train_data)

# Make predictions on the test set
predictions = rf_model.transform(test_data)

# Evaluate the model
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)

Accuracy: 0.9381368267831149


In [None]:
predictions.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[98.0,29.0,34.0,1...|    0|[6.37521142781598...|[0.31876057139079...|       1.0|
|[100.0,21.0,45.0,...|    0|[5.54832411673718...|[0.27741620583685...|       1.0|
|[100.0,40.0,29.0,...|    0|[13.0252955469737...|[0.65126477734868...|       0.0|
|[101.0,13.0,47.0,...|    0|[4.92904217760833...|[0.24645210888041...|       1.0|
|[101.0,20.0,47.0,...|    0|[5.54832411673718...|[0.27741620583685...|       1.0|
|[101.0,35.0,42.0,...|    0|[13.9092994817213...|[0.69546497408606...|       0.0|
|[101.0,37.0,31.0,...|    0|[13.0252955469737...|[0.65126477734868...|       0.0|
|[102.0,18.0,19.0,...|    0|[0.49375474979109...|[0.02468773748955...|       1.0|
|[103.0,34.0,49.0,...|    0|[13.9092994817213...|[0.69546497408606...|       0.0|
|[104.0,24.0,40.

In [None]:
#true positives tp0,tp1,tp2,tp3
tp00 = predictions.filter((predictions["label"]==0) & (predictions["prediction"]==0)).count()
tp11 = predictions.filter((predictions["label"]==1) & (predictions["prediction"]==1)).count()
tp22 = predictions.filter((predictions["label"]==2) & (predictions["prediction"]==2)).count()
tp33 = predictions.filter((predictions["label"]==3) & (predictions["prediction"]==3)).count()

#predicted as 0 but actual is 1,2,3
#fp1, fp2, fp3,fn0,fn0,fn0
fp10 = predictions.filter((predictions["label"]==1) & (predictions["prediction"]==0)).count()
fp20 = predictions.filter((predictions["label"]==2) & (predictions["prediction"]==0)).count()
fp30 = predictions.filter((predictions["label"]==3) & (predictions["prediction"]==0)).count()

#predicted as 1 but actual is 0, 2, 3
#fp0, fp2, fp3, fn1, fn1,fn1
fp01 = predictions.filter((predictions["label"]==0) & (predictions["prediction"]==1)).count()
fp21 = predictions.filter((predictions["label"]==2) & (predictions ["prediction"]==1)).count()
fp31 = predictions.filter((predictions["label"]==3) & (predictions["prediction"]==1)).count()

#predicted as 1 but actual is 0,2,3
#fp0, fp1, fp3, fn2, fn2, fn2, fn2
fp02 = predictions.filter((predictions["label"]==0) & (predictions["prediction"]==2)).count()
fp12 = predictions.filter((predictions["label"]==1) & (predictions["prediction"]==2)).count()
fp32 = predictions.filter((predictions["label"]==3) & (predictions["prediction"]==2)).count()

#predicted as 3 but actual is 0,1,2
#fp0, fp1,fp2, fn3, fn3, fn3
fp03 = predictions.filter((predictions["label"]==0) & (predictions["prediction"]==3)).count()
fp13 = predictions.filter((predictions["label"]==1) & (predictions["prediction"]==3)).count()
fp23 = predictions.filter((predictions["label"]==2) & (predictions["prediction"]==3)).count()

print("Confusion Matrix")
print("TP00:", tp00)
print("TP11:", tp11)
print("TP22:", tp22)
print("TP33:", tp33)
print("FP10:", fp10)
print("FP20:", fp20)
print("FP30:", fp30)
print("FP01:", fp01)
print("FP21:", fp21)
print("FP31:", fp31)
print("FP02:", fp02)
print("FP12:", fp12)
print("FP32:", fp32)
print("FP03:", fp03)
print("FP13:", fp13)
print("FP23:", fp23)

Confusion Matrix
TP00: 323
TP11: 328
TP22: 328
TP33: 310
FP10: 16
FP20: 0
FP30: 10
FP01: 19
FP21: 0
FP31: 25
FP02: 0
FP12: 0
FP32: 12
FP03: 0
FP13: 3
FP23: 0


In [None]:
# Calculate the accuracy for each class
acc = ((tp00+ tp11 + tp22+ tp33)/(tp00+ tp11 + tp22+ tp33+ fp10 +fp20+ fp30+ fp01+ fp21+ fp31+ fp02+ fp12+ fp32+ fp03+ fp13+ fp23)) * 100
print("Accuracy",round(acc,2))

Accuracy 93.81


In [None]:
# Calculate the precision for each class

prec0 = tp00 / (tp00 + fp01 + fp02 + fp03) * 100
prec1 = tp11 / (tp11 + fp10 + fp12 +fp13) * 100
prec2 = tp22 / (tp22 + fp20 + fp21 + fp23) * 100
prec3 = tp33 / (tp33 + fp30 + fp31 + fp32) * 100

# Calculate the recall for each class


rec0 = tp00 / (tp00 + fp10+ fp20+ fp30) * 100
rec1 = tp11 / (tp11 + fp01 + fp21 + fp31) * 100
rec2 = tp22 / (tp22 + fp02 + fp12 + fp32) * 100
rec3 = tp33 / (tp33 + fp03 + fp13 + fp23) * 100





In [None]:
# Print the results
print("For class 0")
print("Precision:", round(prec0, 2))
print("Recall:", round(rec0, 2))


print("")
print("For class 1")
print("Precision:", round(prec1, 2))
print("Recall:", round(rec1, 2))

print("")
print("For class 2")
print("Precision:", round(prec2, 2))
print("Recall:", round(rec2, 2))



print("")
print("For class 3")
print("Precision:", round(prec3, 2))
print("Recall:", round(rec3, 2))


For class 0
Precision: 94.44
Recall: 92.55

For class 1
Precision: 94.52
Recall: 88.17

For class 2
Precision: 100.0
Recall: 96.47

For class 3
Precision: 86.83
Recall: 99.04


In [None]:
#undersampling

# Create a Spark session
spark = SparkSession.builder \
    .appName("Undersampling") \
    .getOrCreate()

# Define minority and majority labels
majority_labels0 = [0]
majority_labels1 = [1]
minority_labels = [2]
majority_labels3 = [3]


# Separate minority and majority classes
majority_class0 = data.filter(col("label").isin(majority_labels0))
majority_class1 = data.filter(col("label").isin(majority_labels1))
minority_class2 = data.filter(col("label").isin(minority_labels2))
majority_class3 = data.filter(col("label").isin(majority_labels3))

# Calculate the number of synthetic samples to generate
num_majority0 = majority_class0.count()
num_majority1= majority_class1.count()
num_minority2= minority_class2.count()
num_majority3 = majority_class3.count()

undersampling_ratio0 = num_minority2 / num_majority0
undersampling_ratio1 = num_minority2 / num_majority1
undersampling_ratio3 = num_minority2 / num_majority3


# Sample the majority class to reduce its size
undersampled_majority0 = majority_class0.sample(False, undersampling_ratio0,seed=42)
undersampled_majority1 = majority_class1.sample(False, undersampling_ratio1,seed=42)
undersampled_majority3 = majority_class3.sample(False, undersampling_ratio3,seed=42)

# Combine undersampled majority class with minority class
balanced_data = undersampled_majority0.unionAll(minority_class2)
balanced_data = balanced_data.unionAll(undersampled_majority1)
balanced_data = balanced_data.unionAll(undersampled_majority3)


# Show the balanced dataset
balanced_data.show()

# Calculate value counts of the column
value_counts = balanced_data.groupBy("label").count().orderBy("label")

# Show the value counts
value_counts.show()



+--------------------+-----+
|            features|label|
+--------------------+-----+
|[144.0,16.0,49.0,...|    0|
|[114.0,37.0,21.0,...|    0|
|[124.0,24.0,22.0,...|    0|
|[137.0,23.0,44.0,...|    0|
|[145.0,25.0,49.0,...|    0|
|[125.0,34.0,37.0,...|    0|
|[125.0,19.0,25.0,...|    0|
|[125.0,15.0,49.0,...|    0|
|[125.0,31.0,48.0,...|    0|
|[142.0,22.0,37.0,...|    0|
|[122.0,29.0,30.0,...|    0|
|[118.0,23.0,38.0,...|    0|
|[125.0,30.0,48.0,...|    0|
|[114.0,20.0,43.0,...|    0|
|[101.0,37.0,31.0,...|    0|
|[105.0,35.0,48.0,...|    0|
|[116.0,25.0,33.0,...|    0|
|[112.0,11.0,50.0,...|    0|
|[104.0,31.0,30.0,...|    0|
|[146.0,18.0,40.0,...|    0|
+--------------------+-----+
only showing top 20 rows

+-----+-----+
|label|count|
+-----+-----+
|    0|  312|
|    1|  307|
|    2|  304|
|    3|  318|
+-----+-----+



In [None]:

train_data, test_data = balanced_data.randomSplit([0.8, 0.2], seed=42)


rf = RandomForestClassifier(featuresCol="features", labelCol="label")
rf_model = rf.fit(train_data)

predictions = rf_model.transform(test_data)

# Evaluate the model
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)




Accuracy: 0.935064935064935


In [None]:
predictions.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[100.0,38.0,36.0,...|    0|[10.5922624074763...|[0.52961312037381...|       0.0|
|[101.0,23.0,45.0,...|    0|[8.06059815379780...|[0.40302990768988...|       1.0|
|[102.0,21.0,42.0,...|    0|[5.67318275229789...|[0.28365913761489...|       1.0|
|[105.0,14.0,16.0,...|    0|[0.26730930038788...|[0.01336546501939...|       1.0|
|[106.0,12.0,20.0,...|    0|[0.26730930038788...|[0.01336546501939...|       1.0|
|[107.0,28.0,38.0,...|    0|[10.7637558185034...|[0.53818779092517...|       0.0|
|[109.0,23.0,14.0,...|    0|[0.86992986716530...|[0.04349649335826...|       1.0|
|[110.0,23.0,34.0,...|    0|[7.40547217991520...|[0.37027360899576...|       1.0|
|[112.0,31.0,43.0,...|    0|[18.7559949521958...|[0.93779974760979...|       0.0|
|[112.0,33.0,29.

In [None]:
#true positives tp0,tp1,tp2,tp3
tp00 = predictions.filter((predictions["label"]==0) & (predictions["prediction"]==0)).count()
tp11 = predictions.filter((predictions["label"]==1) & (predictions["prediction"]==1)).count()
tp22 = predictions.filter((predictions["label"]==2) & (predictions["prediction"]==2)).count()
tp33 = predictions.filter((predictions["label"]==3) & (predictions["prediction"]==3)).count()

#predicted as 0 but actual is 1,2,3
#fp1, fp2, fp3,fn0,fn0,fn0
fp10 = predictions.filter((predictions["label"]==1) & (predictions["prediction"]==0)).count()
fp20 = predictions.filter((predictions["label"]==2) & (predictions["prediction"]==0)).count()
fp30 = predictions.filter((predictions["label"]==3) & (predictions["prediction"]==0)).count()

#predicted as 1 but actual is 0, 2, 3
#fp0, fp2, fp3, fn1, fn1,fn1
fp01 = predictions.filter((predictions["label"]==0) & (predictions["prediction"]==1)).count()
fp21 = predictions.filter((predictions["label"]==2) & (predictions ["prediction"]==1)).count()
fp31 = predictions.filter((predictions["label"]==3) & (predictions["prediction"]==1)).count()

#predicted as 1 but actual is 0,2,3
#fp0, fp1, fp3, fn2, fn2, fn2, fn2
fp02 = predictions.filter((predictions["label"]==0) & (predictions["prediction"]==2)).count()
fp12 = predictions.filter((predictions["label"]==1) & (predictions["prediction"]==2)).count()
fp32 = predictions.filter((predictions["label"]==3) & (predictions["prediction"]==2)).count()

#predicted as 3 but actual is 0,1,2
#fp0, fp1,fp2, fn3, fn3, fn3
fp03 = predictions.filter((predictions["label"]==0) & (predictions["prediction"]==3)).count()
fp13 = predictions.filter((predictions["label"]==1) & (predictions["prediction"]==3)).count()
fp23 = predictions.filter((predictions["label"]==2) & (predictions["prediction"]==3)).count()

print("Confusion Matrix")
print("TP00:", tp00)
print("TP11:", tp11)
print("TP22:", tp22)
print("TP33:", tp33)
print("FP10:", fp10)
print("FP20:", fp20)
print("FP30:", fp30)
print("FP01:", fp01)
print("FP21:", fp21)
print("FP31:", fp31)
print("FP02:", fp02)
print("FP12:", fp12)
print("FP32:", fp32)
print("FP03:", fp03)
print("FP13:", fp13)
print("FP23:", fp23)

Confusion Matrix
TP00: 42
TP11: 59
TP22: 55
TP33: 60
FP10: 2
FP20: 0
FP30: 1
FP01: 6
FP21: 0
FP31: 2
FP02: 0
FP12: 0
FP32: 3
FP03: 0
FP13: 1
FP23: 0


In [None]:
# Calculate the accuracy for each class
acc = ((tp00+ tp11 + tp22+ tp33)/(tp00+ tp11 + tp22+ tp33+ fp10 +fp20+ fp30+ fp01+ fp21+ fp31+ fp02+ fp12+ fp32+ fp03+ fp13+ fp23)) * 100
print("Accuracy",round(acc,2))

Accuracy 93.51


In [None]:
# Calculate the precision for each class

prec0 = tp00 / (tp00 + fp01 + fp02 + fp03) * 100
prec1 = tp11 / (tp11 + fp10 + fp12 +fp13) * 100
prec2 = tp22 / (tp22 + fp20 + fp21 + fp23) * 100
prec3 = tp33 / (tp33 + fp30 + fp31 + fp32) * 100

# Calculate the recall for each class


rec0 = tp00 / (tp00 + fp10+ fp20+ fp30) * 100
rec1 = tp11 / (tp11 + fp01 + fp21 + fp31) * 100
rec2 = tp22 / (tp22 + fp02 + fp12 + fp32) * 100
rec3 = tp33 / (tp33 + fp03 + fp13 + fp23) * 100




In [None]:
# Print the results
print("For class 0")
print("Precision:", round(prec0, 2))
print("Recall:", round(rec0, 2))


print("")
print("For class 1")
print("Precision:", round(prec1, 2))
print("Recall:", round(rec1, 2))


print("")
print("For class 2")
print("Precision:", round(prec2, 2))
print("Recall:", round(rec2, 2))



print("")
print("For class 3")
print("Precision:", round(prec3, 2))
print("Recall:", round(rec3, 2))


For class 0
Precision: 94.87
Recall: 92.5
Specificity: 98.98

For class 1
Precision: 97.14
Recall: 94.44
Specificity: 98.8

For class 2
Precision: 100.0
Recall: 95.65
Specificity: 100.0

For class 3
Precision: 89.71
Recall: 98.39
Specificity: 96.07
