In [54]:
import os
import torch
from torch import nn
from torch.nn import functional as F
from torch.utils.data import DataLoader, random_split
from torchvision import transforms
from torchvision.datasets import MNIST
import pytorch_lightning as pl
import torchmetrics
import pandas as pd
from tqdm import tqdm

In [28]:
SEED = 2024
pl.seed_everything(SEED)

Seed set to 2024


2024

In [29]:
LEARING_RATE = 1e-3
BATCH_SIZE = 64
EPOCHS = 5
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

In [30]:
def load_data(batch_size=BATCH_SIZE, num_workers=4):
    # Transformaciones para los datos
    transform = transforms.ToTensor()

    # Carga de datos de entrenamiento
    mnist_train = MNIST(os.getcwd(), train=True, download=True, transform=transform)
    
    # División entre entrenamiento y validación
    train_size = int(0.8 * len(mnist_train))
    val_size = len(mnist_train) - train_size
    mnist_train, mnist_val = random_split(mnist_train, [train_size, val_size])

    # DataLoader para entrenamiento y validación
    train_loader = DataLoader(mnist_train, batch_size=batch_size, num_workers=num_workers, shuffle=True, persistent_workers=True)
    val_loader = DataLoader(mnist_val, batch_size=batch_size, num_workers=num_workers, shuffle=False, persistent_workers=True)

    # Carga de datos de test
    mnist_test = MNIST(os.getcwd(), train=False, download=True, transform=transform)
    test_loader = DataLoader(mnist_test, batch_size=batch_size, num_workers=num_workers, persistent_workers=True)

    return train_loader, val_loader, test_loader

train_loader, val_loader, test_loader = load_data()

In [32]:

class MNISTModel(pl.LightningModule):
    def __init__(self, num_classes=10, lr=1e-3):
        super(MNISTModel, self).__init__()
        self.layer_1 = nn.Linear(28 * 28, 128)
        self.layer_2 = nn.Linear(128, 256)
        self.layer_3 = nn.Linear(256, 10)
        self.lr = lr
        self.num_classes = num_classes
        
        # metrics
        self.train_acc = torchmetrics.Accuracy(num_classes=self.num_classes, task='multiclass')
        self.val_acc = torchmetrics.Accuracy(num_classes=self.num_classes, task='multiclass')
        self.test_acc = torchmetrics.Accuracy(num_classes=self.num_classes, task='multiclass')
        self.precision = torchmetrics.Precision(num_classes=self.num_classes, average='macro', task='multiclass')
        self.recall = torchmetrics.Recall(num_classes=self.num_classes, average='macro', task='multiclass')
        self.f1 = torchmetrics.F1Score(num_classes=self.num_classes, average='macro', task='multiclass')

    def forward(self, x, record_activations=False):
        x = x.view(x.size(0), -1)
        x = self.layer_1(x)
        x_layer1 = F.relu(x)

        x = self.layer_2(x_layer1)
        x_layer2 = F.relu(x)

        x = self.layer_3(x_layer2)
        x_layer3 = F.log_softmax(x, dim=1)

        if record_activations:
            return x_layer3, [x_layer1, x_layer2, x_layer3]

        return x_layer3
    
    def training_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = F.nll_loss(logits, y)
        self.log('train_loss', loss, prog_bar=True)
        self.log('train_acc', self.train_acc(logits, y), prog_bar=True)
        return loss

    def validation_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = F.nll_loss(logits, y)
        self.log('val_loss', loss, prog_bar=True)
        self.log('val_acc', self.val_acc(logits, y), prog_bar=True)

    def test_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = F.nll_loss(logits, y)
        self.log('test_loss', loss, prog_bar=True)
        self.log('test_acc', self.test_acc(logits, y), prog_bar=True)
        self.log('precision', self.precision(logits, y), prog_bar=True)
        self.log('recall', self.recall(logits, y), prog_bar=True)
        self.log('f1', self.f1(logits, y), prog_bar=True)
    
    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.lr)
    
model = MNISTModel(lr=LEARING_RATE)

## Train

In [33]:
trainer = pl.Trainer(max_epochs=EPOCHS)
trainer.fit(model, train_loader, val_loader)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name      | Type                | Params
--------------------------------------------------
0 | layer_1   | Linear              | 100 K 
1 | layer_2   | Linear              | 33.0 K
2 | layer_3   | Linear              | 2.6 K 
3 | train_acc | MulticlassAccuracy  | 0     
4 | val_acc   | MulticlassAccuracy  | 0     
5 | test_acc  | MulticlassAccuracy  | 0     
6 | precision | MulticlassPrecision | 0     
7 | recall    | MulticlassRecall    | 0     
8 | f1        | MulticlassF1Score   | 0     
--------------------------------------------------
136 K     Trainable params
0         Non-trainable params
136 K     Total params
0.544     Total estimated model params size (MB)


