<a href="https://colab.research.google.com/github/skore11/Laban_Pose_Conditional_GAN/blob/main/LabanPoseTransformation_Simplified.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install aiohttp nest_asyncio tqdm c3d numpy scipy torch matplotlib

In [None]:
import math
import matplotlib.pyplot as plt
import numpy as np
import os
import requests
import torch
from torch.autograd import Variable
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, TensorDataset
#from tqdm import tqdm
from tqdm.notebook import tqdm
from zipfile import ZipFile

# Run on the first CUDA device when one is usable, otherwise on the CPU.
cuda = torch.cuda.is_available()
device = torch.device("cuda:0") if cuda else torch.device("cpu")
#cuda = False

In [None]:
# Given a 2D Matrix a, sort and compute the diffs of the rows.
# Only keep the row if the diff is greater than the
# tolerance
# Returns the index array corresponding to the rows to keep
def rough_unique(a, tol=1.2):
    """
    Return the indices of the rows of ``a`` that are not near-duplicates.

    Rows are ordered by their first column and each row is compared with
    the row just before it in that order. A row is kept when the mean
    absolute difference to its predecessor, divided by the number of
    columns, is greater than ``tol``. The first row in sorted order has no
    predecessor and is always kept.

    Args:
        a: 2D array with one sample per row.
        tol: threshold a row's scaled difference must exceed to be kept.

    Returns:
        1D integer array of row indices into ``a``, in first-column order.
    """
    a = np.asarray(a)
    if a.shape[0] == 0:
        # Nothing to compare; the mask below would be one element too long.
        return np.array([], dtype=np.intp)

    # Only the first column decides the ordering.
    order = np.argsort(a[:, 0])
    gaps = np.mean(np.abs(np.diff(a[order], axis=0)), axis=1)
    # +inf sentinel for the first sorted row: it must survive the scaling
    # below regardless of the column count (a finite tol*2 does not).
    d = np.append(np.inf, gaps)
    # NOTE(review): `gaps` is already a per-column mean, so dividing by the
    # column count again scales the threshold by the row width. Kept as-is
    # because existing tolerances were chosen against it -- confirm intent.
    return order[(d / a.shape[1]) > tol]

DATA_ZIP = "./local_data.zip" # where the data is downloaded to
DATA_DIR = "./local_data" # where the data is unzipped to
def get_data_zip(
        url="https://cyprus-data.s3.us-east-2.amazonaws.com/local_data.zip",
        chunk_size=8192,
    ):
    """
    Ensure that dataset is downloaded and unzipped

    The archive is fetched only when DATA_ZIP is missing, and extracted
    (into the current working directory) only when DATA_DIR is missing.

    Args:
        url: location of the dataset archive.
        chunk_size: number of bytes requested per streamed chunk.

    Raises:
        requests.HTTPError: the server answered with an error status.
    """
    if not os.path.isfile(DATA_ZIP): # not already downloaded
        # Stream into a side file and rename at the end, so an interrupted
        # download is never mistaken for a complete archive on the next run.
        partial = DATA_ZIP + ".part"
        with requests.get(url, stream=True, timeout=60) as r:
            # Fail here instead of saving an error page as the zip file
            r.raise_for_status()
            size = r.headers.get("Content-Length")
            progress = tqdm(
                unit="B",
                unit_scale=True,
                unit_divisor=1024,
                total=int(size) if size is not None else None,
                desc=DATA_ZIP)
            try:
                with open(partial, 'wb') as fp:
                    for chunk in r.iter_content(chunk_size=chunk_size):
                        progress.update(fp.write(chunk))
            finally:
                progress.close()
        os.replace(partial, DATA_ZIP)
    if not os.path.isdir(DATA_DIR): # not already unzipped
        with ZipFile(DATA_ZIP, 'r') as zp:
            files = zp.namelist()
            progress = tqdm(
                total=len(files),
                desc="Unzipping {}".format(DATA_DIR))
            try:
                # NOTE(review): members are extracted relative to the current
                # directory; this presumably relies on the archive containing
                # a top-level "local_data/" folder -- confirm.
                for name in files:
                    zp.extract(name)
                    progress.update()
            finally:
                progress.close()

