In [None]:
%pylab inline

In [None]:
import webdataset as wds
import glob
import os
import re
import msgpack as mp
import tempfile
from itertools import islice
import sys

# Process-wide settings exported before any dataset is opened: a cache
# directory plus two verbosity switches (presumably consumed by webdataset's
# caching and gopen layers -- TODO confirm against the installed version).
os.environ.update(
    {
        "WDS_CACHE": "/work/cache",
        "WDS_VERBOSE_CACHE": "1",
        "GOPEN_VERBOSE": "1",
    }
)

In [None]:
def rename_files(fname):
    """Rewrite a file name so that only its final dot survives.

    Every "." that precedes the last one is replaced by ",", e.g.
    "1234.5678v1.pdf" -> "1234,5678v1.pdf".  Presumably this keeps dotted
    arXiv identifiers from being mistaken for multi-part extensions when the
    name is split into key and extension -- TODO confirm against webdataset.

    Args:
        fname: File (tar member) name as a string.

    Returns:
        The rewritten name.  A name that contains no "." has no extension to
        protect and is returned unchanged.
    """
    # rpartition never raises: with no "." present, `dot` is the empty string.
    prefix, dot, suffix = fname.rpartition(".")
    if not dot:
        return fname
    return prefix.replace(".", ",") + dot + suffix

# Open the arXiv PDF shards directly from GCS; rename_files is applied to
# member names as they are read.
# If direct gs:// reads are unavailable, a pipe URL can stand in for the same
# shards (untested here -- confirm before relying on it):
#   pipe:gsutil cp gs://ocro-arxiv/pdfs/arxiv-pdfs-{000000..001038}.tar /tmp/$$.tar && cat /tmp/$$.tar && rm -f /tmp/$$.tar
ds = wds.WebDataset("gs://ocro-arxiv/pdfs/arxiv-pdfs-{000000..001038}.tar", rename_files=rename_files)

# Pull a single sample to develop the conversion code against.
pdfsample = next(iter(ds))

In [None]:
# Show each field of the fetched sample with its repr clipped to 50 characters.
for field, value in pdfsample.items():
    preview = repr(value)[:50]
    print(field, preview)

In [None]:
class ShellError(Exception):
    """Raised when a shell command finishes with a nonzero status.

    Args:
        status: The status value reported for the failed command; stored on
            the instance as `status`.
        *args: Optional standard exception arguments (typically a message).
            When omitted, a default message mentioning `status` is used so
            that tracebacks and logs are not blank.
    """

    def __init__(self, status, *args):
        if not args:
            args = (f"shell command failed with status {status}",)
        super().__init__(*args)
        self.status = status

    def __reduce__(self):
        # The default exception reduction rebuilds the object from `args`
        # alone, which omits the required `status` argument and makes
        # copy/pickle fail.  Supply it explicitly so the error survives being
        # copied or shipped between processes.
        return (type(self), (self.status,) + tuple(self.args), self.__dict__)

def run(x):
    """Execute the shell command string `x` with os.system.

    Returns None when the reported status is 0; otherwise raises ShellError
    carrying the status value exactly as os.system returned it.
    """
    exit_status = os.system(x)
    if exit_status == 0:
        return
    raise ShellError(exit_status)

In [None]:
def read_bin(fname):
    """Return the complete contents of the file `fname` as bytes."""
    with open(fname, mode="rb") as handle:
        payload = handle.read()
    return payload

def expand_pdf(data, prefix, dpi=300):
    """Split one PDF into pages and yield a sample dict for each page.

    The PDF bytes are written into a scratch directory, burst into
    single-page PDFs with `pdftk`, and every page is then rendered to JPEG
    (`pdftoppm`), dumped to text (`pdftotext`) and converted to hOCR
    (`pdftotree`).

    Args:
        data: Raw bytes of the PDF document.
        prefix: Key prefix; page samples are keyed "<prefix>/<pageno>", with
            pageno counting from 0 in sorted file-name order.
        dpi: Resolution passed to `pdftoppm -r` (default 300).

    Yields:
        dict with `__key__` (str) and the bytes fields `hocr`, `txt`, `pdf`
        and `jpg` for one page.

    Raises:
        ShellError: if any of the external commands reports a nonzero status.
    """
    # The scratch directory is removed when the generator finishes or is
    # closed; every file is read into memory before being yielded, so the
    # samples stay valid afterwards.
    with tempfile.TemporaryDirectory() as tdir:
        with open(f"{tdir}/doc.pdf", "wb") as stream:
            stream.write(data)

        run(f"cd {tdir} && pdftk doc.pdf burst")

        for pg in sorted(glob.glob(tdir + "/pg_????.pdf")):
            base = pg[:-4]
            # Render once, at the quality that ends up in the sample.  All
            # tools go through run() so failures surface as ShellError.
            # NOTE(review): in poppler's pdftoppm `-o` appears to be the
            # odd-pages switch, with {base} acting as the positional output
            # root -- harmless on single-page input, but confirm.
            run(f"pdftoppm -r {dpi} -jpeg {pg} -singlefile -jpegopt quality=100 -o {base}")
            run(f"pdftotext {pg}")
            # Write to a temp name first so a failed conversion never leaves a
            # partial .hocr file for the glob below to pick up.
            run(f"pdftotree {pg} > {base}.temp && mv {base}.temp {base}.hocr")

        for pageno, hocr in enumerate(sorted(glob.glob(tdir + "/*.hocr"))):
            base = hocr[:-5]
            sample = dict(
                __key__=f"{prefix}/{pageno}",
                hocr=read_bin(hocr),
                txt=read_bin(base + ".txt"),
                pdf=read_bin(base + ".pdf"),
                jpg=read_bin(base + ".jpg"),
            )
            yield sample

