# Setting up the Model

In [6]:
#import necessary libraries
import os
import torch
import torchvision.transforms as transforms
import torchvision.models as models
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import pandas as pd
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import torch.nn as nn
import shutil
import numpy as np

In [7]:
# Constants
BATCH = 16
IM_SIZE = 224
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
MODEL_PATH = r"C:\Users\linds\OneDrive\Documents\MSBA-lindsayslaptop\Fall\Advanced Machine Learning\Best Model.pth"

# Load the dataset
df = pd.read_csv(r"C:\ML_Project_Data\train.csv")

# Filter dataset with hotel_ids >= 20
s1 = df["hotel_id"].value_counts()
s2 = s1[s1 >= 20]
hotel_id_list = s2.index.tolist()
df1 = df[df["hotel_id"].isin(hotel_id_list)]

In [8]:
# How many classes do we have and how much total data?

num_classes = len(df1['hotel_id'].unique())
print(f"Number of classes: {num_classes}")

num_images = len(df1)
print(f"Total number of images: {num_images}")


Number of classes: 1062
Total number of images: 38389


In [11]:
# Train-test split (90:10)
train_df, test_df = train_test_split(df1, test_size=0.1, stratify=df1['hotel_id'], random_state=42)

# Map hotel_id to indices for classification
unique_hotel_ids = sorted(train_df['hotel_id'].unique())
hotel_id_to_index = {hotel_id: idx for idx, hotel_id in enumerate(unique_hotel_ids)}
index_to_hotel_id = {idx: hotel_id for hotel_id, idx in hotel_id_to_index.items()}

# Add the class index to the DataFrame
train_df['class_index'] = train_df['hotel_id'].map(hotel_id_to_index)
test_df['class_index'] = test_df['hotel_id'].map(hotel_id_to_index)

