In [12]:
import os
import sys
import json
import random
import warnings
from pathlib import Path

import torch
import torch.nn as nn
from torchvision import models as models
from torchvision import transforms as transforms
from torchvision.models.feature_extraction import create_feature_extractor
import numpy as np
import gradio as gr
from gradio.themes.utils.sizes import Size
from PIL import Image
from mmengine import Config
import transformers
from transformers import BitsAndBytesConfig
from torch.distributions import Categorical


SLURM_ENV = {k: v for k, v in os.environ.items() if 'SLURM' in k}
sys.path.append(str(Path(os.getcwd()).parent.parent))
# sys.path.append(str(Path(__file__).parent.parent.parent))

from mllm.models.builder.build_llava import load_pretrained_llava
from mllm.dataset.process_function import PlainBoxFormatter
from demo_dataset import prepare_demo_dataset

# モデル定義

In [2]:
# TEMP_FILE_DIR = Path(__file__).parent / 'temp'
TEMP_FILE_DIR = Path(os.getcwd()) / 'temp'
TEMP_FILE_DIR.mkdir(parents=True, exist_ok=True)

#region paser

model_path=r'/home/oshita/vlm/Link-Context-Learning/model_result/LCL_PG+VI'
remove_model = False

model_name_or_path = model_path
vision_tower_path = r'/home/oshita/vlm/Link-Context-Learning/clip_vit_large_patch14.pt'
#endregion

#region configs
model_args = dict(
    type='llava',
    # TODO: process version; current version use default version
    version='v1',

    # checkpoint config
    cache_dir=None,
    model_name_or_path=model_name_or_path,
    vision_tower=vision_tower_path,
    pretrain_mm_mlp_adapter=None,
    # model config
    mm_vision_select_layer=-2,
    model_max_length=2048,
    
    # finetune config
    freeze_backbone=False,
    tune_mm_mlp_adapter=False,
    freeze_mm_mlp_adapter=False,
    freeze_mm_projector=False,

    # data process config
    is_multimodal=True,
    sep_image_conv_front=False,
    image_token_len=256,
    mm_use_im_start_end=True,

    target_processor=dict(
        boxes=dict(type='PlainBoxFormatter'),
    ),

    process_func_args=dict(
        conv=dict(type='LLavaConvProcessV1'),
        target=dict(type='BoxFormatProcess'),
        text=dict(type='LlavaTextProcessV2'),
        image=dict(type='LlavaImageProcessorV1'),
    ),

    conv_args=dict(
        conv_template=['causal_v1.0', 'hypnotized_ans_v1.0', 'final_v1.0', 'vicuna_v1.1'],
        transforms=dict(type='Expand2square'),
        tokenize_kwargs=dict(truncation_size=2048),
    ),

    gen_kwargs_set_pad_token_id=True,
    gen_kwargs_set_bos_token_id=True,
    gen_kwargs_set_eos_token_id=True,
)
model_args = Config(model_args)

training_args = Config(dict(
    bf16=False,
    fp16=True,
    device='cuda',
    fsdp=None,
))

quantization_kwargs = dict()
#region Load model and dataset
if not remove_model:
    model, preprocessor = load_pretrained_llava(model_args, training_args, **quantization_kwargs)
    preprocessor['target'] = {'boxes': PlainBoxFormatter()}
    tokenizer = preprocessor['text']

    if not getattr(model, 'is_quantized', False):
        model.to(dtype=torch.float16, device=torch.device('cuda'))
    if not getattr(model.model.vision_tower[0], 'is_quantized', False):
        model.model.vision_tower[0].to(dtype=torch.float16, device=torch.device('cuda'))

    dataset_demo = prepare_demo_dataset(model_args=model_args, preprocessor=preprocessor)

    print(f"LLM device: {model.device}, is_quantized: {getattr(model, 'is_quantized', False)}, is_loaded_in_4bit: {getattr(model, 'is_loaded_in_4bit', False)}, is_loaded_in_8bit: {getattr(model, 'is_loaded_in_8bit', False)}")
    print(f"vision device: {model.model.vision_tower[0].device}, is_quantized: {getattr(model.model.vision_tower[0], 'is_quantized', False)}, is_loaded_in_4bit: {getattr(model, 'is_loaded_in_4bit', False)}, is_loaded_in_8bit: {getattr(model, 'is_loaded_in_8bit', False)}")
else:
    print(f'Skip model process.')


