In [1]:
import torch, torchvision
from torchvision import datasets, models, transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import time
from torchsummary import summary

import numpy as np
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import os
import cv2
import hashlib
from PIL import Image

from dataclasses import dataclass

In [2]:
def image_preprocess_transforms():
    """
    Build the basic preprocessing pipeline.

    Returns a ``transforms.Compose`` that applies, in order:
    ``Resize(256)``, ``CenterCrop(224)`` and ``ToTensor()``.
    """
    steps = [
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
    ]
    return transforms.Compose(steps)

def image_common_transforms(mean=(0.5043, 0.4968, 0.4870), std=(0.1937, 0.1901, 0.1887)):
    """
    Build the preprocessing pipeline followed by per-channel normalization.

    Args:
        mean: per-channel means handed to ``transforms.Normalize``.
        std: per-channel standard deviations handed to ``transforms.Normalize``.

    Returns:
        A ``transforms.Compose`` that runs ``image_preprocess_transforms()``
        and then ``Normalize(mean, std)``.
    """
    normalize = transforms.Normalize(mean, std)
    return transforms.Compose([image_preprocess_transforms(), normalize])

@dataclass
class SystemConfiguration:
    '''
    System-level settings shared by runs that need reproducible training.
    '''
    # seed number used to set the state of the random number generators
    seed: int = 21
    # enable the CuDNN benchmark mode for the sake of performance
    cudnn_benchmark_enabled: bool = True
    # make CuDNN deterministic (reproducible training)
    cudnn_deterministic: bool = True
        

    
def setup_system(system_config: "SystemConfiguration") -> None:
    """
    Apply the seed and the CuDNN flags described by ``system_config``.

    Args:
        system_config: object exposing ``seed``, ``cudnn_benchmark_enabled``
            and ``cudnn_deterministic``.

    The CuDNN flags are only touched when CUDA is available.
    """
    torch.manual_seed(system_config.seed)
    if torch.cuda.is_available():
        # The real switch is `torch.backends.cudnn.benchmark`; assigning to
        # `torch.backends.cudnn_benchmark_enabled` only creates an unused
        # attribute and leaves the benchmark mode untouched.
        torch.backends.cudnn.benchmark = system_config.cudnn_benchmark_enabled
        torch.backends.cudnn.deterministic = system_config.cudnn_deterministic

In [3]:
def load_model(model, model_dir='', model_file_name='CIPP_classifier_6_classes_v5.pt'):
    """
    Load a saved ``state_dict`` into ``model`` and return the same model.

    Args:
        model: module whose parameters are overwritten in place.
        model_dir: directory that holds the checkpoint file.
        model_file_name: checkpoint file name inside ``model_dir``.

    Returns:
        ``model``, with the loaded parameters.
    """
    model_path = os.path.join(model_dir, model_file_name)

    # map_location='cpu' lets a checkpoint saved from a GPU be read on a
    # machine without CUDA; load_state_dict then copies the values into the
    # model's existing parameters, whatever device they live on.
    # NOTE: torch.load unpickles the file — only load checkpoints you trust.
    state_dict = torch.load(model_path, map_location='cpu')
    model.load_state_dict(state_dict)

    return model

In [4]:
class ImagemNaoEnviada(Exception):
    """Raised when no image file is found for a CIPP directory."""

