## Install an Library and Import Packages

In [1]:
!pip install coremltools

Collecting coremltools
  Downloading coremltools-7.2-cp310-none-manylinux1_x86_64.whl (1.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m9.4 MB/s[0m eta [36m0:00:00[0m
Collecting cattrs (from coremltools)
  Downloading cattrs-23.2.3-py3-none-any.whl (57 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m57.5/57.5 kB[0m [31m7.2 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting pyaml (from coremltools)
  Downloading pyaml-24.4.0-py3-none-any.whl (24 kB)
Installing collected packages: pyaml, cattrs, coremltools
Successfully installed cattrs-23.2.3 coremltools-7.2 pyaml-24.4.0


In [2]:
import os
import json
import numpy as np
from google.colab import drive
from torchvision.models import vit_b_32, ViT_B_32_Weights
from transformers import BertTokenizer
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms
import coremltools as ct



In [3]:
drive.mount('/content/drive')

Mounted at /content/drive


## Load Decoder Model

### Define structure of the model

In [15]:
def create_positional_encoding(max_length, d_model):
    assert d_model % 2 == 0, "Dimension model must be even"

    pos = torch.arange(0, max_length).unsqueeze(1) # (max_length, 1)
    pos_expanded = pos.repeat(1, d_model // 2) # (max_length, d_model // 2)

    power = torch.arange(0, d_model, 2).float() / d_model
    div_term = torch.pow(10000, power).unsqueeze(0) # (1, d_model // 2)
    div_term_expanded = div_term.repeat(max_length, 1)  # (max_length, d_model // 2)

    pe = torch.zeros(max_length, d_model) # (max_length, d_model)
    pe[:, 0::2] = torch.sin(pos_expanded / div_term_expanded) # (max_length, d_model // 2)
    pe[:, 1::2] = torch.cos(pos_expanded / div_term_expanded) # (max_length, d_model // 2)

    return pe

In [16]:
class Embedding(nn.Module):
  def __init__(self, vocab_size, max_length, d_model):
    super(Embedding, self).__init__()
    self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=d_model)
    self.pos_encoding = create_positional_encoding(max_length, d_model).to(device)

  def forward(self, x):
    """ Apply embedding and positional encoding to the input

    Input:
      x: (N, seq_length)
    Output:
      x: (N, seq_length, d_model)
    """
    # apply embedding
    x = self.embedding(x)
    # apply positional encoding
    x += self.pos_encoding[:x.size(1)]
    return x

In [17]:
device='cpu'

In [18]:
mask_size = 32
causal_mask = torch.zeros(mask_size, mask_size)
for i in range(mask_size):
    for j in range(mask_size):
        if i < j:
            causal_mask[i, j] = float('-inf')


causal_mask = causal_mask.to(device)
print(causal_mask)

tensor([[0., -inf, -inf,  ..., -inf, -inf, -inf],
        [0., 0., -inf,  ..., -inf, -inf, -inf],
        [0., 0., 0.,  ..., -inf, -inf, -inf],
        ...,
        [0., 0., 0.,  ..., 0., -inf, -inf],
        [0., 0., 0.,  ..., 0., 0., -inf],
        [0., 0., 0.,  ..., 0., 0., 0.]])


In [19]:
class CausalSelfAttention(nn.Module):
  def __init__(self, d_model, num_heads, dropout):
    super(CausalSelfAttention, self).__init__()
    self.mha = nn.MultiheadAttention(embed_dim=d_model, num_heads=num_heads, dropout=dropout, batch_first=True)
    self.layer_norm = nn.LayerNorm(d_model)


  def forward(self, x):
    attn_output, _ = self.mha(query=x, key=x, value=x, attn_mask=causal_mask, is_causal=True)
    x = self.layer_norm(x + attn_output)
    return x

In [20]:
class CrossAttention(nn.Module):
  def __init__(self, d_model, num_heads, dropout):
    super(CrossAttention, self).__init__()
    self.mha = nn.MultiheadAttention(embed_dim=d_model, num_heads=num_heads, dropout=dropout, batch_first=True)
    self.layer_norm = nn.LayerNorm(d_model)


  def forward(self, x, image_embedding):
    attn_output, _ = self.mha(query=x, key=image_embedding, value=image_embedding)
    x = self.layer_norm(x + attn_output)
    return x

In [21]:
class FeedForward(nn.Module):
  def __init__(self, d_model, dropout):
    super(FeedForward, self).__init__()
    self.seq = nn.ModuleList([
        nn.Linear(d_model, 2 * d_model),
        nn.ReLU(),
        nn.Linear(2 * d_model, d_model),
        nn.Dropout(dropout)
    ])

    self.layernorm = nn.LayerNorm(d_model)

  def forward(self, x):
    original_x = x
    for layer in self.seq:
      x = layer(x)
    out = x + original_x
    return self.layernorm(out)

In [22]:
class DecoderLayer(nn.Module):
  def __init__(self, d_model, num_heads, dropout):
    super(DecoderLayer, self).__init__()
    self.self_attention = CausalSelfAttention(d_model=d_model, num_heads=num_heads, dropout=dropout)
    self.cross_attention = CrossAttention(d_model=d_model, num_heads=num_heads, dropout=dropout)
    self.ff = FeedForward(d_model=d_model, dropout=dropout)

  def forward(self, inputs):
    x, image_embedding = inputs
    x = self.self_attention(x)
    x = self.cross_attention(x, image_embedding)
    x = self.ff(x)
    return x

In [23]:
class Lastlayer(nn.Module):
  def __init__(self, units, vocab_size):
    super(Lastlayer, self).__init__()
    self.linear = nn.Linear(units, vocab_size)

  def forward(self, x):
    x = self.linear(x)
    return x

In [24]:
class Captioner(nn.Module):
  def __init__(self, vocab_size, max_length, d_model, num_heads, dropout):
    super(Captioner, self).__init__()
    self.seq_embedding = Embedding(vocab_size, max_length, d_model)
    self.decoder_layers = nn.ModuleList([
        DecoderLayer(d_model, num_heads, dropout)
    ])
    self.last_layer = Lastlayer(d_model, vocab_size)

    self.linear = nn.Linear(768, d_model)

  def forward(self, image_embeddings, txt):
    """
    inputs:
      txt: (N, max_length)
      image_embeddings: (N, 7, 7, 576)
    """
    x = self.seq_embedding(txt) # x: (N, max_length, d_model)
    # transform image_embeddings to proper shape
    image_embeddings = self.linear(image_embeddings)

    for dec_layer in self.decoder_layers:
      x = dec_layer((x, image_embeddings))

    # x: (N, max_length, units)

    x = self.last_layer(x) # (N, max_length, vocab_size)

    return x

In [25]:
model = Captioner(vocab_size = 32000, max_length = 32, d_model = 512, num_heads = 8, dropout = 0.4)

### Load the model with weights

## Convert

### Convet Decoder

1. Create WrappedDecoder Model

In [26]:
class WrappedDecoder(nn.Module):
  def __init__(self, decoder):
    super().__init__()
    self.decoder = decoder

  def forward(self, image_embeddings, txt):
    logit = self.decoder(image_embeddings, txt)
    pred = logit[:, i, :]
    pred = F.softmax(pred, dim=-1)
    next_word_index = pred.argmax(dim=-1)
    return next_word_index

In [27]:
iOS_decoder_v2 = WrappedDecoder(model)

In [41]:
pred = torch.rand(1, 32, 3000)[:, 2, :]
pred = F.softmax(pred, dim=-1)
print(pred.argmax(dim=-1).shape)

torch.Size([1])


2. trace the wrapped model

In [28]:
example_txt_input = torch.randint(low=1, high=30000, size=(1, 32))
example_image_embedding = torch.rand(1, 32, 768)

In [29]:
traced_decoder_model = torch.jit.trace(iOS_decoder_v2, (example_image_embedding, example_txt_input))

Tensor-likes are not close!

Mismatched elements: 1 / 1 (100.0%)
Greatest absolute difference: 10894.0 at index (0,) (up to 0.0 allowed)
Greatest relative difference: 0.6416161140232052 at index (0,) (up to 1e-05 allowed)
  _check_trace(


3. conver the traced model

In [30]:
decoder_model = ct.convert(
    traced_decoder_model,
    convert_to="mlprogram",
    inputs=[ct.TensorType(shape=example_image_embedding.shape), ct.TensorType(shape=example_txt_input.shape)]
 )

decoder_model.save("iOS_Decoder_V2.mlpackage")

Converting PyTorch Frontend ==> MIL Ops:  99%|█████████▉| 188/189 [00:00<00:00, 817.61 ops/s]
Running MIL frontend_pytorch pipeline: 100%|██████████| 5/5 [00:00<00:00, 200.58 passes/s]
Running MIL default pipeline: 100%|██████████| 78/78 [00:02<00:00, 36.16 passes/s]
Running MIL backend_mlprogram pipeline: 100%|██████████| 12/12 [00:00<00:00, 271.80 passes/s]


4. zip the file and move it to google drive for faster download

In [31]:
!zip -r iOS_Decoder_V2.zip iOS_Decoder_V2.mlpackage
!cp iOS_Decoder_V2.zip /content/drive/MyDrive/

  adding: iOS_Decoder_V2.mlpackage/ (stored 0%)
  adding: iOS_Decoder_V2.mlpackage/Data/ (stored 0%)
  adding: iOS_Decoder_V2.mlpackage/Data/com.apple.CoreML/ (stored 0%)
  adding: iOS_Decoder_V2.mlpackage/Data/com.apple.CoreML/weights/ (stored 0%)
  adding: iOS_Decoder_V2.mlpackage/Data/com.apple.CoreML/weights/weight.bin (deflated 9%)
  adding: iOS_Decoder_V2.mlpackage/Data/com.apple.CoreML/model.mlmodel (deflated 81%)
  adding: iOS_Decoder_V2.mlpackage/Manifest.json (deflated 60%)


### Convert Encoder

1. Create a wrapped model


In [32]:
weights = ViT_B_32_Weights.DEFAULT
encoder = vit_b_32(weights=weights)
encoder.heads = torch.nn.Identity()

Downloading: "https://download.pytorch.org/models/vit_b_32-d86f8d99.pth" to /root/.cache/torch/hub/checkpoints/vit_b_32-d86f8d99.pth
100%|██████████| 337M/337M [00:05<00:00, 64.3MB/s]


In [33]:
preprocess = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

class WrappedEncoder(nn.Module):
    def __init__(self, encoder, preprocess):
        super(WrappedEncoder, self).__init__()
        self.encoder = encoder
        self.preprocess = preprocess

    def forward(self, x):
        x = self.preprocess(x)
        output = self.encoder(x)
        output = output.repeat(32, 1)
        output = output.unsqueeze(0)
        return output

iOS_Encoder = WrappedEncoder(encoder, preprocess)

2. trace the wrapped model

In [34]:
traced_encoder_model = torch.jit.trace(iOS_Encoder, torch.rand(1, 3, 224, 224))

  assert condition, message


3. convert the traced model

In [35]:
encoder_model = ct.convert(
    traced_encoder_model,
    inputs=[ct.ImageType(shape=(1, 3, 224, 224))],
)

# Save the model to a file
encoder_model.save("VIT_iOS_Encoder_v3.mlpackage")

Converting PyTorch Frontend ==> MIL Ops: 100%|█████████▉| 1006/1007 [00:00<00:00, 1862.13 ops/s]
Running MIL frontend_pytorch pipeline: 100%|██████████| 5/5 [00:00<00:00, 35.36 passes/s]
Running MIL default pipeline: 100%|██████████| 78/78 [00:13<00:00,  5.74 passes/s]
Running MIL backend_mlprogram pipeline: 100%|██████████| 12/12 [00:00<00:00, 21.29 passes/s]


4. zip the file and move it to google drive for faster download

In [36]:
!zip -r VIT_iOS_Encoder_v3.zip VIT_iOS_Encoder_v3.mlpackage
!cp VIT_iOS_Encoder_v3.zip /content/drive/MyDrive/

  adding: VIT_iOS_Encoder_v3.mlpackage/ (stored 0%)
  adding: VIT_iOS_Encoder_v3.mlpackage/Data/ (stored 0%)
  adding: VIT_iOS_Encoder_v3.mlpackage/Data/com.apple.CoreML/ (stored 0%)
  adding: VIT_iOS_Encoder_v3.mlpackage/Data/com.apple.CoreML/weights/ (stored 0%)
  adding: VIT_iOS_Encoder_v3.mlpackage/Data/com.apple.CoreML/weights/weight.bin (deflated 7%)
  adding: VIT_iOS_Encoder_v3.mlpackage/Data/com.apple.CoreML/model.mlmodel (deflated 89%)
  adding: VIT_iOS_Encoder_v3.mlpackage/Manifest.json (deflated 59%)


## Create vocab file

In [37]:
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
vocab = tokenizer.get_vocab()
with open('index_to_word.json', 'w') as file:
  json.dump({idx: token for token, idx in vocab.items()}, file)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]



config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]