# Import

In [1]:
from PIL import Image
import numpy as np
import pandas as pd

import torch
from torchsummary import summary

from script.tool import ROOT, ROOT_NFS_TEST, ROOT_NFS_DATA, standardize_feature
from tqdm import tqdm
from pathlib import Path
import time, os

# Initial

In [2]:
# Folder holding the raw dataset, resolved under ROOT_NFS_DATA.
path_dataset = ROOT_NFS_DATA / 'Cosmenet_products_15000/raw_data'
# Prefer the first GPU, but fall back to the CPU so the notebook still runs on
# machines without CUDA instead of failing at the first `.to(device)` call.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Cleaned sample table; later cells read its `path_img` and `classes` columns.
df_pd = pd.read_csv(path_dataset / 'data_cleaned.csv')

In [5]:
class convert_feature:
    """Run a feature-extraction pipeline over images and optionally dump rows to CSV.

    The wrapped ``pipeline`` only needs an ``extract(img)`` method. ``to_pandas``
    lines the extracted features up with a one-row label frame, so ``extract``
    is expected to return a single feature row per image.
    """

    def __init__(self, pipeline):
        self.pipeline = pipeline

    def process_extract(self, img):
        """Return ``self.pipeline.extract(img)`` for one image."""
        return self.pipeline.extract(img)

    def check_data(self, img_path, classes):
        """Raise ``ValueError`` unless ``img_path`` and ``classes`` have equal length."""
        if len(img_path) != len(classes):
            raise ValueError("img_path and classes must have the same length")

    def to_pandas(self, output, classes, path):
        """Return a DataFrame of the feature columns plus ``classes`` and ``path_img``."""
        df_x = pd.DataFrame(output)
        df_y = pd.DataFrame([[classes, path]], columns=['classes', 'path_img'])
        return pd.concat([df_x, df_y], axis=1)

    def save_data(self, output, classes, path, file_name_output, i):
        """Write one sample to ``<dir>/feature_map/<name>.csv``.

        Args:
            output: Extracted features for the sample.
            classes: Label stored in the ``classes`` column.
            path: Image path stored in the ``path_img`` column.
            file_name_output: Output name without extension (``str`` or ``Path``).
                When it carries a directory component that directory is used,
                otherwise the file goes under ``ROOT_NFS_TEST``.
            i: Position of the sample in the run. ``0`` (re)creates the file
                with a header; any other value appends without a header.
        """
        target = Path(file_name_output)
        if len(target.parents) > 1:
            out_dir = target.parent
        else:
            out_dir = ROOT_NFS_TEST
        out_dir = out_dir / "feature_map"
        os.makedirs(out_dir, exist_ok=True)

        # Always take the bare name: this also accepts Path inputs and keeps an
        # absolute bare name from overriding `out_dir` when joined.
        csv_path = out_dir / (target.name + '.csv')

        data = self.to_pandas(output, classes, path)
        if i == 0:
            data.to_csv(csv_path, index=False)
        else:
            data.to_csv(csv_path, mode='a', header=False, index=False)

    def __call__(self, img_path, classes, file_name_output=None, return_extract=False):
        """Extract features for every image, optionally saving and/or returning them.

        Args:
            img_path: Sized iterable of image paths.
            classes: Sized iterable of labels, aligned with ``img_path`` by position.
            file_name_output: When truthy, each sample is written via ``save_data``.
            return_extract: When true, the stacked features are returned.

        Returns:
            The per-image outputs concatenated along axis 0 when
            ``return_extract`` is true (an empty ``(0, 0)`` array for empty
            input), otherwise ``None``.

        Raises:
            ValueError: If ``img_path`` and ``classes`` differ in length.
        """
        self.check_data(img_path, classes)

        # Pair by position instead of `classes[i]`: on a pandas Series `[i]` is a
        # label lookup, which breaks (or mismatches) on a non-default index.
        pairs = zip(img_path, classes)
        collected = []
        for i, (path, label) in enumerate(tqdm(pairs, total=len(img_path))):
            # Close the file handle as soon as the pixel data has been converted.
            with Image.open(path) as raw:
                img = raw.convert('RGB')
            output = self.process_extract(img)

            if file_name_output:
                self.save_data(output, label, path, file_name_output, i)

            if return_extract:
                collected.append(output)

        if return_extract:
            if not collected:
                return np.empty((0, 0))
            # One concatenation at the end avoids re-copying the array per image.
            return np.concatenate(collected)