def cyprus_dataset():
    """
    Loads the Cyprus Dataset and returns two TensorDatasets,
    one for train and one for test (80% / 20% split after shuffling).

    Every ``.npy`` file directly inside DATA_DIR is read; files that are
    empty or whose row width is not N_DATA are skipped. The class of a file
    is taken from the part of its name before the first underscore.
    Near-duplicate poses are removed per file and again over the whole set.
    No random augmentation is applied.

    Returns:
        (train, test): TensorDatasets of (poses, labels), both float32
        tensors placed on ``device``.

    Raises:
        FileNotFoundError: DATA_DIR does not exist after the download step.
        KeyError: a file name does not start with a known class name.
        ValueError: no file contributed any pose.
    """
    get_data_zip() # Ensure data is downloaded
    if not os.path.isdir(DATA_DIR):
        raise FileNotFoundError(
            "{} does not exist, make sure the data is downloaded and unzipped"
            .format(DATA_DIR))

    # Get files from dataset. Only DATA_DIR itself is listed, because the
    # names are joined back onto DATA_DIR when loading.
    npys = sorted(
        name for name in os.listdir(DATA_DIR)
        if name.endswith(".npy")
        and os.path.isfile(os.path.join(DATA_DIR, name)))

    # Load the data
    labels_map = {
        'afraid':  0,
        'bored':   1,
        'excited': 2,
        'neutral': 3,
        'relaxed': 4,
    }
    # Collect per-file pieces and concatenate once at the end
    pose_chunks = []
    label_chunks = []
    for npy in tqdm(npys, desc="Loading data"):
        data = np.load(os.path.join(DATA_DIR,npy)).astype('float32')
        if data.ndim != 2 or data.shape[0] == 0 or data.shape[1] != N_DATA:
            continue
        # Remove similar poses
        data = data[rough_unique(data, 2.0)]

        label = labels_map[npy.split("_")[0]]

        if data.shape[0] == 0: # nothing left after de-duplication
            continue
        pose_chunks.append(data)
        # repeat label for all samples in file
        label_chunks.append(np.repeat(label, data.shape[0]))

    if not pose_chunks:
        raise ValueError(
            "no usable pose data found in {}".format(DATA_DIR))
    poses = np.vstack(pose_chunks)
    labels = np.concatenate(label_chunks)

    # Remove similar poses
    uis = rough_unique(poses,2.0)
    poses = poses[uis]
    labels = labels[uis]

    # Shuffle
    shuf = np.random.permutation(poses.shape[0])
    poses = poses[shuf]
    labels = labels[shuf]

    # First 80% of the shuffled rows train, the rest test
    cutoff = int(poses.shape[0] * 0.8)
    train_poses = torch.tensor(poses[:cutoff].astype('float32'), device=device)
    train_labels = torch.tensor(labels[:cutoff].astype('float32'), device=device)
    test_poses = torch.tensor(poses[cutoff:].astype('float32'), device=device)
    test_labels = torch.tensor(labels[cutoff:].astype('float32'), device=device)
    return TensorDataset(train_poses, train_labels), TensorDataset(test_poses, test_labels)


In [None]:
# Sizes shared by the data loader and both networks.
N_POINTS = 3 * 38               # pose values per frame (presumably 38 points x 3 axes -- TODO confirm)
N_FEATURES = 0                  # extra values per frame beyond the pose values; none at present
N_DATA = N_FEATURES + N_POINTS  # width of one data row
N_CLASSES = 5                   # number of distinct class labels
N_NOISE = 1 << 10               # width of the generator's noise input

