In [None]:
import os
import torch
import numpy as np
import pandas as pd

class TrackingData:
    def __init__(self):
        self.gpu_device = torch.device("cuda:0")
        self.data_a = None
        self.data_b = None
        self.max_len = None
        self.train_x = None
        self.test_x = None
        self.train_y = None
        self.test_y = None
        self.pinned = False
        self.batch_size = 16

        self._read_data()
        self._pad_data()
        self._split_data()
        self._generate_temporal_data()
        self._pin_to_gpu()

    def _read_data(self):
        self.data_a = []
        self.data_b = []
        self.max_len = 0
        tracking_files = os.listdir("data/tracking")
        for tracking_file in tracking_files:
            user_data = pd.read_csv(f'data/tracking/{tracking_file}')
            user_data['x'] = np.cos(user_data['gaze coord phi'])*np.sin(user_data[" gaze coord theta"])
            user_data['y'] = np.sin(user_data['gaze coord phi'])*np.sin(user_data[" gaze coord theta"])
            user_data = user_data[["gaze coord phi", " gaze coord theta", "x", "y"]].to_numpy()
            if user_data.shape[0] > self.max_len:
                self.max_len = user_data.shape[0]
            if "MENU_A" in tracking_file:
                self.data_a.append(user_data)
            else:
                self.data_b.append(user_data)
        print(f"Data loaded, MENU_A:{len(self.data_a)}, "
              f"MENU_B:{len(self.data_b)}, "
              f"Max_len:{self.max_len}")

    def _pad_data(self):
        padded_a = []
        padded_b = []
        for user_data in self.data_a:
            padded_data = np.zeros((self.max_len, 4))
            padded_data[:user_data.shape[0]] = user_data
            padded_a.append(padded_data)
        for user_data in self.data_b:
            padded_data = np.zeros((self.max_len, 4))
            padded_data[:user_data.shape[0]] = user_data
            padded_b.append(padded_data)
        self.data_a = np.array(padded_a, dtype=np.float32).transpose((0, 2, 1))
        self.data_b = np.array(padded_b, dtype=np.float32).transpose((0, 2, 1))
        print(f"MENU_A:{self.data_a.shape}, "
              f"MENU_B:{self.data_b.shape}")

    def _split_data(self, size=5):
        self.train_x = np.concatenate((self.data_a[:self.data_a.shape[0]-size], self.data_b[:self.data_b.shape[0]-size]))
        self.test_x = np.concatenate((self.data_a[-size:], self.data_b[-size:]))
        self.train_y = np.concatenate((np.zeros(self.data_a.shape[0]-size), np.ones(self.data_b.shape[0]-size)))
        self.test_y = np.concatenate((np.zeros(size), np.ones(size)))
        self.data_a = None
        self.data_b = None
        print(f"train_x:{self.train_x.shape}, {self.train_x.dtype}, \n"
              f"test_x:{self.test_x.shape}, {self.train_x.dtype}, \n"
              f"train_y:{self.train_y.shape}, {self.train_x.dtype}, \n"
              f"test_y:{self.test_y.shape}, {self.train_x.dtype}")

    def _generate_temporal_data(self, temporal_step=15):
        print("Generating temporal data...")
        shift_amounts = np.linspace(start=0, stop=self.max_len, num=temporal_step, endpoint=False, dtype=np.int32)
        temporal_x = []
        temporal_y = []
        for shift_amount in shift_amounts:
            shifted_x = np.roll(self.train_x, -shift_amount, axis=2)
            shifted_x[:,:,:-shift_amount] = 0.0
            temporal_x.append(shifted_x)
            temporal_y.append(self.train_y)
        self.train_x = np.concatenate(temporal_x)
        self.train_y = np.concatenate(temporal_y)
        temporal_x = []
        temporal_y = []
        for shift_amount in shift_amounts:
            shifted_x = np.roll(self.test_x, -shift_amount, axis=2)
            shifted_x[:,:,:-shift_amount] = 0.0
            temporal_x.append(shifted_x)
            temporal_y.append(self.test_y)
        self.test_x = np.concatenate(temporal_x)
        self.test_y = np.concatenate(temporal_y)
        print(f"train_x:{self.train_x.shape}, {self.train_x.dtype}, \n"
              f"test_x:{self.test_x.shape}, {self.train_x.dtype}, \n"
              f"train_y:{self.train_y.shape}, {self.train_x.dtype}, \n"
              f"test_y:{self.test_y.shape}, {self.train_x.dtype}")

    def _to_torch(self, data):
        if self.pinned:
            return data
        return torch.from_numpy(data).to(self.gpu_device)

    def _pin_to_gpu(self):
        self.train_x = torch.from_numpy(self.train_x).to(self.gpu_device)
        self.test_x = torch.from_numpy(self.test_x).to(self.gpu_device)
        self.train_y = torch.from_numpy(self.train_y).to(self.gpu_device)
        self.test_y = torch.from_numpy(self.test_y).to(self.gpu_device)
        self.pinned = True

    def get_train_batch(self, batch_size=None):
        if batch_size is None:
            batch_size = self.batch_size
        batch_ix = np.random.choice(tracking_data.train_x.shape[0], batch_size)
        batch_x = self.train_x[batch_ix]
        batch_y = self.train_y[batch_ix]
        return self._to_torch(batch_x), self._to_torch(batch_y)

    def get_test_batch(self, batch_size=None):
        if batch_size is None:
            batch_size = self.batch_size
        batch_ix = np.random.choice(tracking_data.test_x.shape[0], batch_size)
        batch_x = self.test_x[batch_ix]
        batch_y = self.test_y[batch_ix]
        return self._to_torch(batch_x), self._to_torch(batch_y)

