In [2]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, IterableDataset

# from model import SwipeCurveEncoderTransformer


In [3]:
class SwipeCurveEncoderTransformer_old(nn.Module):
    """
    Transformer-based Curve encoder takes in a sequence of vectors and creates a representation
    of a swipe gesture on a samrtphone keyboard.
    Each vector contains information about finger trajectory at a time step.
    It contains:
    * x coordinate
    * y coordinate
    * Optionally: t
    * Optionally: dx/dt
    * Optionally: dy/dt
    * Optionally: keyboard key that has x and y coordinates within its boundaries
    """

    def __init__(self, input_size, hidden_size, num_layers, num_heads, dropout):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.num_heads = num_heads
        self.dropout = dropout

        # self.pos_encoder = PositionalEncoding(input_size, dropout)
        self.encoder_layer = nn.TransformerEncoderLayer(input_size, num_heads, hidden_size, dropout)
        self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers)
    
    def forward(self, x):
        # x = self.pos_encoder(x)
        x = self.transformer_encoder(x)
        return x



class SwipeCurveDecoderTransformerv1(nn.Module):
    """
    Decodes a swipe gesture representation into a sequence of characters.

    Uses decoder transformer with masked attention to prevent the model from cheating.
    """

    def __init__(self, d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout, activation):
        super().__init__()
        self.d_model = d_model
        self.nhead = nhead
        self.num_encoder_layers = num_encoder_layers
        self.num_decoder_layers = num_decoder_layers
        self.dim_feedforward = dim_feedforward
        self.dropout = dropout
        self.activation = activation

        self.decoder_layer = nn.TransformerDecoderLayer(d_model, nhead, dim_feedforward, dropout, activation)
        self.transformer_decoder = nn.TransformerDecoder(self.decoder_layer, num_decoder_layers)

    def forward(self, x, memory):
        x = self.transformer_decoder(x, memory)
        return x


class SwipeCurveTransformer(nn.Module):
    """
    SwipeCurveTransformer is a sequence-to-sequence model that encodes a sequence of vectors
    representing a swipe gesture into a sequence of characters.
    """

    def __init__(self, input_size, hidden_size, num_layers, num_heads, dropout):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.num_heads = num_heads
        self.dropout = dropout

        self.encoder = SwipeCurveEncoderTransformerv1(input_size, hidden_size, num_layers, num_heads, dropout)
        self.decoder = SwipeCurveDecoderTransformerv1(input_size, num_heads, num_layers, num_layers, hidden_size, dropout, 'relu')
        self.out = nn.Linear(input_size, input_size)
        self.softmax = nn.LogSoftmax(dim=2)

    def forward(self, x, y):
        x = self.encoder(x)
        x = self.decoder(y, x)
        x = self.out(x)
        x = self.softmax(x)
        return x
    

In [4]:
# let's test SwipeCurveTransformer on a tensor

input_size = 5
hidden_size = 10
num_layers = 2

x = torch.rand(10, 32, input_size)
y = torch.rand(20, 32, input_size)

model = SwipeCurveTransformer(input_size, hidden_size, num_layers, num_layers, 0.1)

output = model(x, y)

AssertionError: embed_dim must be divisible by num_heads

In [10]:
# class SwipeCurveEncoderTransformerLSTM(nn.Module):
#     """
#     Transformer-based Curve encoder takes in a sequence of vectors and creates a representation
#     of a swipe gesture on a samrtphone keyboard.
#     Each vector contains information about finger trajectory at a time step.
#     It contains:
#     * x coordinate
#     * y coordinate
#     * Optionally: t
#     * Optionally: dx/dt
#     * Optionally: dy/dt
#     * Optionally: keyboard key that has x and y coordinates within its boundaries
#     """

#     def __init__(self, input_size, hidden_size, num_layers, num_heads, dropout):
#         super().__init__()
#         self.input_size = input_size
#         self.hidden_size = hidden_size
#         self.num_layers = num_layers
#         self.num_heads = num_heads
#         self.dropout = dropout

