<a href="https://colab.research.google.com/github/samanthawhite7326/AIFinalPres/blob/main/Final_Proj_SimCLR.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [20]:
## Standard libraries
import os
from copy import deepcopy

## tqdm for loading bars
from tqdm.notebook import tqdm

## PyTorch
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data as data
import torch.optim as optim

## Torchvision
import torchvision
from torchvision.datasets import STL10
from torchvision import transforms
!pip install --quiet pytorch-lightning>=1.4
import pytorch_lightning as pl
# Only import LearningRateMonitor (skip ModelCheckpoint)
from pytorch_lightning.callbacks import LearningRateMonitor


# Root directory passed to the STL10 dataset constructors
DATASET_PATH = "/content/data"
# os.cpu_count() returns None when the core count cannot be determined; fall
# back to 0 (load data in the main process) so DataLoader always gets an int.
NUM_WORKERS = os.cpu_count() or 0

# Seed the random number generators through Lightning
pl.seed_everything(42)

# Ensure deterministic behavior for reproducibility
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

# Use the first CUDA device when one is available, otherwise the CPU
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
print("Device:", device)
print("Number of workers:", NUM_WORKERS)


INFO:lightning_fabric.utilities.seed:Seed set to 42


Device: cuda:0
Number of workers: 2


In [10]:
class ContrastiveTransformations(object):
    """Callable that applies one transform several times to the same input."""

    def __init__(self, base_transforms, n_views=2):
        """
        Args:
            base_transforms: Callable applied to the input once per view.
            n_views: Number of transformed views produced per call.
        """
        self.n_views = n_views
        self.base_transforms = base_transforms

    def __call__(self, x):
        """Return a list holding ``n_views`` results of ``base_transforms(x)``."""
        views = []
        for _ in range(self.n_views):
            views.append(self.base_transforms(x))
        return views

In [11]:
# Random augmentation pipeline used to generate each contrastive view
contrast_transforms = transforms.Compose(
    [
        transforms.RandomHorizontalFlip(),
        transforms.RandomResizedCrop(size=96),
        # Colour jitter is applied with probability 0.8
        transforms.RandomApply(
            [
                transforms.ColorJitter(
                    brightness=0.5, contrast=0.5, saturation=0.5, hue=0.1
                )
            ],
            p=0.8,
        ),
        transforms.RandomGrayscale(p=0.2),
        transforms.GaussianBlur(kernel_size=9),
        transforms.ToTensor(),
        # Normalise with mean 0.5 and std 0.5
        transforms.Normalize((0.5,), (0.5,)),
    ]
)

In [12]:
# STL10 'unlabeled' split; every sample is returned as two augmented views
unlabeled_data = STL10(
    root=DATASET_PATH,
    split='unlabeled',
    download=True,
    transform=ContrastiveTransformations(contrast_transforms, n_views=2),
)
# STL10 'train' split wrapped in the same two-view augmentation
train_data_contrast = STL10(
    root=DATASET_PATH,
    split='train',
    download=True,
    transform=ContrastiveTransformations(contrast_transforms, n_views=2),
)

