In [3]:
import os, sys
from pathlib import Path
sys.path.append(str(Path().resolve().parents[2]))
import numpy as np
from PIL import Image
import torch
import torchvision.transforms.functional as TF
from sklearn.model_selection import train_test_split
from datasets import Dataset, Features, ClassLabel, DatasetDict, load_from_disk, Image as HFImage
from torch.utils.data import TensorDataset, DataLoader
import tqdm.auto as tqdm
import cv2
from sklearn.cluster import KMeans
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, classification_report
import time
from src.chatbot import pathtree
from dprep import data_prep

In [4]:
def tensor_to_cv_image(tensor):
    """Convert a normalized CHW image tensor into an HWC ``uint8`` array.

    Args:
        tensor: ``torch.Tensor`` laid out as (channels, height, width) whose
            values are expected to lie in ``[0, 1]``.

    Returns:
        A C-contiguous ``numpy.ndarray`` of shape (height, width, channels)
        and dtype ``uint8`` with values in ``[0, 255]``.
    """
    # detach()/cpu() keep the conversion working for tensors that track
    # gradients or live on an accelerator; both are no-ops otherwise.
    img = tensor.detach().cpu().permute(1, 2, 0).numpy()  # CHW to HWC format
    # Round instead of truncating: a float32 round-trip such as
    # (v / 255) * 255 can land a hair below v, and a bare astype(uint8)
    # would then produce v - 1. Clip first so out-of-range values saturate
    # instead of wrapping around.
    img = np.rint(np.clip(img, 0.0, 1.0) * 255.0).astype(np.uint8)
    # permute() only changes strides; hand back a plain contiguous buffer.
    return np.ascontiguousarray(img)

# Feature extraction functions
def extract_sift_features(img, max_features=100):
    """Run SIFT on an RGB image and return its keypoints and descriptors.

    Args:
        img: RGB image array accepted by ``cv2.cvtColor``.
        max_features: Value forwarded to ``cv2.SIFT_create`` as ``nfeatures``.

    Returns:
        The ``(keypoints, descriptors)`` pair produced by
        ``detectAndCompute`` on the grayscale version of ``img``.
    """
    grayscale = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
    detector = cv2.SIFT_create(nfeatures=max_features)
    found_keypoints, found_descriptors = detector.detectAndCompute(grayscale, None)
    return found_keypoints, found_descriptors

def extract_surf_features(img, hessian_threshold=400):
    """Extract SURF keypoints/descriptors, falling back to SIFT if SURF is missing.

    Args:
        img: RGB image array accepted by ``cv2.cvtColor``.
        hessian_threshold: Value forwarded to ``SURF_create`` as
            ``hessianThreshold``.

    Returns:
        A ``(keypoints, descriptors)`` pair. When SURF cannot be used the pair
        comes from ``extract_sift_features`` instead.
        NOTE(review): SIFT and SURF descriptors presumably differ in length,
        so avoid mixing results from both paths in one vocabulary - confirm.
    """
    gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
    # Note: SURF is patented and may not be available in all OpenCV builds
    try:
        surf = cv2.xfeatures2d.SURF_create(hessianThreshold=hessian_threshold)
        keypoints, descriptors = surf.detectAndCompute(gray, None)
        return keypoints, descriptors
    except (AttributeError, cv2.error):
        # AttributeError: the contrib module (cv2.xfeatures2d) is not present.
        # cv2.error: OpenCV itself refused the call (e.g. non-free algorithms
        # disabled). Anything else - including KeyboardInterrupt - is a real
        # problem and must propagate instead of being silently swallowed.
        print("SURF not available in this OpenCV build. Falling back to SIFT.")
        return extract_sift_features(img)

In [5]:
# Reuse the prepared "percom3224" dataset when it is already on disk;
# otherwise build it with data_prep (600x600 resize) under save_dir.
save_dir = pathtree().get("dataset")
if "percom3224" in os.listdir(save_dir):
    dataset = load_from_disk(pathtree().get("percom3224"))