#         # self.pos_encoder = PositionalEncoding(input_size, dropout)
#         self.encoder_layer = nn.TransformerEncoderLayer(input_size, num_heads, hidden_size, dropout)
#         self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers)
#         self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, bidirectional=True)
    
#     def forward(self, x):
#         # x = self.pos_encoder(x)
#         x = self.transformer_encoder(x)
#         x, _ = self.lstm(x)
#         return x

In [5]:
import json
from torch.utils.data import IterableDataset


class NeuroSwipeIterableDatasetv1(IterableDataset):
    """
    Dataset class for NeuroSwipe dataset.
    The dataset file weights over 3 GB and contains over 6 million swipe gestures.


    """

    def __init__(self, data_path):
        """
        Args:
            data_path (string): Path to the NeuroSwipe dataset in JSON format.
                A custom version of the dataset is used:
                "grid" property is replaced with "grid_name" property.
        """
        self.json_file = open(data_path, "r", encoding="utf-8")

    def __del__(self):
        self.json_file.close()

    def _get_data_from_json_line(self, line):
        """
        Parses a JSON line and returns a dictionary with data.
        """
        data = json.loads(line)
        word: str = data['word']

        X_list = data['curve']['x']
        Y_list = data['curve']['y']
        T_list = data['curve']['t']

        X = torch.tensor(X_list, dtype=torch.float32)
        Y = torch.tensor(Y_list, dtype=torch.float32)
        T = torch.tensor(T_list, dtype=torch.float32)

        return X, Y, T, word
    
    def __iter__(self):
        for line in self.json_file:
            yield self._get_data_from_json_line(line)

In [5]:
import json
from typing import Optional, List, Tuple
import array

from torch.utils.data import Dataset
from tqdm import tqdm


class NeuroSwipeDatasetv1(Dataset):
    """
    Dataset class for NeuroSwipe dataset.
    The dataset file weights over 3 GB and contains over 6 million swipe gestures.


    """

    def __init__(self,
                 data_path,
                #  max_len: int,
                #  word_tokenizer,
                 add_velcities: bool = True,
                 add_accelerations: bool = True,
                 total: Optional[int] = None):
        """
        Args:
            data_path (string): Path to the NeuroSwipe dataset in JSON format.
                A custom version of the dataset is used:
                "grid" property is replaced with "grid_name" property.
        """
        if add_accelerations and not add_velcities:

            raise ValueError("Accelerations are supposed \
                             to be an addition to velocities. Add velocities.")

        self.add_velcities = add_velcities
        self.add_accelerations = add_accelerations

        self.data_list = []
        self._set_data(data_path, self.data_list, total = total)
    
    def _set_data(self, data_path: str, data_list: list, total: Optional[int] = None):
        with open(data_path, "r", encoding="utf-8") as json_file:
            for line in tqdm(json_file, total = total):
                data_list.append(self._get_data_from_json_line(line))

    def _get_dx_dt(self, X: torch.tensor, T: torch.tensor) -> List[float]:
        """
        Calculates dx/dt for a list of x coordinates and a list of t coordinates.
        """
        dx_dt = torch.zeros_like(X)
        dx_dt[1:-1] = (X[2:] - X[:-2]) / (T[2:] - T[:-2])

        # x0 x1 x2 x3
        # t0 t1 t2 t3
        # dx_dt[0] = 0
        # dx_dt[1] = (x2 - x0) / (t2 - t0)
        # dx_dt[2] = (x3 - x1) / (t3 - t1)
        # dx_dt[3] = 0

        return dx_dt

    def _get_data_from_json_line(self, line) -> Tuple[list, list, list, str]:
        """
        Parses a JSON line and returns a dictionary with data.
        """
        data = json.loads(line)
        word: str = data['word']
        try:
            X = array.array('h', data['curve']['x'])
            Y = array.array('h', data['curve']['y'])
            T = array.array('h', data['curve']['t'])
        except:
            print(data['curve']['x'])
            print(data['curve']['y'])
            print(data['curve']['t'])

        return X, Y, T, word

    def __len__(self):
        return len(self.data_list)
    
    def __getitem__(self, idx):
        X_list, Y_list, T_list, word =  self.data_list[idx]

        X = torch.tensor(X_list, dtype=torch.float32)
        Y = torch.tensor(Y_list, dtype=torch.float32)
        T = torch.tensor(T_list, dtype=torch.float32)

        xyt = torch.cat(
            [
                X.reshape(-1, 1),
                Y.reshape(-1, 1),
                T.reshape(-1, 1)
            ],
            axis = 1
        )

        if self.add_velcities:
            dx_dt = self._get_dx_dt(X, T)
            dy_dt = self._get_dx_dt(Y, T)
            xyt = torch.cat(
                [
                    xyt,
                    dx_dt.reshape(-1, 1),
                    dy_dt.reshape(-1, 1)
                ],
                axis = 1
            )

        if self.add_accelerations:
            d2x_dt2 = self._get_dx_dt(dx_dt, T)
            d2y_dt2 = self._get_dx_dt(dy_dt, T)
            xyt = torch.cat(
                [
                    xyt,
                    d2x_dt2.reshape(-1, 1),
                    d2y_dt2.reshape(-1, 1)
                ],
                axis = 1
            )
    
        return xyt, word

