In [None]:
# Install the third-party packages this notebook imports (Brevitas, Transformers).
# NOTE(review): brevitas and transformers are unpinned — consider pinning versions for reproducibility.
!pip install brevitas
# NOTE(review): setuptools is capped below 70.0, presumably for compatibility with brevitas — confirm the reason.
!pip install "setuptools<70.0"
!pip install transformers



In [None]:
import torch
import torch.nn as nn
from transformers import DistilBertConfig, DistilBertTokenizer, DistilBertModel, DistilBertForSequenceClassification
from brevitas.nn import QuantLinear, QuantEmbedding, QuantReLU

In [None]:
class QuantDistilBertSelfAttention(nn.Module):
    """Multi-head scaled dot-product attention with weight-quantized projections.

    The query, key, value and output projections are Brevitas ``QuantLinear``
    layers whose weights are quantized to ``bit_width`` bits. The attention
    arithmetic itself (matmul, softmax, dropout) runs on regular tensors.

    Args:
        config: Object exposing ``n_heads``, ``dim`` and ``dropout``.
        bit_width: Weight bit-width passed to every ``QuantLinear``.

    Raises:
        ValueError: If ``config.dim`` is not divisible by ``config.n_heads``.
    """

    def __init__(self, config, bit_width):
        super().__init__()
        if config.dim % config.n_heads != 0:
            raise ValueError(
                f"config.dim ({config.dim}) must be divisible by config.n_heads ({config.n_heads})"
            )
        self.num_heads = config.n_heads
        self.head_dim = config.dim // config.n_heads

        self.q_lin = QuantLinear(config.dim, config.dim, bias=True, weight_bit_width=bit_width)
        self.k_lin = QuantLinear(config.dim, config.dim, bias=True, weight_bit_width=bit_width)
        self.v_lin = QuantLinear(config.dim, config.dim, bias=True, weight_bit_width=bit_width)
        self.out_lin = QuantLinear(config.dim, config.dim, bias=True, weight_bit_width=bit_width)

        self.dropout = nn.Dropout(config.dropout)

    def _split_heads(self, x):
        """Reshape ``(batch, length, dim)`` into ``(batch, heads, length, head_dim)``."""
        batch_size, length, _ = x.size()
        return x.view(batch_size, length, self.num_heads, self.head_dim).transpose(1, 2)

    def forward(self, query, key, value, mask):
        """Attend from ``query`` positions over ``key``/``value`` positions.

        Args:
            query: Tensor of shape ``(batch, query_len, dim)``.
            key: Tensor of shape ``(batch, key_len, dim)``.
            value: Tensor of shape ``(batch, key_len, dim)``.
            mask: Optional tensor; positions equal to 0 are excluded from
                attention. It is unsqueezed at dims 1 and 2 and broadcast
                against the scores, so it is presumably ``(batch, key_len)``
                — TODO confirm against callers.

        Returns:
            Tensor of shape ``(batch, query_len, dim)``.
        """
        batch_size, query_length, dim = query.size()

        # Each input is split using its own sequence length, so key/value may
        # differ in length from the query.
        q = self._split_heads(self.q_lin(query))
        k = self._split_heads(self.k_lin(key))
        v = self._split_heads(self.v_lin(value))

        scores = torch.matmul(q, k.transpose(-2, -1)) / (self.head_dim ** 0.5)
        if mask is not None:
            extended_mask = mask.unsqueeze(1).unsqueeze(2)
            # Use the most negative finite value instead of -inf: with -inf a
            # row whose keys are all masked makes softmax return NaN.
            scores = scores.masked_fill(extended_mask == 0, torch.finfo(scores.dtype).min)
        weights = torch.softmax(scores, dim=-1)
        weights = self.dropout(weights)

        context = torch.matmul(weights, v)
        # Merge the heads back into a single feature dimension.
        context = context.transpose(1, 2).contiguous().view(batch_size, query_length, dim)
        output = self.out_lin(context)
        return output

