In [None]:
# default_exp core

# core

> Unpack and load the [AMASS][] dataset for training with a PyTorch iterator.

To do:

1. ~~Move ProgressParallel to a different notebook, something weird happens with it in here~~
2. ~~Print info when recomputing npz lens~~
3. ~~Information about file length must be split by directory, otherwise it's impossible to do train/val/test splits~~
4. ~~Implement local cache shuffle with LMDB~~
5. ~~Safe DataLoader class~~
6. ~~Inspect and plot features, embed in notebook~~
7. Refactor feature info and plots into features notebook
8. ~~Inspect bug to determine why dataset loading fails on colab on second try~~
9. ~~Description of features as being in "exponential coordinates" is wrong, they are angle-axis vectors~~
10. `unpack_body_models` is not a good name for that function, as it unpacks pose sequences rather than body models

[amass]: https://amass.is.tue.mpg.de/

In [None]:
# hide
from nbdev.showdoc import *

# Unpack Tar Files

> Console script to unpack all tar files found in a specified directory and put them in another directory, then create a symlink to be able to find the unpacked data later

## Checksum Directories

> Checksum directories to only unpack tar files when target directory either doesn't exist or has been incorrectly unpacked.

It would probably be sufficient to check if the target directory exists, but this is more thorough.

In [None]:
# export
# https://stackoverflow.com/a/54477583/6937913
# function to evaluate the hash of an entire directory system to verify downloading and unpacking was correct
import hashlib
from _hashlib import HASH as Hash
from pathlib import Path
from typing import Union


def md5_update_from_file(filename: Union[str, Path], hash: Hash) -> Hash:
    """Feed the bytes of `filename` into an existing hash object.

    args:

        - `filename`: path of a regular file (asserted)
        - `hash`: hash object to update in place

    returns:

        - the same `hash` object, after being updated with the file contents
    """
    file_path = Path(filename)
    assert file_path.is_file()
    with open(str(filename), "rb") as stream:
        # read in 4 KiB blocks so large files are never held in memory at once
        while True:
            block = stream.read(4096)
            if block == b"":
                break
            hash.update(block)
    return hash


def md5_file(filename: Union[str, Path]) -> str:
    """Return the MD5 hex digest of a single file's contents."""
    digest = md5_update_from_file(filename, hashlib.md5())
    return str(digest.hexdigest())


def md5_update_from_dir(directory: Union[str, Path], hash: Hash) -> Hash:
    """Recursively feed a directory tree into an existing hash object.

    Entries are visited in case-insensitive path order. For each entry the
    name is hashed first, then the file contents (for files) or the nested
    tree (for directories). Entries that are neither are hashed by name only.

    args:

        - `directory`: path of an existing directory (asserted)
        - `hash`: hash object to update

    returns:

        - the updated hash object
    """
    root = Path(directory)
    assert root.is_dir()
    entries = list(root.iterdir())
    entries.sort(key=lambda entry: str(entry).lower())
    for entry in entries:
        hash.update(entry.name.encode())
        if entry.is_file():
            hash = md5_update_from_file(entry, hash)
            continue
        if entry.is_dir():
            hash = md5_update_from_dir(entry, hash)
    return hash


def md5_dir(directory: Union[str, Path]) -> str:
    """Return the MD5 hex digest of a whole directory tree (names and contents)."""
    digest = md5_update_from_dir(directory, hashlib.md5())
    return str(digest.hexdigest())

In [None]:
# export
# Lookup table keyed by tar file name. For each archive:
#   - "unpacks_to": name of the directory the archive extracts to
#   - "hash": the `md5_dir` digest that directory is compared against in `lazy_unpack`
hashes = {
    "ACCAD.tar.bz2": {
        "unpacks_to": "ACCAD",
        "hash": "193442a2ab66cb116932b8bce08ecb89",
    },
    "BMLhandball.tar.bz2": {
        "unpacks_to": "BMLhandball",
        "hash": "8947df17dd59d052ae618daf24ccace3",
    },
    "BMLmovi.tar.bz2": {
        "unpacks_to": "BMLmovi",
        "hash": "6dfb134273f284152aa2d0838d7529d5",
    },
    "CMU.tar.bz2": {"unpacks_to": "CMU", "hash": "f04bc3f37f3eafebfb12ba0cf706ca72"},
    "DFaust67.tar.bz2": {
        "unpacks_to": "DFaust_67",
        "hash": "7e5f11ed897da72c5159ef3c747383b8",
    },
    "EKUT.tar.bz2": {"unpacks_to": "EKUT", "hash": "221ee4a27a03afd1808cbb11af067879"},
    "HumanEva.tar.bz2": {
        "unpacks_to": "HumanEva",
        "hash": "ca781438b08caafd8a42b91cce905a03",
    },
    "KIT.tar.bz2": {"unpacks_to": "KIT", "hash": "3813500a3909f6ded1a1fffbd27ff35a"},
    "MPIHDM05.tar.bz2": {
        "unpacks_to": "MPI_HDM05",
        "hash": "f76da8deb9e583c65c618d57fbad1be4",
    },
    "MPILimits.tar.bz2": {
        "unpacks_to": "MPI_Limits",
        "hash": "72398ec89ff8ac8550813686cdb07b00",
    },
    "MPImosh.tar.bz2": {
        "unpacks_to": "MPI_mosh",
        "hash": "a00019cac611816b7ac5b7e2035f3a8a",
    },
    "SFU.tar.bz2": {"unpacks_to": "SFU", "hash": "cb10b931509566c0a49d72456e0909e2"},
    "SSMsynced.tar.bz2": {
        "unpacks_to": "SSM_synced",
        "hash": "7cc15af6bf95c34e481d58ed04587b58",
    },
    "TCDhandMocap.tar.bz2": {
        "unpacks_to": "TCD_handMocap",
        "hash": "c500aa07973bf33ac1587a521b7d66d3",
    },
    "TotalCapture.tar.bz2": {
        "unpacks_to": "TotalCapture",
        "hash": "b2c6833d3341816f4550799b460a1b27",
    },
    "Transitionsmocap.tar.bz2": {
        "unpacks_to": "Transitions_mocap",
        "hash": "705e8020405357d9d65d17580a6e9b39",
    },
    "EyesJapanDataset.tar.bz2": {
        "unpacks_to": "Eyes_Japan_Dataset",
        "hash": "d19fc19771cfdbe8efe2422719e5f3f1",
    },
    "BMLrub.tar.bz2": {
        "unpacks_to": "BioMotionLab_NTroje",
        "hash": "8b82ffa6c79d42a920f5dde1dcd087c3",
    },
    "DanceDB.tar.bz2": {
        "unpacks_to": "DanceDB",
        "hash": "9ce35953c4234489036ecb1c26ae38bc",
    },
}

## Parallel Unpacking with Joblib

> Unpacks tar files in multiple jobs to speed up unpacking the dataset.



In [None]:
# exports
import json
import argparse
import functools
import os
from shutil import unpack_archive
import joblib
from tqdm.auto import tqdm
from llamass.tqdm import ProgressParallel