# Define transformations
Transform = transforms.Compose([
    transforms.Resize((IM_SIZE, IM_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
])

# Dataset Class
class HotelImageDataset(Dataset):
    def __init__(self, df, image_dir, transform=None):
        self.df = df
        self.image_dir = image_dir
        self.transform = transform

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        img_name = self.df.iloc[idx]['image']
        label = self.df.iloc[idx]['class_index']
        img_path = os.path.join(self.image_dir, str(self.df.iloc[idx]['chain']), img_name)
        
        image = Image.open(img_path).convert('RGB')
        if self.transform:
            image = self.transform(image)
        
        return image, label

In [12]:
# Initialize datasets and loaders
image_dir = r"C:\ML_Project_Data\train_images"
test_dataset = HotelImageDataset(test_df, image_dir, Transform)
test_loader = DataLoader(test_dataset, batch_size=BATCH, shuffle=False)

In [13]:
# Initialize the model
num_classes = len(unique_hotel_ids)  # Ensure this matches your friend's training
model = models.resnet50(pretrained=False)  # Pretrained is False since we are loading weights
model.fc = nn.Linear(model.fc.in_features, num_classes)



In [14]:
# Load the model weights
model.load_state_dict(torch.load(MODEL_PATH, map_location=DEVICE))
model = model.to(DEVICE)
model.eval()

  model.load_state_dict(torch.load(MODEL_PATH, map_location=DEVICE))


ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 

In [15]:
# Validation code
criterion = nn.CrossEntropyLoss()
TOP_X = 5

val_loss = 0.0
correct = 0
total = 0
topX_correct = 0

with torch.no_grad():
    for images, labels in tqdm(test_loader, desc="Validation"):
        images, labels = images.to(DEVICE), labels.to(DEVICE)
        outputs = model(images)
        loss = criterion(outputs, labels)
        val_loss += loss.item()

        # Top-1 accuracy
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        # Top-5 accuracy
        _, predicted_topX = outputs.data.topk(TOP_X, dim=1, largest=True, sorted=True)
        labels_expanded = labels.view(-1, 1).expand_as(predicted_topX)
        topX_correct += (predicted_topX == labels_expanded).sum().item()

avg_val_loss = val_loss / len(test_loader)
top1_accuracy = 100 * correct / total
topX_accuracy = 100 * topX_correct / total

print(f"Validation Loss: {avg_val_loss:.4f}, Top-1 Accuracy: {top1_accuracy:.2f}%, Top-{TOP_X} Accuracy: {topX_accuracy:.2f}%")


Validation: 100%|██████████| 240/240 [04:39<00:00,  1.16s/it]

Validation Loss: 1.5563, Top-1 Accuracy: 73.35%, Top-5 Accuracy: 86.35%





# Exploring Image Accuracy

In [16]:
#okay let's see what hotels had the most images in the test data nad their accuracy to find the best options for TCAV

# Initialize variables for storing results
hotel_image_counts = {}
hotel_correct_counts = {}

# Ensure the model is in evaluation mode
model.eval()

# Initialize lists to store predictions and labels for all test samples
all_predictions = []
all_labels = []

# Process test data
with torch.no_grad():
    for images, labels in tqdm(test_loader, desc="Evaluating Model on Test Data"):
        images, labels = images.to(DEVICE), labels.to(DEVICE)

        # Get model predictions
        outputs = model(images)
        _, preds = torch.max(outputs, 1)

        # Collect predictions and labels for all samples
        all_predictions.extend(preds.cpu().numpy())  # Append batch predictions
        all_labels.extend(labels.cpu().numpy())     # Append batch true labels

        # Loop through each image in the batch
        for label, pred in zip(labels.cpu().numpy(), preds.cpu().numpy()):
            hotel_id = index_to_hotel_id[label]  # Map class index back to hotel_id

            # Update image counts for this hotel ID
            if hotel_id not in hotel_image_counts:
                hotel_image_counts[hotel_id] = 0
                hotel_correct_counts[hotel_id] = 0

            hotel_image_counts[hotel_id] += 1

            # Check if the prediction was correct
            if label == pred:
                hotel_correct_counts[hotel_id] += 1

# Create a DataFrame to analyze the results
results_df = pd.DataFrame({
    "hotel_id": list(hotel_image_counts.keys()),
    "image_count": list(hotel_image_counts.values()),
    "correct_count": list(hotel_correct_counts.values())
})

# Calculate accuracy for each hotel ID
results_df["accuracy"] = results_df["correct_count"] / results_df["image_count"]

# Sort by the number of images and take the top 20 hotel IDs
top_20_hotels = results_df.sort_values(by=["image_count", "accuracy"], ascending=[False, False]).head(20)

# Display the results
print(top_20_hotels)

# Ensure all predictions and labels align with test_df
assert len(all_predictions) == len(test_df), "Mismatch between predictions and test dataset size"
assert len(all_labels) == len(test_df), "Mismatch between labels and test dataset size"


Evaluating Model on Test Data: 100%|██████████| 240/240 [03:20<00:00,  1.20it/s]

     hotel_id  image_count  correct_count  accuracy
172     36363           10              7  0.700000
505     53586           10              6  0.600000
38      60181            9              8  0.888889
546     17759            9              8  0.888889
557      4869            9              8  0.888889
643     64314            9              8  0.888889
315     18807            9              7  0.777778
15       9520            8              8  1.000000
112     18002            8              8  1.000000
116     58884            8              8  1.000000
156     28917            8              8  1.000000
173     24833            8              8  1.000000
177      6524            8              8  1.000000
223     13849            8              8  1.000000
227     16161            8              8  1.000000
232     56874            8              8  1.000000
292     32791            8              8  1.000000
322     24421            8              8  1.000000
434     1026




In [22]:
# Sort by the number of images and take the top 20 hotel IDs
bottom_20_hotels = results_df.sort_values(by=["accuracy", "image_count"], ascending=[True, False]).head(20)

# Display the results
print(bottom_20_hotels)

      hotel_id  image_count  correct_count  accuracy
51       35545            5              0       0.0
175      15195            4              0       0.0
915      33836            4              0       0.0
176      41106            3              0       0.0
341      33839            3              0       0.0
498      30636            3              0       0.0
604      37077            3              0       0.0
711       8439            3              0       0.0
719      16978            3              0       0.0
737      35739            3              0       0.0
794      17451            3              0       0.0
800      49015            3              0       0.0
958      52588            3              0       0.0
1052     14388            3              0       0.0
3        65273            2              0       0.0
13       42982            2              0       0.0
31       61142            2              0       0.0
45        6800            2              0    

In [None]:
#let's look at how many image they were trained on

# Base directory containing the training data
csv_path = r"C:\ML_Project_Data\train.csv"

# Load the CSV file
try:
    df = pd.read_csv(csv_path)
except FileNotFoundError:
    print(f"File not found: {csv_path}")
    exit()

# List of hotel IDs to count images for
hotel_ids = [60181, 17759, 4869, 64314, 18807, 35545, 15195, 33836, 41106, 33839]

# Count the number of images for each hotel ID
image_counts = df['hotel_id'].value_counts()

# Filter counts for the specified hotel IDs
hotel_image_counts = {hotel_id: image_counts.get(hotel_id, 0) for hotel_id in hotel_ids}

# Display the results
for hotel_id, count in hotel_image_counts.items():
    print(f"Hotel ID {hotel_id}: {count} images")


Hotel ID 60181: 86 images
Hotel ID 17759: 84 images
Hotel ID 4869: 84 images
Hotel ID 64314: 87 images
Hotel ID 18807: 92 images
Hotel ID 35545: 48 images
Hotel ID 15195: 35 images
Hotel ID 33836: 41 images
Hotel ID 41106: 28 images
Hotel ID 33839: 32 images


In [21]:
# Now let's create files of the images we might use TCAV on to explore concepts

# Define the hotel IDs and corresponding image directories
target_hotel_ids = [60181, 17759, 4869, 64314, 18807, 9520, 18002, 58884]

# Paths
image_dir = r"C:\ML_Project_Data\train_images"  # Root directory of images
output_dir = r"C:\ML_Project_Data\TCAV_Hotel_Images"  # Output directory for organizing images

# Create output directory if it doesn't exist
os.makedirs(output_dir, exist_ok=True)

# Iterate through the test set
for hotel_id in target_hotel_ids:
    # Create directories for the hotel
    hotel_dir = os.path.join(output_dir, str(hotel_id))
    correct_dir = os.path.join(hotel_dir, "correct")
    incorrect_dir = os.path.join(hotel_dir, "incorrect")
    os.makedirs(correct_dir, exist_ok=True)
    os.makedirs(incorrect_dir, exist_ok=True)

    # Filter test set for the current hotel ID
    hotel_images = test_df[test_df['hotel_id'] == hotel_id]
    
    # Iterate through the images for this hotel ID
    for idx, row in hotel_images.iterrows():
        image_name = row['image']
        chain = row['chain']
        true_label = row['class_index']
        
        # Get the predicted label for this image
        image_index = test_df.index.get_loc(idx)  # Get the index of the image in test DataFrame
        predicted_label = all_predictions[image_index]  # Use all_predictions instead of undefined predicted
        
        # Determine if the classification was correct
        is_correct = (predicted_label == true_label)
        
        # Source and destination paths
        src_path = os.path.join(image_dir, str(chain), image_name)
        if is_correct:
            dest_path = os.path.join(correct_dir, image_name)
        else:
            dest_path = os.path.join(incorrect_dir, image_name)
        
        # Copy the image
        shutil.copy(src_path, dest_path)

print("Image organization complete!")


Image organization complete!


In [24]:
#Let's also create a file of badly classified images to see why they were hard to classify

# Define the hotel IDs and corresponding image directories
worst_hotel_ids = [35545, 15195, 33836]

# Paths
image_dir = r"C:\ML_Project_Data\train_images"  # Root directory of images
output_dir = r"C:\ML_Project_Data\Worst_Classifications"  # Output directory for organizing images

# Create output directory if it doesn't exist
os.makedirs(output_dir, exist_ok=True)

# Ensure the model is in evaluation mode
model.eval()

# Process test set and organize images for worst classifications
with torch.no_grad():
    for hotel_id in worst_hotel_ids:
        # Create a directory for each hotel
        hotel_dir = os.path.join(output_dir, str(hotel_id))
        os.makedirs(hotel_dir, exist_ok=True)

        # Filter test set for the current hotel ID
        hotel_images = test_df[test_df['hotel_id'] == hotel_id]
        
        # Iterate through the images for this hotel ID
        for idx, row in hotel_images.iterrows():
            image_name = row['image']
            chain = row['chain']

            # Source and destination paths
            src_path = os.path.join(image_dir, str(chain), image_name)
            dest_path = os.path.join(hotel_dir, image_name)
            
            # Copy the image
            shutil.copy(src_path, dest_path)

print("Worst classifications images have been organized.")


Worst classifications images have been organized.


In [25]:
# Okay let's look at the training images for hte bad classifications

# Paths
training_output_dir = r"C:\ML_Project_Data\Worst_Classifications"  # Parent directory for worst classifications
image_dir = r"C:\ML_Project_Data\train_images"  # Root directory of images

# Ensure the directory exists
os.makedirs(training_output_dir, exist_ok=True)

# Process training set to organize images for specified hotels
for hotel_id in worst_hotel_ids:
    # Create a directory for the training images of each hotel
    training_hotel_dir = os.path.join(training_output_dir, f"{hotel_id}_training")
    os.makedirs(training_hotel_dir, exist_ok=True)

    # Filter the training dataset for the current hotel ID
    hotel_train_images = train_df[train_df['hotel_id'] == hotel_id]

    # Iterate through the training images for this hotel ID
    for idx, row in hotel_train_images.iterrows():
        image_name = row['image']
        chain = row['chain']

        # Source and destination paths
        src_path = os.path.join(image_dir, str(chain), image_name)
        dest_path = os.path.join(training_hotel_dir, image_name)

        # Copy the image
        shutil.copy(src_path, dest_path)

print("Training images for worst classifications have been organized.")


Training images for worst classifications have been organized.


In [29]:
# Let's see what they were misclassified as

# Paths
output_dir = r"C:\ML_Project_Data\Worst_Classifications"  # Parent directory for worst classifications
image_dir = r"C:\ML_Project_Data\train_images"  # Root directory of images

# Ensure the output directory exists
os.makedirs(output_dir, exist_ok=True)

# Ensure the model is in evaluation mode
model.eval()

# Process the worst hotels
for hotel_id in worst_hotel_ids:
    # Create directories for the misclassified test images
    hotel_dir = os.path.join(output_dir, str(hotel_id))
    misclassified_test_dir = os.path.join(hotel_dir, "misclassified_test_images")
    misclassified_training_dir = os.path.join(hotel_dir, "misclassified_training_images")
    os.makedirs(misclassified_test_dir, exist_ok=True)
    os.makedirs(misclassified_training_dir, exist_ok=True)

    # Filter the test dataset for the current hotel ID
    hotel_test_images = test_df[test_df['hotel_id'] == hotel_id]

    # Iterate through the test images for this hotel ID
    for idx, row in hotel_test_images.iterrows():
        image_name = row['image']
        chain = row['chain']
        true_label = row['class_index']

        # Get the predicted label for this image
        image_index = test_df.index.get_loc(idx)
        predicted_label = all_predictions[image_index]  # Use predictions collected earlier
        predicted_hotel_id = index_to_hotel_id[predicted_label]  # Map predicted label to hotel ID

        # If the classification was incorrect
        if true_label != predicted_label:
            # Misclassified test image
            src_path_test = os.path.join(image_dir, str(chain), image_name)
            dest_path_test = os.path.join(misclassified_test_dir, f"misclassified_as_{predicted_hotel_id}_{image_name}")
            shutil.copy(src_path_test, dest_path_test)

            # Add the training images of the predicted hotel ID
            predicted_training_images = train_df[train_df['hotel_id'] == predicted_hotel_id]
            for _, train_row in predicted_training_images.iterrows():
                train_image_name = train_row['image']
                train_chain = train_row['chain']

                src_path_train = os.path.join(image_dir, str(train_chain), train_image_name)
                dest_path_train = os.path.join(misclassified_training_dir, f"from_{predicted_hotel_id}_{train_image_name}")
                shutil.copy(src_path_train, dest_path_train)

print("Misclassified test and training images organized for worst classifications.")



Misclassified test and training images organized for worst classifications.


# Let's set up images for TCAV

In [39]:
# Okay let's gather all the images for the class that we want to run TCAV on

# Define the target hotel ID
target_hotel_id = 4869

# Define paths
train_image_dir = r"C:\ML_Project_Data\train_images"  # Root directory of training images
output_dir = r"C:\ML_Project_Data\TCAV\4869"  # Directory for this hotel's images

# Ensure the output directory exists
os.makedirs(output_dir, exist_ok=True)

# Combine training and testing datasets
combined_df = pd.concat([train_df, test_df])

# Filter images for the target hotel ID
hotel_images = combined_df[combined_df['hotel_id'] == target_hotel_id]

# Copy images to the target directory
for _, row in hotel_images.iterrows():
    image_name = row['image']
    chain_id = row['chain']  # Retrieve the chain ID
    src_path = os.path.join(train_image_dir, str(chain_id), image_name)  # Construct source path
    dest_path = os.path.join(output_dir, image_name)  # Construct destination path

    # Copy image
    shutil.copy(src_path, dest_path)

print(f"All images for hotel ID {target_hotel_id} have been copied to {output_dir}.")


All images for hotel ID 4869 have been copied to C:\ML_Project_Data\TCAV\4869.


In [None]:
# Now let's use image augmentation to create more triaiing images for each concept

import os
import random
from PIL import Image
from torchvision import transforms

# Paths
base_dir = r"C:\ML_Project_Data\TCAV\4869"  # Root directory containing concept folders
concepts = ["carpet", "painting"] #,"bed", "lamp",]  # List of concepts
target_count = 30  # Target number of training images per concept

# Define augmentation transforms
augmentation_transforms = transforms.Compose([
    transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3, hue=0.1),
    transforms.RandomRotation(degrees=15),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),
    transforms.Resize((224, 224)),  # Resize to match model input size
    transforms.ToTensor()  # Convert image to tensor
])