Some weights of the model checkpoint at openai/clip-vit-large-patch14 were not used when initializing CLIPVisionModel: ['text_model.encoder.layers.2.self_attn.k_proj.bias', 'text_model.encoder.layers.0.mlp.fc2.weight', 'text_model.encoder.layers.10.layer_norm1.bias', 'text_model.encoder.layers.11.self_attn.q_proj.weight', 'text_model.encoder.layers.1.layer_norm2.bias', 'text_model.encoder.layers.11.layer_norm2.weight', 'text_model.encoder.layers.7.self_attn.q_proj.weight', 'text_model.encoder.layers.6.self_attn.k_proj.weight', 'text_model.encoder.layers.9.layer_norm2.weight', 'text_model.encoder.layers.9.self_attn.out_proj.weight', 'text_model.encoder.layers.11.mlp.fc2.weight', 'text_model.encoder.layers.10.mlp.fc2.bias', 'text_model.encoder.layers.5.self_attn.v_proj.bias', 'text_model.encoder.layers.8.self_attn.k_proj.weight', 'text_model.encoder.layers.5.mlp.fc2.bias', 'text_model.final_layer_norm.weight', 'text_model.encoder.layers.6.mlp.fc2.weight', 'text_model.encoder.layers.7.sel

LLM device: cuda:0, is_quantized: False, is_loaded_in_4bit: False, is_loaded_in_8bit: False
vision device: cuda:0, is_quantized: False, is_loaded_in_4bit: False, is_loaded_in_8bit: False


In [21]:
def init_vqa_state():
    return {
        'mode' : 'vqa',
        'infer_img': [],
        'infer_q': []
    }
    
def state_update(state, key, value):
    if value is None:
        return
    # format inputs
    if isinstance(value, str):
        special_tokens = [' <question>',' <image>', '<im_start>', '<im_end>', '[BEGIN EXAMPLE]', '[END EXAMPLE]', '[FINAL QUESTION]']
        for token in special_tokens:
            value = value.replace(token, '')
    state[key].append(value)

def predict(data_meta,class_name,idx,img_path):
    if len(data_meta['infer_q']) == 0:
        raise Exception('Please input question.')
    
    dataset_demo.update_data(data_meta)
    model_inputs = dataset_demo[0]
    print(f'=====model_inupts=====\n\n {model_inputs}')  
    model_dtype = next(model.parameters()).dtype
    model_inputs['images'] = model_inputs['images'].to(model_dtype)
    print(f'=====model_inupts["image"]=====\n\n {model_inputs}')  

    gen_kwargs = dict(
        use_cache=True,
        pad_token_id=tokenizer.pad_token_id,
        bos_token_id=tokenizer.bos_token_id,
        eos_token_id=tokenizer.eos_token_id,
        max_new_tokens=256,
    )

    input_ids = model_inputs['input_ids']
    with torch.inference_mode():
        with torch.autocast(dtype=torch.float16, device_type='cuda'):
            outputs = model.generate(**model_inputs, **gen_kwargs, return_dict_in_generate=True, output_scores=True)
            output_ids = outputs.sequences

            transition_scores = model.compute_transition_scores(outputs.sequences, outputs.scores, normalize_logits=True)
            generated_tokens = outputs.sequences[:, input_ids.shape[-1]:]
            import numpy as np
            for tok, score, full_score in zip(generated_tokens[0], transition_scores[0], outputs.scores):
                full_score = full_score[0]
                topk_softmax_score, topk_index = full_score.softmax(dim=-1).topk(5)
                topk_origin_score = full_score[topk_index]
                topk_tokens = tokenizer.convert_ids_to_tokens(topk_index)
                topk_strs = [f"[{idx:5d} | {token:8s} | {oscore:.3f} | {sscore:.2%}]" for idx, token, oscore, sscore in zip(topk_index, topk_tokens, topk_origin_score, topk_softmax_score)]

    input_token_len = input_ids.shape[-1]
    response = tokenizer.batch_decode(output_ids[:, input_token_len:], skip_special_tokens=True)[0]
    print(f"response: {response}")

def get_image_feature(path:str):
    state = init_vqa_state()

    img_path = path
    infer_imgbox = Image.open(img_path)

    infer_q = 'What is the color of the cat?'
    state_update(state, 'infer_img', infer_imgbox)
    state_update(state, 'infer_q', infer_q)

    dataset_demo.update_data(state)
    model_inputs = dataset_demo[0]
    model_dtype = next(model.parameters()).dtype
    model_inputs['images'] = model_inputs['images'].to(model_dtype)
    model_inputs['images'] = model_inputs['images'].reshape((1,3,224,224))

    return model.model.vision_tower[0](model_inputs['images'],output_hidden_states=True)

def cos_sim(v1, v2, mean=True,dim=1):
    cos = nn.CosineSimilarity(dim=dim, eps=1e-6)
    if mean:
        cos_sim = cos(v1, v2)
    else:
        combined = torch.cat((v1, v2), dim=0)
        # print(combined.shape)
        average = torch.mean(combined)
        std = torch.std(combined)
        _v1,_v2 = (v1-average)/std, (v2-average)/std
        cos_sim = cos(_v1, _v2)
    return cos_sim


In [4]:
model_dtype = next(model.parameters()).dtype
"""CLIPの中間特徴量を射影するmlpの重みを取得"""
mm_projector = nn.Linear(1024, 4096)
weights = torch.load(model_path+'/pytorch_model-00003-of-00003.bin', map_location='cpu')
mm_projector_weights = weights['model.mm_projector.weight']
mm_projector.bias.data = weights['model.mm_projector.bias']
mm_projector.weight.data = mm_projector_weights
mm_projector.to(device='cuda').to(model_dtype)


Linear(in_features=1024, out_features=4096, bias=True)

# CLIP特徴の類似度

In [66]:
normal_data_path = r'/home/dataset/mvtec/hazelnut/test/good/000.png'
defect_data_path = r'/home/dataset/mvtec/hazelnut/test/crack/000.png'

good_feature = get_image_feature(normal_data_path).hidden_states[-2]
good_feature = good_feature[:, 1:]
good_feature = mm_projector(good_feature)

defect_feature = get_image_feature(defect_data_path).hidden_states[-2]
defect_feature = defect_feature[:, 1:]
defect_feature = mm_projector(defect_feature)
print(defect_feature.shape)
# print(good_feature.max().item(), good_feature.min().item())
# print(defect_feature.max().item(), defect_feature.min().item())

print('=====平均=====')
print(cos_sim(good_feature, defect_feature,mean=False).mean().item())
print(cos_sim(good_feature, defect_feature,mean=True).mean().item())
print('=====最大最小=====')
print(cos_sim(good_feature, defect_feature,mean=False).max().item())
print(cos_sim(good_feature, defect_feature,mean=True).min().item())
print(cos_sim(good_feature, defect_feature,mean=False).max().item())
print(cos_sim(good_feature, defect_feature,mean=True).min().item())
print("=====閾値=====")
threshold = 0.5
T_above_threshold = torch.sum(cos_sim(good_feature, defect_feature,mean=True) > threshold).item() / cos_sim(good_feature, defect_feature,mean=True).numel()
F_above_threshold = torch.sum(cos_sim(good_feature, defect_feature,mean=False) > threshold).item() / cos_sim(good_feature, defect_feature,mean=False).numel()
print(f'T_above_threshold: {T_above_threshold}')
print(f'F_above_threshold: {F_above_threshold}')


torch.Size([1, 256, 4096])
=====平均=====
0.400146484375
0.400146484375
=====最大最小=====
0.927734375
-0.02117919921875
0.927734375
-0.02117919921875
=====閾値=====
T_above_threshold: 0.204345703125
F_above_threshold: 0.20361328125


: 

# Imagenetで事前学習したモデル特徴の類似度

In [15]:
device = torch.device('cuda')
model = models.resnet50(pretrained=True).eval()
transform_compose = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225]),
])
model.to(device)

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 

