# Constructing baseline

In [1]:
from dataclasses import dataclass
from functools import partial
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, SubsetRandomSampler
from torchvision import transforms, datasets, models
import pytorch_lightning as pl
from torchmetrics import Accuracy
from random import shuffle
from tqdm import tqdm

In [2]:
# Define the LightningDataModule
@dataclass
class SUN397DataModule(pl.LightningDataModule):
    data_dir = "./data"
    transform = transforms.Compose([
        transforms.CenterCrop((32, 32)),
        transforms.ToTensor(),
    ])
    split = {"val": 10, "test": 40}
    batch_size = 128

    @property
    def num_classes(self)->int:
        return len(self.dataset.classes)

    def prepare_data(self):
        # Download or preprocess data if required
        _ = datasets.SUN397(root=self.data_dir, download=True)

    def setup(self, stage=None):
        # Load and split the dataset into train and validation sets
        self.dataset = datasets.SUN397(root=self.data_dir, transform=self.transform)
        self.dataloader = partial(DataLoader, 
            dataset=self.dataset, batch_size=self.batch_size, 
            num_workers=torch.get_num_threads(), pin_memory=True,
        )
        self.cls_idxs = [[] for _ in self.dataset.classes]
        for i, label in tqdm(enumerate(self.dataset._labels)):
            self.cls_idxs[label].append(i)

        for m in ("val", "test"):
            setattr(self, f"{m}_idxs", [])
        
        for idx_list in tqdm(self.cls_idxs):
            shuffle(idx_list)
            self.val_idxs.extend(idx_list[:self.split["val"]])
            self.test_idxs.extend(idx_list[self.split["val"]:sum(self.split.values())])
        
        self.train_idxs = list(set(range(len(self.dataset))) - set(self.val_idxs) - set(self.test_idxs))

    def train_dataloader(self):
        return self.dataloader(sampler=SubsetRandomSampler(self.train_idxs))

    def val_dataloader(self):
        return self.dataloader(sampler=SubsetRandomSampler(self.val_idxs))

    def test_dataloader(self):
        return self.dataloader(sampler=SubsetRandomSampler(self.test_idxs))
    
dm = SUN397DataModule()

In [3]:
class PreActResNetBlock(nn.Module):
    def __init__(self, c_in, act_fn, subsample=False, c_out=-1):
        """
        Inputs:
            c_in - Number of input features
            act_fn - Activation class constructor (e.g. nn.ReLU)
            subsample - If True, we want to apply a stride inside the block and reduce the output shape by 2 in height and width
            c_out - Number of output features. Note that this is only relevant if subsample is True, as otherwise, c_out = c_in
        """
        super().__init__()
        if not subsample:
            c_out = c_in

        # Network representing F
        self.net = nn.Sequential(
            nn.BatchNorm2d(c_in),
            act_fn(),
            nn.Conv2d(c_in, c_out, kernel_size=3, padding=1, stride=1 if not subsample else 2, bias=False),
            nn.BatchNorm2d(c_out),
            act_fn(),
            nn.Conv2d(c_out, c_out, kernel_size=3, padding=1, bias=False),
        )

        # 1x1 convolution needs to apply non-linearity as well as not done on skip connection
        self.downsample = (
            nn.Sequential(nn.BatchNorm2d(c_in), act_fn(), nn.Conv2d(c_in, c_out, kernel_size=1, stride=2, bias=False))
            if subsample
            else None
        )

    def forward(self, x):
        z = self.net(x)
        if self.downsample is not None:
            x = self.downsample(x)
        out = z + x
        return out

108754it [00:00, 3452026.56it/s]
100%|██████████| 397/397 [00:00<00:00, 13426.37it/s]