class CIPP(Dataset):
    """
    Dataset over the image files of one CIPP directory.

    Every ``.jpg`` / ``.png`` file (extension matched case-insensitively)
    found directly inside ``os.path.join(data_root, num_cipp)`` is one sample.
    """

    def __init__(self, data_root, num_cipp, image_shape=None, transform=None):
        """
        Args:
            data_root: directory containing the CIPP sub-directories.
            num_cipp: name of the sub-directory to read; stored as ``str``
                alongside every sample.
            image_shape: optional resize target. An ``int`` means a square
                image; a tuple/list must hold one or two values. ``None``
                disables the resize step.
            transform: optional callable applied to the PIL image.

        Raises:
            ImagemNaoEnviada: the directory holds no ``.jpg``/``.png`` file.
            NotImplementedError: ``image_shape`` has an unsupported type.
        """
        # set image_shape attribute
        if image_shape is None:
            self.image_shape = None
        elif isinstance(image_shape, int):
            self.image_shape = (image_shape, image_shape)
        elif isinstance(image_shape, (tuple, list)):
            assert len(image_shape) == 1 or len(image_shape) == 2, 'Invalid image_shape tuple size'
            if len(image_shape) == 1:
                self.image_shape = (image_shape[0], image_shape[0])
            else:
                self.image_shape = image_shape
        else:
            raise NotImplementedError

        # set transform attribute
        self.transform = transform

        # initialize the data dictionary
        self.data_dict = {
            'image_path': [],
            'n_cipp': []
        }

        class_path = os.path.join(data_root, str(num_cipp))

        # os.listdir returns entries in arbitrary order; sorting keeps the
        # sample order stable between runs.
        for img in sorted(os.listdir(class_path)):
            # compare the lower-cased name so ".PNG" is accepted like ".JPG"
            if img.lower().endswith((".jpg", ".png")):
                self.data_dict['image_path'].append(os.path.join(class_path, img))
                self.data_dict['n_cipp'].append(str(num_cipp))

        if len(self.data_dict['image_path']) == 0:
            raise ImagemNaoEnviada()

    def __len__(self):
        """
        Return the number of images in the dataset.
        """
        return len(self.data_dict['image_path'])

    def __getitem__(self, idx):
        """
        Return ``(image, n_cipp, image_path, im_hash)`` for the given index.

        ``im_hash`` is the SHA-1 hex digest of the RGB pixel bytes, computed
        before the optional resize and transform are applied.
        """
        image_path = self.data_dict['image_path'][idx]

        # The context manager releases the file handle; convert() loads the
        # pixel data and returns an image that no longer needs the file.
        with Image.open(image_path) as image_file:
            image = image_file.convert("RGB")

        im_hash = hashlib.sha1(image.tobytes()).hexdigest()

        if self.image_shape is not None:
            # `F` in this file is torch.nn.functional, which has no `resize`;
            # the image resize lives in torchvision.transforms.functional.
            image = transforms.functional.resize(image, self.image_shape)

        if self.transform is not None:
            image = self.transform(image)

        n_cipp = self.data_dict['n_cipp'][idx]
        return image, n_cipp, image_path, im_hash

In [5]:
def prediction(model, device, batch_input):
    """
    Run ``model`` on a batch and return the winning class and its probability.

    Args:
        model: module mapping the batch to per-class scores along dim 1.
        device: device the model and the batch are moved to before inference.
        batch_input: input tensor for the model.

    Returns:
        ``(pred_index, pred_prob)`` as NumPy arrays: for every sample, the
        index of the highest softmax probability and that probability.
    """
    # send model to cpu/cuda according to the requested device
    model.to(device)

    # it is important to do model.eval() before prediction
    model.eval()

    # the batch must live on the same device as the model
    data = batch_input.to(device)

    # inference only: no autograd graph is needed
    with torch.no_grad():
        output = model(data)

        # Score to probability using softmax
        prob = F.softmax(output, dim=1)

        # max probability and its index in a single pass
        pred_prob, pred_index = prob.max(dim=1)

    return pred_index.cpu().numpy(), pred_prob.cpu().numpy()

In [6]:
def pretrained_resnet50(transfer_learning=True, num_class=6):
    """
    Build a pretrained ResNet-50 whose final layer outputs ``num_class`` scores.

    Args:
        transfer_learning: when true, every parameter of the loaded network
            gets ``requires_grad = False`` before the final layer is replaced.
        num_class: output size of the new fully connected layer.

    Returns:
        The network with ``fc`` replaced by a fresh ``nn.Linear``.
    """
    backbone = models.resnet50(pretrained=True)

    if transfer_learning:
        for weight in backbone.parameters():
            weight.requires_grad_(False)

    # the new head is created after the freeze above
    backbone.fc = nn.Linear(backbone.fc.in_features, num_class)

    return backbone