In [24]:
def extract_features_from_resnet50(model, data, layer_name='layer4'):
    # 抽出するレイヤーを指定
    return_nodes = {layer_name: 'features'}
    # 特徴抽出モデルを作成
    feature_extractor = create_feature_extractor(model, return_nodes=return_nodes)
    # 特徴量の抽出
    with torch.no_grad():
        features = feature_extractor(data)
    return features['features']

normal_data_path = r'/home/dataset/mvtec/hazelnut/test/good/000.png'
defect_data_path = r'/home/dataset/mvtec/hazelnut/test/crack/000.png'

normal_data = Image.open(normal_data_path)
normal_data = transform_compose(normal_data)
normal_data = normal_data.unsqueeze(0)
normal_data = normal_data.to(device)

defect_data = Image.open(defect_data_path)
defect_data = transform_compose(defect_data)
defect_data = defect_data.unsqueeze(0)
defect_data = defect_data.to(device)

good_feature = extract_features_from_resnet50(model, normal_data,layer_name='layer3').squeeze(0)
defect_feature = extract_features_from_resnet50(model, defect_data,layer_name='layer3').squeeze(0)


print('=====平均=====')
print(cos_sim(good_feature, defect_feature,mean=False).mean().item())
print(cos_sim(good_feature, defect_feature,mean=True).mean().item())
print('=====最大最小=====')
print(cos_sim(good_feature, defect_feature,mean=False).max().item())
print(cos_sim(good_feature, defect_feature,mean=True).min().item())
print(cos_sim(good_feature, defect_feature,mean=False).max().item())
print(cos_sim(good_feature, defect_feature,mean=True).min().item())
print("=====閾値=====")
threshold = 0.5
F_above_threshold = torch.sum(cos_sim(good_feature, defect_feature,mean=False) > threshold).item() / cos_sim(good_feature, defect_feature,mean=False).numel()
T_above_threshold = torch.sum(cos_sim(good_feature, defect_feature,mean=True) > threshold).item() / cos_sim(good_feature, defect_feature,mean=True).numel()
print(f'F_above_threshold: {F_above_threshold}')
print(f'T_above_threshold: {T_above_threshold}')