In [21]:
class SimCLR(pl.LightningModule):
    """Contrastive model: ResNet-18 encoder plus projection head, trained with InfoNCE."""

    def __init__(self, hidden_dim, lr, temperature, weight_decay, max_epochs=500):
        """
        Args:
            hidden_dim: Width of the projected embedding on which the
                contrastive loss is computed.
            lr: Initial learning rate for AdamW.
            temperature: Positive value the cosine similarities are divided by.
            weight_decay: Weight decay for AdamW.
            max_epochs: ``T_max`` of the cosine learning-rate schedule.
        """
        super().__init__()
        self.save_hyperparameters()
        assert self.hparams.temperature > 0.0, 'The temperature must be a positive float!'

        # Randomly initialised ResNet-18. Random initialisation is the default,
        # so the deprecated `pretrained` flag is simply not passed.
        backbone = torchvision.models.resnet18()
        # Drop the last child module (the classification layer) so the encoder
        # returns features instead of class logits.
        self.convnet = nn.Sequential(*list(backbone.children())[:-1])

        # Projection head built from 1x1 convolutions. The output width follows
        # `hidden_dim`; it used to be hard-coded to 128, which silently ignored
        # the constructor argument.
        self.projection_head = nn.Sequential(
            nn.Conv2d(512, 256, kernel_size=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.Conv2d(256, hidden_dim, kernel_size=1),
            nn.AdaptiveAvgPool2d((1, 1)),
            nn.Flatten()
        )

    def info_nce_loss(self, batch, mode='train'):
        """Compute the InfoNCE loss for a batch of augmented views and log metrics.

        Args:
            batch: Pair ``(imgs, _)`` where ``imgs`` is a sequence of image
                tensors, one per view; the second element is ignored.
            mode: Prefix used for the logged metric names.

        Returns:
            Scalar tensor: mean negative log-likelihood of the positive pair.
        """
        imgs, _ = batch
        # Concatenate the views along the batch axis
        imgs = torch.cat(imgs, dim=0)
        feats = self.convnet(imgs)
        projected_feats = self.projection_head(feats)

        # Pairwise cosine similarity between all projected samples
        cos_sim = F.cosine_similarity(projected_feats[:, None, :], projected_feats[None, :, :], dim=-1)
        # Exclude each sample's similarity with itself
        self_mask = torch.eye(cos_sim.shape[0], dtype=torch.bool, device=cos_sim.device)
        cos_sim.masked_fill_(self_mask, -9e15)
        # The positive of sample i sits half a batch away, i.e. the other view
        # of the same image (this pairing assumes exactly two views).
        pos_mask = self_mask.roll(shifts=cos_sim.shape[0]//2, dims=0)

        # InfoNCE: -sim(positive) + logsumexp over all candidates
        cos_sim = cos_sim / self.hparams.temperature
        nll = -cos_sim[pos_mask] + torch.logsumexp(cos_sim, dim=-1)
        nll = nll.mean()
        self.log(mode+'_loss', nll)

        # Ranking metrics: put the positive in column 0, mask it out of the
        # remaining columns, then find where column 0 lands after sorting.
        comb_sim = torch.cat([cos_sim[pos_mask][:, None],
                              cos_sim.masked_fill(pos_mask, -9e15)],
                             dim=-1)
        sim_argsort = comb_sim.argsort(dim=-1, descending=True).argmin(dim=-1)

        self.log(mode+'_acc_top1', (sim_argsort == 0).float().mean())
        self.log(mode+'_acc_top5', (sim_argsort < 5).float().mean())
        self.log(mode+'_acc_mean_pos', 1+sim_argsort.float().mean())

        return nll

    def configure_optimizers(self):
        """Return AdamW with a cosine schedule annealing to ``lr / 50`` over ``max_epochs``."""
        optimizer = optim.AdamW(self.parameters(),
                                lr=self.hparams.lr,
                                weight_decay=self.hparams.weight_decay)
        lr_scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer,
                                                            T_max=self.hparams.max_epochs,
                                                            eta_min=self.hparams.lr/50)
        return [optimizer], [lr_scheduler]

    def training_step(self, batch, batch_idx):
        """Return the InfoNCE loss, logging metrics with the ``train`` prefix."""
        return self.info_nce_loss(batch, mode='train')

    def validation_step(self, batch, batch_idx):
        """Return the InfoNCE loss, logging metrics with the ``val`` prefix."""
        return self.info_nce_loss(batch, mode='val')



In [23]:
def train_simclr(batch_size, max_epochs=500, **kwargs):
    """Fit a SimCLR model and return it.

    Trains on ``unlabeled_data`` and validates on ``train_data_contrast``.

    Args:
        batch_size: Batch size used by both data loaders.
        max_epochs: Epoch limit; passed to the Trainer and to ``SimCLR``.
        **kwargs: Remaining keyword arguments forwarded to ``SimCLR``.

    Returns:
        The fitted ``SimCLR`` instance.
    """
    on_cuda = str(device).startswith("cuda")
    trainer = pl.Trainer(
        accelerator="gpu" if on_cuda else "cpu",
        devices=1,
        max_epochs=max_epochs,
    )

    # Settings common to both loaders
    shared_loader_args = dict(
        batch_size=batch_size,
        pin_memory=True,
        num_workers=NUM_WORKERS,
    )
    train_loader = data.DataLoader(
        unlabeled_data, shuffle=True, drop_last=True, **shared_loader_args
    )
    val_loader = data.DataLoader(
        train_data_contrast, shuffle=False, drop_last=False, **shared_loader_args
    )

    # Re-seed immediately before the model is constructed
    pl.seed_everything(42)
    model = SimCLR(max_epochs=max_epochs, **kwargs)
    trainer.fit(model, train_loader, val_loader)
    return model


In [26]:
# Contrastive pre-training run: 20 epochs, batch size 256
simclr_model = train_simclr(
    batch_size=256,
    max_epochs=20,
    # Remaining arguments are forwarded to the SimCLR constructor
    hidden_dim=128,
    temperature=0.05,
    lr=3e-4,
    weight_decay=1e-4,
)

INFO:pytorch_lightning.utilities.rank_zero:You are using the plain ModelCheckpoint callback. Consider using LitModelCheckpoint which with seamless uploading to Model registry.
INFO:pytorch_lightning.utilities.rank_zero:GPU available: True (cuda), used: True
INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs
INFO:lightning_fabric.utilities.seed:Seed set to 42
INFO:pytorch_lightning.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO:pytorch_lightning.callbacks.model_summary:
  | Name            | Type       | Params | Mode 
-------------------------------------------------------
0 | convnet         | Sequential | 11.2 M | train
1 | projection_head | Sequential | 164 K  | train
-------------------------------------------------------
11.3 M    Trainable params
0         Non-trainable params
11.3 M    Total params
45.365    Total estimated model params size (MB)
74  

Sanity Checking: |          | 0/? [00:00<?, ?it/s]

Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

INFO:pytorch_lightning.utilities.rank_zero:`Trainer.fit` stopped: `max_epochs=20` reached.


In [27]:
# Evaluation preprocessing without random augmentation:
# resize, convert to tensor, normalise with mean 0.5 / std 0.5
eval_transform = transforms.Compose(
    [
        transforms.Resize(96),
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,)),
    ]
)

