In [1]:
import pandas as pd
import numpy as np
import urllib.request,json
import datetime
import torch
import string
from torch.utils.data import Dataset,DataLoader
import torch.nn as nn
import torch.optim as optim

In [None]:
class IdentifyUser:
    """Identify which user typed a fixed phrase from their keystroke recordings.

    Constructing an instance runs the whole pipeline: it builds the character
    vocabulary, downloads the per-user keystroke recordings, keeps the users
    with enough correctly typed recordings, splits the data 75/25 into
    training and validation sets, builds a ``KeyStrokeNet`` and trains it.
    """

    # Every record name is appended to this prefix; each downloaded record
    # names the next one to fetch under its 'next' key.
    DATA_URL_PREFIX = "https://challenges.unify.id/v1/mle/"
    FIRST_USER_FILE = "user_4a438fdede4e11e9b986acde48001122.json"
    # A user is kept only if at least this many recordings reproduce the target.
    MIN_VALID_STRINGS = 300
    TRAIN_BATCH_SIZE = 150

    def __init__(self):
        self.target = "Be Authentic. Be Yourself. Be Typing."
        self.pad_length = 45
        self.num_epochs = 4
        self.encodeChars()
        # validateUsers must run first: the next two steps read the users,
        # examples and labels it collects.
        self.validateUsers()
        self.buildUserIndexMaps()
        self.partitionData()
        # One extra feature column holds the inter-key time (see KeyStrokeDataset).
        vec_length,output_size = len(self.charMap)+1,len(self.valid_users)
        self.keynet = KeyStrokeNet(self.pad_length,vec_length,output_size)
        self.train_net(self.num_epochs)

    def encodeChars(self):
        """Build ``self.charMap``: key name -> integer id.

        Letters, digits and punctuation come first (in that order), followed
        by the special ``'[backspace]'`` key and the space character.
        """
        keyboard_chars = string.ascii_letters + string.digits + string.punctuation
        self.charMap = {key: i for i, key in enumerate(keyboard_chars)}
        self.charMap['[backspace]'] = len(keyboard_chars)
        self.charMap[' '] = len(keyboard_chars) + 1

    def buildUserIndexMaps(self):
        """Build the two-way mapping between user labels and class indices.

        ``self.userMap`` maps label -> index and ``self.indexMap`` maps
        index -> label, both following the order of ``self.valid_users``.
        """
        self.userMap,self.indexMap = {},{}
        for i, user in enumerate(self.valid_users):
            self.userMap[user] = i
            self.indexMap[i] = user

    def isValid(self,type_dict):
        """Return True if replaying one recording yields ``self.target``.

        ``type_dict`` is an iterable of keystroke dicts, each with a
        ``'character'`` entry. ``'[backspace]'`` deletes the last character
        typed so far (a no-op on an empty string); anything else is appended.
        """
        s = ""
        for one_type in type_dict:
            char = one_type['character']
            if char == '[backspace]':
                s = s[:-1]
            else:
                s = s + char
        return (s == self.target)

    def validateUsers(self):
        """Download every user's recordings and keep the qualifying users.

        Starting from ``FIRST_USER_FILE``, each JSON document is fetched and
        its ``'next'`` entry followed until it is null. A user whose count of
        valid recordings (see ``isValid``) reaches ``MIN_VALID_STRINGS`` goes
        into ``self.valid_users`` and ALL of that user's recordings are added
        to ``self.examples`` with one matching entry in ``self.labels``;
        other users are only listed in ``self.invalid_users``.
        """
        self.valid_users, self.invalid_users = [],[]
        self.examples,self.labels = [],[]

        url_str = self.FIRST_USER_FILE
        while url_str is not None:
            with urllib.request.urlopen(self.DATA_URL_PREFIX + url_str) as url:
                agg_user_data = json.loads(url.read().decode())

            user_data = agg_user_data['user_data']
            user_label = agg_user_data['user_label']
            url_str = agg_user_data['next']

            num_valid_strings = sum(1 for type_dict in user_data
                                    if self.isValid(type_dict))

            if num_valid_strings >= self.MIN_VALID_STRINGS:
                self.valid_users.append(user_label)
                self.examples.extend(user_data)
                self.labels.extend([user_label]*len(user_data))
            else:
                self.invalid_users.append(user_label)

    @staticmethod
    def _percent(correct, total):
        """Return ``correct/total`` as a percentage, or NaN when ``total`` is 0."""
        return 100.0 * correct / total if total else float('nan')

    def train_net(self,epochs):
        """Train ``self.keynet`` and report accuracy after every epoch.

        Args:
            epochs: number of passes over ``self.train_dataloader``.

        Returns:
            A list with one ``(train_accuracy, test_accuracy)`` tuple per
            epoch. Both values are percentages accumulated over every batch
            of the respective loader (NaN if that loader yields no samples).
        """
        ce_loss = torch.nn.CrossEntropyLoss()
        optimizer = optim.Adam(self.keynet.parameters(), lr=4e-3)
        history = []
        for e in range(0, epochs):
            self.keynet.train()
            train_correct, train_total = 0, 0
            for batch in self.train_dataloader:
                row = batch['row']
                # Labels are collated as (batch, 1); flatten them to (batch,).
                label = batch['label'].reshape(-1)
                optimizer.zero_grad()
                logits = self.keynet(row)

                loss = ce_loss(logits,label)
                loss.backward()
                optimizer.step()

                pred_label = torch.argmax(logits,dim=1)
                # Accumulate over all batches, not just the last one.
                train_correct += int(torch.sum(pred_label == label))
                train_total += label.numel()

            train_accuracy = self._percent(train_correct, train_total)

            self.keynet.eval()
            val_correct, val_total = 0, 0
            with torch.no_grad():
                for batch in self.val_dataloader:
                    row = batch['row']
                    # Without flattening, (N,) == (N, 1) broadcasts to an
                    # N x N comparison and the count is meaningless.
                    label = batch['label'].reshape(-1)
                    pred_label = torch.argmax(self.keynet(row),dim=1)
                    val_correct += int(torch.sum(pred_label == label))
                    val_total += label.numel()

            test_accuracy = self._percent(val_correct, val_total)

            print('Epoch: {} \tTrain_Accuracy: {:.5f}'.format(
                  e, train_accuracy))
            print('Epoch: {} \tTest_Accuracy: {:.5f}'.format(
                  e, test_accuracy))
            history.append((train_accuracy, test_accuracy))

        return history

    def partitionData(self):
        """Randomly split the examples 25% validation / 75% training.

        Creates ``self.train_dataloader`` (batches of ``TRAIN_BATCH_SIZE``)
        and ``self.val_dataloader`` (the whole validation set in one batch).
        """
        indices = np.arange(len(self.labels))
        np.random.shuffle(indices)
        split = len(indices)//4
        test_indices = indices[:split]
        train_indices = indices[split:]

        # Index the Python lists directly: np.take would first convert the
        # nested recordings to an array and index its flattened form.
        val_data = [self.examples[i] for i in test_indices]
        train_data = [self.examples[i] for i in train_indices]
        val_labels = [self.labels[i] for i in test_indices]
        train_labels = [self.labels[i] for i in train_indices]

        train_dataset = KeyStrokeDataset(train_data,train_labels,True,
                                            self.charMap,self.userMap,self.pad_length)
        val_dataset = KeyStrokeDataset(val_data,val_labels,True,
                                            self.charMap,self.userMap,self.pad_length)

        self.train_dataloader = DataLoader(train_dataset,
                                           batch_size=self.TRAIN_BATCH_SIZE)
        # DataLoader rejects batch_size=0, which an empty split would produce.
        self.val_dataloader = DataLoader(val_dataset,
                                         batch_size=max(1, len(val_dataset)))

    def predict(self,examples):
        """Predict the class index of each recording in ``examples``.

        Args:
            examples: sequence of recordings in the same format as the
                training examples.

        Returns:
            A 1-D LongTensor with one predicted class index per recording;
            ``self.indexMap`` translates an index back to its user label.
        """
        test_dataset = KeyStrokeDataset(examples,None,False,self.charMap,
                                        self.userMap,self.pad_length)
        if len(test_dataset) == 0:
            return torch.empty(0, dtype=torch.long)

        # Unlabelled samples carry label=None, which the default DataLoader
        # collation cannot batch, so the rows are stacked directly.
        rows = torch.stack([test_dataset[i]['row']
                            for i in range(len(test_dataset))])
        self.keynet.eval()
        with torch.no_grad():
            logits = self.keynet(rows)
        pred_label = torch.argmax(logits,dim=1)

        return pred_label

