<a href="https://colab.research.google.com/github/webrockerz2020/waste_classifcation_traditional_machine_learning/blob/main/waste_classification_ml.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import os
import numpy as np
import pandas as pd
import math
import cv2 as cv
import tqdm
import threading
import seaborn as sns
import matplotlib.pyplot as plt
import warnings
import pickle

from PIL import Image
from pathlib import Path
from skimage.feature import hog
from tqdm import tqdm #add progress bars to loops and iterable objects.
from multiprocessing import Pool
from sklearn.metrics import classification_report
from sklearn.metrics import roc_auc_score
from sklearn.metrics import confusion_matrix, roc_curve, auc
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, cohen_kappa_score, matthews_corrcoef


warnings.filterwarnings("ignore")
pd.options.display.max_columns = None

#Connecting the driver
from google.colab import drive

drive.mount('/content/drive/')


Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


In [2]:
# Setting all the directories

root = '/content/drive/MyDrive/ML'
model = 'models'
#style_file = 'styles.csv'
image_folder_train = root + '/data/train/'
image_folder_train_o =image_folder_train + 'O/'
image_folder_train_r =image_folder_train + 'R/'

In [3]:
# Feature Engineering using HoG
def get_all_image_names_and_its_class_to_df(folder_name):
    #print(folder_name)
    df_local=pd.DataFrame(columns=['id','masterCategory'])
    for folder in os.listdir(folder_name):
      #print(folder)
      if folder != '.DS_Store':
            folder_path =os.path.join(folder_name,folder)
            #print(folder_path)
            temp_df=pd.DataFrame(columns=['id','masterCategory'])
            img_list=[]
            for img in os.listdir(folder_path):
                img_list.append(img)
            #print(folder)
            temp_df['id'] =img_list
            temp_df['masterCategory']= folder
            #print(temp_df)
            df_local = pd.concat([df_local, temp_df], ignore_index=True)
    return df_local

# Loading the images
def load_image(ids, path):
    img = cv.imread(path + ids, cv.IMREAD_GRAYSCALE)
    return img, ids

def process_image(id_path_tuple):
    id, path = id_path_tuple
    img, id = load_image(id, path)
    if img is not None:
        return [img, id]

# resizing of images
def resize_image(img,ids):
    return cv.resize(img, (80, 80),interpolation =cv.INTER_LINEAR) # Tired with 60,80 # tired 60,60 # tired 100 by 100 system crash

#bluring the images
def hog_to_blur_the_images(image):
    ppcr = 7 # tired with 8,7,5
    ppcc = 7 # Tired with 8,7,5
    blur = cv.GaussianBlur(image, (5, 5), 0) #kernal size 7,8 checked , # larger kernel more blur # changes sigma from 0
    fd, hog_image = hog(blur, orientations=5, pixels_per_cell=(ppcr, ppcc), cells_per_block=(2, 2), block_norm='L2', visualize=True)
    return hog_image, fd

#get the edges
def get_edges(img_list,n_samples):
  edges = [cv.Canny(image,50,150,apertureSize = 3) for image in img_list]
  #edges = [cv.Canny(image, 50, 200, apertureSize=3) for image in img_list]
  #edges = [cv.Canny(image, 50, 250, apertureSize=3) for image in img_list]
  edges = np.array(edges)
  #print(edges)
  n_samples_edges = len(edges)
  edge_images_train = edges.reshape((n_samples, -1))
  edge_images_train.shape
  return edge_images_train

#pixel intensity
def get_pixel_intnsity(img_list):
  histr_train = [cv.calcHist([img],[0],None,[256],[0,256]) for img in img_list] # bin size,color channel
  #histr_train = [cv.calcHist([img],[0],None,[300],[0,300]) for img in img_list]
  histr_train = np.array(histr_train)
  n_samples_histr_train = len(histr_train)
  image_hist_train = histr_train.reshape((n_samples_histr_train, -1))
  image_hist_train.shape
  return image_hist_train


