In [None]:
import matplotlib.pyplot as plt
import tensorflow.keras
import zipfile as zf
import numpy as np
import seaborn as sns
import pandas as pd

import sys
import cv2
import csv
import os
import re
import shutil
import random
from PIL import Image

from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report

from tensorflow.keras import backend as K
from tensorflow.keras.utils import to_categorical 
from tensorflow.keras.models import Sequential 
from tensorflow.data import Dataset
from tensorflow.keras.layers import InputLayer, Conv2D, MaxPooling2D, Dense, Dropout, Flatten, BatchNormalization

## Helper Functions

The following helper functions were written by Collin Ching for his post "How to build an image classifier for waste sorting".

article : https://towardsdatascience.com/how-to-build-an-image-classifier-for-waste-sorting-6d11d3c9c478

python notebook : https://nbviewer.jupyter.org/github/collindching/Waste-Sorter/blob/master/Waste%20sorter.ipynb

In [None]:
# splits indices for a folder into train, validation, and test indices with random sampling   
def split_indices(folder,seed1,seed2):
    """Randomly partition the 1-based file indices of ``folder`` into three lists.

    The number of entries in ``folder`` (``n``) defines the index range
    ``1..n``. Half of the indices (rounded down) are drawn for training,
    half of the remainder (rounded down) for validation, and everything
    left over becomes the test set.

    Args:
        folder: Directory whose entry count determines ``n``.
        seed1: Seed used for drawing the training indices.
        seed2: Seed used for drawing the validation indices.

    Returns:
        Tuple ``(train, valid, test)`` of lists of ints. ``train`` and
        ``valid`` are in sampling order; ``test`` is in ascending order.
    """
    n = len(os.listdir(folder))
    full_set = list(range(1, n + 1))

    # Private generators: same draws as seeding the module-level RNG, but the
    # caller's global random state is left untouched.
    train = random.Random(seed1).sample(full_set, int(.5 * n))

    # Keep the remainder in ascending order so the validation draw does not
    # depend on set iteration order.
    drawn_for_train = set(train)
    remain = [i for i in full_set if i not in drawn_for_train]

    valid = random.Random(seed2).sample(remain, int(.5 * len(remain)))
    drawn_for_valid = set(valid)
    test = [i for i in remain if i not in drawn_for_valid]

    return (train, valid, test)

# gets file names for a particular type of trash, given indices
def get_names(waste_type,indices):
    """Return the ``<waste_type><index>.jpg`` file name for each index, in order."""
    file_names = []
    for idx in indices:
        file_names.append("".join((waste_type, str(idx), ".jpg")))
    return file_names

# moves group of source files to another folder
def move_files(source_files,destination_folder):
    """Move each path in ``source_files`` into ``destination_folder`` with ``shutil.move``."""
    for src_path in source_files:
        shutil.move(src_path, destination_folder)

In [None]:
def create_directory():
    """Create the train/valid/test folder tree and move the images into it.

    Reads the module-level names ``path``, ``data_path``, ``train_path``,
    ``valid_path`` and ``test_path``. For every waste type the source folder
    ``path/<waste_type>`` is split with ``split_indices(source_folder, 1, 1)``
    and the corresponding files are moved (not copied) to
    ``train_path/<waste_type>``, ``valid_path/<waste_type>`` and ``test_path``.
    """
    # paths will be train/cardboard, train/glass, etc...
    subsets = ['train','valid']
    waste_types = ['cardboard','glass','metal','paper','plastic','trash']

    # create destination folders for data subset and waste type;
    # exist_ok avoids the separate exists() check
    for subset in subsets:
        for waste_type in waste_types:
            os.makedirs(os.path.join(data_path,subset,waste_type), exist_ok=True)

    os.makedirs(os.path.join(data_path,'test'), exist_ok=True)

    # move files to destination folders for each waste type
    for waste_type in waste_types:
        source_folder = os.path.join(path,waste_type)
        train_ind, valid_ind, test_ind = split_indices(source_folder,1,1)

        # test images all share one folder (data/test) because they can be mixed up
        destinations = (
            (train_ind, os.path.join(train_path,waste_type)),
            (valid_ind, os.path.join(valid_path,waste_type)),
            (test_ind, test_path),
        )

        for indices, destination in destinations:
            names = get_names(waste_type,indices)
            source_files = [os.path.join(source_folder,name) for name in names]
            move_files(source_files,destination)