# Loop through each concept folder
for concept in concepts:
    concept_path = os.path.join(base_dir, concept, "training")  # Training images folder
    augmented_path = os.path.join(base_dir, concept, "training_augmented")  # Augmented images folder
    os.makedirs(augmented_path, exist_ok=True)

    # List all existing images in the concept folder
    image_files = [f for f in os.listdir(concept_path) if os.path.isfile(os.path.join(concept_path, f))]

    # Count current images
    current_count = len(image_files)
    print(f"Concept '{concept}': {current_count} existing images.")

    # If images are fewer than the target, augment until we have target_count
    if current_count < target_count:
        required_augmentations = target_count - current_count
        print(f"Augmenting {required_augmentations} images for '{concept}'...")

        for i in range(required_augmentations):
            # Pick a random image to augment
            img_file = random.choice(image_files)
            img_path = os.path.join(concept_path, img_file)

            # Open the image
            image = Image.open(img_path).convert("RGB")

            # Apply augmentation
            augmented_image = augmentation_transforms(image)

            # Convert tensor back to PIL Image
            augmented_image = transforms.ToPILImage()(augmented_image)

            # Save augmented image
            augmented_filename = f"augmented_{i+1}_{img_file}"
            augmented_image.save(os.path.join(augmented_path, augmented_filename))

    else:
        print(f"No augmentation needed for '{concept}'.")

