# Attention mechanism

In [None]:
import sys

import numpy as np
import scipy.special

import textwrap

In [None]:
def create_tensor(t):
    """
    Build a NumPy array out of nested Python lists.

      Arguments:
        t: list of lists holding the values

      Returns:
        np.ndarray with the same contents as `t`
    """
    tensor = np.array(t)
    return tensor

def print_tensor(t, name):
    """Print the shape of tensor `t` under the label `name`, then its values."""
    report = f'{name} shape: {t.shape}\n {t}\n'
    print(report)

In [None]:
# Sanity checks for create_tensor: nested lists become a 2-D NumPy array.
assert create_tensor([[1, 0, 0], [0, 1, 0]]).shape == (2, 3)
# isinstance is the idiomatic type check (type(...) == ... rejects subclasses).
assert isinstance(create_tensor([[1, 2, 3], [4, 5, 6]]), np.ndarray)

In [None]:
# Toy inputs: query, key and value each hold 2 rows of depth 3; m is a 2x2 mask.
q = create_tensor([[1, 0, 0], [0, 1, 0]])
k = create_tensor([[1, 2, 3], [4, 5, 6]])
v = create_tensor([[0, 1, 0], [1, 0, 1]])
m = create_tensor([[0, 0], [-1e9, 0]])

# Display every tensor together with its label.
for label, tensor in (('query', q), ('key', k), ('value', v), ('mask', m)):
    print_tensor(tensor, label)

query shape: (2, 3)
 [[1 0 0]
 [0 1 0]]

key shape: (2, 3)
 [[1 2 3]
 [4 5 6]]

value shape: (2, 3)
 [[0 1 0]
 [1 0 1]]

mask shape: (2, 2)
 [[ 0.e+00  0.e+00]
 [-1.e+09  0.e+00]]



In [None]:
def dotProdAtt(query, key, value, mask, scale=True):
    """
    Scaled dot-product attention.

    Computes softmax(query @ key^T / sqrt(depth)) @ value. Only the last two
    axes of `key` are transposed, so both plain (seq, depth) matrices and
    batched (..., seq, depth) arrays are supported.

      Arguments:
        query: np.ndarray whose last axis is the embedding depth
        key: np.ndarray whose last axis has the same size as that of `query`
        value: np.ndarray whose last axis has the same size as that of `query`
        mask: boolean-like array broadcastable to the score matrix, or None.
              Truthy entries keep the score, falsy entries are replaced by
              -1e9 before the softmax (i.e. they get ~0 attention weight).
        scale: if True, the scores are divided by sqrt(depth)

      Returns:
        np.ndarray with the attention-weighted combination of `value` rows
    """
    from scipy.special import logsumexp

    assert query.shape[-1] == key.shape[-1] == value.shape[-1], "There is a problem with q, k, v - dimentions differ"

    # Dividing by sqrt(depth) keeps the magnitude of the dot products in check
    depth = query.shape[-1] if scale else 1

    # Transpose only the two trailing axes: `key.T` would reverse *all* axes
    # and therefore break (or silently corrupt) batched inputs.
    key_t = np.swapaxes(key, -1, -2) if key.ndim >= 2 else key

    # Scaled dot product of every query row with every key row
    scores = np.matmul(query, key_t) / np.sqrt(depth)

    # Masked-out positions get a very large negative score
    if mask is not None:
        scores = np.where(mask, scores, np.full_like(scores, -1e9))

    # Softmax over the key axis, computed via logsumexp for numerical stability
    log_norm = logsumexp(scores, axis=-1, keepdims=True)
    weights = np.exp(scores - log_norm)

    # Weighted sum of the value rows
    return np.matmul(weights, value)

