# Mencoba model CSAD

## library

In [2]:
import torch
import torch.onnx
import numpy as np
from torch.utils.data import Dataset
from torchvision import transforms
import glob
from torch.utils.data import DataLoader
from sklearn.metrics import roc_auc_score
import os
import tqdm
import argparse
import time
import pandas as pd
import cv2
import openvino as ov
from PIL import Image
import os
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import roc_curve, auc

In [6]:

from complement.metrics import compute_best_pr_re
import json

In [4]:
import os

file_path = 'C:/SEMESTER_6/capstone_sml/ckpt/openvino_models/breakfast_box.bin'

# Dapatkan ukuran dalam bytes
size_bytes = os.path.getsize(file_path)

# Konversi ke KB atau MB
size_kb = size_bytes / 1024
size_mb = size_kb / 1024

print(f"Ukuran file: {size_bytes} Bytes")
print(f"               {size_kb:.2f} KB")
print(f"               {size_mb:.2f} MB")

Ukuran file: 181959560 Bytes
               177694.88 KB
               173.53 MB


## Preprocessing asli

In [7]:
class Padding2Resize():
    def __init__(self, pad_l, pad_t, pad_r, pad_b):
        self.pad_l = pad_l
        self.pad_t = pad_t
        self.pad_r = pad_r
        self.pad_b = pad_b

    def __call__(self,image,target_size,mode='nearest'):
        shape = len(image.shape)
        if shape == 3:
            image = image[None,:,:,:]
        elif shape == 2:
            image = image[None,None,:,:]
        # B,C,H,W
        if self.pad_b == 0:
            image = image[:,:,self.pad_t:]
        else:
            image = image[:,:,self.pad_t:-self.pad_b]
        if self.pad_r == 0:
            image = image[:,:,:,self.pad_l:]
        else:
            image = image[:,:,:,self.pad_l:-self.pad_r]
        
        if isinstance(image,np.ndarray):
            image = cv2.resize(image,(target_size,target_size),interpolation=cv2.INTER_NEAREST if mode == 'nearest' else cv2.INTER_LINEAR)
        elif isinstance(image,torch.Tensor):
            image = torch.nn.functional.interpolate(image, size=(target_size,target_size), mode=mode)


        if shape == 3:
            return image[0]
        elif shape == 2:
            return image[0,0]
        return image
    

In [8]:
def get_padding_functions(orig_size,target_size=256,resize_target_size=None,mode='nearest',fill=0):
    """
        padding_func, inverse_padding_func = get_padding_functions(image.size,target_size=256)
        image2 = padding_func(image) # image2.size = (256,256) with padding
        image2.show()
        image3 = inverse_padding_func(image2) # image3.size = (256,256) without padding
        image3.show()
    """
    resize_target_size = target_size if resize_target_size is None else resize_target_size
    imsize = orig_size
    long_size = max(imsize)
    scale = target_size / long_size
    new_h = int(imsize[1] * scale + 0.5)
    new_w = int(imsize[0] * scale + 0.5)

    if (target_size - new_w) % 2 == 0:
        pad_l = pad_r = (target_size - new_w) // 2
    else:
        pad_l,pad_r = (target_size - new_w) // 2,(target_size - new_w) // 2 + 1
    if (target_size - new_h) % 2 == 0:
        pad_t = pad_b = (target_size - new_h) // 2
    else:
        pad_t,pad_b = (target_size - new_h) // 2,(target_size - new_h) // 2 + 1
    inter =  Image.NEAREST if mode == 'nearest' else Image.BILINEAR

    padding_func = transforms.Compose([
        transforms.Resize((new_h,new_w),interpolation=inter),
        transforms.Pad((pad_l, pad_t, pad_r, pad_b), fill=fill, padding_mode='constant')
    ])
    return padding_func, Padding2Resize(pad_l,pad_t,pad_r,pad_b)


