In [None]:
%pip install cleanlab "cleanlab[image]" datasets

In [None]:
import csv
import numpy as np

def load_embedding(path):
    """Load a tab-separated embedding matrix from ``path``.

    Every non-empty row of the file is parsed as one embedding vector: fields
    are separated by tabs (``|`` is the quote character) and converted to
    ``float32``. Blank lines (for example a trailing newline at the end of the
    export) are skipped instead of producing an empty, mismatched row.

    Args:
        path: Path of the TSV file to read.

    Returns:
        A ``float32`` NumPy array with one row per parsed line. A file with no
        data rows yields an empty array of shape ``(0,)``.

    Raises:
        ValueError: If a field cannot be parsed as a float, or if rows have
            different lengths.
    """
    # newline='' lets the csv module do its own line-ending handling.
    with open(path, newline='') as csvfile:
        reader = csv.reader(csvfile, delimiter='\t', quotechar='|')
        # csv.reader yields [] for blank lines; `if row` drops those.
        rows = [np.asarray(row, dtype=np.float32) for row in reader if row]
    return np.array(rows, dtype=np.float32)
    
# Load the first embedding export (the file name suggests YOLOv8 features —
# TODO confirm) and print the array shape as a quick sanity check.
embeddings = load_embedding('yoloV8-9b683e5d1039c81f2747a707f9c7f24a86c64a22c1464c6393db1538d42ca634.tsv')
print(embeddings.shape)

In [None]:
from cleanlab.outlier import OutOfDistribution

# Default-configured cleanlab out-of-distribution estimator.
ood = OutOfDistribution()

# Fit the estimator on the embeddings and score those same rows in one call.
# NOTE(review): cleanlab documents lower scores as more atypical — confirm
# against the installed version before picking a threshold.
ood_train_feature_scores = ood.fit_score(features=embeddings)

In [None]:
import matplotlib.pyplot as plt

# Histogram of the outlier scores, binned into 50 buckets over a fixed [0, 1] range.
plt.hist(ood_train_feature_scores, range=[0,1], bins=50)
plt.show()

In [None]:
# Load the train-split embedding export and print its shape.
siameseTrain = load_embedding('siameseResnetTrain.tsv')
print(siameseTrain.shape)

# Load the test-split embedding export and print its shape.
# NOTE(review): both files are presumably produced by the same model so the
# feature dimensions match — confirm by comparing the two printed shapes.
siameseTest = load_embedding('siameseResnetTest.tsv')
print(siameseTest.shape)

In [None]:

# NOTE: this rebinds `ood` and `ood_train_feature_scores`, replacing the values
# computed from the first embedding file in the earlier cell. Re-running the
# earlier histogram cell after this one will plot these scores instead.
ood = OutOfDistribution()

# Fit on the train embeddings only, then score the test embeddings with the
# already-fitted estimator.
ood_train_feature_scores = ood.fit_score(features=siameseTrain)
ood_test_feature_scores = ood.score(features=siameseTest)

In [None]:
# Cutoff value: the 5th percentile of the training outlier scores.
fifth_percentile = np.percentile(ood_train_feature_scores, 5)

# One figure, two side-by-side panels (train on the left, test on the right).
fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(12, 5))

# Shared x-range spanning both score arrays, so the two histograms use
# identical bins and can be compared directly.
plt_range = [
    min(ood_train_feature_scores.min(), ood_test_feature_scores.min()),
    max(ood_train_feature_scores.max(), ood_test_feature_scores.max()),
]

# Train panel: score histogram plus the percentile cutoff as a red line.
axes[0].hist(ood_train_feature_scores, bins=50, range=plt_range)
axes[0].set_title('train_outlier_scores distribution')
axes[0].set_ylabel('Frequency')
axes[0].axvline(fifth_percentile, linewidth=2, color='red')

# Test panel: same bins and the same (train-derived) cutoff.
axes[1].hist(ood_test_feature_scores, bins=50, range=plt_range)
axes[1].set_title('test_outlier_scores distribution')
axes[1].set_ylabel('Frequency')
axes[1].axvline(fifth_percentile, linewidth=2, color='red')


In [3]:
from torch.utils.data import Dataset
import os
from torchvision import transforms
import torch
from PIL import Image