def lazy_unpack(tarpath, outdir):
    """Unpack `tarpath` into `outdir` unless a matching hash file says it is already done.

    A file `<unpacks_to>.hash` next to the unpacked directory records the
    directory hash from the last successful unpack. If it exists and equals
    the expected hash in `hashes`, nothing is done. If it is missing *or does
    not match*, the archive is unpacked again and verified.

    args:

        - `tarpath`: path to a tar file whose name is a key of `hashes`
        - `outdir`: directory to unpack into

    returns:

        - None

    raises:

        - `KeyError` if the tar file name is not in `hashes`
        - `AssertionError` if the unpacked directory does not hash to the
          expected value
    """
    tarpath, outdir = Path(tarpath), Path(outdir)
    expected = hashes[tarpath.name]
    unpacks_to = expected["unpacks_to"]
    hashpath = outdir / Path(unpacks_to + ".hash")
    # if the hash exists and it's correct then assume the directory is correctly unpacked
    if hashpath.exists():
        with open(hashpath) as f:
            stored = f.read().strip()  # tolerate a trailing newline in the hash file
        if stored == expected["hash"]:
            return None
    # no stored hash, or a stale/mismatching one: unpack (again) and verify.
    # Note this is deliberately not an `else` of the check above, otherwise a
    # mismatching hash file would skip the unpack entirely.
    unpack_archive(tarpath, outdir)
    # calculate the hash of the unpacked directory and check it's the same
    h = md5_dir(outdir / unpacks_to)
    _h = expected["hash"]
    assert h == _h, f"Directory {outdir/unpacks_to} hash {h} != {_h}"
    # save the calculated hash
    with open(hashpath, "w") as f:
        f.write(h)


def unpack_body_models(tardir, outdir, n_jobs=1, verify=False, verbose=False):
    """Unpack every tar file found at the top level of `tardir` into `outdir`.

    args:

        - `tardir`: directory containing the tar files (subdirectories are
            not searched)
        - `outdir`: directory to unpack into
        - `n_jobs`: number of parallel jobs passed to `ProgressParallel`
        - `verify`: if True use `lazy_unpack` (checksum and skip when already
            unpacked), otherwise always unpack with `shutil.unpack_archive`
        - `verbose`: if True print each tar file before unpacking starts

    raises:

        - `FileNotFoundError` if `tardir` cannot be listed
    """
    # only the first entry of the walk (the top level) is needed; taking it
    # with next() avoids traversing every subdirectory of tardir first
    try:
        tar_root, _, filenames = next(os.walk(tardir))
    except StopIteration:
        raise FileNotFoundError(f"{tardir} is not a readable directory") from None
    tarpaths = [
        os.path.join(tar_root, name) for name in filenames if "tar" in name.split(".")
    ]
    if verbose:
        for tarpath in tarpaths:
            print(f"{tarpath} extracting to {outdir}")
    unpack = lazy_unpack if verify else unpack_archive
    ProgressParallel(n_jobs=n_jobs)(
        (joblib.delayed(unpack)(tarpath, outdir) for tarpath in tarpaths),
        total=len(tarpaths),
    )


def fast_amass_unpack():
    """Console entry point: parse the command line and run `unpack_body_models`.

    Positional arguments are `tardir` and `outdir`; `--verify` enables
    checksum verification and `-n` sets the number of parallel jobs.
    """
    cli = argparse.ArgumentParser(
        description="Unpack all the body model tar files in a directory to a target directory"
    )
    # the two positional arguments only differ in name and help text
    positionals = (
        ("tardir", "Directory containing tar.bz2 body model files"),
        ("outdir", "Output directory"),
    )
    for dest, help_text in positionals:
        cli.add_argument(dest, type=str, help=help_text)
    cli.add_argument(
        "--verify",
        action="store_true",
        help="Verify the output by calculating a checksum, "
        "ensures that each tar file will only be unpacked once.",
    )
    cli.add_argument(
        "-n",
        default=1,
        type=int,
        help="Number of jobs to run the tar unpacking with",
    )
    options = cli.parse_args()
    unpack_body_models(
        options.tardir, options.outdir, n_jobs=options.n, verify=options.verify
    )

Test that unpacking the sample data always yields the same result:

In [None]:
import tempfile
import hashlib

# https://stackoverflow.com/a/3431838/6937913
def md5(fname):
    """Return the MD5 hex digest of the file at `fname`, read in 4 KiB blocks."""
    digest = hashlib.md5()
    with open(fname, "rb") as stream:
        block = stream.read(4096)
        while block != b"":
            digest.update(block)
            block = stream.read(4096)
    return digest.hexdigest()


# expected checksums of the files contained in the sample archive
md5sums = {
    "amass_sample.npz": "d0b546b3619c8579ade39e3a8ccdc4e2",
    "dmpl_sample.npz": "576bb76b2a6328dc5c276c4150c466f0",
}

with tempfile.TemporaryDirectory() as tmpdirname:
    unpack_body_models("sample_data/", tmpdirname, 8)
    # accumulate over every directory visited; reassigning inside the loop
    # would keep only the files of the last directory walked.
    # (named `found_npz_paths` so it cannot clobber the `npz_paths` function
    # defined further down when cells are re-run)
    found_npz_paths = []
    for r, d, f in os.walk(tmpdirname):
        found_npz_paths += [os.path.join(r, x) for x in f if "npz" in x.split(".")]
    # hash while the temporary directory still exists
    _md5sums = {os.path.split(fpath)[-1]: md5(fpath) for fpath in found_npz_paths}

for k in md5sums:
    assert md5sums[k] == _md5sums[k]

  0%|          | 0/1 [00:00<?, ?it/s]

Testing that `verify=True` works as expected. Can redefine `hashes` here for testing without breaking the exported library because this cell doesn't get exported by `nbdev`.

In [None]:
import time

# test-only override of the lookup table so the sample archive can be verified
hashes = {
    "sample.tar.bz2": {
        "unpacks_to": "sample",
        "hash": "b5a86fe22ed2799d79101a532eb0ff27",
    }
}

with tempfile.TemporaryDirectory() as tmpdirname:
    # run the verified unpack twice into the same directory and time each run:
    # the first has to extract, the second should find the hash file and skip
    elapsed = []
    for attempt in range(2):
        start = time.time()
        unpack_body_models("sample_data/", tmpdirname, 8, verify=True)
        elapsed.append(time.time() - start)
    unpacking_time, skip_time = elapsed
    assert unpacking_time > skip_time

  0%|          | 0/1 [00:00<?, ?it/s]

  0%|          | 0/1 [00:00<?, ?it/s]

# Loading Functions

> Load the pose data directly from the `npz` files after unpacking.

Based on the [AMASS tutorial notebooks][amass], I would like to iterate over the dataset using a PyTorch Dataloader.

Steps to load:

