<a href="https://colab.research.google.com/github/kteppris/RealWaste/blob/main/DeepLearning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import sys
import os
from pathlib import Path

# Funktion, die prüft, ob dieses Skript in Colab ausgeführt wird
def in_colab():
    # sys beinhaltet alle bereits geladenen Pakete
    return 'google.colab' in sys.modules  # google.colab wird automatisch in Colab vorgeladen, daher Indiz für Colab Environment

code_repo_path = Path("/content/drive/MyDrive/MADS/RealWaste") if in_colab() else Path.cwd()

if in_colab():
  from google.colab import drive
  drive.mount('/content/drive')
  if not code_repo_path.exists():
    raise FileNotFoundError("Code und Datenordner konnte nicht gefunden werden. Bitte zuerst das Notebook '1_Hauptnotebook.ipynb' ausführen.")
  else:
    # wechsle Arbeitsverzeichnis zu code_repo_path
    %cd {code_repo_path}
    print(f"Wechsel Arbeitsverzeichnis zu {code_repo_path}")
    %pip install -r requirements.txt
else:
  print("Stellen Sie sicher, dass das richtige Venv als Kernel ausgewählt ist,\nwelches wie in '1_Hauptnotebook.ipynb' beschrieben erstellt wurde.")

data_path = code_repo_path / "data"

Stellen Sie sicher, dass das richtige Venv als Kernel ausgewählt ist,
welches wie in Hauptdokument beschrieben erstellt wurde.


In [4]:
from pytorch_lightning import LightningDataModule
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader, random_split
from torchvision import transforms
import os

class ConfigurableDataModule(LightningDataModule):
    """Class wraper für mit austauschbaren transforms"""
    def __init__(self, data_dir: str, batch_size: int, transform):
        super().__init__()
        self.data_dir = data_dir
        self.batch_size = batch_size
        self.transform = transform

    def setup(self, stage=None):
        # Erstellen des Datensatzes als Instanz von ImageFolder
        full_dataset = ImageFolder(root=self.data_dir, transform=self.transform)
        # Setzen der Trainingsset/Validierungsset Größe
        train_size = int(0.8 * len(full_dataset))
        val_size = len(full_dataset) - train_size
        # Zufälliges aufteilen in Training- und Validierungdatensatz
        self.train_dataset, self.val_dataset = random_split(full_dataset, [train_size, val_size])

    def train_dataloader(self):
        # Setzen des Traindataloader
        return DataLoader(self.train_dataset, batch_size=self.batch_size, shuffle=True, num_workers=os.cpu_count())

    def val_dataloader(self):
        # Setzen des Validation Dataloader
        return DataLoader(self.val_dataset, batch_size=self.batch_size, num_workers=os.cpu_count())

In [14]:
from PIL import Image, ImageDraw, ImageFont
import matplotlib.pyplot as plt
import pytorch_lightning as pl
import torch
import torchmetrics
from torchmetrics import MeanMetric
import torchvision
import torchvision.transforms.functional as F
import torchvision.utils as vutils
from torchvision.transforms.functional import to_pil_image, to_tensor
import random
import time
import shutil
from pathlib import Path

