# Training Data

### Labeling DataSet

In [5]:
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import load_img
from sklearn.metrics import classification_report 
import numpy as np
import sklearn
import os 
import shutil 
import random

In [6]:
train_gen = ImageDataGenerator(rescale=1./255)

# Turning every folder name into class label
train_data = train_gen.flow_from_directory(
    "../Datasets/combined-cleaned-dataset",
    target_size=(224,224),
    batch_size=32,
    class_mode= "categorical"
)

print(train_data.class_indices)

Found 18042 images belonging to 13 classes.
{'battery': 0, 'biological': 1, 'brown-glass': 2, 'cardboard': 3, 'clothes': 4, 'glass': 5, 'green-glass': 6, 'metal': 7, 'paper': 8, 'plastic': 9, 'shoes': 10, 'trash': 11, 'white-glass': 12}


In [7]:
# Spliting our dataset into training, validation, and testing 
def split_dataset(source_dir, output_dir, train_ratio=0.7, val_ratio=0.15, test_ratio=0.15):
    random.seed(42)
    
    for class_folder in os.listdir(source_dir):
        class_path = os.path.join(source_dir, class_folder)
        if not os.path.isdir(class_path):
            continue 
        
        images = [f for f in os.listdir(class_path) if f.lower().endswith(('.jpg','.jpeg','.png'))]
        random.shuffle(images)
    
        total = len(images)
        train_end = int(total * train_ratio)
        val_end = train_end + int(total * val_ratio)
    
        train_images = images[:train_end]
        val_images = images[train_end:val_end]
        test_images = images[val_end:]
    
        train_class_dir = os.path.join(output_dir, 'train', class_folder)
        val_class_dir = os.path.join(output_dir, 'val', class_folder)
        test_class_dir = os.path.join(output_dir,'test',class_folder)
    
        os.makedirs(train_class_dir, exist_ok=True)
        os.makedirs(val_class_dir, exist_ok=True)
        os.makedirs(test_class_dir, exist_ok=True)
    
        for img in train_images:
            shutil.copy2(os.path.join(class_path, img), os.path.join(train_class_dir, img))

        for img in val_images:
            shutil.copy2(os.path.join(class_path, img), os.path.join(val_class_dir, img))

        for img in test_images:
            shutil.copy2(os.path.join(class_path, img), os.path.join(test_class_dir, img))

        print(f" {class_folder}: {len(train_images)} train / {len(val_images)} val / {len(test_images)} test")
    
if __name__ == "__main__":
    source = "../Datasets/combined-cleaned-dataset"   
    destination = "../Notebooks/the_final_sortdown"          
    split_dataset(source, destination, train_ratio=0.7, val_ratio=0.15, test_ratio=0.15)
    

 battery: 661 train / 141 val / 143 test
 biological: 689 train / 147 val / 149 test
 brown-glass: 424 train / 91 val / 92 test
 cardboard: 905 train / 194 val / 195 test
 clothes: 3727 train / 798 val / 800 test
 glass: 350 train / 75 val / 76 test
 green-glass: 440 train / 94 val / 95 test
 metal: 825 train / 176 val / 178 test
 paper: 1150 train / 246 val / 248 test
 plastic: 942 train / 202 val / 203 test
 shoes: 1383 train / 296 val / 298 test
 trash: 583 train / 125 val / 126 test
 white-glass: 542 train / 116 val / 117 test


In [12]:
# Data Augmentation
train_gen = ImageDataGenerator(
    rescale = 1./255,
    rotation_range = 20,
    width_shift_range = 0.1,
    height_shift_range = 0.1,
    brightness_range = [0.8, 1.2]
)

print("Training Data:")
train_data = tf.keras.utils.image_dataset_from_directory(
    "the_final_sortdown/train",
    label_mode='categorical',
    image_size=(224, 224),
    batch_size=32,
    shuffle=True
)
class_names = train_data.class_names


val_gen = ImageDataGenerator(rescale=1./255)
test_gen = ImageDataGenerator(rescale=1./255)

print("\nValidation Data:")
val_data = tf.keras.utils.image_dataset_from_directory(
    "the_final_sortdown/val",
    label_mode='categorical',
    image_size=(224, 224),
    batch_size=32,
    shuffle=True
)
print("\nTesting Data:")
test_data = tf.keras.utils.image_dataset_from_directory(
    "the_final_sortdown/test",
    label_mode='categorical',
    image_size=(224, 224),
    batch_size=32,
    shuffle=False
)