print("Image augmentation complete!")


Concept 'carpet': 8 existing images.
Augmenting 22 images for 'carpet'...
Concept 'painting': 21 existing images.
Augmenting 9 images for 'painting'...
Image augmentation complete!


In [42]:
# now let's gather random images to train against

import os
import shutil
import random
import pandas as pd
from PIL import Image

# Parameters
target_hotel_id = 4869  # Hotel ID to exclude
num_random_images = 40  # Number of random images needed per concept
concepts = ["carpet", "painting"] #,"bed", "lamp",]   # List of concepts

# Paths
image_dir = r"C:\ML_Project_Data\train_images"  # Root directory of images
output_dir = r"C:\ML_Project_Data\TCAV\4869"  # Directory where concept folders exist

# Filter the dataset to exclude the target hotel
non_target_df = test_df[test_df['hotel_id'] != target_hotel_id]

# Shuffle the non-target DataFrame to randomize
non_target_df = non_target_df.sample(frac=1, random_state=42).reset_index(drop=True)

# Ensure we have enough images for selection
assert len(non_target_df) >= num_random_images, "Not enough non-target images available."

# Randomly select 70 images
selected_random_images = non_target_df.sample(n=num_random_images, random_state=42)

# Iterate through each concept
for concept in concepts:
    # Create the 'random' folder within the concept folder
    concept_random_dir = os.path.join(output_dir, concept, "random")
    os.makedirs(concept_random_dir, exist_ok=True)

    print(f"Copying random images for concept: {concept}")

    # Copy the selected random images to the 'random' folder
    for _, row in selected_random_images.iterrows():
        image_name = row['image']
        chain = row['chain']
        
        # Construct source and destination paths
        src_path = os.path.join(image_dir, str(chain), image_name)
        dest_path = os.path.join(concept_random_dir, image_name)
        
        # Copy the image
        shutil.copy(src_path, dest_path)