# Module

## Timm

In [3]:
from timm.models import create_model
import timm

In [4]:
def select_timm_model(model, num_classes=0, pretrain=True):
    """Create a timm model by name together with its inference-time transform.

    Args:
        model: Model name handed to ``create_model``.
        num_classes: Forwarded to ``create_model`` as ``num_classes``.
        pretrain: Forwarded to ``create_model`` as ``pretrained``.

    Returns:
        ``(model, processor)`` where ``processor`` is the transform built from
        the model's resolved data config with ``is_training=False``.
    """
    network = create_model(model, pretrained=pretrain, num_classes=num_classes)
    eval_transform = timm.data.create_transform(
        is_training=False,
        **timm.data.resolve_model_data_config(network),
    )
    return network, eval_transform

In [None]:
# pipeline for timm library
class pipeline_timm:
    """Feature-extraction pipeline around a timm model and its transform."""

    def __init__(self, device='cuda:0'):
        self.device = device

    def selct_model(self, model, processor):
        """Attach ``model`` and ``processor``; put the model in eval mode on ``self.device``."""
        self.model = model
        self.processor = processor
        self.model.eval().to(self.device)

    # Correctly spelled alias; the original name is kept for existing callers.
    select_model = selct_model

    def process_model(self, img):
        """Transform ``img``, add a batch axis and return the raw model output."""
        inputs = self.processor(img).to(self.device).unsqueeze(0)
        # Inference only: skip autograd bookkeeping so no graph is kept alive.
        with torch.no_grad():
            outputs = self.model(inputs)
        return outputs

    def extract(self, img):
        """Return the standardized model output for ``img`` as a NumPy array.

        The model output is flattened to a single row before being passed to
        ``standardize_feature``.
        """
        outputs = self.process_model(img)
        # The reshaped tensor must be assigned: tensor methods are not in-place.
        outputs = outputs.flatten().unsqueeze(0)
        outputs = standardize_feature(outputs).to('cpu').detach().numpy()
        return outputs

    def report_test(self):
        """Time one forward pass on a blank 224x224 RGB image and print the output shape."""
        img = Image.new('RGB', (224, 224))
        start = time.perf_counter()
        outputs = self.process_model(img)
        # CUDA kernels run asynchronously; wait for them so the timing is real.
        target = torch.device(self.device)
        if target.type == 'cuda':
            torch.cuda.synchronize(target)
        elapsed = time.perf_counter() - start
        print("runtime :", elapsed*1000, "ms")
        print(f"Output shape at layer : {outputs.shape}")

### efficientnet

In [6]:
# Build the EfficientNet-B1 extractor: create the timm model and its transform,
# attach them to a pipeline on `device`, run the timing smoke test, then wrap
# the pipeline so it can be applied to a list of image paths.
# NOTE: `model` and `preprocess` are module-level names that later cells rebind.
model, preprocess = select_timm_model('efficientnet_b1', num_classes=0, pretrain=True)
eff_pipe = pipeline_timm(device=device)
eff_pipe.selct_model(model, preprocess)
eff_pipe.report_test()
cvt_feature_eff = convert_feature(eff_pipe)

runtime : 26.701688766479492 ms
Output shape at layer : torch.Size([1, 1280])


In [None]:
# Extract EfficientNet-B1 features for every row of `df_pd`. With a bare file
# name, `convert_feature.save_data` writes to ROOT_NFS_TEST / "feature_map" / "test.csv".
cvt_feature_eff(
    df_pd['path_img'], 
    df_pd['classes'], 
    file_name_output="test"
    )

