Firstly, we create our SpaceDataset. We set the number of max_label_length to 50, as we have already preprocessed and prepared our dataset so every entry has less or equal than 50 tokens after parsing the AST C tree. 
Moreover, we pad every labels sequence by adding number 100 to it. This way, every labels array will have exactly 50 elements. Later on, index/number 100 will be ignored when calculating loss and accuracy. 
In the end, __getitem__ method returns no_of_tokens, spellings, kinds and labels for the single example.

In [2]:
import torch
from torch.utils.data import Dataset, DataLoader

torch.manual_seed(0)

class SpaceDataset(Dataset):
    def __init__(self, dataset):
        self.dataset = dataset
        self.max_label_length = 50

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        no_of_tokens = len(self.dataset[idx]['token_spellings'])
        spellings = " ".join(self.dataset[idx]['token_spellings'])
        kinds = " ".join(self.dataset[idx]['token_kinds'])
        labels = self.dataset[idx]['labels']

        if len(labels) < self.max_label_length:
            labels = labels + [100] * (self.max_label_length - len(labels))  # padding with 100 for as ignoring index
        else:
            labels = labels[:self.max_label_length]

        labels = torch.tensor(labels, dtype=torch.float)
        return no_of_tokens, spellings, kinds, labels

We use ALBERT tranformer for obtaining relevant information on the code data. ALBERT transformer is loaded with pre-trained weights. We added one more fully connected layer on top of the transformer model for fine tuning it on our space prediction.

Spellings and kinds are passed together as an input through the tokenizer. Later, we obtain last_hidden_state of the output and pass it through the fully connected layer. Finally, we pass previous output through the sigmoid function to obtain space prediction.

In [None]:
from torch import nn
import torch
from transformers import AlbertTokenizer, AlbertModel
# import torch_directml
# dml = torch_directml.device()

class SpaceALBERT(nn.Module):
    def __init__(self, pretrained_model_name='albert-base-v2'):
        super(SpaceALBERT, self).__init__()
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model = AlbertModel.from_pretrained(pretrained_model_name).to(self.device)
        self.tokenizer = AlbertTokenizer.from_pretrained(pretrained_model_name)
        self.fc = nn.Linear(768, 1)

    def forward(self, spellings, kinds):
        inputs = self.tokenizer(spellings, kinds, return_tensors='pt', padding=True, truncation=True).to(self.device)
        outputs = self.model(**inputs)
        last_hidden_state = outputs.last_hidden_state
        token_logits = self.fc(last_hidden_state).squeeze(-1)
        space_preds = torch.sigmoid(token_logits)

        return space_preds

Here, we obtain previously processed data and create training, validation and test datasets and dataloaders.

In [3]:
from datasets import load_dataset
hf_train_set = load_dataset('json', data_files='data/train_serialized.json')['train']
train_set = SpaceDataset(hf_train_set)
train_loader = DataLoader(train_set, batch_size = 16, shuffle = True)

hf_val_set = load_dataset('json', data_files='data/val_serialized.json')['train']
val_set = SpaceDataset(hf_val_set)
val_loader = DataLoader(val_set, batch_size = 16, shuffle = False)

hf_test_set = load_dataset('json', data_files='data/test_serialized.json')['train']
test_set = SpaceDataset(hf_test_set)
test_loader = DataLoader(test_set, batch_size = 16, shuffle = False)

We initialize the model. For optimizer, we use Adam optimizer with learning rate of 1e-5. For computing the loss, we use BCELoss.

In [None]:
import torch.optim as optim
model = SpaceALBERT().to("cuda" if torch.cuda.is_available() else "cpu")
optimizer = optim.Adam(model.parameters(), lr=1e-5)
criterion = nn.BCELoss()

We define our training method. For outputs, we slice it to 50, as no input sequence has more than 50 tokens in AST tree. Additionally, we apply a mask to ignore padding labels - which were set to 100 in SpaceDataset class. We calculate the loss and update the model weights.