print("Random images copied to each concept folder.")


Copying random images for concept: carpet
Copying random images for concept: painting
Random images copied to each concept folder.


# Load Model & Extract Features

In [3]:
# Set device to GPU if available
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
MODEL_PATH = r"C:\Users\linds\OneDrive\Documents\MSBA-lindsayslaptop\Fall\Advanced Machine Learning\Best Model.pth"

# Parameters
BATCH_SIZE = 16

# Paths
concepts = ["carpet", "painting"] #,"bed", "lamp",] 
base_dir = r"C:\ML_Project_Data\TCAV\4869"
random_dir_name = "random"
train_dir_names = ["training", "training_augmented"]

In [10]:
# Define transforms
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])


In [11]:
# Dataset Class
class ConceptImageDataset(Dataset):
    def __init__(self, image_paths, transform=None):
        self.image_paths = image_paths
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        image = Image.open(img_path).convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image, img_path  # Return the image and its path for reference

In [12]:
# Load ResNet-50 Architecture
model = models.resnet50(pretrained=False)  # Use the same architecture as during training
num_classes = 1062  # Match the number of classes in the dataset
model.fc = torch.nn.Linear(model.fc.in_features, num_classes)  # Restore the original `fc` layer

# Load Pretrained Weights
model.load_state_dict(torch.load(MODEL_PATH, map_location=DEVICE))  # Load the fine-tuned weights
model = model.to(DEVICE)
model.eval()  # Set the model to evaluation mode

