# Install and import libraries

In [None]:
!pip install datasets evaluate accelerate
!pip install causal-conv1d>=1.1.0
!pip install mamba-ssm

In [None]:
from huggingface_hub import notebook_login
notebook_login()

In [3]:
import os
import random
import json
import torch
import torch.nn as nn
from collections import namedtuple
from dataclasses import dataclass, field, asdict
from mamba_ssm.models.mixer_seq_simple import MambaLMHeadModel
from mamba_ssm.utils.hf import load_config_hf, load_state_dict_hf

import evaluate
import numpy as np
from datasets import load_dataset
from transformers import Trainer
from transformers import AutoTokenizer, TrainingArguments

# Dowload dataset

In [None]:
imdb = load_dataset("imdb")

# Build custom Mamba Model

In [5]:
# @title
 # Config class of Mamba
class MambaConfig:
    d_model: int = 2560

    n_layer: int = 64
    vocab_size: int = 50277
    ssm_cfg: dict = field(default_factory=dict)
    rms_norm: bool = True
    residual_in_fp32: bool = True
    fused_add_norm: bool = True
    pad_vocab_size_multiple: int = 8

    def to_json_string(self):
        return json.dumps(asdict(self))
    def to_dict(self):
        return asdict(self)

In [6]:
# class head
class MambaClassificationHead(nn.Module):
    def __init__(self, d_model, num_classes, **kwargs):

        super(MambaClassificationHead, self).__init__()
        self.classification_head = nn.Linear(d_model, num_classes,**kwargs)
    def forward(self, hidden_states):
        return self.classification_head(hidden_states)

In [9]:
class MambaTextClassification(MambaLMHeadModel):
    def __init__(
        self,
        config: MambaConfig,
        initializer_cfg = None,
        device=None,
        dtype=None,
    )-> None:
        super().__init__(config, initializer_cfg, device, dtype)

        self.classification_head = MambaClassificationHead(d_model=config.d_model, num_classes=2)

        del self.lm_head

    def forward(self, input_ids, attention_mask=None, labels=None):
        # Truyền input_ids qua model gốc để nhận hidden_states.
        hidden_states = self.backbone(input_ids)
        # Lấy trung bình của hidden_states theo chiều thứ 2 để tạo ra [CLS] feature đại điện
        mean_hidden_states = hidden_states.mean(dim=1)
        # Đưa mean_hidden_states qua đầu phân loại để nhận logits.
        logits = self.classification_head(mean_hidden_states)
        if labels is None:
            ClassificationOutput = namedtuple("ClassificationOutput", ["logits"])
            return ClassificationOutput(logits=logits)
        else:
            ClassificationOutput = namedtuple("ClassificationOutput", ["loss", "logits"])
            # Sử dụng hàm mất mát CrossEntropyLoss để tính loss.
            loss_fct = nn.CrossEntropyLoss()
            loss = loss_fct(logits, labels)
            return ClassificationOutput(loss=loss, logits=logits)
    def predict(self, text, tokenizer, id2label=None):
        input_ids = torch.tensor(tokenizer(text)['input_ids'], device='cuda')[None]
        with torch.no_grad():
            logits = self.forward(input_ids).logits[0]
            label = np.argmax(logits.cpu().numpy())
        if id2label is not None:
            return id2label[label]
        else:
            return label
    @classmethod
    def from_pretrained(cls, pretrained_model_name, device=None, dtype=None, **kwargs):
        # Tải cấu hình từ model đã được train trước đó.
        config_data = load_config_hf(pretrained_model_name)
        config = MambaConfig(**config_data)
        # Khởi tạo model từ cấu hình và chuyển nó đến thiết bị và kiểu dữ liệu mong muốn.
        model = cls(config, device=device, dtype=dtype, **kwargs)
        # Tải trạng thái model đã được train trước đó.
        model_state_dict = load_state_dict_hf(pretrained_model_name,
        device=device, dtype=dtype)
        model.load_state_dict(model_state_dict, strict=False)
        # In ra các tham số embedding mới được khởi tạo.
        print("Newly initialized embedding:", set(model.state_dict().keys())- set(model_state_dict.keys()))
        return model

In [None]:
# Tải model Mamba từ model đã được train trước đó.
model = MambaTextClassification.from_pretrained("state-spaces/mamba-130m")
model.to("cuda")

# Tải tokenizer của model Mamba từ model gpt-neox-20b.
tokenizer = AutoTokenizer.from_pretrained("EleutherAI/gpt-neox-20b")
# Đặt id của token pad bằng id của token eos trong tokenizer.
tokenizer.pad_token_id = tokenizer.eos_token_id

In [None]:
# Tạo chức năng tiền xử lý để mã hóa văn bản và cắt bớt các chuỗi không dài hơn độ dài đầu vào tối đa của mã thông báo
def preprocess_function(examples):
  samples = tokenizer(examples["text"], truncation=True)
  # Không cần attention_mask
  # Cụ thể hơn về token masking của mamba có thể tham khảo: https://
  github.com/state-spaces/mamba/issues/49
  samples.pop('attention_mask')
  return samples
# Thực hiện mã hóa văn bản
tokenized_imdb = imdb.map(preprocess_function, batched=True)

# Set seed cho hàm random
random.seed(42)

# Tạo tập train và test
train_dataset = tokenized_imdb["train"]
test_dataset = tokenized_imdb["test"]

# Tạo tập evaluation để đánh giá trong lúc train
# Do số lượng tập test lớn nên chỉ lấy mẫu 1% tập dữ liệu test để đánh giá
total_samples = len(test_dataset)
eval_samples = int(0.1 * total_samples)
eval_indices = random.sample(range(total_samples), eval_samples)
eval_dataset = test_dataset.select(eval_indices)