In [9]:
class MVTecLOCODataset(Dataset):

    def __init__(self, root, image_size, phase,category,use_pad=True,to_gpu=True,config=None):
        self.phase=phase
        self.category = category
        self.image_size = image_size
        
        self.use_pad = use_pad
        self.build_transform()
        

        if phase=='train':
            print(f"Loading MVTec LOCO {self.category} (train)")
            self.img_path = os.path.join(root,category, 'train')
        elif phase=='eval':
            print(f"Loading MVTec LOCO {self.category} (validation)")
            self.img_path = os.path.join(root,category, 'validation')
        else:
            print(f"Loading MVTec LOCO {self.category} (test)")
            self.img_path = os.path.join(root,category, 'test')
            self.gt_path = os.path.join(root,category, 'ground_truth')
        assert os.path.isdir(os.path.join(root,category)), 'Error MVTecLOCODataset category:{}'.format(category)

        self.img_paths, self.gt_paths, self.labels, self.types = self.load_paths() # self.labels => good : 0, anomaly : 1
        
        # load dataset
        self.load_images(to_gpu=to_gpu)

    def build_transform(self):
        self.norm_transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])
        self.resize_norm_transform = transforms.Compose([
            transforms.Resize((self.image_size,self.image_size),interpolation=Image.BILINEAR),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])
        self.aug_tranform = transforms.RandomChoice([
            transforms.ColorJitter(brightness=0.2),
            transforms.ColorJitter(contrast=0.2),
            transforms.ColorJitter(saturation=0.2),
        ])
        self.transform_gt = transforms.Compose([
            transforms.ToTensor(),
        ])

        

    def load_paths(self):

        img_tot_paths = []
        gt_tot_paths = []
        tot_labels = []
        tot_types = []

        defect_types = os.listdir(self.img_path)
        
        for defect_type in defect_types:
            if defect_type == 'good':
                img_paths = glob.glob(os.path.join(self.img_path, defect_type) + "/*.png")
                img_tot_paths.extend(img_paths)
                gt_tot_paths.extend([0]*len(img_paths))
                tot_labels.extend([0]*len(img_paths))
                tot_types.extend(['good']*len(img_paths))
            else:
                img_paths = glob.glob(os.path.join(self.img_path, defect_type) + "/*.png")
                gt_paths = glob.glob(os.path.join(self.gt_path, defect_type) + "/*")
                gt_paths = [g for g in gt_paths if os.path.isdir(g)]
                img_paths.sort()
                gt_paths.sort()
                img_tot_paths.extend(img_paths)
                if len(gt_paths)==0:
                    gt_paths = [0]*len(img_paths)
                
                gt_tot_paths.extend(gt_paths)
                tot_labels.extend([1]*len(img_paths))
                tot_types.extend([defect_type]*len(img_paths))

        assert len(img_tot_paths) == len(gt_tot_paths), "Something wrong with test and ground truth pair!"

        return img_tot_paths, gt_tot_paths, tot_labels, tot_types


    def __len__(self):
        return len(self.img_paths)
    
    def load_images(self,to_gpu=True):
        
        self.pad_func, self.pad2resize = get_padding_functions(
            Image.open(self.img_paths[0]).size,
            target_size=self.image_size,
            mode='bilinear')
        
        self.samples = list()
        self.images = list()
        for i in range(len(self.img_paths)):
            img_path, gt, label, img_type = self.img_paths[i], self.gt_paths[i], self.labels[i], self.types[i]
            img = Image.open(img_path).convert('RGB')
            
            self.images.append(img.copy())

            
            resize_img = self.resize_norm_transform(img)
            pad_img = self.norm_transform(self.pad_func(img))
            
            if to_gpu:
                resize_img = resize_img.cuda()
                pad_img = pad_img.cuda()
            self.samples.append({
                'image': resize_img,
                'pad_image': pad_img,
                'label': label,
                'name': os.path.basename(img_path[:-4]),
                'type': img_type,
                'path': img_path,
            })

    def __getitem__(self, idx):
        

        if self.phase == 'train':
            # augmentation
            
            aug_image = self.aug_tranform(self.images[idx])
            self.samples[idx]['aug_image'] = self.resize_norm_transform(aug_image).cuda()

        self.samples[idx]['idx'] = torch.tensor([idx])
        
        return self.samples[idx]

## Test model
udh di modif dikit2

In [41]:
root = os.getcwd()
print(root)

c:\SEMESTER_6\project_sml\CSAD


In [15]:
class NumpyEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.integer):
            return int(obj)
        elif isinstance(obj, np.floating):
            return float(obj)
        elif isinstance(obj, np.ndarray):
            return obj.tolist()
        return super(NumpyEncoder, self).default(obj)