# Modify the Model for Feature Extraction
# Extract features from the avgpool layer (1x2048 feature vector)
feature_extractor = torch.nn.Sequential(*list(model.children())[:-2])  # Remove `fc` and `avgpool`
avgpool = torch.nn.AdaptiveAvgPool2d((1, 1))  # Add avgpool manually for feature extraction


  model.load_state_dict(torch.load(MODEL_PATH, map_location=DEVICE))  # Load the fine-tuned weights


In [13]:
def extract_features(image_paths, model, transform, batch_size=16):
    dataset = ConceptImageDataset(image_paths, transform=transform)
    loader = DataLoader(dataset, batch_size=batch_size, shuffle=False)

    features = []
    paths = []
    with torch.no_grad():
        for images, img_paths in loader:
            images = images.to(DEVICE)
            outputs = model(images)  # Extract spatial features
            outputs = avgpool(outputs)  # Apply global average pooling
            outputs = torch.flatten(outputs, 1)  # Flatten to (batch_size x 2048)
            features.append(outputs.cpu().numpy())
            paths.extend(img_paths)

    features = np.concatenate(features, axis=0)  # Combine all features into a single array
    return features, paths

In [44]:
# Updated Feature Extraction Script
for concept in concepts:
    concept_dir = os.path.join(base_dir, concept)

    # Gather all image paths for training and training_augmented
    concept_image_paths = []
    for train_dir in train_dir_names:
        train_path = os.path.join(concept_dir, train_dir)
        concept_image_paths += [os.path.join(train_path, f) for f in os.listdir(train_path) if os.path.isfile(os.path.join(train_path, f))]

    # Gather random image paths
    random_dir = os.path.join(concept_dir, random_dir_name)
    random_image_paths = [os.path.join(random_dir, f) for f in os.listdir(random_dir) if os.path.isfile(os.path.join(random_dir, f))]

    # Gather testing image paths
    testing_dir = os.path.join(concept_dir, "testing")
    testing_image_paths = [os.path.join(testing_dir, f) for f in os.listdir(testing_dir) if os.path.isfile(os.path.join(testing_dir, f))]

    # Check if paths are not empty
    if not concept_image_paths:
        print(f"No concept images found for '{concept}'. Skipping...")
        continue
    if not random_image_paths:
        print(f"No random images found for '{concept}'. Skipping...")
        continue
    if not testing_image_paths:
        print(f"No testing images found for '{concept}'. Skipping...")
        continue

    # Extract features for concept, random, and testing images
    print(f"Extracting features for concept '{concept}'...")

    # Concept and random images
    concept_features, concept_paths = extract_features(concept_image_paths, feature_extractor, transform, BATCH_SIZE)
    random_features, random_paths = extract_features(random_image_paths, feature_extractor, transform, BATCH_SIZE)

    # Testing images
    print(f"Extracting features for testing images in '{concept}'...")
    testing_features, testing_paths = extract_features(testing_image_paths, feature_extractor, transform, BATCH_SIZE)

    # Save features and paths for later use
    np.save(os.path.join(concept_dir, "concept_features.npy"), concept_features)
    np.save(os.path.join(concept_dir, "concept_paths.npy"), concept_paths)
    np.save(os.path.join(concept_dir, "random_features.npy"), random_features)
    np.save(os.path.join(concept_dir, "random_paths.npy"), random_paths)
    np.save(os.path.join(concept_dir, "testing_features.npy"), testing_features)
    np.save(os.path.join(concept_dir, "testing_paths.npy"), testing_paths)

    print(f"Feature extraction complete for '{concept}'. Features saved.")

print("Feature extraction completed for all concepts.")

Extracting features for concept 'carpet'...
Extracting features for testing images in 'carpet'...
Feature extraction complete for 'carpet'. Features saved.
Extracting features for concept 'painting'...
Extracting features for testing images in 'painting'...
Feature extraction complete for 'painting'. Features saved.
Feature extraction completed for all concepts.


# Train Concept Vectors 

In [None]:
# now let's train the linear classifiers

