In [None]:
!pip install datasets tokenizers transformers

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from transformers import BertTokenizer, BertForSequenceClassification
from transformers import get_linear_schedule_with_warmup
import datasets
from datasets import load_dataset
from sklearn.metrics import accuracy_score
from tqdm import tqdm

# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print('device :', device)

In [None]:
# Load the complete train and test splits of the "dyda_da" configuration of "silicone".
dataset_train, dataset_test = load_dataset(
    "silicone",
    "dyda_da",
    split=['train[:100%]', 'test[:100%]'],
)

# Bundle both splits and rename the columns to "labels" / "text",
# the names used by the preprocessing and training cells below.
dataset = (
    datasets.DatasetDict({"train": dataset_train, "test": dataset_test})
    .rename_column("Label", "labels")
    .rename_column("Utterance", "text")
)

# Integer class id -> class name, taken from the label feature of the train split.
idx2label = dict(enumerate(dataset['train'].features['labels'].names))
idx2label

In [None]:
def preprocess(data, max_length=64):
    """Tokenize a batch of utterances and carry the labels along.

    Args:
        data: Batch mapping with a 'text' entry (the utterances) and a
            'labels' entry, as passed by `dataset.map(..., batched=True)`.
        max_length: Number of tokens every sequence is truncated and padded to.

    Returns:
        The tokenizer output for `data['text']` with `data['labels']` stored
        under the 'labels' key.
    """
    # Pad to a fixed width rather than to the longest row of the current map batch:
    # with padding=True, rows coming from different map batches can end up with
    # different lengths, which the default DataLoader collate cannot stack.
    tokenized_text = tokenizer(
        data['text'],
        padding='max_length',
        truncation=True,
        max_length=max_length,
    )
    tokenized_text['labels'] = data['labels']
    return tokenized_text

# Tokenizer of the 'bert-base-uncased' checkpoint; `preprocess` looks it up by name when called.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Tokenize both splits batch-wise and drop the Dialogue_Act, Dialogue_ID and Idx columns.
encoded_dataset = dataset.map(
    preprocess,
    batched=True,
    remove_columns=['Dialogue_Act', 'Dialogue_ID', 'Idx'],
)

# Switch the output format of the dataset to torch (done in place).
encoded_dataset.set_format("torch")

In [None]:
# BERT with a sequence-classification head. The number of output classes is taken
# from the dataset's label names instead of being hard-coded, and the id <-> name
# mappings are stored in the model config so predictions can be read as class names.
model = BertForSequenceClassification.from_pretrained(
    "bert-base-uncased",
    num_labels=len(idx2label),
    id2label=idx2label,
    label2id={label: idx for idx, label in idx2label.items()},
    output_attentions=False,
    output_hidden_states=False,
)

In [None]:
# Freeze all parameters first; only the parts re-enabled below are fine-tuned.
for param in model.parameters():
    param.requires_grad = False

# Unfreeze the last transformer encoder layer.
for param in model.bert.encoder.layer[-1].parameters():
    param.requires_grad = True

# Unfreeze the classification head as well. It is not part of the
# "bert-base-uncased" checkpoint and starts from random weights, so leaving it
# frozen would force the encoder to fit a fixed random projection.
# The pooler between the encoder and the head stays frozen.
for param in model.classifier.parameters():
    param.requires_grad = True

## Check parameters

In [None]:
# Get all of the model's parameters as a list of (name, tensor) tuples.
params = list(model.named_parameters())

# Element counts: every parameter, and only those that will receive gradients.
total_n_weights = sum(tensor.numel() for _, tensor in params)
trainable_weights = sum(tensor.numel() for _, tensor in params if tensor.requires_grad)

print('Total weights:', total_n_weights)
print('Trainable weights:', trainable_weights)

print('\nThe BERT model has {:} different named parameters.\n'.format(len(params)))

# Print name and shape for three slices of the parameter list, each under its own banner.
for title, section in (
    ('==== Embedding Layer ====\n', params[0:5]),
    ('\n==== First Transformer ====\n', params[5:21]),
    ('\n==== Output Layer ====\n', params[-4:]),
):
    print(title)
    for name, tensor in section:
        print("{:<55} {:>12}".format(name, str(tuple(tensor.size()))))

