### Images pre-processing => OK code Ready

In [1]:
from PIL import Image
import matplotlib.pyplot as plt
import pandas as pd
import os

In [2]:
def rotate_and_crop_image(save_path,path, angle, crop_box, plot=False):
    """
    Applique une rotation d'un angle donné et un crop sur une image, puis enregistre le résultat en .png.
    Optionnellement, affiche l'image si plot=True.
    
    Arguments :
    - input_image_path : Chemin de l'image d'entrée.
    - output_image_path : Chemin pour enregistrer l'image modifiée.
    - angle : L'angle de rotation en degrés (positif dans le sens antihoraire).
    - crop_box : Un tuple (left, upper, right, lower) définissant les points du crop en pixels.
    - plot : Booléen, si True affiche l'image modifiée.
    """
    # Ouvrir l'image
    image = Image.open(path)
    
    # Appliquer la rotation (expand=True permet d'agrandir l'image pour qu'elle s'ajuste au cadre après rotation)
    rotated_image = image.rotate(angle, expand=True)
    
    # Appliquer le crop avec les points fournis
    cropped_image = rotated_image.crop(crop_box)
    
    # Enregistrer l'image au format PNG
    cropped_image.save(save_path, format='PNG')
    #print(f"Image enregistrée avec succès sous {path}")
    
    # Si plot=True, afficher l'image modifiée
    if plot:
        plt.imshow(cropped_image)
        plt.axis('off')  # Masquer les axes
        plt.title(f"Image après rotation de {angle}° et crop")
        plt.show()

In [3]:
# Dict defining rotation and cropping depending on component
rot_crop_data = {
        "Die01":[55,(340, 120, 500, 680)], # (left, upper, right, lower)
        "Die02":[-44, (480, 210, 640, 930)],
        "Die03":[134, (460, 200, 620, 920)],
        "Die04":[35, (310, 130, 470, 690)]
}

In [4]:
test_pictures = '/Users/nicolasthiou/Desktop/DataChallenge/input_test'
test_labels = '/Users/nicolasthiou/Desktop/DataChallenge/Y_random_nKwalR1.csv'
path_bis = '/Users/nicolasthiou/Desktop/DataChallenge/preprocessed_test'
# Pre-process test data

In [None]:


test_df = pd.read_csv(test_labels)

for index, row in test_df.iterrows():
    path = os.path.join(test_pictures, row['filename'])
    save_path = os.path.join(path_bis,row['filename'])
    rotate_and_crop_image(save_path,path, rot_crop_data[row['lib']][0], rot_crop_data[row['lib']][1], plot=False)

In [5]:
train_pictures = '/Users/nicolasthiou/Desktop/DataChallenge/input_train'
train_labels = '/Users/nicolasthiou/Desktop/DataChallenge/Y_train_eVW9jym.csv'
path_bis = '/Users/nicolasthiou/Desktop/DataChallenge/preprocessed_train'
# Pre-process train data


In [None]:

train_df = pd.read_csv(train_labels)

for index, row in train_df.iterrows():
    path = os.path.join(train_pictures, row['filename'])
    save_path = os.path.join(path_bis,row['filename'])
    rotate_and_crop_image(save_path,path, rot_crop_data[row['lib']][0], rot_crop_data[row['lib']][1], plot=False)

### PatchCore

In [7]:
import os
import numpy as np
import pandas as pd
from tqdm import tqdm
from PIL import Image

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torchvision
import torchvision.transforms as transforms
from sklearn.neighbors import NearestNeighbors

# Define a custom dataset that returns an image and its filename.
class CustomImageDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        # You can adjust the file extensions as needed.
        self.image_files = [f for f in os.listdir(root_dir) if f.lower().endswith(('.jpg', '.png', '.jpeg'))]
        self.transform = transform

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        filename = self.image_files[idx]
        img_path = os.path.join(self.root_dir, filename)
        image = Image.open(img_path).convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image, filename

# Define image transformations.
transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])

# Paths to your dataset folders.
train_folder = '/Users/nicolasthiou/Desktop/DataChallenge/preprocessed_train'  # Folder containing normal images for training.
test_folder = '/Users/nicolasthiou/Desktop/DataChallenge/preprocessed_test'    # Folder containing images to be tested.

# Create DataLoaders.
train_dataset = CustomImageDataset(root_dir=train_folder, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=0)

test_dataset = CustomImageDataset(root_dir=test_folder, transform=transform)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False, num_workers=0)

# Set device.
device = torch.device("mps")

# Load a pretrained model (ResNet50) and remove its fully-connected layers.
model = torchvision.models.resnet50(pretrained=True)
# Remove the final classification layers to get intermediate convolutional features.
model = nn.Sequential(*(list(model.children())[:-2]))
model.eval()
model.to(device)

