In [1]:
! pip install -q kaggle

In [None]:
import os
# Check if the dataset file already exists
dataset_filename = "breast-ultrasound-images-dataset.zip"  # Replace with the actual filename

if not os.path.exists(dataset_filename):
    # download dataset from Kaggle if not present
    ! pip install -q kaggle
    files.upload()
    # upload json file from your Kaggle account
    ! mkdir ~/.kaggle
    ! cp kaggle.json ~/.kaggle/
    ! chmod 600 ~/.kaggle/kaggle.json
    ! kaggle datasets download -d aryashah2k/breast-ultrasound-images-dataset
else:
    print(f"Dataset file '{dataset_filename}' already exists. Skipping download.")

In [None]:
# prompt: if the folder Dataset_BUSI_with_GT exists then skip unzip file otherwise unzip file

import os

# Check if the folder exists
folder_name = "Dataset_BUSI_with_GT"
if not os.path.exists(folder_name):
    # Unzip the file if the folder doesn't exist
    !unzip breast-ultrasound-images-dataset.zip
else:
    print(f"Folder '{folder_name}' already exists. Skipping unzip.")

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.utils.data import sampler

import torchvision.datasets as dset
import torchvision.transforms as T

import numpy as np

# Set to False to stay on the CPU even when CUDA is available.
USE_GPU = True
dtype = torch.float32 # We will be using float throughout this tutorial.

# Use the GPU only when it is both requested and actually present.
device = torch.device('cuda' if USE_GPU and torch.cuda.is_available() else 'cpu')

# Constant to control how frequently we print train loss.
print_every = 100
print('using device:', device)

In [None]:
# prompt: build training testing validation set using data in the folder dataset_busi_with_gt, each data point is a photo, label is the folder name benign, malignant, normal. we only need data that does not end with _mask.png

import os
import random
from PIL import Image
from PIL import ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True
from sklearn.model_selection import train_test_split

# Root folder of the extracted dataset
data_dir = 'Dataset_BUSI_with_GT'

# File suffixes (compared against the lower-cased file name) accepted as images
image_extensions = (
    '.jpg',
    '.jpeg',
    '.png',
)


# Function to load and preprocess images
def load_images_and_labels(data_dir):
    """Load every non-mask image under ``data_dir`` together with its class name.

    Each sub-directory of ``data_dir`` is treated as one class. A file is kept
    when its lower-cased name ends with one of ``image_extensions`` and does
    not end with ``_mask.png``.

    Directory and file listings are sorted so that the class indices and the
    sample order (and therefore any seeded split made from that order) are the
    same on every run; ``os.listdir`` alone returns entries in arbitrary order.

    Args:
        data_dir: Path of the dataset root folder.

    Returns:
        A tuple ``(images, class_to_idx, idx_to_class)`` where ``images`` is a
        list of ``(PIL image, class_name)`` pairs and the two dicts map class
        names to integer indices and back.
    """
    images = []
    class_to_idx = {}
    idx_to_class = {}
    for class_name in sorted(os.listdir(data_dir)):
        class_dir = os.path.join(data_dir, class_name)
        if not os.path.isdir(class_dir):
            continue
        # Indices are handed out in sorted-name order: 0, 1, 2, ...
        current_idx = len(class_to_idx)
        class_to_idx[class_name] = current_idx
        idx_to_class[current_idx] = class_name
        for filename in sorted(os.listdir(class_dir)):
            lowered = filename.lower()
            if not lowered.endswith(image_extensions) or lowered.endswith('_mask.png'):
                continue
            img_path = os.path.join(class_dir, filename)
            try:
                image = Image.open(img_path)
                # Image.open is lazy. Decode now so an unreadable file is
                # reported here rather than later inside a training loop, and
                # so Pillow can release the underlying file handle.
                image.load()
                images.append((image, class_name))
            except Exception as e:
                print(f"Error loading image {img_path}: {e}")

    return images, class_to_idx, idx_to_class

# Load all (image, class_name) pairs plus the class <-> index lookup tables
images, class_to_idx, idx_to_class = load_images_and_labels(data_dir)

# Two successive 80/20 splits: hold out the test set first, then carve the
# validation set out of what remains
train_val_images, test_images = train_test_split(images, test_size=0.2, random_state=42)
train_images, val_images = train_test_split(train_val_images, test_size=0.2, random_state=42)

