In [1]:
from transformers import XLMRobertaTokenizer, XLMRobertaModel
import math
from typing import Tuple, List
import torch
import torch.nn as nn
from timm.models import create_model
from timm.models.layers import trunc_normal_ as __call_trunc_normal__
from timm.models.registry import register_model
from torchscale.model.BEiT3 import BEiT3
from torchscale.architecture.config import EncoderConfig
import os
from gensim.models import KeyedVectors
from transformers import AutoTokenizer, AutoModel
import numpy as np

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
# 1) load the pretrained tokenizer and model
tokenizer = XLMRobertaTokenizer.from_pretrained("xlm-roberta-large")
model = XLMRobertaModel.from_pretrained("xlm-roberta-large")

# 2) get ordered vocab list
#    (tokenizer.get_vocab() returns a dict token→id)
vocab = sorted(tokenizer.get_vocab().items(), key=lambda x: x[1])
tokens, ids = zip(*vocab)

# 3) extract embedding matrix (|V|×D)
embs = model.embeddings.word_embeddings.weight.detach().cpu().numpy()

# 4) write Word2Vec text file
D = embs.shape[1]
with open("xlmr.txt", "w", encoding="utf-8") as fout:
    fout.write(f"{len(tokens)} {D}\n")
    for tok, vec in zip(tokens, embs):
        vec_str = " ".join(f"{x:.6f}" for x in vec)
        fout.write(f"{tok} {vec_str}\n")

print("Wrote xlmr.txt with", len(tokens), "tokens ×", D, "dimensions")

Wrote xlmr.txt with 250002 tokens × 1024 dimensions


In [3]:
# dump_beit3.py
"""
Export BEiT-3 text embeddings to Word2Vec-style text.
"""

# ---------------------------------------------------------------------
# Configuration constants
# ---------------------------------------------------------------------
# 1) Directory of the saved the BEIT-3 tokenizer
TOKENIZER_DIR = "/cfs/home/u036743/emotion_recognition/beit3.spm"
# 2) Name or path of the BEIT-3 model checkpoint
MODEL_NAME_OR_PATH = "/cfs/home/u036743/emotion_recognition/beit3_large_patch16_224.pth"
# 5) Output file
OUTPUT_TXT = "beit3.txt"


# ---------------------------------------------------------------------
# Utility / config builders
# ---------------------------------------------------------------------
def trunc_normal_(tensor, mean: float = 0.0, std: float = 1.0) -> None:
    __call_trunc_normal__(tensor, mean=mean, std=std, a=-std, b=std)


def _get_base_config(img_size: int = 224,
                     patch_size: int = 16,
                     drop_path_rate: float = 0,
                     checkpoint_activations=None,
                     mlp_ratio: int = 4,
                     vocab_size: int = 64010,
                     **kwargs) -> EncoderConfig:
    return EncoderConfig(
        img_size=img_size,
        patch_size=patch_size,
        vocab_size=vocab_size,
        multiway=True,
        layernorm_embedding=False,
        normalize_output=True,
        no_output_layer=True,
        drop_path_rate=drop_path_rate,
        encoder_embed_dim=768,
        encoder_attention_heads=12,
        encoder_ffn_embed_dim=int(768 * mlp_ratio),
        encoder_layers=12,
        checkpoint_activations=checkpoint_activations,
    )


def _get_large_config(img_size: int = 224,
                      patch_size: int = 16,
                      drop_path_rate: float = 0,
                      checkpoint_activations=None,
                      mlp_ratio: int = 4,
                      vocab_size: int = 64010,
                      **kwargs) -> EncoderConfig:
    return EncoderConfig(
        img_size=img_size,
        patch_size=patch_size,
        vocab_size=vocab_size,
        multiway=True,
        layernorm_embedding=False,
        normalize_output=True,
        no_output_layer=True,
        drop_path_rate=drop_path_rate,
        encoder_embed_dim=1024,
        encoder_attention_heads=16,
        encoder_ffn_embed_dim=int(1024 * mlp_ratio),
        encoder_layers=24,
        checkpoint_activations=checkpoint_activations,
    )