def block(i, o, norm=True):
    """
    Build the layers of one fully connected stage.

    Args:
        i: number of input features.
        o: number of output features.
        norm: when True, a BatchNorm1d layer sits between the linear layer
            and the activation.

    Returns:
        list of modules: Linear, optional BatchNorm1d, PReLU.
    """
    linear = nn.Linear(i, o)
    # NOTE(review): 0.8 fills BatchNorm1d's second parameter, which is `eps`.
    # If a momentum of 0.8 was intended this needs `momentum=` instead --
    # confirm before changing, since it alters the networks' numerics.
    maybe_norm = [nn.BatchNorm1d(o, eps=0.8)] if norm else []
    return [linear, *maybe_norm, nn.PReLU()]

class Generator(nn.Module):
    """
    Conditional pose generator.

    Takes a pose vector, a class index and a noise vector and produces a
    new vector of width N_DATA.
    """
    def __init__(self):
        super().__init__()
        # One learned vector of width N_CLASSES per class index
        self.label_embedding = nn.Embedding(N_CLASSES, N_CLASSES)

        # Widths grow from 256 to 2048 and shrink back to the data width.
        stack = []
        stack += block(N_DATA + N_NOISE + N_CLASSES, 256, norm=False)
        stack += block(256, 512)
        stack += block(512, 1024)
        stack += block(1024, 2048)
        stack.append(nn.Dropout(0.2))
        stack += block(2048, 2048)
        stack.append(nn.Dropout(0.2))
        stack += block(2048, 1024)
        stack += block(1024, 512)
        stack += block(512, 256)
        stack += block(256, N_DATA, norm=False)
        self.model = nn.Sequential(*stack)

    def forward(self, pose, label, noise):
        # Input row = pose, embedded label, noise (joined on the last axis)
        embedded = self.label_embedding(label)
        return self.model(torch.cat((pose, embedded, noise), -1))

class Discriminator(nn.Module):
    """
    Conditional critic: scores a pose vector together with a class index,
    producing one value per sample.
    """
    def __init__(self):
        super().__init__()
        # One learned vector of width N_CLASSES per class index
        self.label_embedding = nn.Embedding(N_CLASSES, N_CLASSES)

        # No batch normalisation anywhere; dropout after the middle stages.
        stack = []
        stack += block(N_DATA + N_CLASSES, 256, norm=False)
        stack += block(256, 512, norm=False)
        stack.append(nn.Dropout(0.5))
        stack += block(512, 512, norm=False)
        stack.append(nn.Dropout(0.5))
        stack += block(512, 512, norm=False)
        stack.append(nn.Dropout(0.5))
        stack += block(512, 1024, norm=False)
        stack.append(nn.Dropout(0.5))
        stack += block(1024, 1024, norm=False)
        stack += block(1024, 1, norm=False)
        self.model = nn.Sequential(*stack)

    def forward(self, gen_pose, label):
        # Input row = pose followed by the embedded label
        embedded = self.label_embedding(label)
        return self.model(torch.cat((gen_pose, embedded), -1))


In [None]:
# The overall setup and training scheme follow this conditional GAN example:
#   https://github.com/eriklindernoren/PyTorch-GAN/blob/master/implementations/cgan/cgan.py

# Hyper-parameters
batch_size = 512 if cuda else 64
epochs = 5000
lr = 0.001
b1 = 0.5
b2 = 0.999

# Tensor constructors matching the active device
if cuda:
    FloatTensor = torch.cuda.FloatTensor
    LongTensor = torch.cuda.LongTensor
else:
    FloatTensor = torch.FloatTensor
    LongTensor = torch.LongTensor

# Networks and loss, moved to the training device right after construction
generator = Generator().to(device)
discriminator = Discriminator().to(device)
adversarial_loss = nn.MSELoss().to(device)

# Optimizers are built only once the models live on their final device.
# Adam alternative (b1/b2 are its betas), currently disabled:
#opt_G = torch.optim.Adam(generator.parameters(), lr=0.0008, betas=(b1, b2))
#opt_D = torch.optim.Adam(discriminator.parameters(), lr=0.0016, betas=(b1, b2))
opt_G = torch.optim.RMSprop(generator.parameters(), lr=5e-4)
opt_D = torch.optim.RMSprop(discriminator.parameters(), lr=1e-3)


In [None]:
# Load data, then save pie charts showing the class balance of each split
train, test = cyprus_dataset()