else:
    dataset = data_prep(
        resize=(600, 600),
        save_dir=os.path.join(save_dir, "percom3224"),
    )

In [6]:
# Show the loaded dataset's summary (features and row count) as the cell output.
dataset

Dataset({
    features: ['image', 'label'],
    num_rows: 2353
})

In [None]:
# Build (or reload) the cached 80/20 train/test split stored under "pcda".
if "pcda" not in os.listdir(save_dir):
    # The "pcda" directory doubles as the cache marker tested above, so it is
    # deliberately NOT created up front: save_to_disk() creates it when the
    # data is actually written. Creating it early would leave an empty
    # directory behind if any step below failed, and the next run would then
    # try to load a dataset that was never saved.
    original_labels = dataset.features["label"].names
    features = Features({
        "image": HFImage(),
        "label": ClassLabel(names=original_labels)
    })

    # Materialise the rows as plain dicts so sklearn can shuffle and split them.
    examples = [
        {"image": example["image"], "label": example["label"]}
        for example in tqdm.tqdm(dataset, total=len(dataset), desc="Extracting Examples")
    ]

    train_data, test_data = train_test_split(examples, test_size=0.2, random_state=42)

    def _rows_to_hf_dataset(rows):
        """Turn a list of {"image", "label"} rows into a typed HF Dataset."""
        return Dataset.from_dict({
            "image": [row["image"] for row in rows],
            "label": [row["label"] for row in rows]
        }, features=features)

    # The context manager closes the progress bar even if a step raises.
    with tqdm.tqdm(total=3, desc="Creating dataset") as pbar:
        pbar.set_description("Processing train set")
        hf_train_dataset = _rows_to_hf_dataset(train_data)
        pbar.update(1)

        pbar.set_description("Processing test set")
        hf_test_dataset = _rows_to_hf_dataset(test_data)
        pbar.update(1)

        pbar.set_description("Saving dataset")
        dataset = DatasetDict({"train": hf_train_dataset, "test": hf_test_dataset})
        dataset.save_to_disk(os.path.join(save_dir, "pcda"))
        pbar.update(1)
else:
    dataset = load_from_disk(pathtree().get("pcda"))

In [None]:
def pre_data(example, target_size=(600, 600)):
    """Resize one example's image and convert it to a normalized CHW tensor.

    Args:
        example: Mapping with an ``"image"`` entry (a PIL image, or anything
            ``np.array`` + ``Image.fromarray`` can turn into one) and a
            ``"label"`` entry.
        target_size: Size passed to ``Image.resize``.

    Returns:
        Dict with ``"image"`` - a float tensor of shape (3, H, W) scaled by
        1/255 - and the unchanged ``"label"``.

    Raises:
        ValueError: If the converted image does not end up with 3 channels.
    """
    image = example["image"]
    if not isinstance(image, Image.Image):
        image = Image.fromarray(np.array(image))

    # Let Pillow normalise the colour mode instead of patching channels by
    # hand afterwards. Grayscale is replicated and RGBA loses its alpha
    # channel exactly as before, but palette ("P"), 1-bit, LA and CMYK images
    # are now decoded to real RGB values rather than being misread as raw
    # indices / extra channels.
    image = image.convert("RGB").resize(target_size)
    image_np = np.array(image)

    # Defensive check: an RGB image must decode to an (H, W, 3) array.
    if image_np.ndim != 3 or image_np.shape[-1] != 3:
        raise ValueError(f"Unexpected number of channels: {image_np.shape[-1]}")

    # HWC -> CHW, then scale the byte range down by 255.
    image_tensor = torch.as_tensor(image_np).float().permute(2, 0, 1)
    image_normalized = image_tensor / 255.0
    return {"image": image_normalized, "label": example["label"]}

