In [0]:
import warnings
# Load IPython's autoreload extension and set it to mode 2, so imported modules
# are re-imported before each cell execution (edits to project modules are picked up live).
%load_ext autoreload
%autoreload 2
# Ignore warnings whose originating module name matches "threadpoolctl".
warnings.filterwarnings("ignore", module="threadpoolctl")

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [0]:
%pip install opencv-python-headless kaleido open_clip_torch pandas imgaug

import os
import time
from collections import Counter
from urllib.parse import urlparse

import cv2
import kaleido
import numpy as np
import open_clip
import pandas as pd
import requests
import torch
import torchvision.transforms as T
from imgaug import augmenters as iaa
from scipy.spatial.distance import cdist

from functions.gridsearch_modular import *
from functions.prep_functions import *

class embedder_patch:
    """
    Patch-averaging image embedder.

    The image is covered with square windows of side ``patch_size``. Each window, and finally
    the whole image, is resized to ``patch_size`` x ``patch_size``, normalised and passed
    through the model; the image embedding is the mean of all resulting vectors.
    """
    def __init__(self, model, device, patch_size=224, normalize=([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])):
        self.model, self.device = model, device
        self.patch_size = patch_size
        self.normalize = normalize

    def __str__(self):
        # "<embedder class>_<model class>"; the notebook uses this string to build file paths.
        return "_".join((self.__class__.__name__, self.model.__class__.__name__))

    def ensure_rgb(self, image):
        """
        Return a 3-channel version of ``image``.

        2-D arrays and (H, W, 1) arrays are replicated to three channels; (H, W, 3) arrays
        are returned unchanged. Any other shape raises ValueError.
        """
        shape = image.shape
        if len(shape) == 2:
            return np.stack([image, image, image], axis=-1)
        if len(shape) == 3:
            channels = shape[2]
            if channels == 1:
                return np.repeat(image, 3, axis=2)
            if channels == 3:
                return image
        raise ValueError("Unsupported image shape: {}".format(shape))

    def get_patch_positions(self, img_dim):
        """
        Return the window start offsets along one axis of length ``img_dim``.

        Offsets advance by ``patch_size``; the final offset is ``img_dim - patch_size`` so the
        last window ends exactly at the image border (it may overlap the previous window).
        An axis no longer than ``patch_size`` yields the single offset 0.
        """
        size = self.patch_size
        if img_dim <= size:
            return [0]
        last_start = img_dim - size
        # Every multiple of the patch size that still leaves room for a later, border-aligned window.
        starts = list(range(0, last_start, size))
        starts.append(last_start)
        return starts

    def _encode(self, tensor):
        """Run one preprocessed image tensor through the model and return a squeezed numpy vector."""
        batch = tensor.unsqueeze(0).to(self.device)
        with torch.no_grad():
            # Models whose class is named "CLIP" expose the image tower as encode_image().
            if self.model.__class__.__name__ == "CLIP":
                features = self.model.encode_image(batch)
            else:
                features = self.model(batch)
            return features.cpu().numpy().squeeze()

    def embed(self, image, normalize=None):
        """
        Embed ``image`` as the mean of its window embeddings and its whole-image embedding.

        Args:
            image: np.array, grayscale or 3-channel image.
            normalize: Optional (mean, std) pair for T.Normalize. If None, uses self.normalize.

        Returns:
            embedding: np.array with the averaged embedding, or None on failure.
            error: None on success, otherwise the exception message as a string.
        """
        mean_std = self.normalize if normalize is None else normalize

        try:
            size = self.patch_size
            to_model_input = T.Compose([
                T.ToPILImage(),
                T.Resize((size, size)),
                T.ToTensor(),
                T.Normalize(mean=mean_std[0], std=mean_std[1]),
            ])
            rgb = self.ensure_rgb(image)
            height, width, _ = rgb.shape
            columns = self.get_patch_positions(width)
            rows = self.get_patch_positions(height)

            # One vector per window, scanned row by row.
            vectors = [
                self._encode(to_model_input(self.ensure_rgb(rgb[top:top + size, left:left + size])))
                for top in rows
                for left in columns
            ]
            # The resized full image contributes one additional vector to the average.
            vectors.append(self._encode(to_model_input(rgb)))

            return np.stack(vectors, axis=0).mean(axis=0), None
        except Exception as exc:
            # Failures are reported through the return value rather than raised.
            return None, str(exc)
    