In [5]:
def train(model, optimizer, train_loader, criterion):
    model.train()
    total_loss = 0
    bc = 0
    for batch in train_loader:
        optimizer.zero_grad()
        no_of_tokens, spellings, kinds, labels = batch

        labels = torch.tensor(labels, dtype=torch.float).to(model.device)

        outputs = model(spellings, kinds)
        outputs = outputs[:, :50]

        mask = (labels != 100).float()
        masked_outputs = outputs * mask
        masked_labels = labels * mask

        loss = criterion(masked_outputs, masked_labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    print(f'Training loss: {total_loss/len(train_loader)}')

Here we define our validation method. The code logic is the same as in the train method. Additionally, we obtain predictions by comparing outputs with the threshold of 0.5. After that, we apply the same mask for predictions. We calculate the loss and accuracy for validation dataset.

In [6]:
def evaluate(model, val_loader, criterion, threshold=0.5):
    model.eval()
    total_loss = 0
    correct_predictions = 0
    total_valid_predictions = 0

    with torch.no_grad():
        for batch in val_loader:
            no_of_tokens, spellings, kinds, labels = batch

            labels = torch.tensor(labels, dtype=torch.float).to(model.device)

            outputs = model(spellings, kinds)
            outputs = outputs[:, :50]

            mask = (labels != 100).float()
            masked_outputs = outputs * mask
            masked_labels = labels * mask

            loss = criterion(masked_outputs, masked_labels)
            total_loss += loss.item()

            predictions = (masked_outputs > threshold).float()
            masked_predictions = predictions * mask

            correct_predictions += ((masked_predictions == masked_labels) * mask).sum().item()
            total_valid_predictions += mask.sum().item() 

    avg_loss = total_loss / len(val_loader)
    accuracy = correct_predictions / total_valid_predictions * 100

    print(f'Validation Loss: {avg_loss:.4f}, Accuracy: {accuracy:.2f}%')
    return avg_loss, accuracy


Finally, we run our training process. In every epoch, we run train and evaluate methods. We obtain the accuracy and if it's higher than the latest best accuracy, we set it to new best accuracy and save the model (the output after the cell shows the results for epoch 7 and 8). 

In [None]:
top_accuracy = 0
epochs = 8

for i in range(0, epochs):
  train(model, optimizer, train_loader, criterion)
  _, acc = evaluate(model, val_loader, criterion)
  if acc > top_accuracy:
    top_accuracy = acc
    model_state_dict = model.state_dict()
    torch.save(dict(model_state_dict=model_state_dict, epoch=i), f"space_{i}.pkl")

  labels = torch.tensor(labels, dtype=torch.float).to(model.device)


Training loss: 0.06282496693549565


  labels = torch.tensor(labels, dtype=torch.float).to(model.device)


Validation Loss: 0.0778, Accuracy: 89.53%
Training loss: 0.05721921129236799
Validation Loss: 0.0761, Accuracy: 90.10%


Now it's time to test our model on test dataset. Firstly, we need to load the best model.

In [None]:
checkpoint = torch.load("checkpoints/space_7.pkl", map_location='cpu')
msg = model.load_state_dict(checkpoint['model_state_dict'], strict=False)

Next, we have to write evaluate_test function which is almost the same as the evaluate function.

In [None]:
def evaluate_test(model, threshold=0.5):
    model.eval()
    correct_predictions = 0
    total_valid_predictions = 0

    with torch.no_grad():
        for batch in val_loader:
            no_of_tokens, spellings, kinds, labels = batch

            labels = torch.tensor(labels, dtype=torch.float).to(model.device)

            outputs = model(spellings, kinds)
            outputs = outputs[:, :50]

            mask = (labels != 100).float()
            masked_outputs = outputs * mask
            masked_labels = labels * mask

            predictions = (masked_outputs > threshold).float()
            masked_predictions = predictions * mask

            correct_predictions += ((masked_predictions == masked_labels) * mask).sum().item()
            total_valid_predictions += mask.sum().item() 

    accuracy = correct_predictions / total_valid_predictions * 100
    print(f'Test Accuracy: {accuracy:.2f}%')

Now we only have to run the previous function.

In [None]:
evaluate_test(model, test_loader, criterion)

  labels = torch.tensor(labels, dtype=torch.float).to(model.device)


Test Accuracy: 90.10%


Great! Our model achieved the accuracy of 90.1% on our test dataset after only 8 epochs.