class ImagePairsDataset(Dataset):
    """Images listed in a comma-separated index file, labelled by interval.

    The index file lives at ``root_dir/csv_file`` and holds one record per
    line with exactly three comma-separated fields: ``filename,video,interval``.
    Blank lines are ignored. Indexing the dataset returns ``(image, interval)``
    where ``image`` is the RGB image (after ``transform``, if given) and
    ``interval`` is the third field exactly as read from the file, i.e. a
    string.

    NOTE(review): because labels are strings, they presumably need to be
    encoded as integer class indices before being used with a loss such as
    ``nn.CrossEntropyLoss`` — confirm where that encoding is meant to happen.

    Args:
        csv_file: Name of the index file, relative to ``root_dir``.
        root_dir: Directory containing the index file and the images.
        transform: Optional callable applied to each loaded PIL image.
    """

    def __init__(self, csv_file, root_dir, transform=None):
        # Distinct interval values seen in the index; filled by load_data().
        self.features = set()
        self.root_dir = root_dir
        self.transform = transform
        # Build the index path the same way image paths are built below.
        self.data = self.load_data(os.path.join(root_dir, csv_file))

    def __len__(self):
        """Return the number of records read from the index file."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(image, interval)`` for record ``idx``."""
        (img1_path, _, interval1) = self.data[idx]
        # Open inside a context manager so the file handle is released as soon
        # as the pixel data has been converted to an in-memory RGB image.
        with Image.open(os.path.join(self.root_dir, img1_path)) as raw:
            img1 = raw.convert('RGB')
        # Apply transformations if provided
        if self.transform:
            img1 = self.transform(img1)

        return img1, interval1

    def load_data(self, csv_file):
        """Parse the index file into a list of ``(filename, video, interval)``.

        Also records every interval value in ``self.features``.

        Raises:
            ValueError: If a non-blank line does not have exactly three
                comma-separated fields.
        """
        data = []
        with open(csv_file, 'r') as file:
            for line_no, line in enumerate(file, start=1):
                line = line.strip()
                if not line:
                    # Tolerate blank lines, e.g. a trailing newline.
                    continue
                fields = line.split(',')
                if len(fields) != 3:
                    raise ValueError(
                        f"{csv_file}:{line_no}: expected 3 comma-separated "
                        f"fields (filename,video,interval), got {len(fields)}"
                    )
                filename, video, interval = fields
                data.append((filename, video, interval))
                self.features.add(interval)
        return data


# Example usage:
# NOTE(review): torchvision documents Resize's tuple as (height, width), so
# (640, 360) produces portrait 640-high x 360-wide tensors — confirm this is
# intended rather than 360x640 landscape frames.
transform = transforms.Compose([
    transforms.Resize((640, 360)),
    transforms.ToTensor(),
])

dataset = ImagePairsDataset('tag.csv', 'siamese', transform=transform)
# Number of distinct interval labels found in the index file.
num_classes = len(dataset.features)


# Imported under an alias so it does not rebind `Dataset`, which this cell
# already uses for torch.utils.data.Dataset (the base class above).
from datasets import Dataset as HFDataset

def gen_hf_dataset():
    """Yield one ``{'image', 'label'}`` record per item of ``dataset``."""
    for image, label in dataset:
        yield {'image': image, 'label': label}


# Materialise the torch dataset as a Hugging Face dataset.
hf_dataset = HFDataset.from_generator(gen_hf_dataset)


In [5]:
from torch import nn

class Net(nn.Module):
    """Small CNN classifier that also exposes its penultimate features.

    Architecture: two conv -> ReLU -> BatchNorm -> max-pool stages, a lazily
    sized 128-unit linear layer with ReLU, and a final linear layer producing
    one logit per class.

    The first convolution is lazy, so the number of input channels is taken
    from the first batch passed through the network. This lets the same model
    accept the RGB tensors produced by the dataset as well as single-channel
    input, instead of being hard-wired to one channel.

    Args:
        n_classes: Number of output logits. When omitted, the notebook-level
            ``num_classes`` variable is used.
    """

    def __init__(self, n_classes=None):
        super().__init__()
        if n_classes is None:
            # Fall back to the class count computed from the dataset.
            n_classes = num_classes
        self.cnn = nn.Sequential(
            nn.LazyConv2d(6, 5),  # in_channels inferred on first forward pass
            nn.ReLU(),
            nn.BatchNorm2d(6),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(6, 16, 5, bias=False),
            nn.ReLU(),
            nn.BatchNorm2d(16),
            nn.MaxPool2d(2, 2),
        )
        # LazyLinear infers its input width from the flattened CNN output.
        self.linear = nn.Sequential(nn.LazyLinear(128), nn.ReLU())
        self.output = nn.Sequential(nn.Linear(128, n_classes))

    def forward(self, x):
        """Return the class logits for a batch of images."""
        x = self.embeddings(x)
        x = self.output(x)
        return x

    def embeddings(self, x):
        """Return the 128-dimensional features that feed the output layer."""
        x = self.cnn(x)
        x = torch.flatten(x, 1)  # flatten all dimensions except batch
        x = self.linear(x)
        return x

KeyboardInterrupt: 

In [2]:
# Pick the best available accelerator: CUDA first, then Apple's MPS backend,
# otherwise the CPU. "mps" is only chosen when it is actually available, so
# machines with neither accelerator still get a usable device.
if torch.cuda.is_available():
    device = torch.device("cuda")
