# Initialization

In [None]:
! pip install ftfy regex tqdm
! pip install git+https://github.com/openai/CLIP.git

import numpy as np
import torch
import torch.nn as nn
import torchvision
from pkg_resources import packaging
import matplotlib.pyplot as plt
import clip
from PIL import Image
import os
from glob import glob
from tqdm import tqdm

In [None]:
# load the CLIP model and freeze all its parameters
# clip.load returns the network together with `preprocess`, the image transform
# that a later cell applies to PIL images before calling model.encode_image.

model, preprocess = clip.load("RN50x4")
model.eval()   # evaluation mode
# NOTE(review): `trainable` looks like a Keras-style flag; here it only sets a plain
# attribute on the module. The actual freezing is done by requires_grad_ below — confirm
# nothing reads `model.trainable` and consider removing it.
model.trainable = False
# requires_grad_(False) turns off gradients for every parameter; the return value is
# bound to `m`, which keeps the cell from echoing the model as its output.
m = model.requires_grad_(False)

100%|███████████████████████████████████████| 402M/402M [00:04<00:00, 86.4MiB/s]


In [None]:
#from google.colab import drive
#drive.mount('/content/drive')

# Download some images from drive and classify them in folders

!wget https://docs.google.com/uc?id=1jP081zYPv1AKVwrOg2UZFlU_nPd8lLS5 -O /content/trump1.zip
!wget https://docs.google.com/uc?id=1yUDmA07-519KOpTEHbEebZR8SLrcVhKC -O /content/trump2.zip
!wget https://docs.google.com/uc?id=196fkPNzgoayxlTg06yEf43YO13QLO2YG -O /content/usa.zip
!wget https://docs.google.com/uc?id=1pbkUuPjVtAr6EKJaadBAqk6l6Y-xodZd -O /content/flags.zip
!wget https://docs.google.com/uc?id=1eZS_7seR9j7iSWrKINPfKI9fhKLElVpw -O /content/dogs.zip

!rm -r /content/trump1
!mkdir /content/trump1                              # unzip trump1.zip to new trump1 directory
!unzip /content/trump1.zip -d /content/trump1
trump_files1 = glob(os.path.join('/content/trump1', "**"))   # get the paths of all files in trump1 directory

!rm -r /content/trump2
!mkdir /content/trump2
!unzip /content/trump2.zip -d /content/trump2
trump_files2 = glob(os.path.join('/content/trump2', "**"))

!rm -r /content/usa
!mkdir /content/usa
!unzip /content/usa.zip -d /content/usa
usa_files = glob(os.path.join('/content/usa', "**"))

!rm -r /content/flags
!mkdir /content/flags
!unzip /content/flags.zip -d /content/flags
flags_files = glob(os.path.join('/content/flags', "**"))

!rm -r /content/dogs
!mkdir /content/dogs
!unzip /content/dogs.zip -d /content/dogs
dogs_files = glob(os.path.join('/content/dogs', "**"))

#!rm -r /content/myanimals
#!mkdir /content/myanimals
#!unzip /content/drive/MyDrive/myanimals.zip -d /content/myanimals
#myanimals_files = glob(os.path.join('/content/myanimals/myanimals', "**"))

--2024-03-04 14:31:04--  https://docs.google.com/uc?id=1jP081zYPv1AKVwrOg2UZFlU_nPd8lLS5
Resolving docs.google.com (docs.google.com)... 142.251.2.101, 142.251.2.100, 142.251.2.139, ...
Connecting to docs.google.com (docs.google.com)|142.251.2.101|:443... connected.
HTTP request sent, awaiting response... 303 See Other
Location: https://drive.usercontent.google.com/download?id=1jP081zYPv1AKVwrOg2UZFlU_nPd8lLS5 [following]
--2024-03-04 14:31:04--  https://drive.usercontent.google.com/download?id=1jP081zYPv1AKVwrOg2UZFlU_nPd8lLS5
Resolving drive.usercontent.google.com (drive.usercontent.google.com)... 142.250.101.132, 2607:f8b0:4023:c06::84
Connecting to drive.usercontent.google.com (drive.usercontent.google.com)|142.250.101.132|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 475559 (464K) [application/octet-stream]
Saving to: ‘/content/trump1.zip’


2024-03-04 14:31:06 (3.79 MB/s) - ‘/content/trump1.zip’ saved [475559/475559]

--2024-03-04 14:31:06--  https://do

# My model