In [None]:
class QuantDistilBertFeedForward(nn.Module):
    """Position-wise feed-forward block built from quantized layers.

    Projects from ``config.dim`` up to ``config.hidden_dim`` and back down,
    with a ``QuantReLU`` and dropout in between. Both linear layers quantize
    their weights to ``bit_width`` bits and the activation is quantized to the
    same bit-width.
    """

    def __init__(self, config, bit_width):
        super().__init__()
        model_dim, inner_dim = config.dim, config.hidden_dim
        self.lin1 = QuantLinear(model_dim, inner_dim, bias=True, weight_bit_width=bit_width)
        self.lin2 = QuantLinear(inner_dim, model_dim, bias=True, weight_bit_width=bit_width)
        self.dropout = nn.Dropout(config.dropout)
        self.activation = QuantReLU(bit_width=bit_width, return_quant_tensor=False)

    def forward(self, x):
        """Return ``lin2(dropout(activation(lin1(x))))``."""
        hidden = self.dropout(self.activation(self.lin1(x)))
        return self.lin2(hidden)

In [None]:
class QuantDistilBertLayer(nn.Module):
    """One transformer encoder layer: self-attention then feed-forward.

    Each sub-block is wrapped in a residual connection followed by a
    full-precision ``LayerNorm``; the sub-block output goes through dropout
    before it is added to the residual.
    """

    def __init__(self, config, bit_width):
        super().__init__()
        self.attention = QuantDistilBertSelfAttention(config, bit_width)
        self.sa_layer_norm = nn.LayerNorm(config.dim)
        self.ff = QuantDistilBertFeedForward(config, bit_width)
        self.output_layer_norm = nn.LayerNorm(config.dim)
        self.dropout = nn.Dropout(config.dropout)

    def forward(self, x, mask):
        """Apply the layer to ``x``; ``mask`` is forwarded to the attention module."""
        # Self-attention sub-block (query, key and value are all ``x``).
        attn_out = self.attention(x, x, x, mask)
        attn_normed = self.sa_layer_norm(x + self.dropout(attn_out))

        # Feed-forward sub-block.
        ffn_out = self.ff(attn_normed)
        return self.output_layer_norm(attn_normed + self.dropout(ffn_out))

In [None]:
class QuantDistilBert(nn.Module):
    """DistilBERT-style sequence classifier built from Brevitas quantized layers.

    The model sums word, position and token-type embeddings, runs the result
    through ``config.n_layers`` ``QuantDistilBertLayer`` blocks and classifies
    from the hidden state of the first token.

    Args:
        config: Object exposing ``vocab_size``, ``dim``, ``pad_token_id``,
            ``max_position_embeddings``, ``dropout``, ``n_layers`` and
            ``num_labels`` (plus whatever the layer classes read).
        bit_width: Bit-width used for all quantized weights and for the
            ``QuantReLU`` activation in the classification head.
    """

    def __init__(self, config, bit_width):
        super().__init__()
        self.embeddings = QuantEmbedding(config.vocab_size, config.dim, padding_idx=config.pad_token_id, weight_bit_width=bit_width)
        self.position_embeddings = QuantEmbedding(config.max_position_embeddings, config.dim, weight_bit_width=bit_width)
        self.token_type_embeddings = QuantEmbedding(2, config.dim, weight_bit_width=bit_width)
        self.emb_dropout = nn.Dropout(config.dropout)
        self.emb_layer_norm = nn.LayerNorm(config.dim)

        self.transformer = nn.ModuleList([
            QuantDistilBertLayer(config, bit_width) for _ in range(config.n_layers)
        ])

        self.pre_classifier = QuantLinear(config.dim, config.dim, bias=True, weight_bit_width=bit_width)
        self.classifier = QuantLinear(config.dim, config.num_labels, bias=True, weight_bit_width=bit_width)
        self.activation = QuantReLU(bit_width=bit_width, return_quant_tensor=False)

    def forward(self, input_ids, attention_mask=None):
        """Compute classification logits.

        Args:
            input_ids: Long tensor of token ids, shape ``(batch, seq_len)``.
            attention_mask: Optional mask forwarded unchanged to every layer;
                positions equal to 0 are ignored by attention. ``None`` (the
                default) disables masking.

        Returns:
            Logits tensor of shape ``(batch, config.num_labels)``.
        """
        seq_length = input_ids.size(1)
        # Positions 0..seq_len-1, repeated for every row of the batch.
        position_ids = torch.arange(0, seq_length, dtype=torch.long, device=input_ids.device).unsqueeze(0).expand_as(input_ids)
        # Every token uses token type 0.
        token_type_ids = torch.zeros_like(input_ids)

        word_embeddings = self.embeddings(input_ids)
        position_embeddings = self.position_embeddings(position_ids)
        token_type_embeddings = self.token_type_embeddings(token_type_ids)

        embeddings = word_embeddings + position_embeddings + token_type_embeddings
        # Dropout is applied before the layer norm here.
        embeddings = self.emb_layer_norm(self.emb_dropout(embeddings))

        hidden_state = embeddings
        for layer in self.transformer:
            hidden_state = layer(hidden_state, attention_mask)

        # Classify from the hidden state of the first token in each sequence.
        pooled_output = hidden_state[:, 0]
        pooled_output = self.pre_classifier(pooled_output)
        pooled_output = self.activation(pooled_output)
        logits = self.classifier(pooled_output)
        return logits


