In [1]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing import image
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Dense, Flatten, Dropout, BatchNormalization
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input

# Ensure Eager Execution is enabled
tf.config.run_functions_eagerly(True)

2024-06-07 08:12:54.168631: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-06-07 08:12:54.168763: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-06-07 08:12:54.434121: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


In [2]:
import os
import shutil
from sklearn.model_selection import train_test_split

# Define the root directory
root_dir = '/kaggle/input/capstone-final/Dataset Final'

# Define the output directories
output_train_dir = 'train'
output_val_dir = 'val'
output_test_dir = 'test'

# Create output directories if they don't exist
for output_dir in [output_train_dir, output_val_dir, output_test_dir]:
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

# Create output directories if they don't exist
for output_dir in [output_train_dir, output_val_dir, output_test_dir]:
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

def create_train_val_test_split(root, output_train, output_val, output_test, test_size=0.1, val_size=0.1):
    for category in os.listdir(root):
        category_path = os.path.join(root, category)
        if os.path.isdir(category_path):
            for sub_category in os.listdir(category_path):
                sub_category_path = os.path.join(category_path, sub_category)
                if os.path.isdir(sub_category_path):
                    files = [f for f in os.listdir(sub_category_path) if os.path.isfile(os.path.join(sub_category_path, f))]
                    
                    # Check if there are enough samples for splitting
                    if len(files) >= 3:  # Minimum 3 samples required for split
                        train_val_files, test_files = train_test_split(files, test_size=test_size, random_state=42)
                        train_files, val_files = train_test_split(train_val_files, test_size=val_size, random_state=42)
                        
                        train_output_path = os.path.join(output_train, category, sub_category)
                        val_output_path = os.path.join(output_val, category, sub_category)
                        test_output_path = os.path.join(output_test, category, sub_category)
                        os.makedirs(train_output_path, exist_ok=True)
                        os.makedirs(val_output_path, exist_ok=True)
                        os.makedirs(test_output_path, exist_ok=True)
                        
                        for file in train_files:
                            shutil.copy2(os.path.join(sub_category_path, file), os.path.join(train_output_path, file))
                        for file in val_files:
                            shutil.copy2(os.path.join(sub_category_path, file), os.path.join(val_output_path, file))
                        for file in test_files:
                            shutil.copy2(os.path.join(sub_category_path, file), os.path.join(test_output_path, file))
                    else:
                        print(f"Not enough samples in category {category}/{sub_category} for splitting.")

# Apply the function to create train-validation-test split
create_train_val_test_split(root_dir, output_train_dir, output_val_dir, output_test_dir)

In [3]:
# Define the main categories and sub-categories
main_category_labels = ['Anorganik', 'B3', 'Organik']
sub_category_labels = {
    'Anorganik': ['Kaleng', 'PET', 'Tas Plastik Belanja'],
    'B3': ['Aerosol', 'Baterai', 'Obat Kapsul'],
    'Organik': ['Daun', 'Kardus', 'Makanan Olahan']
}

In [4]:
# IMAGE_SIZE = (128, 128)
# BATCH_SIZE = 32

# # Create ImageDataGenerator instances with augmentation
# train_datagen = ImageDataGenerator(
#     rescale=1./255,
#     validation_split=0.2,
#     rotation_range=20,
#     width_shift_range=0.2,
#     height_shift_range=0.2,
#     shear_range=0.2,
#     zoom_range=0.2,
#     horizontal_flip=True,
#     fill_mode='nearest'
# )
# val_datagen = ImageDataGenerator(rescale=1./255)

In [5]:
# Custom function to extract labels from the file path
def extract_labels(file_path):
    parts = file_path.split(os.path.sep)
    main_category = parts[-3]  # Main category is the parent directory
    sub_category = parts[-2]   # Sub-category is the sub-directory
    return main_category, sub_category

