In [None]:
import io
import os
import subprocess
import sys
import tempfile
from itertools import islice

import iautils
import webdataset as wds
import xmlhocr as xh

# Per Ray's logging documentation, RAY_DEDUP_LOGS=0 turns off log deduplication,
# so repeated messages from workers are shown individually.
# Set here, ahead of the later cell that imports ray.
os.environ["RAY_DEDUP_LOGS"] = "0"

In [None]:
# Exception class treated as a recoverable, per-record failure below.
# NOTE(review): presumably kept as a variable so it can be narrowed while
# debugging to let parse/zip errors propagate — confirm.
exn = Exception


def get_samples_full(url):
    """Yield one sample per page, pairing each page image with its OCR data.

    Reads records from the WebDataset shard at ``url``. For each record the
    ``abbyy.gz`` entry is converted with ``xh.convert_abbyy_to_serial`` and
    the ``zip`` entry is opened with ``iautils.Jp2Zip``. Records whose ABBYY
    data or zip cannot be loaded, or whose image page numbers are neither
    ``0..n-1`` nor ``1..n`` for ``n`` converted pages, are reported on stdout
    and skipped.

    Args:
        url: Shard location passed straight to ``wds.WebDataset``.

    Yields:
        dict with ``__key__`` (``"<identifier>/<page index, 5 digits>"``),
        ``jpg`` (result of ``images.decode`` for that page) and
        ``words.json`` (the converted entry for that page).

    Raises:
        Exception: any error outside the two guarded steps (for example a key
            that is not valid hex, or a failing ``images.decode``) is printed
            and re-raised, ending the generator.
    """
    ds = wds.WebDataset(url).decode()
    try:
        for sample in ds:
            key = sample["__key__"]
            print(f"key={key}", sample.keys())
            # Record keys are the hex encoding of the UTF-8 identifier.
            identifier = bytes.fromhex(key).decode("utf-8")

            try:
                pages = list(xh.convert_abbyy_to_serial(sample["abbyy.gz"]))
            except exn as e:
                print("failed to parse abbyy", repr(e))
                continue
            try:
                images = iautils.Jp2Zip(io.BytesIO(sample["zip"]))
            except exn as e:
                print("failed to load zip", repr(e))
                continue

            # Decide the numbering scheme once, from the same key set that is
            # validated, instead of probing the Jp2Zip object a second time.
            image_numbers = set(images.pages.keys())
            if image_numbers == set(range(len(pages))):
                print("zero-based page numbers in images")
                page_offset = 0
            elif image_numbers == set(range(1, len(pages) + 1)):
                print("one-based page numbers in images")
                page_offset = 1
            else:
                print("page number mismatch", len(pages), len(image_numbers))
                continue

            for i, page in enumerate(pages):
                pagekey = f"{identifier}/{i:05d}"
                print(pagekey, file=sys.stderr)
                # Output keys are always zero-based; only the lookup is shifted.
                image = images.decode(i + page_offset)
                yield {
                    "__key__": pagekey,
                    "jpg": image,
                    "words.json": page,
                }
    except Exception as e:
        print("exception", repr(e))
        raise

In [None]:
# Exception class treated as a recoverable, per-record failure below.
exn = Exception


def get_samples(url):
    """Yield one OCR-only sample per converted page of every record in a shard.

    Each record's ``abbyy.gz`` entry is run through
    ``xh.convert_abbyy_to_serial``; records for which that step raises
    ``exn`` are reported on stdout and skipped.

    Args:
        url: Shard location passed straight to ``wds.WebDataset``.

    Yields:
        dict with ``__key__`` (``"<record key>/<page index, 5 digits>"``, using
        the original hex key) and ``lin.json`` (the converted entry).

    Raises:
        Exception: any other error is printed and re-raised.
    """
    dataset = wds.WebDataset(url).decode()
    try:
        for record in dataset:
            key = record["__key__"]
            # The key is hex-encoded UTF-8; the decoded form is only logged.
            identifier = bytes.fromhex(key).decode("utf-8")
            print(identifier, key)

            try:
                pages = list(xh.convert_abbyy_to_serial(record["abbyy.gz"]))
            except exn as err:
                print("failed to parse abbyy", repr(err))
                continue

            for index, page in enumerate(pages):
                yield {"__key__": f"{key}/{index:05d}", "lin.json": page}
    except Exception as err:
        print("exception", repr(err))
        raise err


