In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [2]:
from sklearn.model_selection import train_test_split
from torchvision.transforms import ToTensor
from torchvision import transforms
import torch
from torch.utils.data import DataLoader
from torchvision import transforms, models
from datasets import load_dataset
from torch import nn, optim
from torch.utils.data import Dataset, DataLoader  
import matplotlib.pyplot as plt
from datasets import load_dataset
import torch.distributed as dist
from torchvision.models import resnet18



In [5]:
#Image net Tiny
ds = load_dataset("zh-plus/tiny-imagenet")
print(ds)
pretrain = ds['train']
finetune = ds['valid']


DatasetDict({
    train: Dataset({
        features: ['image', 'label'],
        num_rows: 100000
    })
    valid: Dataset({
        features: ['image', 'label'],
        num_rows: 10000
    })
})


**Barlow twin pretraining**

In [19]:
reduce_size =     transforms.Resize((64))

random_transform = transforms.Compose([
    transforms.RandomResizedCrop(64),
    transforms.RandomHorizontalFlip(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])
class Encoder(nn.Module):
    def __init__(self):
        super().__init__()
        self.backbone = resnet18(pretrained=False)
        self.backbone.fc = nn.Identity()  # Remove classification head
        self.projection = nn.Sequential(
            nn.Linear(512, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Linear(256, 256)
        )
    
    def forward(self, x):
        x = self.backbone(x)
        x = self.projection(x)
        return x

# Loss function for cross-correlation
def barlow_twins_loss(z1, z2, lambda_=5e-3):
    # Batch size
    batch_size = z1.size(0)
    
    # Normalize the embeddings
    z1_norm = (z1 - z1.mean(0)) / z1.std(0)
    z2_norm = (z2 - z2.mean(0)) / z2.std(0)
    
    # Compute cross-correlation matrix
    c = torch.mm(z1_norm.T, z2_norm) / batch_size
    
    # Identity matrix
    on_diag = torch.diagonal(c).add_(-1).pow(2).sum()  # On-diagonal terms
    off_diag = (c - torch.eye(c.size(0), device=c.device)).pow(2).sum() - on_diag  # Off-diagonal terms
    
    # Loss
    return on_diag + lambda_ * off_diag


class PreTrainDataset(Dataset):
    def __init__(self, data ,transform=None):
        self.data = data
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img = self.data[idx]['image']
        if not img.mode == "RGB":
            img = img.convert("RGB")
        img=ToTensor()(img)
        if self.transform:
            img = self.transform(img)
        return img

class TestDataset(Dataset):
    def __init__(self, data ,transform=None):
        self.data = data
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img = self.data[idx]['image']
        label = self.data[idx]['label']
        if not img.mode == "RGB":
            img = img.convert("RGB")
        img=ToTensor()(img)
        if self.transform:
            img = self.transform(img)
        return img,label

**Fine Tuning**

In [16]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
dataset = PreTrainDataset(pretrain,transform=reduce_size)
loader = DataLoader(dataset, batch_size=1024, shuffle=True)

encoder = Encoder().to(device)
encoder = nn.DataParallel(encoder)
optimizer = optim.Adam(encoder.parameters(), lr=0.001)




In [18]:
# Training loop for barlow twins
for epoch in range(10):  # Number of epochs
    encoder.train()
    for x in loader:
        # Move data to GPU
        x = x.to(device)
        
        # Apply two random augmentations
        x_a = random_transform(x)
        x_b = random_transform(x)
        
        # Forward passes
        z_a = encoder(x_a)
        z_b = encoder(x_b)
        
        # Compute loss
        loss = barlow_twins_loss(z_a, z_b)
        
        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    
    print(f"Epoch {epoch+1}, Loss: {loss.item():.4f}")

Epoch 1, Loss: 19.6493
Epoch 2, Loss: 11.5978
Epoch 3, Loss: 9.1249
Epoch 4, Loss: 8.1447
Epoch 5, Loss: 9.5854
Epoch 6, Loss: 9.0360
Epoch 7, Loss: 25.7458
Epoch 8, Loss: 7.0035
Epoch 9, Loss: 10.0565
Epoch 10, Loss: 7.7338


In [29]:
class DownstreamModel(nn.Module):
    def __init__(self, encoder, num_classes):
        super().__init__()
        if encoder.module:
            self.encoder = encoder.module
        else:
            self.encoder = encoder
        self.head = nn.Linear(512, num_classes)  # Output features = num_classes

    def forward(self, x):
        x = self.encoder.backbone(x)  # Use only the backbone
        x = self.head(x)
        return x


In [30]:
test_data = TestDataset(finetune,transform=reduce_size)
test_loader = loader = DataLoader(test_data, batch_size=1024, shuffle=False)
model = DownstreamModel(encoder,200).to(device)
model = nn.DataParallel(model)

In [31]:

# Set the model in evaluation mode
model.eval()

# List to store predictions and ground truth labels
all_preds = []
all_labels = []

# Disable gradient computation for inference
with torch.no_grad():
    for inputs, labels in test_loader:  # Assuming test_loader provides batches
        inputs, labels = inputs.to(device), labels.to(device)
        
        # Forward pass through the model
        outputs = model(inputs)  # Get model predictions
        
        # Get the predicted class (for classification, typically the class with the max probability)
        _, preds = torch.max(outputs, 1)  # '1' indicates class-wise max
        
        # Store predictions and true labels
        all_preds.append(preds.cpu().numpy())  # Move predictions to CPU and convert to numpy
        all_labels.append(labels.cpu().numpy())  # Move true labels to CPU and convert to numpy

# Convert lists to numpy arrays
all_preds = np.concatenate(all_preds)
all_labels = np.concatenate(all_labels)

# Compute accuracy
accuracy = np.sum(all_preds == all_labels) / len(all_labels)
print(f"Test Accuracy: {accuracy * 100:.2f}%")


Test Accuracy: 0.41%


In [3]:
# Helper functions
class PILDataset(Dataset):
    def __init__(self, data ,transform=None):
        self.data = data
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img,label = self.data[idx]['image'],self.data[idx]['label']
        if not img.mode == "RGB":
            img = img.convert("RGB")
        img=ToTensor()(img)
        label = self.labels[idx]
        if self.transform:
            img = self.transform(img)
        return img,label
train_transforms = transforms.Compose([
    # Resize the image to a fixed size (if needed)
    transforms.Resize((64, 64)),
    
    # Randomly crop the image to add variety
    transforms.RandomCrop(64, padding=4),
    
    # Randomly flip the image horizontally with a 50% chance
    transforms.RandomHorizontalFlip(),
    
    # Apply a slight random blur to the image
    transforms.GaussianBlur(kernel_size=(3, 3)),
    
    # Convert the image tensor to grayscale (if you want grayscale images)
    # Uncomment if needed:
    # transforms.Grayscale(num_output_channels=3),
    
    # Normalize the image tensor (after applying augmentation)
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
    
    # Optionally, add a color jitter for brightness, contrast, saturation, and hue
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1)
])

eval_transforms = transforms.Compose([
    # Resize the image to a fixed size (if needed)
    transforms.Resize((64, 64)),
    
    # Randomly crop the image to add variety
    transforms.RandomCrop(64, padding=4),
    
    # Randomly flip the image horizontally with a 50% chance
    transforms.RandomHorizontalFlip(),
    
    # Apply a slight random blur to the image
    transforms.GaussianBlur(kernel_size=(3, 3)),
    
    # Convert the image tensor to grayscale (if you want grayscale images)
    # Uncomment if needed:
    # transforms.Grayscale(num_output_channels=3),
    
    # Normalize the image tensor (after applying augmentation)
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
    
    # Optionally, add a color jitter for brightness, contrast, saturation, and hue
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1)
])