# For each split: unzip the pairs into parallel tuples, then replace the
# class names by their integer indices
train_images, train_labels = zip(*train_images)
train_labels = [class_to_idx[name] for name in train_labels]

val_images, val_labels = zip(*val_images)
val_labels = [class_to_idx[name] for name in val_labels]

test_images, test_labels = zip(*test_images)
test_labels = [class_to_idx[name] for name in test_labels]

# Report the size of each split
print(f"Train set size: {len(train_images)}, Labels: {len(train_labels)}")
print(f"Validation set size: {len(val_images)}, Labels: {len(val_labels)}")
print(f"Test set size: {len(test_images)}, Labels: {len(test_labels)}")




In [None]:
# prompt: classify the above dataset using imagenet

import torchvision.models as models
import torchvision.transforms as transforms
from PIL import Image
import torch

# Pre-trained ResNet-18 (other torchvision models work too), switched
# straight to evaluation mode; eval() returns the module itself
model = models.resnet18(pretrained=True).eval()

# Preprocessing applied to each image before it is fed to the ImageNet model
transform = transforms.Compose(
    [
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]
)

# Function to classify a single image
def classify_image(image_path):
    """Print the top-5 ImageNet predictions for the image at ``image_path``.

    Uses the notebook-level ``model`` and ``transform`` and reads the class
    names from ``imagenet_classes.txt`` in the working directory. Failures are
    reported with ``print`` instead of being raised; nothing is returned.

    Args:
        image_path: Path of the image file to classify.
    """
    try:
        # The context manager closes the file handle. The image is forced to
        # RGB because the 3-channel Normalize step fails on grayscale or RGBA
        # input.
        with Image.open(image_path) as img:
            img_t = transform(img.convert('RGB'))
        batch_t = torch.unsqueeze(img_t, 0)

        with torch.no_grad():
            out = model(batch_t)

        # Load ImageNet class labels
        with open('imagenet_classes.txt') as f:
            classes = [line.strip() for line in f]

        percentage = torch.nn.functional.softmax(out, dim=1)[0] * 100

        # Print top 5 predictions
        _, indices = torch.sort(out, descending=True)
        for idx in indices[0][:5]:
            print(f"{classes[idx]}: {percentage[idx].item():.2f}%")

    except FileNotFoundError as e:
        # Either the image or the labels file may be the missing one, so name
        # the file that actually could not be opened.
        print(f"Error: file not found: {e.filename}")
    except Exception as e:
        print(f"An error occurred: {e}")


# Example usage: Classify an image from the dataset
# You'll need to replace 'path/to/your/image.jpg' with the actual path to an image
# from the dataset.

# Example image path
image_path_example = os.path.join(data_dir, "benign", "benign (1).png")

# Download the imagenet class labels
!wget https://raw.githubusercontent.com/pytorch/hub/master/imagenet_classes.txt

classify_image(image_path_example)


# To classify multiple images, you can iterate through your image list:
# for image_path in your_image_list:
#    classify_image(image_path)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms, models
from torch.utils.data import DataLoader, Dataset

# Preprocessing for the pretrained ResNet-50 used in this cell
transform = transforms.Compose(
    [
        # Convert grayscale to 3 channels
        transforms.Grayscale(num_output_channels=3),
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]
)


# Custom Dataset class
class CustomDataset(Dataset):
    """Dataset over parallel ``images`` / ``labels`` sequences.

    ``transform``, when truthy, is applied to the image each time an item is
    fetched; labels are returned unchanged.
    """

    def __init__(self, images, labels, transform=None):
        self.images, self.labels, self.transform = images, labels, transform

    def __len__(self):
        """Number of samples, taken from the images sequence."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for position ``idx``."""
        sample = self.images[idx]
        target = self.labels[idx]
        if not self.transform:
            return sample, target
        return self.transform(sample), target

# Prepare datasets
train_dataset = CustomDataset(train_images, train_labels, transform)
val_dataset = CustomDataset(val_images, val_labels, transform)
test_dataset = CustomDataset(test_images, test_labels, transform)

# Prepare data loaders
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Load a pretrained model
model = models.resnet50(pretrained=True)

# Only the new head is handed to the optimizer below, so freeze the backbone:
# this skips computing and storing gradients that would never be applied.
model.requires_grad_(False)