In [None]:
def _copy_tensor_(dst, src):
    """Copy ``src`` into ``dst`` in place, refusing tensors of a different shape."""
    if dst.shape != src.shape:
        raise ValueError(
            f"Shape mismatch: cannot copy {tuple(src.shape)} into {tuple(dst.shape)}"
        )
    with torch.no_grad():
        dst.copy_(src)


def _copy_linear_(dst, src):
    """Copy the weight and bias of linear layer ``src`` into ``dst``."""
    _copy_tensor_(dst.weight, src.weight)
    _copy_tensor_(dst.bias, src.bias)


def transfer_weights(pretrained, quant):
    """Copy encoder weights from a DistilBERT-style model into ``quant``.

    Embeddings, every transformer layer's attention and feed-forward
    projections, and all layer norms are copied. The classification head of
    ``quant`` (``pre_classifier`` / ``classifier``) is not touched.

    Values are copied in place, so the parameters of ``quant`` keep their own
    storage, device and dtype.

    Args:
        pretrained: Source model exposing ``embeddings.word_embeddings``,
            ``embeddings.position_embeddings``, ``embeddings.LayerNorm`` and
            ``transformer.layer`` (layers with ``attention``, ``ffn``,
            ``sa_layer_norm`` and ``output_layer_norm``).
        quant: Destination model with the attribute layout of ``QuantDistilBert``.

    Raises:
        ValueError: If the two models have a different number of transformer
            layers, or if any pair of tensors differs in shape.
    """
    source_layers = list(pretrained.transformer.layer)
    target_layers = list(quant.transformer)
    # zip() would silently truncate, leaving some layers at random init.
    if len(source_layers) != len(target_layers):
        raise ValueError(
            f"Layer count mismatch: pretrained has {len(source_layers)}, quant has {len(target_layers)}"
        )

    _copy_tensor_(quant.embeddings.weight, pretrained.embeddings.word_embeddings.weight)
    _copy_tensor_(quant.position_embeddings.weight, pretrained.embeddings.position_embeddings.weight)

    if hasattr(pretrained.embeddings, "token_type_embeddings"):
        _copy_tensor_(quant.token_type_embeddings.weight, pretrained.embeddings.token_type_embeddings.weight)
    else:
        # The source has no token-type table: zero ours so it adds nothing
        # to the summed embeddings at the start.
        with torch.no_grad():
            quant.token_type_embeddings.weight.zero_()

    # Embedding layer norm
    quant.emb_layer_norm.load_state_dict(pretrained.embeddings.LayerNorm.state_dict())

    for q_layer, p_layer in zip(target_layers, source_layers):
        # Attention projections
        _copy_linear_(q_layer.attention.q_lin, p_layer.attention.q_lin)
        _copy_linear_(q_layer.attention.k_lin, p_layer.attention.k_lin)
        _copy_linear_(q_layer.attention.v_lin, p_layer.attention.v_lin)
        _copy_linear_(q_layer.attention.out_lin, p_layer.attention.out_lin)
        # Feed-forward (named ``ffn`` on the source, ``ff`` on the target)
        _copy_linear_(q_layer.ff.lin1, p_layer.ffn.lin1)
        _copy_linear_(q_layer.ff.lin2, p_layer.ffn.lin2)
        # Layer norms
        q_layer.sa_layer_norm.load_state_dict(p_layer.sa_layer_norm.state_dict())
        q_layer.output_layer_norm.load_state_dict(p_layer.output_layer_norm.state_dict())