class KNNBased:
    """
    KNN-based classifier with outlier rejection using distance thresholding and k optimization.

    ``fit`` selects k by leave-one-out voting over the training embeddings and then selects a
    threshold on the mean distance to the k nearest training embeddings. ``predict`` embeds
    images with ``self.embedder`` and returns the majority label of the k nearest training
    embeddings, or None when that mean distance exceeds the threshold.
    """

    # Vote shares of the two leading labels closer than this count as a tie (sample rejected).
    _TIE_MARGIN = 0.05
    # Number of evenly spaced candidate distance thresholds evaluated in fit().
    _N_THRESHOLD_CANDIDATES = 100
    # Seconds to wait for an HTTP(S) image download before giving up.
    _REQUEST_TIMEOUT = 30

    def __init__(self, distance_metric, k_range, embedder, threshold=None, seed=None):
        """
        Args:
            distance_metric: Value passed as ``metric`` to scipy's ``cdist``.
            k_range: Iterable of candidate k values tried during fit().
            embedder: Object whose ``embed(image)`` returns ``(embedding, error)``.
            threshold: Optional minimum vote share (top count / k) required to accept a label
                during k selection. Falsy values disable the check.
            seed: Optional seed handed to ``np.random.seed`` at the start of fit().
        """
        self.distance_metric = distance_metric
        self.k_range = k_range
        self.embedder = embedder
        self.threshold = threshold
        self.seed = seed

    @property
    def optimized_param(self):
        """[k, distance_threshold] as chosen by fit(); raises AttributeError before fit()."""
        return [self.k, self.distance_threshold]

    def _prepare_data(self, train):
        """
        Flattens the nested training dictionary into parallel sequences.

        Args:
            train: ``{folder_name: {img_path: embedding or {"original": embedding, ...}}}``.

        Returns:
            (embeddings, folder_names, image_paths): a numpy array of embeddings and two lists
            aligned with it.
        """
        all_embeddings_train = []
        image_paths_train = []
        folder_names_train = []

        for folder_name, folder in train.items():
            for img_path, img_data in folder.items():
                # Entries may be a bare embedding or a dict holding it under "original".
                if isinstance(img_data, dict) and "original" in img_data:
                    img_data = img_data["original"]
                all_embeddings_train.append(img_data)
                image_paths_train.append(img_path)
                folder_names_train.append(folder_name)

        return np.array(all_embeddings_train), folder_names_train, image_paths_train

    def _vote(self, neighbor_labels, k):
        """
        Majority vote used during k selection.

        Returns the most frequent label, or None when the leading vote share (count / k) is
        below ``self.threshold`` or the two leading shares are within ``_TIE_MARGIN``.
        """
        ranked = Counter(neighbor_labels).most_common()
        if not ranked:
            return None
        top_label, top_count = ranked[0]
        if self.threshold and (top_count / k) < self.threshold:
            return None
        if len(ranked) > 1 and abs((top_count / k) - (ranked[1][1] / k)) <= self._TIE_MARGIN:
            return None
        return top_label

    def _load_image(self, img_path):
        """Read an image from an http(s) URL or a local path with OpenCV; ValueError if unreadable."""
        if urlparse(img_path).scheme in ("http", "https"):
            response = requests.get(img_path, timeout=self._REQUEST_TIMEOUT)
            response.raise_for_status()
            img = cv2.imdecode(np.frombuffer(response.content, np.uint8), cv2.IMREAD_COLOR)
        else:
            img = cv2.imread(img_path)
        # NOTE(review): OpenCV decodes to BGR channel order. Left unchanged because the stored
        # training embeddings were presumably produced the same way - TODO confirm.
        if img is None:
            raise ValueError("returned image is empty or None")
        return img

    def _classify(self, embedding):
        """Label one embedding with the fitted k and distance threshold (None = rejected)."""
        distances = cdist([embedding], self.embeddings_train, metric=self.distance_metric).flatten()
        nearest_indices = np.argsort(distances)[:self.k]
        if np.mean(distances[nearest_indices]) > self.distance_threshold:
            return None
        neighbor_labels = [self.folder_names_train[idx] for idx in nearest_indices]
        if not neighbor_labels:
            return None
        return Counter(neighbor_labels).most_common(1)[0][0]

    def fit(self, train, outliers, predicted_outlier_precentage=0):
        """
        Fits the model by optimizing k and distance threshold using the provided training data and outliers.

        Args:
            train: Training data in dictionary format (see ``_prepare_data``).
            outliers: Array or list of outlier embeddings. None or empty is allowed; the
                distance threshold then becomes the largest leave-one-out training distance.
            predicted_outlier_precentage: Expected share of rejected training samples, as a
                percentage (0-100). When > 0, candidate k values are rewarded for rejecting
                roughly that share during leave-one-out voting.

        Raises:
            ValueError: if there are no training embeddings, ``k_range`` is empty, or no
                training sample has a neighbour with a different image path.
        """
        embeddings_train, folder_names_train, image_paths_train = self._prepare_data(train)
        self.embeddings_train = embeddings_train
        self.folder_names_train = folder_names_train
        self.image_paths_train = image_paths_train
        self.predicted_percentage = predicted_outlier_precentage
        self.folder_names = sorted(set(folder_names_train))

        if self.seed is not None:
            np.random.seed(self.seed)

        n_train = len(embeddings_train)
        if n_train == 0:
            raise ValueError("Cannot fit: no training embeddings were provided.")
        k_values = list(self.k_range)
        if not k_values:
            raise ValueError("Cannot fit: k_range contains no candidate k.")
        max_k = max(k_values)

        # Leave-one-out neighbours, computed once per sample and reused for every candidate k.
        # Entries sharing the sample's image path are excluded. Each item holds
        # (neighbour labels, neighbour distances), nearest first, or None without neighbours.
        loo_neighbors = []
        for val_idx, val_point in enumerate(embeddings_train):
            own_path = image_paths_train[val_idx]
            subset_idx = [idx for idx, path in enumerate(image_paths_train) if path != own_path]
            if not subset_idx:
                loo_neighbors.append(None)
                continue
            distances = cdist([val_point], embeddings_train[subset_idx], metric=self.distance_metric).flatten()
            order = np.argsort(distances)[:max_k]
            loo_neighbors.append(([folder_names_train[subset_idx[pos]] for pos in order], distances[order]))

        # Percentage -> fraction. True division: floor division would turn any value < 100 into 0.
        expected_fraction = self.predicted_percentage / 100

        best_k = None
        best_accuracy = float("-inf")
        for k in k_values:
            predictions = [
                None if entry is None else self._vote(entry[0][:k], k)
                for entry in loo_neighbors
            ]

            # Accuracy over the samples that received a label (fraction of exact matches).
            accepted = [(truth, pred) for truth, pred in zip(folder_names_train, predictions) if pred is not None]
            inlier_accuracy = sum(truth == pred for truth, pred in accepted) / len(accepted) if accepted else 0

            if self.predicted_percentage > 0:
                n_outlier_predicted = sum(pred is None for pred in predictions)
                n_outlier_expected = n_train * expected_fraction  # > 0 here: n_train > 0 and fraction > 0
                outlier_score = max(0, 1 - abs(n_outlier_predicted - n_outlier_expected) / n_outlier_expected)
                accuracy = inlier_accuracy + outlier_score * expected_fraction
            else:
                accuracy = inlier_accuracy

            # Strict comparison: on equal scores the earliest k in k_range wins.
            if accuracy > best_accuracy:
                best_accuracy = accuracy
                best_k = k

        self.k = best_k

        # Mean distance to the k nearest neighbours: leave-one-out for training samples,
        # against the full training set for the known outliers.
        inlier_distances = [np.mean(entry[1][:self.k]) for entry in loo_neighbors if entry is not None]
        if not inlier_distances:
            raise ValueError("Cannot fit: no training sample has a neighbour with a different image path.")

        outliers = np.array(outliers if outliers is not None else [])
        outlier_distances = []
        for outlier in outliers:
            distances = cdist([outlier], embeddings_train, metric=self.distance_metric).flatten()
            outlier_distances.append(np.mean(np.sort(distances)[:self.k]))

        if not outlier_distances:
            # Nothing to separate from: accept everything up to the farthest training sample.
            self.distance_threshold = max(inlier_distances)
            return

        best_threshold = None
        best_combined_score = float("-inf")
        candidate_thresholds = np.linspace(
            min(inlier_distances), max(outlier_distances), self._N_THRESHOLD_CANDIDATES
        )

        for threshold in candidate_thresholds:
            inlier_correct = sum(1 for d in inlier_distances if d <= threshold) / len(inlier_distances)
            outlier_correct = sum(1 for d in outlier_distances if d > threshold) / len(outlier_distances)
            combined_score = inlier_correct + outlier_correct

            if combined_score > best_combined_score:
                best_combined_score = combined_score
                best_threshold = threshold

        self.distance_threshold = best_threshold

    def predict(self, *img_paths):
        """
        Predicts labels for the input image paths using the optimized k value and distance threshold.

        Args:
            *img_paths: Variable number of local image paths or http(s) URLs.

        Returns:
            results: One entry per input, in order: the predicted label, None for a rejected
            (outlier) image, or a string starting with "error: " if the image failed.

        Raises:
            ValueError: if fit() has not been called successfully.
        """
        if getattr(self, "k", None) is None:
            raise ValueError("Model has not been fitted or k has not been optimized. Please call fit() first.")
        if getattr(self, "distance_threshold", None) is None:
            raise ValueError("Distance threshold has not been optimized. Please call fit() first.")

        results = []
        for img_path in img_paths:
            try:
                image = self._load_image(img_path)
                # Use the embedder given to this instance, not a notebook-level global.
                embedding, error = self.embedder.embed(image)
                if embedding is None:
                    results.append(f"error: {error}")
                else:
                    results.append(self._classify(embedding))
            except Exception as e:
                # One failing image must not abort the batch: report it and continue.
                print(f"Error processing image {os.path.basename(str(img_path))}: {e}")
                results.append(f"error: {e}")
        return results

