<a href="https://colab.research.google.com/github/nakamura196/ndl_ocr/blob/main/Image_Similarity_Search_in_PyTorch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#  Image Similarity Search in PyTorch

以下を参考にしています。

https://medium.com/pytorch/image-similarity-search-in-pytorch-1a744cf3469

In [48]:
!nvidia-smi

Wed Aug 17 03:47:58 2022       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 460.32.03    Driver Version: 460.32.03    CUDA Version: 11.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla V100-SXM2...  Off  | 00000000:00:04.0 Off |                    0 |
| N/A   38C    P0    39W / 300W |   9263MiB / 16160MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

In [49]:
HOME_DIR = "/content"
IMG_DIR = "/content/dataset"
IMG_HEIGHT = 256
IMG_WIDTH = 256

`dataset`フォルダがない場合には、データをダウンロード

In [50]:
if not os.path.exists(IMG_DIR):
  !wget --no-check-certificate \
      https://storage.googleapis.com/mledu-datasets/cats_and_dogs_filtered.zip \
      -O cats_and_dogs_filtered.zip
  !unzip cats_and_dogs_filtered.zip
  !mkdir dataset

  import glob
  from tqdm import tqdm
  files = glob.glob("/content/cats_and_dogs_filtered/*/*/*.jpg")
  for file in tqdm(files):
    filename = file.split("/")[-1]
    !mv $file $IMG_DIR/$filename

  !rm -rf cats_and_dogs_filtered

In [51]:
import torch
from PIL import Image
import os
from tqdm import tqdm
from torch.utils.data import Dataset

import torchvision.transforms as T
from tqdm import tqdm
import torch.nn as nn
import torch.optim as optim

In [52]:
class FolderDataset(Dataset):
    """
    Creates a PyTorch dataset from folder, returning two tensor images.
    Args: 
    main_dir : directory where images are stored.
    transform (optional) : torchvision transforms to be applied while making dataset
    """
    def __init__(self, main_dir, transform=None):
        self.main_dir = main_dir
        self.transform = transform
        self.all_imgs = os.listdir(main_dir)

    def __len__(self):
        return len(self.all_imgs)

    def __getitem__(self, idx):
        img_loc = os.path.join(self.main_dir, self.all_imgs[idx])
        image = Image.open(img_loc).convert("RGB")

        image = image.resize((IMG_WIDTH, IMG_HEIGHT))

        if self.transform is not None:
            tensor_image = self.transform(image)

        return tensor_image, tensor_image

In [53]:
class ConvEncoder(nn.Module):
    def __init__(self):
        super().__init__()
        # self.img_size = img_size
        self.conv1 = nn.Conv2d(3, 16, (3, 3), padding=(1, 1))
        self.relu1 = nn.ReLU(inplace=True)
        self.maxpool1 = nn.MaxPool2d((2, 2))

        self.conv2 = nn.Conv2d(16, 32, (3, 3), padding=(1, 1))
        self.relu2 = nn.ReLU(inplace=True)
        self.maxpool2 = nn.MaxPool2d((2, 2))

        self.conv3 = nn.Conv2d(32, 64, (3, 3), padding=(1, 1))
        self.relu3 = nn.ReLU(inplace=True)
        self.maxpool3 = nn.MaxPool2d((2, 2))

        
        self.conv4 = nn.Conv2d(64, 128, (3, 3), padding=(1, 1))
        self.relu4 = nn.ReLU(inplace=True)
        self.maxpool4 = nn.MaxPool2d((2, 2))

        '''
        self.conv5 = nn.Conv2d(128, 256, (3, 3), padding=(1, 1))
        self.relu5 = nn.ReLU(inplace=True)
        self.maxpool5 = nn.MaxPool2d((2, 2))
        '''

    def forward(self, x):
        # Downscale the image with conv maxpool etc.
        # print(x.shape)
        x = self.conv1(x)
        x = self.relu1(x)
        x = self.maxpool1(x)

        # print(x.shape)

        x = self.conv2(x)
        x = self.relu2(x)
        x = self.maxpool2(x)

        # print(x.shape)

        x = self.conv3(x)
        x = self.relu3(x)
        x = self.maxpool3(x)

        # print(x.shape)

        
        x = self.conv4(x)
        x = self.relu4(x)
        x = self.maxpool4(x)

        # print(x.shape)

        '''
        x = self.conv5(x)
        x = self.relu5(x)
        x = self.maxpool5(x)

        # print(x.shape)
        
        '''
        return x