from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score
import numpy as np
import os
import pickle

# Paths and parameters
concepts = ["carpet", "painting"] #,"bed"] 
base_dir = r"C:\ML_Project_Data\TCAV\4869"

# Train linear classifiers for each concept
for concept in concepts:
    concept_dir = os.path.join(base_dir, concept)

    # Load features
    concept_features = np.load(os.path.join(concept_dir, "concept_features.npy"))
    random_features = np.load(os.path.join(concept_dir, "random_features.npy"))

    # Combine features and labels
    X = np.concatenate([concept_features, random_features], axis=0)
    y = np.concatenate([np.ones(len(concept_features)), np.zeros(len(random_features))], axis=0)  # 1 for concept, 0 for random

    # Train-test split (80% train, 20% test)
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

    # Standardize features using training set
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_val_scaled = scaler.transform(X_val)

    # Train logistic regression classifier
    clf = LogisticRegression(random_state=42, max_iter=1000)
    clf.fit(X_train_scaled, y_train)

    # Evaluate the classifier
    y_pred = clf.predict(X_val_scaled)
    accuracy = accuracy_score(y_val, y_pred)
    print(f"Validation Accuracy for '{concept}': {accuracy:.2f}")

    # Save the classifier and scaler
    with open(os.path.join(concept_dir, f"{concept}_cav.pkl"), "wb") as cav_file:
        pickle.dump((clf, scaler), cav_file)

    print(f"CAV for concept '{concept}' trained and saved.")

print("All classifiers trained and saved.")


Validation Accuracy for 'carpet': 1.00
CAV for concept 'carpet' trained and saved.
Validation Accuracy for 'painting': 1.00
CAV for concept 'painting' trained and saved.
All classifiers trained and saved.


# TCAV Scores and Directional Derivatives

In [None]:
import numpy as np
import os
import pickle
from sklearn.metrics import accuracy_score

# Paths and Parameters
concepts = ["carpet", "painting"] #,"bed"] 
base_dir = r"C:\ML_Project_Data\TCAV\4869"

# Dictionary to store TCAV results
tcav_scores = {}

# Iterate through each concept
for concept in concepts:
    concept_dir = os.path.join(base_dir, concept)

    # Load CAV (Classifier and Scaler)
    cav_file_path = os.path.join(concept_dir, f"{concept}_cav.pkl")
    with open(cav_file_path, "rb") as f:
        clf, scaler = pickle.load(f)

    # Load testing features and paths
    testing_features = np.load(os.path.join(concept_dir, "testing_features.npy"))
    testing_paths = np.load(os.path.join(concept_dir, "testing_paths.npy"), allow_pickle=True)

    # Standardize the testing features
    testing_features_scaled = scaler.transform(testing_features)

    # Predict using the trained classifier
    predictions = clf.predict(testing_features_scaled)
    probabilities = clf.predict_proba(testing_features_scaled)[:, 1]  # Probability of concept presence

    # Calculate TCAV score (fraction of images classified as concept)
    tcav_score = predictions.mean()  # Average of 1s (concept detected)
    print(f"TCAV Score for '{concept}': {tcav_score:.2f}")

    # Save TCAV results
    tcav_scores[concept] = {
        "tcav_score": tcav_score,
        "predictions": predictions,
        "probabilities": probabilities,
        "testing_paths": testing_paths,
    }

print("TCAV Score Calculation Complete.")


TCAV Score for 'carpet': 0.10
TCAV Score for 'painting': 0.50
TCAV Score Calculation Complete.


In [4]:
#Now let's get the directional derivatives

import torch
import numpy as np
import os
import pickle
from torch.nn import Linear

# Paths and Parameters
concepts = ["carpet", "painting"] 
base_dir = r"C:\ML_Project_Data\TCAV\4869"

# Store directional derivatives
directional_derivatives = {}

# Enable gradient tracking globally
torch.set_grad_enabled(True)