Looking in indexes: https://pypi.org/simple, https://****@pkgs.dev.azure.com/umicore/DataAnalytics.RBM.Applications/_packaging/DataAnalytics.RBM.Packages/pypi/simple/
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
# Root folder holding the precomputed embedding files used below.
data_folder = "/Volumes/rbm_playground/gritty_goat/image_examples_stage_grim/notebook_results/data/"

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
clip_model, _, preprocess = open_clip.create_model_and_transforms("ViT-B-32", pretrained="openai")
model = clip_model.to(device).eval()

embedder = embedder_patch(model, device, patch_size=224, normalize=([0.5, 0.5, 0.5], [0.5, 0.5, 0.5]))


def load_embeddings(split):
    """
    Load the stored embedding dictionary for one data split.

    Args:
        split: File-name suffix of the split, e.g. "train" or "low_quality".

    Returns:
        The Python object stored in ``<data_folder>/embeddings/<embedder>/<embedder>_<split>.npy``.
    """
    # NOTE(security): allow_pickle=True unpickles arbitrary Python objects, which can execute
    # code while loading. Only point this at files from a trusted location.
    return np.load(f"{data_folder}/embeddings/{embedder}/{embedder}_{split}.npy", allow_pickle=True).item()


train = load_embeddings("train")
test = load_embeddings("test")
train_aug = load_embeddings("train_aug")
no_model = load_embeddings("no_model")
wrong_upload = load_embeddings("wrong_upload")
low_quality = load_embeddings("low_quality")