class BaseWasteClassifier(pl.LightningModule):
    CLASS_NAMES = ['Cardboard', 'Food Organics', 'Glass', 'Metal', 'Miscellaneous Trash', 'Paper', 'Plastic', 'Textile Trash', 'Vegetation']

    def __init__(self, num_classes: int, results_dir="results"):
        super().__init__()
        self.num_classes = num_classes
        # Get the class name of the model instance
        model_class_name = self.__class__.__name__
        # Initialize paths
        self.results_dir = Path(results_dir) / model_class_name
        self.models_dir = self.results_dir / "models"
        self.images_dir = self.results_dir / "images"
        self.logs_dir = self.results_dir / "logs"

        # Create directories if they don't exist
        self.results_dir.mkdir(parents=True, exist_ok=True)
        self.models_dir.mkdir(parents=True, exist_ok=True)
        self.images_dir.mkdir(parents=True, exist_ok=True)
        self.logs_dir.mkdir(parents=True, exist_ok=True)

        # Placeholder for the actual model
        self.model = None

        # Initialize metrics
        self.accuracy = torchmetrics.Accuracy(task='multiclass', num_classes=num_classes, average='macro')
        self.precision = torchmetrics.Precision(task='multiclass', num_classes=num_classes, average='weighted')
        self.recall = torchmetrics.Recall(task='multiclass', num_classes=num_classes, average='weighted')
        self.f1_score = torchmetrics.F1Score(task='multiclass', num_classes=num_classes, average='weighted')


    def forward(self, x):
        raise NotImplementedError("This method should be overridden by subclasses.")

    def training_step(self, batch, batch_idx):
        x, y = batch
        outputs = self(x)
        logits = outputs if isinstance(outputs, torch.Tensor) else outputs[0]  # Select logits
        loss = torch.nn.functional.cross_entropy(logits, y)
        acc = self.accuracy(torch.argmax(logits, dim=1), y)
        precision = self.precision(torch.argmax(logits, dim=1), y)
        recall = self.recall(torch.argmax(logits, dim=1), y)
        f1 = self.f1_score(torch.argmax(logits, dim=1), y)  # Calculate F1 Score

        self.log('train_loss', loss, on_step=True, on_epoch=True, prog_bar=True, logger=True)
        self.log('train_acc', acc, on_step=True, on_epoch=True, prog_bar=True, logger=True)
        # Log precision, recall, and F1 Score
        self.log('train_precision', precision, on_step=True, on_epoch=True, prog_bar=True, logger=True)
        self.log('train_recall', recall, on_step=True, on_epoch=True, prog_bar=True, logger=True)
        self.log('train_f1', f1, on_step=True, on_epoch=True, prog_bar=True, logger=True)
        return loss

    def on_validation_start(self):
        self.clear_images_directory()

    def validation_step(self, batch, batch_idx):
        start_time = time.perf_counter()  # Start timing for inference speed

        x, y = batch
        logits = self(x)
        loss = torch.nn.functional.cross_entropy(logits, y)
        acc = self.accuracy(torch.argmax(logits, dim=1), y)
        precision = self.precision(torch.argmax(logits, dim=1), y)
        recall = self.recall(torch.argmax(logits, dim=1), y)
        f1 = self.f1_score(torch.argmax(logits, dim=1), y)  # Calculate F1 Score

        inference_time = time.perf_counter() - start_time  # Stop timing for inference speed
        self.log('val_inference_time', inference_time, prog_bar=True, logger=True)

        self.log('val_loss', loss, prog_bar=True)
        self.log('val_acc', acc, prog_bar=True)
        # Log precision, recall, and F1 Score
        self.log('val_precision', precision, prog_bar=True)
        self.log('val_recall', recall, prog_bar=True)
        self.log('val_f1', f1, prog_bar=True)

        predictions = torch.argmax(logits, dim=1)  # Convert logits to predicted class indices

        if random.random() < 0.1:  # Log images randomly
            self.log_images_with_labels(x, y, predictions, batch_idx)
        return loss

    def log_images_with_labels(self, images, labels, predictions, batch_idx):
        """Save a batch of images with their actual and predicted labels, organized by class name and model class."""
        annotated_images = []
        for i, (image, label, prediction) in enumerate(zip(images, labels, predictions)):
            # Unnormalize the image for visualization
            image = self.unnormalize(image)  # Make sure to call with self if it's an instance method
            # Determine class name for the actual label
            actual_class_name = self.CLASS_NAMES[label.item()]

            # Ensure the class-specific directory exists within the model class directory
            image_dir = self.images_dir / actual_class_name
            image_dir.mkdir(parents=True, exist_ok=True)

            # Convert to PIL Image for easy manipulation
            pil_img = F.to_pil_image(image)

            # Annotate image with actual and predicted labels
            draw = ImageDraw.Draw(pil_img)
            annotation_text = f'Actual: {actual_class_name}, Predicted: {self.CLASS_NAMES[prediction.item()]}'
            draw.text((10, 10), annotation_text, fill="white")

            # Define the file path for saving the image within the specific class directory
            file_path = image_dir / f"epoch_{self.current_epoch}_batch_{batch_idx}_image_{i}.png"

            # Save the annotated image
            pil_img.save(file_path)

            # Convert back to tensor and add to list
            annotated_img = to_tensor(pil_img)
            annotated_images.append(annotated_img.unsqueeze(0))  # Add batch dimension

        # Stack all annotated images into a single tensor for logging
        annotated_images_tensor = torch.cat(annotated_images, dim=0)
        img_grid = torchvision.utils.make_grid(annotated_images_tensor, nrow=4)

        # Log the grid of annotated images
        self.logger.experiment.add_image(f'Validation Images, Batch {batch_idx}', img_grid, self.current_epoch)

    def clear_images_directory(self):
        if self.images_dir.exists() and self.images_dir.is_dir():
            for class_dir in self.images_dir.iterdir():
                if class_dir.is_dir():  # Ensure it's a directory
                    shutil.rmtree(class_dir)  # Delete the directory and all its contents

    def unnormalize(self, image, mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]):
        """Revert normalization of an image tensor."""
        image = image.clone()  # Clone the tensor to avoid in-place operations
        for t, m, s in zip(image, mean, std):
            t.mul_(s).add_(m)  # Multiply by std and add mean
        return image

    def configure_optimizers(self):
        # Subclasses can override this if needed
        optimizer = torch.optim.SGD(self.model.parameters(), lr=0.001, momentum=0.9)
        return optimizer


In [6]:
from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning.callbacks import EarlyStopping

