In [1]:
import torchdata.datapipes as dp
from torchvision import transforms
from torch.utils.data.datapipes.utils.decoder import imagehandler
from sklearn.model_selection import train_test_split
from pathlib import Path

# Demo use case 1

This demo walks through a common use case:

* download a CSV index
* split its rows into train/val/test sets
* download the images referenced by a CSV column into their respective train/val/test folders

In [2]:
# S3 location of the demo CSV index read by the cells in this section.
csv_url = 's3://ml-ops-datasets/sdk_data_demo/dataset_demo.csv'

In [3]:
def split_data_from_csv(csv_url, root_dir, test_size=None, random_state=None):
    """Read a CSV index, split its rows into train/test, and return a datapipe over the train rows.

    Work in progress: the intended end state is to download each image into a
    per-split folder under ``root_dir``; for now every train row is only printed.

    Args:
        csv_url: URL/path of the CSV, opened through iopath. The first row is
            treated as a header and skipped.
        root_dir: intended output directory. NOTE(review): not used yet.
        test_size: forwarded to ``sklearn.model_selection.train_test_split``;
            ``None`` keeps sklearn's default.
        random_state: forwarded to ``train_test_split``; pass an int to make
            the split reproducible across runs.

    Returns:
        An IterDataPipe over the train rows. Each row is printed when it is
        pulled and then yielded unchanged.
    """

    def save_to_disk(row):
        # Placeholder for the real download: print the row and pass it through,
        # so the datapipe yields rows instead of the ``None`` that ``print`` returns.
        print(row)
        return row

    csv_rows = list(dp.iter.IoPathFileOpener([csv_url], mode='rt').parse_csv())
    data_rows = csv_rows[1:]  # drop the header row

    train_rows, _test_rows = train_test_split(
        data_rows, test_size=test_size, random_state=random_state
    )

    return dp.iter.IterableWrapper(train_rows).map(save_to_disk)

In [4]:
# Pull a single element to smoke-test the pipeline end to end.
next(iter(split_data_from_csv(csv_url, 'tmp')))

['s3://ml-ops-datasets/MPE/surgeon/crops_per_group/train/nut/good/1628617593512_left_RT_nut_3.jpg', 'good', 'train']


In [5]:
class DataFromCSV:
    """Eagerly load a CSV index and iterate over its data rows.

    The first CSV row is stored in ``col_names``; iterating yields the
    remaining rows in file order, as returned by ``parse_csv``.
    """

    def __init__(self, csv_url):
        # Read the whole CSV up front; the first row is treated as the header.
        self.csv_rows = list(dp.iter.IoPathFileOpener([csv_url], mode='rt').parse_csv())
        self.col_names = self.csv_rows.pop(0)

    def __iter__(self):
        """Return an iterator over the data rows (header excluded)."""
        return iter(self.csv_rows)

# Construction reads the whole CSV immediately (see DataFromCSV.__init__).
datapipe = DataFromCSV(csv_url)

## Fixing S3 listing paths (re-attaching the bucket prefix)

In [39]:
import os
import iopath
from torchdata.datapipes.iter import IterDataPipe
from torch.utils.data.datapipes.utils.common import match_masks

def _create_default_pathmanager():
    """Build an iopath ``PathManager`` with the handlers this notebook needs.

    HTTP and OneDrive handlers are always registered; the S3 handler is
    registered only when this iopath version provides it. Every handler is
    registered with ``allow_override=True``.

    Returns:
        The configured ``PathManager``.
    """
    from iopath.common.file_io import HTTPURLHandler, OneDrivePathHandler, PathManager

    manager = PathManager()
    for handler_cls in (HTTPURLHandler, OneDrivePathHandler):
        manager.register_handler(handler_cls(), allow_override=True)

    try:
        from iopath.common.s3 import S3PathHandler
        manager.register_handler(S3PathHandler(), allow_override=True)
    except ImportError:
        # iopath 0.1.8 ships without S3PathHandler; S3 support is optional.
        pass

    return manager

class IoPathFileListerIterDataPipe(IterDataPipe[str]):
    """List the entries under ``root`` through an iopath ``PathManager``.

    Notebook-local variant of torchdata's ``IoPathFileLister``: every listed
    entry is joined onto the ``scheme://bucket/`` prefix of ``root``.
    NOTE(review): this assumes ``pathmgr.ls`` returns bucket-relative keys, as
    it appears to for the S3 roots used in this notebook; confirm before using
    it with other handlers (e.g. local paths).

    If ``root`` is a file, only ``root`` itself is yielded.

    Args:
        root: directory or file URL to list, e.g. ``s3://bucket/some/prefix/``.
        masks: glob mask, or list of masks, forwarded to ``match_masks``;
            the default ``""`` keeps every entry.
        pathmgr: ``PathManager`` to use; defaults to ``_create_default_pathmanager()``.

    Raises:
        ModuleNotFoundError: if ``iopath`` is ``None``.
    """

    def __init__(
        self,
        root: str,
        masks="",
        *,
        pathmgr=None,
    ) -> None:
        # NOTE(review): `iopath` is imported unconditionally in this cell, so this
        # guard can only fire if that import is made optional.
        if iopath is None:
            raise ModuleNotFoundError(
                "Package `iopath` is required to be installed to use this datapipe. "
                "Please use `pip install iopath` or `conda install -c conda-forge iopath` "
                "to install the package"
            )

        # "scheme://bucket/" prefix of root itself, e.g. "s3://my-bucket/" for
        # "s3://my-bucket/a/b/". Derived from `root` rather than a notebook global,
        # so the lister is correct for any root and independent of cell order.
        self.bucket: str = '/'.join(root.split('/')[:3]) + '/'
        self.root: str = root
        self.pathmgr = _create_default_pathmanager() if pathmgr is None else pathmgr
        self.masks = masks

    def register_handler(self, handler, allow_override=False):
        """Register an extra iopath path handler on this lister's ``PathManager``."""
        self.pathmgr.register_handler(handler, allow_override=allow_override)

    def __iter__(self):
        """Yield ``root`` if it is a file, else the prefixed entries that match ``masks``."""
        if self.pathmgr.isfile(self.root):
            yield self.root
            return
        for file_name in self.pathmgr.ls(self.root):
            if match_masks(file_name, self.masks):
                yield os.path.join(self.bucket, file_name)

In [45]:
S3URL = 's3://ml-ops-datasets/MPE/surgeon/crops_per_group/train/nut/'

# Listing the root yields its sub-folders (e.g. `bad/`, `good/`);
# build one file lister per sub-folder.
sub_pipes = [
    IoPathFileListerIterDataPipe(root=folder)
    for folder in IoPathFileListerIterDataPipe(root=S3URL)
]

pipeline = (
    dp.iter.Concater(*sub_pipes)            # chain the per-folder listings
    .open_file_by_iopath(mode="rb")         # open each file as a byte stream
    .routed_decode(imagehandler('pil'))     # decode the bytes to a PIL image
)



In [46]:
# Peek at the first (path, PIL image) pair.
next(iter(pipeline))

('s3://ml-ops-datasets/MPE/surgeon/crops_per_group/train/nut/bad/1628597690568_left_RT_nut_1.jpg',
 <PIL.Image.Image image mode=RGB size=55x58 at 0x7FB653A81700>)

# Demo use case 2

* download an image folder
* split the image folder into train/val/test sets, preserving its directory hierarchy

In [54]:
def collate_sample(data):
    """Turn a decoded ``(path, image)`` pair into a sample dict.

    The label is the name of the folder that directly contains the file,
    e.g. ``.../nut/bad/img.jpg`` gets the label ``'bad'``.

    Returns:
        ``{"path": path, "image": image, "label": <parent folder name>}``.
    """
    file_path, decoded = data
    parent_dir = os.path.dirname(file_path)
    return {"path": file_path, "image": decoded, "label": os.path.basename(parent_dir)}

In [55]:
S3URL = 's3://ml-ops-datasets/MPE/surgeon/crops_per_group/train/nut/'

# One file lister per sub-folder of the root (the sub-folder names become labels).
sub_pipes = [
    IoPathFileListerIterDataPipe(root=folder)
    for folder in IoPathFileListerIterDataPipe(root=S3URL)
]

pipeline = (
    dp.iter.Concater(*sub_pipes)            # chain the per-folder listings
    .open_file_by_iopath(mode="rb")         # open each file as a byte stream
    .routed_decode(imagehandler('pil'))     # decode the bytes to a PIL image
    .map(collate_sample)                    # -> {"path", "image", "label"} dicts
)


In [56]:
# Peek at the first collated sample dict.
next(iter(pipeline))

{'path': 's3://ml-ops-datasets/MPE/surgeon/crops_per_group/train/nut/bad/1628597690568_left_RT_nut_1.jpg',
 'image': <PIL.Image.Image image mode=RGB size=55x58 at 0x7FB653094FA0>,
 'label': 'bad'}

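# Demo use case 3

* read the CSV index
* build a datapipe for a single split (e.g. `train` or `test`) that downloads and decodes each image and pairs it with a binary label (`good` → 1, anything else → 0)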

In [18]:
# Same CSV index as at the top of the notebook; redefined so this section runs on its own.
csv_url = 's3://ml-ops-datasets/sdk_data_demo/dataset_demo.csv'

In [20]:
def collate_csv_sample(data):
    """Turn a ``((path, image), label)`` pair into a training sample dict.

    Named distinctly from the folder-based ``collate_sample`` above so this
    cell does not silently shadow it.

    Args:
        data: ``((path, image), label)`` as produced by zipping the decoded-image
            datapipe with the label datapipe in ``datapipe_from_csv``.

    Returns:
        ``{"image": image, "label": 1 if label == 'good' else 0}``; the path is dropped.
    """
    (_path, image), label = data
    return {"image": image, "label": int(label == 'good')}


def datapipe_from_csv(csv_url, split='train'):
    """Build a datapipe of labelled images for one split of a CSV index.

    The CSV is read eagerly and its first row is skipped as a header. Each
    remaining row is read as ``(image_url, label, split)`` by position.
    NOTE(review): column order is inferred from the sample row shown in this
    notebook; confirm against the CSV.

    Args:
        csv_url: URL/path of the CSV, opened through iopath.
        split: value of the third column to keep (e.g. ``'train'``, ``'test'``).

    Returns:
        An IterDataPipe yielding ``{"image": <PIL image>, "label": int}`` dicts.
    """
    csv_rows = list(dp.iter.IoPathFileOpener([csv_url], mode='rt').parse_csv())
    data_rows = csv_rows[1:]  # drop the header row

    # Keep only (path, label) for the requested split. Unzipping all three
    # columns and discarding one would leave an unzip child that is never read;
    # unzip's shared buffer then grows until it exceeds ``buffer_size``
    # (1000 by default) and raises BufferError.
    split_rows = [(row[0], row[1]) for row in data_rows if row[2] == split]

    # Both children are consumed in lockstep by ``zip`` below, so the buffer stays small.
    x_dp, y_dp = dp.iter.IterableWrapper(split_rows).unzip(sequence_length=2)

    x_dp = (
        x_dp
        .open_file_by_iopath(mode="rb")
        .routed_decode(imagehandler('pil'))
    )

    return x_dp.zip(y_dp).map(collate_csv_sample)

In [21]:
# Peek at the first sample of the 'test' split.
next(iter(datapipe_from_csv(csv_url, 'test')))

{'image': <PIL.Image.Image image mode=RGB size=69x71 at 0x7FA47E0D6760>,
 'label': 0}