In [1]:
import os
import torch
import pandas as pd
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as TF

from enum import Enum
from torch.autograd import Variable
from torchvision import datasets, transforms
from torch.utils.data import Dataset, DataLoader

# Define global variables:
# Every setting below can be overridden through the environment variable of
# the same (upper-case) name; an unset or empty variable selects the default.
def getEnvInt(name, default):
    """Return environment variable `name` converted to int, or `default`.

    os.environ only stores strings, while the column indices are later handed
    to range(), which requires integers.

    Args:
        name: Name of the environment variable to read.
        default: Value returned when the variable is unset or empty.

    Raises:
        ValueError: If the variable is set to something int() cannot parse.
    """
    raw = os.environ.get(name)
    return int(raw) if raw else default


# Root folder holding one sub-folder per mode ('train', 'test').
samples_path = os.environ.get('SAMPLES_PATH') or os.path.join('.', 'samples')

# Input features: file name and half-open column range [start, end).
x_data_file = os.environ.get('X_DATA_FILE') or 'jupyter_landmarks.csv'
x_cols_start_index = getEnvInt('X_COLS_START_INDEX', 1)
x_cols_end_index = getEnvInt('X_COLS_END_INDEX', 209)

# Regression targets: file name and half-open column range [start, end).
y_data_file = os.environ.get('Y_DATA_FILE') or 'unity_blendshapes.csv'
y_cols_start_index = getEnvInt('Y_COLS_START_INDEX', 2)
y_cols_end_index = getEnvInt('Y_COLS_END_INDEX', 74)

# Name of the CSV file the predictions are written to.
output_filename = os.environ.get('OUTPUT_FILENAME') or 'expressions.csv'

# [min, max] bounds the saved quaternion / blendshape columns are clipped to.
quat_domain = [-1, 1]
blend_domain = [0, 100]



In [2]:
class Modes(Enum):
    """Dataset splits; each member's value is used as a sub-folder name under samples_path."""

    # Split the network is fitted on.
    TRAIN = 'train'
    # Split the network is evaluated on.
    TEST = 'test'

In [3]:
def getDataFromCSV(mode, file, start, end):
    """Load a contiguous range of columns from one CSV file of a split.

    Args:
        mode: Sub-folder of samples_path to read from (e.g. 'train' or 'test').
        file: Name of the CSV file inside that folder.
        start: Index of the first column to load (inclusive).
        end: Index one past the last column to load (exclusive).

    Returns:
        pandas.DataFrame containing columns start..end-1 of the file.
    """
    # os.path.join uses the separator of the host OS; the hard-coded
    # backslashes used previously only resolved on Windows.
    csv_path = os.path.join(samples_path, mode, file)

    # int() accepts indices that arrive as strings (e.g. read from os.environ),
    # which range() would otherwise reject.
    return pd.read_csv(csv_path, usecols=range(int(start), int(end)))


In [4]:
def normallizeData(data):
    """Standardize every column of `data` in place: (x - mean) / std.

    Args:
        data: 2-D floating point tensor; it is indexed as data[:, i] and is
            overwritten with the normalized values.

    Returns:
        Tuple (data, std_history, mean_history). `data` is the same tensor
        object that was passed in. The two lists hold, per column, the std and
        mean that were used, so the transform can be inverted later. A std
        that is zero or undefined is replaced by 0.001.
    """
    std_history = []
    mean_history = []

    # Normalization is preprocessing: no gradient has to flow through it, and
    # recording the in-place column writes would invalidate tensors autograd
    # saved for torch.std.
    with torch.no_grad():
        for i in range(data.shape[1]):
            column = data[:, i]
            mean = torch.mean(column)

            # The default (unbiased) std is NaN for fewer than two samples;
            # treat that like a constant column instead of spreading NaN.
            std = torch.std(column) if column.numel() > 1 else None
            if std is None or std == 0 or torch.isnan(std):
                std = 0.001

            # Save the std and mean so the data can be de-normalized later
            std_history.append(std)
            mean_history.append(mean)

            # Normalize the column in place
            data[:, i] = (column - mean) / std

    return data, std_history, mean_history