def show_image(image,size):
    plt.figure(figsize=(size, size))
    plt.imshow(image)
    plt.axis('off')  # Hide axes for better display
    plt.show()

def data_split(dataset,test_size=0.1,labels=None,seed =42):
# DATASET IS PIL DATASET
    indices  =range(len(dataset))
    pre_train_indices, train_indices = train_test_split(
        indices, 
        test_size=test_size, 
        random_state=seed, 
        stratify=labels ) # Use labels for stratification
    train= dataset.select(pre_train_indices)
    test = dataset.select(train_indices)

    return train,test

In [6]:
# # whole imagenet
# ds = load_dataset("evanarlian/imagenet_1k_resized_256")
# print(ds)

In [8]:
train_dataset = PILDataset(train)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

for images, labels in train_loader:
    print(images.shape, labels.shape)
    break  

torch.Size([64, 3, 64, 64]) torch.Size([64])


In [12]:

model = models.resnet18(weights=None)
model.fc = nn.Linear(model.fc.in_features, 200)  # Tiny ImageNet has 200 classes

model.train()

criterion = nn.CrossEntropyLoss() 
optimizer = optim.Adam(model.parameters(), lr=0.001)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [13]:
num_epochs = 10
model.to(device)
model = nn.DataParallel(model)
for epoch in range(num_epochs):
    running_loss = 0.0
    correct = 0
    total = 0
    for images,labels in train_loader:
        # Get inputs and labels
        images, labels = images.to(device), labels.to(device)

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(images)
       

        # Calculate loss
        loss = criterion(outputs, labels)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        # Update the loss and accuracy
        running_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
# 8. Training loop


    # Print the loss and accuracy for every epoch
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}, Accuracy: {100 * correct/total:.2f}%")

print("Finished Training")

  with torch.cuda.device(device), torch.cuda.stream(stream), autocast(enabled=autocast_enabled):


KeyboardInterrupt: 