elif getattr(torch.backends, "mps", None) is not None and torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")


In [1]:
from tqdm.autonotebook import tqdm
from torch import optim
import time

# Method to calculate validation accuracy in each epoch

def get_test_accuracy(net, testloader):
    """Return the classification accuracy of ``net`` on ``testloader``, in percent.

    Puts ``net`` in evaluation mode (and leaves it there), runs every batch
    without gradient tracking on the notebook-level ``device``, and counts how
    often the highest-scoring class equals the label.

    Args:
        net: Model whose forward pass returns one score per class.
        testloader: Iterable of batches, each a mapping with ``"image"`` and
            ``"label"`` tensors.

    Returns:
        Accuracy as a float in ``[0, 100]``. An empty loader yields ``0.0``
        rather than a division-by-zero error.
    """
    net.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for data in testloader:
            images, labels = data["image"].to(device), data["label"].to(device)

            # run the model on the test set to predict labels
            outputs = net(images)

            # the label with the highest energy will be our prediction
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    if total == 0:
        # Nothing was evaluated; avoid dividing by zero.
        return 0.0

    # compute the accuracy over all test images
    return 100 * correct / total


# Method for training the model
def train(trainloader, testloader, n_epochs, patience):
    """Train a fresh ``Net`` with early stopping on test accuracy.

    A new ``Net`` is created, moved to the notebook-level ``device`` and
    optimised with AdamW on cross-entropy loss. After every epoch the accuracy
    on ``testloader`` is measured with ``get_test_accuracy``; training stops
    early once more than ``patience`` epochs have passed since the best
    accuracy seen so far.

    Args:
        trainloader: Iterable of training batches, each a mapping with
            ``"image"`` and ``"label"`` tensors.
        testloader: Iterable of evaluation batches in the same format.
        n_epochs: Maximum number of passes over ``trainloader``.
        patience: Number of epochs without improvement that are tolerated
            before stopping.

    Returns:
        The trained model, in evaluation mode.
    """
    # Move the model first so the optimizer is built from the parameters that
    # are actually trained on `device`.
    model = Net().to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(model.parameters())

    best_test_accuracy = 0.0
    # Initialised up front so the early-stopping check below never reads an
    # unbound name when the first epoch scores 0% accuracy.
    best_epoch = 0

    for epoch in range(n_epochs):  # loop over the dataset multiple times
        start_epoch = time.time()
        running_loss = 0.0

        # The per-epoch evaluation switches the model to eval mode; switch back
        # so BatchNorm uses and updates batch statistics while training.
        model.train()

        for data in trainloader:
            # get the inputs; data is a dict of {"image": images, "label": labels}
            inputs, labels = data["image"].to(device), data["label"].to(device)

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        # Get accuracy on the test set
        accuracy = get_test_accuracy(model, testloader)

        if accuracy > best_test_accuracy:
            # Record the new best score; without this every epoch would count
            # as an improvement and early stopping could never trigger.
            best_test_accuracy = accuracy
            best_epoch = epoch

        # Condition for early stopping
        if epoch - best_epoch > patience:
            print(f"Early stopping at epoch {epoch + 1}")
            break

        end_epoch = time.time()

        # Guard the average against an empty training loader.
        n_batches = max(len(trainloader), 1)
        print(
            f"epoch: {epoch + 1} loss: {running_loss / n_batches:.3f} test acc: {accuracy:.3f} time_taken: {end_epoch - start_epoch:.3f}"
        )

    # Hand the model back ready for inference regardless of how the loop ended.
    model.eval()
    return model


# Method for computing out-of-sample embeddings
def compute_embeddings(model, testloader):
    """Return ``model.embeddings`` for every image in ``testloader``.

    The model is run in evaluation mode without gradient tracking on the
    notebook-level ``device``; its previous train/eval mode is restored
    afterwards. Only the ``"image"`` entry of each batch is read.

    Args:
        model: Model exposing an ``embeddings(images)`` method.
        testloader: Iterable of batches, each a mapping with an ``"image"``
            tensor.

    Returns:
        A CPU tensor stacking the per-batch embeddings in loader order.
    """
    embeddings_list = []

    # Inference must not depend on whatever mode the caller left the model in.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for data in tqdm(testloader):
                # Labels are not needed here, so only the images are moved.
                images = data["image"].to(device)

                embeddings = model.embeddings(images)
                embeddings_list.append(embeddings.cpu())
    finally:
        model.train(was_training)

    return torch.vstack(embeddings_list)