In [5]:
def getData(mode):
    """Load and normalize the inputs and targets of one split.

    Args:
        mode: Sub-folder of samples_path to read from (e.g. 'train' or 'test').

    Returns:
        Tuple (norm_x_tensor, norm_y_tensor, y_std_history, y_mean_history):
        the normalized float32 input and target tensors, plus the per-column
        std and mean of the targets needed to de-normalize predictions.
    """
    # Get data from csv files
    x = getDataFromCSV(mode, x_data_file, x_cols_start_index, x_cols_end_index)
    y = getDataFromCSV(mode, y_data_file, y_cols_start_index, y_cols_end_index)

    # Transform the data to float32 tensors. Dataset tensors are constants of
    # the optimization, so they are created without requires_grad; that also
    # lets integer CSV columns load, which requires_grad=True would reject.
    x_tensor = torch.tensor(x.values, dtype=torch.float32)
    y_tensor = torch.tensor(y.values, dtype=torch.float32)

    # Normalize the data. Only the target statistics are kept; the input
    # statistics are not needed to de-normalize predictions.
    # NOTE(review): each split is normalized with its own statistics; reusing
    # the training statistics for the test split would need an interface
    # change - confirm whether that is wanted.
    norm_x_tensor, _, _ = normallizeData(x_tensor)
    norm_y_tensor, y_std_history, y_mean_history = normallizeData(y_tensor)

    return norm_x_tensor, norm_y_tensor, y_std_history, y_mean_history

In [6]:
class LandmarksDataset(Dataset):
    """Pairs two index-aligned sequences so a DataLoader can batch them together."""

    def __init__(self, x, y):
        # Inputs and targets; element i of one belongs to element i of the other.
        self.x, self.y = x, y

    def __len__(self):
        """Number of samples, taken from the input sequence."""
        return len(self.x)

    def __getitem__(self, index):
        """Return the (input, target) pair stored at `index`."""
        sample = self.x[index]
        label = self.y[index]
        return (sample, label)

In [7]:
def getDataset(mode):
    """Build the dataset of one split.

    Args:
        mode: Split name forwarded unchanged to getData.

    Returns:
        Tuple (dataset, y_std_history, y_mean_history): a LandmarksDataset over
        the tensors returned by getData, plus the target statistics getData
        reported.
    """
    inputs, targets, target_stds, target_means = getData(mode)
    dataset = LandmarksDataset(inputs, targets)
    return dataset, target_stds, target_means

In [8]:
class Net(nn.Module):
    """Fully connected network: 208 -> 1024 -> 512 -> 256 -> 128 -> 72.

    ReLU follows every layer except the last one, whose output is returned
    without an activation.
    """

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(in_features=208, out_features=1024)
        self.fc2 = nn.Linear(in_features=1024, out_features=512)
        self.fc3 = nn.Linear(in_features=512, out_features=256)
        self.fc4 = nn.Linear(in_features=256, out_features=128)
        self.fc5 = nn.Linear(in_features=128, out_features=72)

    def forward(self, x):
        """Map a batch whose last dimension is 208 to one whose last dimension is 72."""
        hidden = x
        # Hidden layers share the same pattern: linear map followed by ReLU.
        for layer in (self.fc1, self.fc2, self.fc3, self.fc4):
            hidden = F.relu(layer(hidden))
        # Output layer stays linear.
        return self.fc5(hidden)


In [9]:
def train(epochs, train_loader, net, optimizer, criterion, log_interval):
    """Fit `net` on the batches of `train_loader`.

    Args:
        epochs: Number of full passes over train_loader.
        train_loader: Iterable of (data, target) batches; it must also expose
            `.dataset` and support len(), both used for progress logging.
        net: Model to optimize; it is updated in place.
        optimizer: Optimizer stepping the parameters of `net`.
        criterion: Callable mapping (net_out, target) to a scalar loss tensor.
        log_interval: Print progress every `log_interval` batches. A value
            that is falsy or not positive disables logging.

    Returns:
        The same `net` object after training.
    """
    # Make sure layers that behave differently in training mode are active.
    net.train()

    # Only a positive interval can be used as a modulus.
    logging_enabled = bool(log_interval) and log_interval > 0
    total_samples = len(train_loader.dataset)
    total_batches = len(train_loader)

    for epoch in range(epochs):
        # Batches are used directly: the Variable wrapper is deprecated and
        # tensors already carry autograd support.
        for batch_idx, (data, target) in enumerate(train_loader):
            optimizer.zero_grad()
            net_out = net(data)
            loss = criterion(net_out, target)
            loss.backward()
            optimizer.step()

            if logging_enabled and batch_idx % log_interval == 0:
                print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                    epoch, batch_idx * len(data), total_samples,
                    100. * batch_idx / total_batches, loss.item()))

    return net

In [10]:
def test(test_loader, net, criterion):
    test_loss = 0
    # correct = 0
    results = torch.tensor([]).float()

    for data, target in test_loader:
        data, target = Variable(data), Variable(target)
        net_out = net(data)
        results = torch.cat((results, net_out))
        # sum up batch loss
        test_loss += criterion(net_out, target).item()
        # pred = net_out.data.max(1)[1]  # get the index of the max log-probability
        # correct += pred.eq(target.data).sum()

    test_loss /= len(test_loader.dataset)
    print('\nTest set: Average loss: {:.4f}'.format(
        test_loss))

    return results, test_loss