1. Index all of the `npz` files in the AMASS directory
2. Iterate through all of them in sequence
    1. Load the `npz` file
    2. Cut out the acceptable motion sequence in the center of each file (typically the middle 80% of the motion sequence)
    3. _Optionally_ shuffle the dataset
    4. Iterate over this sequence along the first dimension
3. When running with `num_workers > 0`, give each worker a different random set of `npz` files to load

[amass]: https://github.com/nghorbani/amass/tree/master/notebooks

In [None]:
# export
import gzip
import json
import random
import math
import warnings
import numpy as np
import torch
import torch.utils.data as tudata

Looking at the sample data:

In [None]:
with tempfile.TemporaryDirectory() as tmpdirname:
    unpack_body_models("sample_data/", tmpdirname, 8)
    # accumulate over every directory visited (reassigning inside the loop
    # keeps only the last directory); the name avoids clobbering the
    # `npz_paths` function defined later in this notebook
    sample_npz_paths = []
    for r, d, f in os.walk(tmpdirname):
        sample_npz_paths += [os.path.join(r, x) for x in f if "npz" in x.split(".")]
    for npz_path in sample_npz_paths:
        # np.load on an .npz keeps the archive open; close it when done
        with np.load(npz_path) as cdata:
            print(npz_path)
            print("  ", [k for k in cdata.keys()])
            print("  ", [(k, cdata[k].shape) for k in cdata.keys()])

  0%|          | 0/1 [00:00<?, ?it/s]

/tmp/tmpos4dxm6z/sample/subdir/amass_sample.npz
   ['poses', 'gender', 'mocap_framerate', 'betas', 'marker_data', 'dmpls', 'marker_labels', 'trans']
   [('poses', (601, 156)), ('gender', ()), ('mocap_framerate', ()), ('betas', (16,)), ('marker_data', (601, 85, 3)), ('dmpls', (601, 8)), ('marker_labels', (85,)), ('trans', (601, 3))]
/tmp/tmpos4dxm6z/sample/subdir/dmpl_sample.npz
   ['poses', 'gender', 'mocap_framerate', 'betas', 'marker_data', 'dmpls', 'marker_labels', 'trans']
   [('poses', (235, 156)), ('gender', ()), ('mocap_framerate', ()), ('betas', (16,)), ('marker_data', (235, 67, 3)), ('dmpls', (235, 8)), ('marker_labels', (67,)), ('trans', (235, 3))]


In [None]:
# hide
def plot_file_sizes(unpacked_directory, bins):
    """Save a log-scale histogram of `.npz` file sizes under `unpacked_directory`.

    args:

        - `unpacked_directory`: directory searched recursively (following
            symlinks) for `npz` files, ignoring files named `shape.npz`
        - `bins`: number of logarithmically spaced bin edges

    The figure is written to `images/amass_file_sizes.png`.
    """
    import matplotlib.pyplot as plt

    plt.style.use("ggplot")

    def walk_npz_paths(npz_directory):
        # collect every matching file below npz_directory
        found = []
        for root, _dirs, names in os.walk(npz_directory, followlinks=True):
            for name in names:
                if "npz" in name.split(".") and Path(name).name != "shape.npz":
                    found.append(os.path.join(npz_directory, root, name))
        return tuple(found)

    # size in bytes of every archive, keyed by path
    sizes_by_path = {
        path: os.stat(path).st_size
        for path in tqdm(walk_npz_paths(unpacked_directory))
    }

    sizes = np.array(list(sizes_by_path.values()))
    # bin edges evenly spaced in log10 between the smallest and largest file
    edges = np.logspace(np.log10(min(sizes)), np.log10(max(sizes)), bins)
    plt.hist(sizes, bins=edges)
    plt.xscale("log")
    plt.xlabel("File Size (bytes)")
    plt.title("Size of numpy archives in AMASS")
    plt.savefig("images/amass_file_sizes.png")


# uncomment and run on unpacked AMASS directory to regenerate plot
# plot_file_sizes('/nobackup/gngdb/repos/amass/data', bins=50)

The AMASS dataset is composed of 14,096 `.npz` archives (at time of writing). The size of archives varies over two orders of magnitude, between 0.1MB and 10MB.

![A histogram of the file sizes in AMASS](./images/amass_file_sizes.png "AMASS File Sizes Histogram")

Other statistics we might want to know:

* Length of motion sequence in each of these files
* ...

## What do the fields mean?

![Screenshot quote from the AMASS paper](./images/amass-quote.png "AMASS Quote")

Of the fields in an AMASS `npz` file, 5 are loaded here (`'poses', 'gender', 'betas', 'dmpls', 'trans'`); what do they mean?

* `poses` are [SMPLH][] vectors, which are a representation of pose based on [SMPL][] with additional information about the positions of the hands. What are SMPLH vectors composed of?
    * 52 joints, each represented with 3 parameters, 22 for the body and 30 for the hands
    * Encoded with 3 rotational degrees of freedom as [angle-axis rotation vectors][aa]
