In [1]:
# Step 2: Import Modules
import torch
from PIL import Image
from torchvision import transforms
import glob
import os
import numpy as np
from sklearn.metrics import roc_auc_score, accuracy_score, precision_score, recall_score, f1_score, confusion_matrix

from WinCLIP.model import WinClipAD




In [2]:
# Step 3: Define Device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")


Using device: cuda


In [3]:
# Step 4: Initialize the Model with Correct Image Sizes
out_size_h, out_size_w = 240, 240  # Set to 240 instead of 224
backbone = 'ViT-B-16-plus-240'
pretrained_dataset = 'laion400m_e32'
#scales = [224, 256, 384]  # You can keep the scales as is

#scales = [192, 224, 240]  # Adjusted scales
scales = [2, 3, 5, 7, 15]  # Example scales that fit within a 15x15 grid

precision = 'fp32'  # Use 'fp16' if your GPU supports mixed precision
img_resize = 256
img_cropsize = 240  # Set to 240 instead of 224

model = WinClipAD(
    out_size_h=out_size_h,
    out_size_w=out_size_w,
    device=device,
    backbone=backbone,
    pretrained_dataset=pretrained_dataset,
    scales=scales,
    precision=precision,
    img_resize=img_resize,
    img_cropsize=img_cropsize
)


  checkpoint = torch.load(checkpoint_path, map_location=map_location)


self.grid_size (15, 15)
fusion version: textual_visual


In [4]:
# Step 5: Build Text Feature Gallery
category = 'traffic sign'
model.build_text_feature_gallery(category)


In [5]:
# Step 6: Prepare Data Transforms with Correct Crop Size
def _convert_to_rgb(image):
    return image.convert('RGB')

transform = transforms.Compose([
    transforms.Resize((img_resize, img_resize), Image.BICUBIC),
    transforms.CenterCrop(img_cropsize),  # Now crops to 240x240
    _convert_to_rgb,
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.48145466, 0.4578275, 0.40821073], std=[0.26862954, 0.26130258, 0.27577711])
])


In [6]:
# Modify abnormal prompts in ad_prompts.py (if accessible)
# For example, add phrases like 'damaged', 'vandalized', 'graffiti on', 'broken', etc.

# Alternatively, directly create custom abnormal phrases
abnormal_phrases = [
    'A photo of a damaged traffic sign.',
    'A picture of a vandalized traffic sign.',
    'An image of a traffic sign with graffiti.',
    'A photo of a broken traffic sign.',
    'A picture of a traffic sign covered in stickers.'
]
# Define additional abnormal phrases based on your categories
additional_abnormal_phrases = [
    'A picture of a faded traffic sign.',
    'A photo of a traffic sign with graffiti.',
    'A photo of a vandalized traffic sign.',
    'An image of a traffic sign with other types of vandalism.'
    'Traffic signs which are spray painted'
]

abnormal_phrases.extend(additional_abnormal_phrases)
# Tokenize abnormal phrases
abnormal_phrases_tokenized = model.tokenizer(abnormal_phrases).to(device)

# Encode abnormal text features
abnormal_text_features = model.encode_text(abnormal_phrases_tokenized)
abnormal_text_features /= abnormal_text_features.norm(dim=-1, keepdim=True)

# Combine with normal text features
# Use existing normal_phrases from the model or define similarly
normal_phrases = [
    'A photo of a traffic sign.',
    'A clear picture of a traffic sign.',
    'An image of an intact traffic sign.',
    'A photo of a standard traffic sign.',
    'A picture of an undamaged traffic sign.'
]

# Tokenize and encode normal phrases
normal_phrases_tokenized = model.tokenizer(normal_phrases).to(device)
normal_text_features = model.encode_text(normal_phrases_tokenized)
normal_text_features /= normal_text_features.norm(dim=-1, keepdim=True)

# Compute average text features
model.avr_normal_text_features = normal_text_features.mean(dim=0, keepdim=True)
model.avr_abnormal_text_features = abnormal_text_features.mean(dim=0, keepdim=True)

