In [None]:
import torch

# Check the number of available GPUs
print("Number of available GPUs:", torch.cuda.device_count())

# Check the name of each GPU
for i in range(torch.cuda.device_count()):
    print(f"GPU {i}: {torch.cuda.get_device_name(i)}")

In [None]:
!pip install openai-clip


In [None]:
import sys
sys.path.append("/kaggle/input/test-n")

import other_utils
import ddpm_utils
import UNet_utils

# 5. CLIP

Contrastive Language-Image Pre-Training or [CLIP](https://github.com/openai/CLIP/tree/main) is a text and image encoding tool used with many popular Generative AI models such as [DALL-E](https://openai.com/dall-e-2) and [Stable Diffusion](https://github.com/Stability-AI/stablediffusion).

CLIP in itself is not a Generative AI model, but is instead used to align text encodings with image encodings. If there is such a thing as the perfect text description of an image, the goal of CLIP is to create the same vector embedding for both the image and the text. Let's see what this means in practice.

The goals of this notebook are to:
* Learn how to use CLIP Encodings
  * Get an image encoding
  * Get a text encoding
  * Calculate the cosine similarity between them
* Use CLIP to create a text-to-image neural network

## 5.1 Encodings

First, let's load the libraries needed for this exercise.

In [24]:
import csv
import glob
import numpy as np
import torch
import torch.nn.functional as F
from torch.optim import Adam
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader

# Visualization tools
import matplotlib.pyplot as plt
from PIL import Image
from torchvision.utils import save_image, make_grid
from textwrap import wrap

There are a few different variations of CLIP based on popular image recognition neural networks:

In [25]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cuda')

In [None]:
import clip
print(clip.available_models())  # List the available CLIP models
clip_model, clip_preprocess = clip.load("ViT-B/32")

In [None]:

clip_model.eval()
CLIP_FEATURES = 512

For this notebook, we will be using `ViT-B/32`, which is based on the [Vision Transformer](https://huggingface.co/docs/transformers/main/model_doc/vit) architecture. It has `512` features, which we will later feed into our diffusion model.

### 5.1.1 Image Encodings

When we load CLIP, it will also come with a set of image transformations we can use to feed images into the CLIP model:

In [None]:
clip_preprocess

We can test this on one of our flower photos. Let's start with a picturesque daisy.

In [None]:
DATA_DIR = "/kaggle/input/hoahoa/flower_photos/"
img_path = DATA_DIR + "daisy/2877860110_a842f8b14a_m.jpg"
img = Image.open(img_path)
img.show()

We can find the CLIP embedding by first transforming our image with `clip_preprocess` and converting the result to a tensor. Since the `clip_model` expects a batch of images, we can use [np.stack](https://numpy.org/doc/stable/reference/generated/numpy.stack.html) to turn the processed image into a single element batch.

In [None]:
clip_imgs = torch.tensor(np.stack([clip_preprocess(img)])).to(device)
clip_imgs.size()

Then, we can pass the batch to `clip_model.encode_image` to find the embedding for the image. Uncomment `clip_img_encoding` if you would like to see what an encoding looks like. When we print the size, it lists `512` features for our `1` image.

In [None]:
clip_img_encoding = clip_model.encode_image(clip_imgs)
print(clip_img_encoding.size())
#clip_img_encoding

### 5.1.2 Text Encodings

Now that we have an image encoding, let's see if we can get a matching text encoding. Below is a list of different flower descriptions. Like with the images, the text needs to be preprocessed before it can be encoded by CLIP. To do this, CLIP comes with a `tokenize` function that converts each description into a fixed-length sequence of integer token IDs.

In [None]:
text_list = [
    "A white daisy with a yellow center",
    "An orange sunflower with a big brown center",
    "A red rose bud"
]
text_tokens = clip.tokenize(text_list).to(device)
text_tokens

Then, we can pass the tokens to `encode_text` to get our text encodings. Uncomment `clip_text_encodings` if you would like to see what an encoding looks like. Similar to our image encoding, there are `512` features for each of our `3` text descriptions.

In [None]:
clip_text_encodings = clip_model.encode_text(text_tokens).float()
print(clip_text_encodings.size())
#clip_text_encodings

### 5.1.3 Similarity

In order to see which one of our text descriptions best describes the daisy, we can calculate the [cosine similarity](https://medium.com/@milana.shxanukova15/cosine-distance-and-cosine-similarity-a5da0e4d9ded) between the text encodings and the image encodings. When the cosine similarity is `1`, it's a perfect match. When the cosine similarity is `-1`, the two encodings are opposites.

The cosine similarity is equivalent to a [dot product](https://mathworld.wolfram.com/DotProduct.html) in which each vector is first normalized by its magnitude. In other words, the magnitude of each vector becomes `1` before the dot product is taken.

We can use the following formula to calculate the dot product:

$X \cdot Y = \sum_{i=1}^{n} x_i y_i = x_1y_1 + x_2 y_2 + \cdots  + x_n y_n$

In [None]:
clip_img_encoding /= clip_img_encoding.norm(dim=-1, keepdim=True)
clip_text_encodings /= clip_text_encodings.norm(dim=-1, keepdim=True)
similarity = (clip_text_encodings * clip_img_encoding).sum(-1)
similarity
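
To make the normalize-then-dot-product step concrete, here is a minimal sketch in plain Python on toy three-element vectors instead of real CLIP encodings. The helper names `normalize` and `cosine_similarity` and the example vectors are made up for this illustration; they are not part of CLIP or PyTorch.

```python
import math

def normalize(v):
    """Scale a vector to unit length (magnitude 1)."""
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]

def cosine_similarity(a, b):
    """Dot product of the two unit-length vectors."""
    return sum(x * y for x, y in zip(normalize(a), normalize(b)))

image_vec = [1.0, 2.0, 2.0]        # toy stand-in for an image encoding
same_direction = [2.0, 4.0, 4.0]   # same direction, larger magnitude
opposite = [-1.0, -2.0, -2.0]      # points the opposite way
orthogonal = [2.0, -1.0, 0.0]      # dot product with image_vec is 0

print(cosine_similarity(image_vec, same_direction))
print(cosine_similarity(image_vec, opposite))
print(cosine_similarity(image_vec, orthogonal))
```

The three results should come out at (approximately) `1`, `-1`, and `0`: magnitude does not matter, only direction.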

What do you think? Does the most descriptive text get the highest score?

In [None]:
for idx, text in enumerate(text_list):
    print(text, " - ", similarity[idx])

Let's practice a little more. Below, we've added a sunflower and a rose image.

In [None]:
img_paths = [
    DATA_DIR + "daisy/2877860110_a842f8b14a_m.jpg",
    DATA_DIR + "sunflowers/2721638730_34a9b7a78b.jpg",
    DATA_DIR + "roses/8032328803_30afac8b07_m.jpg"
]

imgs = [Image.open(path) for path in img_paths]
for img in imgs:
    img.show()

**TODO**: The below `get_img_encodings` function is riddled with `FIXMEs`. Please replace each `FIXME` with the appropriate code to generate CLIP encodings from PIL images.

Click the `...` for an answer.

In [None]:
def get_img_encodings(imgs):
    processed_imgs = [FIXME(img) for img in imgs]
    clip_imgs = torch.tensor(np.stack(FIXME)).to(device)
    clip_img_encodings = FIXME.encode_image(clip_imgs)
    return clip_img_encodings

In [None]:
def get_img_encodings(imgs):
    processed_imgs = [clip_preprocess(img) for img in imgs]
    clip_imgs = torch.tensor(np.stack(processed_imgs)).to(device)
    clip_img_encodings = clip_model.encode_image(clip_imgs)
    return clip_img_encodings

In [None]:
clip_img_encodings = get_img_encodings(imgs)
clip_img_encodings

**TODO**: Find text that describes the above images well and will result in a high similarity score. After calculating the similarity score, feel free to modify the text and repeat the exercise. We will be using this text list again later.

Click the `...` for an example.

In [None]:
# text_list = [
#     "A daisy",
#     "A sunflower",
#     "A rose"
# ]

```python
text_list = [
    "A white daisy with a yellow center",
    "An orange sunflower with a big brown center",
    "A deep red rose flower"
]
```

In [None]:
# text_tokens = clip.tokenize(text_list).to(device)
# clip_text_encodings = clip_model.encode_text(text_tokens).float()
# clip_text_encodings

It would be nice to compare each combination of text and image. To do so, we can [repeat](https://pytorch.org/docs/stable/generated/torch.Tensor.repeat.html#torch.Tensor.repeat) each text encoding for each image encoding. Similarly, we can [repeat_interleave](https://pytorch.org/docs/stable/generated/torch.repeat_interleave.html) each image encoding for each text encoding.

In [None]:
clip_img_encodings /= clip_img_encodings.norm(dim=-1, keepdim=True)
clip_text_encodings /= clip_text_encodings.norm(dim=-1, keepdim=True)

n_imgs = len(imgs)
n_text = len(text_list)

In [None]:
# repeated_clip_text_encodings = clip_text_encodings.repeat(n_imgs, 1)
# repeated_clip_text_encodings

In [None]:
repeated_clip_img_encoding = clip_img_encodings.repeat_interleave(n_text, dim=0)
repeated_clip_img_encoding

In [None]:
# similarity = (repeated_clip_text_encodings * repeated_clip_img_encoding).sum(-1)
# similarity = torch.unflatten(similarity, 0, (n_text, n_imgs))
# similarity
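
The pairing order produced by `repeat` and `repeat_interleave` can be easy to lose track of. The sketch below mimics both operations on plain Python lists of names, so it runs without PyTorch. The helpers `tile` and `interleave` are hypothetical stand-ins written for this illustration, not library functions.

```python
def tile(rows, n):
    """Mimic Tensor.repeat(n, 1): the whole list of rows, n times over."""
    return [row for _ in range(n) for row in rows]

def interleave(rows, n):
    """Mimic Tensor.repeat_interleave(n, dim=0): each row n times in a row."""
    return [row for row in rows for _ in range(n)]

img_names = ["img0", "img1"]
text_names = ["txt0", "txt1", "txt2"]
n_imgs, n_text = len(img_names), len(text_names)

# Pair every image with every text, in the same order as the tensors above.
pairs = list(zip(interleave(img_names, n_text), tile(text_names, n_imgs)))
for k, pair in enumerate(pairs):
    # Flat index k corresponds to image k // n_text and text k % n_text.
    print(k, pair, (k // n_text, k % n_text))
```

Under this ordering, a flat vector of similarities reshapes naturally to `(n_imgs, n_text)`, with one row per image. When the number of images equals the number of texts, as in this notebook, `(n_imgs, n_text)` and `(n_text, n_imgs)` are the same shape.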

Let's compare. Ideally, the diagonal from the top left to the bottom right should be a bright yellow corresponding to their high value. The rest of the values should be low and blue.

In [None]:
# fig = plt.figure(figsize=(10, 10))
# gs = fig.add_gridspec(2, 3, wspace=.1, hspace=0)

# for i, img in enumerate(imgs):
#     ax = fig.add_subplot(gs[0, i])
#     ax.axis("off")
#     plt.imshow(img)

# ax = fig.add_subplot(gs[1, :])
# plt.imshow(similarity.detach().cpu().numpy().T, vmin=0.1, vmax=0.3)

# labels = [ '\n'.join(wrap(text, 20)) for text in text_list ]
# plt.yticks(range(n_text), labels, fontsize=10)
# plt.xticks([])

# for x in range(similarity.shape[1]):
#     for y in range(similarity.shape[0]):
#         plt.text(x, y, f"{similarity[x, y]:.2f}", ha="center", va="center", size=12)

## 5.2 A CLIP Dataset

In the previous notebook, we used the flower category as the label. This time, we're going to use CLIP encodings as our label.

If the goal of CLIP is to align text encodings with image encodings, do we need a text description for each of the images in our dataset? Hypothesis: we do not need text descriptions and only need the image CLIP encodings to create a text-to-image pipeline.

To test this out, let's add the CLIP encodings as the "label" to our dataset. Running CLIP on each batch of data augmented images would be more accurate, but it is also slower. We can speed things up by preprocessing and storing the encodings ahead of time.

We can use [glob](https://docs.python.org/3/library/glob.html) to list all of our image filepaths:

In [None]:
data_paths = glob.glob(DATA_DIR + '*/*.jpg', recursive=True)
data_paths[:5]

The next code block runs the following loop for each filepath:
* Open the image associated with the path and store it in `img`
* Preprocess the image, find the CLIP encoding, and store it in `clip_img`
* Convert the CLIP encoding from a tensor to a python list
* Store the filepath and the CLIP encoding as a row in a csv file

In [None]:
csv_path = 'clip.csv'

with open(csv_path, 'w', newline='') as csvfile:
    writer = csv.writer(csvfile, delimiter=',')
    with torch.no_grad():  # Inference only, so skip gradient tracking
        for path in data_paths:
            img = Image.open(path)
            clip_img = torch.tensor(np.stack([clip_preprocess(img)])).to(device)
            label = clip_model.encode_image(clip_img)[0].tolist()
            writer.writerow([path] + label)

It may take a few seconds to process the full dataset. When complete, open [clip.csv](clip.csv) to see the results.
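
Each row of the CSV holds the file path followed by the encoding values. As a minimal sketch of how such a row could be written and read back, the example below uses an in-memory buffer in place of `clip.csv`. The path and the three-value encoding are made-up placeholders, far shorter than a real `512`-feature encoding.

```python
import csv
import io

# Hypothetical row: a file path followed by a (shortened) encoding.
path = "daisy/example.jpg"
encoding = [0.25, -1.5, 3.0]

buffer = io.StringIO()  # in-memory stand-in for clip.csv
writer = csv.writer(buffer, delimiter=",")
writer.writerow([path] + encoding)

# Read the row back: first column is the path, the rest are floats.
buffer.seek(0)
for row in csv.reader(buffer, delimiter=","):
    row_path = row[0]
    row_encoding = [float(value) for value in row[1:]]
    print(row_path, row_encoding)
```

CSV stores everything as text, so the values need to be converted back with `float` before they can be turned into a tensor.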

We can use the same image transformations as we did with the other notebook:

In [26]:
IMG_SIZE = 32 # Due to stride and pooling, must be divisible by 2 multiple times
IMG_CH = 3
BATCH_SIZE = 128*4
INPUT_SIZE = (IMG_CH, IMG_SIZE, IMG_SIZE)

pre_transforms = [
    transforms.Resize(IMG_SIZE),
    transforms.ToTensor(),  # Scales data into [0,1]
    transforms.Lambda(lambda t: (t * 2) - 1)  # Scale between [-1, 1]
]
pre_transforms = transforms.Compose(pre_transforms)
random_transforms = [
    transforms.RandomCrop(IMG_SIZE),
    transforms.RandomHorizontalFlip(),
]
random_transforms = transforms.Compose(random_transforms)
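
The `Lambda` step in `pre_transforms` rescales pixel values from `[0, 1]` to `[-1, 1]`. To view generated samples, that mapping has to be inverted at some point. The sketch below shows both directions on single values in plain Python. The function names are invented for this illustration, and how `other_utils` actually handles the conversion is not shown in this notebook.

```python
def to_model_range(pixel):
    """Map a pixel value from [0, 1] to [-1, 1], as in pre_transforms."""
    return pixel * 2 - 1

def to_image_range(value):
    """Inverse mapping from [-1, 1] back to [0, 1], clamped for display."""
    return min(max((value + 1) / 2, 0.0), 1.0)

for pixel in (0.0, 0.25, 1.0):
    scaled = to_model_range(pixel)
    print(pixel, "->", scaled, "->", to_image_range(scaled))
```

The clamp matters for generated images, since a model's output is not guaranteed to stay inside `[-1, 1]`.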

Below is the code to initialize our new dataset. Since we've `preprocessed_clip`, we will preload it onto our GPU with the `__init__` function. We've kept the "on the fly" CLIP encoding as an example. It will produce slightly better results, but it is much slower.

In [None]:
# Read the image path
# Read the caption for that image
# Convert it to shape (C, T, D)

In [36]:
# Convert text into embedding vectors
import torch
from transformers import CLIPProcessor, CLIPModel
DIR = 'naruto_images/'
list_texts = []
list_imgs = []
file_path = 'captions.json'
device = "cuda" if torch.cuda.is_available() else "cpu"

# Input text
text = ["A dog playing in the parkwg g rf"]


# 🔥 Use transformers (gives (B, T, 512))
processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
clip_model_hf = CLIPModel.from_pretrained("openai/clip-vit-base-patch32").to(device)
# Function that builds the embedding.
def clip_encode(text, processor, device):
    inputs = processor(
        text=text,
        return_tensors="pt",
        padding="max_length",  # Pad every sequence to the same length T
        truncation=True,
        max_length=20,
        return_attention_mask=True  # Also return the attention mask
        ).to(device)
    with torch.no_grad():
        outputs = clip_model_hf.text_model(**inputs)

    text_features_hf = outputs.last_hidden_state  # (B, T, 512)
    # Zero out the padded positions: (B, T, 512) * (B, T, 1)
    return text_features_hf * inputs["attention_mask"].unsqueeze(-1)
fea = clip_encode(text, processor, device)
print(fea)

tensor([[[ 0.3393,  0.1165,  0.1020,  ...,  0.2468,  0.5906,  0.1013],
         [ 1.9753, -0.5844,  0.3685,  ...,  1.1658,  0.8050, -0.9801],
         [-0.1366, -0.4112,  1.4093,  ...,  0.2005,  0.2695, -1.4270],
         ...,
         [ 0.0000,  0.0000,  0.0000,  ..., -0.0000,  0.0000,  0.0000],
         [ 0.0000,  0.0000,  0.0000,  ..., -0.0000,  0.0000,  0.0000],
         [ 0.0000,  0.0000,  0.0000,  ..., -0.0000,  0.0000,  0.0000]]],
       device='cuda:0')
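
The rows of zeros at the end of the output come from multiplying the hidden states by the attention mask, which is `0` at every padded position. Here is a minimal plain-Python sketch of that masking step on a toy sequence of `T = 4` positions with `D = 3` features each. The helper `apply_attention_mask` and the numbers are made up for this illustration.

```python
def apply_attention_mask(hidden_states, attention_mask):
    """Zero every feature of the positions whose mask value is 0."""
    return [
        [value * keep for value in token_features]
        for token_features, keep in zip(hidden_states, attention_mask)
    ]

# Toy sequence: T = 4 positions, D = 3 features per position.
hidden_states = [
    [0.3, 0.1, 0.1],
    [1.9, -0.5, 0.3],
    [0.7, 0.2, -0.4],   # padding position
    [0.7, 0.2, -0.4],   # padding position
]
attention_mask = [1, 1, 0, 0]  # 1 = real token, 0 = padding

masked = apply_attention_mask(hidden_states, attention_mask)
for row in masked:
    print(row)
```

A negative value multiplied by zero gives `-0.0` in floating point, which is why some entries in the tensor above print as `-0.0000`. It compares equal to `0.0`.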


In [None]:
import json
from PIL import Image
# Convert text into embedding vectors
import torch
from transformers import CLIPProcessor, CLIPModel
DIR = 'naruto_images/'
file_path = 'captions.json'
device = "cuda" if torch.cuda.is_available() else "cpu"

# Input text
text = ["A dog playing in the park"]


# 🔥 Use transformers (gives (B, T, 512))
processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
clip_model_hf = CLIPModel.from_pretrained("openai/clip-vit-base-patch32").to(device)
# Function that builds the embedding.
def clip_encode(text, processor, device):
    inputs = processor(
        text=text,
        return_tensors="pt",
        padding="max_length",  # Pad every sequence to the same length T
        truncation=True,
        max_length=20,
        return_attention_mask=True  # Also return the attention mask
        ).to(device)
    with torch.no_grad():
        outputs = clip_model_hf.text_model(**inputs)

    text_features_hf = outputs.last_hidden_state  # (B, T, 512)
    # Zero out the padded positions: (B, T, 512) * (B, T, 1)
    return text_features_hf * inputs["attention_mask"].unsqueeze(-1)
# Build the dataset that reads the images and captions
class MyDataset(Dataset):
    def __init__(self, file_path, preprocessed_clip=True):
        self.imgs = []
        self.labels = []
        self.preprocessed_clip = preprocessed_clip
        
        with open(file_path,'r',encoding='utf-8' ) as file:
            reader = json.load(file)
            for item in reader:
                img = Image.open(DIR+item['image']).convert('RGB')
                self.imgs.append(pre_transforms(img).to(device))
                if preprocessed_clip:
                    label = clip_encode(item['caption'],processor, device).to(device)
                    self.labels.append(label)

    def __getitem__(self, idx):
        img = random_transforms(self.imgs[idx])
        if self.preprocessed_clip:
            label = self.labels[idx]
        else:
            batch_img = img[None, :, :, :]
            encoded_imgs = clip_model.encode_image(clip_preprocess(batch_img))
            label = encoded_imgs.to(device).float()[0]
        return img, label

    def __len__(self):
        return len(self.imgs)

In [40]:
train_data = MyDataset(file_path)
dataloader = DataLoader(train_data, batch_size=1, shuffle=True, drop_last=True)

In [41]:
images, labels = next(iter(dataloader))
print(images.shape)  # Shape of the image batch
print(labels.shape)  # Shape of the caption embedding batch


torch.Size([1, 3, 32, 32])
torch.Size([1, 1, 20, 512])


In [None]:
T = 400
B_start = 0.0001
B_end = 0.02
B = torch.linspace(B_start, B_end, T).to(device)

ddpm = ddpm_utils.DDPM(B, device)
model = UNet_utils.UNet(
    T, IMG_CH, IMG_SIZE, down_chs=(256, 256, 512), t_embed_dim=8, c_embed_dim=CLIP_FEATURES
)
print("Num params: ", sum(p.numel() for p in model.parameters()))
# model_flowers = model.to(device)
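# Wrap the model in DataParallel when more than one GPU is available
if torch.cuda.is_available() and torch.cuda.device_count() > 1:
    print("Using", torch.cuda.device_count(), "GPUs!")
    model = torch.nn.DataParallel(model)  # Split each batch across all GPUs
model_flowers = model
# Move the model to the GPU (or to the CPU as a fallback)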
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
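
The values `T`, `B_start`, and `B_end` define a linear noise schedule. The source of `ddpm_utils.DDPM` is not shown here, so the following is only a sketch under the assumption that it follows the standard DDPM formulation, where the cumulative product of `1 - beta` gives the fraction of the original signal variance left at each timestep. The name `a_bar` is chosen for this illustration.

```python
T = 400
B_start, B_end = 0.0001, 0.02

# Evenly spaced betas, like torch.linspace(B_start, B_end, T).
betas = [B_start + (B_end - B_start) * t / (T - 1) for t in range(T)]

# Cumulative product of (1 - beta): signal variance remaining at step t
# (assuming the standard DDPM forward process).
a_bar = []
running = 1.0
for beta in betas:
    running *= 1.0 - beta
    a_bar.append(running)

print(f"a_bar at t=0:   {a_bar[0]:.4f}")
print(f"a_bar at t=199: {a_bar[199]:.4f}")
print(f"a_bar at t=399: {a_bar[-1]:.4f}")
```

The values shrink steadily toward zero, so under this assumption an image at the final timestep is almost entirely noise.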



The `get_context_mask` function will change a little bit. Since we're replacing our categorical input with a CLIP embedding, we no longer need to one-hot encode our label. We'll still randomly set values in our encoding to `0` to help the model learn without context.

In [None]:
def get_context_mask(c, drop_prob):
    c_mask = torch.bernoulli(torch.ones_like(c).float() - drop_prob).to(device)
    return c_mask
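
To see what this mask looks like, the sketch below reproduces the same idea in plain Python: every entry is kept (`1.0`) with probability `1 - drop_prob` and dropped (`0.0`) otherwise. The function `get_context_mask_py`, the toy batch, and the fixed seed are assumptions made for this illustration.

```python
import random

def get_context_mask_py(c, drop_prob, rng):
    """Plain-Python stand-in: keep each entry (1.0) with probability 1 - drop_prob."""
    return [[1.0 if rng.random() < 1.0 - drop_prob else 0.0 for _ in row] for row in c]

rng = random.Random(0)                  # seeded for repeatability
c = [[0.5] * 512 for _ in range(8)]     # toy batch: 8 encodings of 512 features
c_mask = get_context_mask_py(c, 0.1, rng)

kept = sum(sum(row) for row in c_mask) / (8 * 512)
print(f"fraction of entries kept: {kept:.3f}")
```

Because the mask has the same shape as `c`, individual features are dropped independently rather than whole encodings at once.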

Let's also recreate the `sample_flowers` function. This time, it will take our `text_list` as a parameter and convert it to a CLIP encoding. The `sample_w` function remains mostly the same and has been moved to the bottom of [ddpm_utils.py](utils/ddpm_utils.py).

In [None]:
def sample_flowers(text_list):
    text_tokens = clip.tokenize(text_list).to(device)
    c = clip_model.encode_text(text_tokens).float()
    x_gen, x_gen_store = ddpm_utils.sample_w(model, ddpm, INPUT_SIZE, T, c, device)
    return x_gen, x_gen_store

Time to get training! After about `50` `epochs`, the model will start generating something recognizable, and at `100` it will hit its stride. What do you think? Do the generated images match your descriptions?

In [None]:
epochs= 2000
c_drop_prob = 0.1
lrate = 1e-4
save_dir = "/kaggle/working/"

In [None]:
optimizer = torch.optim.Adam(model.parameters(), lr=lrate)

model.train()
for epoch in range(epochs):
    for step, batch in enumerate(dataloader):
        optimizer.zero_grad()
        x, c = batch
        # Sample one timestep per image actually present in this batch
        t = torch.randint(0, T, (x.shape[0],), device=device).float()
        c_mask = get_context_mask(c, c_drop_prob)
        loss = ddpm.get_loss(model_flowers, x, t, c, c_mask)
        loss.backward()
        optimizer.step()

    print(f"Epoch {epoch} | Step {step:03d} | Loss: {loss.item()}")
    # if epoch % 5 == 0 or epoch == int(epochs - 1):
    #     x_gen, x_gen_store = sample_flowers(text_list)
    #     grid = make_grid(x_gen.cpu(), nrow=len(text_list))
    #     save_image(grid, save_dir + f"image_ep{epoch:02}.png")
    #     print("saved images in " + save_dir + f" for episode {epoch}")

Now that the model is trained, let's play with it! What happens when we give it a prompt of something not in the dataset? Or can you craft the perfect prompt to generate an image you can imagine?

The art of crafting a prompt to get the results you desire is called **prompt engineering**, and as shown here, is dependent on the kind of data the model is trained on.

In [None]:
# Change me
text_list = [
    "A daisy ",
    "A sunflower shine",
    "A rose beautiful"
]
model.eval()
x_gen, x_gen_store = sample_flowers(text_list)
grid = make_grid(x_gen.cpu(), nrow=len(text_list))
other_utils.show_tensor_image([grid])
plt.show()


Once you've found a set of images you enjoy, run the cells below to save the model weights and turn the sampling steps into an animation. The animation will be saved to `/kaggle/working/flowers.gif`.

In [None]:
torch.save(model_flowers.state_dict(), "/kaggle/working/model_flowers.pth")


In [None]:
grids = [other_utils.to_image(make_grid(x_gen.cpu(), nrow=len(text_list))) for x_gen in x_gen_store]
other_utils.save_animation(grids, "/kaggle/working/flowers.gif")