In [4]:
#Multiproessing
# Mulitprocessing 1 --> Process Image
def create_a_multiprocessing(folder_name,df):
    df_ids = list(df.id)
    # Create a list of (id, path) tuples
    id_path_tuples = [(id, folder_name) for id in df_ids]
    # Create a multiprocessing pool
    pool = Pool(processes=8)
    # Process images in parallel
    results = list(tqdm(pool.imap(process_image, id_path_tuples), total=len(df_ids)))
    # Close the pool
    pool.close()
    pool.join()
    images= [result for result in results if result is not None]
    len(images)
    return images

# Multiprocessing 2 --> Blurring Image
def create_a_multiprocessing_blur(image_lst):
    pool = Pool(processes=5)
    hog_images, hog_features = zip(*pool.map(hog_to_blur_the_images, image_lst))
    pool.close()
    pool.join()

    hog_features = np.array(hog_features)
    return hog_images, hog_features

In [5]:
# Loading all the images
df_train = get_all_image_names_and_its_class_to_df(image_folder_train)
print(df_train.shape)
print(df_train.head(10))
print(df_train['masterCategory'].value_counts())

(25077, 2)
           id masterCategory
0  R_9506.jpg              R
1  R_9613.jpg              R
2  R_9751.jpg              R
3  R_9872.jpg              R
4  R_9354.jpg              R
5  R_9943.jpg              R
6  R_9265.jpg              R
7  R_9280.jpg              R
8  R_9287.jpg              R
9  R_9210.jpg              R
O    13966
R    11111
Name: masterCategory, dtype: int64


In [6]:
train_images_o =create_a_multiprocessing(image_folder_train_o,df_train)
train_images_r =create_a_multiprocessing(image_folder_train_r,df_train)
all_train_images = train_images_o+train_images_r
print("Length of train images",len(all_train_images))

100%|██████████| 25077/25077 [01:28<00:00, 282.96it/s]
100%|██████████| 25077/25077 [00:43<00:00, 581.71it/s] 


Length of train images 25077


In [7]:
all_images_resized_train = [[resize_image(x,y),y] for x,y in all_train_images]
df_labels_train = pd.DataFrame(all_images_resized_train,columns=['image','id'])


In [8]:
# Adding all the labels
target = 'masterCategory'
df_labels_train = pd.merge(df_labels_train,df_train,how='left',on=['id'])
df_labels_train = df_labels_train.fillna('Others')
df_labels_train['class'] = pd.factorize(df_labels_train[target])[0]
print("Data Shape: ", str(df_labels_train.shape))
print(df_labels_train[target].value_counts())

Data Shape:  (25077, 4)
O    13966
R    11111
Name: masterCategory, dtype: int64


In [9]:
train_images = np.stack(df_labels_train.image.values,axis=0)
n_samples_train = len(train_images)
print(n_samples_train)
data_images_train = train_images.reshape((n_samples_train, -1))

25077


In [None]:
hog_images_train, hog_features_train = create_a_multiprocessing_blur(train_images)

In [None]:
for img in hog_images_train[:2]:
    plt.imshow(img)
    plt.show()

In [None]:
#getting the edges
edge_images_train = get_edges(train_images,n_samples_train)

In [None]:
train_images.shape, hog_features_train.shape, edge_images_train.shape


In [None]:
edge_hog_train = np.hstack([hog_features_train,edge_images_train]) #to stack the sequence of input arrays horizontally (i.e. column wise) to make a single array
edge_hog_train.shape

In [None]:
#getting pixel intensity
image_hist_train = get_pixel_intnsity(train_images)
image_hist_train.shape

In [None]:
edge_hog = np.hstack([hog_features_train,edge_images_train,image_hist_train])
edge_hog.shape

In [None]:
del train_images #--> freeing up the space

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import cross_val_score, KFold
from sklearn import datasets, svm, metrics
from sklearn import metrics
from sklearn.decomposition import PCA