Epoch 4: 100%|██████████| 750/750 [00:03<00:00, 232.33it/s, v_num=8, train_loss=0.040, train_acc=0.984, val_loss=0.0966, val_acc=0.972] 

`Trainer.fit` stopped: `max_epochs=5` reached.


Epoch 4: 100%|██████████| 750/750 [00:03<00:00, 231.76it/s, v_num=8, train_loss=0.040, train_acc=0.984, val_loss=0.0966, val_acc=0.972]


In [34]:
trainer.test(model, test_loader)

LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Testing DataLoader 0: 100%|██████████| 157/157 [00:00<00:00, 211.19it/s]
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
       Test metric             DataLoader 0
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
           f1               0.9711270332336426
        precision           0.9746068120002747
         recall             0.9731449484825134
        test_acc            0.9749000072479248
        test_loss            0.086024709045887
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────


[{'test_loss': 0.086024709045887,
  'test_acc': 0.9749000072479248,
  'precision': 0.9746068120002747,
  'recall': 0.9731449484825134,
  'f1': 0.9711270332336426}]

## Activations

In [35]:
def load_data_for_activations():
    # load only one image of the test set
    transform = transforms.ToTensor()
    mnist_test = MNIST(os.getcwd(), train=False, download=True, transform=transform)
    test_loader = DataLoader(mnist_test, batch_size=1, shuffle=False)
    return test_loader

test_loader_act = load_data_for_activations()

In [36]:
model_for_act = MNISTModel.load_from_checkpoint('lightning_logs/version_8/checkpoints/epoch=4-step=3750.ckpt')
model_for_act.eval()

MNISTModel(
  (layer_1): Linear(in_features=784, out_features=128, bias=True)
  (layer_2): Linear(in_features=128, out_features=256, bias=True)
  (layer_3): Linear(in_features=256, out_features=10, bias=True)
  (train_acc): MulticlassAccuracy()
  (val_acc): MulticlassAccuracy()
  (test_acc): MulticlassAccuracy()
  (precision): MulticlassPrecision()
  (recall): MulticlassRecall()
  (f1): MulticlassF1Score()
)

In [55]:
df = pd.DataFrame(columns=['Number', 'Layer 1', 'Layer 2', 'Layer 3'])
for batch in tqdm(test_loader_act, desc='Running activations'):
    x, y = batch
    x = x.to(DEVICE)
    _, r = model_for_act(x, record_activations=True)
    row = {'Number': y.item(), 'Layer 1': r[0].cpu().detach().numpy(), 'Layer 2': r[1].cpu().detach().numpy(), 'Layer 3': r[2].cpu().detach().numpy()}
    df.loc[len(df)] = row

Running activations: 100%|██████████| 10000/10000 [00:11<00:00, 902.99it/s]


In [56]:
df.head()

Unnamed: 0,Number,Layer 1,Layer 2,Layer 3
0,7,"[[0.4021675, 0.0, 0.0, 0.17226669, 0.0, 0.0, 0...","[[0.86871254, 0.24626496, 2.14166, 1.0335073, ...","[[-16.708138, -15.080975, -10.517571, -12.7543..."
1,2,"[[1.2451289, 0.0, 3.4739122, 2.4116392, 1.1928...","[[0.0, 0.294271, 0.0, 4.9220953, 0.0, 0.0, 2.1...","[[-19.028486, -11.932887, -5.2569914e-05, -10...."
2,1,"[[0.784067, 0.0, 0.41940135, 1.1673068, 0.0, 0...","[[0.0, 0.0, 0.0, 1.7142372, 0.0, 1.0349343, 0....","[[-12.076187, -0.0011462554, -8.229529, -12.27..."
3,0,"[[0.0, 0.0, 0.23350655, 0.0, 1.3085879, 0.8529...","[[0.69394875, 2.487968, 0.39706898, 0.0, 0.0, ...","[[-1.180165e-05, -19.415659, -11.505971, -20.6..."
4,4,"[[0.0, 0.87694776, 0.9655731, 0.0, 0.0, 0.0, 0...","[[0.0, 0.0, 0.33645225, 0.0, 0.0, 0.51041484, ...","[[-13.942757, -17.600641, -15.314047, -20.6317..."


In [None]:
df.to_pickle('activations.pkl')