In [None]:
def dotProdSelfAtt(q, k, v, scale=True):
    """
    Masked (causal) self-attention.

    Builds a lower-triangular boolean mask so that position i can only
    attend to positions <= i, then delegates to dotProdAtt.

      Arguments:
        q, k, v: np.ndarray query, key and value; the penultimate axis of
                 `q` is used as the sequence length
        scale: forwarded to dotProdAtt

      Returns:
        whatever dotProdAtt returns; because the mask carries a leading
        axis of size 1, 2-D inputs yield a result with a leading axis of 1
    """
    # Sequence length = size of the penultimate dimension of the query
    mask_size = q.shape[-2]

    # True on and below the main diagonal, False above it.
    # Final shape: (1, mask_size, mask_size). The mask is built for the actual
    # sequence length rather than being checked against a fixed 2x2 pattern.
    causal = np.tril(np.ones((mask_size, mask_size), dtype=bool))
    mask = np.expand_dims(causal, axis=0)

    return dotProdAtt(q, k, v, mask, scale=scale)

In [None]:
# Reference output of masked self-attention for the toy q, k, v defined above
expected_attention = np.array(
    [[[0., 1., 0.], [0.84967455, 0.15032545, 0.84967455]]]
)
assert np.allclose(dotProdSelfAtt(q, k, v), expected_attention)

# BERT

## Classification of sentiment

Using a dataset of movie reviews, we want to determine the sentiment of each review and predict 1 for positive sentiment and 0 for negative.

In fact, we use two models for this task:
- DistilBERT, a lighter version of BERT created by HuggingFace that achieves a final quality close to BERT's.
- Logistic regression from sklearn for final classification into positive and negative sentiment.

In [None]:
!pip install transformers



In [None]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import cross_val_score
import torch
import transformers as ppb
import warnings
warnings.filterwarnings('ignore')

## Loading and preparing

To speed up processing, we will take only 3000 sentences from the dataset

In [None]:
# Tab-separated SST2 training file without a header row
SST2_TRAIN_URL = 'https://github.com/clairett/pytorch-sentiment-classification/raw/master/data/SST2/train.tsv'
df = pd.read_csv(SST2_TRAIN_URL, delimiter='\t', header=None)

In [None]:
# Keep only the first N rows and show how many rows carry each value of column 1
N = 3000
batch_1 = df.iloc[:N]
batch_1[1].value_counts()

1    1565
0    1435
Name: 1, dtype: int64

Loading the pre-trained model

In [None]:
# Classes and checkpoint name used for DistilBERT
pretrained_weights = 'distilbert-base-uncased'
model_class = ppb.DistilBertModel
tokenizer_class = ppb.DistilBertTokenizer

# Instantiate the tokenizer and the model from the pre-trained checkpoint
tokenizer = tokenizer_class.from_pretrained(pretrained_weights)
model = model_class.from_pretrained(pretrained_weights)

Some weights of the model checkpoint at distilbert-base-uncased were not used when initializing DistilBertModel: ['vocab_transform.bias', 'vocab_layer_norm.weight', 'vocab_transform.weight', 'vocab_projector.bias', 'vocab_projector.weight', 'vocab_layer_norm.bias']
- This IS expected if you are initializing DistilBertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing DistilBertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [None]:
# Convert every sentence (column 0) into the list of token ids BERT expects
def encode_sentence(sentence):
    """Encode one sentence into token ids, including the special tokens."""
    return tokenizer.encode(sentence, add_special_tokens=True)


tokenized = batch_1[0].apply(encode_sentence)

In [None]:
# Expected token ids of the first sentence
expected_first_ids = [
    101, 1037, 18385, 1010, 6057, 1998, 2633, 18276, 2128, 16603,
    1997, 5053, 1998, 1996, 6841, 1998, 5687, 5469, 3152, 102,
]
assert tokenized[0] == expected_first_ids

Each sentence is tokenized, and in order for BERT to be able to process all the examples in one batch, it is necessary to bring all the lists to the same size using padding.

In [None]:
# Length of the longest tokenized sentence (0 if there are no sentences)
max_len = max((len(ids) for ids in tokenized.values), default=0)

# Right-pad every sentence with zeros so that all rows have length max_len
padded = np.array([ids + [0] * (max_len - len(ids)) for ids in tokenized.values])

In [None]:
# One row per sentence, each padded to the longest sentence length
expected_padded_shape = (3000, 66)
assert np.shape(padded) == expected_padded_shape

In order not to confuse the model, it is necessary to create another variable containing a mask that will help ignore paddings during processing.