# Modify the final fully connected layer to match the number of classes
# (the freshly created layer is trainable by default)
num_classes = len(class_to_idx)
model.fc = nn.Linear(model.fc.in_features, num_classes)

# Move model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.fc.parameters(), lr=0.001)

# Training and validation loop
num_epochs = 10
# Start below any reachable accuracy so the first epoch always writes a
# checkpoint; otherwise the test phase could load a missing or stale file.
best_val_accuracy = -1.0

for epoch in range(num_epochs):
    # Training phase
    model.train()
    train_loss = 0.0
    correct_train = 0
    total_train = 0
    # The batch variables are deliberately not called `images` / `labels`: at
    # notebook level that would overwrite the `images` list built when the
    # dataset was loaded.
    for batch_images, batch_labels in train_loader:
        batch_images, batch_labels = batch_images.to(device), batch_labels.to(device)

        optimizer.zero_grad()
        outputs = model(batch_images)
        loss = criterion(outputs, batch_labels)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        total_train += batch_labels.size(0)
        correct_train += (predicted == batch_labels).sum().item()

    train_accuracy = 100 * correct_train / total_train

    # Validation phase
    model.eval()
    val_loss = 0.0
    correct_val = 0
    total_val = 0
    with torch.no_grad():
        for batch_images, batch_labels in val_loader:
            batch_images, batch_labels = batch_images.to(device), batch_labels.to(device)

            outputs = model(batch_images)
            loss = criterion(outputs, batch_labels)
            val_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total_val += batch_labels.size(0)
            correct_val += (predicted == batch_labels).sum().item()

    val_accuracy = 100 * correct_val / total_val

    print(f"Epoch {epoch+1}/{num_epochs}, "
          f"Train Loss: {train_loss/len(train_loader):.4f}, Train Accuracy: {train_accuracy:.2f}%, "
          f"Validation Loss: {val_loss/len(val_loader):.4f}, Validation Accuracy: {val_accuracy:.2f}%")

    # Save the best model
    if val_accuracy > best_val_accuracy:
        best_val_accuracy = val_accuracy
        torch.save(model.state_dict(), "best_model.pth")

# Test phase: restore the best checkpoint onto the device currently in use
model.load_state_dict(torch.load("best_model.pth", map_location=device))
model.eval()
correct_test = 0
total_test = 0
with torch.no_grad():
    for batch_images, batch_labels in test_loader:
        batch_images, batch_labels = batch_images.to(device), batch_labels.to(device)
        outputs = model(batch_images)
        _, predicted = torch.max(outputs, 1)
        total_test += batch_labels.size(0)
        correct_test += (predicted == batch_labels).sum().item()

test_accuracy = 100 * correct_test / total_test
print(f"Test Accuracy: {test_accuracy:.2f}%")


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image

# Define a custom dataset
class CustomImageDataset(Dataset):
    """Pairs each entry of ``images`` with the entry of ``labels`` at the same index.

    An optional ``transform`` is applied to the image on every access.
    """

    def __init__(self, images, labels, transform=None):
        self.images, self.labels = images, labels
        self.transform = transform

    def __len__(self):
        """Number of samples, taken from the images sequence."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return the (optionally transformed) image and its label."""
        sample, target = self.images[idx], self.labels[idx]
        return (self.transform(sample) if self.transform else sample), target

# Image transformation: single-channel, 256x256, normalised with mean 0.5 / std 0.5
transform = transforms.Compose(
    [
        transforms.Grayscale(num_output_channels=1),
        transforms.Resize((256, 256)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5], std=[0.5]),
    ]
)