# Create a custom data generator
class CustomImageDataGenerator(tf.keras.utils.Sequence):
    def __init__(self, file_paths, labels, batch_size, image_size, augmentation=False, shuffle=True):
        self.file_paths = file_paths
        self.labels = labels
        self.batch_size = batch_size
        self.image_size = image_size
        self.augmentation = augmentation
        self.shuffle = shuffle
        self.on_epoch_end()
        
        if self.augmentation:
            self.datagen = ImageDataGenerator(
                rescale=1./255,
                rotation_range=20,
                width_shift_range=0.2,
                height_shift_range=0.2,
                shear_range=0.2,
                zoom_range=0.2,
                horizontal_flip=True,
                fill_mode='nearest'
            )
        else:
            self.datagen = ImageDataGenerator(rescale=1./255)

    def __len__(self):
        return len(self.file_paths) // self.batch_size

    def __getitem__(self, index):
        batch_file_paths = self.file_paths[index * self.batch_size:(index + 1) * self.batch_size]
        batch_labels = self.labels[index * self.batch_size:(index + 1) * self.batch_size]
        images = [tf.keras.preprocessing.image.load_img(fp, target_size=self.image_size) for fp in batch_file_paths]
        images = [tf.keras.preprocessing.image.img_to_array(img) for img in images]
        images = np.array(images) / 255.0
        
        if self.augmentation:
            augmented_images = []
            for img in images:
                augmented_img = self.datagen.random_transform(img)
                augmented_images.append(augmented_img)
            images = np.array(augmented_images)
        
        main_labels = [main_category_labels.index(lbl[0]) for lbl in batch_labels]
        sub_labels = [sub_category_labels[lbl[0]].index(lbl[1]) for lbl in batch_labels]
        return images, {
            'category_output': tf.keras.utils.to_categorical(main_labels, num_classes=len(main_category_labels)),
            'sub_category_output': tf.keras.utils.to_categorical(sub_labels, num_classes=len(sub_category_labels[batch_labels[0][0]]))
        }

    def on_epoch_end(self):
        if self.shuffle:
            temp = list(zip(self.file_paths, self.labels))
            np.random.shuffle(temp)
            self.file_paths, self.labels = zip(*temp)

In [6]:
# Define directory paths 
# base_dir = '/kaggle/working'
train_dir = '/kaggle/working/train'
train_file_paths = [os.path.join(root, name) for root, dirs, files in os.walk(train_dir) for name in files]
train_labels = [extract_labels(fp) for fp in train_file_paths]

val_dir = '/kaggle/working/val'
val_file_paths = [os.path.join(root, name) for root, dirs, files in os.walk(val_dir) for name in files]
val_labels = [extract_labels(fp) for fp in val_file_paths]

test_dir = '/kaggle/working/test'
test_file_paths = [os.path.join(root, name) for root, dirs, files in os.walk(test_dir) for name in files]
test_labels = [extract_labels(fp) for fp in test_file_paths]

In [7]:
IMAGE_SIZE = (128, 128)
BATCH_SIZE = 32

# Create custom data generators with augmentation for the training set
train_generator = CustomImageDataGenerator(train_file_paths, train_labels, BATCH_SIZE, IMAGE_SIZE, augmentation=True, shuffle=True)
val_generator = CustomImageDataGenerator(val_file_paths, val_labels, BATCH_SIZE, IMAGE_SIZE, augmentation=False, shuffle=False)
test_generator = CustomImageDataGenerator(test_file_paths, test_labels, BATCH_SIZE, IMAGE_SIZE, augmentation=False, shuffle=False)


In [8]:
# # Create generators for training and validation data
# train_generator = train_datagen.flow_from_directory(
#     train_dir,
#     target_size=IMAGE_SIZE,
#     batch_size=BATCH_SIZE,
#     class_mode='categorical',
#     subset='training')

In [9]:
# val_generator = val_datagen.flow_from_directory(
#     '/kaggle/working/val',
#     target_size=IMAGE_SIZE,
#     batch_size=BATCH_SIZE,
#     class_mode='categorical')

In [10]:
# Load VGG16 model with pretrained weights
# resnet_base = tf.keras.applications.ResNet50(weights='imagenet', include_top=False, input_shape=(IMAGE_SIZE[0], IMAGE_SIZE[1], 3))
effnetb3v2_base = tf.keras.applications.EfficientNetV2B3(input_shape=(IMAGE_SIZE[0], IMAGE_SIZE[1], 3), include_top=False, weights='imagenet')
# Freeze the layers except the last 4 layers
# for layer in resnet_base.layers[:-4]:
#     layer.trainable = False

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/efficientnet_v2/efficientnetv2-b3_notop.h5
[1m52606240/52606240[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step


In [11]:
# Create custom layers on top of the VGG16 base
# x = resnet_base.output
x = effnetb3v2_base.output
x = Flatten()(x)
x = Dense(1024, activation='relu')(x)
x = Dense(512, activation='relu')(x)
# x = BatchNormalization()(x)
x = Dropout(0.1)(x)
x = Dense(256, activation='relu')(x)
# x = BatchNormalization()(x)
x = Dropout(0.1)(x)
x = Dense(128, activation='relu')(x)
category_output = Dense(len(main_category_labels), activation='softmax', name='category_output')(x)
sub_category_output = Dense(len(sub_category_labels[main_category_labels[0]]), activation='softmax', name='sub_category_output')(x)

In [12]:
# Create the final model
model = Model(inputs=effnetb3v2_base.input, outputs=[category_output, sub_category_output])

# Compile the model with appropriate metrics
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
    loss={'category_output': 'categorical_crossentropy', 'sub_category_output': 'categorical_crossentropy'},
    metrics={'category_output': 'accuracy', 'sub_category_output': 'accuracy'}
)
# model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

In [13]:
# Implement early stopping and learning rate reduction
early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=3, min_lr=1e-6)


