# Visión por computadora II
## Trabajo Práctico Integrador

## Inferencia Modelo EfficientNet B0 BEST

### Configuración de librerías

In [1]:
import os
import gc
import random
from time import time
from glob import glob
import pandas as pd
import numpy as np
from tqdm.notebook import tqdm
from collections import Counter
import dill as pickle

from plotly import graph_objects as go
import plotly.express as px
import plotly.figure_factory as ff
from plotly.subplots import make_subplots


import cv2

from sklearn.manifold import TSNE
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.metrics import fbeta_score, confusion_matrix

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms as T, models
from torchvision.models.resnet import ResNet18_Weights
from torch.optim import Adam
from torch.optim.lr_scheduler import StepLR

from torchsummary import summary

from matplotlib import pyplot as plt

In [2]:
# Creamos el directorio de salida si no existe
path_output = "../output"
if not os.path.exists(path_output):
    os.makedirs(path_output)

In [3]:
# Crear el directorio de logs si no existe para guardar los logs de tensorboard
logs_dir = '../logs'
if not os.path.exists(logs_dir):
    os.makedirs(logs_dir)

In [4]:
# Si tenemos disponible GPU, lo usamos
# Chequeamos si tenemos disponible GPU (CUDA)
if torch.cuda.is_available():
    device = "cuda"
# Chequeamos si tenemos disponible aceleración por hardware en un chip de Apple (MPS)
elif torch.backends.mps.is_available():
    device = "mps"
# Por defecto usamos CPU
else:
    device = "cpu"

print(f"device: {device}")

device: cuda


In [5]:
# Semilla para reproducibilidad de los experimentos
random.seed(42)
np.random.seed(42)
torch.manual_seed(42);

### Cargamos el dataset test preprocesado.

In [6]:
path = "../data"

# Esto es asi ya que no olvidemos que usamos el dataset de train y lo dividimos 
# en train, valid y test o sea que todas las imagenes estan en el path_train
path_test = os.path.join(path, "train") # 

print(
    f"test files: {len(os.listdir(path_test))}, "
)

test files: 11466, 


In [7]:
# Cargamos el dataset de test
path_test_class = os.path.join(path, "test_dataset_preprocesado.csv")
df_test = pd.read_csv(path_test_class)
print(df_test.shape)
df_test.head()

(2291, 37)


Unnamed: 0,filename,Aerosols,Aluminum can,Cardboard,Cellulose,Ceramic,Container for household chemicals,Disposable tableware,Electronics,Furniture,...,Plastic toys,Postal packaging,Printing industry,Scrap metal,Stretch film,Tetra pack,Textile,Tin,Unknown plastic,Zip plastic bag
0,2ccff6c6-AluCan257_jpg.rf.a8f53f21395d0d5757d7...,0,1,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1,59b34896-R_2091_jpg.rf.a967d601319609446bd2cd5...,0,1,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
2,d254face-R_4159_jpg.rf.a95781333b43a5c8c62d42b...,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
3,eb559464-O_13694_jpg.rf.a8ece945730b0647388b64...,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
4,0d43474a-R_1215_jpg.rf.a98e6508a5ea28fc11d6868...,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


#### Funciones de creación de dataloaders

In [8]:
def obtener_transforms():
    transform_train = T.Compose([
        T.ToPILImage(),
        T.Resize(224),
        T.ToTensor(),
        T.Normalize(
            mean=[0.485, 0.456, 0.406], # Media extraída de ImageNet
            std=[0.229, 0.224, 0.225], # Desviación estándar extraída de ImageNet
        )
    ])
    transform_val = T.Compose([
        T.ToPILImage(),
        T.Resize(224),
        T.ToTensor(),
        T.Normalize(
            mean=[0.485, 0.456, 0.406], # Media extraída de ImageNet
            std=[0.229, 0.224, 0.225], # Desviación estándar extraída de ImageNet
        )
    ])
    return transform_train, transform_val

In [9]:
class YoloWasteDatasetError(Exception):
    pass