In [16]:
def inference_openvino(category):
    # Inference after converting to onnx
    import openvino as ov
    core =  ov.Core()
    # root = os.path.dirname(os.path.abspath(__file__))
    root = os.getcwd()
    compiled_model = core.compile_model(f"{root}/ckpt/openvino_models/{category}.xml", "CPU")
    infer_request = compiled_model.create_infer_request()
    
    def to_numpy(tensor):
        return tensor.detach().cpu().numpy() if tensor.requires_grad else tensor.cpu().numpy()


    test_set = MVTecLOCODataset(
            root="dataset/mvtec_loco_anomaly_detection",
            image_size=256,
            phase='test',
            category=category,
            to_gpu=False
        )
    test_set = DataLoader(test_set, batch_size=1, shuffle=False)

    logi_ture = []
    logi_score = []
    logi_nor = []
    logi_ano = []
    stru_ture = []
    stru_nor = []
    stru_ano = []
    stru_score = []
    for i,sample in tqdm.tqdm(enumerate(test_set), desc="Testing"):
        image = sample['image']
        path = sample['path']
        
        torch_input = image


        # Create tensor from external memory
        input_tensor = ov.Tensor(array=to_numpy(torch_input), shared_memory=False)
        # Set input tensor for model with one input
        infer_request.set_input_tensor(input_tensor)

        infer_request.start_async()
        infer_request.wait()

        # Get output tensor for model with one output
        output = infer_request.get_output_tensor()
        output_buffer = output.data
        score = output_buffer[0]
        # print(score)


        defect_class = os.path.basename(os.path.dirname(path[0]))
        
        if defect_class == "good":
            logi_ture.append(0)
            logi_score.append(score)
            logi_nor.append(score)
            stru_ture.append(0)
            stru_nor.append(score)
            stru_score.append(score)
        elif defect_class == "logical_anomalies":
            logi_ture.append(1)
            logi_ano.append(score)
            logi_score.append(score)
        elif defect_class == "structural_anomalies":
            stru_ture.append(1)
            stru_ano.append(score)
            stru_score.append(score)

    logi_auc = roc_auc_score(y_true=logi_ture, y_score=logi_score)
    stru_auc = roc_auc_score(y_true=stru_ture, y_score=stru_score)
    
    logi_tric = compute_best_pr_re(logi_ture, logi_score)
    stru_tric = compute_best_pr_re(stru_ture, stru_score)
    
    # metrik logical
    logi_presisi = logi_tric["precision"]
    logi_recal = logi_tric["recall"]
    logi_akurasi = logi_tric["accuracy"]
    logi_f1 = logi_tric["f1_score"]
    logi_threshold = logi_tric["threshold"]
    
    # metrik structural
    stru_presisi = stru_tric["precision"]
    stru_recal = stru_tric["recall"]
    stru_akurasi = stru_tric["accuracy"]
    stru_f1 = stru_tric["f1_score"]
    stru_threshold = stru_tric["threshold"]
    
    # Membuat DataFrame untuk semua kategori
    df_logi_nor = pd.DataFrame(logi_nor, columns=['score'])
    df_logi_ano = pd.DataFrame(logi_ano, columns=['score'])
    df_logi_score = pd.DataFrame(logi_score, columns=['score'])
    df_stru_nor = pd.DataFrame(stru_nor, columns=['score'])
    df_stru_ano = pd.DataFrame(stru_ano, columns=['score'])
    df_stru_score = pd.DataFrame(stru_score, columns=['score'])

    # Menyimpan setiap DataFrame ke CSV dengan format nama yang sesuai
    df_logi_nor.to_csv(f'{category}_logi_normal.csv', index=False)
    df_logi_ano.to_csv(f'{category}_logi_anomaly.csv', index=False)
    df_logi_score.to_csv(f'{category}_logi_scores.csv', index=False)
    df_stru_nor.to_csv(f'{category}_stru_normal.csv', index=False)
    df_stru_ano.to_csv(f'{category}_stru_anomaly.csv', index=False)
    df_stru_score.to_csv(f'{category}_stru_scores.csv', index=False)
    
    # Save metrics to JSON files
    logical_metrics = {
        "auc": logi_auc * 100,
        "precision": logi_presisi,
        "recall": logi_recal,
        "accuracy": logi_akurasi,
        "f1_score": logi_f1,
        "threshold": logi_threshold
    }
    
    structural_metrics = {
        "auc": stru_auc * 100,
        "precision": stru_presisi,
        "recall": stru_recal,
        "accuracy": stru_akurasi,
        "f1_score": stru_f1,
        "threshold": stru_threshold
    }
    
    os.makedirs("output", exist_ok=True)
    # Save metrics to JSON files
    with open(os.path.join("output/", f'{category}_logical_metrics.json'), 'w') as f:
        json.dump(logical_metrics, f, indent=4, cls=NumpyEncoder)

    with open(os.path.join("output/", f'{category}_structural_metrics.json'), 'w') as f:
        json.dump(structural_metrics, f, indent=4, cls=NumpyEncoder)
        
    print(f"Test {category} result: logical auc:{logi_auc*100}, structural auc:{stru_auc*100}")

    return (logi_auc*100+stru_auc*100)/2