=====平均=====
0.543789267539978
0.3592729866504669
=====最大最小=====
1.0000001192092896
0.0
1.0000001192092896
0.0
=====閾値=====
T_above_threshold: 0.3843470982142857
F_above_threshold: 0.5895647321428571


: 

# 正常画像と欠陥画像の類似度(平均値)

In [35]:
import os
import json

# Initialize the dictionaries
class_avg_similarities = {}
defect_similarities = {}

mvtec_names = ['bottle', 'cable', 'capsule', 'carpet', 'grid', 'hazelnut', 'leather', 'metal_nut', 'pill', 'screw', 'tile', 'toothbrush', 'transistor', 'wood', 'zipper']
for name in mvtec_names:
    defect_names = [p.name for p in Path(r'/home/dataset/mvtec/'+name+'/test').glob('*') if p.is_dir()]
    defect_names.remove('good')
    if 'combined' in defect_names:
        defect_names.remove('combined')
    for defect_name in defect_names:
        # Get the feature of the middle layer
        good_feature = get_image_feature(rf'/home/dataset/mvtec/{name}/test/good/000.png').hidden_states[-2]
        # Remove the class token
        good_feature = good_feature[:, 1:]
        good_feature = mm_projector(good_feature)

        defect_paths = [p for p in Path(rf'/home/dataset/mvtec/{name}/test/{defect_name}').glob('*.png')]
        similarity_scores = []
        for path in defect_paths:
            defect_feature = get_image_feature(path).hidden_states[-2]
            defect_feature = defect_feature[:, 1:]
            defect_feature = mm_projector(defect_feature)
            similarity_scores.append(cos_sim(good_feature, defect_feature))
            # print(cos_sim(good_feature, defect_feature))

        average_similarity = sum(similarity_scores) / len(similarity_scores)
        print(average_similarity)

        # Save the average similarity for the class
        class_avg_similarities[name] = average_similarity.item()  # Convert tensor to a Python number

        # Save the similarity scores for each defect
        if name not in defect_similarities:
            defect_similarities[name] = {}
        defect_similarities[name][defect_name] = [s.item() for s in similarity_scores]  # Convert each tensor to a Python number

    # Save the dictionaries as .txt files in the specified path
    save_path = f'/home/oshita/vlm/Link-Context-Learning/mllm/demo/result/mvtec_feature/{name}/{Path(model_path).name}'
    os.makedirs(save_path, exist_ok=True)
    with open(os.path.join(save_path, 'OKvsNO_class_avg_similarities.txt'), 'w') as f:
        f.write(json.dumps(class_avg_similarities))
    with open(os.path.join(save_path, 'OKvsNO_defect_similarities.txt'), 'w') as f:
        f.write(json.dumps(defect_similarities))

tensor(0.7593, device='cuda:0', dtype=torch.float16, grad_fn=<DivBackward0>)
tensor(0.7842, device='cuda:0', dtype=torch.float16, grad_fn=<DivBackward0>)
tensor(0.7593, device='cuda:0', dtype=torch.float16, grad_fn=<DivBackward0>)
tensor(0.3032, device='cuda:0', dtype=torch.float16, grad_fn=<DivBackward0>)