In [None]:
if __name__ == "__main__":
    # Smoke test: build a 4-bit quantized model with a 2-label head and
    # initialise its encoder from the public pretrained checkpoint.
    bit_width = 4
    config = DistilBertConfig.from_pretrained("distilbert-base-uncased")
    config.num_labels = 2

    pretrained_model = DistilBertModel.from_pretrained("distilbert-base-uncased")
    quant_model = QuantDistilBert(config, bit_width)
    transfer_weights(pretrained_model, quant_model)
    print("Pretrained weights transferred to quantized model.")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Pretrained weights transferred to quantized model.


In [None]:
import pandas as pd
from torch.utils.data import Dataset, DataLoader
import nltk
import re
from nltk.corpus import stopwords
from sklearn.preprocessing import LabelEncoder
from tqdm import tqdm

In [None]:
# Fetch the NLTK stopword corpus that is read via stopwords.words('english') later in the notebook.
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [None]:
# Load the tweet dataset from a CSV on Google Drive.
# NOTE(review): the absolute path assumes Drive is already mounted at /content/drive;
# no mount step is visible in this notebook — confirm before a fresh run.
df = pd.read_csv("/content/drive/MyDrive/tweetdataset.csv")

# English stopwords as a set; preprocess() tests membership against it.
stop_words = set(stopwords.words('english'))

def preprocess(text):
    """Normalise a tweet into a space-joined string of plain words.

    The text is lower-cased, every character that is neither a word character
    nor whitespace is removed, and the remaining whitespace-separated tokens
    are kept only if they are purely alphabetic and not in the module-level
    ``stop_words`` set.
    """
    stripped = re.sub(r'[^\w\s]', '', text.lower())
    kept = []
    for word in stripped.split():
        # Skip stopwords and anything containing digits or underscores.
        if word in stop_words or not word.isalpha():
            continue
        kept.append(word)
    return ' '.join(kept)

# Clean every tweet; values are cast to str first, so missing text becomes a literal string.
df['text'] = df['text'].astype(str).map(preprocess)

# Encode the sentiment strings as integer class ids (kept in `le` for later lookup).
le = LabelEncoder()
df['label'] = le.fit_transform(df['sentiment'])

In [None]:
# Show how many rows each encoded label has before balancing.
df['label'].value_counts()

Unnamed: 0_level_0,count
label,Unnamed: 1_level_1
1,12547
2,9685
0,8782


In [None]:
import pandas as pd

target_count = 8500

# Downsample every class to at most `target_count` rows. Looping over the
# labels that are actually present replaces three copy-pasted lines and keeps
# working if the number of classes changes.
class_counts = df['label'].value_counts()
sampled_parts = [
    df[df['label'] == label].sample(n=min(target_count, class_counts.loc[label]), random_state=42)
    for label in sorted(class_counts.index)
]

# Concatenate in label order, then shuffle with a fixed seed and renumber the rows.
df_balanced = pd.concat(sampled_parts).sample(frac=1, random_state=42).reset_index(drop=True)

print(df_balanced['label'].value_counts())
print(f"Total size of balanced dataframe: {len(df_balanced)}")

# NOTE: this rebinds `df`; later cells operate on the balanced frame only.
df = df_balanced