In [None]:
# Models
X_train, X_test, y_train, y_test = train_test_split(hog_features_train, df_train['masterCategory'], test_size=0.2, random_state=42)

**KNN**

In [None]:
def accuracy_measures(y_pred,y_test,pos):
  # Sensitivity (Recall)
    sensitivity = recall_score(y_test, y_pred, pos_label=pos)  # or pos_label='O' depending on your positive class
    print("Sensitivity (Recall):", sensitivity)

    # Precision
    precision = precision_score(y_test, y_pred, pos_label=pos)  # or pos_label='O' depending on your positive class
    print("Precision:", precision)

    # F1-score
    f1 = f1_score(y_test, y_pred, pos_label=pos)  # or pos_label='O' depending on your positive class
    print("F1-score:", f1)

    # Cohen's Kappa statistic
    kappa = cohen_kappa_score(y_test, y_pred)
    print("Cohen's Kappa:", kappa)

    # Matthews Correlation Coefficient (MCC)
    mcc = matthews_corrcoef(y_test, y_pred)
    print("Matthews Correlation Coefficient (MCC):", mcc)

    # Confusion Matrix
    cm = confusion_matrix(y_test, y_pred)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", cbar=False)
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.title("Confusion Matrix")
    plt.show()

In [None]:
print(X_train.shape, y_train.shape)
print(X_test.shape, y_test.shape)
y_train.value_counts()

In [None]:
# Testing with different values of K
test_accuracy = []
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_train)
lst_of_k= [3,5,7,10,15,20,25]
for i in lst_of_k:
  print(i)
  classifier = KNeighborsClassifier(n_neighbors=i,algorithm='brute')
  classifier.fit(X_scaled, y_train)
  y_pred = classifier.predict(scaler.transform(X_test))
  accuracy = accuracy_score(y_test, y_pred)
  print("Accuracy:", accuracy)

In [None]:
# Creating the Classifer for best Value of K =20
classifier = KNeighborsClassifier(n_neighbors=20, algorithm='brute')
classifier.fit(X_scaled, y_train)
test_accuracy = classifier.score(scaler.transform(X_test), y_test)
y_pred = classifier.predict(X_test)
accuracy = accuracy_score(y_test, y_pred,'R')
accuracy_measures = (y_test, y_pred,'O')

In [None]:
# Applying PCA as the results were not good
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Apply PCA
n_components = 2000
pca = PCA(n_components=n_components)
X_train_pca = pca.fit_transform(X_train_scaled)
X_test_pca = pca.transform(X_test_scaled)

# Initialize and train the KNN classifier
k_neighbors = 20
classifier = KNeighborsClassifier(n_neighbors=k_neighbors, algorithm='brute')
classifier.fit(X_train_pca, y_train)
y_pred = classifier.predict(X_test_pca)
accuracy = accuracy_score(y_test, y_pred)
print("Accuracy:", accuracy)

accuracy = accuracy_score(y_test, y_pred,'R')
accuracy_measures = (y_test, y_pred,'O')

In [None]:
# Saved the model using Pickel

with open(os.path.join(root,model,'knn_model_brute.pkl'), 'wb') as knnPickle:
    pickle.dump(classifier, knnPickle)

# Load the model from disk
loaded_model = pickle.load(open(os.path.join(root,model,'knn_model_brute.pkl'), 'rb'))


In [None]:
"""classifier = KNeighborsClassifier(n_neighbors=20,algorithm='kd_tree')
classifier.fit(X_scaled, y_train)
test_accuracy = classifier.score(scaler.transform(X_test), y_test)
print(test_accuracy)"""

"""classifier = KNeighborsClassifier(n_neighbors=20,algorithm='ball_tree')
classifier.fit(X_scaled, y_train)
test_accuracy = classifier.score(scaler.transform(X_test), y_test)
print(test_accuracy)"""

In [None]:
# Assuming you have the new image path