KeyboardInterrupt: 

In [161]:
# Initialize the dictionaries
good2good_class_avg_similarities = {}

mvtec_names = ['bottle', 'cable', 'capsule', 'carpet', 'grid', 'hazelnut', 'leather', 'metal_nut', 'pill', 'screw', 'tile', 'toothbrush', 'transistor', 'wood', 'zipper']
for name in mvtec_names:
    good_feature = get_image_feature(rf'/home/dataset/mvtec/{name}/test/good/000.png').hidden_states[-2]
    good_feature = good_feature[:, 1:]
    good_feature = mm_projector(good_feature)
    good_paths = [p for p in Path(rf'/home/dataset/mvtec/{name}/test/good').glob('*.png')]
    good_paths.remove(good_paths[0])
    similarity_scores = []
    for path in good_paths:
        good2_feature = get_image_feature(path).hidden_states[-2]
        good2_feature = good2_feature[:, 1:]
        good2_feature = mm_projector(good2_feature)
        similarity_scores.append(cos_sim(good_feature, good2_feature))

    average_similarity = sum(similarity_scores) / len(similarity_scores)
    print(f'{name} : {average_similarity}')

    # Save the average similarity for the class
    good2good_class_avg_similarities[name] = average_similarity.item()

    save_path = f'/home/oshita/vlm/Link-Context-Learning/mllm/demo/result/mvtec_feature/{name}/{Path(model_path).name}'
    os.makedirs(save_path, exist_ok=True)
    with open(os.path.join(save_path, 'OKvsOK_class_avg_similarities.txt'), 'w') as f:
        f.write(json.dumps(good2good_class_avg_similarities))
    


bottle : 0.8037109375
cable : 0.316650390625
capsule : 0.49462890625
carpet : 0.336181640625
grid : 0.2705078125
hazelnut : 0.468017578125
leather : 0.317626953125
metal_nut : 0.301025390625
pill : 0.49560546875
screw : 0.317626953125
tile : 0.25244140625
toothbrush : 0.4833984375
transistor : 0.431884765625
wood : 0.267578125
zipper : 0.40478515625


In [162]:
for (OKvsNO_key,OKvsNO_value),(OKvsOK_key, OKvsOK_value) in zip(class_avg_similarities.items(),good2good_class_avg_similarities.items()):
    print(f"{OKvsNO_key} : {np.round(np.abs(OKvsNO_value-OKvsOK_value),3)}")
    save_path = f'/home/oshita/vlm/Link-Context-Learning/mllm/demo/result/mvtec_feature/{OKvsNO_key}/{Path(model_path).name}'
    os.makedirs(save_path, exist_ok=True)
    with open(os.path.join(save_path, 'diff_class_avg_similarities.txt'), 'w') as f:
        f.write(f"{OKvsNO_key} : {np.round(np.abs(OKvsNO_value-OKvsOK_value),3)}")
    



bottle : 0.044
cable : 0.031
capsule : 0.008
carpet : 0.037
grid : 0.038
hazelnut : 0.001
leather : 0.071
metal_nut : 0.031
pill : 0.004
screw : 0.028
tile : 0.036
toothbrush : 0.031
transistor : 0.145
wood : 0.056
zipper : 0.008


: 

In [None]:
import torch
import torchvision.models as models
from torchvision.models.feature_extraction import create_feature_extractor
from torchvision import transforms
from PIL import Image

def load_and_transform_image(image_path, size=(224, 224)):
    # 画像の読み込み
    image = Image.open(image_path).convert('RGB')

    # 画像の前処理
    transform = transforms.Compose([
        transforms.Resize(size),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    return transform(image)

def extract_features_from_resnet50(model,image_path, layer_name='layer4'):
    return_nodes = {layer_name: 'features'}
    
    feature_extractor = create_feature_extractor(model, return_nodes=return_nodes)
    image = load_and_transform_image(image_path)
    image = image.unsqueeze(0)
    with torch.no_grad():
        features = feature_extractor(image)
    return features['features']

# 使用例
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = models.resnet50(pretrained=True).to(device).eval()
good_path = '/home/dataset/mvtec/wood/test/good/000.png'  # 画像のパスを指定
layer_to_extract = 'layer4'  # 抽出したいレイヤーを指定
features = extract_features_from_resnet50(model,path, layer_to_extract)

print(features.shape)  # 抽出された特徴量の形状を表示