In [None]:
def createFileList(myDir, format='.jpg'):
    """Collect the full paths of all files under ``myDir`` whose name ends with ``format``.

    The directory is printed first, then walked bottom-up (sub-folders before
    their parent), and matching paths are returned in the order encountered.
    """
    print(myDir)
    matches = []
    for root, _dirs, files in os.walk(myDir, topdown=False):
        matches.extend(
            os.path.join(root, fname) for fname in files if fname.endswith(format)
        )
    return matches

End of Collin's code

In [None]:
def stringContains(filePath):
    """Map an image path to its integer class label.

    Labels: paper=0, cardboard=1, trash=2, plastic=3, metal=4, glass=5,
    and 6 when no class keyword is found.

    The file name is inspected first so that a class keyword appearing in an
    unrelated parent directory cannot override the file's own class. If the
    file name carries no keyword, the whole path is searched instead.

    Args:
        filePath: Path (string) of the image file.

    Returns:
        Integer label in ``0..6``.
    """
    # Order defines priority when more than one keyword is present.
    keyword_labels = (
        ("paper", 0),
        ("cardboard", 1),
        ("trash", 2),
        ("plastic", 3),
        ("metal", 4),
        ("glass", 5),
    )

    for candidate in (os.path.basename(filePath), filePath):
        for keyword, label in keyword_labels:
            if keyword in candidate:
                return label

    return 6

def convertToMatrix(fileList,num_images=None):
    """Load images as greyscale and stack them as flattened rows of a matrix.

    Args:
        fileList: Paths of the images to load. Each image must flatten to
            196608 pixels (384 x 512) after greyscale conversion.
        num_images: Number of rows to allocate. Defaults to ``len(fileList)``.
            If larger than the number of files, the extra rows stay zero.

    Returns:
        Tuple ``(data, labels, label_count)``:
        ``data`` is a float64 array of shape ``(num_images, 196608)``;
        ``labels`` is a float64 array with one ``stringContains`` label per file;
        ``label_count`` is a 7-element list counting files per label.
    """
    num_pixels = 384 * 512  # 196608 pixels per greyscale image

    fileList = list(fileList)
    if num_images is None:
        num_images = len(fileList)

    # zeros (not empty) so rows that are never filled hold a defined value
    data = np.zeros((num_images,num_pixels))
    labels = []
    label_count = [0] * 7

    for index, file in enumerate(fileList):
        # context manager releases the file handle once the pixels are read
        with Image.open(file) as img:
            arr = np.array(img.convert('L')) # convert to grey and place in numpy array
            name = stringContains(img.filename) # class label derived from the path

        data[index,:] = arr.ravel() # flatten to vector and add to matrix
        labels.append(name)
        label_count[name] += 1

    # float64 keeps the dtype that repeated np.append produced previously
    return data, np.array(labels, dtype=np.float64), label_count

## Import Images

In [None]:
# Folder created by extracting the archive, and the destinations of the split.
# NOTE(review): data_path is an absolute path at the filesystem root — confirm
# the runtime environment allows writing there.
path = "dataset-resized"
data_path = "/data"
test_path, train_path, valid_path = (
    data_path + "/" + subset for subset in ("test", "train", "valid")
)

In [None]:
# Extract the dataset archive into the current working directory.
# The context manager closes the archive even if extraction raises.
with zf.ZipFile("dataset-resized.zip",'r') as files:
    files.extractall()

print(files)

In [None]:
# Show the class folders that were extracted (resolved against the current working directory).
os.listdir(os.path.abspath("dataset-resized"))

In [None]:
# Build the train/valid/test folder tree and move the extracted images into it.
# NOTE(review): files are moved rather than copied, so running this cell a second
# time without re-extracting the archive will presumably fail — confirm.
create_directory()

In [None]:
# One file list per split, plus whatever is still left in the source folder.
train_files, test_files, valid_files, fileList = (
    createFileList(folder, format='.jpg')
    for folder in (train_path, test_path, valid_path, path)
)

In [None]:
# Report how many images ended up in each split.
print("{} training images\n{} testing images\n{} validation images".format(
    len(train_files), len(test_files), len(valid_files)))

## Data Processing

Convert images to arrays

In [None]:
# Size every matrix from its own file list instead of hard-coded counts, so the
# cell stays correct if the dataset or the split changes.
train,train_l,train_count = convertToMatrix(train_files,len(train_files))
test,test_l,test_count = convertToMatrix(test_files,len(test_files))
valid,valid_l,valid_count = convertToMatrix(valid_files,len(valid_files))