In [17]:
categories = ["breakfast_box","juice_bottle","pushpins","screw_bag","splicing_connectors",]
aucs = []
for category in categories:
    auc = inference_openvino(category)
    aucs.append(auc)
    
# print("Total Average AUC:",np.mean(aucs))

Loading MVTec LOCO breakfast_box (test)


Testing: 275it [03:22,  1.36it/s]


Test breakfast_box result: logical auc:95.8421922986062, structural auc:89.70588235294117
Loading MVTec LOCO juice_bottle (test)


Testing: 330it [03:06,  1.77it/s]


Test juice_bottle result: logical auc:95.48996104285287, structural auc:95.83521955636034
Loading MVTec LOCO pushpins (test)


Testing: 310it [02:58,  1.74it/s]


Test pushpins result: logical auc:99.72129319955407, structural auc:94.2565754159957
Loading MVTec LOCO screw_bag (test)


Testing: 341it [03:00,  1.89it/s]


Test screw_bag result: logical auc:99.88632284312551, structural auc:94.3422630947621
Loading MVTec LOCO splicing_connectors (test)


Testing: 312it [02:50,  1.83it/s]

Test splicing_connectors result: logical auc:97.38562091503267, structural auc:94.29560059317843





In [4]:
print("Total Average AUC:",np.mean(aucs))

Total Average AUC: 95.67609313124092


In [None]:
test_set = MVTecLOCODataset(
            root="./datasets/mvtec_loco_anomaly_detection",
            image_size=256,
            phase='test',
            category="juice_bottle",
            to_gpu=False
        )

Loading MVTec LOCO juice_bottle (test)


In [28]:
print(test_set.labels)

[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]


In [31]:
test_set = DataLoader(test_set, batch_size=1, shuffle=False)
type(test_set)

torch.utils.data.dataloader.DataLoader

In [37]:
for i,sample in tqdm.tqdm(enumerate(test_set), desc="Testing"):
        image = sample['image']
        path = sample['path']
        defect_class = os.path.basename(os.path.dirname(path[0]))
        print(defect_class)
        print(path)
        torch_input = image

Testing: 330it [00:00, 1844.37it/s]