In [None]:
class KeyStrokeDataset(Dataset):
    """Torch dataset turning keystroke recordings into fixed-size tensors.

    Each recording is a sequence of keystroke dicts with a ``'character'``
    entry (a key of ``charMap``) and a ``'typed_at'`` timestamp parseable by
    ``pd.to_datetime``. A sample's ``'row'`` is a float tensor of shape
    ``(1, pad_length, len(charMap) + 1)``: one line per keystroke where
    column 0 is the time in milliseconds since the previous keystroke (0 for
    the first) and column ``charMap[character] + 1`` is set to 1. Recordings
    longer than ``pad_length`` are truncated, shorter ones are zero-padded.
    """

    def __init__(self,data,labels,hasLabels,charMap,userMap,pad_length):
        """Store the recordings and the lookup tables.

        Args:
            data: indexable collection of recordings.
            labels: user label per recording; only read when ``hasLabels``.
            hasLabels: whether samples should carry a label tensor.
            charMap: key name -> integer id.
            userMap: user label -> class index.
            pad_length: number of keystroke lines in every sample.
        """
        self.data = data
        self.labels = labels
        self.charMap = charMap
        self.userMap = userMap
        self.hasLabels = hasLabels
        self.pad_length = pad_length

    def __len__(self):
        return len(self.data)

    def __getitem__(self,index):
        """Return ``{'row': tensor, 'label': tensor or None}`` for one recording.

        ``'label'`` is a LongTensor of shape ``(1,)`` holding the class index
        when the dataset has labels, otherwise ``None``.
        """
        type_arr = self.data[index]
        vec_length = len(self.charMap) + 1
        features = np.zeros((len(type_arr), vec_length), dtype=np.float32)

        last = None
        for i, one_type in enumerate(type_arr):
            # Use a distinct name here: reusing `index` would clobber the
            # sample index needed for the label lookup below.
            char_index = self.charMap[one_type['character']]
            features[i, char_index + 1] = 1
            curr = pd.to_datetime(one_type['typed_at'])
            if last is not None:
                features[i, 0] = (curr - last).total_seconds() * 1000
            last = curr
        row = torch.from_numpy(features)

        # Truncate or zero-pad to exactly pad_length lines; this also covers
        # an empty recording, which yields an all-zero sample.
        pad_tensor = torch.zeros(self.pad_length, vec_length)
        kept = min(len(row), self.pad_length)
        pad_tensor[:kept] = row[:kept]

        # Leading channel dimension expected by 2-D convolutions.
        pad_tensor = pad_tensor.unsqueeze(0)

        if self.hasLabels:
            label = torch.tensor([self.userMap[self.labels[index]]], dtype=torch.long)
        else:
            label = None

        sample = {'row':pad_tensor, 'label':label}

        return sample