Training Data:
Found 16410 files belonging to 13 classes.

Validation Data:
Found 4985 files belonging to 13 classes.

Testing Data:
Found 5047 files belonging to 13 classes.


In [9]:
# Transfer Learning MobileNetV2
base_model = MobileNetV2(
    include_top = False,
    weights ='imagenet',
    input_shape = (224,224,3)
)
base_model.trainable = False

x = base_model.output
x = GlobalAveragePooling2D()(x)
x = Dense(128, activation='relu')(x)

num_classes = len(train_data.class_names)
output = Dense(num_classes, activation='softmax')(x)

model = Model(inputs=base_model.input, outputs=output)

model.compile(
    optimizer=Adam(),
    loss = 'categorical_crossentropy',
    metrics = ['accuracy']
)

#model.summary()

In [10]:
early_stop = EarlyStopping(
    monitor='val_loss',   # What to monitor (validation loss is typical)
    patience=5,           # How many epochs without improvement before stopping
    restore_best_weights=True  # Optional, but restores the best model, not the last one
)


In [23]:
history = model.fit(
    train_data,
    validation_data=val_data,
    epochs = 7, 
    callbacks = [early_stop]
)

Epoch 1/7
[1m513/513[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m291s[0m 562ms/step - accuracy: 0.5695 - loss: 1.3950 - val_accuracy: 0.7135 - val_loss: 0.9101
Epoch 2/7
[1m513/513[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m262s[0m 511ms/step - accuracy: 0.7182 - loss: 0.8830 - val_accuracy: 0.7503 - val_loss: 0.7894
Epoch 3/7
[1m513/513[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m281s[0m 548ms/step - accuracy: 0.7576 - loss: 0.7515 - val_accuracy: 0.7775 - val_loss: 0.6862
Epoch 4/7
[1m513/513[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m279s[0m 545ms/step - accuracy: 0.7869 - loss: 0.6656 - val_accuracy: 0.7559 - val_loss: 0.7892
Epoch 5/7
[1m513/513[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m271s[0m 528ms/step - accuracy: 0.8003 - loss: 0.6175 - val_accuracy: 0.8078 - val_loss: 0.6027
Epoch 6/7
[1m513/513[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m277s[0m 539ms/step - accuracy: 0.8150 - loss: 0.5538 - val_accuracy: 0.7922 - val_loss: 0.6237
Epoch 7/7


In [14]:
-
test_loss, test_acc = model.evaluate(test_data)
print(f"Test accuarcy: {test_acc:.2f}")

[1m158/158[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m62s[0m 390ms/step - accuracy: 0.0322 - loss: 2.9960
Test accuarcy: 0.06


In [13]:
# Adding Precision, Recall, and F1
from sklearn.metrics import classification_report

# Predict
y_pred = model.predict(test_data)
y_pred_classes = np.argmax(y_pred, axis=1)

# Get true labels
y_true = []
for images, labels in test_data.unbatch():
    y_true.append(np.argmax(labels.numpy()))
y_true = np.array(y_true)

# Print report
print(classification_report(y_true, y_pred_classes, target_names=class_names))


[1m158/158[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m66s[0m 407ms/step
              precision    recall  f1-score   support

     battery       0.00      0.00      0.00       265
  biological       0.10      0.03      0.05       268
 brown-glass       0.02      0.04      0.02       173
   cardboard       0.00      0.00      0.00       360
     clothes       0.33      0.00      0.00      1493
       glass       0.00      0.00      0.00       140
 green-glass       0.00      0.00      0.00       175
       metal       0.06      0.68      0.11       333
       paper       0.19      0.03      0.04       470
     plastic       0.00      0.00      0.00       381
       shoes       0.00      0.00      0.00       545
       trash       0.05      0.07      0.06       235
 white-glass       0.05      0.02      0.03       209

    accuracy                           0.06      5047
   macro avg       0.06      0.07      0.02      5047
weighted avg       0.13      0.06      0.02      5047



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [1]:
# Adding Reinforcement Learning Sysytem

import gym 
from gym import spaces 
import numpy as np 

In [2]:
class WasteDisposalEnv(gym.Env): 

    def __init__(self, classifier_model, test_data, co2_impact_mapping):

        super(WasteDisposalEnv, self).__init__()

        #used for predicting material type 

        self.classfier = classifier_model 

        #data for predict 

        self.data = test_data
        self.images, self.labels = self.get_data()  
        self.co2_impact_mapping = co2_impact_mapping 

        #define action space 4 disposal options(Recycle, Compost, Donate, Landfill)
        self.action_space = spaces.Discrete(4)

        self.observation_space = spaces.Box(low=0, high=1, shape=(self.classifier.output_shape[-1],), dtype=np.float32)
        
        self.current_idx = 0


    def get_data(self): 
        images = []
        labels = []
        for batch_x, batch_y in self.data.unbatch():
            images.append(batch_x.numpy())
            labels.append(np.argmax(batch_y.numpy()))
        images = np.array(images)
        labels = np.array(labels)
        return images, labels
        
    def reset(self): 
        self.current_idx = 0 
        image = self.images[self.current_idx]
        probs = self.classifier.predict(image[np.newaxis, ...])[0]
        return probs 
    
    def step(self, action):
        image = self.images[self.current_idx]
        true_label = self.labels[self.current_idx]

        probs = self.classifier.predict(image[np.newaxis, ...])[0]
        predicted_class = np.argmax(probs)

        reward = self._calculate_reward(predicted_class, action)

        self.current_idx += 1
        done = self.current_idx >= len(self.images)

        if not done:
            next_image = self.images[self.current_idx]
            next_probs = self.classifier.predict(next_image[np.newaxis, ...])[0]
        else:
            next_probs = np.zeros_like(probs)

        return next_probs, reward, done, {}
        
    def _calculate_reward(self, predicted_class, action):
        item_type = list(self.data.class_indices.keys())[predicted_class]
        impact_info = self.co2_impact_mapping.get(item_type, None)

        if impact_info is None:
            return -1  # Unknown item, small penalty

        max_co2_saved = impact_info["max_co2_saved"]
        actions = impact_info["actions"]

        action_name = {0: "recycle", 1: "compost", 2: "donate", 3: "landfill"}.get(action, None)

        if action_name is None:
            return -1  # Invalid action penalty

        # Get multiplier for action
        action_multiplier = actions.get(action_name, 0.0)  # 0 if action not listed

        # Reward is proportional
        reward = max_co2_saved * action_multiplier
        return reward


In [3]:
co2_impact_mapping = {
    "plastic": {
        "max_co2_saved": 10,  # in kg CO2
        "actions": {
            "recycle": 1.0,    # 100% of CO₂ savings if recycled
            "landfill": 0.0    # 0% savings if landfilled
        }
    },
    "food_waste": {
        "max_co2_saved": 8,
        "actions": {
            "compost": 1.0,
            "landfill": 0.1    # 10% saving (maybe still some methane capture)
        }
    },
    "electronics": {
        "max_co2_saved": 15,
        "actions": {
            "donate": 1.0,
            "landfill": 0.0
        }
    },
    "textile": {
        "max_co2_saved": 12,
        "actions": {
            "donate": 1.0,
            "landfill": 0.2    # Landfilling might still recover some energy
        }
    }
}


In [4]:
env = WasteDisposalEnv(classifier_model=model, test_data=test_data, co2_impact_mapping=co2_impact_mapping)


episode_rewards = []
co2_savings = []

num_episodes = 10  # you can change this to 100 later if you want

for episode in range(num_episodes):
    observation = env.reset()
    done = False
    total_reward = 0
    total_co2_saved = 0
     
while not done:
        # Random action for now (you can replace with a policy later)
        action = env.action_space.sample()

        next_observation, reward, done, info = env.step(action)

        total_reward += reward
        total_co2_saved += reward  # because reward is proportional to CO₂ saved

        observation = next_observation

episode_rewards.append(total_reward)
co2_savings.append(total_co2_saved)


NameError: name 'model' is not defined

In [None]:
print(f"Episode {episode}: Total Reward = {total_reward}, Total CO₂ Saved = {total_co2_saved:.2f} kg")


In [None]:
import matplotlib.pyplot as plt

# Plot total reward (CO₂ saved) per episode
plt.figure(figsize=(10, 6))
plt.plot(episode_rewards, marker='o')
plt.title('Total CO₂ Saved per Episode')
plt.xlabel('Episode')
plt.ylabel('Total CO₂ Saved (kg)')
plt.grid(True)
plt.show()