class ConvDecoder(nn.Module):
    """
    A simple Convolutional Decoder Model
    """

    def __init__(self):
        super().__init__()
        self.deconv1 = nn.ConvTranspose2d(128, 64, (2, 2), stride=(2, 2))
        # self.upsamp1 = nn.UpsamplingBilinear2d(2)
        self.relu1 = nn.ReLU(inplace=True)

        self.deconv2 = nn.ConvTranspose2d(64, 32, (2, 2), stride=(2, 2))
        # self.upsamp1 = nn.UpsamplingBilinear2d(2)
        self.relu2 = nn.ReLU(inplace=True)

        self.deconv3 = nn.ConvTranspose2d(32, 16, (2, 2), stride=(2, 2))
        # self.upsamp1 = nn.UpsamplingBilinear2d(2)
        self.relu3 = nn.ReLU(inplace=True)

        self.deconv4 = nn.ConvTranspose2d(16, 3, (2, 2), stride=(2, 2))
        # self.upsamp1 = nn.UpsamplingBilinear2d(2)
        self.relu4 = nn.ReLU(inplace=True)

    def forward(self, x):
        # print(x.shape)
        x = self.deconv1(x)
        x = self.relu1(x)
        # print(x.shape)

        x = self.deconv2(x)
        x = self.relu2(x)
        # print(x.shape)

        x = self.deconv3(x)
        x = self.relu3(x)
        # print(x.shape)

        x = self.deconv4(x)
        x = self.relu4(x)
        # print(x.shape)

        return x

In [54]:
SEED = 42
TRAIN_RATIO = 0.75
VAL_RATIO = 1 - TRAIN_RATIO
SHUFFLE_BUFFER_SIZE = 100

LEARNING_RATE = 1e-3
EPOCHS = 20

TRAIN_BATCH_SIZE = 256  # Let's see, I don't have GPU, Google Colab is best hope
TEST_BATCH_SIZE = 256  # Let's see, I don't have GPU, Google Colab is best hope
FULL_BATCH_SIZE = 256

AUTOENCODER_MODEL_PATH = f"{HOME_DIR}/baseline_autoencoder.pt"
ENCODER_MODEL_PATH = f"{HOME_DIR}/baseline_encoder.pt"
DECODER_MODEL_PATH = f"{HOME_DIR}/baseline_decoder.pt"

In [55]:
"""
I can write this if we need custom training loop etc.
I usually use this in PyTorch.
"""

__all__ = ["train_step", "val_step", "create_embedding"]

import torch
import torch.nn as nn

# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


def train_step(encoder, decoder, train_loader, loss_fn, optimizer, device):
    # device = "cuda"
    encoder.train()
    decoder.train()

    # print(device)

    for batch_idx, (train_img, target_img) in enumerate(train_loader):
        train_img = train_img.to(device)
        target_img = target_img.to(device)

        optimizer.zero_grad()

        enc_output = encoder(train_img)
        dec_output = decoder(enc_output)

        loss = loss_fn(dec_output, target_img)
        loss.backward()

        optimizer.step()

    return loss.item()


def val_step(encoder, decoder, val_loader, loss_fn, device):
    encoder.eval()
    decoder.eval()

    with torch.no_grad():
        for batch_idx, (train_img, target_img) in enumerate(val_loader):
            train_img = train_img.to(device)
            target_img = target_img.to(device)

            enc_output = encoder(train_img)
            dec_output = decoder(enc_output)

            loss = loss_fn(dec_output, target_img)

    return loss.item()

In [56]:
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# print("Setting Seed for the run, seed = {}".format(config.SEED))

# seed_everything(config.SEED)

transforms = T.Compose([T.ToTensor()])
print("------------ Creating Dataset ------------")
full_dataset = FolderDataset(IMG_DIR, transforms)

train_size = int(TRAIN_RATIO * len(full_dataset))
val_size = len(full_dataset) - train_size

train_dataset, val_dataset = torch.utils.data.random_split(
    full_dataset, [train_size, val_size]
)

print("------------ Dataset Created ------------")
print("------------ Creating DataLoader ------------")
train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=TRAIN_BATCH_SIZE, shuffle=True, drop_last=True
)
val_loader = torch.utils.data.DataLoader(
    val_dataset, batch_size=TEST_BATCH_SIZE
)

full_loader = torch.utils.data.DataLoader(
    full_dataset, batch_size=FULL_BATCH_SIZE
)

------------ Creating Dataset ------------
------------ Dataset Created ------------
------------ Creating DataLoader ------------


In [57]:
imgs = train_dataset.dataset.all_imgs

In [58]:
lines = []
for img in imgs:
  lines.append(img + "\n")
with open(f"{HOME_DIR}/output.txt", 'w', encoding='utf-8', newline='\n') as f:
    f.writelines(lines)

In [None]:
print("------------ Dataloader Cretead ------------")

# print(train_loader)
loss_fn = nn.MSELoss()

encoder = ConvEncoder()
decoder = ConvDecoder()

if torch.cuda.is_available():
    print("GPU Availaible moving models to GPU")
else:
    print("Moving models to CPU")

encoder.to(device)
decoder.to(device)

# print(device)

autoencoder_params = list(encoder.parameters()) + list(decoder.parameters())
optimizer = optim.AdamW(autoencoder_params, lr=LEARNING_RATE)

# early_stopper = utils.EarlyStopping(patience=5, verbose=True, path=)
max_loss = 9999

print("------------ Training started ------------")