def get_callbacks(model_checkpoint_path: str, early_stop_patience=2):
    callbacks = []

    # Configure the ModelCheckpoint callback
    checkpoint_callback = ModelCheckpoint(
        dirpath=model_checkpoint_path,
        filename='{epoch}-{val_loss:.2f}',
        save_top_k=2,  # Save only the best checkpoint
        verbose=True,
        monitor='val_loss',  # Monitor validation loss (change to val_acc or any other metric as needed)
        mode='min',  # 'min' for loss (use 'max' for accuracy)
    )
    callbacks.append(checkpoint_callback)

    # Stoppt des Training wenn sich val Metrik nicht verbessert
    early_stopping_callback = EarlyStopping(
        monitor="val_acc",  # Die Metrik beobachtet wird
        mode="max",  # Maximiert die Genauigkeit
        patience=early_stop_patience,  # "Wartet" 2 Epoche ohne Verbesserung
    )
    callbacks.append(early_stopping_callback)
    return callbacks



In [7]:
import os
from pytorch_lightning import Trainer
from pytorch_lightning.loggers import TensorBoardLogger
from pytorch_lightning.loggers import CSVLogger


def train_model(model, data_module, log_dir="tb_logs", max_epochs=50, logger_name="model_logs", callbacks=[]):

    # Definieren der logger
    tb_logger = TensorBoardLogger(log_dir, name=logger_name)
    csv_logger = CSVLogger(model.results_dir, name="logs")
    trainer = Trainer(max_epochs=max_epochs, logger=[tb_logger, csv_logger], callbacks=callbacks)
    trainer.fit(model, datamodule=data_module)

## SimpleCNN

In [None]:
import torch.nn as nn

class SimpleCNN(BaseWasteClassifier):
    def __init__(self, num_classes=9, lr=1e-3):
        self.lr = 1e-3
        super().__init__(num_classes)
        self.model = nn.Sequential(
            nn.Conv2d(3, 16, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(16, 32, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 64, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Flatten(),
            nn.Linear(64 * 28 * 28, 512),  # Bestätigt, dass dies für eine Eingabegröße von 224x224 korrekt ist
            nn.ReLU(),
            nn.Dropout(0.5), # Overfitting vermeiden
            nn.Linear(512, num_classes) # lineare Schicht auf die Klassen
        )


    def forward(self, x):
        return self.model(x)

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=self.lr)
        return optimizer

### Train SimpleCNN

In [None]:
%reload_ext tensorboard
%tensorboard --logdir=tb_logs/

In [None]:
from torchvision import transforms
# Definiere die Transformationspipeline
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Skaliere alle Bilder auf 224x224
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])


data_module = ConfigurableDataModule(data_dir=data_path, batch_size=32, transform=transform)
model = SimpleCNN(num_classes=9, lr=1e-3)
callbacks = get_callbacks(
    model_checkpoint_path=model.models_dir,
    early_stop_patience=2
)
train_model(model, data_module, log_dir="tb_logs", max_epochs=50, logger_name="simple_CNN", callbacks=callbacks)

In [15]:
from torchvision.models import inception_v3

class InceptionWasteClassifier(BaseWasteClassifier):
    def __init__(self, num_classes=9):
        super().__init__(num_classes)
        self.model = inception_v3(aux_logits=True, weights="Inception_V3_Weights.DEFAULT")
        self.model.fc = torch.nn.Linear(self.model.fc.in_features, num_classes)

    def forward(self, x):
        if self.training:
            outputs = self.model(x)
            return outputs.logits, outputs.aux_logits
        else:
            return self.model(x)


In [None]:
from torchvision import transforms
from torchvision.transforms.functional import InterpolationMode

# Define the transformations as per Inception V3's requirements
transform = transforms.Compose([
    transforms.Resize(342, interpolation=InterpolationMode.BILINEAR),
    transforms.CenterCrop(299),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
batch_size = 32
data_module = ConfigurableDataModule(data_path, batch_size, transform)

model = InceptionWasteClassifier()
callbacks = get_callbacks(
    model_checkpoint_path=model.models_dir,
    early_stop_patience=2
)
train_model(model, data_module, log_dir="tb_logs", max_epochs=50, logger_name="InceptionWaste", callbacks=callbacks)

INFO:pytorch_lightning.utilities.rank_zero:GPU available: True (cuda), used: True
INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:IPU available: False, using: 0 IPUs
INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs
INFO:pytorch_lightning.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO:pytorch_lightning.callbacks.model_summary:
  | Name      | Type                | Params
--------------------------------------------------
0 | accuracy  | MulticlassAccuracy  | 0     
1 | precision | MulticlassPrecision | 0     
2 | recall    | MulticlassRecall    | 0     
3 | f1_score  | MulticlassF1Score   | 0     
4 | model     | Inception3          | 25.1 M
--------------------------------------------------
25.1 M    Trainable params
0         Non-trainable params
25.1 M    Total params
100.523   Total estimated model params size (MB)


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

Training: |          | 0/? [00:00<?, ?it/s]