In [None]:
import os
import torch
import pandas as pd
from skimage import io, transform
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils
import json
from transformers import AutoTokenizer, BertModel
import tqdm
import torch.nn as nn
import math
import random
from torch.nn.utils import prune


class OurDataset(Dataset):
    """Dataset that embeds each sample with a frozen ``bert-base-uncased`` encoder.

    Indexing returns ``(last_hidden_state, label)``: the encoder output for the
    sample at that position and the matching entry of the loaded labels.
    """

    def __init__(self, data_file, labels_file):
        """Load samples, labels and the pretrained tokenizer/encoder.

        Args:
            data_file: Path to a JSON file. Its content is indexed by integer
                position in ``__getitem__`` (presumably a list of strings --
                TODO confirm against the data).
            labels_file: Path to an object written with ``torch.save``; it is
                indexed with the same positions as the samples.
        """
        # Context manager so the file handle is closed deterministically.
        with open(data_file) as fh:
            self.full_data = json.load(fh)
        # NOTE(review): torch.load unpickles its input; only load trusted files.
        self.labels = torch.load(labels_file)

        self.tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
        self.model = BertModel.from_pretrained("bert-base-uncased")
        # The encoder is used purely as a feature extractor, so keep it in
        # evaluation mode (no dropout) for deterministic embeddings.
        self.model.eval()

    def __len__(self):
        """Return the number of samples."""
        return len(self.full_data)

    def __getitem__(self, idx):
        """Return ``(last_hidden_state, label)`` for the sample at ``idx``."""
        # truncation=True keeps over-long texts within the encoder's input limit
        # instead of failing inside the model.
        inputs = self.tokenizer(
            self.full_data[idx], return_tensors="pt", truncation=True
        )
        # The encoder is never trained here: skip autograd bookkeeping so the
        # returned features do not drag a graph (and its memory) into the
        # downstream training loop.
        with torch.no_grad():
            outputs = self.model(**inputs)
        return outputs.last_hidden_state, self.labels[idx]