# One dataset and one loader per split; only the training loader shuffles
train_dataset = CustomImageDataset(train_images, train_labels, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

val_dataset = CustomImageDataset(val_images, val_labels, transform=transform)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

test_dataset = CustomImageDataset(test_images, test_labels, transform=transform)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Define a simple CNN
class SimpleCNN(nn.Module):
    """Two conv/ReLU/max-pool stages followed by two fully connected layers.

    Args:
        num_classes: Number of output logits.
        input_size: Height and width of the square, single-channel input
            images. Defaults to 256, the size this cell's transform resizes
            to, which reproduces the former hard-coded ``64 * 64 * 64``.
    """

    def __init__(self, num_classes, input_size=256):
        super(SimpleCNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1)  # 1 input channel (grayscale)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        # The padded 3x3 convolutions keep the spatial size; each of the two
        # 2x2 poolings in forward() halves it (rounding down).
        pooled_size = input_size // 2 // 2
        self.fc1 = nn.Linear(64 * pooled_size * pooled_size, 128)
        self.fc2 = nn.Linear(128, num_classes)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Map a ``(batch, 1, input_size, input_size)`` tensor to class logits."""
        x = self.relu(self.conv1(x))
        x = self.pool(x)
        x = self.relu(self.conv2(x))
        x = self.pool(x)
        # Flatten everything except the batch dimension
        x = x.view(x.size(0), -1)
        x = self.relu(self.fc1(x))
        x = self.fc2(x)
        return x

# Build the network with one output per class, then its loss and optimizer
num_classes = len(class_to_idx)
model = SimpleCNN(num_classes=num_classes)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)

# Training loop
def train_model(model, train_loader, val_loader, criterion, optimizer, epochs=10):
    """Train ``model`` and print training/validation metrics after every epoch.

    Batches are moved to the device that holds the model's parameters, so the
    function works whether the model lives on the CPU or on a GPU.

    Args:
        model: Network to train.
        train_loader: Iterable of ``(images, labels)`` training batches.
        val_loader: Iterable of validation batches, passed to ``evaluate_model``.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        optimizer: Optimizer that updates the model's parameters.
        epochs: Number of passes over ``train_loader``.
    """
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        for images, labels in train_loader:
            images = images.float().to(device)
            labels = labels.to(device)
            optimizer.zero_grad()

            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        train_accuracy = 100 * correct / total
        # Report the mean batch loss so it is comparable with the validation
        # loss, which evaluate_model also averages over batches.
        train_loss = running_loss / len(train_loader)
        val_accuracy, val_loss = evaluate_model(model, val_loader, criterion)

        print(f"Epoch {epoch+1}/{epochs}, Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.2f}%, "
              f"Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.2f}%")

# Evaluation function
def evaluate_model(model, data_loader, criterion):
    """Evaluate ``model`` on ``data_loader`` without updating it.

    Puts the model in eval mode and moves every batch to the device that holds
    the model's parameters, so CPU and GPU models are both supported.

    Args:
        model: Network to evaluate.
        data_loader: Sized iterable of ``(images, labels)`` batches.
        criterion: Loss function called as ``criterion(outputs, labels)``.

    Returns:
        ``(accuracy, mean_loss)``: accuracy in percent over all samples and
        the loss averaged over batches.
    """
    model.eval()
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')
    val_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for images, labels in data_loader:
            images = images.float().to(device)
            labels = labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            val_loss += loss.item()

            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total
    return accuracy, val_loss / len(data_loader)

# Fit the model for 10 epochs; metrics are printed after each epoch
train_model(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    criterion=criterion,
    optimizer=optimizer,
    epochs=10,
)

# Score the trained model on the test split; the returned loss is not used
test_accuracy, _ = evaluate_model(model=model, data_loader=test_loader, criterion=criterion)
print(f"Test Accuracy: {test_accuracy:.2f}%")


In [None]:
# with image resized to 128*128
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image

# Define a custom dataset
class CustomImageDataset(Dataset):
    """Index-aligned view over ``images`` and ``labels``.

    When ``transform`` is truthy it is applied to the image on each lookup;
    the label is passed through untouched.
    """

    def __init__(self, images, labels, transform=None):
        self.transform = transform
        self.images = images
        self.labels = labels

    def __len__(self):
        """Number of samples, taken from the images sequence."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for position ``idx``."""
        sample = self.images[idx]
        target = self.labels[idx]
        if not self.transform:
            return sample, target
        return self.transform(sample), target

# Image transformation: single-channel, 128x128, normalised with mean 0.5 / std 0.5
transform = transforms.Compose(
    [
        transforms.Grayscale(num_output_channels=1),
        transforms.Resize((128, 128)),  # Resize to 128x128
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5], std=[0.5]),
    ]
)