good
['./datasets/mvtec_loco_anomaly_detection\\juice_bottle\\test\\good\\000.png']
good
['./datasets/mvtec_loco_anomaly_detection\\juice_bottle\\test\\good\\001.png']
good
['./datasets/mvtec_loco_anomaly_detection\\juice_bottle\\test\\good\\002.png']
good
['./datasets/mvtec_loco_anomaly_detection\\juice_bottle\\test\\good\\003.png']
good
['./datasets/mvtec_loco_anomaly_detection\\juice_bottle\\test\\good\\004.png']
good
['./datasets/mvtec_loco_anomaly_detection\\juice_bottle\\test\\good\\005.png']
good
['./datasets/mvtec_loco_anomaly_detection\\juice_bottle\\test\\good\\006.png']
good
['./datasets/mvtec_loco_anomaly_detection\\juice_bottle\\test\\good\\007.png']
good
['./datasets/mvtec_loco_anomaly_detection\\juice_bottle\\test\\good\\008.png']
good
['./datasets/mvtec_loco_anomaly_detection\\juice_bottle\\test\\good\\009.png']
good
['./datasets/mvtec_loco_anomaly_detection\\juice_bottle\\test\\good\\010.png']
good
['./datasets/mvtec_loco_anomaly_detection\\juice_bottle\\test\\good\\01




In [36]:
defect_class = os.path.basename(os.path.dirname(path[0]))
print(defect_class)

structural_anomalies


## hitung threshold

In [None]:
categories = ["breakfast_box", "juice_bottle", "pushpins", "screw_bag", "splicing_connectors"]

# Dictionary untuk menyimpan threshold untuk setiap kategori
thresholds_logical = {}
thresholds_structural = {}

for category in categories:
    # Membaca file CSV yang telah dibuat sebelumnya
    logi_nor = pd.read_csv(f'{category}_logi_normal.csv')['score'].tolist()
    logi_ano = pd.read_csv(f'{category}_logi_anomaly.csv')['score'].tolist()
    logi_scores = pd.read_csv(f'{category}_logi_scores.csv')['score'].tolist()
    
    stru_nor = pd.read_csv(f'{category}_stru_normal.csv')['score'].tolist()
    stru_ano = pd.read_csv(f'{category}_stru_anomaly.csv')['score'].tolist()
    stru_scores = pd.read_csv(f'{category}_stru_scores.csv')['score'].tolist()
    
    # Logical Anomaly Thresholding
    y_true_logi = [0] * len(logi_nor) + [1] * len(logi_ano)
    y_score_logi = logi_scores
    
    fpr, tpr, thresholds = roc_curve(y_true_logi, y_score_logi)
    optimal_idx = np.argmax(tpr - fpr)
    THRESHOLD_LOGICAL = thresholds[optimal_idx]
    thresholds_logical[category] = THRESHOLD_LOGICAL
    
    # Structural Anomaly Thresholding
    y_true_stru = [0] * len(stru_nor) + [1] * len(stru_ano)
    y_score_stru = stru_scores
    
    fpr_stru, tpr_stru, thresholds_stru = roc_curve(y_true_stru, y_score_stru)
    optimal_idx_stru = np.argmax(tpr_stru - fpr_stru)
    THRESHOLD_STRUCTURAL = thresholds_stru[optimal_idx_stru]
    thresholds_structural[category] = THRESHOLD_STRUCTURAL
    
    print(f"Category: {category}")
    print(f"  Logical Threshold: {THRESHOLD_LOGICAL:.4f}")
    print(f"  Structural Threshold: {THRESHOLD_STRUCTURAL:.4f}")

# Menyimpan semua threshold ke dalam file
threshold_df = pd.DataFrame({
    'category': categories,
    'logical_threshold': [thresholds_logical[cat] for cat in categories],
    'structural_threshold': [thresholds_structural[cat] for cat in categories]
})

threshold_df.to_csv('anomaly_thresholds_2.csv', index=False)
print("\nThresholds saved to 'anomaly_thresholds.csv'")

Category: breakfast_box
  Logical Threshold: 2.3232
  Structural Threshold: -1.4716
Category: juice_bottle
  Logical Threshold: 13.4014
  Structural Threshold: 14.5729
Category: pushpins
  Logical Threshold: 8.9723
  Structural Threshold: 3.4282
Category: screw_bag
  Logical Threshold: 9.4575
  Structural Threshold: 4.5171
Category: splicing_connectors
  Logical Threshold: 12.4772
  Structural Threshold: 12.2652

Thresholds saved to 'anomaly_thresholds.csv'


In [7]:
print(tpr_stru)
print(fpr_stru)

[0.         0.01176471 0.54117647 0.54117647 0.70588235 0.70588235
 0.71764706 0.71764706 0.76470588 0.76470588 0.8        0.8
 0.81176471 0.81176471 0.82352941 0.82352941 0.83529412 0.83529412
 0.87058824 0.87058824 0.88235294 0.88235294 0.89411765 0.89411765
 0.90588235 0.90588235 0.91764706 0.91764706 0.92941176 0.92941176
 0.94117647 0.94117647 0.95294118 0.95294118 0.96470588 0.96470588
 0.97647059 0.97647059 0.98823529 0.98823529 1.         1.        ]
[0.         0.         0.         0.00840336 0.00840336 0.01680672
 0.01680672 0.02521008 0.02521008 0.03361345 0.03361345 0.04201681
 0.04201681 0.05042017 0.05042017 0.05882353 0.05882353 0.07563025
 0.07563025 0.09243697 0.09243697 0.10084034 0.10084034 0.16806723
 0.16806723 0.19327731 0.19327731 0.25210084 0.25210084 0.28571429
 0.28571429 0.31092437 0.31092437 0.53781513 0.53781513 0.54621849
 0.54621849 0.81512605 0.81512605 0.83193277 0.83193277 1.        ]


#### visualisasi

In [None]:
# Daftar kategori
categories = ["breakfast_box", "juice_bottle", "pushpins", "screw_bag", "splicing_connectors"]
output_dir = "./threshold_analysis_viz"
os.makedirs(output_dir, exist_ok=True)

# Fungsi untuk plot distribusi skor dan ROC Curve
def plot_score_distribution(category, logi_nor, logi_ano, stru_nor, stru_ano, logi_scores, stru_scores):
    # Logical Anomaly
    plt.figure(figsize=(14, 6))

    # Histogram Logical Anomaly
    plt.subplot(1, 2, 1)
    sns.histplot(logi_nor, label='Normal', color='green', kde=True, alpha=0.5)
    sns.histplot(logi_ano, label='Logical Anomaly', color='red', kde=True, alpha=0.5)
    plt.axvline(x=logi_thresholds[category], color='blue', linestyle='--', label=f'Threshold: {logi_thresholds[category]:.4f}')
    plt.title(f"[{category}] Logical Anomaly Score Distribution")
    plt.xlabel("Anomaly Score")
    plt.ylabel("Frequency")
    plt.legend()

    # Structural Anomaly
    plt.subplot(1, 2, 2)
    sns.histplot(stru_nor, label='Normal', color='green', kde=True, alpha=0.5)
    sns.histplot(stru_ano, label='Structural Anomaly', color='orange', kde=True, alpha=0.5)
    plt.axvline(x=stru_thresholds[category], color='blue', linestyle='--', label=f'Threshold: {stru_thresholds[category]:.4f}')
    plt.title(f"[{category}] Structural Anomaly Score Distribution")
    plt.xlabel("Anomaly Score")
    plt.ylabel("Frequency")
    plt.legend()

    plt.tight_layout()
    plt.savefig(os.path.join(output_dir, f"{category}_distribution.png"))
    plt.close()

    # ROC Curve
    plt.figure(figsize=(12, 5))
    
    # Logical ROC
    plt.subplot(1, 2, 1)
    y_true_logi = [0] * len(logi_nor) + [1] * len(logi_ano)
    fpr, tpr, _ = roc_curve(y_true_logi, logi_scores)
    roc_auc = auc(fpr, tpr)
    plt.plot(fpr, tpr, lw=2, label=f'ROC curve (AUC = {roc_auc:.2f})')
    plt.plot([0, 1], [0, 1], 'k--', lw=2)
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title(f'{category} - Logical Anomaly ROC')
    plt.legend(loc="lower right")

    # Structural ROC
    plt.subplot(1, 2, 2)
    y_true_stru = [0] * len(stru_nor) + [1] * len(stru_ano)
    fpr_stru, tpr_stru, _ = roc_curve(y_true_stru, stru_scores)
    roc_auc_stru = auc(fpr_stru, tpr_stru)
    plt.plot(fpr_stru, tpr_stru, lw=2, label=f'ROC curve (AUC = {roc_auc_stru:.2f})')
    plt.plot([0, 1], [0, 1], 'k--', lw=2)
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title(f'{category} - Structural Anomaly ROC')
    plt.legend(loc="lower right")

    plt.tight_layout()
    plt.savefig(os.path.join(output_dir, f"{category}_roc_curve.png"))
    plt.close()

# Baca semua data & simpan threshold ke dictionary untuk plotting
logi_thresholds = {}
stru_thresholds = {}

for category in categories:
    logi_nor = pd.read_csv(f'{category}_logi_normal.csv')['score'].tolist()
    logi_ano = pd.read_csv(f'{category}_logi_anomaly.csv')['score'].tolist()
    logi_scores = pd.read_csv(f'{category}_logi_scores.csv')['score'].tolist()

    stru_nor = pd.read_csv(f'{category}_stru_normal.csv')['score'].tolist()
    stru_ano = pd.read_csv(f'{category}_stru_anomaly.csv')['score'].tolist()
    stru_scores = pd.read_csv(f'{category}_stru_scores.csv')['score'].tolist()

    # Simpan threshold
    thresholds_df = pd.read_csv('anomaly_thresholds.csv')
    for _, row in thresholds_df.iterrows():
        logi_thresholds[row['category']] = row['logical_threshold']
        stru_thresholds[row['category']] = row['structural_threshold']

    # Buat visualisasi
    plot_score_distribution(
        category,
        logi_nor, logi_ano, stru_nor, stru_ano,
        logi_scores, stru_scores
    )

print(f"Visualizations saved to '{output_dir}'")

Visualizations saved to './threshold_analysis_viz'


# modifan

bisa langsung running ini aja biar gmpg

In [3]:
root = os.getcwd()
print(root)

c:\SEMESTER_6\capstone_sml


In [None]:
class Padding2Resize():
    def __init__(self, pad_l, pad_t, pad_r, pad_b):
        self.pad_l = pad_l
        self.pad_t = pad_t
        self.pad_r = pad_r
        self.pad_b = pad_b

    def __call__(self,image,target_size,mode='nearest'):
        shape = len(image.shape)
        if shape == 3:
            image = image[None,:,:,:]
        elif shape == 2:
            image = image[None,None,:,:]
        # B,C,H,W
        if self.pad_b == 0:
            image = image[:,:,self.pad_t:]
        else:
            image = image[:,:,self.pad_t:-self.pad_b]
        if self.pad_r == 0:
            image = image[:,:,:,self.pad_l:]
        else:
            image = image[:,:,:,self.pad_l:-self.pad_r]
        
        if isinstance(image,np.ndarray):
            image = cv2.resize(image,(target_size,target_size),interpolation=cv2.INTER_NEAREST if mode == 'nearest' else cv2.INTER_LINEAR)
        elif isinstance(image,torch.Tensor):
            image = torch.nn.functional.interpolate(image, size=(target_size,target_size), mode=mode)


        if shape == 3:
            return image[0]
        elif shape == 2:
            return image[0,0]
        return image
    
####################################################
# Fungsi untuk padding gambar
def get_padding_functions(orig_size,target_size=256,resize_target_size=None,mode='nearest',fill=0):
    """
        padding_func, inverse_padding_func = get_padding_functions(image.size,target_size=256)
        image2 = padding_func(image) # image2.size = (256,256) with padding
        image2.show()
        image3 = inverse_padding_func(image2) # image3.size = (256,256) without padding
        image3.show()
    """
    resize_target_size = target_size if resize_target_size is None else resize_target_size
    imsize = orig_size
    long_size = max(imsize)
    scale = target_size / long_size
    new_h = int(imsize[1] * scale + 0.5)
    new_w = int(imsize[0] * scale + 0.5)

    if (target_size - new_w) % 2 == 0:
        pad_l = pad_r = (target_size - new_w) // 2
    else:
        pad_l,pad_r = (target_size - new_w) // 2,(target_size - new_w) // 2 + 1
    if (target_size - new_h) % 2 == 0:
        pad_t = pad_b = (target_size - new_h) // 2
    else:
        pad_t,pad_b = (target_size - new_h) // 2,(target_size - new_h) // 2 + 1
    inter =  Image.NEAREST if mode == 'nearest' else Image.BILINEAR

    padding_func = transforms.Compose([
        transforms.Resize((new_h,new_w),interpolation=inter),
        transforms.Pad((pad_l, pad_t, pad_r, pad_b), fill=fill, padding_mode='constant')
    ])
    return padding_func, Padding2Resize(pad_l,pad_t,pad_r,pad_b)

###################################################################
class MVTecLOCODataset(Dataset):
    def __init__(self, image_folder, image_size=256, use_pad=True, to_gpu=False):
        """Inisialisasi dataset untuk inferensi dengan path fleksibel."""
        self.image_folder = image_folder
        self.image_size = image_size
        self.use_pad = use_pad
        self.build_transform()
        # Mengambil semua file PNG dari folder, mendukung subfolder seperti datasets/breakfast_box/11111
        self.img_paths = glob.glob(os.path.join(image_folder, "**", "*.png"), recursive=True)
        self.load_images(to_gpu=to_gpu)

    def build_transform(self):
        self.norm_transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])
        self.resize_norm_transform = transforms.Compose([
            transforms.Resize((self.image_size,self.image_size),interpolation=Image.BILINEAR),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])
        self.aug_tranform = transforms.RandomChoice([
            transforms.ColorJitter(brightness=0.2),
            transforms.ColorJitter(contrast=0.2),
            transforms.ColorJitter(saturation=0.2),
        ])
        self.transform_gt = transforms.Compose([
            transforms.ToTensor(),
        ])

    def load_images(self, to_gpu=False):
        """Memuat gambar dari path yang diberikan."""
        if not self.img_paths:
            raise ValueError("Tidak ada gambar PNG ditemukan di folder yang diberikan.")
        # Menggunakan gambar pertama untuk menentukan fungsi padding
        self.pad_func, self.pad2resize = get_padding_functions(
            Image.open(self.img_paths[0]).size,
            target_size=self.image_size,
            mode='bilinear'
        )
        
        self.samples = list()
        for img_path in self.img_paths:
            img = Image.open(img_path).convert('RGB')
            resize_img = self.resize_norm_transform(img)
            pad_img = self.norm_transform(self.pad_func(img))
            if to_gpu:
                resize_img = resize_img.cuda()
                pad_img = pad_img.cuda()
            self.samples.append({
                'image': resize_img,
                'pad_image': pad_img,
                'path': img_path
            })

    def __len__(self):
        """Mengembalikan jumlah gambar dalam dataset."""
        return len(self.img_paths)

    def __getitem__(self, idx):
        """Mengembalikan sample untuk inferensi."""
        return self.samples[idx]
    