In [0]:
# Gather every embedding stored under the "outliers" key of the three outlier sets,
# in the order low_quality, no_model, wrong_upload.
outliers = [
    embedding
    for outlier_dict in (low_quality, no_model, wrong_upload)
    for embedding in outlier_dict["outliers"].values()
]

In [0]:
# Build the classifier (this rebinds `model`; the CLIP network stays reachable via `embedder.model`)
# and tune k and the distance threshold on the training embeddings and known outliers.
model = KNNBased(
    distance_metric="canberra",
    k_range=range(1, 50),
    embedder=embedder,
    threshold=0.65,
    seed=42,
)
model.fit(train, outliers, predicted_outlier_precentage=3)

In [0]:
def timed_predict(*img_paths):
    """
    Run ``model.predict`` on the given paths and measure how long it takes.

    Returns:
        (predictions, elapsed_seconds)
    """
    # perf_counter is the clock intended for measuring short intervals.
    start = time.perf_counter()
    predictions = model.predict(*img_paths)
    return predictions, time.perf_counter() - start


pred1, duration1 = timed_predict(
    "/Volumes/rbm_playground/gritty_goat/image_examples_stage_grim/2025_02_13_test_images/model_0003/3035402650 750x.jpg"
)
pred2, duration2 = timed_predict(
    "/Volumes/rbm_playground/gritty_goat/image_examples_stage_grim/2025_05_14/no_model/1e011bd992.png"
)
pred3, duration3 = timed_predict(
    "/Volumes/rbm_playground/gritty_goat/image_examples_stage_grim/2025_05_14/wrong_upload/Schermafbeelding 2025-02-20 160736.png"
)
# Mixed batch: local files and remote URLs in one call.
pred4, duration4 = timed_predict(
    "/Volumes/rbm_playground/gritty_goat/image_examples_stage_grim/2025_05_14/wrong_upload/image_Nicolas_ann.png",
    "/Volumes/rbm_playground/gritty_goat/image_examples_stage_grim/2025_05_14/low_quality/001lq19.png",
    "/Volumes/rbm_playground/gritty_goat/image_examples_stage_grim/2025_02_13_test_images/model_0002/3053605536 011.jpg",
    "/Volumes/rbm_playground/gritty_goat/image_examples_stage_grim/2025_02_13_test_images/model_0004/YX0975 FEG CS 0010.tif",
    "https://cdn.ymaws.com/www.thegraphenecouncil.org/resource/resmgr/images/products/product/dec/product_1/product_2/chart_3.jpg",
    "https://upload.wikimedia.org/wikipedia/commons/thumb/a/a4/Misc_pollen.jpg/1200px-Misc_pollen.jpg"
)

print(f"{pred1[0]} ({duration1:.3f}s)")
print(f"{pred2[0]} ({duration2:.3f}s)")
print(f"{pred3[0]} ({duration3:.3f}s)")
print(f"{pred4} ({duration4:.3f}s -> {(duration4/len(pred4)):.2f}s/img)")

# Count the images from the results themselves instead of hard-coding the single-image runs.
total_images = len(pred1) + len(pred2) + len(pred3) + len(pred4)
total_duration = duration1 + duration2 + duration3 + duration4
average_duration = total_duration / total_images

print(f"Average embedding + prediction time per image: {average_duration:.3f} seconds")

model_0003 (3.371s)
None (1.147s)
None (0.448s)
[None, None, 'model_0002', 'model_0004', 'model_0001', None] (11.256s -> 1.88s/img)
Average embedding + prediction time per image: 1.803 seconds