# ---------------------------------------------------------------------
# Model wrapper and registration
# ---------------------------------------------------------------------
class BEiT3Wrapper(nn.Module):

    def __init__(self, args, **kwargs):
        super().__init__()
        self.args = args
        self.beit3 = BEiT3(args)
        self.apply(self._init_weights)

    def fix_init_weight(self) -> None:

        def rescale(param, layer_id):
            param.div_(math.sqrt(2.0 * layer_id))

        for layer_id, layer in enumerate(self.blocks):
            rescale(layer.attn.proj.weight.data, layer_id + 1)
            rescale(layer.mlp.fc2.weight.data, layer_id + 1)

    def get_num_layers(self) -> int:
        return self.beit3.encoder.num_layers

    @torch.jit.ignore
    def no_weight_decay(self):
        return {
            'pos_embed', 'cls_token', 'beit3.encoder.embed_positions.A.weight',
            'beit3.vision_embed.cls_token', 'logit_scale'
        }

    def _init_weights(self, m) -> None:
        if isinstance(m, nn.Linear):
            trunc_normal_(m.weight, std=.02)
            if isinstance(m, nn.Linear) and m.bias is not None:
                nn.init.constant_(m.bias, 0)
        elif isinstance(m, nn.LayerNorm):
            nn.init.constant_(m.bias, 0)
            nn.init.constant_(m.weight, 1.0)

    def get_input_embeddings(self) -> nn.Embedding:
        return self.beit3.text_embed


@register_model
def beit3_large_patch16_224(pretrained: bool = False, **kwargs) -> BEiT3Wrapper:
    args = _get_large_config(**kwargs)
    args.normalize_output = False
    model = BEiT3Wrapper(args, **kwargs)
    return model


# ---------------------------------------------------------------------
# Loading helpers
# ---------------------------------------------------------------------
def load_checkpoint(path: str):
    """Load a checkpoint and return its state dict."""
    ckpt = torch.load(path, map_location="cpu")
    if "model" in ckpt:
        return ckpt["model"]
    if "state_dict" in ckpt:
        return ckpt["state_dict"]
    return ckpt


def build_registered_model() -> nn.Module:
    model = create_model(
        "beit3_large_patch16_224",
        pretrained=False,
        drop_path_rate=0.1,
        vocab_size=64010,
    )
    return model


# ---------------------------------------------------------------------
# Tokenizer / vocab and export helpers
# ---------------------------------------------------------------------
def load_tokenizer(tokenizer_dir: str) -> XLMRobertaTokenizer:
    """Instantiate XLM-R tokenizer from directory."""
    return XLMRobertaTokenizer(tokenizer_dir)


def extract_sorted_vocab(
        tokenizer: XLMRobertaTokenizer) -> Tuple[List[str], List[int]]:
    """Return tokens and ids sorted by id."""
    vocab_items = sorted(tokenizer.get_vocab().items(), key=lambda x: x[1])
    tokens, ids = zip(*vocab_items)
    return list(tokens), list(ids)


def export_embeddings_txt(tokens: List[str], embeddings: torch.Tensor,
                          out_path: str) -> None:
    """Write Word2Vec-style text file."""
    embs = embeddings.detach().cpu().numpy()
    V, D = embs.shape
    with open(out_path, "w", encoding="utf-8") as fout:
        fout.write(f"{V} {D}\n")
        for tok, vec in zip(tokens, embs):
            vec_str = " ".join(f"{x:.6f}" for x in vec)
            fout.write(f"{tok} {vec_str}\n")
    print(f"Wrote {out_path} with {V} tokens × {D} dimensions")


def main():
    # Load tokenizer and model
    print("Loading tokenizer")

    # Instantiate tokenizer
    tokenizer = load_tokenizer(TOKENIZER_DIR)

    # Create and load the timm-registered model
    model = build_registered_model()
    print("Loading model checkpoint from", MODEL_NAME_OR_PATH)
    sd = load_checkpoint(MODEL_NAME_OR_PATH)
    model.load_state_dict(sd, strict=False)

    print("Model loaded")

    # Extract vocab sorted by id
    tokens, _ = extract_sorted_vocab(tokenizer)

    embs_weight = model.get_input_embeddings().weight

    # Export in Word2Vec text format
    export_embeddings_txt(tokens, embs_weight, OUTPUT_TXT)


if __name__ == "__main__":
    main()

Loading tokenizer
Loading model checkpoint from /cfs/home/u036743/emotion_recognition/beit3_large_patch16_224.pth
Model loaded
Wrote beit3.txt with 64010 tokens × 1024 dimensions


In [4]:
def load_vocab(path):
    """
    Load the vocab (first column) from a word2vec-format file.
    Assumes the first line is "V D" header; skips it.
    """
    vocab = []
    with open(path, encoding='utf-8') as f:
        header = f.readline()  # e.g. "64010 1024"
        for line in f:
            tok = line.split(' ', 1)[0]
            vocab.append(tok)
    return vocab