pie_labels = ['afraid','bored','excited','neutral','relaxed']


def _count_labels(dataset, desc):
    """Return {class index: sample count} for a dataset of (pose, label) pairs."""
    counts = {}
    for _, label in tqdm(dataset, desc=desc):
        key = int(label)
        counts[key] = counts.get(key, 0) + 1
    return counts


def _draw_split_pie(ax, counts, n_frames, title):
    """Draw the class fractions in ``counts`` on ``ax``; return the fractions."""
    classes = sorted(counts)
    fractions = [counts[k] / n_frames for k in classes]
    # Pair every class with its own fraction. Looking the fraction up by
    # class index would read the wrong entry (or run off the end) as soon
    # as one class is missing from the split.
    ax.pie(
        fractions,
        labels=[
            pie_labels[k] + " {:.2f}%".format(frac*100)
            for k, frac in zip(classes, fractions)],
        normalize=True)
    ax.set_title(title)
    return fractions


# Count occurrences of each label
fig, axs = plt.subplots(2, 1)
fig.suptitle("Training/Test split for {:,} frames".format(len(train) + len(test)))

train_count = _count_labels(train, "Analyzing training data")
pcts = _draw_split_pie(
    axs[0], train_count, len(train),
    "Train Data ({:,} frames)".format(len(train)))

test_count = _count_labels(test, "Analyzing testing data")
pcts = _draw_split_pie(
    axs[1], test_count, len(test),
    "Test Data ({:,} frames)".format(len(test)))

# The figure is written to disk rather than shown inline
plt.tight_layout()
plt.savefig("./data_pie_chart.png")
plt.close()
print()


In [None]:
dl = DataLoader(train, batch_size=batch_size, shuffle=True, num_workers=0)
print("Training started: {:,} total poses, batch size {:,}".format(len(train), batch_size))
op = tqdm(range(epochs), position=1, desc="Epoch")

# Per-batch loss histories. Values are appended to plain lists (constant
# time per step) and mirrored into the numpy arrays once per epoch.
_g_hist, _d_hist, _d_real_hist, _d_fake_hist = [], [], [], []
g_losses = np.array([])
d_losses = np.array([])
d_real_losses = np.array([])
d_fake_losses = np.array([])

LOSS_PLOT_WINDOW = 500 # most recent batches shown in the running loss plot


def _save_checkpoint(path, epoch, g_loss, d_loss):
    """Save both networks, both optimizers and the latest losses to ``path``."""
    torch.save({
        'epoch': epoch,
        'generator_state_dict': generator.state_dict(),
        'discriminator_state_dict': discriminator.state_dict(),
        'opt_G_state_dict': opt_G.state_dict(),
        'opt_D_state_dict': opt_D.state_dict(),
        # detached so the checkpoint does not carry the autograd graph
        'g_loss': g_loss.detach(),
        'd_loss': d_loss.detach(),
        }, path)


def _plot_losses(path, curves, start=0):
    """Plot every (values, label) pair from batch ``start`` on; save to ``path``."""
    for values, label in curves:
        plt.plot(np.arange(start, len(values)), values[start:], label=label)
    plt.xlabel("Training step (Batch)")
    plt.ylabel("Loss")
    plt.legend()
    plt.savefig(path)
    plt.close()


torch.backends.cudnn.benchmark = True