# Convert the development sample and materialize every page sample in a list.
result = [
    *expand_pdf(
        pdfsample["pdf"],
        pdfsample["__key__"],
    )
]

In [None]:
if True:
    # Eyeball the first converted page: clipped field previews, then the hOCR
    # and plain-text payloads decoded as UTF-8, then the rendered JPEG.
    first_page = result[0]
    for field, value in first_page.items():
        print(field, repr(value)[:50])
    for text_field in ("hocr", "txt"):
        print(first_page[text_field].decode("utf-8"))
    import io
    from imageio import imread
    jpeg_stream = io.BytesIO(first_page["jpg"])
    image = imread(jpeg_stream)
    imshow(image)

In [None]:
def map_expand_pdfs(ds):
    """Turn a stream of PDF samples into a stream of per-page samples.

    Each incoming sample must provide "__key__" and "pdf".  Its key is printed
    as a progress marker, then every page sample produced by expand_pdf is
    yielded.  A ShellError raised during expansion is reported on stdout and
    that document is skipped; iteration continues with the next one.
    """
    for record in ds:
        sample_key = record["__key__"]
        print("***", sample_key)
        try:
            yield from expand_pdf(record["pdf"], sample_key)
        except ShellError as err:
            print(f"{sample_key}: shell error {err.status}")

In [None]:
%%writefile _
# Streaming variant of the conversion: expand every PDF in `ds`, shuffle the
# page samples, and write them into numbered tar shards.
# NOTE(review): the `%%writefile _` magic heading this cell saves this text to
# a file named `_` instead of executing it.
expanded = ds.compose(map_expand_pdfs)
# num_workers=8, batch_size=None; shuffle buffer of 5000 samples.
dl = wds.WebLoader(expanded, num_workers=8, batch_size=None).shuffle(5000)
destination = "gs://ocro-arxiv/hocr"
def upload(fname):
    """Copy the local file `fname` to `destination` with gsutil, then delete it.

    Raises ShellError (via run) if the shell command reports a nonzero status;
    in that case the local file is left in place.
    """
    run(f"sync; sleep 1; gsutil --quiet -m cp {fname} {destination}/{fname}")
    os.unlink(fname)
# Shards are capped at 300 samples / 500e6 bytes; `post=upload` is presumably
# called with each finished shard's file name -- TODO confirm.
sink = wds.ShardWriter("arxiv-hocr-%06d.tar", maxcount=300, maxsize=500e6, post=upload, encoder=None)
# NOTE(review): sink.close() is never called, so the last (partial) shard may
# not be finalized or uploaded -- confirm before reviving this cell.
for sample in dl:
    sink.write(sample)

In [None]:
def gsexists(gspath):
    """Report whether `gsutil ls` succeeds for `gspath`.

    `gspath` must start with "gs://" (enforced with an assert).  Returns True
    when the command's status is 0 and False otherwise; the listing itself is
    discarded.
    """
    assert gspath.startswith("gs://")
    listing_status = os.system(f"gsutil ls {gspath} > /dev/null")
    return listing_status == 0

def upload(fname, gspath):
    """Copy the local file `fname` to `gspath` with gsutil, then delete it.

    The shell command runs `sync` and sleeps one second before the copy.
    Raises ShellError (via run) on a nonzero status, in which case the local
    file is not removed.
    """
    copy_command = f"sync; sleep 1; gsutil --quiet -m cp {fname} {gspath}"
    run(copy_command)
    os.unlink(fname)

def process_shard(shardno):
    """Convert one PDF shard into a per-page hOCR shard and upload it.

    Reads gs://ocro-arxiv/pdfs/arxiv-pdfs-<shardno>.tar, expands every PDF
    into page samples with map_expand_pdfs, writes them to a local tar file
    named arxiv-hocr-<shardno>.tar, and uploads that file to
    gs://ocro-arxiv/hocr/.  If the destination object already exists the
    shard is skipped.

    Args:
        shardno: Integer shard index; formatted as six zero-padded digits.

    Returns:
        None.
    """
    fname = f"arxiv-hocr-{shardno:06d}.tar"
    destination = "gs://ocro-arxiv/hocr"
    gspath = f"{destination}/{fname}"
    if gsexists(gspath):
        print(f"EXISTS: {gspath}")
        return
    sname = f"gs://ocro-arxiv/pdfs/arxiv-pdfs-{shardno:06d}.tar"
    print("converting", sname, "->", gspath)
    ds = wds.WebDataset(sname, rename_files=rename_files)
    expanded = ds.compose(map_expand_pdfs).shuffle(1000)
    sink = wds.TarWriter(fname, encoder=None)
    try:
        # Write the expanded page samples, not the raw PDF samples in `ds`.
        for sample in expanded:
            sink.write(sample)
    finally:
        # Always release the tar file handle, even if conversion fails.
        sink.close()
    # Reached only when every sample was written successfully.
    upload(fname, gspath)

# Run shard 0 in-process as a check before fanning out to workers.
process_shard(0)

In [None]:
import ray
# Start ray only if it is not already initialized, so re-running this cell in
# a live kernel does not call ray.init a second time.
if not ray.is_initialized():
    ray.init(num_cpus=16)

In [None]:
# Wrap process_shard as a remote function once; wrapping it inside the loop
# would define and export a fresh remote function for every shard.
process_shard_remote = ray.remote(num_cpus=2)(process_shard)

# One task per source shard (indices 0..1038), each reserving 2 CPUs.
results = [process_shard_remote.remote(i) for i in range(0, 1039)]

# Block until every task has finished; process_shard returns None, so this is
# a list of None values unless a task raised.
results2 = ray.get(results)