# One dataset and one loader per split; only the training loader shuffles
train_dataset = CustomImageDataset(train_images, train_labels, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

val_dataset = CustomImageDataset(val_images, val_labels, transform=transform)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

test_dataset = CustomImageDataset(test_images, test_labels, transform=transform)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Define a simple CNN
class SimpleCNN(nn.Module):
    """Two conv/ReLU/max-pool stages followed by two fully connected layers.

    Args:
        num_classes: Number of output logits.
        input_size: Height and width of the square, single-channel input
            images. Defaults to 128, the size this cell's transform resizes
            to, which reproduces the former hard-coded ``32 * 32 * 64``.
    """

    def __init__(self, num_classes, input_size=128):
        super(SimpleCNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1)  # 1 input channel (grayscale)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        # The padded 3x3 convolutions keep the spatial size; each of the two
        # 2x2 poolings in forward() halves it (rounding down).
        pooled_size = input_size // 2 // 2
        self.fc1 = nn.Linear(pooled_size * pooled_size * 64, 128)
        self.fc2 = nn.Linear(128, num_classes)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Map a ``(batch, 1, input_size, input_size)`` tensor to class logits."""
        x = self.relu(self.conv1(x))
        x = self.pool(x)
        x = self.relu(self.conv2(x))
        x = self.pool(x)
        # Flatten everything except the batch dimension
        x = x.view(x.size(0), -1)
        x = self.relu(self.fc1(x))
        x = self.fc2(x)
        return x

# Build the network with one output per class, then its loss and optimizer
num_classes = len(class_to_idx)
model = SimpleCNN(num_classes=num_classes)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)

# Training loop
def train_model(model, train_loader, val_loader, criterion, optimizer, epochs=10):
    """Train ``model`` and print training/validation metrics after every epoch.

    Batches are moved to the device that holds the model's parameters, so the
    function works whether the model lives on the CPU or on a GPU.

    Args:
        model: Network to train.
        train_loader: Iterable of ``(images, labels)`` training batches.
        val_loader: Iterable of validation batches, passed to ``evaluate_model``.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        optimizer: Optimizer that updates the model's parameters.
        epochs: Number of passes over ``train_loader``.
    """
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        for images, labels in train_loader:
            images = images.float().to(device)
            labels = labels.to(device)
            optimizer.zero_grad()

            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        train_accuracy = 100 * correct / total
        # Report the mean batch loss so it is comparable with the validation
        # loss, which evaluate_model also averages over batches.
        train_loss = running_loss / len(train_loader)
        val_accuracy, val_loss = evaluate_model(model, val_loader, criterion)

        print(f"Epoch {epoch+1}/{epochs}, Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.2f}%, "
              f"Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.2f}%")

# Evaluation function
def evaluate_model(model, data_loader, criterion):
    """Evaluate ``model`` on ``data_loader`` without updating it.

    Puts the model in eval mode and moves every batch to the device that holds
    the model's parameters, so CPU and GPU models are both supported.

    Args:
        model: Network to evaluate.
        data_loader: Sized iterable of ``(images, labels)`` batches.
        criterion: Loss function called as ``criterion(outputs, labels)``.

    Returns:
        ``(accuracy, mean_loss)``: accuracy in percent over all samples and
        the loss averaged over batches.
    """
    model.eval()
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')
    val_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for images, labels in data_loader:
            images = images.float().to(device)
            labels = labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            val_loss += loss.item()

            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total
    return accuracy, val_loss / len(data_loader)

# Fit the model for 10 epochs; metrics are printed after each epoch
train_model(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    criterion=criterion,
    optimizer=optimizer,
    epochs=10,
)

# Score the trained model on the test split; the returned loss is not used
test_accuracy, _ = evaluate_model(model=model, data_loader=test_loader, criterion=criterion)
print(f"Test Accuracy: {test_accuracy:.2f}%")


In [None]:
class DeeperCNN(nn.Module):
    """Five-convolution CNN (three conv/pool blocks) with a two-layer classifier head.

    The fully connected layers are created in ``__init__`` rather than lazily
    inside ``forward``. Layers created during the first forward pass do not
    exist yet when ``optim.Adam(model.parameters())`` is built right after
    construction, so they would be left out of the optimizer and never trained.

    Args:
        num_classes: Number of output logits.
        input_size: ``(height, width)`` of the single-channel input images.
            Defaults to ``(128, 128)``, matching the 128x128 resize used by
            the preceding SimpleCNN experiment in this notebook.
    """

    def __init__(self, num_classes, input_size=(128, 128)):
        super(DeeperCNN, self).__init__()

        # First convolutional block
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)

        # Second convolutional block
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.conv4 = nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)

        # Third convolutional block
        self.conv5 = nn.Conv2d(256, 512, kernel_size=3, stride=1, padding=1)
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2)

        # Fully connected layers. The padded 3x3 convolutions keep the spatial
        # size and each of the three 2x2 poolings halves it (rounding down),
        # so the 512 feature maps end up height // 8 by width // 8.
        height, width = input_size
        flattened_size = 512 * (height // 8) * (width // 8)
        self.fc1 = nn.Linear(flattened_size, 512)
        self.fc2 = nn.Linear(512, num_classes)
        self.relu = nn.ReLU()

        # Output layer size, kept as an attribute
        self.num_classes = num_classes

    def forward(self, x):
        """Map a ``(batch, 1, height, width)`` tensor to ``(batch, num_classes)`` logits."""
        # Forward pass through convolutional layers
        x = self.relu(self.conv1(x))
        x = self.relu(self.conv2(x))
        x = self.pool1(x)

        x = self.relu(self.conv3(x))
        x = self.relu(self.conv4(x))
        x = self.pool2(x)

        x = self.relu(self.conv5(x))
        x = self.pool3(x)

        # Flatten and pass through fully connected layers
        x = x.view(x.size(0), -1)
        x = self.relu(self.fc1(x))
        x = self.fc2(x)

        return x


In [None]:
num_classes = len(class_to_idx)
model = DeeperCNN(num_classes=num_classes)

# Run one batch through the model BEFORE building the optimizer. If the model
# creates any layer lazily on its first forward pass, that layer must already
# exist when model.parameters() is handed to Adam, otherwise it is never
# updated. The (unshuffled) validation loader is used so the training
# loader's shuffling is not advanced.
with torch.no_grad():
    warmup_images, _ = next(iter(val_loader))
    model(warmup_images.float())

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [None]:
# Train the current `model` for 10 epochs with the train_model function and
# the data loaders defined in earlier cells.
train_model(model, train_loader, val_loader, criterion, optimizer, epochs=10)


In [None]:
from transformers import CLIPProcessor, CLIPModel
from PIL import Image
import torch

# Checkpoint name shared by the CLIP model and its processor
model_name = "openai/clip-vit-base-patch32"
processor = CLIPProcessor.from_pretrained(model_name)

# Run on the GPU when one is available, otherwise on the CPU
device = "cuda" if torch.cuda.is_available() else "cpu"
model = CLIPModel.from_pretrained(model_name).to(device)

In [None]:
from transformers import CLIPProcessor, CLIPModel
from PIL import Image
import torch

# Load the CLIP model and processor
# NOTE: this cell repeats the previous cell's loading code verbatim; running
# it simply loads the same checkpoint again.
model_name = "openai/clip-vit-base-patch32"
model = CLIPModel.from_pretrained(model_name)
processor = CLIPProcessor.from_pretrained(model_name)

# Check if GPU is available
device = "cuda" if torch.cuda.is_available() else "cpu"
model = model.to(device)


In [None]:


def zero_shot_classify(image_paths, class_labels):
    """Score each image against ``class_labels`` with the notebook-level CLIP model.

    Uses the notebook-level ``processor``, ``model`` and ``device``.

    Args:
        image_paths: Iterable of image file paths.
        class_labels: List of text labels to compare every image with.

    Returns:
        Dict mapping each image path to a ``{label: probability}`` dict, where
        the probabilities are the softmax over the image-to-text logits.
    """
    results = {}
    for img_path in image_paths:
        # Close the file as soon as the processor has consumed the image
        with Image.open(img_path) as image:
            # Process the image and text labels
            inputs = processor(
                text=class_labels,
                images=image,
                return_tensors='pt',
                padding=True,
            ).to(device)

        # Inference only: without no_grad every call would record an autograd
        # graph that is never used.
        with torch.no_grad():
            outputs = model(**inputs)
        logits_per_image = outputs.logits_per_image  # Image-to-text similarity scores
        probabilities = logits_per_image.softmax(dim=1)

        # Store probabilities
        results[img_path] = {label: prob.item() for label, prob in zip(class_labels, probabilities[0])}

    return results

# Paths of the test images. Each PIL image remembers the path it was opened
# from in `.filename`, which already contains the data directory and class
# folder; joining it under data_dir/class again produces a doubled,
# non-existent path whenever `.filename` is relative.
test_image_paths = [img.filename for img in test_images]

# Define class labels (replace with your actual class labels)
class_labels = list(class_to_idx.keys()) # Get class labels from class_to_idx dictionary

# Perform zero-shot classification
zero_shot_results = zero_shot_classify(test_image_paths, class_labels)

# Print results
for img_path, probs in zero_shot_results.items():
    print(f'Results for {img_path}:')
    for label, prob in probs.items():
        print(f'  {label}: {prob:.4f}')

In [None]:
# Labels can be replaced by more descriptive language
class_labels

In [None]:

# Descriptive prompt for each class folder name
class_descriptions = {
    'normal': 'an ultrasonic image of normal breast tissue',
    'benign': 'an ultrasonic image of benign breast tumor',
    'malignant': 'an ultrasonic image of malignant breast tumor',
}

# Create a mapping from numerical indices to the new descriptive labels.
# It is derived from idx_to_class instead of hard-coding 0/1/2, because the
# index assigned to each class depends on the order in which the class folders
# were discovered.
idx_to_new_label = {idx: class_descriptions[name] for idx, name in idx_to_class.items()}

# Convert the numerical labels in train/val/test datasets to the new descriptive labels
train_labels_new = [idx_to_new_label[label] for label in train_labels]
val_labels_new = [idx_to_new_label[label] for label in val_labels]
test_labels_new = [idx_to_new_label[label] for label in test_labels]


# Now, use these new labels when creating your datasets:

# Create datasets with new labels
train_dataset = CustomDataset(train_images, train_labels_new, transform=transform)
val_dataset = CustomDataset(val_images, val_labels_new, transform=transform)
test_dataset = CustomDataset(test_images, test_labels_new, transform=transform)




In [None]:

# Rebuild the label -> index lookup from the position of each label in class_labels
class_to_idx = dict(zip(class_labels, range(len(class_labels))))
class_to_idx


In [None]:
# prompt: label the test data by the highest probability and calculate the test error

# Function to label test data and calculate test error
def label_and_evaluate(zero_shot_results, test_labels, class_to_idx):
    """Label each image with its most probable class and compute the test error.

    Results are matched to ``test_labels`` by position: the n-th entry of
    ``zero_shot_results`` (in insertion order) is compared with
    ``test_labels[n]``.

    Args:
        zero_shot_results: Dict mapping an image path to a
            ``{label: probability}`` dict.
        test_labels: Sequence of true class indices, in the same order as the
            entries of ``zero_shot_results``.
        class_to_idx: Dict mapping each label to its class index.

    Returns:
        The test error, ``1 - correct / len(test_labels)``; it is also printed.
        Predictions whose label is missing from ``class_to_idx`` trigger a
        warning and count as incorrect.
    """
    correct_predictions = 0
    total_predictions = len(test_labels)

    for position, probs in enumerate(zero_shot_results.values()):
        # Find the class label with the highest probability
        predicted_label = max(probs, key=probs.get)

        # Convert the predicted label to its class index through the mapping
        # itself, not through the position of the key in the dict.
        if predicted_label not in class_to_idx:
            print(f"Warning: Predicted label '{predicted_label}' not found in class_to_idx. Skipping.")
            continue
        predicted_label_index = class_to_idx[predicted_label]

        # True label of the current image, matched by position
        true_label_index = test_labels[position]

        if predicted_label_index == true_label_index:
            correct_predictions += 1

    test_error = 1 - (correct_predictions / total_predictions)
    print(f"Test Error: {test_error:.4f}")
    return test_error


# Assuming you have test_labels and class_to_idx defined
# NOTE: label_and_evaluate returns the test error (a float), so despite its
# name `predicted_labels` holds that error value, not a list of labels.
predicted_labels = label_and_evaluate(zero_shot_results, test_labels, class_to_idx)

In [None]:
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
import torch.nn as nn
import torch.optim as optim

# Transformations for training and validation: 3-channel grayscale, 224x224,
# normalised with the per-channel mean/std listed below
transform = transforms.Compose(
    [
        transforms.Grayscale(num_output_channels=3),
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]
)

# Custom Dataset class
class CustomDataset(Dataset):
    """Serves ``(image, label)`` pairs from two index-aligned sequences.

    If ``transform`` is truthy it is applied to the image on every access.
    """

    def __init__(self, images, labels, transform=None):
        self.images, self.labels = images, labels
        self.transform = transform

    def __len__(self):
        """Number of samples, taken from the images sequence."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return the (optionally transformed) image and its label."""
        sample, target = self.images[idx], self.labels[idx]
        return (self.transform(sample) if self.transform else sample), target

# Create datasets and data loaders
train_dataset = CustomDataset(train_images, train_labels, transform)
val_dataset = CustomDataset(val_images, val_labels, transform)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

# Load the CLIP model and processor
model_name = "openai/clip-vit-base-patch32"
model = CLIPModel.from_pretrained(model_name)
processor = CLIPProcessor.from_pretrained(model_name)

# Check if GPU is available
device = "cuda" if torch.cuda.is_available() else "cpu"
model = model.to(device)

# Modify the CLIP model for fine-tuning
vision_model = model.vision_model  # Use only the vision encoder
num_classes = len(class_labels)

# Get the output size of the vision model
with torch.no_grad():
    output_size = vision_model(torch.randn(1, 3, 224, 224).to(device)).pooler_output.shape[1]

# Add a new classification head with the correct input size
classifier = nn.Linear(output_size, num_classes)
vision_model.classifier = classifier.to(device)


# Define loss and optimizer (only the new head is optimised)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(classifier.parameters(), lr=0.001)

# Fine-tuning loop
num_epochs = 10

for epoch in range(num_epochs):
    # Re-enter training mode at the start of EVERY epoch: the validation pass
    # below switches the model to eval mode.
    vision_model.train()
    total_loss = 0.0
    correct = 0
    total = 0

    # Batch variables are deliberately not called `images` / `labels`, which
    # would overwrite notebook-level names at this scope.
    for batch_images, batch_labels in train_loader:
        batch_images, batch_labels = batch_images.to(device), batch_labels.to(device)

        # Forward pass. The encoder's weights are not in the optimizer, so its
        # features are computed without gradient tracking; gradients are only
        # needed for the classification head.
        with torch.no_grad():
            features = vision_model(batch_images).pooler_output
        outputs = vision_model.classifier(features)
        loss = criterion(outputs, batch_labels)

        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        correct += (predicted == batch_labels).sum().item()
        total += batch_labels.size(0)

    # Validation loop
    vision_model.eval()
    val_loss = 0.0
    correct_val = 0
    total_val = 0

    with torch.no_grad():
        for batch_images, batch_labels in val_loader:
            batch_images, batch_labels = batch_images.to(device), batch_labels.to(device)

            features = vision_model(batch_images).pooler_output
            outputs = vision_model.classifier(features)
            loss = criterion(outputs, batch_labels)

            val_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            correct_val += (predicted == batch_labels).sum().item()
            total_val += batch_labels.size(0)

    train_acc = 100 * correct / total
    val_acc = 100 * correct_val / total_val
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {total_loss/len(train_loader):.4f}, "
          f"Train Acc: {train_acc:.2f}%, Val Acc: {val_acc:.2f}%")

