## Reserved for library installs

In [1]:
#!pip3 install pillow

### Imports

In [2]:
import argparse
from collections import Counter
import datetime
import os
import sys
import matplotlib.pyplot as plt
from PIL import Image

import torch
import torchvision
from torchvision.transforms.functional import normalize, to_tensor, resize, to_pil_image
import torchvision.transforms as transforms

import pytorch_lightning as pl
from pytorch_lightning.loggers import TensorBoardLogger
from pytorch_lightning import Trainer
from pytorch_lightning import seed_everything
from pytorch_lightning.callbacks import BatchSizeFinder, EarlyStopping, LearningRateFinder
from pytorch_lightning import Trainer

from torchcam.utils import overlay_mask
from torchcam.methods import SmoothGradCAMpp

import config
import model

from dataset import createConfusionMatrix
from dataset import MixtecGenders
from model import MixtecModel

  from .autonotebook import tqdm as notebook_tqdm
  warn(


In [3]:
def _printdate(dt=None):
    """Return a date-time stamp made up only of digits and dashes.

    The format is ``M-D-YYYY-HH-MM``: month and day are not zero-padded,
    hour and minute are zero-padded to two digits.

    Args:
        dt: The ``datetime.datetime`` to format. When omitted (``None``) the
            current local time is used. It is resolved at call time; a
            ``datetime.now()`` default in the signature would be frozen at
            the moment the function is defined.

    Returns:
        The formatted stamp as a ``str``.
    """
    if dt is None:
        dt = datetime.datetime.now()

    return "{}-{}-{}-{:02d}-{:02d}".format(dt.month, dt.day, dt.year, dt.hour, dt.minute)

In [4]:
class LoggingCallback(pl.Callback):
    """Callback that pushes the trainer's callback metrics to the module's logger."""

    def on_validation_end(self, trainer, pl_module):
        """Log ``trainer.callback_metrics`` at the current global step.

        The whole metrics dict is logged once; afterwards every entry is
        logged again on its own through ``log_metrics`` and additionally
        written with ``logger.experiment.add_scalar``.
        """
        collected = trainer.callback_metrics
        pl_module.logger.log_metrics(collected, step=trainer.global_step)

        for name in collected:
            value = collected[name]
            pl_module.logger.log_metrics({name: value}, step=trainer.global_step)
            pl_module.logger.experiment.add_scalar(name, value, trainer.global_step)

In [5]:
# Get the data set
# Using only one worker is faster
# `dataset` is later passed to the trainer as `datamodule=` and is asked for
# its train/val dataloaders when the confusion matrices are built.
dataset = MixtecGenders(num_workers=1, batch_size=config.BATCH_SIZE)

#print(dict(Counter(dataset.targets)))

# TensorBoard logger for this experiment (save_dir "../runs", run name "cam_test").
logger = TensorBoardLogger(save_dir="../runs", name="cam_test", default_hp_metric=False)

# Configure the model
# NOTE(review): assigning to `model` rebinds the name of the `model` module
# imported in the imports cell; after this line `model` is the MixtecModel
# instance and the module is no longer reachable under that name.
#model = NN(config.BATCH_SIZE, config.LEARNING_RATE)
model = MixtecModel(learning_rate=config.LEARNING_RATE, num_epoch=config.EPOCHS, model_name="resnet18")
# model.set_reference_dataloader(dataset.reference_dataloader)

# Print the module tree so the layer names are visible.
print(model)

MixtecModel(
  (loss_fn): CrossEntropyLoss()
  (model): ResNet(
    (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (layer1): Sequential(
      (0): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (1): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, mo

In [6]:
# Early-stopping criterion on the validation F1 score.
#
# F1 is a score to maximise, so `mode` must be "max": with the default "min",
# a *falling* F1 would count as improvement. `min_delta` sets the smallest
# increase that counts as progress. The previous `stopping_threshold=1e-4`
# would instead end training as soon as the metric crossed 1e-4.
#
# Assumes the model logs a metric named "val_f1" — TODO confirm in MixtecModel.
# This callback only takes effect once it is added to the Trainer's `callbacks`.
early_stopping = EarlyStopping(
    monitor="val_f1",
    mode="max",
    min_delta=1e-4,
    check_finite=True,
)

In [7]:
# Build the trainer: device/accelerator are auto-detected, metrics go to the
# TensorBoard `logger`, and training runs for between 1 and config.EPOCHS epochs.
trainer = Trainer(
    devices="auto",
    accelerator="auto",
    logger=logger,
    log_every_n_steps=1,
    enable_progress_bar=True,
    min_epochs=1,
    max_epochs=config.EPOCHS,
    callbacks=[
        # Optional callbacks, currently disabled:
        # BatchSizeFinder(init_val=64),
        # LearningRateFinder(),
        # early_stopping,
        # LoggingCallback(),
    ],
)

# Train the model
fitresults = trainer.fit(model, datamodule=dataset)

# Create and log one confusion matrix per split. Each split gets its own tag:
# two figures written under the same tag at the same step collide in
# TensorBoard, so the train and validation matrices could not both be viewed.
logger.experiment.add_figure(
    "Confusion matrix/train",
    createConfusionMatrix(dataset.train_dataloader(), model),
    config.EPOCHS,
)
logger.experiment.add_figure(
    "Confusion matrix/val",
    createConfusionMatrix(dataset.val_dataloader(), model),
    config.EPOCHS,
)

# Run a final validation pass and show the resulting metrics.
valresults = trainer.validate(model, datamodule=dataset)
print('-'*80)
print(f"{valresults=}")
print('-'*80)



GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs


Training Set: Counter({1: 535, 0: 237})
Validation Set: Counter({1: 267, 0: 118})


LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [5]

  | Name          | Type             | Params
---------------------------------------------------
0 | loss_fn       | CrossEntropyLoss | 0     
1 | model         | ResNet           | 12.4 M
2 | train_metrics | MetricCollection | 0     
3 | val_metrics   | MetricCollection | 0     
4 | test_metrics  | MetricCollection | 0     
---------------------------------------------------
12.4 M    Trainable params
0         Non-trainable params
12.4 M    Total params
49.429    Total estimated model params size (MB)
SLURM auto-requeueing enabled. Setting signal handlers.


Sanity Checking: 0it [00:00, ?it/s]

  rank_zero_warn(
  rank_zero_warn(


Sanity Checking DataLoader 0:   0%|          | 0/2 [00:00<?, ?it/s]

RuntimeError: non-empty 3D or 4D (batch mode) tensor expected for input

In [None]:
# Put the model in evaluation mode before computing class-activation maps.
print(model.eval())

# Load the reference image as a 3-channel 224x224 tensor.
# The functional `to_tensor` imported at the top is used directly so that name
# is not rebound to a different object in this cell.
to_square        = transforms.Resize((224, 224), antialias=True)
to_three_channel = transforms.Lambda(lambda x: x[:3])  # keep the first three channels only

# The context manager closes the file once the pixels have been copied.
with Image.open("../reference_images/male/067-a-09.png") as ref_pil:
    ref_img = to_three_channel(to_square(to_tensor(ref_pil)))

# Preprocess it for the model.
# `to_tensor` already scales 8-bit images to [0, 1] and the image is already
# 224x224, so only mean/std normalisation remains. Dividing by 255 a second
# time would squash the input to roughly [0, 0.004].
# NOTE(review): confirm these mean/std values match the training transforms.
input_tensor = normalize(ref_img, [0.485, 0.456, 0.406], [0.229, 0.224, 0.225])

# CAMs need a layer that produces spatial feature maps. `model.fc` outputs a
# flat vector of class scores, so hook the last residual stage of the wrapped
# ResNet instead.
cam_extractor = SmoothGradCAMpp(model, 'model.layer4')

# Feed the image to the model on whatever device its weights live on.
device = next(model.parameters()).device
out = model(input_tensor.unsqueeze(0).to(device))

# Retrieve the CAM for the predicted class.
activation_map = cam_extractor(out.squeeze(0).argmax().item())

# Bring the map back to the CPU for plotting / PIL conversion.
cam = activation_map[0].squeeze(0).detach().cpu()

plt.imshow(cam.numpy()); plt.axis('off'); plt.tight_layout(); plt.show()

# Resize the CAM and overlay it on the (un-normalised) reference image.
result = overlay_mask(to_pil_image(ref_img), to_pil_image(cam, mode='F'), alpha=0.5)
# Display it
plt.imshow(result); plt.axis('off'); plt.tight_layout(); plt.show()