In [7]:

# Build the network and load the saved weights into it (load_model defaults).
model = load_model(pretrained_resnet50())

# A second, independently built and loaded instance.
model_copy = load_model(pretrained_resnet50())

In [8]:
# Classify every photo of every CIPP directory found under `home`.
# NOTE(review): `home` is not defined in any visible cell; it must already
# exist in the kernel when this cell runs — confirm where it is set.

idt = 0  # NOTE(review): not read by any visible cell

inicio = time.time()
device = 'cpu'

transform = image_common_transforms()
# Indexed with the class index returned by `prediction`.
# NOTE(review): presumably the class order used at training time — verify.
classes = ["CIPP_Frente", "CIPP_Verso", "Manometro", "Não Tanque", "Placas", "Tanque"]

for cipp in os.listdir(home):
    # CIPP lists the contents of home/cipp; a stray file at this level would
    # make that listing fail and abort the whole run, so skip non-directories.
    if not os.path.isdir(os.path.join(home, cipp)):
        continue

    dataset = CIPP(home, num_cipp=cipp, transform=transform)

    data_len = len(dataset)

    imgs = []
    n_cipps = []
    im_paths = []
    im_hashes = []
    for i in range(data_len):
        img, num_cipp, image_path, im_hash = dataset[i]
        imgs.append(img)
        n_cipps.append(num_cipp)
        # single quotes in the path are doubled before the path is stored
        im_paths.append(image_path.replace("'", "''"))
        im_hashes.append(im_hash)

    imgs = torch.stack(imgs)

    # use the device configured above instead of a second hard-coded literal
    cls, prob = prediction(model, device=device, batch_input=imgs)

    sql = ''
    image_path = ''
    # NOTE(review): both result lists are re-created for every directory, so
    # after the loop they only hold the rows of the last directory processed.
    TB_INSPECAO_CIPP_FOTOS_CLONAGEM = []
    TB_FOTO_AVALIACAO_CLONAGEM = []
    for i in range(data_len):

        image_path = im_paths[i]
        TB_INSPECAO_CIPP_FOTOS_CLONAGEM.append((cipp, 'R', im_paths[i], im_hashes[i]))

        previsao = classes[cls[i]]
        precisao = prob[i]

        id_inserido = 0

        TB_FOTO_AVALIACAO_CLONAGEM.append((id_inserido, 1, previsao, precisao))


fim = time.time()


In [9]:
%run assinatura_foto.py
#na primeira execução baixou resnet50 da web
fotos=[]
for item in TB_INSPECAO_CIPP_FOTOS_CLONAGEM:
    print('Imagem %s com HASH %s'%(item[2].split('\\')[8], item[3]))
    fotos.append(item[2])

Imagem imagem_adulterada1.jpg com HASH c1197915d39aa081cbd1cfa9ce95b9efa2f4b5b5
Imagem imagem_base_sem adulteracao.jpg com HASH e2ca2329e526904875f37a4a38f730bdfbfc9be3


In [10]:
# Pair every photo path with the array returned by carregaTensor_to_numpy.
# NOTE(review): carregaTensor_to_numpy is not defined in the visible cells —
# presumably provided by the `%run assinatura_foto.py` cell; verify.
fot = [(foto, carregaTensor_to_numpy(foto, model)) for foto in fotos]
matriz = []

# Compare the first two photos: norm of the difference of their arrays.
X1, Y = fot[0][1], fot[1][1]
dist = np.linalg.norm(X1 - Y)
print('Distância entre as imagens: %f'%(dist))



Distância entre as imagens: 44.255966