# Save the fine-tuned model (encoder weights plus the attached classifier head)
torch.save(vision_model.state_dict(), "clip_fine_tuned.pth")


In [None]:
# prompt: print out the label predicted on the test set, the true label of the test set and test error based on the fine tuned model

# Load the fine-tuned model onto the device currently in use
vision_model.load_state_dict(torch.load("clip_fine_tuned.pth", map_location=device))
vision_model.eval()

# Create the test dataset and data loader
test_dataset = CustomDataset(test_images, test_labels, transform)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Perform predictions on the test set
predicted_labels = []
true_labels = []
correct = 0
total = 0

with torch.no_grad():
    # Batch variables are deliberately not called `images` / `labels`, which
    # would overwrite notebook-level names at this scope.
    for batch_images, batch_labels in test_loader:
        batch_images, batch_labels = batch_images.to(device), batch_labels.to(device)
        outputs = vision_model(batch_images)
        outputs = vision_model.classifier(outputs.pooler_output)
        _, predicted = torch.max(outputs, 1)

        # tolist() yields plain Python ints, so the printed lists stay
        # readable instead of showing numpy scalar reprs.
        predicted_labels.extend(predicted.cpu().tolist())
        true_labels.extend(batch_labels.cpu().tolist())
        total += batch_labels.size(0)
        correct += (predicted == batch_labels).sum().item()

# Calculate test error
test_error = 1 - (correct / total)

# Print the results
print("Predicted Labels:", predicted_labels)
print("True Labels:", true_labels)
print(f"Test Error: {test_error:.4f}")