In [6]:
class ResNet(nn.Module):
    def __init__(
        self,
        num_classes=10,
        num_blocks=[2, 2, 2, 2],
        c_hidden=[64, 128, 256, 512],
        act_fn_name="relu",
        block_name="ResNetBlock",
        **kwargs,
    ):
        """
        Inputs:
            num_classes - Number of classification outputs (10 for CIFAR10)
            num_blocks - List with the number of ResNet blocks to use. The first block of each group uses downsampling, except the first.
            c_hidden - List with the hidden dimensionalities in the different blocks. Usually multiplied by 2 the deeper we go.
            act_fn_name - Name of the activation function to use, looked up in "act_fn_by_name"
            block_name - Name of the ResNet block, looked up in "resnet_blocks_by_name"
        """
        super().__init__()
        assert block_name in resnet_blocks_by_name
        self.hparams = SimpleNamespace(
            num_classes=num_classes,
            c_hidden=c_hidden,
            num_blocks=num_blocks,
            act_fn_name=act_fn_name,
            act_fn=act_fn_by_name[act_fn_name],
            block_class=resnet_blocks_by_name[block_name],
        )
        self._create_network()
        self._init_params()

    def _create_network(self):
        c_hidden = self.hparams.c_hidden

        # A first convolution on the original image to scale up the channel size
        if self.hparams.block_class == PreActResNetBlock:  # => Don't apply non-linearity on output
            self.input_net = nn.Sequential(nn.Conv2d(3, c_hidden[0], kernel_size=3, padding=1, bias=False))
        else:
            self.input_net = nn.Sequential(
                nn.Conv2d(3, c_hidden[0], kernel_size=3, padding=1, bias=False),
                nn.BatchNorm2d(c_hidden[0]),
                self.hparams.act_fn(),
            )

        # Creating the ResNet blocks
        blocks = []
        for block_idx, block_count in enumerate(self.hparams.num_blocks):
            for bc in range(block_count):
                # Subsample the first block of each group, except the very first one.
                subsample = bc == 0 and block_idx > 0
                blocks.append(
                    self.hparams.block_class(
                        c_in=c_hidden[block_idx if not subsample else (block_idx - 1)],
                        act_fn=self.hparams.act_fn,
                        subsample=subsample,
                        c_out=c_hidden[block_idx],
                    )
                )
        self.blocks = nn.Sequential(*blocks)

        # Mapping to classification output
        self.output_net = nn.Sequential(
            nn.AdaptiveAvgPool2d((1, 1)), nn.Flatten(), nn.Linear(c_hidden[-1], self.hparams.num_classes)
        )

    def _init_params(self):
        # Based on our discussion in Tutorial 4, we should initialize the convolutions according to the activation function
        # Fan-out focuses on the gradient distribution, and is commonly used in ResNets
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity=self.hparams.act_fn_name)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

    def forward(self, x):
        x = self.input_net(x)
        x = self.blocks(x)
        x = self.output_net(x)
        return x

TypeError: 'int' object is not callable

In [3]:
# Define the LightningModule
class ERMModule(pl.LightningModule):
    def __init__(self, backbone='preactresnet18'):
        super().__init__()
        self.backbone = 
        self.loss_fn = nn.CrossEntropyLoss()
        self.accuracy = Accuracy()

    def forward(self, x):
        return self.model(x)

    def training_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = self.loss_fn(logits, y)
        self.log('train_loss', loss)
        return loss

    def validation_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = self.loss_fn(logits, y)
        acc = self.accuracy(logits, y)
        self.log('val_loss', loss)
        self.log('val_acc', acc, prog_bar=True)

    def configure_optimizers(self):
        optimizer = optim.SGD(self.parameters(), lr=0.1, weight_decay=2e-4)
        lr_scheduler = optim.lr_scheduler.MultiStepLR(optimizer, milestones=[30, 60], gamma=0.1)
        return [optimizer], [lr_scheduler]

SyntaxError: invalid syntax (3366959818.py, line 5)

In [None]:

# Create the LightningDataModule
data_module = ImbalancedSUNDataModule(data_dir='path/to/data')

# Initialize the Lightning trainer
trainer = pl.Trainer(max_epochs=90)

# Train the model
model = ImbalancedSUNModel()
trainer.fit(model, datamodule=data_module)


In [None]:
%pip install torch torchvision
%pip install jupyter ipywidgets
%pip install "pytorch-lightning <1.9" lightning-flash[image]

In [None]:
from pytorch_lightning import seed_everything

import flash
from flash.core.classification import LabelsOutput
from flash.core.data.utils import download_data
from flash.image import ImageClassificationData, ImageClassifier

from flash.core.data.transforms import ApplyToKeys
from flash.core.data.io.input_transform import InputTransform
from dataclasses import dataclass
from torch.utils.data import Subset
from torchvision import datasets, transforms
from random import shuffle
import torch
from tqdm.notebook import tqdm
from torchmetrics import Recall, F1Score

In [3]:
# set the random seeds.
seed_everything(42)

# 1. Download and organize the data
# download_data("https://pl-flash-data.s3.amazonaws.com/hymenoptera_data.zip", "data/")

ds = datasets.SUN397(
    root='./data', 
)
print(ds)

s = [[] for _ in ds.classes]
for i, label in tqdm(enumerate(ds._labels)):
    s[label].append(i)

idxs = {"train": [], "val": [], "test": []}
for idx_list in tqdm(s):
    shuffle(idx_list)
    idxs["val"].extend(idx_list[:10])
    idxs["test"].extend(idx_list[10:10 + 40])
    idxs["train"].extend(idx_list[50:])