def main(beit3_path, xlmr_path, out_seed_path):
    beit3_vocab = load_vocab(beit3_path)
    xlmr_vocab  = load_vocab(xlmr_path)

    set_beit = set(beit3_vocab)
    set_xlmr = set(xlmr_vocab)

    # Intersection: common tokens
    common = sorted(set_beit & set_xlmr)

    print(f"BEiT-3 vocab size: {len(beit3_vocab)}")
    print(f"XLM-R vocab size:  {len(xlmr_vocab)}")
    print(f"Common tokens:     {len(common)}")

    # Write seed dictionary
    with open(out_seed_path, 'w', encoding='utf-8') as fout:
        for tok in common:
            fout.write(f"{tok} {tok}\n")

    print(f"Wrote seed dictionary to {out_seed_path}")
    
beit3_path, xlmr_path, out_seed_path = "/cfs/home/u036743/emotion_recognition/beit3.txt", "/cfs/home/u036743/emotion_recognition/xlmr.txt", "/cfs/home/u036743/emotion_recognition/out_seed.txt"
main(beit3_path, xlmr_path, out_seed_path)

BEiT-3 vocab size: 64002
XLM-R vocab size:  250002
Common tokens:     22076
Wrote seed dictionary to /cfs/home/u036743/emotion_recognition/out_seed.txt


In [5]:
# Operations that you can perform here or on terminal
# Count lines
!wc -l /cfs/home/u036743/emotion_recognition/beit3_clean.txt
!wc -l /cfs/home/u036743/emotion_recognition/xlmr_clean.txt

# Check embedding dimension
!head -1 /cfs/home/u036743/emotion_recognition/beit3_clean.txt | awk '{print NF-1}'

# Add header lines (prepend the number of tokens + dimension)
!sed -i '1i 64002 1024' /cfs/home/u036743/emotion_recognition/beit3_clean.txt
!sed -i '1i 250002 1024' /cfs/home/u036743/emotion_recognition/xlmr_clean.txt


64003 /cfs/home/u036743/emotion_recognition/beit3_clean.txt
250003 /cfs/home/u036743/emotion_recognition/xlmr_clean.txt
1


In [6]:
# ─── Config ────────────────────────────────────────────────────────────────────
XLMR_MAPPED_TXT = "/cfs/home/u036743/emotion_recognition/xlmr_mapped.txt"
XLMR_TOKENIZER_DIR = "xlm-roberta-large"
OUT_FILE = "xlmr_in_beit3_space.pt"

# ─── 1) load mapped vectors ────────────────────────────────────────────────────
print("Loading mapped XLM-R vectors from", XLMR_MAPPED_TXT)
mapped_kv = KeyedVectors.load_word2vec_format(XLMR_MAPPED_TXT, binary=False)

# ─── 2) load XLM-RoBERTa tokenizer ───────────────────────────────────────────────
print("Loading XLM-RoBERTa-large tokenizer from", XLMR_TOKENIZER_DIR)
tok = XLMRobertaTokenizer.from_pretrained(XLMR_TOKENIZER_DIR)

vocab = tok.get_vocab()  # { token_str: token_id }
vocab_size = len(vocab)
D = mapped_kv.vector_size  # should be 1024

assert D == 1024, f"Expected mapped vectors to be 1024-dim, got {D}"

# ─── 3) allocate + fill ───────────────────────────────────────────────────────
print(f"Building new embedding matrix: {vocab_size} tokens × {D} dims")
new_emb = torch.zeros(vocab_size, D, dtype=torch.float32)

missing = 0
for token, idx in vocab.items():
    if token in mapped_kv:
        new_emb[idx] = torch.from_numpy(mapped_kv[token])
    else:
        # random init for any stray tokens
        new_emb[idx].normal_(0, 0.02)
        missing += 1

print(
    f"  → {missing} tokens were not found in the mapped file; randomly initialized them"
)

# ─── 4) save ───────────────────────────────────────────────────────────────────
print("Saving new embeddings to", OUT_FILE)
torch.save(new_emb, OUT_FILE)
print("Done.")

Loading mapped XLM-R vectors from /cfs/home/u036743/emotion_recognition/xlmr_mapped.txt
Loading XLM-RoBERTa-large tokenizer from xlm-roberta-large
Building new embedding matrix: 250002 tokens × 1024 dims


  new_emb[idx] = torch.from_numpy(mapped_kv[token])


  → 0 tokens were not found in the mapped file; randomly initialized them
Saving new embeddings to xlmr_in_beit3_space.pt
Done.


In [11]:
from timm.models.layers import trunc_normal_ as __call_trunc_normal_
XLRM_TOKENIZER_VOCAB_SIZE = 250002
XLMR_EMB_PATH = "xlmr_in_beit3_space.pt"

def trunc_normal_(tensor, mean=0., std=1.):
    __call_trunc_normal_(tensor, mean=mean, std=std, a=-std, b=std)