# Preprocess both splits once and cache them under <save_dir>/preprocessed.
pre_dir = os.path.join(save_dir, "preprocessed")
# Create the cache directory before listing it: os.listdir() raises
# FileNotFoundError on a first run, when nothing has been written there yet.
os.makedirs(pre_dir, exist_ok=True)
cached_entries = set(os.listdir(pre_dir))
if {"train_preprocessed", "test_preprocessed"}.issubset(cached_entries):
    train_data = load_from_disk(os.path.join(pre_dir, "train_preprocessed"))
    test_data = load_from_disk(os.path.join(pre_dir, "test_preprocessed"))
else:
    train_data = dataset['train'].map(pre_data)
    test_data = dataset['test'].map(pre_data)
    train_data.save_to_disk(os.path.join(pre_dir, "train_preprocessed"))
    test_data.save_to_disk(os.path.join(pre_dir, "test_preprocessed"))


In [None]:
# Cache the stacked image/label tensors as .npy files inside pre_dir and wrap
# them in DataLoaders.
def _stack_split(split):
    """Collect a split into (images, labels) tensors in a single pass."""
    images, labels = [], []
    for data in split:
        image = data["image"]
        images.append(image if isinstance(image, torch.Tensor) else torch.as_tensor(image))
        labels.append(data["label"])
    return torch.stack(images), torch.tensor(labels, dtype=torch.long)

# The .npy files live directly in pre_dir, so that is the directory that has
# to exist (not a nested "preprocessed" folder inside it).
os.makedirs(pre_dir, exist_ok=True)
npy_paths = {
    name: os.path.join(pre_dir, f"{name}.npy")
    for name in ("train_images", "train_labels", "test_images", "test_labels")
}

if not all(os.path.basename(path) in os.listdir(pre_dir) for path in npy_paths.values()):
    # Each split is iterated once; images and labels are gathered together.
    train_images, train_labels = _stack_split(train_data)
    test_images, test_labels = _stack_split(test_data)

    print("Train images shape:", train_images.shape, "dtype:", train_images.dtype)

    np.save(npy_paths["train_images"], train_images.numpy())
    np.save(npy_paths["train_labels"], train_labels.numpy())
    np.save(npy_paths["test_images"], test_images.numpy())
    np.save(npy_paths["test_labels"], test_labels.numpy())
else:
    # These files hold plain numeric arrays, so pickle support is not needed;
    # leaving allow_pickle at its default avoids executing pickled payloads
    # from a tampered cache file.
    train_images = torch.from_numpy(np.load(npy_paths["train_images"]))
    train_labels = torch.from_numpy(np.load(npy_paths["train_labels"]))
    test_images = torch.from_numpy(np.load(npy_paths["test_images"]))
    test_labels = torch.from_numpy(np.load(npy_paths["test_labels"]))
    print("Loaded train images shape:", train_images.shape, "dtype:", train_images.dtype)

# NOTE: despite their names, train_dataset / test_dataset end up bound to
# DataLoader objects (batch size 32); the names are kept for compatibility.
train_dataset = DataLoader(TensorDataset(train_images, train_labels), batch_size=32, shuffle=True, num_workers=4)
test_dataset = DataLoader(TensorDataset(test_images, test_labels), batch_size=32, shuffle=False, num_workers=4)

In [None]:
# Function to build vocabulary (codebook) from features
def build_vocabulary(features_list, k=100):
    """Cluster all descriptors into ``k`` visual words.

    Args:
        features_list: Per-image descriptor arrays; ``None`` entries are
            skipped.
        k: Number of clusters (visual words) to fit.

    Returns:
        The fitted ``KMeans`` model (``random_state=42``, ``verbose=1``).
    """
    print(f"Building vocabulary with {len(features_list)} feature sets...")

    # Stack every available descriptor array into one matrix.
    usable_sets = [descriptor_set for descriptor_set in features_list if descriptor_set is not None]
    descriptor_matrix = np.vstack(usable_sets)
    print(f"Clustering {descriptor_matrix.shape[0]} features into {k} visual words...")

    # Fit the codebook on the stacked descriptors.
    codebook = KMeans(n_clusters=k, random_state=42, verbose=1)
    codebook.fit(descriptor_matrix)
    return codebook