class YoloWasteDataset(Dataset):
    def __init__(self, df, ohe_tags, transform, path, is_train=True, idx_tta=None):
        super().__init__()
        self.df = df
        self.ohe_tags = ohe_tags
        self.transform = transform
        if isinstance(path, str):
            self.paths = [path]
        elif isinstance(path, (list, tuple)):
            self.paths = path
        else:
            raise YoloWasteDatasetError(f"Path type must be str, list or tuple, got: {type(path)}")
        self.is_train = is_train
        if not is_train:
            if not idx_tta in list(range(6)):
                raise YoloWasteDatasetError(
                    f"In test mode, 'idx_tta' must be an int belonging to [0, 5], got: {repr(idx_tta)}"
                )
            self.idx_tta = idx_tta

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        filename = self.df.iloc[idx, 0] # Asumiendo que la primer columna es filename
        for path in self.paths:
            if filename in os.listdir(path):
                file_path = os.path.join(path, filename)
                break
        else:
            raise YoloWasteDatasetError(f"Can't fetch {filename} among {self.paths}")
        img = cv2.imread(file_path)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        label = self.ohe_tags[idx]
        return img, label

    def collate_fn(self, batch):
        imgs, labels = [], []
        for (img, label) in batch:
            img = self.custom_augment(img)
            img = torch.tensor(img)
            img = img.permute(2, 0, 1)
            img = self.transform(img)
            imgs.append(img[None])
            labels.append(label)
        imgs = torch.cat(imgs).float().to(device)
        labels = torch.tensor(labels).float().to(device)
        return imgs, labels

    def load_img(self, idx, ax=None):
        img, ohe_label = self[idx]
        label = self.df.iloc[idx]
        title = f"{label} - {ohe_label}"
        if ax is None:
            plt.imshow(img)
            plt.title(title)
        else:
            ax.imshow(img)
            ax.set_title(title)
    
    def custom_augment(self, img):
        """
        Discrete rotation and horizontal flip.
        Random during training and non random during testing for TTA.
        Not implemented in torchvision.transforms, hence this function.
        """
        choice = np.random.randint(0, 6) if self.is_train else self.idx_tta
        if choice == 0:
            # Rotate 90
            img = cv2.rotate(img, rotateCode=cv2.ROTATE_90_CLOCKWISE)
        if choice == 1:
            # Rotate 90 and flip horizontally
            img = cv2.rotate(img, rotateCode=cv2.ROTATE_90_CLOCKWISE)
            img = cv2.flip(img, flipCode=1)
        if choice == 2:
            # Rotate 180
            img = cv2.rotate(img, rotateCode=cv2.ROTATE_180)
        if choice == 3:
            # Rotate 180 and flip horizontally
            img = cv2.rotate(img, rotateCode=cv2.ROTATE_180)
            img = cv2.flip(img, flipCode=1)
        if choice == 4:
            # Rotate 90 counter-clockwise
            img = cv2.rotate(img, rotateCode=cv2.ROTATE_90_COUNTERCLOCKWISE)
        if choice == 5:
            # Rotate 90 counter-clockwise and flip horizontally
            img = cv2.rotate(img, rotateCode=cv2.ROTATE_90_COUNTERCLOCKWISE)
            img = cv2.flip(img, flipCode=1)
        return img

#### Funciones auxiliares para la inferencia

In [10]:
@torch.no_grad()
def batch_predict(model, X):
    model.eval()
    Y = model(X)
    return Y.detach().float().cpu().numpy()

In [11]:
def get_test_data(df_test, path_test, batch_size, idx_tta):
    transform_train, transform_val = obtener_transforms()
    ohe_tags_test = df_test.iloc[:, 1:].values
    #ohe_tags_test = df_test.iloc[:, 1:].values.astype(np.float32)
    ds_test = YoloWasteDataset(df_test, ohe_tags_test, transform_val, path=path_test, is_train=False, idx_tta=idx_tta)
    dl_test = DataLoader(ds_test, batch_size=batch_size, shuffle=False, collate_fn=ds_test.collate_fn)
    return dl_test, df_test

##### Calculamos el puntaje final del modelo ResNet18 con los datos de prueba

Inferencia del Modelo resNet18 usando datos de prueba usando TTA

In [12]:
def calcular_fbeta_score(nombre_modelo, modelo, df_test, path_test, batch_size, path_output, threshs=0.2, tta_steps=6, beta=2):
    modelo.eval()  # Cambiamos el modelo a modo de evaluación

    # Definimos el umbral
    threshs = threshs

    # Inicializamos las predicciones
    Y_hat_test = []
    for idx_tta in range(tta_steps):  # Realizamos TTA
        Y_hat_test_tta = []
        dl_test, df_test = get_test_data(df_test, path_test, batch_size, idx_tta)  # Cargamos los datos de prueba
        for X, _ in tqdm(dl_test, leave=False):
            Y_hat_test_batch = batch_predict(modelo, X)
            Y_hat_test_tta.extend(Y_hat_test_batch)
        Y_hat_test.append(Y_hat_test_tta)
    
    Y_hat_test = np.mean(np.array(Y_hat_test), axis=0)
    Y_hat_test = (Y_hat_test > threshs).astype(float)

    # Guardar los resultados de la inferencia
    df_test['predicted_labels'] = list(Y_hat_test)
    output_path = os.path.join(path_output, f'{nombre_modelo}_predicciones_test.csv')
    df_test.to_csv(output_path, index=False)
    print(f"Predicciones guardadas en {output_path}")

    # Calcular y mostrar el fbeta_score
    Y_test = df_test.iloc[:, 1:-1].values  # Etiquetas reales, sin la columna de nombres de archivo y etiquetas predichas
    final_score = fbeta_score(Y_test, Y_hat_test, beta=beta, average="samples")
    print(f"Puntaje final de Fbeta en el conjunto de pruebas para {nombre_modelo} con TTA: {final_score}")


In [13]:
# Cargamos el modelo EfficientNet-B0 best
modelo_efficientnet = torch.load(os.path.join(path_output, "efficientnet_b0_BEST_fold2.pth"))

In [14]:

batch_size = 64
calcular_fbeta_score( "modelo_efficientnet", modelo_efficientnet, df_test, path_test, batch_size, path_output)

  0%|          | 0/36 [00:00<?, ?it/s]

  labels = torch.tensor(labels).float().to(device)


  0%|          | 0/36 [00:00<?, ?it/s]

  0%|          | 0/36 [00:00<?, ?it/s]

  0%|          | 0/36 [00:00<?, ?it/s]

  0%|          | 0/36 [00:00<?, ?it/s]

  0%|          | 0/36 [00:00<?, ?it/s]

Predicciones guardadas en ../output/modelo_efficientnet_predicciones_test.csv
Puntaje final de Fbeta en el conjunto de pruebas para modelo_efficientnet con TTA: 0.9857709098215428