In [None]:
# Create a model that is the same as clip, having removed its last layers.
# Its output is the input of the last convolution layer, whose activations we want to test
# The code for the original CLIP model can be found in https://github.com/openai/CLIP/blob/main/clip/model.py

from collections import OrderedDict

class Bottleneck(nn.Module):
    """Residual bottleneck block (1x1 -> 3x3 -> 1x1 convolutions) with an optional truncated mode.

    Args:
        inplanes: number of input channels.
        planes: number of channels of the two inner convolutions; a regular block
            outputs ``planes * expansion`` channels.
        stride: when > 1, an average pool of that size follows the second convolution
            (the convolutions themselves always use stride 1).
        final: when True the block is truncated: the third convolution, its batch norm,
            the residual addition and the last ReLU are left out, so ``forward`` returns
            the ``planes``-channel tensor that would be fed to the third convolution.
    """

    expansion = 4

    def __init__(self, inplanes, planes, stride=1, final = False):
        super().__init__()
        out_planes = planes * self.expansion

        # first stage: 1x1 convolution
        self.conv1 = nn.Conv2d(inplanes, planes, 1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.relu1 = nn.ReLU(inplace=True)

        # second stage: 3x3 convolution
        self.conv2 = nn.Conv2d(planes, planes, 3, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.relu2 = nn.ReLU(inplace=True)

        # spatial reduction happens here, not inside the convolutions
        self.avgpool = nn.Identity() if stride <= 1 else nn.AvgPool2d(stride)

        # third stage: 1x1 expansion, only present in a regular (non-truncated) block
        if not final:
            self.conv3 = nn.Conv2d(planes, out_planes, 1, bias=False)
            self.bn3 = nn.BatchNorm2d(out_planes)
            self.relu3 = nn.ReLU(inplace=True)

        self.downsample = None
        self.stride = stride
        self.final = final

        # the shortcut needs a projection whenever resolution or channel count changes;
        # it is an avgpool followed by a stride-1 1x1 convolution and a batch norm
        needs_projection = stride > 1 or inplanes != planes * Bottleneck.expansion
        if needs_projection:
            shortcut_layers = OrderedDict()
            shortcut_layers["-1"] = nn.AvgPool2d(stride)
            shortcut_layers["0"] = nn.Conv2d(inplanes, out_planes, 1, stride=1, bias=False)
            shortcut_layers["1"] = nn.BatchNorm2d(out_planes)
            self.downsample = nn.Sequential(shortcut_layers)

    def forward(self, x: torch.Tensor):
        """Run the block; a truncated (``final``) block returns right after the avgpool."""
        out = self.conv1(x)
        out = self.relu1(self.bn1(out))
        out = self.conv2(out)
        out = self.relu2(self.bn2(out))
        out = self.avgpool(out)

        if self.final:
            # truncated block: no expansion convolution and no residual connection
            return out

        out = self.bn3(self.conv3(out))
        shortcut = x if self.downsample is None else self.downsample(x)
        out += shortcut
        return self.relu3(out)

class MyModifiedResNet(nn.Module):    # I have removed some final layers, up to before the last convolution
    """
    Truncated ResNet backbone built from Bottleneck blocks.

    Structure:
    - A 3-convolution "stem" followed by an average pool.
    - Four residual stages; stages 2-4 are built with stride 2, which Bottleneck
      implements as an avgpool after its second convolution.
    - The last block of the fourth stage is created with ``final=True``, so the network
      stops before that block's third convolution and has no pooling head after it.

    ``forward`` therefore returns the feature map that would be the input of the last
    convolution layer.
    """

    def __init__(self, layers, input_resolution=224, width=64):
        super().__init__()
        self.input_resolution = input_resolution
        half_width = width // 2

        # the 3-layer stem
        self.conv1 = nn.Conv2d(3, half_width, kernel_size=3, stride=2, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(half_width)
        self.relu1 = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(half_width, half_width, kernel_size=3, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(half_width)
        self.relu2 = nn.ReLU(inplace=True)
        self.conv3 = nn.Conv2d(half_width, width, kernel_size=3, padding=1, bias=False)
        self.bn3 = nn.BatchNorm2d(width)
        self.relu3 = nn.ReLU(inplace=True)
        self.avgpool = nn.AvgPool2d(2)

        # residual layers
        # _inplanes tracks the channel count entering the next block and is updated by _make_layer
        self._inplanes = width
        depth1, depth2, depth3, depth4 = layers[0], layers[1], layers[2], layers[3]
        self.layer1 = self._make_layer(width, depth1)
        self.layer2 = self._make_layer(width * 2, depth2, stride=2)
        self.layer3 = self._make_layer(width * 4, depth3, stride=2)
        self.layer4 = self._make_layer(width * 8, depth4, stride=2, final = True)

    def _make_layer(self, planes, blocks, stride=1, final = False):
        """Build one residual stage of `blocks` Bottlenecks; with `final` the last one is truncated."""
        # only the first block of a stage may change resolution / channel count
        stage = [Bottleneck(self._inplanes, planes, stride)]
        self._inplanes = planes * Bottleneck.expansion

        # one slot is taken by the first block, and one more by the truncated block when `final`
        n_regular = blocks - 2 if final else blocks - 1
        stage.extend(Bottleneck(self._inplanes, planes) for _ in range(n_regular))

        if final:
            stage.append(Bottleneck(self._inplanes, planes, final = True))

        return nn.Sequential(*stage)

    def forward(self, x):
        """Return the activations after the (truncated) fourth residual stage."""
        # match the dtype of the weights (e.g. when they were loaded in half precision)
        x = x.type(self.conv1.weight.dtype)

        stem_stages = (
            (self.conv1, self.bn1, self.relu1),
            (self.conv2, self.bn2, self.relu2),
            (self.conv3, self.bn3, self.relu3),
        )
        for conv, bn, relu in stem_stages:
            x = relu(bn(conv(x)))
        x = self.avgpool(x)

        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)

        return x

In [None]:
mymodel = MyModifiedResNet([4,6,10,6], width = 80)   # these are the parameters used in CLIP RN50x4

# copy model parameters
# strict=False is required because the CLIP visual tower has entries this truncated
# network lacks (they show up as "unexpected" keys and are ignored). The opposite case,
# a parameter of mymodel that is NOT in the checkpoint, would silently stay randomly
# initialised, so that case is checked explicitly.
load_result = mymodel.load_state_dict(model.visual.state_dict(), strict = False, assign = True)
assert not load_result.missing_keys, f"mymodel parameters missing from the CLIP weights: {load_result.missing_keys}"

mymodel.cuda()
# NOTE(review): `trainable` is a plain attribute here; the freezing itself is done by
# requires_grad_ below. Kept in case other code reads it — confirm and remove if unused.
mymodel.trainable = False       # freeze mymodel parameters, and enter evaluation mode
mymodel.eval()
m = mymodel.requires_grad_(False)   # bound to `m` so the cell does not echo the model

In [None]:
# Extract the weights of the convolution that MyModifiedResNet leaves out:
# conv3 of block 5 in layer4 of the CLIP visual network, converted with .half().
convlayer = model.visual.layer4[5].conv3.half()
param_tensor = list(convlayer.parameters())[0]   # first (weight) parameter of the last conv layer
print(param_tensor.shape)
# The recorded output shows the kernel is 1x1, so averaging over dims (2,3) only drops
# those two axes: 2560 filters, each a vector of dimension 640.
param_tensor = param_tensor.mean(dim = (2,3))    # set their dimension to 2560x640, we have 2560 filters of dimension 640 (x1x1)
print(param_tensor.shape)

torch.Size([2560, 640, 1, 1])
torch.Size([2560, 640])


# Maximizing text

In [None]:
# Test the tokenizer
# Round trip: text -> token ids (clip.tokenize) -> text (tokenizer.decode), then strip
# the start/end markers from the decoded string.

my_tokenizer = clip.clip._tokenizer   # tokenizer instance used by clip.tokenize (private attribute of the clip module)

txt_input = clip.tokenize(["A photo of clouds"]).cuda()
my_list = [i.item() for i in txt_input[0]]   # token ids of the single sentence as plain Python ints
res = my_tokenizer.decode(my_list)
start = "<|startoftext|>"
end = " <|endoftext|>"   # note the leading space: it is cut off together with the marker
result = res[res.find(start)+len(start):res.rfind(end)]      # get the text between the first <startoftext> and the last <endoftext>

print(txt_input)
# In the recorded output the argmax is 5, the index of the largest id (49407) that
# directly follows the sentence tokens; the rest of the row is zero padding.
print(txt_input.argmax(dim=-1))
result

tensor([[49406,   320,  1125,   539,  6244, 49407,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0]], device='cuda:0',
       dtype=torch.int32)
tensor([5], device='cuda:0')


'a photo of clouds'

In [None]:
my_tokenizer = clip.clip._tokenizer

# Select an image and encode it; the normalised embedding is the optimization target.
# The file is opened in a `with` block so the handle is closed once the image has been
# converted by `preprocess`.
with Image.open("/content/trump1/im-799917.jfif") as img:
  photo = [preprocess(img)]
im_input = torch.stack(photo).cuda()   # batch containing the single image

# The target is a constant of the optimization, so it is computed without autograd tracking.
with torch.no_grad():
  target = model.encode_image(im_input)
  target /= target.norm(dim=-1, keepdim=True)   # unit length, so a dot product with it is a cosine similarity

In [None]:
# Train on input
# Hyper-parameters of the embedding optimization run further down in this cell.

random_seed = 20  # multiplied by the epoch index to seed torch at the start of every epoch
num_epochs = 20   # how many optim epochs to take
# NOTE(review): `iter` shadows the Python builtin of the same name for the rest of the
# notebook; renaming it (together with its use in the training loop) would be safer.
iter = 100   # how many iterations at each epoch
learning_rate = 2  # learning rate handed to torch.optim.SGD


def get_input_emb(text = "Donald Trump"):
  """Tokenize `text` and look up its token embeddings.

  Returns a pair (tokens, embedding): the token ids produced by clip.tokenize (moved to
  the GPU) and the matching output of model.token_embedding cast to model.dtype. The
  embedding has requires_grad switched on because it is the tensor that gets optimized.
  """
  with torch.no_grad():   # the lookup itself must not be recorded by autograd
    tokens = clip.tokenize([text]).cuda()
    embedding = model.token_embedding(tokens).type(model.dtype)
  embedding.requires_grad_(True)
  return tokens, embedding

# Starting point of the optimization: tokens and trainable embedding of a neutral sentence.
# `token_input` is also read by my_encode_text to locate the position it reads out.
token_input, input2 = get_input_emb("This is a sentence")
# Detached copy of the initial embedding; the training loop uses it to restore the
# positions that must stay frozen.
start_input = input2.clone().detach()

def my_encode_text(text_embedding, tokens=None):
  """Encode a text given as token *embeddings* instead of token ids.

  Follows the same steps as the model's encode_text, but starts after the embedding
  lookup so that gradients can flow back into `text_embedding`.

  Args:
    text_embedding: token embeddings, one row per sequence position, batch first.
    tokens: token ids belonging to `text_embedding`; their argmax along the last
      dimension selects the position whose feature is read out. When omitted, the
      notebook-level `token_input` is used (the original behaviour).

  Returns:
    The projected text feature, normalised to unit length along the last dimension.
  """
  if tokens is None:
    tokens = token_input   # fall back to the current notebook-level tokens

  dtype = model.dtype
  hidden = text_embedding + model.positional_embedding.type(dtype)
  hidden = hidden.permute(1, 0, 2)  # NLD -> LND
  hidden = model.transformer(hidden)
  hidden = hidden.permute(1, 0, 2)  # LND -> NLD
  hidden = model.ln_final(hidden).type(dtype)

  # pick, for every sequence in the batch, the feature at the position of the largest token id
  batch_idx = torch.arange(hidden.shape[0])
  readout_pos = tokens.argmax(dim=-1)
  feature = hidden[batch_idx, readout_pos] @ model.text_projection
  return feature / feature.norm(dim=-1, keepdim=True)

def get_nearest(input):
  """Project an embedding sequence onto the vocabulary and decode it to text.

  For every position of the first sequence in `input`, the token whose embedding has the
  smallest Euclidean distance to it is selected. The selected ids are decoded, and the
  text between the first "<|startoftext|>" and the last "<|endoftext|>" is returned.
  """
  vocab = model.token_embedding.weight.data   # one embedding row per vocabulary token
  token_ids = [
    int(torch.argmin(torch.norm(vocab - emb, dim=1)))   # id of the closest vocabulary entry
    for emb in input[0]
  ]

  decoded = my_tokenizer.decode(token_ids)   # ids -> text, markers included
  sot, eot = "<|startoftext|>", "<|endoftext|>"
  first_char = decoded.find(sot) + len(sot)
  last_char = decoded.rfind(eot)
  return decoded[first_char:last_char]

# Optimization loop.
# Each epoch runs `iter` SGD steps on the prompt-token embeddings, snaps the result to the
# nearest vocabulary tokens, decodes them to text and re-embeds that text as the starting
# point of the next epoch.
#
# Fix: the frozen reference (`start_input`) and the boundary of the trainable span used to
# be fixed once, for the initial sentence (start token at 0, end token at 5). After the
# first epoch the prompt is re-tokenized and may have a different length, while
# my_encode_text already reads out at the *new* end-token position. The stale reference
# then overwrote the new prompt with the old end token and padding. The reference and the
# end-token position are now taken from the current prompt at the start of every epoch.
# For the initial sentence this is identical to the previous behaviour (tokens 1-4 train).
losses = []     # store the losses at the end of each epoch and the text results
results = []
my_tqdm = tqdm(range(num_epochs))
for step in my_tqdm:
  torch.manual_seed(step * random_seed)           # get a different seed at each iteration to prevent repeating the same steps
  my_lr = learning_rate
  optim = torch.optim.SGD([input2], lr=my_lr)     # input2 is a new tensor every epoch, so the optimizer is rebuilt

  start_input = input2.clone().detach()           # frozen reference for this epoch's prompt
  eot_pos = int(token_input.argmax(dim=-1)[0])    # position read out by my_encode_text (the end token)

  for _ in range(iter):
    output = my_encode_text(input2)     # get model output
    loss = - output @ target.T          # loss is minus inner product of output and target, as we want them to be as similar as possible (they have already been normalized)
    loss.backward(retain_graph=True)    # retain_graph kept from the original, where it was added to work around runtime errors
    optim.step()
    optim.zero_grad()
    with torch.no_grad():
      # freeze all input weights except for the prompt tokens between the start token and
      # the end token: restore position 0 and everything from the end token onwards
      input2[0][0] = start_input[0][0]
      input2[0][eot_pos:] = start_input[0][eot_pos:]
  result = get_nearest(input2)
  token_input, input2 = get_input_emb(result)     # continue from the decoded text
  loss = (- my_encode_text(input2) @ target.T).item()   # loss of the decoded (discrete) text
  losses.append(loss)
  results.append(result)
  my_tqdm.set_postfix(loss = loss, result = result)

100%|██████████| 20/20 [00:42<00:00,  2.14s/it, loss=-0.286, result=<|startoftext|>appetdrainthe(#]


In [None]:
# Rank the distinct (loss, text) pairs of all epochs by increasing loss (best match first).
ranked = sorted(set(zip(losses, results)))
a = [text + str(loss_value) for loss_value, text in ranked]   # text immediately followed by its loss
sorted_results = [text for _, text in ranked]                 # texts only, same order
a

['realdonaldtrump <|startoftext|><|startoftext|>parkrun -0.343017578125',
 'nafta <|startoftext|><|startoftext|>malaria -0.340576171875',
 'barron <|startoftext|>clusive <|startoftext|>-0.322021484375',
 'gopdebate solar pper <|startoftext|>-0.3134765625',
 'comey mariano wolfpack <|startoftext|>-0.30712890625',
 '🇸🇪 reveals ludwig <|startoftext|>-0.30224609375',
 '<|startoftext|><|startoftext|>recruiting <|startoftext|>-0.298095703125',
 'desk <|startoftext|>striking supposed -0.296875',
 'hungover clown harvey <|startoftext|>-0.296875',
 'defeat lulu <|startoftext|><|startoftext|>-0.28955078125',
 '<|startoftext|><|startoftext|><|startoftext|>canelo -0.2880859375',
 '<|startoftext|>appetdrainthe(# -0.2861328125',
 'melania 🇫🇷 <|startoftext|>sculpting -0.284912109375',
 '🔴poche dex <|startoftext|>-0.281005859375',
 "hungover sympathi:'penny -0.280029296875",
 'rameshvettel benghazi cobra -0.278076171875',
 'saffron 📷@ myo<|startoftext|>-0.270751953125',
 'hungover arias barclay �-0.26

In [None]:
# Compare the best text found by the optimization with a hand-written reference
# caption for the image, using the cosine similarity between each text embedding and
# the image target.

def _text_similarity(text):
  """Return (normalised CLIP text embedding of `text`, its cosine similarity with `target`)."""
  with torch.no_grad():   # evaluation only, no autograd tracking needed
    emb = model.encode_text(clip.tokenize([text]).cuda())
    emb = emb / emb.norm(dim=-1, keepdim=True)
  # read the single similarity value directly; taking the norm of the 1x1 product
  # (as before) would hide the sign of a negative similarity
  return emb, (emb @ target.T).item()

t0, my_result = _text_similarity(sorted_results[0])
t1, opt_result = _text_similarity("Donald Trump")

print(my_result, opt_result)

0.343017578125 0.3359375