for epoch in op:
    ip = tqdm(
        enumerate(dl),
        position=2,
        total=len(dl),
        desc="[G loss: ???] [D loss: ???]",
        leave=False)

    generator.train()
    discriminator.train()
    for i, (data, labels) in ip:
        n = data.shape[0]
        if n < 2:
            # BatchNorm1d cannot compute batch statistics from a single
            # sample in training mode, so a one-sample batch is skipped.
            continue

        # Targets for the MSE loss: 1 for "real", 0 for "generated"
        valid = torch.ones((n, 1), device=data.device)
        fake = torch.zeros((n, 1), device=data.device)
        real_labels = labels.long()

        # Train generator
        opt_G.zero_grad()
        noise = torch.randn(n, N_NOISE, device=data.device)
        # Target labels are the batch's own labels in a random order
        gen_labels = real_labels[torch.randperm(n, device=real_labels.device)]
        gen_output = generator(data, gen_labels, noise)

        # How well are we fooling the discriminator
        # (matching the desired label)?
        validity = discriminator(gen_output, gen_labels)
        g_loss = adversarial_loss(validity, valid)

        g_loss.backward()
        opt_G.step()

        # Train Discriminator
        opt_D.zero_grad()

        # How well can the discriminator label real poses?
        real_validity = discriminator(data[:,:N_POINTS], real_labels)
        d_real_loss = adversarial_loss(real_validity, valid)

        # Can the discriminator detect the generated poses?
        # (detached: this step must not update the generator)
        fake_validity = discriminator(gen_output.detach(), gen_labels)
        d_fake_loss = adversarial_loss(fake_validity, fake)

        d_loss = (d_real_loss + d_fake_loss) / 2
        d_loss.backward()
        opt_D.step()

        # Read each loss back once; reused for the progress bar and history
        g_value = g_loss.item()
        d_value = d_loss.item()

        # Update progress bar
        ip.set_description(
            desc="[G loss: %f] [D loss: %f]"
                % (g_value, d_value)
        )

        # Track metrics
        _g_hist.append(g_value)
        _d_hist.append(d_value)
        _d_real_hist.append(d_real_loss.item())
        _d_fake_hist.append(d_fake_loss.item())

    ip.close()

    # Refresh the array views of the histories once per epoch
    g_losses = np.asarray(_g_hist)
    d_losses = np.asarray(_d_hist)
    d_real_losses = np.asarray(_d_real_hist)
    d_fake_losses = np.asarray(_d_fake_hist)

    if epoch % 50 == 0:
        # Sample: the first pose of the last batch, generated once per class
        generator.eval()
        with torch.no_grad():
            sample_noise = torch.randn(N_CLASSES, N_NOISE, device=data.device)
            gen_input = data[0].repeat(N_CLASSES, 1)
            sample = generator(
                gen_input,
                torch.arange(N_CLASSES, device=data.device),
                sample_noise).cpu().numpy()
        # Row 0 is the source pose, the following rows one output per class
        np.save("output_{}.npy".format(epoch), np.vstack([
            data[0].cpu().detach().numpy()[:N_POINTS],
            sample]))

        _save_checkpoint("checkpoint_{}.pt".format(epoch), epoch, g_loss, d_loss)

    # Running plot; skipped on the last epoch
    if epoch != epochs-1:
        # Show at most the last LOSS_PLOT_WINDOW batches
        start = max(0, g_losses.shape[0] - LOSS_PLOT_WINDOW)
        _plot_losses(
            "losses_checkpoint.png",
            (
                (g_losses, "G Losses"),
                (d_losses, "D Losses"),
                (d_real_losses, "D Real Losses"),
                (d_fake_losses, "D Fake Losses"),
            ),
            start)
        np.save("g_losses_checkpoint.npy", g_losses)
        np.save("d_losses_checkpoint.npy", d_losses)

_save_checkpoint("final_state.pt", epoch, g_loss, d_loss)
np.save("g_losses_final.npy", g_losses)
np.save("d_losses_final.npy", d_losses)
print("Saved models!")


In [None]:
# TODO: display poses
# TODO: Play with losses + optimizers
# TODO: speed up training

In [None]:
# Plot the complete per-batch loss history and save it as an image
loss_curves = (
    (g_losses, "G Losses"),
    (d_losses, "D Losses"),
    (d_real_losses, "D Real Losses"),
    (d_fake_losses, "D Fake Losses"),
)
xs = np.arange(g_losses.shape[0])
for curve, curve_label in loss_curves:
    plt.plot(xs, curve, label=curve_label)
plt.xlabel("Training step (Batch)")
plt.ylabel("Loss")
plt.legend()
# Written to disk instead of being shown inline
plt.savefig("losses_final.png")
plt.close()