for epoch in tqdm(range(EPOCHS)):
    train_loss = train_step(
        encoder, decoder, train_loader, loss_fn, optimizer, device=device
    )
    print(f"Epochs = {epoch}, Training Loss : {train_loss}")
    val_loss = val_step(
        encoder, decoder, val_loader, loss_fn, device=device
    )

    # Simple Best Model saving
    if val_loss < max_loss:
        print("Validation Loss decreased, saving new best model")
        torch.save(encoder.state_dict(), ENCODER_MODEL_PATH)
        torch.save(decoder.state_dict(), DECODER_MODEL_PATH)

    print(f"Epochs = {epoch}, Validation Loss : {val_loss}")

print("Training Done")

------------ Dataloader Cretead ------------
GPU Availaible moving models to GPU
------------ Training started ------------


  0%|          | 0/20 [00:00<?, ?it/s]

Epochs = 0, Training Loss : 0.22409912943840027


  5%|▌         | 1/20 [00:17<05:32, 17.51s/it]

Validation Loss decreased, saving new best model
Epochs = 0, Validation Loss : 0.22495345771312714
Epochs = 1, Training Loss : 0.2089099884033203


 10%|█         | 2/20 [00:34<05:14, 17.49s/it]

Validation Loss decreased, saving new best model
Epochs = 1, Validation Loss : 0.19292905926704407
Epochs = 2, Training Loss : 0.1672382354736328


 15%|█▌        | 3/20 [00:52<04:54, 17.33s/it]

Validation Loss decreased, saving new best model
Epochs = 2, Validation Loss : 0.16914509236812592


In [None]:
def create_embedding(encoder, full_loader, embedding_dim, device):
    encoder.eval()
    embedding = torch.randn(embedding_dim)

    with torch.no_grad():
        for batch_idx, (train_img, target_img) in tqdm(enumerate(full_loader)):
            train_img = train_img.to(device)
            enc_output = encoder(train_img).cpu()
            embedding = torch.cat((embedding, enc_output), 0)
    
    return embedding

# Save the feature representations.
embedding_dim = (1, 128, 16, 16) # This we know from our encoder

# We need feature representations for complete dataset not just train and validation.
# Hence we use full loader here.
embedding = create_embedding(encoder, full_loader, embedding_dim, device)

# Convert embedding to numpy and save them
numpy_embedding = embedding.cpu().detach().numpy()
num_images = numpy_embedding.shape[0]

# Save the embeddings for complete dataset, not just train
flattened_embedding = numpy_embedding.reshape((num_images, -1))

import numpy as np

np.save(f"{HOME_DIR}/data_embedding_f.npy", flattened_embedding)

# 推論

In [None]:
import torch
import numpy as np
from sklearn.neighbors import NearestNeighbors
import torchvision.transforms as T
import os
from PIL import Image
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
size = IMG_WIDTH

def load_image_tensor(image_path, device):
    img = Image.open(image_path)
    img = img.resize((size, size))
    image_tensor = T.ToTensor()(img)
    image_tensor = image_tensor.unsqueeze(0)
    return image_tensor

In [None]:
def compute_similar_images(image_path, num_images, embedding, device):
    image_tensor = load_image_tensor(image_path, device)
    image_tensor = image_tensor.to(device)

    with torch.no_grad():
        image_embedding = encoder(image_tensor).cpu().detach().numpy()

    flattened_embedding = image_embedding.reshape((image_embedding.shape[0], -1))

    knn = NearestNeighbors(n_neighbors=num_images, metric="cosine")
    knn.fit(embedding)

    _, indices = knn.kneighbors(flattened_embedding)
    indices_list = indices.tolist()
    return indices_list


In [None]:
imgs = train_dataset.dataset.all_imgs

def plot_similar_images(indices_list):
    indices = indices_list[0]

    for index in indices:
        img_name = train_dataset.dataset.all_imgs[index - 1]
        img_path = os.path.join(f"{IMG_DIR}/{img_name}")
        print(img_path)
        img = Image.open(img_path).convert("RGB")
        plt.imshow(img)
        plt.show()

In [None]:
NUM_IMAGES = 10
ENCODER_MODEL_PATH = f"{HOME_DIR}/baseline_encoder.pt"
EMBEDDING_PATH = f"{HOME_DIR}/data_embedding_f.npy"

In [None]:
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
encoder = ConvEncoder() # torch_model.

# Load the state dict of encoder
encoder.load_state_dict(torch.load(ENCODER_MODEL_PATH, map_location=device))
encoder.eval()
encoder.to(device)

# Loads the embedding
embedding = np.load(EMBEDDING_PATH)

猫の例

In [None]:
TEST_IMAGE_PATH = "/content/dataset/cat.0.jpg"
indices_list = compute_similar_images(TEST_IMAGE_PATH, NUM_IMAGES, embedding, device)
plot_similar_images(indices_list)

犬の例

In [None]:
TEST_IMAGE_PATH = "/content/dataset/dog.0.jpg"
indices_list = compute_similar_images(TEST_IMAGE_PATH, NUM_IMAGES, embedding, device)
plot_similar_images(indices_list)