In [2]:
train_dataset_path = "../data/data_separated_grid/train.jsonl"

In [3]:
dataset = NeuroSwipeDatasetv1(data_path=train_dataset_path, total = 6_000_000)

 20%|██        | 1216087/6000000 [01:17<05:04, 15717.60it/s]


KeyboardInterrupt: 

In [7]:
batch_size = 10
seq_len = 32
num_features = 3

encoder = SwipeCurveEncoderTransformerv1(input_size=3, hidden_size=128, num_layers=1, num_heads=1, dropout=0.1)
encoder(torch.rand(batch_size, seq_len, num_features)).shape

torch.Size([10, 32, 3])

In [13]:
batch_size = 10
seq_len = 32
num_features = 3

encoder = SwipeCurveEncoderTransformerLSTM(
    input_size=3, hidden_size=128, num_layers=1, num_heads=1, dropout=0.1)

encoder_out = encoder(torch.rand(batch_size, seq_len, num_features))

In [15]:
encoder_out[0].shape

torch.Size([10, 32, 256])

In [27]:
encoder_out[1]

AttributeError: 'tuple' object has no attribute 'shape'

In [None]:
# организовать padding encoder'a и decoder'a


In [11]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class SwipeCurveTransformerEncoderv1(nn.Module):
    """
    Transformer-based Curve encoder takes in a sequence of vectors and creates a representation
    of a swipe gesture on a samrtphone keyboard.
    Each vector contains information about finger trajectory at a time step.
    It contains:
    * x coordinate
    * y coordinate
    * Optionally: t
    * Optionally: dx/dt
    * Optionally: dy/dt
    * Optionally: keyboard key that has x and y coordinates within its boundaries
    """

    def __init__(self, input_size, d_model,
                 dim_feedforward, num_layers, num_heads,
                 padding_idx = 0,
                 dropout = 0.1):
        """
        Arguments:
        ----------
        input_size: int
            Size of input vectors.
        d_model: int
            Size of the embeddings (output vectors).
            Should be equal to char embedding size of the decoder.
        dim_feedforward: int
        num_layers: int
            Number of encoder layers including the first layer.

        """
        super().__init__()
        # self.input_size = input_size
        # self.d_model = d_model
        # self.dim_feedforward  = dim_feedforward
        # self.num_layers = num_layers
        # self.num_heads = num_heads
        # self.dropout = dropout

        # self.pos_encoder = PositionalEncoding(input_size, dropout)
        self.first_encoder_layer = nn.TransformerEncoderLayer(input_size, num_heads, dim_feedforward, dropout)
        self.liner = nn.Linear(input_size, d_model)  # to convert embedding to d_model size
        num_layer_after_first = num_layers - 1
        if num_layer_after_first > 0:
            encoder_layer = nn.TransformerEncoderLayer(input_size, num_heads, dim_feedforward, dropout)
            self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers)
        else:
            self.transformer_encoder = None
    
    def forward(self, x):
        # x = self.pos_encoder(x)
        x = self.first_encoder_layer(x)
        x = self.liner(x)
        if self.transformer_encoder:
            x = self.transformer_encoder(x)
        return x