###############################################################
def inference_openvino_modif(image_folder, category):
    """Melakukan inferensi pada gambar dari folder tertentu dan mengklasifikasikan langsung."""
    # Inisialisasi OpenVINO
    core = ov.Core()
    # root = os.path.dirname(os.path.abspath(__file__))
    root = os.getcwd()
    compiled_model = core.compile_model(f"{root}/ckpt/openvino_models/{category}.xml", "CPU")
    infer_request = compiled_model.create_infer_request()

    # Fungsi bantu untuk konversi tensor ke numpy
    def to_numpy(tensor):
        return tensor.detach().cpu().numpy() if tensor.requires_grad else tensor.cpu().numpy()

    # Membuat dataset dan dataloader
    test_set = MVTecLOCODataset(
        image_folder=image_folder,
        image_size=256,
        to_gpu=False
    )
    test_loader = DataLoader(test_set, batch_size=1, shuffle=False)

    results = []
    
    # Proses inferensi
    for sample in tqdm.tqdm(test_loader, desc="Mengklasifikasikan gambar"):
        image = sample['image']
        path = sample['path']  # Batch size 1, ambil path pertama
        
        # Persiapan input untuk OpenVINO
        input_tensor = ov.Tensor(array=to_numpy(image), shared_memory=False)
        infer_request.set_input_tensor(input_tensor)

        # Jalankan inferensi
        infer_request.start_async()
        infer_request.wait()

        # Ambil skor dari output
        output = infer_request.get_output_tensor()
        score = output.data[0]  # Skor anomali
        print(f"{path}")
        print(f"image score: {score}")
        
        results.append((path, score))
        
    return results