In [None]:
if False:
    # Disabled scratch check: step to the first item of the 30..39 slice of
    # the first shard and leave it bound to `sample` for manual inspection.
    preview = islice(get_samples("gs://ocro-iaa/books/books-000000.tar"), 30, 40)
    for sample in preview:
        break
    sample

In [None]:
!gsutil ls gs://ocro-iaa/books/ | shardsum

In [None]:
def convert_books_to_lin(srcurl, dsturl):
    """Convert one books shard into a lin shard and upload it with gsutil.

    Every sample produced by ``get_samples(srcurl)`` is written to a tar file
    in a temporary location, which is then copied to ``dsturl`` with
    ``gsutil cp``. The temporary file is removed when the function returns.

    Args:
        srcurl: Source shard URL, passed to ``get_samples``.
        dsturl: Destination URL, passed to ``gsutil cp``.

    Raises:
        subprocess.CalledProcessError: if ``gsutil cp`` exits non-zero.
        Exception: anything raised by ``get_samples`` or the tar writer.
    """
    print("=== converting", srcurl, "-->", dsturl, file=sys.stderr)
    with tempfile.NamedTemporaryFile() as f:
        dst = wds.TarWriter(f.name)
        try:
            for sample in get_samples(srcurl):
                dst.write(sample)
        finally:
            # Close the writer even when conversion fails part-way, so the
            # handle is not leaked.
            dst.close()
        # Argument list instead of a shell string: no quoting problems with
        # the URLs, and check=True still fails when Python runs with -O.
        subprocess.run(["gsutil", "cp", f.name, dsturl], check=True)


In [None]:
# convert_books_to_lin("gs://ocro-iaa/books/books-000001.tar", "gs://ocro-iaa/lin/lin-000001.tar")

In [None]:
# Imported in this cell rather than with the other imports; RAY_DEDUP_LOGS is
# set in the first cell, before this point.
# NOTE(review): presumably Ray needs that variable set before it is imported —
# confirm before moving this import to the top.
import ray

# Start a Ray runtime only when one is not already running in this kernel.
if not ray.is_initialized():
    ray.init()

@ray.remote(num_cpus=4, memory=int(8e9))
def process_shard(shard):
    """Convert a single numbered shard as a Ray task.

    Builds the books and lin URLs from the six-digit, zero-padded shard
    number and runs ``convert_books_to_lin`` on them.

    Args:
        shard: Integer shard number.

    Returns:
        ``(shard, True)`` on success, or ``(shard, repr(exception))`` when the
        conversion raised; failures are returned rather than raised.
    """
    source = f"gs://ocro-iaa/books/books-{shard:06d}.tar"
    target = f"gs://ocro-iaa/lin/lin-{shard:06d}.tar"
    try:
        convert_books_to_lin(source, target)
    except Exception as err:
        outcome = repr(err)
    else:
        outcome = True
    return (shard, outcome)

# Shard numbers to process (0 through 703); process_shard turns each one into
# its source and destination URLs.
fnames = list(range(0, 704))

# Submit one Ray task per shard. These are object references, not results yet.
futures = [process_shard.remote(fname) for fname in fnames]

# Block until every task has finished. Each entry is (shard, True) on success
# or (shard, repr(exception)) on failure.
results = ray.get(futures)

# process_shard returns failures instead of raising them, so report them
# explicitly; otherwise a failed shard would go unnoticed.
failed = [(shard, status) for shard, status in results if status is not True]
for shard, status in failed:
    print(f"shard {shard:06d} failed: {status}", file=sys.stderr)
print(f"{len(results) - len(failed)}/{len(results)} shards converted", file=sys.stderr)