# Update text features in the model
model.text_features = torch.cat([
    model.avr_normal_text_features,
    model.avr_abnormal_text_features
], dim=0)
model.text_features /= model.text_features.norm(dim=-1, keepdim=True)


In [7]:
model.transform

Compose(
    Resize(size=(256, 256), interpolation=bicubic, max_size=None, antialias=True)
    CenterCrop(size=(240, 240))
    <function _convert_to_rgb at 0x7f4d0eee8820>
    ToTensor()
    Normalize(mean=[0.48145466, 0.4578275, 0.40821073], std=[0.26862954, 0.26130258, 0.27577711])
)

In [8]:
# Paths to normal images
normal_image_paths = glob.glob('/home/schada/Desktop/damaged_traffic_sign_detection/data/good_examples/*.*')
#additional_normal_image_paths = glob.glob('/path/to/your/additional/normal/images/*.*')

# Combine all normal image paths
all_normal_image_paths = normal_image_paths #+ additional_normal_image_paths

# Transform and load images
normal_images = []
for img_path in all_normal_image_paths:
    try:
        img = Image.open(img_path).convert('RGB')  # Ensure RGB mode
        img_tensor = transform(img).unsqueeze(0)  # Add batch dimension
        normal_images.append(img_tensor)
    except Exception as e:
        print(f"Error loading image {img_path}: {e}")  # Handle any image loading issues

# Concatenate and build feature gallery
if normal_images:  # Check if the list is not empty
    normal_images_tensor = torch.cat(normal_images, dim=0).to(device)
    model.build_image_feature_gallery(normal_images_tensor)
else:
    print("No valid normal images found to build the feature gallery.")


In [9]:
import glob
import os

# Define the base directory where your images are stored
base_dir = '/home/schada/Desktop/damaged_traffic_sign_detection/data/RDX_cropped _dir'

# List of damaged directories
damaged_dirs = ['bent', 'broken', 'faded', 'graffiti', 'multiple_signs', 'other_vandalism', 'tilted','vandalized']

# Initialize lists for image paths and labels
test_image_paths = []
test_labels = []

# Load normal images
normal_image_paths = glob.glob(os.path.join(base_dir, 'undamaged', '*.*'))
test_image_paths.extend(normal_image_paths)
test_labels.extend([0] * len(normal_image_paths))  # Label 0 for normal images

# Load damaged images
for damaged_dir in damaged_dirs:
    damaged_image_paths = glob.glob(os.path.join(base_dir, damaged_dir, '*.*'))
    test_image_paths.extend(damaged_image_paths)
    test_labels.extend([1] * len(damaged_image_paths))  # Label 1 for damaged images

print(f'Total images: {len(test_image_paths)}')
print(f'Number of normal images: {len(normal_image_paths)}')
print(f'Number of damaged images: {len(test_image_paths) - len(normal_image_paths)}')


Total images: 1008
Number of normal images: 0
Number of damaged images: 1008


In [10]:
import torch
from torch.utils.data import Dataset, DataLoader
from PIL import Image

# Custom dataset class
class TrafficSignDataset(Dataset):
    def __init__(self, image_paths, labels, transform):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform
        
    def __len__(self):
        return len(self.image_paths)
    
    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        label = self.labels[idx]
        try:
            img = Image.open(img_path).convert('RGB')
            img_tensor = self.transform(img)
            return img_tensor, label
        except Exception as e:
            print(f"Error loading image {img_path}: {e}")
            return None, label  # Handle this case in the DataLoader

# Create dataset and dataloader
transform = model.transform  # Use the model's transform
dataset = TrafficSignDataset(test_image_paths, test_labels, transform)

batch_size = 16  # Adjust based on your GPU memory
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False, num_workers=4)

# Compute anomaly scores
anomaly_scores = []
true_labels = []

model.eval_mode()  # Set the model to evaluation mode