In [7]:
# Build the EfficientNet-B5 extractor the same way as the B1 one: model plus
# transform, pipeline on `device`, timing smoke test, then the path-list wrapper.
model, preprocess = select_timm_model('efficientnet_b5', num_classes=0, pretrain=True)
eff_b5_pipe = pipeline_timm(device=device)
eff_b5_pipe.selct_model(model, preprocess)
eff_b5_pipe.report_test()
cvt_feature_eff_b5 = convert_feature(eff_b5_pipe)

runtime : 40.941715240478516 ms
Output shape at layer : torch.Size([1, 2048])


In [None]:
# Extract with the EfficientNet-B5 wrapper so the CSV named "efficientnet_b5"
# really holds B5 features; the B1 wrapper `cvt_feature_eff` was called here.
cvt_feature_eff_b5(
    df_pd['path_img'],
    df_pd['classes'],
    file_name_output="efficientnet_b5",
)

## Transformer

In [6]:
def select_transformers_model(model, processor, pretrain="google/vit-base-patch16-224-in21k"):
    """Load a model and its processor from the same pretrained source.

    Args:
        model: Class (or object) exposing ``from_pretrained``; loaded first.
        processor: Class (or object) exposing ``from_pretrained``; loaded second.
        pretrain: Identifier or path passed unchanged to both ``from_pretrained`` calls.

    Returns:
        ``(loaded_model, loaded_processor)``.
    """
    loaded_model, loaded_processor = (
        loader.from_pretrained(pretrain) for loader in (model, processor)
    )
    return loaded_model, loaded_processor

In [7]:
# pipeline for transformer library
class pipeline_transformer:
    """Feature-extraction pipeline around a `transformers` model and processor.

    Args:
        layer: Key used to index the model output (e.g. ``"last_hidden_state"``).
        row: ``False`` keeps the whole selected output; any other value is used
            as an index along the second axis (``output[:, row]``).
        device: Device the model and inputs are moved to.
    """

    def __init__(self, layer, row=False, device='cuda:0'):
        self.device = device
        self.layer = layer
        self.row = row

    def selct_model(self, model, processor):
        """Attach ``model`` and ``processor``; put the model in eval mode on ``self.device``."""
        self.model = model
        self.processor = processor
        self.model.eval().to(self.device)

    # Correctly spelled alias; the original name is kept for existing callers.
    select_model = selct_model

    def process_model(self, img):
        """Run the processor and the model on ``img`` and return the model output."""
        inputs = self.processor(images=img, return_tensors="pt").to(self.device)
        # Inference only: skip autograd bookkeeping so no graph is kept alive.
        with torch.no_grad():
            outputs = self.model(**inputs)
        return outputs

    def extract(self, img):
        """Return the selected layer (and row), flattened to one row, as a NumPy array."""
        features = self.process_model(img)[self.layer]
        # Only the literal `False` means "no row selection"; 0 is a valid index.
        if self.row is not False:
            features = features[:, self.row]
        features = features.flatten().unsqueeze(0)
        return standardize_feature(features).to('cpu').detach().numpy()

    def report_test(self):
        """Time one forward pass on a blank 224x224 RGB image and print output shapes."""
        img = Image.new('RGB', (224, 224))
        start = time.perf_counter()
        outputs = self.process_model(img)
        # CUDA kernels run asynchronously; wait for them so the timing is real.
        target = torch.device(self.device)
        if target.type == 'cuda':
            torch.cuda.synchronize(target)
        elapsed = time.perf_counter() - start
        print("runtime :", elapsed*1000, "ms")
        print(f"outputs layers : {outputs.keys()}")
        print(f"shape last_hidden_state : {outputs.last_hidden_state.shape}")
        print(f"shape pooler_output : {outputs.pooler_output.shape}")

### Vit google

In [8]:
from transformers import ViTImageProcessor, ViTModel