label
0    8500
1    8500
2    8500
Name: count, dtype: int64
Total size of balanced dataframe: 25500


In [None]:
# Load the tokenizer for the "distilbert-base-uncased" checkpoint.
tokenizer = DistilBertTokenizer.from_pretrained("distilbert-base-uncased")

class TweetDataset(Dataset):
    """Map-style dataset that tokenizes one text per item on access.

    Args:
        texts: Sequence of input strings.
        labels: Sequence of integer class ids, aligned with ``texts``.
        tokenizer: Callable that accepts ``(text, truncation=..., padding=...,
            max_length=..., return_tensors='pt')`` and returns a mapping with
            ``'input_ids'`` and ``'attention_mask'`` tensors whose first
            dimension has size 1.
        max_len: Length every sequence is padded or truncated to.

    Raises:
        ValueError: If ``texts`` and ``labels`` differ in length.
    """

    def __init__(self, texts, labels, tokenizer, max_len=128):
        if len(texts) != len(labels):
            raise ValueError(
                f"texts and labels must have the same length, got {len(texts)} and {len(labels)}"
            )
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        """Return ``input_ids``, ``attention_mask`` and ``labels`` for item ``idx``."""
        # Use the tokenizer given to this dataset, not a module-level global.
        enc = self.tokenizer(
            self.texts[idx],
            truncation=True,
            padding='max_length',
            max_length=self.max_len,
            return_tensors='pt',
        )
        return {
            # squeeze(0) removes only the leading batch axis; a bare squeeze()
            # would also drop the sequence axis when max_len == 1.
            'input_ids': enc['input_ids'].squeeze(0),
            'attention_mask': enc['attention_mask'].squeeze(0),
            'labels': torch.tensor(self.labels[idx], dtype=torch.long)
        }

In [None]:
from sklearn.model_selection import train_test_split
# Hold out 10% of the balanced data for validation (fixed seed).
train_texts, val_texts, train_labels, val_labels = train_test_split(
    df['text'].tolist(),
    df['label'].tolist(),
    test_size=0.1,
    random_state=42,
)

train_dataset = TweetDataset(texts=train_texts, labels=train_labels, tokenizer=tokenizer)
val_dataset = TweetDataset(texts=val_texts, labels=val_labels, tokenizer=tokenizer)


In [None]:
# Fine-tune a full-precision DistilBERT classifier for 5 epochs.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
num_labels = len(le.classes_)  # one output per distinct sentiment class

fp_model = DistilBertForSequenceClassification.from_pretrained("distilbert-base-uncased", num_labels=num_labels)
fp_model.to(device)
optimizer = torch.optim.AdamW(fp_model.parameters(), lr=2e-5)
# NOTE(review): `criterion` is not used in this loop — the loss comes from
# `outputs.loss` because `labels` is passed to the model.
criterion = nn.CrossEntropyLoss()
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16)

for epoch in range(5):
    fp_model.train()
    total_loss = 0
    for batch in tqdm(train_loader, desc=f"FP Epoch {epoch+1}"):
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)
        optimizer.zero_grad()
        outputs = fp_model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
        loss = outputs.loss
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    # Report the mean per-batch training loss for this epoch.
    print(f"FP Epoch {epoch+1}, Training Loss: {total_loss / len(train_loader):.4f}")

Some weights of DistilBertForSequenceClassification were not initialized from the model checkpoint at distilbert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight', 'pre_classifier.bias', 'pre_classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
FP Epoch 1: 100%|██████████| 1435/1435 [04:00<00:00,  5.98it/s]


FP Epoch 1, Training Loss: 0.6724


FP Epoch 2: 100%|██████████| 1435/1435 [04:00<00:00,  5.97it/s]


FP Epoch 2, Training Loss: 0.5245


FP Epoch 3: 100%|██████████| 1435/1435 [03:59<00:00,  6.00it/s]


FP Epoch 3, Training Loss: 0.3930


FP Epoch 4: 100%|██████████| 1435/1435 [04:01<00:00,  5.94it/s]


FP Epoch 4, Training Loss: 0.2593