# ----- Feature Extraction for Training Images -----
print("Extracting features from training images...")
train_features_list = []
for images, _ in tqdm(train_loader):
    images = images.to(device)
    with torch.no_grad():
        features = model(images)  # Shape: (batch_size, channels, H, W)
    # Reshape features: (batch_size, channels, H*W) then transpose -> (batch_size, H*W, channels)
    features = features.view(features.size(0), features.size(1), -1).permute(0, 2, 1)
    # Move to CPU and convert to numpy.
    features = features.cpu().numpy()
    # Append each image's patch features.
    for feat in features:
        train_features_list.append(feat)

# Combine all patches from the training set.
train_features = np.concatenate(train_features_list, axis=0)  # Shape: (num_total_patches, channels)
print(f"Total training patches: {train_features.shape[0]}")

# ----- Coreset Subsampling -----
# For simplicity, we randomly sample a subset of patches.
n_samples = 10000  # Adjust based on your memory and accuracy requirements.
if train_features.shape[0] > n_samples:
    indices = np.random.choice(train_features.shape[0], n_samples, replace=False)
    coreset_features = train_features[indices]
else:
    coreset_features = train_features
print(f"Coreset features shape: {coreset_features.shape}")

# ----- Nearest Neighbor Model -----
# Fit a nearest neighbor model on the coreset features.
nn_model = NearestNeighbors(n_neighbors=1, algorithm='auto')
nn_model.fit(coreset_features)

# ----- Anomaly Scoring for Test Images -----
print("Processing test images and computing anomaly scores...")
results = []
for image, filename in tqdm(test_loader):
    image = image.to(device)
    with torch.no_grad():
        features = model(image)  # Shape: (1, channels, H, W)
    # Reshape to get patch-level features: (num_patches, channels)
    features = features.view(features.size(0), features.size(1), -1).permute(0, 2, 1)
    features = features.squeeze(0).cpu().numpy()
    # Compute the distance from each patch feature to the nearest neighbor in the core-set.
    distances, _ = nn_model.kneighbors(features)
    # Aggregate the distances to obtain an anomaly score. Here we use the maximum distance.
    raw_score = np.max(distances)
    results.append((filename[0], raw_score))

# ----- Normalization to Obtain Anomaly Probabilities -----
# Convert raw anomaly scores to probabilities using min-max normalization.
raw_scores = np.array([score for _, score in results])
min_score, max_score = raw_scores.min(), raw_scores.max()
normalized_results = []
for filename, score in results:
    anomaly_prob = (score - min_score) / (max_score - min_score + 1e-8)  # Avoid division by zero.
    normalized_results.append((filename, anomaly_prob))

# ----- Save Results to CSV -----
output_df = pd.DataFrame(normalized_results, columns=['filename', 'anomaly_probability'])
output_df.to_csv('anomaly_scores.csv', index=False)
print("CSV file with anomaly scores saved as 'anomaly_scores.csv'.")


Extracting features from training images...


100%|██████████| 259/259 [00:45<00:00,  5.72it/s]


Total training patches: 529792
Coreset features shape: (10000, 2048)
Processing test images and computing anomaly scores...


100%|██████████| 1055/1055 [00:51<00:00, 20.42it/s]

CSV file with anomaly scores saved as 'anomaly_scores.csv'.





In [10]:
df__ = pd.read_csv('/Users/nicolasthiou/Desktop/DataChallenge/data_challenge/anomaly_scores.csv')
scores_list = []
for i in range(len(df__)):
    scores_list.append((df__.loc[i][0],df__.loc[i][1]))
print(scores_list)