# Function to create BoW histograms
def create_bow_histograms(kmeans, features_list, k=100):
    """Encode each image's descriptors as a normalized visual-word histogram.

    Args:
        kmeans: Fitted clustering model exposing ``predict(features)`` that
            returns one cluster index per descriptor row.
        features_list: Per-image descriptor arrays. ``None`` or an empty array
            means "no features detected" and yields an all-zero histogram.
        k: Number of visual words, i.e. the histogram length.

    Returns:
        ``numpy.ndarray`` of shape ``(len(features_list), k)`` and dtype
        float64. Each non-empty row sums to 1.

    Raises:
        IndexError: If ``kmeans`` assigns a cluster index ``>= k``.
    """
    histograms = []
    for features in features_list:
        histogram = np.zeros(k)
        # Nothing to assign: keep the empty histogram (also avoids calling
        # predict() on a zero-row array).
        if features is not None and len(features) > 0:
            # Assign each feature to the nearest cluster
            assignments = np.asarray(kmeans.predict(features), dtype=np.intp)
            # Count occurrences of every visual word in one vectorised call
            counts = np.bincount(assignments, minlength=k)
            if counts.shape[0] > k:
                raise IndexError(
                    f"cluster index {counts.shape[0] - 1} is out of bounds for a histogram of size {k}"
                )
            # Normalize histogram
            total = counts.sum()
            if total > 0:
                histogram = counts / total
        histograms.append(histogram)

    # Keep the result 2-D even when there are no images at all, so callers
    # always receive an (n_images, k) matrix.
    if not histograms:
        return np.zeros((0, k))
    return np.array(histograms)

# Extract SIFT descriptors for both splits, learn the visual vocabulary on the
# training descriptors only, then encode every image as a BoW histogram.
def _extract_split_descriptors(images, desc):
    """Return the SIFT descriptors of every image in ``images``, in order.

    One helper replaces the two copy-pasted train/test loops; ``desc`` is the
    progress-bar caption.
    """
    descriptors_per_image = []
    for image in tqdm.tqdm(images, total=len(images), desc=desc):
        _, descriptors = extract_sift_features(tensor_to_cv_image(image))
        descriptors_per_image.append(descriptors)
    return descriptors_per_image

# Extract features from all training images
print("Extracting SIFT features from training images...")
train_features = _extract_split_descriptors(train_images, "Extracting train features")

# Build vocabulary
vocabulary_size = 100  # Number of visual words
kmeans = build_vocabulary(train_features, k=vocabulary_size)

# Create BoW representations for training data
train_bow_features = create_bow_histograms(kmeans, train_features, k=vocabulary_size)

# Extract features from test images
print("Extracting SIFT features from test images...")
test_features = _extract_split_descriptors(test_images, "Extracting test features")

# Create BoW representations for test data (same vocabulary as training)
test_bow_features = create_bow_histograms(kmeans, test_features, k=vocabulary_size)

In [None]:
# Fit an RBF-kernel SVM on the training histograms and time the fit
print("Training SVM classifier...")
classifier = SVC(kernel='rbf', probability=True)
start_time = time.time()
if hasattr(train_labels, 'numpy'):
    train_targets = train_labels.numpy()
else:
    train_targets = train_labels
classifier.fit(train_bow_features, train_targets)
training_time = time.time() - start_time
print(f"Training completed in {training_time:.2f} seconds")

# Predict the test histograms
print("Making predictions...")
if hasattr(test_labels, 'numpy'):
    test_labels_np = test_labels.numpy()
else:
    test_labels_np = test_labels
predictions = classifier.predict(test_bow_features)