class SwipeCurveTransformerDecoderv1(nn.Module):
    """
    Decodes a swipe gesture representation into a sequence of characters.

    Uses decoder transformer with masked attention to prevent the model from cheating.
    """

    def __init__(self, char_emb_size, nhead, num_decoder_layers, dim_feedforward, dropout, activation = F.relu):
        super().__init__()

        self.decoder_layer = nn.TransformerDecoderLayer(char_emb_size, nhead, dim_feedforward, dropout, activation)
        self.transformer_decoder = nn.TransformerDecoder(self.decoder_layer, num_decoder_layers)
        self.out = nn.Linear(char_emb_size, char_emb_size)
        self.softmax = nn.LogSoftmax(dim=2)
    
    def forward(self, x, memory, tgt_mask):
        x = self.transformer_decoder(x, memory, tgt_mask=tgt_mask)
        x = self.out(x)
        # x = self.softmax(x)
        return x

 
    


class SwipeCurveTransformer(nn.Module):
    """
    SwipeCurveTransformer is a sequence-to-sequence model that encodes a sequence of vectors
    representing a swipe gesture into a sequence of characters.
    """

    def _get_mask(self, max_seq_len: int):
        """
        Returns a mask for the decoder transformer.
        """
        mask = torch.triu(torch.ones(max_seq_len, max_seq_len), diagonal=1)
        mask = mask.masked_fill(mask == 1, float('-inf'))
        return mask

    def __init__(self,
                 input_size,
                 curv_emb_size,
                 char_emb_size,
                 char_vocab_size,
                 num_encoder_layers,
                 num_decoder_layers,
                 dim_feedforward,
                 num_heads,
                 dropout,
                 activation = F.relu,
                 max_out_seq_len):
        super().__init__()
        
        self.char_embedding = nn.Embedding(char_vocab_size, char_emb_size)

        self.encoder = SwipeCurveTransformerEncoderv1(
            input_size, curv_emb_size, dim_feedforward, num_encoder_layers, num_heads, dropout)
        self.decoder = SwipeCurveTransformerDecoderv1(
            char_emb_size, num_heads, num_decoder_layers, dim_feedforward, dropout, activation, max_out_seq_len)
        self.out = nn.Linear(char_emb_size, char_emb_size)
        self.softmax = nn.LogSoftmax(dim=2)

        self.mask = self._get_mask(max_out_seq_len)

    def forward(self, x, y):
        x = self.encoder(x)
        y = self.char_embedding(y)
        x = self.decoder(y, x, tgt_mask=self.mask)
        x = self.out(x)
        x = self.softmax(x)
        return x

In [12]:
encoder = SwipeCurveTransformerEncoderv1(input_size=3, d_model=128, dim_feedforward=128, num_layers=1, num_heads=1, dropout=0.1)

batch_size = 10
seq_len = 32
num_features = 3

encoder(torch.rand(batch_size, seq_len, num_features)).shape

torch.Size([10, 32, 128])

In [13]:
decoder = SwipeCurveTransformerDecoderv1(char_emb_size=128, nhead=1, num_decoder_layers=1, dim_feedforward=128, dropout=0.1)

batch_size = 10
seq_len = 32
char_emb_size = 128

def get_mask(max_seq_len: int):
    """
    Returns a mask for the decoder transformer.
    """
    mask = torch.triu(torch.ones(max_seq_len, max_seq_len), diagonal=1)
    mask = mask.masked_fill(mask == 1, float('-inf'))
    return mask

target_mask = torch.triu(torch.ones(seq_len, seq_len), diagonal=1)

decoder(torch.rand(batch_size, seq_len, char_emb_size), torch.rand(batch_size, seq_len, char_emb_size), tgt_mask=torch.rand(seq_len, seq_len).masked_fill(torch.rand(seq_len, seq_len) > 0.5, float('-inf'))).shape

In [None]:
decoder = SwipeCurveTransformerDecoderv1()