In [None]:
# Hyperparameters, looked up by key in the cells below.
args = dict(
    batch_size=8,        # batch size of both the train and the test DataLoader
    epochs=5,            # number of passes over the training set
    learning_rate=2e-5,  # AdamW learning rate
    warmup_steps=500,    # warmup steps of the linear schedule
    epsilon=1e-8,        # AdamW eps
)

In [None]:
# Data: one loader per split; only the training loader shuffles.
train_data, test_data = encoded_dataset['train'], encoded_dataset['test']
train_loader = DataLoader(train_data, shuffle=True, batch_size=args['batch_size'])
test_loader = DataLoader(test_data, batch_size=args['batch_size'])

# Optimizer: AdamW over the model's parameters.
optimizer = torch.optim.AdamW(
    model.parameters(),
    lr=args['learning_rate'],
    eps=args['epsilon'],
)

# Scheduler: linear schedule with warmup. It is stepped once per batch in the
# training loop, so the horizon is (batches per epoch) * (epochs).
total_steps = args['epochs'] * len(train_loader)
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_training_steps=total_steps,
    num_warmup_steps=args['warmup_steps'],
)

# Loss: cross-entropy between the logits and the integer class labels.
loss_fn = nn.CrossEntropyLoss()

In [None]:
# define training loop
def train_loop(model, data_loader, optimizer, scheduler, loss_fn):
    """Train `model` for one pass over `data_loader`.

    Args:
        model: Model called with `input_ids`, `attention_mask` and
            `token_type_ids` keyword arguments; its output must expose 'logits'.
        data_loader: Iterable of batches with the keys 'input_ids',
            'token_type_ids', 'attention_mask' and 'labels'.
        optimizer: Optimizer stepped once per batch.
        scheduler: Learning-rate scheduler stepped once per batch.
        loss_fn: Callable `(logits, labels) -> loss`.

    Returns:
        The mean of the per-batch loss values.

    Raises:
        ZeroDivisionError: If `data_loader` yields no batches.
    """
    model.train()
    losses = []
    for batch in tqdm(data_loader, desc="Training", leave=False):
        input_ids = batch['input_ids'].to(device)
        token_type_ids = batch['token_type_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)

        optimizer.zero_grad()
        # Pass the tensors by keyword: the positional order of BERT's forward is
        # (input_ids, attention_mask, token_type_ids), so passing them positionally
        # as (input_ids, token_type_ids, attention_mask) swaps mask and segment ids.
        outputs = model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
        )
        logits = outputs['logits']
        loss = loss_fn(logits, labels)
        losses.append(loss.item())
        loss.backward()
        # Clip the global gradient norm to 1.0 before the parameter update.
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        scheduler.step()

    avg_loss = sum(losses) / len(losses)
    return avg_loss

# define evaluation function
def evaluate(model, data_loader):
    """Compute the accuracy of `model` over all batches of `data_loader`.

    Args:
        model: Model called with `input_ids`, `attention_mask` and
            `token_type_ids` keyword arguments; its output must expose 'logits'.
        data_loader: Iterable of batches with the keys 'input_ids',
            'token_type_ids', 'attention_mask' and 'labels'.

    Returns:
        The value of `accuracy_score(true_labels, predictions)`, where the
        prediction for each example is the index of its largest logit.
    """
    model.eval()
    predictions = []
    true_labels = []
    with torch.no_grad():
        for batch in tqdm(data_loader, desc="Evaluating", leave=False):
            input_ids = batch['input_ids'].to(device)
            token_type_ids = batch['token_type_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            # Pass the tensors by keyword: the positional order of BERT's forward is
            # (input_ids, attention_mask, token_type_ids), so passing them positionally
            # as (input_ids, token_type_ids, attention_mask) swaps mask and segment ids.
            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                token_type_ids=token_type_ids,
            )
            logits = outputs['logits']
            predicted = logits.argmax(dim=1)

            predictions.extend(predicted.tolist())
            true_labels.extend(labels.tolist())

    return accuracy_score(true_labels, predictions)

In [None]:
# Resolve the device (GPU when available, otherwise CPU) and move the model onto it.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
model.to(device)

# Each epoch: one training pass over the train loader, then an accuracy check on the test loader.
for epoch in range(args['epochs']):
    train_loss = train_loop(
        model,
        train_loader,
        optimizer,
        scheduler,
        loss_fn,
    )
    test_acc = evaluate(model, test_loader)
    print(f'Epoch {epoch + 1}/{args["epochs"]} - Train loss: {train_loss:.3f} - Test accuracy: {test_acc:.3f}')