In [None]:
!pip install datasets
!pip install transformers

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.init as init
import torchvision as tv
import matplotlib.pyplot as plt
import seaborn as sns
import os
import random

from sklearn.model_selection import train_test_split
from PIL import Image
from tqdm import tqdm
from torchvision.transforms import v2
from torchvision import transforms
from datasets import load_dataset
from transformers import ConvNextV2ForImageClassification
from transformers.models.convnextv2.modeling_convnextv2 import ConvNextV2Embeddings

sns.set_theme()

In [None]:
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Bare expression: displays the selected device as the cell output.
device

device(type='cuda')

In [None]:
# Mount Google Drive (Colab only) so that files under /content/drive/MyDrive
# (the dataset archive and the VGG16 checkpoint path used below) are reachable.
from google.colab import drive
drive.mount("/content/drive")

In [None]:
 # Extract the dataset archive from Drive into the current working directory (-q: quiet).
 !unzip -q /content/drive/MyDrive/TTA/streetshouses.zip

In [None]:
class StreetAndHousesDataset(torch.utils.data.Dataset):
    """Image-classification dataset whose labels are parsed from file names.

    Args:
        files: Iterable of image file paths. The class name is read from the
            file name (see ``__init__``); files whose parsed name is not one of
            the 13 known classes are silently skipped.
        transform: Optional callable applied to the loaded PIL image.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self, files, transform=None):
        self.files = files          # the unfiltered input, kept as given
        self.transform = transform
        self.file_paths = list()    # paths that received a valid label
        self.labels = list()        # integer class index, parallel to file_paths

        # Real labels; the position in this list is the class index.
        labels_keys = ["apartment", "bath", "bed", "church",
                       "commercial", "din", "garage", "house", "industrial",
                       "kitchen", "living", "retail", "roof"]
        LABELS = {key: idx for idx, key in enumerate(labels_keys)}

        # Additional cleaning of labels is required. Note that "officebuilding"
        # is not among the classes above, so "office" files end up skipped.
        aliases = {
            "apartments": "apartment",
            "garages": "garage",
            "office": "officebuilding",
        }

        for file_path in files:
            file_name = os.path.basename(file_path)
            if "BdIdx" not in file_path:
                # The label is the first "_"-separated token of the file name.
                label = file_name.split("_")[0]
            else:
                # The label is the last token, minus a 4-character suffix
                # (e.g. ".jpg").
                label = file_name.split("_")[-1][:-4]

            label = aliases.get(label, label)

            if label in LABELS:
                self.file_paths.append(file_path)
                self.labels.append(LABELS[label])

    def __len__(self):
        """Return the number of files that received a valid label."""
        return len(self.file_paths)

    def __getitem__(self, idx):
        """Load the ``idx``-th image and return ``(image, label)``.

        The image is always converted to 3-channel RGB so that grayscale,
        palette or RGBA files do not break the 3-channel normalisation used by
        the transforms in this notebook. The file handle is closed right after
        decoding.
        """
        img_path = self.file_paths[idx]
        with Image.open(img_path) as img:
            image = img.convert("RGB")

        if self.transform:
            image = self.transform(image)

        label = self.labels[idx]

        return image, label

## VGG16

In [None]:
# Training pipeline: random crop (80-100% of the image area) resized to 224x224,
# random rotation of up to 30 degrees and random horizontal flip, followed by
# tensor conversion and per-channel normalisation (the commonly used ImageNet
# mean / std values).
transforms_train = transforms.Compose([
    transforms.RandomResizedCrop(size=(224,224), scale=(0.8, 1.0)),
    transforms.RandomRotation(degrees=30),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# Validation pipeline: deterministic resize to 224x224, no augmentation.
transforms_val = transforms.Compose([
    transforms.Resize(size=(224,224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# Test pipeline: same steps as validation.
transforms_test = transforms.Compose([
    transforms.Resize(size=(224,224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

In [None]:
# Create paths for val, train and test (with shuffling the data)
# os.listdir returns entries in arbitrary, filesystem-dependent order; the
# listings are sorted so that the seeded split below selects the same files on
# every machine and every run.
data_root = "./streetshouses/kaggle_room_street_data/"
dirs = [os.path.join(data_root, path) for path in sorted(os.listdir(data_root))]

all_files = []

for dir_path in dirs:
    files = [os.path.join(dir_path, file) for file in sorted(os.listdir(dir_path))]
    all_files.extend(files)

# 90% / 10% (train+val / test), then 78% / 22% of the remainder (train / val):
# roughly 70% / 20% / 10% of all files.
train_paths, test_paths = train_test_split(all_files, train_size=0.9, random_state=42, shuffle=True)
train_paths, val_paths = train_test_split(train_paths, train_size=0.78, random_state=42, shuffle=True)

In [None]:
# Wrap every split in a dataset, pairing it with its own preprocessing pipeline.
train_ds, val_ds, test_ds = (
    StreetAndHousesDataset(files=split_paths, transform=split_transform)
    for split_paths, split_transform in (
        (train_paths, transforms_train),
        (val_paths, transforms_val),
        (test_paths, transforms_test),
    )
)

In [None]:
# Input channels; not referenced by any other cell visible in this notebook.
IN_CHANNELS = 3
# Number of target classes; matches the 13 names listed in StreetAndHousesDataset.
N_CLASSES = 13

# torchvision VGG16 initialised with the "IMAGENET1K_V1" pretrained weights.
vgg16_model = tv.models.vgg16(weights="IMAGENET1K_V1")

In [None]:
# Freeze every pretrained parameter; layers added to the model after this cell
# are created with trainable parameters.
for param in vgg16_model.parameters():
    param.requires_grad = False

In [None]:
# Add on classifier
# classifier[6] is swapped for a small head: Linear -> ReLU -> Dropout(0.6) ->
# Linear(N_CLASSES) -> LogSoftmax. The head outputs log-probabilities, which is
# what the nn.NLLLoss criterion defined further down consumes.
# NOTE: the cell is not idempotent; after it has run, classifier[6] is a
# Sequential, so reading `.in_features` from it again would fail.
n_inputs = vgg16_model.classifier[6].in_features
vgg16_model.classifier[6] = nn.Sequential(
    nn.Linear(n_inputs, 256), nn.ReLU(), nn.Dropout(0.6),
    nn.Linear(256, N_CLASSES), nn.LogSoftmax(dim=1))

In [None]:
# Move the model (including the new head) to the selected device.
vgg16_model = vgg16_model.to(device)

In [None]:
# Tally all parameters and, separately, those that still receive gradients.
total_params = 0
total_trainable_params = 0
for p in vgg16_model.parameters():
    n_elements = p.numel()
    total_params += n_elements
    if p.requires_grad:
        total_trainable_params += n_elements

print(f"{total_params:,} total parameters.")
print(f"{total_trainable_params:,} trainable parameters.")

135,312,717 total parameters.
1,052,173 trainable parameters.


In [None]:
# Hyperparameters for the VGG16 run.
BATCH_SIZE = 16        # samples per mini-batch for all three loaders
EPOCHS = 35            # number of passes over the training set
LEARNING_RATE = 1e-2   # initial SGD learning rate
MOMENTUM = 0.9         # SGD momentum
WEIGHT_DECAY = 5e-4    # weight decay passed to SGD

In [None]:
# Only the training loader shuffles; validation and test keep dataset order.
train_dl = torch.utils.data.DataLoader(dataset=train_ds, batch_size=BATCH_SIZE, shuffle=True)
val_dl = torch.utils.data.DataLoader(dataset=val_ds, batch_size=BATCH_SIZE)
test_dl = torch.utils.data.DataLoader(dataset=test_ds, batch_size=BATCH_SIZE)

In [None]:
# NLLLoss pairs with the LogSoftmax layer that ends the VGG16 classifier head.
criterion = nn.NLLLoss()
optimizer = torch.optim.SGD(vgg16_model.parameters(), lr=LEARNING_RATE, momentum=MOMENTUM, weight_decay=WEIGHT_DECAY)
# The training loop calls lr_scheduler.step(val_loss), i.e. the monitored
# quantity should go DOWN. The scheduler must therefore run in "min" mode; with
# "max" it would treat a falling loss as "no improvement" and cut the learning
# rate after `patience` epochs of successful training.
# NOTE(review): `verbose` is deprecated in recent PyTorch releases; drop it if
# it warns or errors on the installed version.
lr_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer=optimizer, mode="min", factor=0.1, patience=6, verbose=True
)

In [None]:
# Train the VGG16 head for EPOCHS epochs, validating after each one and keeping
# the weights with the lowest summed validation loss on disk.
best_val_loss = 1e7       # sentinel: any real validation loss is expected to be lower
train_losses = list()     # mean training loss per epoch
val_losses = list()       # mean validation loss per epoch

# torch.save does not create missing parent directories. Create the checkpoint
# folder up front so the first save cannot fail after a full epoch of training.
checkpoint_path = "/content/drive/MyDrive/checkpoints/vgg16-streetsandhouses.pt"
os.makedirs(os.path.dirname(checkpoint_path), exist_ok=True)

for epoch in range(EPOCHS):
  # Train
  vgg16_model.train()
  train_loss = 0.0
  for batch in tqdm(train_dl, desc=f"Epoch {epoch + 1}/{EPOCHS}", ncols=100):
    inputs, labels = batch
    inputs = inputs.to(device)
    labels = labels.to(device)
    optimizer.zero_grad()
    outputs = vgg16_model(inputs)
    loss = criterion(outputs, labels)
    loss.backward()
    optimizer.step()
    train_loss += loss.item()

  # Validate
  vgg16_model.eval()
  val_loss = 0.0
  correct_predictions = 0
  total_predictions = 0
  with torch.no_grad():
    for batch in tqdm(val_dl, desc="Validation", ncols=100):
      inputs, labels = batch
      inputs = inputs.to(device)
      labels = labels.to(device)
      outputs = vgg16_model(inputs)
      loss = criterion(outputs, labels)
      val_loss += loss.item()
      # Predicted class = index of the largest output along the class axis.
      _, predicted = torch.max(outputs, 1)
      total_predictions += labels.size(0)
      correct_predictions += (predicted == labels).sum().item()
  # The plateau scheduler monitors the summed validation loss of this epoch.
  lr_scheduler.step(val_loss)

  # Checkpoint whenever the summed validation loss improves.
  if val_loss < best_val_loss:
    best_val_loss = val_loss
    torch.save(vgg16_model.state_dict(), checkpoint_path)

  # Per-epoch means: the sum of batch losses divided by the number of batches.
  train_losses.append(train_loss / len(train_dl))
  val_losses.append(val_loss / len(val_dl))

  accuracy = 100 * correct_predictions / total_predictions
  print(f"Epoch {epoch + 1}/{EPOCHS}, Train Loss: {train_loss / len(train_dl):.4f}, Validation Loss: {val_loss / len(val_dl):.4f}, Validation Accuracy: {accuracy:.2f}%")


In [None]:
# Predict on Test set
# Evaluates the model object currently in memory; the checkpoint written by the
# training loop is not reloaded in this cell.
vgg16_model.eval()
test_loss = 0.0  # accumulated below but not reported by this cell
correct_predictions = 0
total_predictions = 0
with torch.no_grad():
    for batch in tqdm(test_dl, desc="Testing", ncols=100):
      inputs, labels = batch
      inputs = inputs.to(device)
      labels = labels.to(device)
      outputs = vgg16_model(inputs)
      loss = criterion(outputs, labels)
      test_loss += loss.item()
      # Predicted class = index of the largest output along the class axis.
      _, predicted = torch.max(outputs, 1)
      total_predictions += labels.size(0)
      correct_predictions += (predicted == labels).sum().item()

print(f"\nTest set accuracy = {(100 * correct_predictions / total_predictions):.4f}%")

Testing: 100%|████████████████████████████████████████████████████| 141/141 [00:38<00:00,  3.65it/s]


Test set accuracy = 51.6644%





## ConvNeXt

In [None]:
# Training pipeline for ConvNeXt: resize to 224x224, RandAugment (2 random ops
# per image at magnitude 9), tensor conversion and per-channel normalisation,
# then RandomErasing applied to the normalised tensor with probability 0.25.
transforms_train = transforms.Compose([
    transforms.Resize(size=(224,224)),
    transforms.RandAugment(num_ops=2, magnitude=9),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    transforms.RandomErasing(p=0.25)
])

# Validation pipeline: deterministic resize to 224x224, no augmentation.
transforms_val = transforms.Compose([
    transforms.Resize(size=(224,224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])

# Test pipeline: same steps as validation.
transforms_test = transforms.Compose([
    transforms.Resize(size=(224,224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])

In [None]:
# Create paths for val, train and test (with shuffling the data)
# os.listdir returns entries in arbitrary, filesystem-dependent order; the
# listings are sorted so that the seeded split below selects the same files on
# every machine and every run.
data_root = "./streetshouses/kaggle_room_street_data/"
dirs = [os.path.join(data_root, path) for path in sorted(os.listdir(data_root))]

all_files = []

for dir_path in dirs:
    files = [os.path.join(dir_path, file) for file in sorted(os.listdir(dir_path))]
    all_files.extend(files)

# 90% / 10% (train+val / test), then 78% / 22% of the remainder (train / val):
# roughly 70% / 20% / 10% of all files.
train_paths, test_paths = train_test_split(all_files, train_size=0.9, random_state=42, shuffle=True)
train_paths, val_paths = train_test_split(train_paths, train_size=0.78, random_state=42, shuffle=True)

In [None]:
# Wrap every split in a dataset, pairing it with its own preprocessing pipeline.
train_ds, val_ds, test_ds = (
    StreetAndHousesDataset(files=split_paths, transform=split_transform)
    for split_paths, split_transform in (
        (train_paths, transforms_train),
        (val_paths, transforms_val),
        (test_paths, transforms_test),
    )
)

In [None]:
# Input channels; not referenced by any other cell visible in this notebook.
IN_CHANNELS = 3
# Number of target classes; matches the 13 names listed in StreetAndHousesDataset.
N_CLASSES = 13

# torchvision ConvNeXt-Base initialised with the "IMAGENET1K_V1" pretrained weights.
convnext_model = tv.models.convnext_base(weights="IMAGENET1K_V1")

In [None]:
# Freeze every pretrained parameter; layers added to the model after this cell
# are created with trainable parameters.
for param in convnext_model.parameters():
    param.requires_grad = False

In [None]:
# Add on classifier
# classifier[2] is swapped for a small trainable head:
# Linear -> GELU -> Dropout(0.4) -> Linear(N_CLASSES).
# The head deliberately ends in raw logits. The criterion used for this model is
# nn.CrossEntropyLoss, which applies log-softmax itself; a trailing nn.Softmax
# would normalise twice, compressing the loss range and weakening gradients.
# Argmax predictions are unaffected, and Softmax has no parameters, so
# state_dict keys stay the same.
# NOTE: the cell is not idempotent; after it has run, classifier[2] is a
# Sequential, so reading `.in_features` from it again would fail.
n_inputs = convnext_model.classifier[2].in_features
convnext_model.classifier[2] = nn.Sequential(
    nn.Linear(n_inputs, 256), nn.GELU(), nn.Dropout(0.4),
    nn.Linear(256, N_CLASSES))

In [None]:
# Move the model (including the new head) to the selected device.
convnext_model = convnext_model.to(device)

In [None]:
# Tally all parameters and, separately, those that still receive gradients.
total_params = 0
total_trainable_params = 0
for p in convnext_model.parameters():
    n_elements = p.numel()
    total_params += n_elements
    if p.requires_grad:
        total_trainable_params += n_elements

print(f"{total_params:,} total parameters.")
print(f"{total_trainable_params:,} trainable parameters.")

87,832,205 total parameters.
265,741 trainable parameters.


In [None]:
# Hyperparameters for the ConvNeXt run (these overwrite the VGG16 values).
BATCH_SIZE = 16        # samples per mini-batch for all three loaders
EPOCHS = 30            # number of passes over the training set
LEARNING_RATE = 5e-5   # initial AdamW learning rate
WEIGHT_DECAY = 1e-8    # weight decay passed to AdamW

In [None]:
# Only the training loader shuffles; validation and test keep dataset order.
train_dl = torch.utils.data.DataLoader(dataset=train_ds, batch_size=BATCH_SIZE, shuffle=True)
val_dl = torch.utils.data.DataLoader(dataset=val_ds, batch_size=BATCH_SIZE)
test_dl = torch.utils.data.DataLoader(dataset=test_ds, batch_size=BATCH_SIZE)

In [None]:
# Cross-entropy with label smoothing of 0.1.
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
optimizer = torch.optim.AdamW(convnext_model.parameters(), lr=LEARNING_RATE, weight_decay=WEIGHT_DECAY)
# NOTE(review): the training loop steps this scheduler once per epoch for
# EPOCHS = 30 epochs while T_max is 4; presumably the learning rate keeps
# following the cosine and rises again after T_max. Confirm this cycling is
# intended.
lr_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer=optimizer, T_max=4)

In [None]:
# Additional ConvNeXt augmentations
# Batch-level CutMix / MixUp; RandomChoice picks one of the two per call.
# NOTE(review): the ConvNeXt training loop in this section never calls
# `cutmix_or_mixup`; confirm whether that omission is intentional.
cutmix = v2.CutMix(num_classes=N_CLASSES)
mixup = v2.MixUp(num_classes=N_CLASSES)
cutmix_or_mixup = v2.RandomChoice([cutmix, mixup])

In [None]:
# Train the ConvNeXt head for EPOCHS epochs, validating after each one and
# keeping the weights with the lowest summed validation loss on disk.
best_val_loss = 1e7       # sentinel: any real validation loss is expected to be lower
train_losses = list()     # mean training loss per epoch
val_losses = list()       # mean validation loss per epoch

for epoch in range(EPOCHS):
  # Train
  convnext_model.train()
  train_loss = 0.0
  for batch in tqdm(train_dl, desc=f"Epoch {epoch + 1}/{EPOCHS}", ncols=100):
    inputs, labels = batch
    inputs = inputs.to(device)
    labels = labels.to(device)
    optimizer.zero_grad()
    outputs = convnext_model(inputs)
    loss = criterion(outputs, labels)
    loss.backward()
    optimizer.step()
    train_loss += loss.item()

  # Validate
  convnext_model.eval()
  val_loss = 0.0
  correct_predictions = 0
  total_predictions = 0
  with torch.no_grad():
    for batch in tqdm(val_dl, desc="Validation", ncols=100):
      inputs, labels = batch
      inputs = inputs.to(device)
      labels = labels.to(device)
      outputs = convnext_model(inputs)
      loss = criterion(outputs, labels)
      val_loss += loss.item()
      # Predicted class = index of the largest output along the class axis.
      _, predicted = torch.max(outputs, 1)
      total_predictions += labels.size(0)
      correct_predictions += (predicted == labels).sum().item()
  # The cosine schedule advances once per epoch.
  lr_scheduler.step()

  # Checkpoint (to the working directory) whenever the summed validation loss improves.
  if val_loss < best_val_loss:
    best_val_loss = val_loss
    torch.save(convnext_model.state_dict(), "./convnext-streetsandhouses.pt")

  # Per-epoch means: the sum of batch losses divided by the number of batches.
  train_losses.append(train_loss / len(train_dl))
  val_losses.append(val_loss / len(val_dl))

  accuracy = 100 * correct_predictions / total_predictions
  print(f"Epoch {epoch + 1}/{EPOCHS}, Train Loss: {train_loss / len(train_dl):.4f}, Validation Loss: {val_loss / len(val_dl):.4f}, Validation Accuracy: {accuracy:.2f}%")


In [None]:
# Predict on Test set
# Evaluates the model object currently in memory; the checkpoint written by the
# training loop is not reloaded in this cell.
convnext_model.eval()
test_loss = 0.0  # accumulated below but not reported by this cell
correct_predictions = 0
total_predictions = 0
with torch.no_grad():
    for batch in tqdm(test_dl, desc="Testing", ncols=100):
      inputs, labels = batch
      inputs = inputs.to(device)
      labels = labels.to(device)
      outputs = convnext_model(inputs)
      loss = criterion(outputs, labels)
      test_loss += loss.item()
      # Predicted class = index of the largest output along the class axis.
      _, predicted = torch.max(outputs, 1)
      total_predictions += labels.size(0)
      correct_predictions += (predicted == labels).sum().item()

print(f"\nTest set accuracy = {(100 * correct_predictions / total_predictions):.4f}%")

Testing: 100%|████████████████████████████████████████████████████| 141/141 [00:17<00:00,  8.14it/s]


Test set accuracy = 69.4629%





## ConvNeXt V2

In [None]:
# Training pipeline for ConvNeXt V2: resize to 224x224, RandAugment (2 random
# ops per image at magnitude 9), tensor conversion and per-channel
# normalisation, then RandomErasing applied to the normalised tensor with
# probability 0.25.
transforms_train = transforms.Compose([
    transforms.Resize(size=(224,224)),
    transforms.RandAugment(num_ops=2, magnitude=9),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    transforms.RandomErasing(p=0.25)
])

# Validation pipeline: deterministic resize to 224x224, no augmentation.
transforms_val = transforms.Compose([
    transforms.Resize(size=(224,224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])

# Test pipeline: same steps as validation.
transforms_test = transforms.Compose([
    transforms.Resize(size=(224,224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])

In [None]:
# Create paths for val, train and test (with shuffling the data)
# os.listdir returns entries in arbitrary, filesystem-dependent order; the
# listings are sorted so that the seeded split below selects the same files on
# every machine and every run.
data_root = "./streetshouses/kaggle_room_street_data/"
dirs = [os.path.join(data_root, path) for path in sorted(os.listdir(data_root))]

all_files = []

for dir_path in dirs:
    files = [os.path.join(dir_path, file) for file in sorted(os.listdir(dir_path))]
    all_files.extend(files)

# 90% / 10% (train+val / test), then 78% / 22% of the remainder (train / val):
# roughly 70% / 20% / 10% of all files.
train_paths, test_paths = train_test_split(all_files, train_size=0.9, random_state=42, shuffle=True)
train_paths, val_paths = train_test_split(train_paths, train_size=0.78, random_state=42, shuffle=True)

In [None]:
# Input channels; not referenced by any other cell visible in this notebook.
IN_CHANNELS = 3
# Number of target classes; matches the 13 names listed in StreetAndHousesDataset.
N_CLASSES = 13

# Hugging Face ConvNeXt V2 classifier loaded from the
# "facebook/convnextv2-base-1k-224" pretrained checkpoint.
convnextv2_model = ConvNextV2ForImageClassification.from_pretrained("facebook/convnextv2-base-1k-224")

In [None]:
# Freeze every pretrained parameter; layers added to the model after this cell
# are created with trainable parameters.
for param in convnextv2_model.parameters():
    param.requires_grad = False

In [None]:
# Add on classifier
# The model's `classifier` is swapped for a small trainable head:
# Linear -> GELU -> Dropout(0.4) -> Linear(N_CLASSES).
# The head deliberately ends in raw logits. The criterion used for this model is
# nn.CrossEntropyLoss, which applies log-softmax itself; a trailing nn.Softmax
# would normalise twice, compressing the loss range and weakening gradients.
# Argmax predictions are unaffected, and Softmax has no parameters, so
# state_dict keys stay the same.
# NOTE: the cell is not idempotent; after it has run, `classifier` is a
# Sequential, so reading `.in_features` from it again would fail.
n_inputs = convnextv2_model.classifier.in_features
convnextv2_model.classifier = nn.Sequential(
    nn.Linear(n_inputs, 256), nn.GELU(), nn.Dropout(0.4),
    nn.Linear(256, N_CLASSES))

In [None]:
# Move the model (including the new head) to the selected device.
convnextv2_model = convnextv2_model.to(device)

In [None]:
# Tally all parameters and, separately, those that still receive gradients.
total_params = 0
total_trainable_params = 0
for p in convnextv2_model.parameters():
    n_elements = p.numel()
    total_params += n_elements
    if p.requires_grad:
        total_trainable_params += n_elements

print(f"{total_params:,} total parameters.")
print(f"{total_trainable_params:,} trainable parameters.")

87,958,541 total parameters.
265,741 trainable parameters.


In [None]:
# Hyperparameters for the ConvNeXt V2 run (these overwrite the earlier values).
BATCH_SIZE = 16          # samples per mini-batch for all three loaders
EPOCHS = 50              # nominal epoch budget; the training loop below runs EPOCHS - 17
LEARNING_RATE = 6.25e-4  # initial AdamW learning rate
WEIGHT_DECAY = 0.05      # weight decay passed to AdamW

In [None]:
# Rebuild the datasets from the split and transforms defined in this section.
# Without this step the loaders would silently reuse train_ds / val_ds / test_ds
# left over from an earlier section (or fail on a fresh kernel that skipped it),
# and this section's transforms and paths would never be applied.
train_ds = StreetAndHousesDataset(files=train_paths, transform=transforms_train)
val_ds = StreetAndHousesDataset(files=val_paths, transform=transforms_val)
test_ds = StreetAndHousesDataset(files=test_paths, transform=transforms_test)

# Only the training loader shuffles; validation and test keep dataset order.
train_dl = torch.utils.data.DataLoader(dataset=train_ds, batch_size=BATCH_SIZE, shuffle=True)
val_dl = torch.utils.data.DataLoader(dataset=val_ds, batch_size=BATCH_SIZE)
test_dl = torch.utils.data.DataLoader(dataset=test_ds, batch_size=BATCH_SIZE)

In [None]:
# Cross-entropy with label smoothing of 0.1.
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
optimizer = torch.optim.AdamW(convnextv2_model.parameters(), lr=LEARNING_RATE, weight_decay=WEIGHT_DECAY)
# NOTE(review): the training loop steps this scheduler once per epoch for many
# more epochs than T_max = 6; presumably the learning rate keeps following the
# cosine and rises again after T_max. Confirm this cycling is intended.
lr_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer=optimizer, T_max=6)

In [None]:
# Additional ConvNeXt augmentations
# Batch-level CutMix / MixUp (MixUp with alpha=0.8); RandomChoice picks one of
# the two per call. The ConvNeXt V2 training loop applies `cutmix_or_mixup` to
# every training batch.
cutmix = v2.CutMix(num_classes=N_CLASSES)
mixup = v2.MixUp(alpha=0.8, num_classes=N_CLASSES)
cutmix_or_mixup = v2.RandomChoice([cutmix, mixup])

In [None]:
# Train the ConvNeXt V2 head, validating after each epoch and keeping the
# weights with the lowest summed validation loss on disk.
best_val_loss = 1e7       # sentinel: any real validation loss is expected to be lower
train_losses = list()     # mean training loss per epoch
val_losses = list()       # mean validation loss per epoch

# The run is deliberately 17 epochs shorter than EPOCHS. The real count is kept
# in one place so the progress bar and the log line show the true total rather
# than "/EPOCHS".
n_epochs = EPOCHS - 17

for epoch in range(n_epochs):
  # Train
  convnextv2_model.train()
  train_loss = 0.0
  for batch in tqdm(train_dl, desc=f"Epoch {epoch + 1}/{n_epochs}", ncols=100):
    inputs, labels = batch
    # CutMix / MixUp is applied to the batch before it is moved to the device;
    # the labels it returns are what the criterion receives for this batch.
    inputs, labels = cutmix_or_mixup(inputs, labels)
    inputs = inputs.to(device)
    labels = labels.to(device)
    optimizer.zero_grad()
    outputs = convnextv2_model(inputs)
    # outputs[0]: first element of the model output, used as the class scores.
    loss = criterion(outputs[0], labels)
    loss.backward()
    optimizer.step()
    train_loss += loss.item()

  # Validate
  convnextv2_model.eval()
  val_loss = 0.0
  correct_predictions = 0
  total_predictions = 0
  with torch.no_grad():
    for batch in tqdm(val_dl, desc="Validation", ncols=100):
      inputs, labels = batch
      inputs = inputs.to(device)
      labels = labels.to(device)
      outputs = convnextv2_model(inputs)
      loss = criterion(outputs[0], labels)
      val_loss += loss.item()
      # Predicted class = index of the largest score along the class axis.
      _, predicted = torch.max(outputs[0], 1)
      total_predictions += labels.size(0)
      correct_predictions += (predicted == labels).sum().item()
  # The cosine schedule advances once per epoch.
  lr_scheduler.step()

  # Checkpoint (to the working directory) whenever the summed validation loss improves.
  if val_loss < best_val_loss:
    best_val_loss = val_loss
    torch.save(convnextv2_model.state_dict(), "convnextv2-streetsandhouses.pt")

  # Per-epoch means: the sum of batch losses divided by the number of batches.
  train_losses.append(train_loss / len(train_dl))
  val_losses.append(val_loss / len(val_dl))

  accuracy = 100 * correct_predictions / total_predictions
  print(f"Epoch {epoch + 1}/{n_epochs}, Train Loss: {train_loss / len(train_dl):.4f}, Validation Loss: {val_loss / len(val_dl):.4f}, Validation Accuracy: {accuracy:.2f}%")

In [None]:
# Predict on Test set
# Evaluates the model object currently in memory; the checkpoint written by the
# training loop is not reloaded in this cell.
convnextv2_model.eval()
test_loss = 0.0  # accumulated below but not reported by this cell
correct_predictions = 0
total_predictions = 0
with torch.no_grad():
    for batch in tqdm(test_dl, desc="Testing", ncols=100):
      inputs, labels = batch
      inputs = inputs.to(device)
      labels = labels.to(device)
      outputs = convnextv2_model(inputs)
      # outputs[0]: first element of the model output, used as the class scores
      # (presumably the logits of the Hugging Face output object; confirm).
      loss = criterion(outputs[0], labels)
      test_loss += loss.item()
      # Predicted class = index of the largest score along the class axis.
      _, predicted = torch.max(outputs[0], 1)
      total_predictions += labels.size(0)
      correct_predictions += (predicted == labels).sum().item()

print(f"\nTest set accuracy = {(100 * correct_predictions / total_predictions):.4f}%")

Testing:   0%|                                                              | 0/141 [00:00<?, ?it/s]

Testing: 100%|████████████████████████████████████████████████████| 141/141 [00:51<00:00,  2.71it/s]


Test set accuracy = 70.2175%