In [10]:
# Build the extractor for the pretrained "google/vit-base-patch16-224-in21k" ViT.
# `row=0` keeps index 0 along the second axis of `last_hidden_state`
# (presumably the [CLS] token - confirm against the model documentation).
model, preprocess = select_transformers_model(ViTModel, ViTImageProcessor, pretrain="google/vit-base-patch16-224-in21k")
vit_gg_pipe = pipeline_transformer(layer="last_hidden_state", row=0, device=device)
vit_gg_pipe.selct_model(model, preprocess)
vit_gg_pipe.report_test()
cvt_feature_vit_gg = convert_feature(vit_gg_pipe)

runtime : 20.165681838989258 ms
outputs layers : odict_keys(['last_hidden_state', 'pooler_output'])
shape last_hidden_state : torch.Size([1, 197, 768])
shape pooler_output : torch.Size([1, 768])


In [None]:
# Extract features with the pretrained ViT for every row of `df_pd` and write
# them to the CSV named after the checkpoint and layer.
cvt_feature_vit_gg(
    df_pd['path_img'], 
    df_pd['classes'], 
    file_name_output="vit_base_patch16_224_in21k_last_hidden_state"
    )

In [25]:
# Build the extractor for a locally stored, fine-tuned ViT checkpoint.
# NOTE(review): the right-hand operand starts with "/". If ROOT_NFS_TEST is a
# pathlib path, joining an absolute segment discards ROOT_NFS_TEST and yields
# "/weights/..." - confirm that this is the intended location.
model, preprocess = select_transformers_model(ViTModel, ViTImageProcessor, 
                                              pretrain=ROOT_NFS_TEST / '/weights/vit_gg_lr2e-05_eu_9ep_0_95099acc')
vit_gg_trained_lr2e_05_pipe = pipeline_transformer(layer="last_hidden_state", row=0, device=device)
vit_gg_trained_lr2e_05_pipe.selct_model(model, preprocess)
vit_gg_trained_lr2e_05_pipe.report_test()
cvt_feature_vit_gg_trained_lr2e_05 = convert_feature(vit_gg_trained_lr2e_05_pipe)

runtime : 24.142026901245117 ms
outputs layers : odict_keys(['last_hidden_state', 'pooler_output'])
shape last_hidden_state : torch.Size([1, 197, 768])
shape pooler_output : torch.Size([1, 768])


In [26]:
# Extract features with the fine-tuned ViT for every row of `df_pd` and write
# them to a CSV named after the checkpoint.
cvt_feature_vit_gg_trained_lr2e_05(
    df_pd['path_img'], 
    df_pd['classes'], 
    file_name_output="vit_b_p16_224_last_hidden_trained_lr2e_05_eu_9ep_0_95099acc"
    )

100%|██████████| 15524/15524 [20:08<00:00, 12.84it/s]


## Transformer onnx

In [13]:
from onnxruntime import InferenceSession
from transformers import ViTImageProcessor

In [14]:
def select_transformers_onnx_model(path="google/vit-base-patch16-224-in21k", processor=None, providers=['CPUExecutionProvider']):
    """Open an ONNX inference session and load the processor stored next to it.

    Args:
        path: Path of the ONNX model file. The processor is loaded from this
            file's parent directory. NOTE(review): the default value looks like
            a hub model id rather than a file path - callers should pass a path.
        processor: Class exposing ``from_pretrained`` (e.g. ``ViTImageProcessor``).
            Required despite the ``None`` default.
        providers: Execution providers handed to ``InferenceSession``. The
            default list is never mutated here.

    Returns:
        ``(session, processor)``.

    Raises:
        ValueError: If ``processor`` is not supplied.
    """
    # Fail before the (potentially slow) session is built, with a clear message
    # instead of an AttributeError on None.
    if processor is None:
        raise ValueError("processor must be a class exposing from_pretrained, e.g. ViTImageProcessor")
    session = InferenceSession(path, providers=providers)
    loaded_processor = processor.from_pretrained(Path(path).parent)
    return session, loaded_processor