class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to a sequence-first tensor, then apply dropout."""

    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
        """
        Arguments:
            d_model: size of the embedding dimension (even or odd).
            dropout: dropout probability applied after the encodings are added.
            max_len: number of positions for which encodings are precomputed.
        """
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        angles = position * div_term
        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer odd-indexed column than
        # even-indexed ones, so drop the surplus frequency for the cosine half.
        # For even d_model the slice is a no-op.
        pe[:, 0, 1::2] = torch.cos(angles[:, : d_model // 2])
        # Buffer: saved/moved with the module but not a trainable parameter.
        self.register_buffer('pe', pe)

    def forward(self, x):
        """
        Arguments:
            x: Tensor, shape ``[seq_len, batch_size, embedding_dim]``
        """
        # pe[:seq_len] has a singleton batch axis and broadcasts over the batch.
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)

def main():
    """Build a ``TransformerClassifier`` with its default arguments and return it."""
    model = TransformerClassifier()
    return model

class TransformerClassifier(nn.Module):
    """Transformer encoder over precomputed token embeddings with an MLP head.

    ``forward`` takes batch-first embeddings ``[batch_size, seq_len, d_model]``
    (the layout the notebook passes in from the BERT encoder) and returns
    unnormalised class logits ``[batch_size, n_classes]``. The logits are meant
    to be fed straight into ``CrossEntropyLoss``; apply ``softmax`` on the
    output if probabilities are needed.
    """

    def __init__(
        self,
        d_model=768,
        n_classes=3,
        nhead=4,
        dim_feedforward=512,
        num_layers=6,
        dropout=0.1,
        activation="relu",
        classifier_dropout=0.1,
    ):
        """
        Arguments:
            d_model: embedding size of the input tokens.
            n_classes: number of output classes.
            nhead: attention heads per encoder layer.
            dim_feedforward: hidden size of each encoder feed-forward block.
            num_layers: number of stacked encoder layers.
            dropout: dropout used in the positional encoding and encoder layers.
            activation: activation of the encoder feed-forward blocks.
            classifier_dropout: dropout applied to the pooled vector before the head.
        """
        super().__init__()

        self.pos_encoder = PositionalEncoding(
            d_model=d_model,
            dropout=dropout,
            max_len=5000,
        )

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            # Previously accepted but silently ignored; the default "relu"
            # matches the layer's own default, so existing calls are unchanged.
            activation=activation,
        )
        self.transformer_encoder = nn.TransformerEncoder(
            encoder_layer,
            num_layers=num_layers,
        )
        # No Softmax here: CrossEntropyLoss applies log-softmax itself, so a
        # Softmax in the head would normalise twice and flatten the gradients.
        # The Linear layers stay at head[0] and head[2], which the pruning
        # cells rely on.
        self.head = nn.Sequential(
            nn.Linear(d_model, 256),
            nn.ReLU(),
            nn.Linear(256, n_classes),
        )

        self.d_model = d_model
        self.dropout = nn.Dropout(p=classifier_dropout)

    def forward(self, x):
        """
        Arguments:
            x: Tensor, shape ``[batch_size, seq_len, d_model]``

        Returns:
            Tensor of logits, shape ``[batch_size, n_classes]``
        """
        # The positional encoding and the encoder are sequence-first, so move
        # the token axis to the front. Without this, attention runs over a
        # length-1 "sequence" and every token receives the position-0 encoding.
        x = x.transpose(0, 1)
        x = self.pos_encoder(x)
        x = self.transformer_encoder(x)
        # Mean-pool over the token axis -> [batch_size, d_model]
        x = x.mean(dim=0)
        x = self.dropout(x)
        x = self.head(x)

        return x

In [None]:
# Baseline classifier, built with the default constructor arguments.
t = TransformerClassifier()
# Dataset over the JSON samples and the saved labels; constructing it also
# loads the pretrained tokenizer and encoder (see the download progress below).
ds = OurDataset("full_data.json", "labels.torch")



tokenizer_config.json:   0%|          | 0.00/28.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]

In [None]:
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split

# Plain loader over the full dataset (default batch_size of 1). It is kept
# because later cells reference `dl`, and its batch size is reused below.
dl = DataLoader(ds)
total_data_len = len(dl.dataset)

# Determine the sizes for the training and testing sets
train_size = int(0.7 * total_data_len)  # 70% for training
test_size = total_data_len - train_size  # Remaining 30% for testing

# Split the underlying dataset. The generator is seeded so train/test
# membership is the same on every run (Restart & Run All reproducibility).
split_generator = torch.Generator().manual_seed(42)
train_data, test_data = torch.utils.data.random_split(
    dl.dataset, [train_size, test_size], generator=split_generator
)

# Create separate DataLoader objects for training and testing
train_loader = DataLoader(train_data, batch_size=dl.batch_size, shuffle=True)
test_loader = DataLoader(test_data, batch_size=dl.batch_size, shuffle=False)

# Now you have separate DataLoader objects for training and testing

In [None]:
# Helper: number of trainable scalars in a model
def count_parameters(model):
    """Return the total element count of all parameters with ``requires_grad`` set."""
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total

# Count parameters before training
# (trainable element count of the baseline model `t`, via count_parameters)
params_before_training = count_parameters(t)
print(f"Number of parameters before training: {params_before_training}")


Number of parameters before training: 19116547


In [None]:
# Collect the (module, tensor-name) pairs that are candidates for pruning.
# The two linear layers of the classifier head come first ...
parameters_to_prune = [(t.head[0], 'weight'), (t.head[2], 'weight')]

# ... followed, for every encoder layer, by its two feed-forward weights and
# its attention input/output projection weights (biases are left out).
for layer in t.transformer_encoder.layers:
    parameters_to_prune += [
        (layer.linear1, 'weight'),
        (layer.linear2, 'weight'),
        (layer.self_attn, 'in_proj_weight'),
        (layer.self_attn.out_proj, 'weight'),
    ]

# The figure printed is the number of (module, name) entries in the list.
print("Total number of parameters to prune:", len(parameters_to_prune))

Total number of parameters to prune: 26


Train the model without pruning

In [None]:
# Baseline run: one pass over the training split, no pruning applied.
loss_fn = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(t.parameters(), lr=0.00001, momentum=0.9)
loss = None

t.train()  # dropout must be active while fitting
# Wrap the loader that is actually iterated so the progress bar advances.
for i, data in enumerate(tqdm.tqdm(train_loader)):
  x, y = data
  # The loaders use batch_size=1: index 0 strips the dimension added by the
  # DataLoader collate step from both the features and the label.
  y = y[0].long()
  outputs = t(x[0])
  loss = loss_fn(outputs, y)
  if i%10 == 0:
    print(i, loss.item())

  # Clear the gradients of the previous step; without this they accumulate
  # across iterations and every update uses the sum of all past gradients.
  optimizer.zero_grad()
  loss.backward()
  optimizer.step()

In [None]:
correct = 0
total = 0

t.eval()  # disable dropout so predictions are deterministic
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        X_batch = X_batch.type(torch.float32)
        # batch_size=1: index 0 strips the collate dimension, as in training
        y_pred = t(X_batch[0])
        y_pred_class = y_pred.argmax(dim=1)  # get the predicted class
        y_true = y_batch[0].long()
        # Compare first, then reduce. The old code reduced the labels and
        # compared the predictions against that sum.
        correct += (y_pred_class == y_true).sum().item()
        total += y_true.numel()

acc = correct / total
print("Model accuracy: %.2f%%" % (acc*100))

Train a model with pruning

In [None]:
# Second, freshly initialised classifier; this one is trained with pruning.
t2 = TransformerClassifier()



In [None]:
# Define parameters to prune: only the two linear layers of the classifier
# head. The encoder layers are deliberately left out, which matches the
# prune.remove() calls on t2.head in the pruned-training cell.
parameters_to_prune = [(t2.head[0], 'weight'), (t2.head[2], 'weight')]

# The figure printed is the number of (module, name) entries in the list.
print("Total number of parameters to prune:", len(parameters_to_prune))

Total number of parameters to prune: 2


In [None]:
# Show the (module, parameter-name) pairs currently selected for pruning.
print(parameters_to_prune)

[(Linear(in_features=768, out_features=256, bias=True), 'weight'), (Linear(in_features=256, out_features=3, bias=True), 'weight')]


In [None]:
# Pruned run: one pass over the training split; after every optimizer step the
# smallest-magnitude 20% of the selected weights are zeroed.
loss_fn = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(t2.parameters(), lr=0.00001, momentum=0.9)
loss = None

t2.train()  # dropout must be active while fitting
# Wrap the loader that is actually iterated so the progress bar advances.
for i, data in enumerate(tqdm.tqdm(train_loader)):
  x, y = data
  # The loaders use batch_size=1: index 0 strips the dimension added by the
  # DataLoader collate step from both the features and the label.
  y = y[0].long()
  outputs = t2(x[0])
  loss = loss_fn(outputs, y)
  if i%10 == 0:
    print(i, loss.item())

  # Clear the gradients of the previous step; without this they accumulate
  # across iterations.
  optimizer.zero_grad()
  loss.backward()
  optimizer.step()

  # Prune AFTER the update. Pruning before optimizer.step() let the step
  # write non-zero values back into the weights that had just been zeroed,
  # so the model leaving the loop was not actually sparse.
  prune.global_unstructured(
      parameters_to_prune,
      pruning_method=prune.L1Unstructured,
      amount=0.2,
  )

  # Make the pruning permanent (drops the mask / *_orig re-parametrisation)
  # so the next iteration starts from plain parameters again and the 20%
  # amount does not compound across iterations.
  for module, name in parameters_to_prune:
    prune.remove(module, name)

In [None]:
correct = 0
total = 0

t2.eval()  # disable dropout so predictions are deterministic
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        X_batch = X_batch.type(torch.float32)
        # batch_size=1: index 0 strips the collate dimension, as in training
        y_pred = t2(X_batch[0])
        y_pred_class = y_pred.argmax(dim=1)  # get the predicted class
        y_true = y_batch[0].long()
        # Compare first, then reduce. The old code reduced the labels and
        # compared the predictions against that sum.
        correct += (y_pred_class == y_true).sum().item()
        total += y_true.numel()

acc = correct / total
print("Model accuracy: %.2f%%" % (acc*100))

In [None]:
# Count parameters after pruning
# Measure the pruned model `t2`; the baseline `t` was never pruned.
params_after_pruning = count_parameters(t2)
print(f"Number of parameters after pruning: {params_after_pruning}")

# Unstructured pruning zeroes entries but keeps tensor shapes, so the element
# count above cannot change. The non-zero count is what shows the effect.
nonzero_after_pruning = sum(
    int(torch.count_nonzero(p)) for p in t2.parameters() if p.requires_grad
)
print(f"Number of non-zero parameters after pruning: {nonzero_after_pruning}")