In [None]:
class KeyStrokeNet(nn.Module):
    """Small convolutional classifier over padded keystroke matrices.

    Three stages of ``Conv2d(kernel 5, padding 2) -> MaxPool2d(2) -> LeakyReLU``
    with 3, 6 and 12 output channels are followed by one linear layer that
    produces ``output_size`` raw class scores (logits).
    """

    def __init__(self, pad_length,vec_length,output_size):
        """Build the layers.

        Args:
            pad_length: height of the input matrix (keystroke lines).
            vec_length: width of the input matrix (features per keystroke).
            output_size: number of classes.

        Raises:
            ValueError: if the input is too small to survive the three 2x2
                poolings (either dimension below 8).
        """
        super(KeyStrokeNet, self).__init__()
        self.width = vec_length
        self.height = pad_length
        num_convs = 3
        self.pooling = torch.nn.MaxPool2d(kernel_size=2, stride=2)
        self.lrelu = torch.nn.LeakyReLU()
        in_chans,out_chans = 1,3
        conv_op = []
        for _ in range(num_convs):
            # padding=2 with kernel_size=5 keeps the spatial size, so only
            # the pooling halves (and floors) each dimension.
            conv_op.append(torch.nn.Conv2d(in_chans,out_chans,kernel_size = 5, padding = 2))
            in_chans,out_chans = out_chans,out_chans*2
            conv_op.append(self.pooling)
            self.width,self.height = self.width//2,self.height//2
            conv_op.append(self.lrelu)

        if self.width < 1 or self.height < 1:
            raise ValueError(
                "pad_length and vec_length must each be at least {} to leave a "
                "non-empty feature map, got {} and {}".format(
                    2 ** num_convs, pad_length, vec_length))

        self.conv_layers = nn.Sequential(*conv_op)

        self.in_chans = in_chans
        self.fc = torch.nn.Linear(self.width*self.height*in_chans,output_size)

    def forward(self, x):
        """Return raw class logits of shape ``(batch, output_size)``.

        ``x`` is expected to be ``(batch, 1, pad_length, vec_length)``.
        No activation is applied to the logits: they are meant to be fed to
        ``CrossEntropyLoss`` / ``argmax``. A LeakyReLU here would not change
        the argmax but would scale the gradient of negative logits by 0.01.
        """
        features = self.conv_layers(x)
        # Flatten per sample; a shape mismatch then fails in the linear layer
        # instead of silently regrouping elements across the batch.
        features = features.flatten(start_dim=1)
        return self.fc(features)

In [None]:
# Constructing IdentifyUser runs the whole pipeline: its __init__ downloads the
# keystroke data over the network and trains the model before returning.
idu = IdentifyUser()