## Loading data from a Hugging Face dataset to S3

### Initialize the settings
1. Dataset name on the Hugging Face Hub
2. Output S3 path

In [8]:
# Put /home/ray/default first on the module search path before importing
# `util.utils` below (presumably that is where the `util` package lives —
# confirm against the workspace layout).
import sys 
sys.path.insert(0, '/home/ray/default')

import os
from util.utils import (
    generate_output_path
)

In [65]:
# Source dataset on the Hugging Face Hub:
# https://huggingface.co/datasets/DBQ/Burberry.Product.prices.United.States?row=0
HF_DATA = "DBQ/Burberry.Product.prices.United.States"

# Destination for the results written by this notebook. Any cloud storage path
# (for example a specific S3 bucket) can be substituted here.
#
# `ANYSCALE_ARTIFACT_STORAGE` is the URI of the pre-generated folder for storing
# your artifacts while keeping them separate from Anyscale-generated ones.
# See: https://docs.anyscale.com/workspaces/storage#object-storage-s3-or-gcs-buckets
output_path = generate_output_path(
    os.getenv("ANYSCALE_ARTIFACT_STORAGE"),
    "BurberryData",
)

# Sub-locations under the output path for the image files and the tabular data.
IMG_PATH = output_path + "/images"
DATA_PATH = output_path + "/data"

# Local image directory inside the workspace.
IMG_PATH_TEST = "/home/ray/default/data/images"


### Read and Process Data

In [73]:
from datasets import load_dataset
# Load the Hugging Face dataset named by HF_DATA; a later cell indexes the
# result by split name (dataset["train"]).
dataset = load_dataset(HF_DATA)

In [74]:

import ray.data
# NOTE(review): load_dataset is not used in this cell and is already imported
# in the previous one; this re-import looks redundant.
from datasets import load_dataset
# Wrap the "train" split as a Ray Dataset so the Ray Data transforms in the
# following cells can be applied to it.
ray_ds = ray.data.from_huggingface(dataset["train"])

In [75]:
# Show the dataset summary (block count, row count and column schema).
ray_ds

MaterializedDataset(
   num_blocks=1,
   num_rows=3038,
   schema={
      website_name: string,
      competence_date: string,
      country_code: string,
      currency_code: string,
      brand: string,
      category1_code: string,
      category2_code: string,
      category3_code: string,
      product_code: int64,
      title: string,
      itemurl: string,
      imageurl: string,
      full_price: double,
      price: double,
      full_price_eur: double,
      price_eur: double,
      flg_discount: int64
   }
)

In [78]:
## https://docs.ray.io/en/latest/data/transforming-data.html
import os
from typing import Any, Dict
import ray

def validate_url(row: Dict[str, Any]) -> bool:
    """Return True when the row's ``imageurl`` is an HTTPS URL.

    Intended as a row predicate for ``Dataset.filter``, so that only rows with
    a downloadable image URL reach the image-fetching step.

    Args:
        row: A dataset row; only the ``"imageurl"`` key is read.

    Returns:
        True if ``imageurl`` is a string starting with ``"https://"``.
        A missing key or a non-string value (for example ``None``) yields
        False, so such rows are filtered out instead of failing the job.
    """
    url = row.get("imageurl")
    # Null or non-string cells have no ``startswith``; treat them as invalid.
    return isinstance(url, str) and url.startswith("https://")

def parse_image(row: Dict[str, Any]) -> Dict[str, Any]:
    """Download the image at ``row["imageurl"]`` and attach it as a NumPy array.

    Args:
        row: A dataset row containing an ``"imageurl"`` string.

    Returns:
        The same row dict with an added ``"image"`` key holding the decoded
        image as a ``numpy.ndarray``.

    Raises:
        requests.RequestException: If the request fails, times out, or the
            server answers with an HTTP error status.
        PIL.UnidentifiedImageError: If the response body is not a decodable
            image.
    """
    import io

    import numpy as np
    import requests
    from PIL import Image

    # A timeout keeps a single unresponsive host from hanging the task forever.
    response = requests.get(row["imageurl"], timeout=30)
    # Fail with a clear HTTP error rather than trying to decode an error page.
    response.raise_for_status()

    # Decode from the fully downloaded body: unlike the raw socket stream,
    # ``response.content`` has any Content-Encoding already removed and the
    # connection is released once the body has been read.
    with Image.open(io.BytesIO(response.content)) as image:
        # ``np.array`` forces the pixel data to load before the image closes.
        row["image"] = np.array(image)
    return row

# Keep only rows with an HTTPS image URL, then fetch and decode each image.
# The execution-plan log printed by the later write shows these two steps
# running together with that write, not when this cell runs.
ray_ds_images = ray_ds.filter(validate_url).map(parse_image)

In [79]:
from ray.data.datasource import FilenameProvider

class ImageFilenameProvider(FilenameProvider):
    """Name each written image after the last path segment of its source URL.

    Args:
        file_format: File extension (without the leading dot) appended to
            every generated filename, e.g. ``"png"``.
    """

    def __init__(self, file_format: str):
        self.file_format = file_format

    def get_filename_for_row(self, row, task_index, block_index, row_index):
        """Build the output filename for one row.

        The name depends only on ``row["imageurl"]``; the task, block and row
        indices are not used, so two rows whose URLs end in the same segment
        get the same filename.

        Returns:
            ``"<last URL segment>.<file_format>"``.
        """
        basename = row['imageurl'].split("/")[-1]
        # Honor the configured extension instead of a hard-coded ".png".
        return f"{basename}.{self.file_format}"

In [80]:
# Write the "image" column of every row as an image file under IMG_PATH,
# with filenames supplied by ImageFilenameProvider.
ray_ds_images.write_images(
    path=IMG_PATH,
    column="image",
    filename_provider=ImageFilenameProvider("png"),
    try_create_dir=False,
)

2024-09-21 22:05:51,023	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-09-21_20-17-17_494561_3381/logs/ray-data
2024-09-21 22:05:51,024	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[Filter(validate_url)->Map(parse_image)->Write]


- Filter(validate_url)->Map(parse_image)->Write 1: 0 bundle [00:00, ? bundle/s]

Running 0: 0 bundle [00:00, ? bundle/s]

In [83]:
# Store the tabular rows (only those with an HTTPS image URL) as Parquet
# under DATA_PATH.
(
    ray_ds
    .filter(validate_url)
    .write_parquet(path=DATA_PATH, try_create_dir=False)
)

2024-09-21 22:17:47,829	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-09-21_20-17-17_494561_3381/logs/ray-data
2024-09-21 22:17:47,829	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[Filter(validate_url)->Write]


- Filter(validate_url)->Write 1: 0 bundle [00:00, ? bundle/s]

Running 0: 0 bundle [00:00, ? bundle/s]

In [94]:
# Read the written images back from IMG_PATH and keep the first 10 rows.
img_test = (
    ray.data.read_images(IMG_PATH)
    .limit(10)
)