# Sharding

This is a small Jupyter notebook illustrating how to take a dataset in the cloud (or on disk) and turn it into a set of sharded tar archives.

The process consists of three steps:

- get a file listing of the original dataset and group it by related files
- output one "recipe" for each shard that specifies where files come from and how they are stored in each .tar archive
- apply `tarp create` to these recipes to create the actual shards

The process of creating the "recipes" is serial but quick. The `tarp create` jobs can be run in parallel.

In [1]:
import os
import os.path
from itertools import groupby
import re
import sys

In [2]:
def ident(s):
    "Reduce a pathname to the video identifier."
    s = os.path.basename(s)
    s = re.sub(r'[.].*$', '', s)
    return s

def chunks(l, n):
    "Return an iterator over lists of at most n items taken from the iterable l."
    result = []
    for x in l:
        if len(result) >= n:
            yield result
            result = []
        result.append(x)
    if result != []:
        yield result

def flatten(l):
    "Flatten an iterator of iterators into a list."
    return [x for s in l for x in s]

In [3]:
# get a list of file names (we're just using aa* to keep the demo short)

samples = sorted(
    [s.strip() for s in os.popen("gsutil ls gs://lpr-yt8m/aa*").readlines()]
)

In [4]:
# group into lists of samples with the same basename
samples = [list(l[1]) for l in groupby(samples, ident)]

# only keep samples that contain a ".mp4"
samples = [l for l in samples if any(s.endswith(".mp4") for s in l)]

# chunk into groups of 100 samples (chunks() returns a generator, so shards can be consumed only once)
shards = chunks(samples, 100)
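
The grouping, filtering, and chunking steps above can be tried on a toy listing without touching the cloud. The following is a minimal sketch: the bucket name `demo-bucket` and the file names are made up for illustration, the chunk size is 2 instead of 100, and the two helpers are repeated only so that the block runs on its own.

```python
import os.path
import re
from itertools import groupby

def ident(s):
    "Reduce a pathname to the video identifier (same logic as the helper above)."
    return re.sub(r'[.].*$', '', os.path.basename(s))

def chunks(l, n):
    "Yield lists of at most n items from l (same logic as the helper above)."
    result = []
    for x in l:
        if len(result) >= n:
            yield result
            result = []
        result.append(x)
    if result:
        yield result

# hypothetical listing; "zz" has no .mp4 and should be dropped
listing = sorted([
    "gs://demo-bucket/ab.mp4",
    "gs://demo-bucket/ab.info.json",
    "gs://demo-bucket/cd.mp4",
    "gs://demo-bucket/cd.description",
    "gs://demo-bucket/ef.mp4",
    "gs://demo-bucket/zz.info.json",
])

# group files that share an identifier, keep groups that have a video
groups = [list(g) for _, g in groupby(listing, ident)]
groups = [g for g in groups if any(s.endswith(".mp4") for s in g)]

# two samples per shard, materialized as a list so it can be inspected
toy_shards = list(chunks(groups, 2))

for index, shard in enumerate(toy_shards):
    print(index, [ident(g[0]) for g in shard])
# prints:
# 0 ['ab', 'cd']
# 1 ['ef']
```

Because `groupby` only merges adjacent items, the listing has to be sorted first, which is why the notebook sorts the `gsutil ls` output before grouping.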

In [5]:
!mkdir -p shards

In [6]:
# write one recipe file per shard (tab-separated: output name, source)

for index, shard in enumerate(shards):
    print(f"shard {index}")
    shard = flatten(shard)
    with open(f"shards/shard-{index:06d}.txt", "w") as stream:
        for fname in shard:
            base = os.path.basename(fname)
            print(f"{base}\tpipe:curl https://storage.googleapis.com/lpr-yt8m/{base}", file=stream)

shard 0
shard 1
shard 2
shard 3
shard 4
shard 5
shard 6
shard 7
shard 8


In [7]:
!ls shards

shard-000000.tar  shard-000002.txt  shard-000005.txt  shard-000008.txt
shard-000000.txt  shard-000003.txt  shard-000006.txt
shard-000001.txt  shard-000004.txt  shard-000007.txt


# Creating the Shards

Each line of a recipe file now contains an output file name (the name the entry gets inside the tar file) and a source, separated by a tab.

The source can take one of three forms: a file path; "text:something", in which case "something" is placed literally into the tar file under the given name; or "pipe:something", in which case "something" is executed as a shell script and its stdout is captured and stored in the output tar file.
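
To make the recipe format concrete, here is a small sketch that splits one recipe line into its parts. The function `parse_recipe_line` is a hypothetical helper written for this notebook, not part of `tarp`, and the `text:` and path examples are made up; only the `pipe:` line is taken from the recipe file above.

```python
def parse_recipe_line(line):
    """Split one recipe line into (output_name, kind, argument).

    kind is "text", "pipe", or "path". Illustrative helper only;
    tarp does its own parsing.
    """
    # the output name and the source are separated by a single tab
    name, source = line.rstrip("\n").split("\t", 1)
    for kind in ("text", "pipe"):
        prefix = kind + ":"
        if source.startswith(prefix):
            return name, kind, source[len(prefix):]
    # no recognized prefix: treat the source as a file path
    return name, "path", source

examples = [
    "aa-4wUIa9zE.mp4\tpipe:curl https://storage.googleapis.com/lpr-yt8m/aa-4wUIa9zE.mp4",
    "aa-4wUIa9zE.cls\ttext:7",                 # hypothetical literal entry
    "aa-4wUIa9zE.json\t/data/aa-4wUIa9zE.json",  # hypothetical local path
]

for line in examples:
    print(parse_recipe_line(line))
```

A check like this can be useful for validating recipe files before handing them to `tarp create`, for example to confirm that every line has exactly one output name and one source.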

In [8]:
!head shards/shard-000000.txt

aa-4wUIa9zE.annotations.xml	pipe:curl https://storage.googleapis.com/lpr-yt8m/aa-4wUIa9zE.annotations.xml
aa-4wUIa9zE.description	pipe:curl https://storage.googleapis.com/lpr-yt8m/aa-4wUIa9zE.description
aa-4wUIa9zE.dllog	pipe:curl https://storage.googleapis.com/lpr-yt8m/aa-4wUIa9zE.dllog
aa-4wUIa9zE.info.json	pipe:curl https://storage.googleapis.com/lpr-yt8m/aa-4wUIa9zE.info.json
aa-4wUIa9zE.mp4	pipe:curl https://storage.googleapis.com/lpr-yt8m/aa-4wUIa9zE.mp4
aa-Dmn0MZzI.annotations.xml	pipe:curl https://storage.googleapis.com/lpr-yt8m/aa-Dmn0MZzI.annotations.xml
aa-Dmn0MZzI.description	pipe:curl https://storage.googleapis.com/lpr-yt8m/aa-Dmn0MZzI.description
aa-Dmn0MZzI.dllog	pipe:curl https://storage.googleapis.com/lpr-yt8m/aa-Dmn0MZzI.dllog
aa-Dmn0MZzI.info.json	pipe:curl https://storage.googleapis.com/lpr-yt8m/aa-Dmn0MZzI.info.json
aa-Dmn0MZzI.mp4	pipe:curl https://storage.googleapis.com/lpr-yt8m/aa-Dmn0MZzI.mp4


In [9]:
!tarp create --count=20 shards/shard-000000.txt -o shards/shard-000000.tar

[info] 0 aa-4wUIa9zE.annotations.xml <- pipe:curl https://storage.googleapis.com/lpr-yt8m/aa-4wUIa9zE.annotations.xml
[info] 1 aa-4wUIa9zE.description <- pipe:curl https://storage.googleapis.com/lpr-yt8m/aa-4wUIa9zE.description
[info] 2 aa-4wUIa9zE.dllog <- pipe:curl https://storage.googleapis.com/lpr-yt8m/aa-4wUIa9zE.dllog
[info] 3 aa-4wUIa9zE.info.json <- pipe:curl https://storage.googleapis.com/lpr-yt8m/aa-4wUIa9zE.info.json
[info] 4 aa-4wUIa9zE.mp4 <- pipe:curl https://storage.googleapis.com/lpr-yt8m/aa-4wUIa9zE.mp4
[info] 5 aa-Dmn0MZzI.annotations.xml <- pipe:curl https://storage.googleapis.com/lpr-yt8m/aa-Dmn0MZzI.annotations.xml
[info] 6 aa-Dmn0MZzI.description <- pipe:curl https://storage.googleapis.com/lpr-yt8m/aa-Dmn0MZzI.description
[info] 7 aa-Dmn0MZzI.dllog <- pipe:curl https://storage.googleapis.com/lpr-yt8m/aa-Dmn0MZzI.dllog
[info] 8 aa-Dmn0MZzI.info.json <- pipe:curl https://storage.googleapis.com/lpr-yt8m/aa-Dmn0MZzI.info.json
[info] 9 aa-Dmn0MZzI.mp4 <- pipe:curl http

# Sharding in Parallel

To create shards in parallel, you probably want to run many of these jobs at once, and you don't want to store data locally. You can store both the source .txt files and the output .tar files in the cloud and use a command like this:

```Bash
gsutil cat gs://bucket/shard-$shard.txt |
tarp create - -o - |
gsutil cp - gs://bucket/shard-$shard.tar
```
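
One possible way to drive many of these pipelines from Python is a thread pool that launches one shell pipeline per shard. This is only a sketch under assumptions: `shard_command` and `run_shards` are hypothetical helper names, `bucket` is the same placeholder as in the command above, and the example at the bottom is a dry run that prints the commands instead of executing them.

```python
import subprocess
from concurrent.futures import ThreadPoolExecutor

def shard_command(bucket, shard):
    "Build the shell pipeline for one shard; bucket and shard are placeholders."
    return (
        f"gsutil cat gs://{bucket}/shard-{shard}.txt | "
        f"tarp create - -o - | "
        f"gsutil cp - gs://{bucket}/shard-{shard}.tar"
    )

def run_shards(bucket, shard_ids, workers=4, run=None):
    "Run one pipeline per shard, at most `workers` at a time; return exit codes."
    if run is None:
        # default runner: execute the pipeline in a shell; the exit code
        # is that of the last stage of the pipeline
        run = lambda cmd: subprocess.run(cmd, shell=True).returncode
    commands = [shard_command(bucket, s) for s in shard_ids]
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # pool.map returns results in the same order as shard_ids
        return list(pool.map(run, commands))

# dry run: print each command and report success without executing anything
ids = [f"{i:06d}" for i in range(3)]
codes = run_shards("bucket", ids, run=lambda cmd: print(cmd) or 0)
```

Threads are sufficient here because the work happens in the child processes; dropping the `run=` argument would execute the pipelines for real, which assumes `gsutil` and `tarp` are installed and the bucket exists.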