with torch.no_grad():
    for batch in dataloader:
        batch_images, batch_labels = batch
        # Remove any None entries
        batch_images = [img for img in batch_images if img is not None]
        batch_labels = [label for img, label in zip(batch_images, batch_labels) if img is not None]
        
        if not batch_images:
            continue  # Skip if batch is empty

        batch_images = torch.stack(batch_images).to(device)
        anomaly_maps = model(batch_images)
        
        # Compute anomaly scores
        for anomaly_map in anomaly_maps:
            anomaly_score = anomaly_map.mean()
            anomaly_scores.append(anomaly_score.item())
        
        true_labels.extend(batch_labels)


In [11]:
import wandb
from sklearn.metrics import roc_auc_score, accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, roc_curve
import numpy as np

# Initialize a new W&B run
wandb.init(project="Winclip traffic sign classification", name="new validationb set")

# Compute ROC AUC
auc_score = roc_auc_score(true_labels, anomaly_scores)
print(f'ROC AUC: {auc_score:.4f}')

# Log the AUC score to W&B
wandb.log({"ROC AUC": auc_score})

# Determine the optimal threshold (e.g., using Youden's J statistic)
fpr, tpr, thresholds = roc_curve(true_labels, anomaly_scores)
j_scores = tpr - fpr
j_ordered = sorted(zip(j_scores, thresholds))
threshold = j_ordered[-1][1]

# Predict labels based on the threshold
predicted_labels = [1 if score >= threshold else 0 for score in anomaly_scores]

# Compute evaluation metrics
accuracy = accuracy_score(true_labels, predicted_labels)
precision = precision_score(true_labels, predicted_labels, zero_division=0)
recall = recall_score(true_labels, predicted_labels, zero_division=0)
f1 = f1_score(true_labels, predicted_labels, zero_division=0)
cm = confusion_matrix(true_labels, predicted_labels)

print(f'Accuracy: {accuracy:.4f}')
print(f'Precision: {precision:.4f}')
print(f'Recall: {recall:.4f}')
print(f'F1-score: {f1:.4f}')
print('Confusion Matrix:')
print(cm)

# Log metrics to W&B
wandb.log({
    "Accuracy": accuracy,
    "Precision": precision,
    "Recall": recall,
    "F1-score": f1,
    "Confusion Matrix": cm.tolist(),  # Convert the confusion matrix to a list for logging
    "Optimal Threshold": threshold
})

# Optionally, log ROC curve
# wandb.log({
#     "ROC Curve": wandb.plot.roc_curve(true_labels, anomaly_scores)
# })

# Finish the W&B run
wandb.finish()


[34m[1mwandb[0m: Using wandb-core as the SDK backend.  Please refer to https://wandb.me/wandb-core for more information.
[34m[1mwandb[0m: Currently logged in as: [33msahith_chada[0m ([33mhonda-sv[0m). Use [1m`wandb login --relogin`[0m to force relogin


ValueError: Only one class present in y_true. ROC AUC score is not defined in that case.

In [12]:
import os
import shutil

# Create output directories
output_dir = "/home/schada/Desktop/damaged_traffic_sign_detection/WinClip/output_images"
categories = ["true_positive", "false_positive", "true_negative", "false_negative"]

# Ensure the directories exist
for category in categories:
    dir_path = os.path.join(output_dir, category)
    os.makedirs(dir_path, exist_ok=True)

# Iterate through the data again and categorize images
for img_path, true_label, pred_label in zip(test_image_paths, true_labels, predicted_labels):
    if true_label == 1 and pred_label == 1:
        category = "true_positive"
    elif true_label == 0 and pred_label == 1:
        category = "false_positive"
    elif true_label == 0 and pred_label == 0:
        category = "true_negative"
    elif true_label == 1 and pred_label == 0:
        category = "false_negative"
    else:
        continue  # This should not happen if labels are binary (0 or 1)

    # Destination path
    destination_path = os.path.join(output_dir, category, os.path.basename(img_path))

    try:
        # Copy the image to the respective category directory
        shutil.copy(img_path, destination_path)
    except Exception as e:
        print(f"Error copying image {img_path}: {e}")

print("Images have been categorized into directories.")


Images have been categorized into directories.