# Score the predictions: overall accuracy plus a per-class report
accuracy = accuracy_score(test_labels_np, predictions)
print(f"Accuracy: {accuracy:.4f}")
print("\nClassification Report:")
label_names = dataset['train'].features["label"].names
report = classification_report(
    test_labels_np,
    predictions,
    labels=np.arange(len(label_names)),
    target_names=label_names,
)
print(report)

# Visualize some examples of correct and incorrect predictions
def visualize_predictions(images, true_labels, pred_labels, class_names, num_examples=5, seed=None):
    """Plot a random sample of correctly and incorrectly classified images.

    Args:
        images: Indexable collection of CHW image tensors accepted by
            ``tensor_to_cv_image``.
        true_labels: Ground-truth class indices (tensor, array or sequence).
        pred_labels: Predicted class indices, aligned with ``true_labels``.
        class_names: Sequence mapping a class index to its display name.
        num_examples: Maximum number of images shown per figure.
        seed: Optional seed for the sampler. ``None`` (the default) keeps the
            selection random from run to run.

    Returns:
        None. One figure is shown for the correct and one for the incorrect
        predictions; a message is printed instead when a group is empty.
    """
    import matplotlib.pyplot as plt
    from random import Random

    def _as_numpy(labels):
        """Coerce tensors, arrays and plain sequences to a numpy array."""
        if hasattr(labels, 'numpy'):
            labels = labels.numpy()
        return np.asarray(labels)

    def _pick(indices):
        """Sample up to num_examples indices; never asks for more than exist."""
        pool = [int(index) for index in indices]
        return sampler.sample(pool, min(num_examples, len(pool)))

    def _plot_row(title, chosen):
        """Draw one figure with a single row of labelled images."""
        plt.figure(figsize=(15, 5))
        plt.suptitle(title, fontsize=14)
        for position, idx in enumerate(chosen, start=1):
            plt.subplot(1, len(chosen), position)
            plt.imshow(tensor_to_cv_image(images[idx]))
            plt.title(f"True: {class_names[true_labels[idx]]}\nPred: {class_names[pred_labels[idx]]}")
            plt.axis('off')
        plt.tight_layout()
        plt.show()

    # Ensure arrays are numpy arrays so the element-wise comparison below
    # also works when plain Python lists are passed in.
    true_labels = _as_numpy(true_labels)
    pred_labels = _as_numpy(pred_labels)
    sampler = Random(seed)

    # Find indices of correct and incorrect predictions
    correct = np.where(true_labels == pred_labels)[0]
    incorrect = np.where(true_labels != pred_labels)[0]

    # The sample size is capped at the pool size, so sample() cannot raise
    # ValueError here and no fallback branch is needed.
    correct_samples = _pick(correct)
    incorrect_samples = _pick(incorrect)

    # Plot correct predictions
    if len(correct_samples) > 0:
        _plot_row("Correct Predictions", correct_samples)
    else:
        print("No correct predictions to display.")

    # Plot incorrect predictions
    if len(incorrect_samples) > 0:
        _plot_row("Incorrect Predictions", incorrect_samples)
    else:
        print("No incorrect predictions to display.")

# Show sample predictions for the test split
visualize_predictions(
    test_images,
    test_labels,
    predictions,
    dataset['train'].features["label"].names,
)

# Persist the classifier together with everything needed to reuse it
import pickle
save_model = pathtree().get("models")
os.makedirs(save_model, exist_ok=True)  # Make sure directory exists
model_save_path = os.path.join(save_model, 'sift_bow_svm_model.pkl')
try:
    with open(model_save_path, 'wb') as model_file:
        model_bundle = {
            'classifier': classifier,
            'kmeans': kmeans,
            'class_names': dataset['train'].features["label"].names,
            'vocabulary_size': vocabulary_size,
        }
        pickle.dump(model_bundle, model_file)
    print(f"Model saved to {model_save_path}")
except Exception as err:
    print(f"Error saving model: {err}")
    # Additional debug info
    print(f"Model path: {save_model}")
    print(f"Model components types: classifier={type(classifier)}, kmeans={type(kmeans)}")