* `gender` is the reported gender of the actor (it's not clear if MPI has used their [gender classifier][gender] here)
* `betas` are "identity-dependent shape parameters"
* `dmpls` are soft tissue deformations described in the [original SMPL paper][smplpaper]
* `trans` I think this is the $\gamma$ 3D parameter representing the translation of the root coordinate system, it is required to describe the pose and should probably be concatenated to the pose vector as described in the [AMASS paper][amasspaper].

[gender]: https://github.com/nghorbani/homogenus
[SMPLH]: https://mano.is.tue.mpg.de/
[SMPL]: https://smpl.is.tue.mpg.de/
[smplpaper]: https://files.is.tue.mpg.de/black/papers/SMPL2015.pdf#page=12
[amasspaper]: https://files.is.tue.mpg.de/black/papers/amass.pdf
[aa]: https://en.wikipedia.org/wiki/Axis%E2%80%93angle_representation

## `npz` File Iterator

> Iterates over all the paths of all the `npz` files in AMASS.

In [None]:
# exports
def npz_paths(npz_directory):
    """Yield the path of every `.npz` file below `npz_directory`.

    The directory is resolved to an absolute path first and symlinked
    directories are followed. Files named `shape.npz` are skipped.

    args:

        - `npz_directory`: directory to search recursively

    yields:

        - absolute path (str) of each matching file
    """
    npz_directory = Path(npz_directory).resolve()
    # `root` is already absolute because the walk starts from a resolved path
    for root, _dirs, filenames in os.walk(npz_directory, followlinks=True):
        for fname in filenames:
            # require a real ".npz" extension; a file literally named "npz"
            # has no extension and must not be treated as an archive
            if fname.endswith(".npz") and fname != "shape.npz":
                yield os.path.join(root, fname)

In [None]:
# every path yielded for the unpacked sample archive must point at an existing file
with tempfile.TemporaryDirectory() as tmpdirname:
    unpack_body_models("sample_data/", tmpdirname, 8)
    for npz_path in npz_paths(tmpdirname):
        assert Path(npz_path).exists()

  0%|          | 0/1 [00:00<?, ?it/s]

In [None]:
from contextlib import contextmanager

@contextmanager
def symlink(target, source):
    """Temporarily make `source` a symlink pointing at `target`.

    Yields `source`, and removes the link again on exit even if the body raises.
    """
    source.symlink_to(target)
    try:
        yield source
    finally:
        # always remove the link so the test leaves nothing behind
        source.unlink()

# the iterator must also work when the unpacked directory is reached through a symlink
with tempfile.TemporaryDirectory() as tmpdirname:
    unpack_body_models("sample_data/", tmpdirname, 8)
    tmpdirname = Path(tmpdirname)
    # 'sym' is created in the current working directory for the duration of the block
    with symlink(tmpdirname/'sample', Path('sym')) as symlink_loc:
        for npz_path in npz_paths(symlink_loc):
            assert Path(npz_path).exists(), npz_path

  0%|          | 0/1 [00:00<?, ?it/s]

In [None]:
# hide
# len([npz_path for npz_path in npz_paths('/nobackup/gngdb/repos/amass/data')])

## Inferring Dataset Size

> A function to calculate dataset size, with the result stored in this package.

The result of this calculation is stored in this package, the dataset loader will try to load this file or recreate it itself, so you can skip that step by copying it into the directory where you have unpacked the data.

In [None]:
#exports
def npz_len(npz_path, strict=True):
    """Describe one `.npz` file: the dataset it belongs to, its hash and its length.

    args:

        - `npz_path`: path to the `.npz` file
        - `strict`: if True, the dataset name is the single ancestor directory
            whose name is an `unpacks_to` value in `hashes`; if False it is
            simply the name of the file's parent directory

    returns:

        - tuple `(subdir, md5 hex digest of the file, number of frames in "poses")`

    raises:

        - `AssertionError` in strict mode when the path does not contain
          exactly one known dataset directory
    """
    # np.load keeps the archive open; close it as soon as the length is known
    # so that scanning thousands of files does not leak file handles
    with np.load(npz_path) as cdata:
        n_frames = cdata["poses"].shape[0]
    file_hash = md5_file(npz_path)
    if strict:
        dirs = [hashes[name]['unpacks_to'] for name in hashes]
        # every ancestor directory whose name is a known dataset directory
        m = [d for p in Path(npz_path).parents for d in dirs if p.name == d]
        assert len(m) == 1, f"Subdir of {npz_path} contains {len(m)} of {dirs}"
        subdir = m[0]
    else:
        subdir = Path(npz_path).parts[-2]
    return subdir, file_hash, n_frames

def npz_lens(unpacked_directory, n_jobs, strict=True):
    """Run `npz_len` over every `.npz` file below `unpacked_directory` in parallel.

    args:

        - `unpacked_directory`: directory searched with `npz_paths`
        - `n_jobs`: number of parallel jobs
        - `strict`: forwarded to `npz_len`

    returns:

        - whatever `ProgressParallel` returns for the `npz_len` tasks, one
          result per file
    """
    paths = list(npz_paths(unpacked_directory))
    measure = joblib.delayed(npz_len)
    tasks = [measure(path, strict=strict) for path in paths]
    return ProgressParallel(n_jobs=n_jobs)(tasks, total=len(paths))

def save_lens(save_path, npz_file_lens):
    """Write `npz_file_lens` to `save_path` as gzip-compressed JSON text."""
    with gzip.open(save_path, mode="wt") as archive:
        serialized = json.dumps(npz_file_lens)
        archive.write(serialized)

#npz_file_lens = npz_lens('/nobackup/gngdb/repos/amass/data', 10)
#save_lens('npz_file_lens.json.gz', npz_file_lens)

In [None]:
!du -hs npz_file_lens.json.gz

399K	npz_file_lens.json.gz


## Viable Indexes

For every `npz` file I need to pull out the viable indexes:

In [None]:
# exports
def keep_slice(n, keep):
    """Slice selecting the central `keep` fraction of a length-`n` sequence.

    An equal share of `(1 - keep) / 2` is dropped from each end; both
    boundaries are truncated to integers.
    """
    margin = (1.0 - keep) / 2.0
    first = int(n * margin)
    last = int(n * keep + n * margin)
    return slice(first, last)


def viable_slice(cdata, keep):
    """
    Build the slice of usable frame indexes for one loaded `.npz` archive.
    args:

        - `cdata`: mapping loaded from an `.npz` dump; only `cdata['poses']`
            is read, and its first dimension is taken as the frame count
        - `keep`: fraction of the frames to keep, in the interval (0, 1];
            the remainder is dropped equally from the start and the end

    returns:

        - viable: slice along the first axis computed by `keep_slice`
    """
    in_range = 0.0 < keep <= 1.0
    assert in_range, "Proportion of array to keep must be between zero and one"
    n_frames = cdata["poses"].shape[0]
    return keep_slice(n_frames, keep)

In [None]:
with tempfile.TemporaryDirectory() as tmpdirname:
    unpack_body_models("sample_data/", tmpdirname, 8)
    for npz_path in npz_paths(tmpdirname):
        # close each archive once its viable slice has been printed
        with np.load(npz_path) as cdata:
            print(npz_path)
            print("  ", viable_slice(cdata, 0.8))

  0%|          | 0/1 [00:00<?, ?it/s]

/tmp/tmpas2sa5g4/sample/subdir/amass_sample.npz
   slice(60, 540, None)
/tmp/tmpas2sa5g4/sample/subdir/dmpl_sample.npz
   slice(23, 211, None)


## `npz` Contents Iterator

> Loads an `.npz` file and iterates over the examples within.

In [None]:
# exports
def npz_contents(
    npz_path,
    clip_length,
    overlapping,
    keep=0.8,
    keys=("poses", "dmpls", "trans", "betas", "gender"),
    shuffle=False,
    seed=None,
):
    """Iterate over fixed-length clips cut from one `.npz` file.

    args:

        - `npz_path`: path of the `.npz` file to read
        - `clip_length`: number of consecutive frames in every clip
        - `overlapping`: if True consecutive clips start one frame apart,
            otherwise they start `clip_length` frames apart
        - `keep`: central fraction of the file to use, see `viable_slice`
        - `keys`: which of "poses", "dmpls", "trans", "betas", "gender" to return
        - `shuffle`: if True the clips are yielded in a shuffled order
        - `seed`: seed for the shuffle; a random one is drawn when None

    yields:

        - dict mapping each requested key to an array whose first dimension
          is `clip_length`. "poses", "dmpls", "trans" and "betas" are
          float32; "betas" is repeated for every frame; "gender" is encoded
          as -1 (male), 0 (neutral) or 1 (female), repeated for every frame.
    """
    frame_keys = [k for k in ("poses", "dmpls", "trans") if k in keys]

    # Read everything needed once and close the archive. Indexing the loaded
    # archive inside the clip loop would go back to the file for every clip.
    with np.load(npz_path) as cdata:
        # slice of viable indices
        viable = viable_slice(cdata, keep)
        frames = {k: cdata[k] for k in frame_keys}
        betas = None
        if "betas" in keys:
            betas = cdata["betas"][np.newaxis].astype(np.float32)
        gender = None
        if "gender" in keys:
            # go through str: casting the stored gender straight to int is not possible
            g = str(cdata["gender"].astype(str))
            gender = {"male": -1, "neutral": 0, "female": 1}[g]

    # every time the file is opened the non-overlapping slices will be the same
    # this may not be preferred, but loading overlapping means a lot of repetitive data
    step = 1 if overlapping else clip_length
    # the last admissible clip ends exactly at viable.stop, so its start
    # (viable.stop - clip_length) must be included in the range
    last_start = viable.stop - clip_length
    buf_clip_slices = [
        slice(i, i + clip_length) for i in range(viable.start, last_start + 1, step)
    ]
    if shuffle:
        # this will be correlated over workers
        # seed should be passed drawn from torch Generator
        if seed is None:  # 0 is a legitimate seed, only None means "unset"
            seed = random.randint(0, 10 ** 6)
        random.Random(seed).shuffle(buf_clip_slices)

    # iterate over slices
    for s in buf_clip_slices:
        r = s.stop - s.start
        # astype copies, so every clip owns its data
        data = {k: frames[k][s].astype(np.float32) for k in frame_keys}
        if betas is not None:
            data["betas"] = np.repeat(betas, repeats=r, axis=0)
        if gender is not None:
            data["gender"] = np.array([gender] * r)
        yield data

In [None]:
def test_load_npz(clip_length, overlapping):
    """Check the first clip of every sample file has `clip_length` frames for every key."""
    with tempfile.TemporaryDirectory() as tmpdirname:
        unpack_body_models("sample_data/", tmpdirname, 8)
        for npz_path in npz_paths(tmpdirname):
            # only the first clip of each file is inspected
            first_clip = next(npz_contents(npz_path, clip_length, overlapping), None)
            if first_clip is None:
                continue
            print([(name, first_clip[name].shape) for name in first_clip])
            assert all(
                first_clip[name].shape[0] == clip_length for name in first_clip
            )


for length, overlap in ((1, False), (3, False), (3, True)):
    test_load_npz(length, overlap)

  0%|          | 0/1 [00:00<?, ?it/s]

[('poses', (1, 156)), ('dmpls', (1, 8)), ('trans', (1, 3)), ('betas', (1, 16)), ('gender', (1,))]
[('poses', (1, 156)), ('dmpls', (1, 8)), ('trans', (1, 3)), ('betas', (1, 16)), ('gender', (1,))]


  0%|          | 0/1 [00:00<?, ?it/s]

[('poses', (3, 156)), ('dmpls', (3, 8)), ('trans', (3, 3)), ('betas', (3, 16)), ('gender', (3,))]
[('poses', (3, 156)), ('dmpls', (3, 8)), ('trans', (3, 3)), ('betas', (3, 16)), ('gender', (3,))]


  0%|          | 0/1 [00:00<?, ?it/s]

[('poses', (3, 156)), ('dmpls', (3, 8)), ('trans', (3, 3)), ('betas', (3, 16)), ('gender', (3,))]
[('poses', (3, 156)), ('dmpls', (3, 8)), ('trans', (3, 3)), ('betas', (3, 16)), ('gender', (3,))]


# PyTorch Dataset Class

> An iterable style PyTorch Dataset class to iterate over all of the `.npz` files storing motion sequences in a directory.

In [None]:
# NOTE(review): hard-coded path on the author's machine; point this at your own unpacked AMASS directory
[p for p in Path('/nobackup/gngdb/repos/amass/data').glob('*') if p.is_dir()]

[Path('/nobackup/gngdb/repos/amass/data/val'),
 Path('/nobackup/gngdb/repos/amass/data/test'),
 Path('/nobackup/gngdb/repos/amass/data/train'),
 Path('/nobackup/gngdb/repos/amass/data/train_subset'),
 Path('/nobackup/gngdb/repos/amass/data/val_subset')]

In [None]:
# exports
class AMASS(tudata.IterableDataset):
    """Iterable dataset over clips from every `.npz` file below a directory.

    args:

        - `amass_location`: directory searched with `npz_paths`
        - `clip_length`: positive int, number of frames per example
        - `overlapping`: whether consecutive clips overlap, see `npz_contents`
        - `keep`: central fraction of each file to use
        - `transform`: callable applied to every array of an example; when
            None the arrays are returned unchanged
        - `data_keys`: keys to load from each file, see `npz_contents`
        - `file_list_seed`: seed for the fixed shuffle of the file list done
            at construction; it has to be the same in every worker so that
            they all see the same global file order
        - `shuffle`: shuffle files and clips while iterating
        - `seed`: seed for that shuffling; a falsy value (None or 0) draws a
            random seed instead
        - `strict`: forwarded to `npz_lens` when file lengths are recomputed
    """

    def __init__(
        self,
        amass_location,
        clip_length,
        overlapping,
        keep=0.8,
        transform=None,
        data_keys=("poses", "dmpls", "trans", "betas", "gender"),
        file_list_seed=0,
        shuffle=False,
        seed=None,
        strict=True
    ):
        assert clip_length > 0 and type(clip_length) is int
        self.transform = transform
        self.data_keys = data_keys
        self.amass_location = amass_location
        # these should be shuffled but pull shuffle argument out of dataloader worker arguments
        self._npz_paths = [npz_path for npz_path in npz_paths(amass_location)]
        random.Random(file_list_seed).shuffle(self._npz_paths)
        # `_npz_paths` is the full file list; `npz_paths` is the part this
        # instance iterates over (a worker_init_fn may narrow it down)
        self._npz_paths = tuple(self._npz_paths)
        self.npz_paths = self._npz_paths
        self.clip_length = clip_length
        self.overlapping = overlapping
        self.keep = keep
        self.shuffle = shuffle
        # randint requires integer bounds (a float such as 1e6 is rejected by
        # newer Python versions); the draw starts at 1 so the seed is never falsy
        self.seed = seed if seed else random.randint(1, 10 ** 6)
        self.strict = strict

    def infer_len(self, n_jobs=4):
        """Total number of clips, computed from the frame count of every file.

        Frame counts are read from `npz_file_lens.json.gz` inside
        `amass_location` when that file exists (keeping only entries whose
        dataset name is a directory directly below `amass_location`);
        otherwise they are recomputed with `npz_lens` using `n_jobs` jobs and
        the file is written.
        """
        lenfile = Path(self.amass_location) / Path("npz_file_lens.json.gz")
        # try to load file
        if lenfile.exists():
            with gzip.open(lenfile, "rt") as f:
                stored_lens = json.load(f)
            # filter out file length information to only existing dirs
            datasets = {p.name for p in Path(self.amass_location).glob('*') if p.is_dir()}
            self.npz_lens = [(p, h, l) for p, h, l in stored_lens if p in datasets]
        else:  # if it's not there, recompute it and create the file
            print(f'Inspecting {len(self.npz_paths)} files to determine dataset length'
                  f', saving the result to {lenfile}')
            self.npz_lens = npz_lens(self.amass_location, n_jobs, strict=self.strict)
            save_lens(lenfile, self.npz_lens)

        # using stored lengths to infer the total dataset length
        def lenslice(s):
            span = s.stop - s.start
            if self.overlapping:
                # one clip per admissible start; a file shorter than one clip
                # contributes nothing rather than a negative count
                return max(span - (self.clip_length - 1), 0)
            return span // self.clip_length

        N = 0
        for p, h, l in self.npz_lens:
            s = keep_slice(l, keep=self.keep)
            N += lenslice(s)

        return N

    def __len__(self):
        # computed on first use and cached in self.N
        if not hasattr(self, "N"):
            self.N = self.infer_len()
        return self.N

    def __iter__(self):
        # the default `transform=None` means "leave the arrays as they are"
        transform = self.transform if self.transform is not None else (lambda x: x)
        if self.shuffle:
            self.npz_paths = list(self.npz_paths)
            random.Random(self.seed).shuffle(self.npz_paths)
        for npz_path in self.npz_paths:
            # self.seed is read here, when the per-file iterator is created
            for data in npz_contents(
                npz_path,
                self.clip_length,
                self.overlapping,
                keys=self.data_keys,
                keep=self.keep,
                shuffle=self.shuffle,
                seed=self.seed,
            ):
                # incremented for every clip yielded, so the next file (and
                # the next epoch) is shuffled with a different seed
                self.seed += 1
                yield {k: transform(data[k]) for k in data}

Test I can load some data with this Dataset:

In [None]:
# smoke test: the first example is a dict of tensors, and the length can be inferred
with tempfile.TemporaryDirectory() as tmpdirname:
    unpack_body_models("sample_data/", tmpdirname, 8)
    amass = AMASS(tmpdirname, overlapping=False, clip_length=1, transform=torch.tensor)
    for data in amass:
        for k in data:
            print(k, data[k].shape)
            assert type(data[k]) is torch.Tensor
        break
    # no lengths file exists in the fresh temp directory, so this recomputes and saves it
    print(len(amass))

  0%|          | 0/1 [00:00<?, ?it/s]

poses torch.Size([1, 156])
dmpls torch.Size([1, 8])
trans torch.Size([1, 3])
betas torch.Size([1, 16])
gender torch.Size([1])
Inspecting 2 files to determine dataset length, saving the result to /tmp/tmpleow1ugq/npz_file_lens.json.gz


  0%|          | 0/2 [00:00<?, ?it/s]

668


Test it works in a DataLoader to make batches:

In [None]:
# single-process DataLoader: the first batch must stack 4 examples along a new leading axis
with tempfile.TemporaryDirectory() as tmpdirname:
    unpack_body_models("sample_data/", tmpdirname, 8)
    amass = AMASS(tmpdirname, overlapping=False, clip_length=1, transform=torch.tensor)
    amasstrain = tudata.DataLoader(amass, batch_size=4)
    for i, data in enumerate(amasstrain):
        for k in data:
            print(k, data[k].shape)
        assert data["poses"].size(0) == 4, f'{data["poses"].size()}'
        break

  0%|          | 0/1 [00:00<?, ?it/s]

poses torch.Size([4, 1, 156])
dmpls torch.Size([4, 1, 8])
trans torch.Size([4, 1, 3])
betas torch.Size([4, 1, 16])
gender torch.Size([4, 1])


## Multi-process Data Loading

> To work with `num_workers > 0` I'm going to pass a different set of `npz` files to each worker using a `worker_init_fn`.

The following `worker_init_fn` must **always** be used when using `num_workers > 0` or data will be duplicated. To simplify I am providing a DataLoader class that bakes this `worker_init_fn` in when `num_workers > 0`.

In [None]:
# exports
def worker_init_fn(worker_id):
    """Give each DataLoader worker its own share of the files and its own seed.

    The full file list (`dataset._npz_paths`) is cut into `num_workers`
    contiguous shards whose sizes differ by at most one, so every file is
    assigned to exactly one worker. The worker's dataset copy then iterates
    only over its shard, and its shuffle seed is offset by the worker seed.

    args:

        - `worker_id`: id passed in by the DataLoader (the id is read from
            `get_worker_info()` instead)

    raises:

        - `AssertionError` if there are fewer files than workers
    """
    worker_info = torch.utils.data.get_worker_info()

    # slice up dataset among workers
    dataset = worker_info.dataset
    overall_npz_paths = dataset._npz_paths
    n = len(overall_npz_paths)
    num_workers = worker_info.num_workers
    assert n >= num_workers, (
        "Every worker must get at least one file:" f" {num_workers} > {n}"
    )
    # shard boundaries at floor(k * n / num_workers): when n is not a multiple
    # of num_workers the remainder is spread over the workers instead of being
    # left unassigned
    start = (worker_info.id * n) // num_workers
    stop = ((worker_info.id + 1) * n) // num_workers
    dataset.npz_paths = overall_npz_paths[start:stop]

    # set each workers seed
    dataset.seed = dataset.seed + worker_info.seed

class IterableLoader(tudata.DataLoader):
    """DataLoader that always installs `worker_init_fn`.

    Any `worker_init_fn` passed by the caller is replaced.
    """

    def __init__(self, *args, **kwargs):
        kwargs.update(worker_init_fn=worker_init_fn)
        super().__init__(*args, **kwargs)

In [None]:
def test_dataloader():
    """First batch has 4 examples with two workers, for both ways of building the loader."""

    def check_first_batch(loader):
        # print the shapes of the first batch and check its batch dimension
        for i, data in enumerate(loader):
            for k in data:
                print(k, data[k].shape)
            assert data["poses"].size(0) == 4, f'{data["poses"].size()}'
            break

    with tempfile.TemporaryDirectory() as tmpdirname:
        unpack_body_models("sample_data/", tmpdirname, 8)
        amass = AMASS(
            tmpdirname, overlapping=False, clip_length=1, transform=torch.tensor
        )
        # plain DataLoader with the init function passed explicitly
        amasstrain = tudata.DataLoader(
            amass, batch_size=4, worker_init_fn=worker_init_fn, num_workers=2
        )
        check_first_batch(amasstrain)
        # convenience loader with the init function baked in
        amasstrain = IterableLoader(
            amass, batch_size=4, num_workers=2
        )
        check_first_batch(amasstrain)

test_dataloader()

  0%|          | 0/1 [00:00<?, ?it/s]

poses torch.Size([4, 1, 156])
dmpls torch.Size([4, 1, 8])
trans torch.Size([4, 1, 3])
betas torch.Size([4, 1, 16])
gender torch.Size([4, 1])
poses torch.Size([4, 1, 156])
dmpls torch.Size([4, 1, 8])
trans torch.Size([4, 1, 3])
betas torch.Size([4, 1, 16])
gender torch.Size([4, 1])


In [None]:
def test_runtime(unpacked_dir, batch_size, num_workers):
    """Time the first batches of a loader and extrapolate to a full epoch.

    args:

        - `unpacked_dir`: directory of unpacked data passed to `AMASS`
        - `batch_size`: DataLoader batch size
        - `num_workers`: number of DataLoader workers

    returns:

        - tuple `(elapsed seconds, seconds per batch, estimated hours per epoch)`
    """
    amass = AMASS(
        unpacked_dir, overlapping=False, clip_length=1, transform=torch.tensor, seed=0
    )
    # DataLoader is only available through the `tudata` alias in this notebook
    amasstrain = tudata.DataLoader(
        amass, batch_size=batch_size, worker_init_fn=worker_init_fn, num_workers=num_workers
    )
    start = time.time()
    i = 0
    for data in tqdm(amasstrain):
        i += 1
        if i > 100:
            break
    elapsed = time.time() - start
    # average time per batch times the number of batches in an epoch, in hours
    total_hours = ((elapsed / i) * len(amasstrain)) / (60 ** 2)
    return elapsed, elapsed / i, total_hours


# test_runtime('/nobackup/gngdb/repos/amass/data', 256, 8)
# test_runtime('/scratch/gobi1/gngdb/amass', 256, 8)

Rough results from testing runtime on full dataset on my workstation:

* Batch size 32:
    * 0: 260ms/batch, 35 hours per epoch
    * 2: 91ms/batch, 12 hours per epoch
    * 4: 58ms/batch, 7 hours 54 minutes per epoch
    * 8: 29ms/batch, 4 hours per epoch
    * 12 (number of cores): 3 hours 45 minutes per epoch
* Batch size 256:
    * 0: 910ms/batch, 16 hours per epoch
    * 2: 816ms/batch, 14 hours per epoch
    * 4: 457ms/batch, 8 hours per epoch
    * 8: 235ms/batch, 4 hours per epoch

## Shuffling

PyTorch DataLoaders don't support shuffling IterableDataset because it's assuming the data is coming in as an IID stream. For this problem, this means the shuffling has to be implemented elsewhere.

There are two parts to shuffle:

* The indexes accessing the arrays in each file
* The list of files to access

The first is easy and can be an option to the iterator over each file. It doesn't affect how each worker operates because no two workers should ever touch the same file.

The second is more difficult because each worker has a different list of files. Also, it's important that the order of the global list of files be random, because some files are larger than others and the randomness is to ensure that each worker has approximately the same number of examples in the files it has received. However, every worker initialises a separate dataset, so each dataset has to have access to the same list of files. I think the best way to ensure this at this point is to use a shared random seed to shuffle the list of files at initialisation.

In [None]:
def test_shuffling(num_workers):
    """Check that two consecutive epochs do not start with the same minibatch.

    Unpacks `sample_data/` into a temporary directory, builds a shuffled
    `AMASS` dataset with a fixed seed and compares the first "poses" batch of
    two successive passes over the same `DataLoader`.

    Args:
        num_workers: Value forwarded to `DataLoader(num_workers=...)`.

    Raises:
        AssertionError: If either pass yields no batch at all, or if both
            passes start with numerically identical batches.
    """
    with tempfile.TemporaryDirectory() as tmpdirname:
        unpack_body_models("sample_data/", tmpdirname, 8)
        amass = AMASS(
            tmpdirname,
            overlapping=False,
            clip_length=1,
            transform=torch.tensor,
            shuffle=True,
            seed=0,
        )
        amasstrain = tudata.DataLoader(
            amass, batch_size=4, worker_init_fn=worker_init_fn, num_workers=num_workers
        )
        # Take only the first batch of each pass. Each iterator is discarded
        # before the next one is created, as with a `for ...: break` loop.
        first_batch = next(iter(amasstrain), None)
        second_batch = next(iter(amasstrain), None)
        # Fail loudly instead of passing vacuously when nothing was loaded.
        assert first_batch is not None, "first epoch yielded no batches"
        assert second_batch is not None, "second epoch yielded no batches"
        # second epoch shouldn't produce the same minibatch
        first_poses = first_batch["poses"].numpy()
        second_poses = second_batch["poses"].numpy()
        assert not np.allclose(second_poses, first_poses)


# exercise the in-process loader (0 workers) as well as 1 and 2 workers
for num_workers in range(3):
    test_shuffling(num_workers)

  0%|          | 0/1 [00:00<?, ?it/s]

  0%|          | 0/1 [00:00<?, ?it/s]

  0%|          | 0/1 [00:00<?, ?it/s]

# Train/test Splits

> Console utility to split directories in a standard way

The original preprocessing script splits directories [the following way](https://github.com/nghorbani/amass/blob/master/src/amass/data/prepare_data.py#L235-L239):

In [None]:
# Split assignment used by the original AMASS preprocessing script.
original_amass_splits = dict(
    val=['HumanEva', 'MPI_HDM05', 'SFU', 'MPI_mosh'],
    test=['Transitions_mocap', 'SSM_synced'],
    train=[
        'CMU',
        'MPI_Limits',
        'TotalCapture',
        'Eyes_Japan_Dataset',
        'KIT',
        'BML',
        'EKUT',
        'TCD_handMocap',
        'ACCAD',
    ],
)

This doesn't allocate all the datasets composing AMASS to a split:

```
Datasets not allocated to a split: {'BMLmovi', 'BioMotionLab_NTroje', 'DanceDB', 'BMLhandball', 'DFaust_67'}
```

Obtained by evaluating the following cell on an unpacked directory of AMASS data:

In [None]:
def unallocated_splits(unpacked_dir, splits=None):
    """Report which dataset directories are not assigned to any split.

    Args:
        unpacked_dir: Directory whose immediate subdirectories are checked.
        splits: Mapping from split name to a list of directory names. When
            omitted, `original_amass_splits` (defined in the cell above) is
            used, so the function no longer depends on a name that is only
            defined in a later cell.

    Returns:
        The set of subdirectory names that appear in no split. The same set
        is also printed.
    """
    if splits is None:
        splits = original_amass_splits
    data_path = Path(unpacked_dir)
    all_subdirs = {f.name for f in data_path.iterdir() if f.is_dir()}
    allocated = set().union(*splits.values())
    unallocated = all_subdirs - allocated
    print(f'Datasets not allocated to a split: {unallocated}')
    return unallocated
# unallocated_splits('/nobackup/gngdb/repos/amass/data')

I'm going to include all the newer datasets in the training set.

In [None]:
# exports
# Same 'val' and 'test' assignment as the original preprocessing script; the
# newer datasets it left unallocated are appended to 'train'.
amass_splits = dict(
    val=['HumanEva', 'MPI_HDM05', 'SFU', 'MPI_mosh'],
    test=['Transitions_mocap', 'SSM_synced'],
    train=[
        'CMU',
        'MPI_Limits',
        'TotalCapture',
        'Eyes_Japan_Dataset',
        'KIT',
        'BML',
        'EKUT',
        'TCD_handMocap',
        'ACCAD',
        # datasets not covered by the original split definition
        'BMLmovi',
        'BioMotionLab_NTroje',
        'DanceDB',
        'BMLhandball',
        'DFaust_67',
    ],
)

These functions move the directories into subdirectories that define the splits and undo this change.

In [None]:
#exports
def move_dirs_into_splits(amass_loc, splits, undo=False):
    """Move dataset directories into, or back out of, per-split subdirectories.

    For every split name in `splits`, each listed directory is renamed from
    `amass_loc/<dir>` to `amass_loc/<split>/<dir>`. With `undo=True` the rename
    goes in the opposite direction.

    Args:
        amass_loc: Root directory holding the dataset directories.
        splits: Mapping from split name to the directory names belonging to
            that split. This argument used to be ignored in favour of the
            module-level `amass_splits`; passing `None` still selects
            `amass_splits`.
        undo: If true, move directories from the split subdirectories back
            into `amass_loc`.

    Warns:
        UserWarning: For every directory that cannot be found at its expected
            source location; that directory is skipped.
    """
    amass_loc = Path(amass_loc)
    if splits is None:
        splits = amass_splits
    for split_name, dataset_dirs in splits.items():
        split_dir = amass_loc / split_name
        # The split directory is created even when undoing, as before.
        split_dir.mkdir(exist_ok=True)
        for dataset in dataset_dirs:
            inside_split = split_dir / dataset
            at_root = amass_loc / dataset
            src, dst = (inside_split, at_root) if undo else (at_root, inside_split)
            try:
                os.rename(src, dst)
            except FileNotFoundError:
                warnings.warn(f'Could not find {dataset} for {split_name} split')


# Same call signature as `move_dirs_into_splits`, with the direction reversed.
move_dirs_out_of_splits = functools.partial(move_dirs_into_splits, undo=True)

In [None]:
#exports
def console_split_dirs():
    """Console entry point: apply (or undo) the `amass_splits` directory layout.

    Reads the AMASS location and an optional `--undo` flag from the command
    line and forwards them to `move_dirs_into_splits`.
    """
    cli = argparse.ArgumentParser(description="Split AMASS Dataset subdirs into train/val/test")
    cli.add_argument("amassloc", type=str, help="Location where AMASS has been unpacked")
    cli.add_argument(
        "--undo",
        action="store_true",
        help="Undo move into subdirectories, put them all back in the root AMASS location",
    )
    options = cli.parse_args()
    move_dirs_into_splits(options.amassloc, amass_splits, undo=options.undo)

## SPL Train/Validation/Test

In [None]:
# spl
import urllib.request

# File lists published in the SPL repository, one text file per split.
train_url = "https://raw.githubusercontent.com/eth-ait/spl/master/preprocessing/training_fnames.txt"
val_url = "https://raw.githubusercontent.com/eth-ait/spl/master/preprocessing/validation_fnames.txt"
test_url = "https://raw.githubusercontent.com/eth-ait/spl/master/preprocessing/test_fnames.txt"

splits = {}
for url in [train_url, val_url, test_url]:
    # "<split>_fnames.txt" -> "<split>"
    split = url.split("/")[-1].split("_")[0]
    with urllib.request.urlopen(url) as f:
        # Drop blank lines (e.g. from a trailing newline); they otherwise show
        # up as a spurious '' entry in the sets printed below.
        file_list = [line for line in f.read().decode('UTF-8').splitlines() if line.strip()]
    splits[split] = file_list

# Summarise each split by the first path component of its file names.
for split, file_list in splits.items():
    print(split, " = ", set(f.split("/")[0] for f in file_list))

training  =  {'', 'CMU', 'Transition', 'ACCAD', 'SSM', 'BioMotion', 'JointLimit', 'HEva', 'MIXAMO', 'HDM05', 'CMU_Kitchen', 'Eyes'}
validation  =  {'', 'CMU', 'Transition', 'ACCAD', 'SSM', 'BioMotion', 'JointLimit', 'MIXAMO', 'HDM05', 'Eyes'}
test  =  {'', 'CMU', 'Transition', 'ACCAD', 'BioMotion', 'JointLimit', 'HEva', 'MIXAMO', 'HDM05', 'CMU_Kitchen', 'Eyes'}


In [None]:
# exports
# First path components found in each SPL file list.
# NOTE(review): the '' member looks like an artifact of a blank line in the
# downloaded file lists. Confirm no consumer relies on it before removing it.
spl_splits = {
    "training": {
        '', 'ACCAD', 'BioMotion', 'CMU', 'CMU_Kitchen', 'Eyes', 'HDM05',
        'HEva', 'JointLimit', 'MIXAMO', 'SSM', 'Transition',
    },
    "validation": {
        '', 'ACCAD', 'BioMotion', 'CMU', 'Eyes', 'HDM05',
        'JointLimit', 'MIXAMO', 'SSM', 'Transition',
    },
    "test": {
        '', 'ACCAD', 'BioMotion', 'CMU', 'CMU_Kitchen', 'Eyes', 'HDM05',
        'HEva', 'JointLimit', 'MIXAMO', 'Transition',
    },
}

In [None]:
# vws
# iterate over every subdirectory in AMASS and open the json files containing information on file sizes
# Workstation-specific location of the unpacked AMASS data.
amass_loc = "/nobackup/gngdb/repos/amass/data/"
# Maps each subdirectory's stem to the parsed contents of its gzipped JSON file.
metadata = {}
for subdir in filter(Path.is_dir, Path(amass_loc).iterdir()):
    with gzip.open(subdir / "npz_file_lens.json.gz") as fh:
        metadata[subdir.stem] = json.load(fh)

In [None]:
# vws
# For each grouping of splits, print every split's share of the summed third
# field of its metadata entries.
# NOTE(review): the third field is presumably a per-file length. Confirm against the writer of npz_file_lens.json.gz.
for splits in [['train_subset', 'val_subset'], ['val', 'test', 'train']]:
    print(splits)
    subtotals = {name: sum(entry[2] for entry in metadata[name]) for name in splits}
    total = sum(subtotals.values())
    for k, subtotal in subtotals.items():
        print("  ", k, f"{subtotal}/{total} = {100.*subtotal/total:.2f}%")

['train_subset', 'val_subset']
   train_subset 41220/102240 = 40.32%
   val_subset 61020/102240 = 59.68%
['val', 'test', 'train']
   val 1320126/19775902 = 6.68%
   test 117679/19775902 = 0.60%
   train 18338097/19775902 = 92.73%


In [None]:
# hide
from nbdev.export import notebook2script

# Run nbdev's notebook-to-module export; the "Converted ..." lines in the cell
# output list the notebooks it processed.
notebook2script()

Converted 00_core.ipynb.
Converted 01_tqdm.ipynb.
Converted 02_features.ipynb.
Converted 03_transforms.ipynb.
Converted 05_losses.ipynb.
Converted 06_dip.ipynb.
Converted index.ipynb.
