In [22]:
import torch
from torch import nn, optim
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms as transforms
from torchvision import models
from PIL import Image
import os
import time
from torchvision.transforms import functional as F
import random
import numpy as np

In [23]:
# Define a custom dataset for property images
class PropertyImageDataset(Dataset):
    def __init__(self, image_folder, transform=None):
        self.image_folder = image_folder
        self.image_paths = [os.path.join(image_folder, img) for img in os.listdir(image_folder) if img.endswith(('.jpg', '.png'))]
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        image = Image.open(img_path).convert("RGB")
        if self.transform:
            image = self.transform(image)
        return image

In [24]:
class Cutout:
    def __init__(self, n_holes, length):
        self.n_holes = n_holes
        self.length = length

    def __call__(self, x):
        w, h = x.shape[2], x.shape[1]  # Use shape instead of size
        mask = torch.ones((1, h, w), dtype=torch.float32)
        for _ in range(self.n_holes):
            y = random.randint(0, h)
            x_coord = random.randint(0, w)  # Renamed to avoid conflict with the x parameter
            y1 = np.clip(y - self.length // 2, 0, h)
            y2 = np.clip(y + self.length // 2, 0, h)
            x1 = np.clip(x_coord - self.length // 2, 0, w)
            x2 = np.clip(x_coord + self.length // 2, 0, w)
            mask[:, y1:y2, x1:x2] = 0
        return x * mask

# Define SimCLR augmentations (two views for contrastive learning)
class SimCLRTransform:
    def __init__(self):
        self.transform = transforms.Compose([
            transforms.RandomResizedCrop(224, scale=(0.2, 1.0)),
            transforms.RandomHorizontalFlip(),
            transforms.RandomApply([transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4, hue=0.1)], p=0.8),
            transforms.RandomGrayscale(p=0.2),
            transforms.RandomRotation(10),
            transforms.GaussianBlur(kernel_size=5, sigma=(0.1, 2.0)),
            transforms.ToTensor(),  # Move this line up
            Cutout(n_holes=1, length=20),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])

    def __call__(self, x):
        return self.transform(x), self.transform(x)  # Two augmented versions of the same image

In [25]:
class NTXentLoss(nn.Module):
    def __init__(self, batch_size, temperature=0.5):
        super(NTXentLoss, self).__init__()
        self.batch_size = batch_size
        self.temperature = temperature
        self.criterion = nn.CrossEntropyLoss(reduction="sum")
        self.mask = self._get_correlated_mask()

    def _get_correlated_mask(self):
        N = self.batch_size * 2
        mask = torch.ones((N, N), dtype=bool)
        mask.fill_diagonal_(False)
        for i in range(self.batch_size):
            mask[i, i + self.batch_size] = False
            mask[i + self.batch_size, i] = False
        return mask

    def forward(self, z_i, z_j):
        N = self.batch_size * 2
        z = torch.cat((z_i, z_j), dim=0)
        similarity_matrix = torch.matmul(z, z.T) / self.temperature

        mask = self.mask.to(z.device)
        positives = torch.cat([
            torch.diag(similarity_matrix, self.batch_size),
            torch.diag(similarity_matrix, -self.batch_size)
        ]).unsqueeze(1)

        negatives = similarity_matrix[mask].view(N, -1)
        logits = torch.cat((positives, negatives), dim=1)
        labels = torch.zeros(logits.shape[0], dtype=torch.long).to(z.device)

        loss = self.criterion(logits, labels)
        loss /= N
        return loss

In [26]:
# Define the SimCLR model
class SimCLR(nn.Module):
    def __init__(self, base_model, out_dim):
        super(SimCLR, self).__init__()
        self.backbone = nn.Sequential(*list(base_model.children())[:-1])  # Remove the classification layer
        self.projection_head = nn.Sequential(
            nn.Linear(base_model.fc.in_features, 512),
            nn.ReLU(),
            nn.Linear(512, out_dim)
        )

    def forward(self, x):
        h = self.backbone(x).squeeze()  # Get the backbone representation
        z = self.projection_head(h)  # Get the projection
        return h, z

In [27]:
# Training the SimCLR model
def train_simclr(image_folder, epochs=10, batch_size=32, lr=3e-4):
    print("Starting SimCLR training process...")
    transform = SimCLRTransform()
    dataset = PropertyImageDataset(image_folder=image_folder, transform=transform)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, drop_last=True)

    print(f"Dataset contains {len(dataset)} images.")
    print("Initializing model and optimizer...")

    base_model = models.resnet50(weights='ResNet50_Weights.DEFAULT')  # Load ResNet50 with updated weights
    simclr_model = SimCLR(base_model, out_dim=128)
    optimizer = optim.Adam(simclr_model.parameters(), lr=lr)
    criterion = NTXentLoss(batch_size=batch_size)

    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    simclr_model = simclr_model.to(device)
    print(f"Model moved to {device}.")

    for epoch in range(epochs):
        print(f"Starting epoch {epoch + 1} of {epochs}...")
        epoch_loss = 0
        for step, (x_i, x_j) in enumerate(dataloader):
            x_i, x_j = x_i.to(device), x_j.to(device)

            # Forward pass for both augmented views
            h_i, z_i = simclr_model(x_i)
            h_j, z_j = simclr_model(x_j)

            # Compute loss
            loss = criterion(z_i, z_j)
            epoch_loss += loss.item()

            # Backpropagation
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Print the progress of training
            current_image_idx = step * batch_size
            print(f"Epoch [{epoch + 1}/{epochs}], Step [{step}/{len(dataloader)}], "
                  f"Image [{current_image_idx + 1}/{len(dataset)}], Loss: {loss.item():.4f}")

        print(f"Epoch [{epoch + 1}/{epochs}] completed. Average Loss: {epoch_loss / len(dataloader):.4f}")

    # Save the trained model
    torch.save(simclr_model.state_dict(), 'simclr_model1.pth')
    print("Training complete. Model saved as 'simclr_model1.pth'.")

In [28]:
if __name__ == "__main__":
    image_folder = r"C:\images"  # Path to your downloaded images folder
    train_simclr(image_folder)

Starting SimCLR training process...
Dataset contains 8158 images.
Initializing model and optimizer...
Model moved to cuda.
Starting epoch 1 of 10...
Epoch [1/10], Step [0/254], Image [1/8158], Loss: 4.0706
Epoch [1/10], Step [1/254], Image [33/8158], Loss: 4.0285
Epoch [1/10], Step [2/254], Image [65/8158], Loss: 3.9384
Epoch [1/10], Step [3/254], Image [97/8158], Loss: 3.7931
Epoch [1/10], Step [4/254], Image [129/8158], Loss: 3.5261
Epoch [1/10], Step [5/254], Image [161/8158], Loss: 3.3340
Epoch [1/10], Step [6/254], Image [193/8158], Loss: 3.2721
Epoch [1/10], Step [7/254], Image [225/8158], Loss: 2.9448
Epoch [1/10], Step [8/254], Image [257/8158], Loss: 3.2044
Epoch [1/10], Step [9/254], Image [289/8158], Loss: 2.8147
Epoch [1/10], Step [10/254], Image [321/8158], Loss: 2.8128
Epoch [1/10], Step [11/254], Image [353/8158], Loss: 2.9969
Epoch [1/10], Step [12/254], Image [385/8158], Loss: 2.7205
Epoch [1/10], Step [13/254], Image [417/8158], Loss: 2.8277
Epoch [1/10], Step [14/254