In [24]:
categories = ["screw_bag"]
folder = "C:/SEMESTER_6/project_sml/datasets/screw_bag/11111"
for category in categories:
    results = inference_openvino_modif(folder, category)
    for path, score in results:
        print(f"{path}: {score}")

Mengklasifikasikan gambar:  25%|██▌       | 1/4 [00:00<00:01,  1.76it/s]

['C:/SEMESTER_6/project_sml/datasets/screw_bag/11111\\000.png']
image score: 28.87126922607422


Mengklasifikasikan gambar:  50%|█████     | 2/4 [00:00<00:00,  2.10it/s]

['C:/SEMESTER_6/project_sml/datasets/screw_bag/11111\\001.png']
image score: 26.062368392944336


Mengklasifikasikan gambar:  75%|███████▌  | 3/4 [00:01<00:00,  2.23it/s]

['C:/SEMESTER_6/project_sml/datasets/screw_bag/11111\\002.png']
image score: 10.000765800476074


Mengklasifikasikan gambar: 100%|██████████| 4/4 [00:01<00:00,  2.21it/s]

['C:/SEMESTER_6/project_sml/datasets/screw_bag/11111\\003.png']
image score: 5.404834747314453
['C:/SEMESTER_6/project_sml/datasets/screw_bag/11111\\000.png']: 28.87126922607422
['C:/SEMESTER_6/project_sml/datasets/screw_bag/11111\\001.png']: 26.062368392944336
['C:/SEMESTER_6/project_sml/datasets/screw_bag/11111\\002.png']: 10.000765800476074
['C:/SEMESTER_6/project_sml/datasets/screw_bag/11111\\003.png']: 5.404834747314453