def _get_large_config(img_size=224,
                      patch_size=16,
                      drop_path_rate=0,
                      checkpoint_activations=None,
                      mlp_ratio=4,
                      vocab_size=XLRM_TOKENIZER_VOCAB_SIZE,
                      **kwargs):
    return EncoderConfig(
        img_size=img_size,
        patch_size=patch_size,
        vocab_size=vocab_size,
        multiway=True,
        layernorm_embedding=False,
        normalize_output=True,
        no_output_layer=True,
        drop_path_rate=drop_path_rate,
        encoder_embed_dim=1024,
        encoder_attention_heads=16,
        encoder_ffn_embed_dim=int(1024 * mlp_ratio),
        encoder_layers=24,
        checkpoint_activations=checkpoint_activations,
    )


class BEiT3Wrapper(nn.Module):

    def __init__(self, args, **kwargs):
        super().__init__()
        self.args = args
        self.beit3 = BEiT3(args)
        self.apply(self._init_weights)

    def fix_init_weight(self):

        def rescale(param, layer_id):
            param.div_(math.sqrt(2.0 * layer_id))

        for layer_id, layer in enumerate(self.blocks):
            rescale(layer.attn.proj.weight.data, layer_id + 1)
            rescale(layer.mlp.fc2.weight.data, layer_id + 1)

    def get_num_layers(self):
        return self.beit3.encoder.num_layers

    @torch.jit.ignore
    def no_weight_decay(self):
        return {
            'pos_embed', 'cls_token', 'beit3.encoder.embed_positions.A.weight',
            'beit3.vision_embed.cls_token', 'logit_scale'
        }

    def _init_weights(self, m):
        if isinstance(m, nn.Linear):
            trunc_normal_(m.weight, std=.02)
            if isinstance(m, nn.Linear) and m.bias is not None:
                nn.init.constant_(m.bias, 0)
        elif isinstance(m, nn.LayerNorm):
            nn.init.constant_(m.bias, 0)
            nn.init.constant_(m.weight, 1.0)


@register_model
def beit3_large_patch16_224(pretrained=False, **kwargs):
    args = _get_large_config(**kwargs)
    args.normalize_output = False
    model = BEiT3Wrapper(args, **kwargs)
    return model


def load_checkpoint_dict(path: str) -> dict:
    ckpt = torch.load(path, map_location="cpu")
    if "model" in ckpt:
        return ckpt["model"]
    if "state_dict" in ckpt:
        return ckpt["state_dict"]
    return ckpt


def inject_new_embeddings(model: nn.Module, new_emb: torch.Tensor):
    # Access the text embedding layer directly from the wrapper
    layer = model.beit3.text_embed
    assert layer.weight.shape == new_emb.shape, (
        f"Expected {layer.weight.shape}, got {new_emb.shape}"
    )
    with torch.no_grad():
        layer.weight.copy_(new_emb)


def load_beit3_model(model_name,
                     tokenizer_model_name="xlmr-roberta-large",
                     xlmr_embeddings_path=XLMR_EMB_PATH,
                     checkpoint_path=None):
    """
    Load the BEiT3 model with the specified model name and checkpoint path.
    """
    # Load the tokenizer
    tokenizer = XLMRobertaTokenizer.from_pretrained(tokenizer_model_name)

    # Create the model
    model = create_model(
        model_name,
        drop_path_rate=0.1,
    )

    # Load the checkpoint if provided
    if checkpoint_path:
        sd = load_checkpoint_dict(checkpoint_path)

        # Remove the text_embed.weight from the state dict
        # Because we will inject the new embeddings
        sd.pop("beit3.text_embed.weight", None)
        
        model.load_state_dict(sd, strict=False)
        print("✓ Backbone loaded (excluding text_embed).")

        # Inject mapped embeddings
        # Load new XLM-R→BEiT-3 embeddings
        new_emb = torch.load(xlmr_embeddings_path)

        # Confirm vocab size matches XLM-R tokenizer
        assert len(tokenizer) == new_emb.shape[0], (
            f"Tokenizer vocab ({len(tokenizer)}) "
            f"!= embeddings ({new_emb.shape[0]})")

        # Inject new embeddings
        inject_new_embeddings(model, new_emb)
        print("✓ Injected new embeddings.")

    return model, tokenizer


if __name__ == "__main__":
    # Example usage
    model_name = "beit3_large_patch16_224"
    tokenizer_model_name = "xlm-roberta-large"
    xlmr_embeddings_path = XLMR_EMB_PATH
    checkpoint_path = "/cfs/home/u036743/emotion_recognition/beit3_large_patch16_224.pth"

    model, tokenizer = load_beit3_model(model_name, tokenizer_model_name,
                                        xlmr_embeddings_path, checkpoint_path)

    print("Model and tokenizer loaded successfully.")

✓ Backbone loaded (excluding text_embed).
✓ Injected new embeddings.
Model and tokenizer loaded successfully.
