# Parallelized Grid Search Optimization for Water Quality Classification

In aquatic systems, water quality is an important indicator of overall health. Poor water quality can harm both human health and the surrounding environment. This dataset contains water quality metrics that are used to classify water potability. It has 3276 samples and 10 columns: nine input features and one target.

X features:

1-ph: pH of water (0 to 14).

2-Hardness: Capacity of water to precipitate soap in mg/L.

3-Solids: Total dissolved solids in ppm.

4-Chloramines: Amount of Chloramines in ppm.

5-Sulfate: Amount of Sulfates dissolved in mg/L.

6-Conductivity: Electrical conductivity of water in μS/cm.

7-Organic_carbon: Amount of organic carbon in ppm.

8-Trihalomethanes: Amount of Trihalomethanes in μg/L.

9-Turbidity: Measure of light emitting property of water in NTU.

Y feature:

10-Potability: Indicates whether water is safe for human consumption, where 1 means potable and 0 means not potable.

The link to the dataset: https://data.world/gymprathap/water-quality-dataset


### Libraries

In [None]:
# Standard library
import os
import time

# Data handling and plotting
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Resampling, parallel execution, and modelling
from imblearn.combine import SMOTETomek
from joblib import Parallel, delayed
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.svm import SVC

### Read The File

In [None]:
df2 = pd.read_csv(r"C:\Users\Noor\Desktop\parallel_pro\water_potability.csv")

#### Check for missing values

In [None]:
df2.isnull().sum()

### Fill the Missing Values with the Class-wise Median

#### Filling pH Missing Values

In [None]:
pH_nan_1 = df2.query('Potability == 1')['ph'][df2['ph'].isna()].index
df2.loc[pH_nan_1,'ph'] =df2.query('Potability == 1')['ph'][df2['ph'].notna()].median()

pH_nan_0 = df2.query('Potability == 0')['ph'][df2['ph'].isna()].index
df2.loc[pH_nan_0,'ph'] = df2.query('Potability == 0')['ph'][df2['ph'].notna()].median()

#### Filling Sulfate Missing Values

In [None]:
Sulfate_nan_1 = df2.query('Potability == 1')['Sulfate'][df2['Sulfate'].isna()].index
df2.loc[Sulfate_nan_1,'Sulfate'] =df2.query('Potability == 1')['Sulfate'][df2['Sulfate'].notna()].median()

Sulfate_nan_0 = df2.query('Potability == 0')['Sulfate'][df2['Sulfate'].isna()].index
df2.loc[Sulfate_nan_0,'Sulfate'] = df2.query('Potability == 0')['Sulfate'][df2['Sulfate'].notna()].median()

#### Filling Trihalomethanes Missing Values

In [None]:
Trihalomethanes_nan_1 = df2.query('Potability == 1')['Trihalomethanes'][df2['Trihalomethanes'].isna()].index
df2.loc[Trihalomethanes_nan_1,'Trihalomethanes'] =df2.query('Potability == 1')['Trihalomethanes'][df2['Trihalomethanes'].notna()].median()

Trihalomethanes_nan_0 = df2.query('Potability == 0')['Trihalomethanes'][df2['Trihalomethanes'].isna()].index
df2.loc[Trihalomethanes_nan_0,'Trihalomethanes'] = df2.query('Potability == 0')['Trihalomethanes'][df2['Trihalomethanes'].notna()].median()
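The three cells above apply the same rule to each column: a missing value is replaced by the median of that column within the same Potability class. A compact way to express that rule is a `groupby(...).transform("median")` followed by `fillna`. The sketch below runs on a small made-up frame; the helper name `fill_with_class_median` and the toy values are assumptions for illustration and are not part of the notebook.

```python
import numpy as np
import pandas as pd


def fill_with_class_median(df, columns, target="Potability"):
    """Return a copy of df where NaNs in `columns` are replaced by the
    median of the same column within the same target class."""
    filled = df.copy()
    for col in columns:
        # transform("median") returns one value per row: the (NaN-skipping)
        # median of that row's class, aligned with the original index
        class_median = filled.groupby(target)[col].transform("median")
        filled[col] = filled[col].fillna(class_median)
    return filled


# Toy data (hypothetical values, only to show the behaviour)
toy = pd.DataFrame({
    "ph": [6.0, np.nan, 8.0, 7.0, np.nan, 9.0],
    "Potability": [0, 0, 0, 1, 1, 1],
})
filled_toy = fill_with_class_median(toy, ["ph"])
print(filled_toy)
```

With the notebook's data, the equivalent call would be `fill_with_class_median(df2, ['ph', 'Sulfate', 'Trihalomethanes'])`.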

#### Count the Null Values After Filling

In [None]:
df2.isnull().sum()

#### Count the Potable and Non-potable Values

In [None]:
Potability=df2["Potability"].value_counts()
Potability

## Train and Test Split

In [None]:
X = df2.drop('Potability',axis=1).values
Y = df2['Potability'].values

In [None]:
X_train, X_test, Y_train, Y_test = train_test_split (X,Y, random_state= 10,stratify=Y ,test_size = 0.2)

### SMOTETomek for Balancing the Data - Oversampling

In [None]:
# Balance the training set with SMOTETomek (SMOTE oversampling followed by Tomek-link removal)
smote = SMOTETomek(random_state=10)
x_train_smote, y_train_smote = smote.fit_resample(X_train, Y_train)
print("X_train shape after SMOTETomek: {} and\ny_train shape after SMOTETomek: {}".format(x_train_smote.shape, y_train_smote.shape))

In [None]:
y_dist= pd.DataFrame(data=y_train_smote, index=range(y_train_smote.shape[0]),columns=["Potability"])
sns.set_style("ticks", {"xtick.major.size": 8, "ytick.major.size": 8})
plt.figure(figsize=(8, 5))
plt.title("Potability Class Distribution After SMOTE")
sns.countplot(x="Potability", data=y_dist, palette='Paired');

# First Algorithm: Random Forest

In [None]:
# Define the hyperparameter grid for the RandomForestClassifier
rf_params = {
    'n_estimators': [ 100, 200],
    'max_depth': [ 10, 20],
    'min_samples_split': [2, 5],
    'min_samples_leaf': [1, 2, 4],
    'max_features': [ 'sqrt', 'log2']
}
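The size of this grid determines how much work the grid search has to do, which is what the parallel and non-parallel runs below are timing. As a rough sketch, the number of candidates is the product of the list lengths, and with 5-fold cross-validation each candidate is fitted five times (GridSearchCV also refits the best candidate once at the end, since `refit=True` is its default). The variable names `n_candidates`, `cv_folds`, and `n_fits` are chosen here for illustration.

```python
from itertools import product
from math import prod

rf_params = {
    'n_estimators': [100, 200],
    'max_depth': [10, 20],
    'min_samples_split': [2, 5],
    'min_samples_leaf': [1, 2, 4],
    'max_features': ['sqrt', 'log2'],
}

# Number of hyperparameter combinations in the grid
n_candidates = prod(len(values) for values in rf_params.values())

# Each candidate is fitted once per cross-validation fold
cv_folds = 5
n_fits = n_candidates * cv_folds

# Enumerate the candidates explicitly, as dictionaries
candidates = [dict(zip(rf_params, combo)) for combo in product(*rf_params.values())]

print(f"{n_candidates} candidates x {cv_folds} folds = {n_fits} fits")
print("First candidate:", candidates[0])
```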

## Create a function to train and evaluate the model:

In [None]:
# Create a function to train a RandomForestClassifier on a subset of the data
def train_rf(X_subset, y_subset, params):
    rf = RandomForestClassifier(**params)
    rf.fit(X_subset, y_subset)
    return rf

# Define a function to evaluate a model on the test set
def evaluate_model(model, X_test, Y_test):
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(Y_test, y_pred)
    return accuracy


## Non Parallel part:

In [None]:
# Sequential grid search: n_jobs is left at its default, so fits run one at a time
grid_search_non_paralle_RF = GridSearchCV(RandomForestClassifier(), rf_params, cv=5, scoring='accuracy' )

# Measure time for non-parallelized grid search
start_time_non_parallel = time.time()
grid_search_non_paralle_RF.fit(x_train_smote, y_train_smote)
end_time_non_parallel = time.time()
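The start/stop pattern above is repeated for every model in this notebook. One possible way to keep the timings tidy is a small context manager built on `time.perf_counter()`, which is a monotonic clock intended for measuring intervals. This is only a sketch: the helper `timed` and the dictionary `timings` are hypothetical names, and the summed loop stands in for a call such as `grid_search.fit(...)`.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, results):
    """Store the elapsed wall-clock time of the with-block in results[label]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        results[label] = time.perf_counter() - start


timings = {}

# Placeholder workload; in the notebook this would be the grid search fit
with timed("demo_stage", timings):
    total = sum(i * i for i in range(100_000))

print(f"demo_stage took {timings['demo_stage']:.4f} seconds")
```

Timing each stage under its own label also makes it easy to report the grid search and any later training steps separately.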


In [None]:
# Evaluate the non-parallelized model

# Display the best hyperparameters and their corresponding accuracy
print("Best Hyperparameters (Non-parallel) of RF:", grid_search_non_paralle_RF.best_params_)

#print training time
training_time_of_non_parallel_RF=end_time_non_parallel-start_time_non_parallel

print(f"\nNon-parallelized model training time of RF= {training_time_of_non_parallel_RF} Sec")

RF_non_parallel = grid_search_non_paralle_RF.best_estimator_
y_pred_non_parallel = RF_non_parallel.predict(X_test)

accuracy_non_parallel_RF = accuracy_score(Y_test, y_pred_non_parallel)
print("\nAccuracy (Non-parallel) of RF:", accuracy_non_parallel_RF)

# Display the confusion matrix for the non-parallelized model
confusion_matrix_non_parallel = confusion_matrix(Y_test, y_pred_non_parallel)
print("\nConfusion Matrix (Non-parallel) of RF:")
print(confusion_matrix_non_parallel)

# Plot the confusion matrix for the non-parallelized model
sns.heatmap(confusion_matrix_non_parallel, annot=True, fmt="d", cmap="Blues", cbar=False)
plt.title("Confusion Matrix (Non-parallel) of RF")
plt.show()

## Parallel Part

In [None]:
# Parallel grid search: n_jobs=-1 uses all available CPU cores
grid_search_paralle_RF = GridSearchCV(RandomForestClassifier(), rf_params, cv=5, scoring='accuracy',n_jobs=-1)
# Number of logical CPUs, used later to compute efficiency
num_processors1 = os.cpu_count()

start_time_parallel_RF = time.time()
grid_search_paralle_RF.fit(x_train_smote, y_train_smote)
# Train one RandomForestClassifier per data subset in parallel, using the best hyperparameters found
# (this extra training is included in the measured parallel time)
rf_models_parallel = Parallel(n_jobs=-1)(
    delayed(train_rf)(X_subset, y_subset, grid_search_paralle_RF.best_params_)
    for X_subset, y_subset in zip(np.array_split(x_train_smote, 4), np.array_split(y_train_smote, 4))
)

end_time_parallel_RF = time.time()
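`np.array_split` cuts the arrays in their stored order. If rows of one class are grouped together (for example, when synthetic minority samples sit at the end of a resampled array), the four subsets can end up with very different class ratios. The sketch below shows one alternative, not used in this notebook: shuffle the indices of each class and deal them evenly across the chunks. The function name `stratified_chunks` and the toy arrays are assumptions for illustration.

```python
import numpy as np


def stratified_chunks(X, y, n_chunks, seed=0):
    """Split (X, y) into n_chunks subsets with roughly equal class ratios."""
    rng = np.random.default_rng(seed)
    chunk_indices = [[] for _ in range(n_chunks)]
    for label in np.unique(y):
        # Shuffle the row indices of this class, then deal them across chunks
        label_idx = rng.permutation(np.flatnonzero(y == label))
        for i, part in enumerate(np.array_split(label_idx, n_chunks)):
            chunk_indices[i].extend(part.tolist())
    return [(X[np.array(idx, dtype=int)], y[np.array(idx, dtype=int)])
            for idx in chunk_indices]


# Toy data: all class-0 rows first, then all class-1 rows
X_toy = np.arange(16).reshape(16, 1)
y_toy = np.array([0] * 8 + [1] * 8)

# Plain array_split: the first chunk contains only class 0
plain_first_chunk = np.array_split(y_toy, 4)[0]
print("array_split, first chunk labels:", plain_first_chunk)

# Stratified split: every chunk has two rows of each class
chunks = stratified_chunks(X_toy, y_toy, n_chunks=4)
for X_part, y_part in chunks:
    print("stratified chunk labels:", np.sort(y_part))
```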

In [None]:
# Evaluate the parallelized models
accuracies_parallel = [evaluate_model(rf, X_test, Y_test) for rf in rf_models_parallel]

# Display the best hyperparameters and their corresponding accuracy
best_accuracy_parallel_RF = max(accuracies_parallel)
print("\nBest Hyperparameters (Parallel) of Rf:", grid_search_paralle_RF.best_params_)

#print training time
training_time_of_parallel_RF=end_time_parallel_RF-start_time_parallel_RF

print(f"\nTraining time of parallel RF = {training_time_of_parallel_RF} Sec" )

print("\nBest Accuracy (Parallel) of RF:", best_accuracy_parallel_RF)


# Evaluate the parallelized models
confusion_matrices_parallel = []
for rf in rf_models_parallel:
    y_pred_parallel = rf.predict(X_test)
    cm = confusion_matrix(Y_test, y_pred_parallel)
    confusion_matrices_parallel.append(cm)

# Display the confusion matrix for the last model in the parallelized set
print("\nConfusion Matrix (Parallel)of RF:")
print(confusion_matrices_parallel[-1])

# Plot the confusion matrix for the last model
sns.heatmap(confusion_matrices_parallel[-1], annot=True, fmt="d", cmap="Blues", cbar=False)
plt.title("Confusion Matrix (Parallel) of RF")
plt.show()
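The cell above reports the highest test accuracy among the four subset models and plots the confusion matrix of the last one. A different way to summarise four models, shown here only as a sketch and not used in this notebook, is to combine their predictions by majority vote. The function `majority_vote` and the toy prediction matrix are hypothetical; ties (two votes each) are resolved to class 0 in this sketch.

```python
import numpy as np


def majority_vote(predictions):
    """Combine binary (0/1) predictions of several models.

    predictions: array-like of shape (n_models, n_samples).
    Returns 1 where strictly more than half of the models predict 1.
    """
    predictions = np.asarray(predictions)
    n_models = predictions.shape[0]
    votes_for_one = predictions.sum(axis=0)
    return (votes_for_one * 2 > n_models).astype(int)


# Toy predictions from four models on four samples (made-up values)
toy_predictions = [
    [1, 0, 1, 0],
    [1, 0, 0, 0],
    [1, 1, 0, 1],
    [0, 1, 1, 1],
]
combined = majority_vote(toy_predictions)
print(combined)
```

With the notebook's models, the input would be `[rf.predict(X_test) for rf in rf_models_parallel]`.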

## Comparison between parallel and non-parallel in RF

In [None]:
# Calculate Speed Up
training_RF_speed_up = training_time_of_non_parallel_RF / training_time_of_parallel_RF

# Calculate Efficiency
training_RF_efficiency = training_RF_speed_up / num_processors1


print("\nBest Hyperparameters of (Parallel) RF:", grid_search_paralle_RF.best_params_)
print("Best Hyperparameters of (Non-parallel) RF:", grid_search_non_paralle_RF.best_params_)

print("\nBest Accuracy of (Parallel) RF:", best_accuracy_parallel_RF)
print("Accuracy of (Non-parallel) RF:", accuracy_non_parallel_RF)

print(f"\nTraining Time of (Parallelized) RF: {training_time_of_parallel_RF} seconds")
print(f"Training Time of (Non-parallelized) RF: {training_time_of_non_parallel_RF} seconds")


print(f"\nTraining Speed Up: {training_RF_speed_up:.4f}")

print(f"Training Efficiency: {training_RF_efficiency:.4f}")
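The two metrics used in this comparison follow the usual definitions: speedup is the sequential time divided by the parallel time, and efficiency is the speedup divided by the number of processors. A value of efficiency close to 1 means the processors were used almost fully. The small helper below is a sketch of that arithmetic; the function name and the example timings (120 s, 40 s, 8 processors) are made up for illustration and are not results from this notebook.

```python
def speedup_and_efficiency(t_serial, t_parallel, n_workers):
    """Return (speedup, efficiency) for a sequential and a parallel timing."""
    if t_parallel <= 0 or n_workers <= 0:
        raise ValueError("t_parallel and n_workers must be positive")
    speedup = t_serial / t_parallel
    efficiency = speedup / n_workers
    return speedup, efficiency


# Hypothetical timings: 120 s sequential, 40 s parallel, 8 processors
example_speedup, example_efficiency = speedup_and_efficiency(120.0, 40.0, 8)
print(f"Speedup: {example_speedup:.4f}")
print(f"Efficiency: {example_efficiency:.4f}")
```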




# Second Algorithm: Support Vector Machine (SVM)

In [None]:
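# Define the SVM hyperparameter grid
# Note: 'degree' is only used by the 'poly' kernel and 'gamma' is ignored by the 'linear' kernel,
# so several combinations below train identical models
svm_params  = {
    'C': [ 1, 10],
    'kernel': ['linear', 'rbf'],
    'gamma':  [0.01,0.1],
    'degree': [3,5]
}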
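Not every hyperparameter in this grid affects every kernel: SVC uses `degree` only with the `poly` kernel, and the `linear` kernel does not use `gamma`. The sketch below counts how many of the nominal combinations are actually distinct, and shows a list-of-dictionaries grid (a form GridSearchCV accepts) that avoids the redundant fits. The names `USED_BY_KERNEL` and `conditional_grid` are introduced here for illustration; the notebook itself keeps the single-dictionary grid.

```python
from itertools import product
from math import prod

svm_params = {
    'C': [1, 10],
    'kernel': ['linear', 'rbf'],
    'gamma': [0.01, 0.1],
    'degree': [3, 5],
}

# Hyperparameters (from this grid) that each kernel actually uses
USED_BY_KERNEL = {
    'linear': {'C'},
    'rbf': {'C', 'gamma'},
}

# All combinations the single-dictionary grid would evaluate
nominal = [dict(zip(svm_params, combo)) for combo in product(*svm_params.values())]

# Keep only the settings that matter for each kernel, then deduplicate
effective = {
    tuple(sorted(
        (name, value) for name, value in candidate.items()
        if name == 'kernel' or name in USED_BY_KERNEL[candidate['kernel']]
    ))
    for candidate in nominal
}

# Equivalent conditional grid: one sub-grid per kernel
conditional_grid = [
    {'kernel': ['linear'], 'C': [1, 10]},
    {'kernel': ['rbf'], 'C': [1, 10], 'gamma': [0.01, 0.1]},
]
n_conditional = sum(prod(len(v) for v in sub.values()) for sub in conditional_grid)

print(f"Nominal candidates: {len(nominal)}")
print(f"Distinct models: {len(effective)}")
print(f"Candidates in conditional grid: {n_conditional}")
```

Because both timed runs in this notebook use the same grid, the redundancy affects the sequential and the parallel timings alike.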


## Create a function to train and evaluate the model:

In [None]:
# Create a function to train an SVM on a subset of the data
def train_svm(X_subset, y_subset, params):
    svm = SVC(**params)
    svm.fit(X_subset, y_subset)
    return svm

# Define a function to evaluate an SVM model on the test set
def evaluate_model(model, X_test, y_test):
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    return accuracy

## Non Parallel part:

In [None]:
# Sequential grid search: n_jobs is left at its default, so fits run one at a time
grid_search_non_parallel_svm = GridSearchCV(SVC(), svm_params, cv=5, scoring='accuracy')

# Measure time for non-parallelized grid search
start_time_non_parallel_svm = time.time()
grid_search_non_parallel_svm.fit(x_train_smote, y_train_smote)
end_time_non_parallel_svm = time.time()


In [None]:
# Evaluate the non-parallelized model

# Display the best hyperparameters and their corresponding accuracy
print("Best Hyperparameters of (Non-parallel) SVM:", grid_search_non_parallel_svm.best_params_)

#print training time
training_time_of_non_parallel_svm=end_time_non_parallel_svm-start_time_non_parallel_svm

print(f"\nNon-parallelized model training time of svm  = {training_time_of_non_parallel_svm} Sec")

SVM_non_parallel = grid_search_non_parallel_svm.best_estimator_
y_pred_non_parallel = SVM_non_parallel.predict(X_test)

accuracy_non_parallel_SVM = accuracy_score(Y_test, y_pred_non_parallel)
print("\nAccuracy of (Non-parallel) svm:", accuracy_non_parallel_SVM)

# Display the confusion matrix for the non-parallelized model
confusion_matrix_non_parallel = confusion_matrix(Y_test, y_pred_non_parallel)
print("\nConfusion Matrix of (Non-parallel) SVM:")
print(confusion_matrix_non_parallel)

# Plot the confusion matrix for the non-parallelized model
sns.heatmap(confusion_matrix_non_parallel, annot=True, fmt="d", cmap="Blues", cbar=False)
plt.title("Confusion Matrix of (Non-parallel) SVM")
plt.show()

## Parallel Part

In [None]:
# Parallel grid search: n_jobs=-1 uses all available CPU cores
grid_search_paralle_svm = GridSearchCV(SVC(), svm_params, cv=5, scoring='accuracy', n_jobs=-1)

# Number of logical CPUs, used later to compute efficiency
num_processors2 = os.cpu_count()

start_time_parallel_svm = time.time()
grid_search_paralle_svm.fit(x_train_smote, y_train_smote)

# Train one SVM per data subset in parallel, using the best hyperparameters found
# (this extra training is included in the measured parallel time)
svm_models_parallel = Parallel(n_jobs=-1)(
    delayed(train_svm)(X_subset, y_subset, grid_search_paralle_svm.best_params_)
    for X_subset, y_subset in zip(np.array_split(x_train_smote, 4), np.array_split(y_train_smote, 4))
)
end_time_parallel_svm = time.time()


In [None]:
# Evaluate the parallelized models
accuracies_parallel = [evaluate_model(svm, X_test, Y_test) for svm in svm_models_parallel]

# Display the best hyperparameters and their corresponding accuracy
best_accuracy_parallel_SVM = max(accuracies_parallel)
print("\nBest Hyperparameters of (Parallel) SVM:", grid_search_paralle_svm.best_params_)

#print training time
training_time_of_parallel_SVM=end_time_parallel_svm-start_time_parallel_svm
print(f"\nparallelized model training time of SVM = {training_time_of_parallel_SVM} Sec")

print("\nBest Accuracy of (Parallel)  SVM :", best_accuracy_parallel_SVM)


# Evaluate the parallelized models
confusion_matrices_parallel = []
for svm in svm_models_parallel:
    y_pred_parallel = svm.predict(X_test)
    cm = confusion_matrix(Y_test, y_pred_parallel)
    confusion_matrices_parallel.append(cm)

# Display the confusion matrix for the last model in the parallelized set
print("\nConfusion Matrix of (Parallel) SVM :")
print(confusion_matrices_parallel[-1])

# Plot the confusion matrix for the last model
sns.heatmap(confusion_matrices_parallel[-1], annot=True, fmt="d", cmap="Blues", cbar=False)
plt.title("Confusion Matrix of (Parallel) SVM")
plt.show()

## Comparison between parallel and non-parallel in SVM

In [None]:
# Calculate Speed Up
training_SVM_speed_up = training_time_of_non_parallel_svm / training_time_of_parallel_SVM

# Calculate Efficiency
training_SVM_efficiency = training_SVM_speed_up / num_processors2



print("\nBest Hyperparameters of (Parallel) SVM:", grid_search_paralle_svm.best_params_)
print("Best Hyperparameters of (Non-parallel) SVM:", grid_search_non_parallel_svm.best_params_)

print("\nBest Accuracy of (Parallel)  SVM :", best_accuracy_parallel_SVM)
print("Best Accuracy of (Non-parallel) SVM:", accuracy_non_parallel_SVM)

print(f"\nTraining Time of (Parallelized) SVM: {training_time_of_parallel_SVM} seconds")
print(f"Training Time of (Non-parallelized) SVM: {training_time_of_non_parallel_svm} seconds")


print(f"\nTraining Speed Up: {training_SVM_speed_up:.4f}")

print(f"Training Efficiency: {training_SVM_efficiency:.4f}")




# Third Algorithm: Gradient Boosting

In [None]:

# Define the Gradient Boosting parameters
gb_params = {
    'n_estimators': [100, 200],
    'learning_rate': [0.01, 0.1],
    'max_depth': [5, 7],
    'min_samples_split': [2, 5],
    'min_samples_leaf': [2, 4]
}

In [None]:
# Create a function to train a GradientBoostingClassifier on a subset of the data
def train_gb(X_subset, y_subset, params):
    gb = GradientBoostingClassifier(**params)
    gb.fit(X_subset, y_subset)
    return gb

# Define a function to evaluate a model on the test set
def evaluate_model(model, X_test, y_test):
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    return accuracy

## Non Parallel part:

In [None]:
# Sequential grid search: n_jobs is left at its default, so fits run one at a time
grid_search_non_parallel_gb = GridSearchCV(GradientBoostingClassifier(), gb_params, cv=5, scoring='accuracy')

# Measure time for non-parallelized grid search
start_time_non_parallel_gb = time.time()
grid_search_non_parallel_gb.fit(x_train_smote, y_train_smote)
end_time_non_parallel_gb = time.time()


In [None]:
# Evaluate the non-parallelized model

# Display the best hyperparameters and their corresponding accuracy
print("Best Hyperparameters of (Non-parallel) GB:", grid_search_non_parallel_gb.best_params_)

#print training time
training_time_of_non_paralle_GB=end_time_non_parallel_gb-start_time_non_parallel_gb
print(f"\nNon-parallelized model training time of GB = {training_time_of_non_paralle_GB} Sec")

GB_non_parallel = grid_search_non_parallel_gb.best_estimator_
y_pred_non_parallel = GB_non_parallel.predict(X_test)

accuracy_non_parallel_GB = accuracy_score(Y_test, y_pred_non_parallel)
print("\nAccuracy of (Non-parallel) GB:", accuracy_non_parallel_GB)

# Display the confusion matrix for the non-parallelized model
confusion_matrix_non_parallel = confusion_matrix(Y_test, y_pred_non_parallel)
print("\nConfusion Matrix of (Non-parallel) GB:")
print(confusion_matrix_non_parallel)

# Plot the confusion matrix for the non-parallelized model
sns.heatmap(confusion_matrix_non_parallel, annot=True, fmt="d", cmap="Blues", cbar=False)
plt.title("Confusion Matrix of (Non-parallel) GB")
plt.show()

## Parallel Part

In [None]:
# Parallel grid search: n_jobs=-1 uses all available CPU cores
grid_search_parallel_gb = GridSearchCV(GradientBoostingClassifier(), gb_params, cv=5, scoring='accuracy', n_jobs=-1)

# Number of logical CPUs, used later to compute efficiency
num_processors3 = os.cpu_count()
start_time_parallel_gb = time.time()

grid_search_parallel_gb.fit(x_train_smote, y_train_smote)
# Train one GradientBoostingClassifier per data subset in parallel, using the best hyperparameters found
# (this extra training is included in the measured parallel time)
gb_models_parallel = Parallel(n_jobs=-1)(
    delayed(train_gb)(X_subset, y_subset, grid_search_parallel_gb.best_params_)
    for X_subset, y_subset in zip(np.array_split(x_train_smote, 4), np.array_split(y_train_smote, 4))
)
end_time_parallel_gb = time.time()

In [None]:
# Evaluate the parallelized models
accuracies_parallel = [evaluate_model(gb, X_test, Y_test) for gb in gb_models_parallel]

# Display the best hyperparameters and their corresponding accuracy
best_accuracy_parallel_GB = max(accuracies_parallel)
print("\nBest Hyperparameters of (Parallel) GB:", grid_search_parallel_gb.best_params_)

#print training time
training_time_of_parallel_GB=end_time_parallel_gb-start_time_parallel_gb

print(f"\nparallelized model training time of GB = {training_time_of_parallel_GB} Sec")

print("\nBest Accuracy of (Parallel) GB :", best_accuracy_parallel_GB)


# Evaluate the parallelized models
confusion_matrices_parallel = []
for gb in gb_models_parallel:
    y_pred_parallel = gb.predict(X_test)
    cm = confusion_matrix(Y_test, y_pred_parallel)
    confusion_matrices_parallel.append(cm)

# Display the confusion matrix for the last model in the parallelized set
print("\nConfusion Matrix of (Parallel) GB :")
print(confusion_matrices_parallel[-1])

# Plot the confusion matrix for the last model
sns.heatmap(confusion_matrices_parallel[-1], annot=True, fmt="d", cmap="Blues", cbar=False)
plt.title("Confusion Matrix of (Parallel) GB")
plt.show()

## Comparison between parallel and non-parallel in GB

In [None]:
# Calculate Speed Up
training_GB_speed_up = training_time_of_non_paralle_GB / training_time_of_parallel_GB

# Calculate Efficiency
training_GB_efficiency = training_GB_speed_up / num_processors3



print("\nBest Hyperparameters of (Parallel) GB:", grid_search_parallel_gb.best_params_)
print("Best Hyperparameters of (Non-parallel) GB:", grid_search_non_parallel_gb.best_params_)

print("\nBest Accuracy of (Parallel) GB:", best_accuracy_parallel_GB)
print("Best Accuracy of (Non-parallel) GB:", accuracy_non_parallel_GB)

print(f"\nTraining Time of (Parallelized) GB: {training_time_of_parallel_GB} seconds")
print(f"Training Time of (Non-parallelized) GB: {training_time_of_non_paralle_GB} seconds")


print(f"\nTraining Speed Up: {training_GB_speed_up:.4f}")

print(f"Training Efficiency: {training_GB_efficiency:.4f}")



In [None]:
# Plot the training speedup of each model
models = ['GB', 'SVM', 'RF']
speedups = [training_GB_speed_up, training_SVM_speed_up, training_RF_speed_up]

fig, ax = plt.subplots()
ax.bar(models, speedups, color=['blue', 'green', 'red'])
ax.set_ylabel('Speedup (non-parallel time / parallel time)')
ax.set_title('Training Speedup Comparison')
plt.show()

In [None]:
# Plot the training efficiency of each model
models = ['GB', 'SVM', 'RF']
efficiencies = [training_GB_efficiency, training_SVM_efficiency, training_RF_efficiency]

fig, ax = plt.subplots()
ax.bar(models, efficiencies, color=['blue', 'green', 'red'])
ax.set_ylabel('Efficiency (speedup / number of processors)')
ax.set_title('Training Efficiency Comparison')
plt.show()

This code was written by group AI7-1 for the ARTI 503 Parallel course.

Prepared for Dr. Naya Nagy.

Group Members:
- Joury Alzayat
- Noor Aljashi
- Abrar Sebiany
- Manar Alsayed