# pixels as float32, class labels as small unsigned ints
train = train.astype('float32')
train_l = train_l.astype('uint8')
test = test.astype('float32')
test_l = test_l.astype('uint8')
valid = valid.astype('float32')
valid_l = valid_l.astype('uint8')

print("data")
print(train.shape)
print(test.shape)
print(valid.shape)

print("")

print("labels")
print(train_l.shape)
print(test_l.shape)
print(valid_l.shape)

print("")
print("count")
print(train_count)
print(test_count)
print(valid_count)

In [None]:
# Index i of this list is the class name for label i produced by stringContains.
class_names = ['paper', 'cardboard', 'trash', 'plastic', 'metal','glass']

# Pick 25 reproducible sample rows; the upper bound follows the actual number
# of training rows rather than a hard-coded count.
random_idx = []
for i in range(25):
    random.seed(i)
    random_idx.append(random.randrange(0,train.shape[0]))

# 5 x 5 grid of training images labelled with their class name
plt.figure(figsize=(10,10))
for i in range(25):

    plt.subplot(5,5,i+1)
    plt.xticks([])
    plt.yticks([])
    plt.grid(False)

    idx = random_idx[i]
    image = train[idx].reshape((384,512))
    plt.imshow(image)
    plt.xlabel(class_names[train_l[idx]])

plt.show()

Convert labels to one hot encoding

In [None]:
# One-hot encode the integer labels of every split (stored as uint8).
train_labels, test_labels, valid_labels = (
    to_categorical(class_ids).astype('uint8')
    for class_ids in (train_l, test_l, valid_l)
)

print("Labels:")
print("train", train_l.shape)
print("test", test_l.shape)
print("validate", valid_l.shape)
print()

print("One Hot Encoded Labels:")
print("train", train_labels.shape)
print("test", test_labels.shape)
print("validate", valid_labels.shape)
print()

# first training label before and after encoding
print(train_l[0], train_labels[0], sep="\n")

Reshape data

In [None]:
rows, cols = 384, 512

# Channels-last layout: (samples, rows, cols, 1) for single-channel images.
train_data = train.reshape((len(train), rows, cols, 1))
test_data = test.reshape((len(test), rows, cols, 1))
valid_data = valid.reshape((len(valid), rows, cols, 1))

Normalise data

In [None]:
# Scale pixel values by 1/255, writing the result back into the same arrays.
# NOTE: the division is in place, so re-running this cell scales the data again.
np.divide(train_data, 255.0, out=train_data)
np.divide(test_data, 255.0, out=test_data)
np.divide(valid_data, 255.0, out=valid_data)

print(train_data.shape, test_data.shape, valid_data.shape, sep="\n")

## CNN Architecture

In [None]:
# --- input data ---
n_classes = 6
rows, cols = 384, 512
input_shape = (rows, cols, 1)

# per-class weights passed to model.fit (keys are the integer class labels)
class_weights = {0: 7, 1: 10, 2: 30, 3: 8, 4: 10, 5: 8}

# --- hyperparameters ---
batch_size = 75
epochs = 200
dropout_prob = 0.5
lr = 0.01
m = 0.0

# --- model architecture ---
opt = tensorflow.keras.optimizers.SGD(learning_rate=lr, momentum=m)
pool_size = 2
num_filters1, num_filters2, num_filters3 = 32, 64, 128
filter_size = 5

In [None]:
# Hyperparameter grid for the Talos scan.
# Lists are used because Talos reads a list as discrete candidate values,
# whereas a 3-tuple is read as a (min, max, steps) range.
# NOTE(review): a dropout rate of 1 drops every input — confirm this candidate is intended.
p = {'lr': [0.001,0.01,0.1],
     'batch_size': [50,75,100],
     'epochs': [50,100,200],
     'dropout': [0,0.2,1],
     'mom': [0.0,0.2,0.4]
}