In [None]:
# pipeline for transformer onnx library
class pipeline_transformer_onnx:
    """Feature-extraction pipeline around an ONNX session and a `transformers` processor.

    Args:
        layer: Name of the session output to request.
        row: ``False`` keeps the whole output; any other value is used as an
            index along the second axis (``output[:, row]``).
    """

    def __init__(self, layer, row=False):
        self.layer = layer
        self.row = row

    def selct_model(self, model, processor):
        """Attach the inference session (``model``) and the ``processor``."""
        self.model = model
        self.processor = processor

    # Correctly spelled alias; the original name is kept for existing callers.
    select_model = selct_model

    def process_model(self, img):
        """Run the processor and the session on ``img``; return the requested output array."""
        inputs = self.processor(images=img, return_tensors="np")
        outputs = self.model.run(output_names=[self.layer], input_feed=dict(inputs))[0]
        return outputs

    def _select_features(self, outputs):
        """Apply the optional row selection and flatten the result to one row."""
        # Only the literal `False` means "no row selection"; 0 is a valid index.
        if self.row is not False:
            outputs = outputs[:, self.row]
        # Always hand back a single (1, N) row, like the torch pipeline does, so
        # the result lines up with the one-row label frame built downstream.
        return outputs.reshape(1, -1)

    def extract(self, img):
        """Return the standardized single-row features for ``img``."""
        outputs = self._select_features(self.process_model(img))
        return standardize_feature(outputs)

    def report_test(self):
        """Time one session run on a blank 224x224 RGB image and print the output shape."""
        img = Image.new('RGB', (224, 224))
        start = time.perf_counter()
        outputs = self.process_model(img)
        elapsed = time.perf_counter() - start
        print("runtime :", elapsed*1000, "ms")
        print(f"shape : {outputs.shape}")

### Vit google

In [30]:
# Build the ONNX-runtime extractor for the exported ViT, running on the CPU
# execution provider. The processor is loaded from the model file's folder.
# NOTE(review): the model path is an absolute, machine-specific location.
model, preprocess = select_transformers_onnx_model("/home/music/Desktop/measure_model/models/vit_gg/onnx/model.onnx", 
                                                   processor=ViTImageProcessor, providers=['CPUExecutionProvider'])
vit_gg_onnx_pipe = pipeline_transformer_onnx(layer="last_hidden_state", row=0)
vit_gg_onnx_pipe.selct_model(model, preprocess)
vit_gg_onnx_pipe.report_test()
cvt_feature_vit_gg_onnx = convert_feature(vit_gg_onnx_pipe)

runtime : 285.5355739593506 ms
shape : (1, 197, 768)


In [None]:
# Extract features with the ONNX ViT for every row of `df_pd`.
# NOTE(review): the output name ends with a trailing underscore - confirm it is intended.
cvt_feature_vit_gg_onnx(
    df_pd['path_img'], 
    df_pd['classes'], 
    file_name_output="vit_b_p16_224_last_hidden_onnx_"
    )

In [45]:
# Build the ONNX-runtime extractor for the quantized ViT export, running on the
# CPU execution provider. The processor is loaded from the model file's folder.
# NOTE(review): the model path is an absolute, machine-specific location.
model, preprocess = select_transformers_onnx_model("/home/music/Desktop/measure_model/models/vit_gg/onnx_quantize/model_quantized.onnx", 
                                                   processor=ViTImageProcessor, providers=['CPUExecutionProvider'])
vit_gg_onnx_quantize_pipe = pipeline_transformer_onnx(layer="last_hidden_state", row=0)
vit_gg_onnx_quantize_pipe.selct_model(model, preprocess)
vit_gg_onnx_quantize_pipe.report_test()
cvt_feature_vit_gg_onnx_quantize = convert_feature(vit_gg_onnx_quantize_pipe)

runtime : 203.07588577270508 ms
shape : (1, 197, 768)


In [None]:
# Extract features with the quantized ONNX ViT for every row of `df_pd`.
# NOTE(review): "test" is also the output name of the EfficientNet-B1 run in
# this notebook. The first sample is written without append mode, so running
# both cells overwrites the same CSV - confirm the intended file name.
cvt_feature_vit_gg_onnx_quantize(
    df_pd['path_img'], 
    df_pd['classes'], 
    file_name_output="test"
    )