# Calculate directional derivatives for each concept
for concept in concepts:
    concept_dir = os.path.join(base_dir, concept)

    # Load CAV (Classifier and Scaler)
    cav_file_path = os.path.join(concept_dir, f"{concept}_cav.pkl")
    with open(cav_file_path, "rb") as f:
        clf, scaler = pickle.load(f)

    # Load testing features and paths
    testing_features = np.load(os.path.join(concept_dir, "testing_features.npy"))
    testing_paths = np.load(os.path.join(concept_dir, "testing_paths.npy"), allow_pickle=True)

    # Convert features to PyTorch tensors with gradient tracking enabled
    testing_features_tensor = torch.tensor(testing_features, dtype=torch.float32, requires_grad=True).to(DEVICE)

    # Retain gradient explicitly (required for non-leaf tensors)
    testing_features_tensor.retain_grad()

    # Define CAV as a trainable linear layer
    linear_layer = Linear(testing_features_tensor.shape[1], 1).to(DEVICE)

    # Assign weights and bias from the loaded classifier
    with torch.no_grad():
        linear_layer.weight.copy_(torch.tensor(clf.coef_, dtype=torch.float32).to(DEVICE))
        linear_layer.bias.copy_(torch.tensor(clf.intercept_, dtype=torch.float32).to(DEVICE))

    # Forward pass through the linear layer
    logits = linear_layer(testing_features_tensor)

    # Simulate a loss to enable backpropagation
    loss = logits.sum()
    loss.backward()

    # Extract gradients
    if testing_features_tensor.grad is None:
        raise RuntimeError("Gradients were not computed. Ensure requires_grad=True and autograd is enabled.")

    gradients = testing_features_tensor.grad.cpu().numpy()

    # Calculate directional derivatives
    cav_vector = linear_layer.weight.detach().cpu().numpy().flatten()
    derivatives = np.dot(gradients, cav_vector)

    # Average directional derivative
    avg_derivative = np.mean(derivatives)

    # Print results
    print(f"Average Directional Derivative for '{concept}': {avg_derivative:.4f}")

    # Store results
    directional_derivatives[concept] = {
        "derivatives": derivatives,
        "average_derivative": avg_derivative,
        "testing_paths": testing_paths,
    }

print("Directional Derivative Calculation Complete.")


Average Directional Derivative for 'carpet': 0.2233
Average Directional Derivative for 'painting': 0.2824
Directional Derivative Calculation Complete.


In [20]:
import torch
import numpy as np
import os
import pickle
from torch.nn import Linear

# Paths and Parameters
concepts = ["bed"] 
base_dir = r"C:\ML_Project_Data\TCAV_First_attempt\4869"

# Store directional derivatives
directional_derivatives = {}

# Enable gradient tracking globally
torch.set_grad_enabled(True)

# Calculate directional derivatives for each concept
for concept in concepts:
    concept_dir = os.path.join(base_dir, concept)

    # Load CAV (Classifier and Scaler)
    cav_file_path = os.path.join(concept_dir, f"{concept}_cav.pkl")
    with open(cav_file_path, "rb") as f:
        clf, scaler = pickle.load(f)

    # Load testing features and paths
    testing_features = np.load(os.path.join(concept_dir, "testing_features.npy"))
    testing_paths = np.load(os.path.join(concept_dir, "testing_paths.npy"), allow_pickle=True)

    # Convert features to PyTorch tensors with gradient tracking enabled
    testing_features_tensor = torch.tensor(testing_features, dtype=torch.float32, requires_grad=True).to(DEVICE)

    # Retain gradient explicitly (required for non-leaf tensors)
    testing_features_tensor.retain_grad()

    # Define CAV as a trainable linear layer
    linear_layer = Linear(testing_features_tensor.shape[1], 1).to(DEVICE)

    # Assign weights and bias from the loaded classifier
    with torch.no_grad():
        linear_layer.weight.copy_(torch.tensor(clf.coef_, dtype=torch.float32).to(DEVICE))
        linear_layer.bias.copy_(torch.tensor(clf.intercept_, dtype=torch.float32).to(DEVICE))

    # Forward pass through the linear layer
    logits = linear_layer(testing_features_tensor)

    # Simulate a loss to enable backpropagation
    loss = logits.sum()
    loss.backward()

    # Extract gradients
    if testing_features_tensor.grad is None:
        raise RuntimeError("Gradients were not computed. Ensure requires_grad=True and autograd is enabled.")

    gradients = testing_features_tensor.grad.cpu().numpy()

    # Calculate directional derivatives
    cav_vector = linear_layer.weight.detach().cpu().numpy().flatten()
    derivatives = np.dot(gradients, cav_vector)

    # Average directional derivative
    avg_derivative = np.mean(derivatives)

    # Print results
    print(f"Average Directional Derivative for '{concept}': {avg_derivative:.4f}")

    # Store results
    directional_derivatives[concept] = {
        "derivatives": derivatives,
        "average_derivative": avg_derivative,
        "testing_paths": testing_paths,
    }

print("Directional Derivative Calculation Complete.")


Average Directional Derivative for 'bed': 0.3481
Directional Derivative Calculation Complete.