In [None]:
def waste_classifier_model_talos(x_train,y_train,x_val,y_val,params):
    """Build, compile and fit the CNN for one hyperparameter combination.

    Architecture: input dropout, two Conv2D + MaxPooling2D stages, then a
    flattened softmax output with ``n_classes`` units. Uses the module-level
    ``input_shape``, ``num_filters1``, ``num_filters2``, ``filter_size``,
    ``pool_size``, ``n_classes`` and ``class_weights``.

    Args:
        x_train, y_train: Training inputs and targets passed to ``model.fit``.
        x_val, y_val: Validation inputs and targets passed to ``model.fit``.
        params: Mapping with the keys ``'dropout'``, ``'lr'``, ``'mom'``,
            ``'batch_size'`` and ``'epochs'``.

    Returns:
        Tuple ``(output, model)``: the return value of ``model.fit`` and the
        fitted model.
    """
    model = Sequential() #name="Waste Classifier Model 2")

    model.add(InputLayer(input_shape=input_shape))

    model.add(Dropout(params['dropout']))

    model.add(Conv2D(num_filters1,filter_size,activation='relu',data_format="channels_last"))#,input_shape=input_shape,))
    model.add(MaxPooling2D(pool_size=pool_size))

    model.add(Conv2D(num_filters2,filter_size, activation='relu'))
    model.add(MaxPooling2D(pool_size=pool_size))

    # the output softmax layer will have one node for each class
    model.add(Flatten())
    model.add(Dense(n_classes,activation='softmax'))

    opt = tensorflow.keras.optimizers.SGD(learning_rate=params['lr'], momentum=params['mom'])
    model.compile(loss='mse',optimizer=opt,metrics=['accuracy'])

    model.summary()

    # validation_data is given as a tuple, the form documented for Model.fit
    output = model.fit(x_train,
                       y_train,
                       batch_size=params['batch_size'],
                       epochs=params['epochs'],
                       class_weight=class_weights,
                       validation_data=(x_val,y_val))

    return output, model

In [None]:
# Run the Talos hyperparameter scan over the grid `p`, using
# waste_classifier_model_talos to build and fit each candidate model.
# NOTE(review): no `import talos` is visible in this notebook's imports cell, so
# this cell presumably raises NameError on a fresh kernel — confirm and add the import.
# NOTE(review): only the training arrays are passed here; valid_data / valid_labels
# are not used by this call — confirm that is intended.
scan_object = talos.Scan(train_data,
                         train_labels,
                         params=p,
                         model=waste_classifier_model_talos,
                         experiment_name='waste_classifier',
                         fraction_limit=.001)

In [None]:
def waste_classifier_model():
    """Build and compile the waste-classifier CNN from the module-level settings.

    Two Conv2D + MaxPooling2D stages followed by a flattened softmax output
    with ``n_classes`` units, compiled with SGD (``lr``, ``m``), MSE loss and
    an accuracy metric. The model summary is printed before returning.

    Returns:
        The compiled Sequential model.
    """
    layers = [
        InputLayer(input_shape=input_shape),
        Conv2D(num_filters1, filter_size, activation='relu', data_format="channels_last"),
        MaxPooling2D(pool_size=pool_size),
        Conv2D(num_filters2, filter_size, activation='relu'),
        MaxPooling2D(pool_size=pool_size),
        Flatten(),
        # softmax output: one node per class
        Dense(n_classes, activation='softmax'),
    ]
    model = Sequential(layers)

    sgd = tensorflow.keras.optimizers.SGD(learning_rate=lr, momentum=m)
    model.compile(loss='mse', optimizer=sgd, metrics=['accuracy'])

    model.summary()

    return model

In [None]:
# K.clear_session()
# waste_model = waste_classifier_model()

Convert to TensorFlow dataset

In [None]:
# train_dataset = Dataset.from_tensor_slices((train_data, train_labels))
# test_dataset = Dataset.from_tensor_slices((test_data, test_labels))
# valid_dataset = Dataset.from_tensor_slices((valid_data, valid_labels))

# train_dataset = train_dataset.batch(batch_size)
# test_dataset = test_dataset.batch(batch_size)
# valid_dataset = valid_dataset.batch(batch_size)

# print(train_dataset)


In [None]:
# waste_model.fit(train_dataset.shuffle(3000),
#                 class_weight=class_weights,
#                 epochs=epochs,
#                 verbose=2)

In [None]:
# test_loss, test_acc = waste_model.evaluate(test_dataset)

# print("Test accuracy:", test_acc)

In [None]:
# print(test_dataset)
# predictions = waste_model.predict(test_dataset)

In [None]:
# print(predictions.shape)

# print(predictions[0])

In [None]:
# class_labels = ['paper', 'cardboard', 'trash', 'plastic', 'metal', 'glass']
# matrix = confusion_matrix(test_labels.argmax(axis=1), predictions.argmax(axis=1))

In [None]:
# print(test_labels.shape)

In [None]:
# print('Confusion Matrix')
# print(matrix)
# print('Classification Report')
# print(classification_report(test_labels.argmax(axis=1), predictions.argmax(axis=1), target_names=class_labels))