# Method for computing out-of-sample predicted probabilities
def compute_pred_probs(model, testloader):
    """Return the model outputs for every image in ``testloader``.

    Despite the name, the values returned are the raw outputs of
    ``model(images)`` (logits); the notebook applies softmax to the stacked
    result afterwards. The model is run in evaluation mode without gradient
    tracking on the notebook-level ``device``; its previous train/eval mode is
    restored afterwards. Only the ``"image"`` entry of each batch is read.

    Args:
        model: Model whose forward pass returns one score per class.
        testloader: Iterable of batches, each a mapping with an ``"image"``
            tensor.

    Returns:
        A CPU tensor stacking the per-batch outputs in loader order.
    """
    pred_probs_list = []

    # Inference must not depend on whatever mode the caller left the model in.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for data in tqdm(testloader):
                # Labels are not needed here, so only the images are moved.
                images = data["image"].to(device)

                outputs = model(images)
                pred_probs_list.append(outputs.cpu())
    finally:
        model.train(was_training)

    return torch.vstack(pred_probs_list)

  from tqdm.autonotebook import tqdm


In [5]:
import multiprocessing
from sklearn.model_selection import StratifiedKFold


K = 3  # Number of cross-validation folds. Set to small value here to ensure quick runtimes, we recommend 5 or 10 in practice for more accurate estimates.
n_epochs = 2  # Number of epochs to train model for. Set to a small value here for quick runtime, you should use a larger value in practice.
patience = 2  # Parameter for early stopping. If the validation accuracy does not improve for this many epochs, training will stop.
train_batch_size = 64  # Batch size for training
test_batch_size = 512  # Batch size for testing
num_workers = multiprocessing.cpu_count()  # Number of workers for data loaders

# Create k splits of the dataset
# A fixed random_state makes the shuffled folds identical on every run, so
# results can be reproduced after a kernel restart.
kfold = StratifiedKFold(n_splits=K, shuffle=True, random_state=0)
splits = kfold.split(hf_dataset, hf_dataset['label'])

# Per-fold index arrays: train_id_list[i] / test_id_list[i] belong to fold i.
train_id_list, test_id_list = [], []

for fold, (train_ids, test_ids) in enumerate(splits):
    train_id_list.append(train_ids)
    test_id_list.append(test_ids)

print(train_id_list)

[array([   0,    1,    3, ..., 9119, 9120, 9122]), array([   0,    2,    4, ..., 9119, 9120, 9121]), array([   1,    2,    3, ..., 9117, 9121, 9122])]




In [7]:
from torch.utils.data import Subset, DataLoader

# Per-fold results, appended in fold order.
pred_probs_list, embeddings_list = [], []
# Model used to compute embeddings for every fold; set once, on the first fold.
embeddings_model = None

for i in range(K):
    print(f"\nTraining on fold: {i+1} ...")

    # Create train and test sets and corresponding dataloaders
    trainset = Subset(hf_dataset, train_id_list[i])
    testset = Subset(hf_dataset, test_id_list[i])

    trainloader = DataLoader(
        trainset,
        batch_size=train_batch_size,
        shuffle=False,
        num_workers=num_workers,
        pin_memory=True,
    )
    testloader = DataLoader(
        testset, batch_size=test_batch_size, shuffle=False, num_workers=num_workers, pin_memory=True
    )

    # Train model
    # train() builds and returns a new model on every call, so each fold gets
    # its own classifier.
    model = train(trainloader, testloader, n_epochs, patience)
    if embeddings_model is None:
        embeddings_model = model

    # Compute embeddings for this fold's held-out rows.
    # NOTE(review): the first fold's model is reused for every fold, so only
    # fold 1's embeddings are truly out-of-sample. Presumably this is deliberate,
    # to keep all embeddings in one feature space — confirm.
    print("Computing feature embeddings ...")
    fold_embeddings = compute_embeddings(embeddings_model, testloader)
    embeddings_list.append(fold_embeddings)

    print("Computing predicted probabilities ...")
    # Compute out-of-sample predictions with the model trained on this fold;
    # these are raw model outputs, converted to probabilities after the loop.
    fold_pred_probs = compute_pred_probs(model, testloader)
    pred_probs_list.append(fold_pred_probs)

print("Finished Training")


# Combine embeddings and predicted probabilities from each fold
# NOTE(review): rows are stacked in fold order (test_id_list[0], then
# test_id_list[1], ...), not in the original hf_dataset order. Anything that
# pairs `features` / `pred_probs` with dataset rows presumably needs to reorder
# by the concatenated test indices first — confirm downstream usage.
features = torch.vstack(embeddings_list).numpy()

# Softmax over the class dimension turns the stacked logits into probabilities.
logits = torch.vstack(pred_probs_list)
pred_probs = nn.Softmax(dim=1)(logits).numpy()

ImportError: cannot import name 'Subset' from 'torch.utils' (/Users/mmandirola/tesis/venv/lib/python3.10/site-packages/torch/utils/__init__.py)