# STL10 'train' and 'test' splits with the evaluation preprocessing
stl_train = STL10(
    root=DATASET_PATH, split='train', transform=eval_transform, download=True
)
stl_test = STL10(
    root=DATASET_PATH, split='test', transform=eval_transform, download=True
)


  0%|          | 0.00/2.64G [00:00<?, ?B/s][A
  0%|          | 131k/2.64G [00:00<38:23, 1.15MB/s][A
  0%|          | 295k/2.64G [00:00<31:48, 1.38MB/s][A
  0%|          | 492k/2.64G [00:00<27:23, 1.61MB/s][A
  0%|          | 819k/2.64G [00:00<19:59, 2.20MB/s][A
  0%|          | 1.31M/2.64G [00:00<14:13, 3.09MB/s][A
  0%|          | 1.97M/2.64G [00:00<10:33, 4.16MB/s][A
  0%|          | 2.75M/2.64G [00:00<08:22, 5.25MB/s][A
  0%|          | 3.80M/2.64G [00:00<06:29, 6.77MB/s][A
  0%|          | 5.21M/2.64G [00:00<04:57, 8.85MB/s][A
  0%|          | 6.85M/2.64G [00:01<04:00, 10.9MB/s][A
  0%|          | 8.78M/2.64G [00:01<03:18, 13.3MB/s][A
  0%|          | 11.2M/2.64G [00:01<02:42, 16.2MB/s][A
  1%|          | 14.1M/2.64G [00:01<02:14, 19.5MB/s][A
  1%|          | 17.3M/2.64G [00:01<01:53, 23.2MB/s][A
  1%|          | 21.2M/2.64G [00:01<01:35, 27.3MB/s][A
  1%|          | 25.8M/2.64G [00:01<01:21, 32.2MB/s][A
  1%|          | 31.2M/2.64G [00:01<01:08, 38.0MB/s][A
  1%

In [None]:
def extract_features(dataset, model):
    """Pass every sample of ``dataset`` through ``model.convnet`` and collect the results.

    Args:
        dataset: Dataset yielding ``(image, label)`` pairs.
        model: Object exposing a ``convnet`` module; it is moved to ``device``
            and put in eval mode.

    Returns:
        Tuple ``(features, labels)`` of NumPy arrays, with each sample's
        features flattened into a single row.
    """
    model = model.to(device)
    model.eval()
    loader = data.DataLoader(
        dataset, batch_size=128, shuffle=False, num_workers=NUM_WORKERS
    )

    feat_chunks = []
    label_chunks = []
    with torch.no_grad():
        for batch_imgs, batch_labels in tqdm(loader, desc="Extracting features"):
            encoded = model.convnet(batch_imgs.to(device))
            # Collapse everything after the batch axis into one dimension
            flat = encoded.view(encoded.size(0), -1)
            feat_chunks.append(flat.cpu())
            label_chunks.append(batch_labels)

    all_feats = torch.cat(feat_chunks).numpy()
    all_labels = torch.cat(label_chunks).numpy()
    return all_feats, all_labels


# Encoder features and labels for the STL10 train and test splits
train_feats, train_labels = extract_features(dataset=stl_train, model=simclr_model)
test_feats, test_labels = extract_features(dataset=stl_test, model=simclr_model)

In [29]:
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

# Linear probe on the extracted features. The `multi_class` argument is
# deprecated in recent scikit-learn releases; with the lbfgs solver the default
# already fits a multinomial model for multi-class targets, so it is omitted.
clf = LogisticRegression(max_iter=1000, solver='lbfgs')
clf.fit(train_feats, train_labels)

# Accuracy on the held-out test features
preds = clf.predict(test_feats)
acc = accuracy_score(test_labels, preds)
print(f"Linear Classifier Accuracy: {acc:.4f}")



Linear Classifier Accuracy: 0.6464


In [30]:
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import normalize

# Normalise each feature row. Note that this rebinds train_feats / test_feats,
# so cells run afterwards see the normalised arrays.
train_feats = normalize(train_feats)
test_feats = normalize(test_feats)

knn = KNeighborsClassifier(n_neighbors=5, metric='cosine')
knn.fit(train_feats, train_labels)
preds = knn.predict(test_feats)
acc = accuracy_score(test_labels, preds)
# Take k from the estimator so the message cannot drift from the value used
print(f"KNN Accuracy (k={knn.n_neighbors}, cosine distance): {acc:.4f}")

KNN Accuracy (k=3, cosine distance): 0.6192