In [11]:
def denormalizeData(data, std_history, mean_history):
    """Undo a per-column (x - mean) / std normalization in place.

    Args:
        data: 2-D tensor of normalized values; it is overwritten.
        std_history: Per-column std used for normalization, indexed by column.
        mean_history: Per-column mean used for normalization, indexed by column.

    Returns:
        The same `data` tensor, now holding x * std + mean for every column.
    """
    for i in range(data.shape[1]):
        std, mean = std_history[i], mean_history[i]
        # Inverse of (x - mean) / std: scale first, then shift. Adding the
        # mean before multiplying does not recover the original value.
        data[:, i] = data[:, i] * std + mean

    return data

In [12]:
def saveResultsToOutputFile(mode, filename, results):
    """Clip the predictions to their valid domains and write them to a CSV file.

    Args:
        mode: Sub-folder of samples_path the file is written to.
        filename: Name of the CSV file to create.
        results: Tensor with 72 columns: 4 quaternion components followed by
            68 blendshape values.

    The file is written to <samples_path>/<mode>/<filename>, including the
    DataFrame index column.
    """
    # Create the output csv columns
    quat_cols = ["Quaternion_x", "Quaternion_y", "Quaternion_z", "Quaternion_w"]
    blend_cols = ["Blendshape_{0}".format(i) for i in range(68)]
    output_cols = quat_cols + blend_cols

    # Convert the results to a data frame. cpu() is a no-op for CPU tensors
    # and keeps numpy() working if the tensor lives on another device.
    frame = pd.DataFrame(results.detach().cpu().numpy(), columns=output_cols)

    # Assign values outside boundary to boundary values
    frame.loc[:, quat_cols] = frame.loc[:, quat_cols].clip(quat_domain[0], quat_domain[1])
    frame.loc[:, blend_cols] = frame.loc[:, blend_cols].clip(blend_domain[0], blend_domain[1])

    # Save the results to the output csv file. os.path.join uses the
    # separator of the host OS instead of hard-coded backslashes.
    frame.to_csv(os.path.join(samples_path, mode, filename))


In [13]:
def create_nn(batch_size=50, learning_rate=0.001, epochs=20,
              log_interval=10):
    """Train the network, evaluate it on the test split and save its predictions.

    Args:
        batch_size: Number of samples per batch for both data loaders.
        learning_rate: Learning rate handed to the Adam optimizer.
        epochs: Number of passes over the training split.
        log_interval: Print training progress every `log_interval` batches.

    The de-normalized, clipped test predictions are written to
    output_filename inside the test split folder.
    """
    # Create the train data loader. The target statistics of the training
    # split are kept: the network learns to predict targets normalized with
    # them, so they are the ones that invert its outputs.
    train_dataset, std_history, mean_history = getDataset(Modes.TRAIN.value)
    train_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=True)

    # Create the test data loader. Its own target statistics are discarded,
    # because de-normalizing with them would use information derived from the
    # test targets.
    # NOTE(review): getData still normalizes each split with its own
    # statistics, so the reported test loss compares against test-normalized
    # targets - confirm whether that is intended.
    test_dataset, _, _ = getDataset(Modes.TEST.value)
    test_loader = torch.utils.data.DataLoader(
        test_dataset,
        batch_size=batch_size,
        shuffle=False)

    # Create the net
    net = Net()

    # Create an optimizer
    optimizer = torch.optim.Adam(net.parameters(), lr=learning_rate)

    # Create a loss function
    criterion = nn.MSELoss(reduction='mean')
    # criterion = nn.L1Loss(reduction='mean')

    # Run the main training loop
    net = train(epochs, train_loader, net, optimizer, criterion, log_interval)

    # Run a test loop
    results, test_loss = test(test_loader, net, criterion)

    # De-normalize the predictions back to the original target domains
    results = denormalizeData(results, std_history, mean_history)

    # Save the results to output file
    saveResultsToOutputFile(Modes.TEST.value, output_filename, results)

In [14]:
if __name__ == "__main__":
    # Hard-coded switch selecting which routine this cell runs.
    run_opt = 2
    if run_opt == 1:
        # NOTE(review): simple_gradient is not defined in any visible cell;
        # unless it is provided elsewhere, selecting this option would raise
        # NameError - confirm or remove.
        simple_gradient()
    elif run_opt == 2:
        # Run create_nn with its default arguments.
        create_nn()


Test set: Average loss: 0.0229