In [None]:
# 1 where padded holds a non-zero token id, 0 where it holds padding
attention_mask = (padded != 0).astype(int)
attention_mask.shape

(3000, 66)

In [None]:
# Every mask row is as long as a padded sentence.
assert attention_mask.shape[1] == 66
# The first row flags exactly the real (non-padding) tokens of the first
# sentence. (`len(attention_mask[0] == 1)` only measured the row length, so
# the `== 1` comparison never checked any mask value.)
assert attention_mask[0].sum() == len(tokenized[0])

## Getting sentence embeddings

The model() function runs sentences through BERT.

In [None]:
# Use the first CUDA device when one is available, otherwise the CPU
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

In [None]:
# Name of the GPU in use; falls back to 'cpu' instead of raising when no
# CUDA device is available.
torch.cuda.get_device_name(0) if torch.cuda.is_available() else 'cpu'

'Tesla K80'

In [None]:
# Put the inputs and the model on the selected device. Using `.to(device)`
# instead of `.cuda()` keeps the cell working on machines without a GPU.
input_ids = torch.tensor(padded).to(device)
attention_mask = torch.tensor(attention_mask).to(device)
model = model.to(device)

# Inference only: no gradients are needed
with torch.no_grad():
    last_hidden_states = model(input_ids, attention_mask=attention_mask)

In [None]:
# Shape of the first element of the model output
# (recorded run: 3000 sentences x 66 token positions x 768 features)
last_hidden_states[0].shape

torch.Size([3000, 66, 768])

In [None]:
# For every sentence keep only the vector at token position 0, then move it
# to the CPU and convert it to a NumPy array.
# NOTE(review): position 0 is presumably the [CLS] token (id 101 at the start
# of tokenized[0]) - confirm against the tokenizer vocabulary.
features = last_hidden_states[0][:,0,:].cpu().numpy()

In [None]:
# Column 1 of the dataset holds the sentiment labels (values 1 and 0);
# store them in the labels variable.
labels = batch_1[1]

## Divide the data into train and test for classification and select parameters

In [None]:
# Split features and labels into train and test parts.
# NOTE(review): no random_state is passed, so the split (and every number
# derived from it below) can change from run to run.
train_features, test_features, train_labels, test_labels = train_test_split(features, labels)

In [None]:
# Candidate values of C: 20 evenly spaced points between 0.0001 and 100
parameters = {'C': np.linspace(0.0001, 100, num=20)}

# Cross-validated search over C for a logistic regression on the train split
grid_search = GridSearchCV(estimator=LogisticRegression(), param_grid=parameters)
grid_search.fit(train_features, train_labels)

print('best parameters: ', grid_search.best_params_)
print('best scrores: ', grid_search.best_score_)

best parameters:  {'C': 5.263252631578947}
best scrores:  0.8271111111111111


In [None]:
# Check the selected C against the value from the recorded run.
# NOTE(review): the train/test split above is not seeded, so the best C may
# differ between runs and this assert can fail spuriously.
assert grid_search.best_params_['C'].round(2) == 5.26

In [None]:
# Fit the final logistic regression on the train split with the selected C
best_c = grid_search.best_params_['C']
lr_clf = LogisticRegression(C=best_c)
lr_clf.fit(X=train_features, y=train_labels)

LogisticRegression(C=5.263252631578947)

## Quality estimation

In [None]:
# Score of the fitted classifier on the held-out test split
lr_clf.score(X=test_features, y=test_labels)

0.828

For a fair comparison, we also measure the quality of a baseline classifier (scikit-learn's `DummyClassifier`), which makes its predictions without looking at the input features.

In [None]:
from sklearn.dummy import DummyClassifier

# Baseline: cross-validated score of a dummy classifier on the train split
clf = DummyClassifier()
scores = cross_val_score(estimator=clf, X=train_features, y=train_labels)

# Report the mean score and twice the standard deviation across the folds
print("Dummy classifier score: %0.3f (+/- %0.2f)" % (scores.mean(), 2 * scores.std()))

Dummy classifier score: 0.523 (+/- 0.00)