In [25]:
print(path[0])
print(path)

C:/SEMESTER_6/project_sml/datasets/screw_bag/11111\003.png
['C:/SEMESTER_6/project_sml/datasets/screw_bag/11111\\003.png']


In [26]:
print(score)

5.4048347


In [14]:
thresholds = {
    "breakfast_box": {
        "logical": 2.3232,
        "structural": -1.4716
    },
    "juice_bottle": {
        "logical": 13.4014,
        "structural": 14.5729
    },
    "pushpins": {
        "logical": 8.9723,
        "structural": 3.4282
    },
    "screw_bag": {
        "logical": 9.4575,
        "structural": 4.5171
    },
    "splicing_connectors": {
        "logical": 12.4772,
        "structural": 12.2652
    }
}

In [20]:
category = "screw_bag"
logical_threshold = thresholds[category]["logical"]
structural_threshold = thresholds[category]["structural"]

print(f"Threshold Logical untuk {category}: {logical_threshold}")
print(f"Threshold Structural untuk {category}: {structural_threshold}")

Threshold Logical untuk screw_bag: 9.4575
Threshold Structural untuk screw_bag: 4.5171


liat hasil visualisasi

In [None]:
# results = inference_openvino_modif(folder, category)
for path, score in results:
    if category == "screw_bag":
        if score < thresholds[category]["structural"]:
            print(f"gambar {path} adalah NORMAL")
        elif score > 100:
            print(f"gambar {path} memiliki LOGICAL ANOMALY")
        else:
            print(f"terdapat ANOMALY pada gambar {path}")

terdapat ANOMALY pada gambar ['C:/SEMESTER_6/project_sml/datasets/screw_bag/11111\\000.png']
terdapat ANOMALY pada gambar ['C:/SEMESTER_6/project_sml/datasets/screw_bag/11111\\001.png']
terdapat ANOMALY pada gambar ['C:/SEMESTER_6/project_sml/datasets/screw_bag/11111\\002.png']
terdapat ANOMALY pada gambar ['C:/SEMESTER_6/project_sml/datasets/screw_bag/11111\\003.png']