In [None]:
# @title
# Tải module "accuracy" từ thư viện evaluate.
accuracy = evaluate.load("accuracy")
# Định nghĩa hàm compute_metrics để tính các độ đo hiệu suất (metrics) cho việc đánh giá model.
def compute_metrics(eval_pred):
  predictions, labels = eval_pred
  # Lấy chỉ số của lớp có xác suất cao nhất trong predictions.
  predictions = np.argmax(predictions, axis=1)
  # Sử dụng module "accuracy" để tính độ chính xác dựa trên predictions và labels.
  return accuracy.compute(predictions=predictions, references=labels)

In [None]:
# Định nghĩa tên project để log thông tin quá trình train trên wandb
# os.environ["WANDB_PROJECT"] = "mamba_tutorial"

# Định nghĩa các tham số train trong class TrainingArguments.
# Cụ thể hơn về các tham số hỗ trợ có thể tham khảo: https://huggingface.co/docs/transformers/main_classes/trainer
training_args = TrainingArguments(
  output_dir="mamba_text_classification", # Tên folder output
  learning_rate=5e-5,
  per_device_train_batch_size=4, # Số lượng train sample trên mỗi device
  per_device_eval_batch_size=16, # Số lượng eval sample trên mỗi device
  num_train_epochs=1, # Số epoch train
  report_to="none", # "wandb" nếu muốn log kết quả
  warmup_ratio=0.01, # Tỉ lệ tăng dần lr trong giai đoạn warmup
  lr_scheduler_type="cosine", # Loại scheduler để giảm lr
  evaluation_strategy="steps", # Xác định metric đánh giá sau mỗi số bước
  eval_steps=0.1, # Số bước giữa các đợt đánh giá
  save_strategy="steps", # Xác định khi nào lưu checkpoint
  save_steps=0.1, # Số bước giữa các lần lưu checkpoint
  logging_strategy="steps", # Xác định khi nào in thông tin log
  logging_steps=1, # Số bước giữa các lần in thông tin log
  push_to_hub=True, # Đẩy kết quả lên Hub
  load_best_model_at_end=True, # Load model có kết quả evaluation tốt nhất trong quá trình train
)

In [None]:
# Định nghĩa một class MambaTrainer kế thừa từ class Trainer.
class MambaTrainer(Trainer):

  # Định nghĩa hàm compute_loss để tính toán hàm mất mát trong quá trình train.
  def compute_loss(self, model, inputs, return_outputs=False):
      # Lấy giá trị input_ids và labels từ inputs.
      input_ids = inputs.pop("input_ids")
      labels = inputs.pop('labels')
      # Gọi hàm forward của model với input_ids và labels để nhận các kết quả.
      outputs = model(input_ids=input_ids, labels=labels)
      # Lấy giá trị loss từ kết quả của model.
      loss = outputs.loss
      # Trả về cả loss và outputs nếu return_outputs là True, ngược lại chỉ trả về loss.
      return (loss, outputs) if return_outputs else loss

  # Định nghĩa hàm save_model để lưu model trong quá trình train.
  def save_model(self, output_dir = None, _internal_call = False):
      # Kiểm tra nếu thư mục lưu trữ không được chỉ định, sử dụng thư mục mặc định từ đối số ’args’.
      if output_dir is None:
          output_dir = self.args.output_dir
      # Nếu thư mục đầu ra không tồn tại, tạo mới nó.
      if not os.path.exists(output_dir):
          os.makedirs(output_dir)
      # Lưu trạng thái của model PyTorch vào file ’pytorch_model.bin’ trong thư mục đầu ra.
      torch.save(self.model.state_dict(), f"{output_dir}/pytorch_model.bin")
      # Lưu trạng thái của tokenizer vào thư mục đầu ra.
      self.tokenizer.save_pretrained(output_dir)
      # Lưu cấu hình của model vào file ’config.json’ trong thư mục đầu ra.
      with open(f'{output_dir}/config.json', 'w') as f:
          json.dump(self.model.config.to_dict(), f)

In [None]:
# Khởi tạo classs MambaTrainer để thực hiện quá trình train của

trainer = MambaTrainer(
  model=model, # Model cần train
  train_dataset=train_dataset, # Dữ liệu train
  eval_dataset=eval_dataset, # Dữ liệu đánh giá
  tokenizer=tokenizer, # Tokenizer sử dụng để mã hóa dữ liệu
  args=training_args, # Các tham số train đã được định nghĩa trước đó
  compute_metrics=compute_metrics # Hàm tính các độ đo hiệu suất (metrics) cho đánh giá
 )
 # Bắt đầu quá trình train bằng cách gọi hàm train() trên classs
 trainer.train()

In [None]:
 # Đẩy model lên huggingface hub
trainer.push_to_hub(commit_message="Training complete")


In [None]:
 # Thực hiện dự đoán trên tập dữ liệu validation
outputs = trainer.predict(test_dataset)
print(outputs.metrics)

In [None]:
# Tải model Mamba từ model đã được train trước đó.
model = MambaTextClassification.from_pretrained("trinhxuankhai/
mamba_text_classification")
model.to("cuda")

# Tải tokenizer của model Mamba từ model đã được train trước đó.
tokenizer = AutoTokenizer.from_pretrained("trinhxuankhai/mamba_text_classification")
# Đặt id của token pad bằng id của token eos trong tokenizer.
tokenizer.pad_token_id = tokenizer.eos_token_id

In [None]:
id2label = {0: "NEGATIVE", 1: "POSITIVE"}
text = imdb['test'][0]['text']
label = imdb['test'][0]['label']
response = model.predict(text, tokenizer, id2label)
print(f'Classify: {text}\nGT: {id2label[label]}\nPredict: {response}')