test_image = 'test_image/'
image = "pla.jfif"
image_name = image.split('.')[0]
image_name = image_name + ".jpeg"
new_image_path = root_dir + "/" + test_image + image
# Converting the Image to JPEG

im = Image.open(os.path.join(root_dir, test_image, image))
rgb_im = im.convert('RGB')
rgb_im.save(os.path.join(root_dir, test_image, image_name))
print("Image saved successfully ...")

# Load and preprocess the new image
X_new = Image.open(os.path.join(root_dir,test_image,image_name))
plt.imshow(X_new)
new_image = cv.imread(new_image_path, cv.IMREAD_GRAYSCALE)
new_image = cv.resize(new_image, (60, 80), interpolation=cv.INTER_LINEAR)  # Resize if needed
# Extract HOG features
ppcr = 7
ppcc = 7
blur = cv.GaussianBlur(new_image, (7, 7), 1)
fd, hog_features = hog(blur, orientations=5, pixels_per_cell=(ppcr, ppcc), cells_per_block=(2, 2), block_norm='L2', visualize=True)
stacked_features = np.stack([hog_features], axis=0)
reshaped_features = stacked_features.reshape((1, -1))
if reshaped_features.shape[1] != X_train.shape[1]:
    reshaped_features = reshaped_features[:, :X_train.shape[1]]

new_image_scaled = scaler.transform(reshaped_features)
new_image_pca = pca.transform(new_image_scaled)
predicted_class = classifier.predict(new_image_pca)
print("Predicted Class:", predicted_class)




In [None]:
Done with KNN

In [None]:
#Logistic regression
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression

test_accuracy = []
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_train)

classifier = LogisticRegression(max_iter=5)
classifier.fit(X_scaled, y_train)
test_accuracy = classifier.score(scaler.transform(X_test), y_test)
print(test_accuracy)

In [None]:
#Random Forest

from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import classification_report
from sklearn.preprocessing import StandardScaler

In [None]:
rfClassifier = RandomForestClassifier()
rfClassifier.fit(X_train, y_train)
rfClassifier_accuracy = rfClassifier.score(X_test, y_test)
print('Accuracy of the Random Forest Classifier is: ', rfClassifier_accuracy)
print('\n')

cv_scores = cross_val_score(rfClassifier, X_test, y_test, cv=5)
print('Scores from cross-validation is: ', cv_scores)
print('Average accuracy from cross-validation is: {}'.format(np.mean(cv_scores)))
print('\n')

predictions = rfClassifier.predict(X_test)
print('---------- Model evaluation ----------')
print(classification_report(y_test, predictions))

In [None]:
scaler = StandardScaler()
X_scaled_train = scaler.fit_transform(X_train)
X_scaled_test = scaler.transform(X_test)

rfClassifier = RandomForestClassifier()
rfClassifier.fit(X_scaled_train, y_train)
rfClassifier_accuracy = rfClassifier.score(X_scaled_test, y_test)
print('Accuracy of the Random Forest Classifier is: ', rfClassifier_accuracy)
print('\n')

cv_scores = cross_val_score(rfClassifier, X_scaled_test, y_test, cv=5)
print('Scores from cross-validation is: ', cv_scores)
print('Average accuracy from cross-validation is: {}'.format(np.mean(cv_scores)))
print('\n')

predictions = rfClassifier.predict(X_scaled_test)
print('---------- Model evaluation ----------')
print(classification_report(y_test, predictions))

In [None]:
### Tuning hyperparams for the Random Forest Classifier

In [None]:
param_grid = {
	'n_estimators': [25, 50, 100, 150, 200],
	'max_features': ['sqrt', 'log2', None],
	'max_depth': [3, 6, 9, None],
	'max_leaf_nodes': [3, 6, 9, None],
}

In [None]:
from sklearn.model_selection import GridSearchCV

In [None]:
from sklearn.model_selection import RandomizedSearchCV

random_search = RandomizedSearchCV(RandomForestClassifier(), param_grid)
random_search.fit(X_train, y_train)
print(random_search.best_estimator_)