In [14]:
# Train the model
history = model.fit(
    train_generator,
    validation_data=val_generator,
    epochs=100,
    callbacks=[reduce_lr, early_stopping]
)

# Save the model
# model.save('trash_classification_vgg_model.h5')

Epoch 1/100


  self._warn_if_super_not_called()


[1m 47/114[0m [32m━━━━━━━━[0m[37m━━━━━━━━━━━━[0m [1m3:14[0m 3s/step - category_output_accuracy: 0.5002 - loss: 2.0156 - sub_category_output_accuracy: 0.4599



[1m114/114[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m350s[0m 3s/step - category_output_accuracy: 0.5918 - loss: 1.7850 - sub_category_output_accuracy: 0.5358 - val_category_output_accuracy: 0.7995 - val_loss: 0.9931 - val_sub_category_output_accuracy: 0.7839 - learning_rate: 1.0000e-04
Epoch 2/100
[1m114/114[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m341s[0m 3s/step - category_output_accuracy: 0.8353 - loss: 0.9280 - sub_category_output_accuracy: 0.7937 - val_category_output_accuracy: 0.8307 - val_loss: 0.8132 - val_sub_category_output_accuracy: 0.8125 - learning_rate: 1.0000e-04
Epoch 3/100
[1m114/114[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m340s[0m 3s/step - category_output_accuracy: 0.8797 - loss: 0.6708 - sub_category_output_accuracy: 0.8611 - val_category_output_accuracy: 0.8932 - val_loss: 0.5973 - val_sub_category_output_accuracy: 0.8750 - learning_rate: 1.0000e-04
Epoch 4/100
[1m114/114[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m341s[0m 3s/step - cate

In [15]:
# # Evaluate the model on the test set
# test_generator = val_datagen.flow_from_directory(
#     test_dir,
#     target_size=IMAGE_SIZE,
#     batch_size=BATCH_SIZE,
#     class_mode='categorical'
# )

model.evaluate(test_generator)
# Evaluate the model on the test dataset
# test_main_category_loss, test_sub_category_loss, test_main_category_acc, test_sub_category_acc = model.evaluate(test_generator)

# print(f'Test Loss: {test_loss}')
# print(f'Test Main Category Loss: {test_main_category_loss}')
# print(f'Test Sub Category Loss: {test_sub_category_loss}')
# print(f'Test Main Category Accuracy: {test_main_category_acc}')
# print(f'Test Sub Category Accuracy: {test_sub_category_acc}')

[1m14/14[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 505ms/step - category_output_accuracy: 0.9292 - loss: 0.4647 - sub_category_output_accuracy: 0.9455


[0.32574263215065, 0.953125, 0.9575892686843872]

In [16]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator, load_img, img_to_array

def predict_image(image_path):
    image = load_img(image_path, target_size=IMAGE_SIZE)
    image_array = img_to_array(image) / 255.0
    image_array = np.expand_dims(image_array, axis=0)  # Expand dimensions to match the model input shape

    predictions = model.predict(image_array)
    main_category_pred = main_category_labels[np.argmax(predictions[0])]
    sub_category_pred = sub_category_labels[main_category_pred][np.argmax(predictions[1])]

    print(f"Predicted Main Category: {main_category_pred}")
    print(f"Predicted Sub Category: {sub_category_pred}")

# Example usage of the predict_image function
example_image_path = '/kaggle/input/test-trashcan/WhatsApp Image 2024-06-07 at 3.00.53 PM.jpeg'
predict_image(example_image_path)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step
Predicted Main Category: Organik
Predicted Sub Category: Makanan Olahan


In [17]:
model.save('effnetb3v2_waste_classification.h5')

In [18]:
model.save_weights('effnetb3v2_waste_classification.weights.h5')

In [19]:
# # Function to predict and print results
# def predict_and_print_results(model, file_paths, labels):
#     true_labels_main = []
#     true_labels_sub = []
#     predictions_main = []
#     predictions_sub = []

#     for i, file_path in enumerate(file_paths):
#         img = load_img(file_path, target_size=IMAGE_SIZE)
#         img_array = img_to_array(img) / 255.0
#         img_array = np.expand_dims(img_array, axis=0)

#         prediction = model.predict(img_array)
#         predicted_main = np.argmax(prediction[0], axis=1)[0]
#         predicted_sub = np.argmax(prediction[1], axis=1)[0]

#         true_main_category = main_category_labels.index(labels[i][0])
#         true_sub_category = sub_category_labels_flat.index(labels[i][1])

#         true_labels_main.append(true_main_category)
#         true_labels_sub.append(true_sub_category)
#         predictions_main.append(predicted_main)
#         predictions_sub.append(predicted_sub)

#         print(f"File: {file_path}")
#         print(f"  True Main Category: {main_category_labels[true_main_category]}")
#         print(f"  Predicted Main Category: {main_category_labels[predicted_main]}")
#         print(f"  True Sub Category: {sub_category_labels_flat[true_sub_category]}")
#         print(f"  Predicted Sub Category: {sub_category_labels_flat[predicted_sub]}")
#         print()

#     return np.array(true_labels_main), np.array(true_labels_sub), np.array(predictions_main), np.array(predictions_sub)

# # Get true labels, predictions, and filenames
# true_labels_main, true_labels_sub, predictions_main, predictions_sub = predict_and_print_results(model, test_file_paths, test_labels)

In [20]:
# import numpy as np
# from sklearn.metrics import classification_report, confusion_matrix
# import matplotlib.pyplot as plt
# import seaborn as sns

# # Flatten the sub-category labels dictionary
# sub_category_labels_flat = [label for sublist in sub_category_labels.values() for label in sublist]

# # Function to get true labels and predictions
# def get_labels_and_predictions(generator, model):
#     true_labels_main = []
#     true_labels_sub = []
#     predictions_main = []
#     predictions_sub = []

#     for batch_images, batch_labels in generator:
#         true_labels_main.extend(np.argmax(batch_labels['category_output'], axis=1))
#         true_labels_sub.extend(np.argmax(batch_labels['sub_category_output'], axis=1))
        
#         batch_predictions = model.predict(batch_images)
#         predictions_main.extend(np.argmax(batch_predictions[0], axis=1))
#         predictions_sub.extend(np.argmax(batch_predictions[1], axis=1))

#     return np.array(true_labels_main), np.array(true_labels_sub), np.array(predictions_main), np.array(predictions_sub)

# # Get true labels and predictions
# true_labels_main, true_labels_sub, predictions_main, predictions_sub = get_labels_and_predictions(test_generator, model)

# # Classification report for main category
# print("Classification Report for Main Category:")
# print(classification_report(true_labels_main, predictions_main, target_names=main_category_labels))

# # Ensure the labels parameter includes all possible labels
# all_sub_labels = list(range(len(sub_category_labels_flat)))

# # Classification report for sub-category
# print("Classification Report for Sub Category:")
# print(classification_report(true_labels_sub, predictions_sub, labels=all_sub_labels, target_names=sub_category_labels_flat))

# # Confusion matrix for main category
# conf_matrix_main = confusion_matrix(true_labels_main, predictions_main)
# plt.figure(figsize=(10, 7))
# sns.heatmap(conf_matrix_main, annot=True, fmt='d', cmap='Blues', xticklabels=main_category_labels, yticklabels=main_category_labels)
# plt.title('Confusion Matrix for Main Category')
# plt.xlabel('Predicted')
# plt.ylabel('True')
# plt.show()

# # Confusion matrix for sub-category
# conf_matrix_sub = confusion_matrix(true_labels_sub, predictions_sub, labels=all_sub_labels)
# plt.figure(figsize=(15, 10))
# sns.heatmap(conf_matrix_sub, annot=True, fmt='d', cmap='Blues', xticklabels=sub_category_labels_flat, yticklabels=sub_category_labels_flat)
# plt.title('Confusion Matrix for Sub Category')
# plt.xlabel('Predicted')
# plt.ylabel('True')
# plt.show()

In [21]:
# test_labels