# The Reality Filter

In [1]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from pathlib import Path
from PIL import Image
from torchvision import transforms, models
from torchvision.models import ResNet34_Weights
from torch.utils.data import Dataset, DataLoader
from sklearn.ensemble import RandomForestClassifier

## Load images

In [2]:
class ImageDataset(Dataset):
    def __init__(self, dir_, csv_filename=None):
        super().__init__()
        self.transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

        self.dir_ = Path(dir_)
        assert self.dir_.exists() and self.dir_.is_dir(), "image directory doesn't exist"

        self.images = sorted(self.dir_.glob("*.jpg"))
        self.labels = None
        if csv_filename is not None:
            self.labels = pd.read_csv(csv_filename)

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        img_filename = self.images[idx]
        img = self.transform(Image.open(img_filename).convert("RGB"))

        label = torch.nan
        if self.labels is not None:
            label = self.labels.loc[self.labels["CodeID"] == int(img_filename.stem), "Label"].values[0]
        
        return img, label

In [3]:
ds_train = ImageDataset("./train", "./train_data.csv")
ds_test = ImageDataset("./test")

In [4]:
dl_train = DataLoader(ds_train, batch_size=32)
dl_test = DataLoader(ds_test, batch_size=32)

## Subtask 1

In [5]:
img129 = Image.open("./train/129.jpg").convert("RGB")
width, height = img129.size
width, height

(728, 486)

In [6]:
subtask1_rows = [(1, 0, width * height)]

## Subtask 2

In [7]:
labels = ds_train.labels["Label"]
zeros, ones = labels[labels == 0].count().item(), labels[labels == 1].count().item()
zeros, ones

(130, 93)

In [8]:
len(labels)

223

In [9]:
subtask2_rows = [(2, 0, min(zeros, ones) / max(zeros, ones))]

## Define feature extractor

In [10]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = models.resnet34(weights=ResNet34_Weights.DEFAULT).to(device)
model.eval()
feature_extractor = nn.Sequential(*list(model.children())[:-2])

In [11]:
@torch.no_grad()
def to_features(dataloader):
    features = []
    labels = []
    for batch_images, batch_labels in dataloader:
        batch_images = batch_images.to(device)
        
        batch_features = F.adaptive_avg_pool2d(feature_extractor(batch_images), (1, 1)).squeeze((-1, -2))
        
        features.append(batch_features.cpu().detach().numpy())
        labels.append(batch_labels.cpu().detach().numpy())

    features = np.concatenate(features, axis=0)
    labels = np.concatenate(labels, axis=0)
    return features, labels

## Subtask 3

In [12]:
X_train, y_train = to_features(dl_train)

In [13]:
rf_clf = RandomForestClassifier(random_state=999)
rf_clf.fit(X_train, y_train)

In [14]:
X_test, _ = to_features(dl_test)



In [15]:
predictions = rf_clf.predict(X_test)
ids = [int(img.stem) for img in ds_test.images]

subtask3_rows = []
for id_, pred in zip(ids, predictions):
    subtask3_rows.append((3, id_, pred.item()))

## Save answers

In [16]:
submission_rows = subtask1_rows + subtask2_rows + subtask3_rows
df_submission = pd.DataFrame(submission_rows, columns=["subtaskID", "datapointID", "answer"])
df_submission.to_csv("submission.csv", index=False)

## Submission results

Subtask 1:
- Accuracy: 1
- Score: 10/10

Subtask 2:
- Accuracy: 1
- Score: 10/10

Subtask 3:
- Accuracy: 0.666666
- Score: 60/80