In [None]:
tracking_data = TrackingData()

In [None]:
print(tracking_data.data_a[:,0,:].mean(), tracking_data.data_a[:,0,:].min(), tracking_data.data_a[:,0,:].max(), "\n",
      tracking_data.data_a[:,1,:].mean(), tracking_data.data_a[:,1,:].min(), tracking_data.data_a[:,1,:].max(), "\n",
      tracking_data.data_a[:,2,:].mean(), tracking_data.data_a[:,2,:].min(), tracking_data.data_a[:,2,:].max(), "\n",
      tracking_data.data_a[:,3,:].mean(), tracking_data.data_a[:,3,:].min(), tracking_data.data_a[:,3,:].max())

In [None]:
print(tracking_data.data_b[:,0,:].mean(), tracking_data.data_b[:,0,:].min(), tracking_data.data_b[:,0,:].max(), "\n",
      tracking_data.data_b[:,1,:].mean(), tracking_data.data_b[:,1,:].min(), tracking_data.data_b[:,1,:].max(), "\n",
      tracking_data.data_b[:,2,:].mean(), tracking_data.data_b[:,2,:].min(), tracking_data.data_b[:,2,:].max(), "\n",
      tracking_data.data_b[:,3,:].mean(), tracking_data.data_b[:,3,:].min(), tracking_data.data_b[:,3,:].max())

In [None]:
import torch.nn as nn

class ConvNet(nn.Module):
    def __init__(self):
        super(ConvNet, self).__init__()
        self.block = nn.Sequential(nn.Conv1d(2, 4, kernel_size=3, stride=2, padding=1), nn.ReLU(inplace=True),
                                   nn.Conv1d(4, 8, kernel_size=3, stride=2, padding=1), nn.ReLU(inplace=True),
                                   nn.Conv1d(8, 16, kernel_size=3, stride=2, padding=1), nn.ReLU(inplace=True),
                                   nn.Conv1d(16, 32, kernel_size=3, stride=2, padding=1), nn.ReLU(inplace=True),
                                   nn.Conv1d(32, 64, kernel_size=3, stride=2, padding=1), nn.ReLU(inplace=True),
                                   nn.Conv1d(64, 128, kernel_size=3, stride=2, padding=1), nn.ReLU(inplace=True),
                                   nn.Conv1d(128, 64, kernel_size=3, stride=2, padding=1), nn.ReLU(inplace=True),
                                   nn.Conv1d(64, 32, kernel_size=3, stride=2, padding=1), nn.ReLU(inplace=True),
                                   nn.Conv1d(32, 16, kernel_size=3, stride=2, padding=1), nn.ReLU(inplace=True),
                                   nn.Conv1d(16, 8, kernel_size=3, stride=2, padding=1), nn.ReLU(inplace=True),
                                   nn.Conv1d(8, 4, kernel_size=3, stride=2, padding=1), nn.ReLU(inplace=True),
                                   nn.Conv1d(4, 2, kernel_size=3, stride=2, padding=1), nn.ReLU(inplace=True),
                                   nn.Conv1d(2, 1, kernel_size=5, stride=1, padding=0),
                                   )

    def forward(self, x):
        x = self.block(x).flatten()
        return x

device = torch.device("cuda:0")
net = ConvNet().to(device)

In [None]:
import torch.optim as optim

optimizer = optim.Adam(net.parameters(), lr=1e-4)
loss_criterion = nn.BCEWithLogitsLoss()

for i in range(10):
    optimizer.zero_grad()
    batch_x, batch_y = tracking_data.get_train_batch(batch_size=128)
    net_out = net(batch_x)
    loss = loss_criterion(net_out, batch_y)
    loss.backward()
    optimizer.step()
    if i % 1 == 0:
        print(f"loss:{loss.item()}")

In [None]:
net_out = net(tracking_data.test_x)
loss_test = loss_criterion(net_out, tracking_data.test_y)
loss_test.item()

In [None]:
sigmoid = nn.Sigmoid()
preds = (sigmoid(net_out) > 0.5).to(dtype=torch.float32)
acc = (tracking_data.test_y == preds).to(dtype=torch.float32).mean()
acc

In [None]:
import time

torch.save(net.state_dict(), f"models/{int(time.time())}_loss_{loss_test.item():.4f}_acc_{acc:.4f}.pth")

In [None]:
sigmoid = nn.Sigmoid()
for i in range(tracking_data.test_x.shape[0] // 10):
    net_out = net(tracking_data.test_x[i*10:(i+1)*10])
    loss_test = loss_criterion(net_out, tracking_data.test_y[i*10:(i+1)*10]).item()
    preds = (sigmoid(net_out) > 0.5).to(dtype=torch.float32)
    acc = (tracking_data.test_y[i*10:(i+1)*10] == preds).to(dtype=torch.float32).mean()
    print(f"i:{i}, loss:{loss_test:.4f}, acc:{acc:.4f}")