FP Epoch 5: 100%|██████████| 1435/1435 [04:01<00:00,  5.95it/s]

FP Epoch 5, Training Loss: 0.1658





In [None]:
# Persist the fine-tuned full-precision weights; the QAT cell reads this path back.
fp_save_path = "/content/finetuned_distilbert.pth"
torch.save(fp_model.state_dict(), fp_save_path)
print(f"Full-precision fine-tuned model saved to {fp_save_path}")

In [None]:
# Bit-widths to run quantization-aware training (QAT) for.
# NOTE(review): confirm that the default Brevitas quantizers behave sensibly at 1 and 32 bits.
bit_widths = [1, 4, 8, 32]
qat_epochs = 3  # can be increased; currently trying QAT with 3 epochs (the original note also mentions 5 — TODO confirm)
models = {}  # bit-width -> trained quantized model

In [None]:
config = DistilBertConfig.from_pretrained("distilbert-base-uncased")
config.num_labels = num_labels

# Read the fine-tuned checkpoint once; it is the same for every bit-width.
fp_state_dict = torch.load(fp_save_path, map_location='cpu')
# Encoder weights are stored under the "distilbert." prefix; strip it so the
# keys match a bare DistilBertModel.
encoder_prefix = "distilbert."
encoder_state_dict = {
    k[len(encoder_prefix):]: v for k, v in fp_state_dict.items() if k.startswith(encoder_prefix)
}

for bit in bit_widths:
    print(f"QAT {bit}-bit")
    quant_model = QuantDistilBert(config, bit_width=bit)
    base_fp_model = DistilBertModel(config)
    load_result = base_fp_model.load_state_dict(encoder_state_dict, strict=False)
    # strict=False would otherwise hide a checkpoint/model mismatch.
    if load_result.missing_keys or load_result.unexpected_keys:
        print(
            f"Warning: encoder checkpoint mismatch - missing: {load_result.missing_keys}, "
            f"unexpected: {load_result.unexpected_keys}"
        )

    transfer_weights(base_fp_model, quant_model)

    # Also start from the fine-tuned classification head; transfer_weights only
    # covers the encoder, so the head would otherwise stay randomly initialised.
    with torch.no_grad():
        for head_name in ("pre_classifier", "classifier"):
            head = getattr(quant_model, head_name)
            head.weight.copy_(fp_state_dict[f"{head_name}.weight"])
            head.bias.copy_(fp_state_dict[f"{head_name}.bias"])

    quant_model.to(device)

    optimizer = torch.optim.AdamW(quant_model.parameters(), lr=2e-5)
    criterion = nn.CrossEntropyLoss()
    for epoch in range(qat_epochs):
        quant_model.train()
        total_loss = 0
        for batch in tqdm(train_loader, desc=f"QAT {bit}-bit Epoch {epoch+1}"):
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)
            optimizer.zero_grad()
            # The quantized model returns raw logits, so the loss is computed here.
            outputs = quant_model(input_ids=input_ids, attention_mask=attention_mask)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f"QAT {bit}-bit Epoch {epoch+1}, Training Loss: {total_loss / len(train_loader):.4f}")

    save_path = f"/content/quant_distilbert_{bit}bit.pth"
    torch.save(quant_model.state_dict(), save_path)
    # NOTE(review): every trained model stays on `device` for the evaluation
    # cell; watch GPU memory if more bit-widths are added.
    models[bit] = quant_model
    print(f"QAT {bit}-bit model saved to {save_path}")

In [None]:
# accuracy_score is not imported anywhere else in the notebook.
from sklearn.metrics import accuracy_score

# "\E" is not a valid escape sequence; a leading newline was evidently intended.
print("\nEvaluation")
for bit, model in models.items():
    all_preds = []
    all_labels = []
    model.eval()
    with torch.no_grad():
        for batch in val_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            # Predicted class = index of the largest logit.
            preds = torch.argmax(outputs, dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    acc = accuracy_score(all_labels, all_preds)
    print(f"Accuracy for {bit}bit QAT model: {acc:.4f}")