# print(idxs)

Global seed set to 42


Dataset SUN397
    Number of datapoints: 108754
    Root location: ./data


0it [00:00, ?it/s]

  0%|          | 0/397 [00:00<?, ?it/s]

In [4]:
from typing import Tuple


@dataclass
class ImageClassificationInputTransform(InputTransform):
    image_size: Tuple[int, int] = (32, 32)
    # mean: Union[float, Tuple[float, float, float]] = (0.485, 0.456, 0.406)
    # std: Union[float, Tuple[float, float, float]] = (0.229, 0.224, 0.225)

    def per_sample_transform(self):
        return transforms.Compose(
            [
                ApplyToKeys(
                    "input",
                    transforms.Compose(
                        [
                            transforms.ToTensor(), 
                            # transforms.Normalize(self.mean, self.std)
                        ]
                    ),
                ),
                ApplyToKeys("target", torch.as_tensor),
            ]
        )

    def train_per_sample_transform(self):
        return transforms.Compose(
            [
                ApplyToKeys(
                    "input",
                    transforms.Compose(
                        [
                            transforms.RandomResizedCrop(self.image_size),
                            transforms.RandomHorizontalFlip(),
                            transforms.ToTensor(),
                            # transforms.Normalize(self.mean, self.std),
                        ]
                    ),
                ),
                ApplyToKeys("target", torch.as_tensor),
            ]
        )

In [5]:
datamodule = ImageClassificationData.from_datasets(
    train_dataset=Subset(ds, idxs["train"]),
    val_dataset=Subset(ds, idxs["val"]),
    test_dataset=Subset(ds, idxs["test"]),
    batch_size=128,
    transform=ImageClassificationInputTransform(),
)
# print(datamodule.num_classes)

# 2. Build the model using desired Task
model = ImageClassifier(
    backbone="resnet18", 
    num_classes=len(s), 
    pretrained=False,
    # metrics=F1Score(task='multiclass', num_classes=len(s)),
)
print(model)
# 3. Create the trainer (run one epoch for demo)
trainer = flash.Trainer(max_epochs=1, accelerator="gpu", devices=torch.cuda.device_count())
print(trainer)
# 4. Train the model
trainer.fit(model, datamodule=datamodule)

# 5. Save the model!
trainer.save_checkpoint("test/image_classification_model.pt")

  rank_zero_deprecation(


ModuleNotFoundError: Required dependencies not available. Please run: pip install 'lightning-flash[image]'

In [1]:
%pip freeze | grep lightning

lightning-bolts==0.6.0.post1
lightning-flash==0.8.1.post0
lightning-utilities==0.8.0
pytorch-lightning==1.8.6
Note: you may need to restart the kernel to use updated packages.


# Testing

In [2]:
import torch 

In [3]:
torch.cuda.device_count()

2

In [None]:
%conda install imbalanced-learn

In [None]:
from imblearn.metrics import geometric_mean_score, sensitivity_score

In [None]:
import numpy as np


y = np.random.randint(10, size=500)
pred = np.random.randint(10, size=500)
# print(y)
# print(pred)
tmp1 = sensitivity_score(y, pred, average=None)
print(tmp1)
tmp2 = geometric_mean_score(y, pred)
print(tmp2)

In [None]:
acc = np.load("./logs/Imbalance_S320_M2m_L0.5_W160_E0.1_I10_svhn_R60_resnet32_G0.9_B0.999/classwise_acc.npy")
r =1 
for i in acc**(1/len(acc)):
    r *= i

print(r)

In [None]:
acc.mean()

In [59]:
from torchvision import datasets, transforms
from utils import get_mean_and_std
from torch.utils.data import DataLoader

data = datasets.SUN397(
    root='./data', 
    # split='train',
    transform=transforms.Compose(
        [
            transforms.ToTensor(),

        ]
    )
)

# dataloader = DataLoader(
#     data, 
#     batch_size=1, 
#     # shuffle=True, 
#     num_workers=2
# )

# m, s = get_mean_and_std(data)
# print(m)
# print(s)
# data = data.data / 255 # data is numpy array

# print(np.round(data.mean(axis = (0,2,3)), 4))
# print(np.round(data.std(axis = (0,2,3)), 4))

In [2]:
from torchvision import datasets

In [None]:
from fuzzywuzzy import process
print(process.extractOne('sun37'.lower(), datasets.__all__, lambda x: x.lower())[0])

SUN397


In [2]:
import multiprocessing

num_cores = multiprocessing.cpu_count()
print("Number of CPU cores:", num_cores)

Number of CPU cores: 24