[('6c2730d57920779bd76521c5145c9a9397dbe31e24f349c80af78cd17494fada.png', 0.0551663024965697), ('eba7d43b0b392a72187f3a635d756c21bfa52ac75fd444eec1b66593735e2d4c.png', 0.2906561747116168), ('2144b403405bf68d05f330ab99c7862f24a56d1735584d9d3230cc1aaa33e701.png', 0.3112505548818867), ('61f49a95147ffbf32d55c309b2449f57254331a0bd001d888a84c643c6a1bf13.png', 0.1689874987800243), ('2d5801daa5ce43728fcd7b94ca9d2921f09fd3fcd688cb17b51c266baa9d2fdd.png', 0.0508849913101523), ('49caa9922f0f03f6a4982a6132472c1ddfdfa27d759dc33315090f941d1761f2.png', 0.1069735773813867), ('557965c255e18d01817bf9ae3c5792540f166020b7a00f27d26669ef6a9f9c49.png', 0.3353699733862343), ('eee235bf5c637f0fd0abe6c2d127fdeda0ebb00d62fc2e5575c2e22c7eeabd4b.png', 0.0784642939907629), ('0fb519a127d5f8ddc2500aa17b6d88e950069dbd329b20e3dcdc52693978f38c.png', 0.1063339922731941), ('a4c4d425d5c91ddb29be345d51d154cd6e62fbdcfd03803c2355ccdcd3b0cc37.png', 0.0432431135121351), ('040dea662f360571f9c956cf73a50d5fe431211eb193293f1509eff7f

  scores_list.append((df__.loc[i][0],df__.loc[i][1]))


### Anomaly detection (PADIM) => OK

In [26]:
import random
from random import sample
import argparse
import numpy as np
import os
import pickle
from tqdm import tqdm
from collections import OrderedDict
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve
from sklearn.metrics import precision_recall_curve
from sklearn.covariance import LedoitWolf
from scipy.spatial.distance import mahalanobis
from scipy.ndimage import gaussian_filter
from skimage import morphology
from skimage.segmentation import mark_boundaries
import matplotlib.pyplot as plt
import matplotlib

import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision.models import wide_resnet50_2, resnet18

#from model import Classifier

import os
from PIL import Image
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader

In [None]:
test_path = '/Users/nicolasthiou/Desktop/DataChallenge/preprocessed_test'
padim_trained = '/Users/nicolasthiou/Desktop/DataChallenge/models_GX6qjjM/PADIM.pkl'

In [28]:
class CustomDataset(Dataset):
    def __init__(self, data_path, transform=None):
        self.data_path = data_path
        self.transform = transform
        self.images = []


        for img_name in os.listdir(self.data_path):
            img_path = os.path.join(self.data_path, img_name)
            self.images.append(img_path)


    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        img_path = self.images[idx]
        image = Image.open(img_path).convert("RGB")  # Charger l'image
    
        # Appliquer les transformations
        if self.transform:
            image = self.transform(image)  # Appliquer les transformations à l'image

        filename = os.path.basename(img_path)

        return filename, image                               


def denormalization(x):
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    x = (((x.transpose(1, 2, 0) * std) + mean) * 255.).astype(np.uint8)
    
    return x


def embedding_concat(x, y):
    B, C1, H1, W1 = x.size()
    _, C2, H2, W2 = y.size()
    s = int(H1 / H2)
    x = F.unfold(x, kernel_size=s, dilation=1, stride=s)
    x = x.view(B, C1, -1, H2, W2)
    z = torch.zeros(B, C1 + C2, x.size(2), H2, W2)
    for i in range(x.size(2)):
        z[:, :, i, :, :] = torch.cat((x[:, :, i, :, :], y), 1)
    z = z.view(B, -1, H2 * W2)
    z = F.fold(z, kernel_size=s, output_size=(H1, W1), stride=s)

    return z


# Charger les données d'entraînement
with open(padim_trained, 'rb') as f:
    train_outputs = pickle.load(f)

model = wide_resnet50_2(pretrained=True, progress=True)
t_d = 1792
d = 550


# device setup
use_cuda = torch.cuda.is_available()
device = torch.device('mps')

model.to(device)
model.eval()
random.seed(1024)
torch.manual_seed(1024)
if use_cuda:
    torch.cuda.manual_seed_all(1024)

idx = torch.tensor(sample(range(0, t_d), d))

# set model's intermediate outputs
outputs = []

def hook(module, input, output):
    outputs.append(output)

model.layer1[-1].register_forward_hook(hook)
model.layer2[-1].register_forward_hook(hook)
model.layer3[-1].register_forward_hook(hook)


transform = transforms.Compose([
                                transforms.Resize((128, 128)),  # Redimensionner les images
                                transforms.ToTensor(),            # Convertir en tenseur
                                ])

test_dataset = CustomDataset(test_path, transform)
test_dataloader = DataLoader(test_dataset, batch_size=32, pin_memory=True)
test_outputs = OrderedDict([('layer1', []), ('layer2', []), ('layer3', [])])

test_imgs = []
filenames_list = []
scores_list = []

# extract test set features
for filename, x in tqdm(test_dataloader, '| feature extraction | test |'):
    filenames_list.extend(filename)  # Collect filenames for the batch
    test_imgs.extend(x.cpu().detach().numpy())
    # model prediction
    with torch.no_grad():
        _ = model(x.to(device))
    # get intermediate layer outputs
    for k, v in zip(test_outputs.keys(), outputs):
        test_outputs[k].append(v.cpu().detach())
    # initialize hook outputs
    outputs = []
for k, v in test_outputs.items():
    test_outputs[k] = torch.cat(v, 0)

# Embedding concat
embedding_vectors = test_outputs['layer1']
for layer_name in ['layer2', 'layer3']:
    embedding_vectors = embedding_concat(embedding_vectors, test_outputs[layer_name])

# randomly select d dimension
embedding_vectors = torch.index_select(embedding_vectors, 1, idx)

# calculate distance matrix
B, C, H, W = embedding_vectors.size()
embedding_vectors = embedding_vectors.view(B, C, H * W).numpy()
dist_list = []
for i in range(H * W):
    mean = train_outputs[0][:, i]
    conv_inv = np.linalg.inv(train_outputs[1][:, :, i])
    dist = [mahalanobis(sample[:, i], mean, conv_inv) for sample in embedding_vectors]
    dist_list.append(dist)

dist_list = np.array(dist_list).transpose(1, 0).reshape(B, H, W)

# upsample
dist_list = torch.tensor(dist_list)
score_map = F.interpolate(dist_list.unsqueeze(1), size=x.size(2), mode='bilinear',
                            align_corners=False).squeeze().numpy()

# apply gaussian smoothing on the score map
for i in range(score_map.shape[0]):
    score_map[i] = gaussian_filter(score_map[i], sigma=4)

# Normalization
max_score = score_map.max()
min_score = score_map.min()
scores = (score_map - min_score) / (max_score - min_score)

# calculate image-level ROC AUC score
img_scores = scores.reshape(scores.shape[0], -1).max(axis=1)

# Store scores and filenames
for filename, score in zip(filenames_list, img_scores):
    scores_list.append((filename, score))

| feature extraction | test |: 100%|██████████| 33/33 [00:02<00:00, 12.49it/s]


In [29]:
print(scores_list)

[('6c2730d57920779bd76521c5145c9a9397dbe31e24f349c80af78cd17494fada.png', 0.15198225), ('eba7d43b0b392a72187f3a635d756c21bfa52ac75fd444eec1b66593735e2d4c.png', 0.5044909), ('2144b403405bf68d05f330ab99c7862f24a56d1735584d9d3230cc1aaa33e701.png', 0.33477655), ('61f49a95147ffbf32d55c309b2449f57254331a0bd001d888a84c643c6a1bf13.png', 0.17841287), ('2d5801daa5ce43728fcd7b94ca9d2921f09fd3fcd688cb17b51c266baa9d2fdd.png', 0.18861808), ('49caa9922f0f03f6a4982a6132472c1ddfdfa27d759dc33315090f941d1761f2.png', 0.22336109), ('557965c255e18d01817bf9ae3c5792540f166020b7a00f27d26669ef6a9f9c49.png', 0.33672196), ('eee235bf5c637f0fd0abe6c2d127fdeda0ebb00d62fc2e5575c2e22c7eeabd4b.png', 0.14189887), ('0fb519a127d5f8ddc2500aa17b6d88e950069dbd329b20e3dcdc52693978f38c.png', 0.1731657), ('a4c4d425d5c91ddb29be345d51d154cd6e62fbdcfd03803c2355ccdcd3b0cc37.png', 0.15202528), ('040dea662f360571f9c956cf73a50d5fe431211eb193293f1509eff7f39258cd.png', 0.19665271), ('071ab46a64f54e27d15e0670139011db93b60e0e7f9c5df46de87

In [14]:
y_pred_anomaly = scores_list

### Classifier    => OK code Ready

In [11]:
class ImageDataset(Dataset):
    def __init__(self, csv_file, root_dir, transform=None, train=True):
        self.data = pd.read_csv(csv_file)  # Load CSV file
        self.root_dir = root_dir           # Directory where images are stored
        self.transform = transform
        self.train = train
        
        # Create a label-to-integer mapping
        self.label_map = {label: idx for idx, label in enumerate(self.data.iloc[:, -1].unique())}

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        # Load image
        img_name = os.path.join(self.root_dir, self.data.iloc[idx, 1])  # 2nd column is filename
        image = Image.open(img_name).convert("RGB")

        # Apply transformations
        if self.transform:
            image = self.transform(image)

        if self.train:
            label = self.data.iloc[idx, -1]  # Last column is the label
            
            # Convert the text label to an integer label
            label = self.label_map.get(label, -1)  # -1 if the label is not found, you can handle it differently
            label = torch.tensor(label, dtype=torch.long)  # Convert to tensor
            return image, label
        else:
            return image


transform = transforms.Compose([
    transforms.Resize((224, 224)),  
    transforms.ToTensor(),  
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

train_dataset = ImageDataset(csv_file="/Users/nicolasthiou/Desktop/DataChallenge/Y_train_eVW9jym.csv", root_dir="/Users/nicolasthiou/Desktop/DataChallenge/preprocessed_train", transform=transform, train=True)
test_dataset = ImageDataset(csv_file="/Users/nicolasthiou/Desktop/DataChallenge/Y_random_nKwalR1.csv", root_dir="/Users/nicolasthiou/Desktop/DataChallenge/preprocessed_test", transform=transform, train=False)

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

import torchvision.models as models
import torch.nn as nn

# Load Pretrained ResNet50
model = models.resnet50(pretrained=True)

# Replace final FC layer for 6 classes
num_classes = 6
model.fc = nn.Linear(model.fc.in_features, num_classes)




In [12]:
import torch.optim as optim

# Move model to GPU if available
device = torch.device("mps")
model = model.to(device)

# Define loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 10
for epoch in tqdm(range(num_epochs)):
    model.train()
    running_loss = 0.0
    
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)

        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}")

print("Training complete!")


  0%|          | 0/10 [00:05<?, ?it/s]


KeyboardInterrupt: 

In [13]:
import torch
import torch.nn.functional as F

model.eval()  # Set the model to evaluation mode

predictions = []
probabilities = []

with torch.no_grad():  # Disable gradient calculation for inference
    for images in test_loader:
        images = images.to(device)
        
        # Get model outputs
        outputs = model(images)
        
        # Apply softmax to get class probabilities
        probs = F.softmax(outputs, dim=1)  # probs is a tensor with shape [batch_size, num_classes]
        
        # Get predicted class (index of maximum probability)
        _, preds = torch.max(probs, 1)  # Get the class index with highest probability

        predictions.extend(preds.cpu().numpy())  # Convert predictions to numpy and append
        probabilities.extend(probs.cpu().numpy())  # Append probabilities as well

# Create a DataFrame with the results
import pandas as pd

test_filenames = pd.read_csv("/Users/nicolasthiou/Desktop/DataChallenge/Y_random_nKwalR1.csv").iloc[:, 1]  # Get filenames from the test CSV

# Create a DataFrame for predictions with probabilities
df = pd.DataFrame({
    "filename": test_filenames,
    "label": predictions,
    "probabilities": [prob.tolist() for prob in probabilities]  # Convert each prob tensor to a list
})

# Save to CSV
df.to_csv("predictions_with_probabilities_cropped.csv", index=False)

print("Predictions and probabilities saved to predictions_with_probabilities_cropped.csv")



Predictions and probabilities saved to predictions_with_probabilities_cropped.csv


In [47]:
import torch

# Define the file path to save the model
model_path = "/Users/nicolasthiou/Desktop/DataChallenge/models_GX6qjjM/resnet50_model_cropped.pth"

# Save the model state dictionary
torch.save(model.state_dict(), model_path)

print(f"Model saved to {model_path}")


Model saved to /Users/nicolasthiou/Desktop/DataChallenge/models_GX6qjjM/resnet50_model_cropped.pth


In [48]:
idx_to_label = {idx: label for label, idx in train_dataset.label_map.items()}
print(idx_to_label)  # Check the mapping


{0: 'Missing', 1: 'GOOD', 2: 'Lift-off blanc', 3: 'Lift-off noir', 4: 'Boucle plate', 5: 'Short circuit MOS'}


In [49]:
# Load the existing CSV file
df = pd.read_csv("/Users/nicolasthiou/Desktop/DataChallenge/models_GX6qjjM/predictions_with_probabilities_cropped.csv")  # Change to your actual filename

# Replace the numeric labels with class names
df["label"] = df["label"].map(idx_to_label)

# Save the modified CSV
df.to_csv("Named_classes_predictions_with_probabilities_cropped.csv", index=False)

print("CSV updated! Numeric labels replaced with class names.")

CSV updated! Numeric labels replaced with class names.


In [50]:
# Load the existing CSV file
df = pd.read_csv("/Users/nicolasthiou/Desktop/DataChallenge/models_GX6qjjM/predictions_with_probabilities_uncropped.csv")  # Change to your actual filename

# Replace the numeric labels with class names
df["label"] = df["label"].map(idx_to_label)

# Save the modified CSV
df.to_csv("Named_classes_predictions_with_probabilities_uncropped.csv", index=False)

print("CSV updated! Numeric labels replaced with class names.")

CSV updated! Numeric labels replaced with class names.


In [57]:
import pandas as pd
import ast

# Your new key mapping (idx_bis):
idx_bis = {
    0: 'GOOD',
    1: 'Boucle plate',
    2: 'Lift-off blanc',
    3: 'Lift-off noir',
    4: 'Missing',
    5: 'Short circuit MOS'
}

# Define the old order as inferred from your CSV file:
old_order = ['Missing', 'GOOD', 'Lift-off blanc', 'Lift-off noir', 'Boucle plate', 'Short circuit MOS']

# Define the new order based on idx_bis:
new_order = [idx_bis[i] for i in sorted(idx_bis.keys())]
# new_order becomes: ['GOOD', 'Boucle plate', 'Lift-off blanc', 'Lift-off noir', 'Missing', 'Short circuit MOS']

# Create the reordering index: for each class in new_order, find its index in the old_order.
reorder_indices = [old_order.index(class_name) for class_name in new_order]
# Expected: [1, 4, 2, 3, 0, 5]

print("Reorder indices:", reorder_indices)  # This should print: [1, 4, 2, 3, 0, 5]

# Also, create a mapping from class name to numeric label based on idx_bis:
label_to_idx = {v: k for k, v in idx_bis.items()}

# Load the existing CSV (adjust filename as needed)
df = pd.read_csv("/Users/nicolasthiou/Desktop/DataChallenge/models_GX6qjjM/Named_classes_predictions_with_probabilities_cropped.csv")

# Convert the probability strings to actual lists
df["probabilities"] = df["probabilities"].apply(ast.literal_eval)

# Reorder probabilities using the computed reorder_indices
df["probabilities"] = df["probabilities"].apply(lambda probs: [probs[i] for i in reorder_indices])

# Convert the textual labels in "label" column back to numbers
df["label"] = df["label"].map(label_to_idx)

# Save the updated CSV
df.to_csv("Numerical_classes_predictions_with_probabilities_cropped.csv", index=False)

print("CSV updated! Labels are now numeric and probabilities have been reordered.")


Reorder indices: [1, 4, 2, 3, 0, 5]
CSV updated! Labels are now numeric and probabilities have been reordered.


In [58]:
import pandas as pd
import ast

# Your new key mapping (idx_bis):
idx_bis = {
    0: 'GOOD',
    1: 'Boucle plate',
    2: 'Lift-off blanc',
    3: 'Lift-off noir',
    4: 'Missing',
    5: 'Short circuit MOS'
}

# Define the old order as inferred from your CSV file:
old_order = ['Missing', 'GOOD', 'Lift-off blanc', 'Lift-off noir', 'Boucle plate', 'Short circuit MOS']

# Define the new order based on idx_bis:
new_order = [idx_bis[i] for i in sorted(idx_bis.keys())]
# new_order becomes: ['GOOD', 'Boucle plate', 'Lift-off blanc', 'Lift-off noir', 'Missing', 'Short circuit MOS']

# Create the reordering index: for each class in new_order, find its index in the old_order.
reorder_indices = [old_order.index(class_name) for class_name in new_order]
# Expected: [1, 4, 2, 3, 0, 5]

print("Reorder indices:", reorder_indices)  # This should print: [1, 4, 2, 3, 0, 5]

# Also, create a mapping from class name to numeric label based on idx_bis:
label_to_idx = {v: k for k, v in idx_bis.items()}

# Load the existing CSV (adjust filename as needed)
df = pd.read_csv("/Users/nicolasthiou/Desktop/DataChallenge/models_GX6qjjM/Named_classes_predictions_with_probabilities_uncropped.csv")

# Convert the probability strings to actual lists
df["probabilities"] = df["probabilities"].apply(ast.literal_eval)

# Reorder probabilities using the computed reorder_indices
df["probabilities"] = df["probabilities"].apply(lambda probs: [probs[i] for i in reorder_indices])

# Convert the textual labels in "label" column back to numbers
df["label"] = df["label"].map(label_to_idx)

# Save the updated CSV
df.to_csv("Numerical_classes_predictions_with_probabilities_uncropped.csv", index=False)

print("CSV updated! Labels are now numeric and probabilities have been reordered.")


Reorder indices: [1, 4, 2, 3, 0, 5]
CSV updated! Labels are now numeric and probabilities have been reordered.


In [15]:
import csv
import json  # Use json to load the list

# Initialize lists
output_classifier = []
y_pred_classifier = []

# Read the CSV file
csv_file = "/Users/nicolasthiou/Desktop/DataChallenge/models_GX6qjjM/Numerical_classes_predictions_with_probabilities_uncropped.csv"  # Replace with your actual CSV filename

with open(csv_file, newline='') as file:
    reader = csv.reader(file)
    for row in reader:
        filename = row[0]  # First column is the filename
        scores_str = row[2]  # The string representation of the list of scores

        try:
            # Use json.loads to parse the string into a list
            scores = json.loads(scores_str)
        except json.JSONDecodeError:
            print(f"Error parsing scores for file {filename}: {scores_str}")
            continue

        output_classifier.append(scores)
        y_pred_classifier.append((filename, scores))

# Print results
print("output_classifier:", output_classifier)
print("y_pred_classifier:", y_pred_classifier)


Error parsing scores for file filename: probabilities
output_classifier: [[4.5686011418411e-08, 8.733576396480203e-05, 2.986072222821434e-10, 1.4035603790318874e-08, 0.9999125003814697, 3.8650457834243923e-10], [4.21601278333128e-08, 8.994747622637078e-05, 3.6356465060727317e-10, 2.9343945939785954e-08, 0.9999098777770996, 1.861800574820549e-10], [3.6789952417848326e-08, 9.996194421546534e-05, 1.1498456631287013e-09, 6.711181299579039e-08, 0.9998998641967773, 1.5302104028336316e-10], [8.677384144561984e-09, 6.861450674477965e-05, 1.0605170214228465e-10, 1.1639295749432677e-08, 0.9999313354492188, 4.420580568265109e-11], [3.080106125707971e-08, 0.00010429016401758417, 4.6490472582760844e-10, 4.6302734091341335e-08, 0.9998955726623535, 1.590522991090637e-10], [0.9999986886978149, 5.727453511461533e-10, 7.080283381810659e-08, 4.029661511140148e-07, 4.378181728270647e-08, 8.847523531585466e-07], [5.724600171674865e-08, 7.33144479454495e-05, 3.9619010849811787e-10, 2.1228968805075965e-08, 0

### Calcul de la performance

In [16]:
# Check classifier predictions lenght = anomaly detection prediction length

if len(y_pred_classifier) != len(y_pred_anomaly):
    print("Erreur: les deux prédictions n'ont pas la meme dimension")

In [17]:
# Convertir y_pred_anomaly en un dictionnaire
anomaly_dict = dict(y_pred_anomaly)

# Convertir y_pred_classifier en un dictionnaire
classifier_dict = dict(y_pred_classifier)

# Fusionner les deux dictionnaires basés sur le filename
# Assurer que tous les filenames sont présents dans les deux modèles
merged_data = []
for filename in anomaly_dict.keys():
    if filename in classifier_dict:
        anomaly_score = anomaly_dict[filename]
        classifier_probs = classifier_dict[filename]
        merged_data.append((filename, anomaly_score, *classifier_probs))  # Fusionner les données

# Créer un DataFrame pandas avec les résultats
final_df = pd.DataFrame(merged_data, columns=['filename', 'p_drift', 'p0', 'p1', 'p2', 'p3', 'p4', 'p5'])


In [18]:
final_df.head()

Unnamed: 0,filename,p_drift,p0,p1,p2,p3,p4,p5
0,6c2730d57920779bd76521c5145c9a9397dbe31e24f349...,0.055166,8.764521e-09,5.613366e-05,9.76034e-11,2.395489e-08,0.999944,2.001183e-11
1,eba7d43b0b392a72187f3a635d756c21bfa52ac75fd444...,0.290656,0.022453,0.03719516,0.8945753,0.01211169,0.015312,0.01835294
2,2144b403405bf68d05f330ab99c7862f24a56d1735584d...,0.311251,3.232218e-08,4.508826e-05,4.583593e-10,7.285972e-08,0.999955,1.17842e-09
3,61f49a95147ffbf32d55c309b2449f57254331a0bd001d...,0.168987,3.088929e-11,3.902886e-07,1.978402e-14,4.323987e-11,1.0,1.091145e-13
4,2d5801daa5ce43728fcd7b94ca9d2921f09fd3fcd688cb...,0.050885,5.016065e-08,3.431041e-05,3.011236e-10,5.599871e-08,0.999966,2.621615e-08


In [22]:
import pandas as pd

# Load your first dataframe (assuming your columns are named as described)
df1 = final_df

# Load the second CSV containing 'window' and 'lib' columns
df2 = pd.read_csv('/Users/nicolasthiou/Desktop/DataChallenge/Y_random_nKwalR1.csv')

# Define your threshold
threshold = 0.7  # Adjust as needed

# Initialize an empty list to store new rows
new_rows = []

# Iterate through the rows of df1
for i, row in df1.iterrows():
    filename = row['filename']
    pdrift = row['p_drift']
    
    # Get the label based on pdrift and probabilities
    if pdrift > threshold:
        label = 6
    else:
        # Find the column with the highest probability and assign the label
        probabilities = row[['p0', 'p1', 'p2', 'p3', 'p4', 'p5']]
        label = probabilities.idxmax().replace('p', '')  # Get the label from the probability column name

    # Get the corresponding window and lib from df2 (assuming you match rows based on index)
    window = df2.loc[i, 'window']
    lib = df2.loc[i, 'lib']
    
    # Add the row to the list
    new_rows.append([i,filename, window, lib, label])

# Create a new dataframe
new_df = pd.DataFrame(new_rows, columns=['','filename', 'window', 'lib', 'Label'])

# Save the new dataframe to a CSV file
new_df.to_csv('PatchCore_output_cropped_Threshold_0_7.csv', index=False)


**This part is useless as we do not have the true labels for the test data**

In [76]:
#### Get the y_test

test_df = pd.read_csv(test_labels)

classes_dict = {
    "GOOD" : 0,
    "Boucle plate" : 1,
    "Lift-off blanc" : 2,
    "Lift-off noir" : 3,
    "Missing" : 4,
    "Short circuit MOS" : 5,
    "Drift" : 6
}

# Replace the labels by the corresponding vlaues in the dict

test_df['Label'] = test_df['Label'].replace(classes_dict)

# Check the new values
test_df['Label'].unique()


array([5, 1, 6, 3, 4, 0, 2])

In [77]:
# merge the results and true results DFs

merged_df = pd.merge(final_df, test_df, on='filename', how='inner')

In [78]:
print(len(final_df))
print(len(test_df))
print(len(merged_df))

1055
1055
1055


In [79]:
import numpy as np

def define_classe(row, threshold=0.5):
    # Accéder à la valeur de p_drift dans la ligne
    p_drift = row['p_drift']
    
    # Extraire les probabilités des classes (colonnes 2 à 7)
    p_classes = row[['p0', 'p1', 'p2', 'p3', 'p4', 'p5']].values
    
    # Si le drift est supérieur au seuil, retourner 6, sinon index du max des classes
    if p_drift > threshold:
        return 6
    else:
        return p_classes.argmax()  # Utiliser argmax pour obtenir l'indice du max

# Appliquer la fonction à chaque ligne du DataFrame
merged_df['y_pred'] = merged_df.apply(define_classe, axis=1)


In [80]:
merged_df.head()

Unnamed: 0.1,filename,p_drift,p0,p1,p2,p3,p4,p5,Unnamed: 0,window,lib,Label,y_pred
0,6c2730d57920779bd76521c5145c9a9397dbe31e24f349...,0.151982,8.764521e-09,5.613366e-05,9.76034e-11,2.395489e-08,0.999944,2.001183e-11,17,2003,Die01,5,4
1,eba7d43b0b392a72187f3a635d756c21bfa52ac75fd444...,0.504491,0.022453,0.03719516,0.8945753,0.01211169,0.015312,0.01835294,432,2003,Die03,3,6
2,2144b403405bf68d05f330ab99c7862f24a56d1735584d...,0.334777,3.232218e-08,4.508826e-05,4.583593e-10,7.285972e-08,0.999955,1.17842e-09,525,2005,Die03,0,4
3,61f49a95147ffbf32d55c309b2449f57254331a0bd001d...,0.178413,3.088929e-11,3.902886e-07,1.978402e-14,4.323987e-11,1.0,1.091145e-13,473,2005,Die01,0,4
4,2d5801daa5ce43728fcd7b94ca9d2921f09fd3fcd688cb...,0.188618,5.016065e-08,3.431041e-05,3.011236e-10,5.599871e-08,0.999966,2.621615e-08,932,2005,Die03,6,4


In [82]:
import numpy as np

def penalty_weighted_accuracy(y_true, y_pred, penalty_matrix):
    total_penalty = 0
    max_penalty = penalty_matrix.max()  # Maximum possible penalty in the matrix
    n = len(y_true)  # Total number of samples

    for i in range(n):
        true_class = y_true[i]
        predicted_class = y_pred[i]
        
        # If misclassified, add the penalty; if correct, penalty is 0
        if true_class != predicted_class:
            total_penalty += penalty_matrix[true_class, predicted_class]
    
    # Normalize penalty by the worst-case penalty (n * max_penalty)
    normalized_penalty = total_penalty / (n * max_penalty)
    
    # Calculate PWA
    pwa = 1 - normalized_penalty
    return pwa

# Define the penalty matrix based on the design example
penalty_matrix = np.array([
    [0, 100, 100, 100, 100, 100, 10000],    # Good
    [10000, 0, 1, 1, 1, 1, 1000],           # Defect1
    [10000, 1, 0, 1, 1, 1, 1000],           # Defect2
    [10000, 1, 1, 0, 1, 1, 1000],           # Defect3
    [10000, 1, 1, 1, 0, 1, 1000],           # Defect4
    [10000, 1, 1, 1, 1, 0, 1000],           # Defect5
    [10000, 1000, 1000, 1000, 1000, 1000, 0] # Drift
])

# Calculate Penalty-Weighted Accuracy
pwa = penalty_weighted_accuracy(merged_df['Label'], merged_df['y_pred'], penalty_matrix)
print(f"Penalty-Weighted Accuracy: {pwa}")

Penalty-Weighted Accuracy: 0.842138009478673
