# Ray Data on Docker Compose
Streaming GPU inference with Ray Data — built for GPU saturation and heterogeneous scheduling.

## Setup

Start the Ray stack and launch Jupyter:

```bash
# 1. Build images
docker compose build

# 2. Start MinIO + Ray + App
docker compose up -d minio minio-setup ray-head app

# 3. Upload sample data
./scripts/upload-data.sh

# 4. Launch Jupyter Lab
docker compose exec app jupyter lab --ip 0.0.0.0 --port 8888 --allow-root --no-browser --notebook-dir=/app/notebook
```

Then open http://localhost:8888 in your browser.

## What is Ray Data?

Ray Data is a streaming data framework designed for **GPU-heavy ML workloads**. Key concepts:

- **Datasets** — distributed, streaming collections of Arrow-backed rows
- **map_batches** — the core operation: apply a function to batches of data
- **ActorPoolStrategy** — persistent GPU workers with model loaded once per actor
- **Streaming execution** — bounded memory, backpressure-aware
- **Heterogeneous scheduling** — CPU preprocessing → GPU inference seamlessly
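
The streaming and `map_batches` ideas above can be illustrated with a plain-Python analogy built on generators. This is only a sketch: it uses no Ray APIs and is not how Ray Data is implemented. The helpers `read_rows`, `batched`, and `map_batches` are hypothetical names chosen for this example. The point is that a pull-based pipeline only holds one batch at a time instead of materializing the whole dataset.

```python
from itertools import islice
from typing import Callable, Iterable, Iterator

Batch = list[int]


def read_rows(n: int) -> Iterator[int]:
    """Lazily yield rows one at a time, like a streaming read."""
    for i in range(n):
        yield i


def batched(rows: Iterable[int], batch_size: int) -> Iterator[Batch]:
    """Group a row stream into lists of at most batch_size rows."""
    it = iter(rows)
    while batch := list(islice(it, batch_size)):
        yield batch


def map_batches(
    batches: Iterable[Batch], fn: Callable[[Batch], Batch]
) -> Iterator[Batch]:
    """Apply fn to each batch as it is pulled; nothing is computed up front."""
    for batch in batches:
        yield fn(batch)


def double(batch: Batch) -> Batch:
    """Example per-batch transform."""
    return [x * 2 for x in batch]


# Work only happens when the consumer (here, list) pulls batches through.
pipeline = map_batches(batched(read_rows(10), batch_size=4), double)
results = list(pipeline)
print(results)
```

Because each stage pulls from the previous one, a slow consumer naturally slows the producer, which is a rough picture of what backpressure means in a streaming executor.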

## Architecture

```
Client (app) → Ray Head (GPU execution) → MinIO (S3 storage)
```

The app connects to the Ray cluster as a client. Ray schedules tasks on the head node (or workers). Data reads/writes go through MinIO.

In [1]:
import os

import ray

# Initialize Ray with runtime environment that ensures AWS env vars are set
ray.init(
    "ray://ray-head:10001",
    runtime_env={
        "env_vars": {
            "AWS_ENDPOINT_URL": "http://minio:9000",
            "AWS_ACCESS_KEY_ID": "minioadmin",
            "AWS_SECRET_ACCESS_KEY": "minioadmin",
            "AWS_DEFAULT_REGION": "us-east-1",
            "AWS_REGION": "us-east-1",
        }
    },
)

resources = ray.cluster_resources()
print("Cluster resources:")
for k, v in sorted(resources.items()):
    print(f"  {k}: {v}")

print("\n✓ Ray initialized with MinIO configuration")

2026-02-06 22:45:27,406	INFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.
2026-02-06 22:45:27,421	INFO client_builder.py:241 -- Passing the following kwargs to ray.init() on the server: log_to_driver


Cluster resources:
  CPU: 8.0
  GPU: 1.0
  accelerator_type:G: 1.0
  memory: 21260797952.0
  node:172.18.0.5: 1.0
  node:__internal_head__: 1.0
  object_store_memory: 4000000000.0

✓ Ray initialized with MinIO configuration


## Read Tabular Data

In [2]:
# NOTE: Ray client mode doesn't support passing PyArrow filesystem objects directly
# Instead, we use Ray remote tasks that create the filesystem on Ray workers
# This avoids serialization issues while still accessing MinIO

print("✓ Ready to read/write data from MinIO via Ray tasks")

✓ Ready to read/write data from MinIO via Ray tasks
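
Each task below rebuilds the same `S3FileSystem` with hard-coded credentials. Since `ray.init` already sets the `AWS_*` variables on the workers, one possible alternative is to derive the filesystem arguments from the environment. The helper name `s3_kwargs_from_env` is hypothetical, and the fallback values simply mirror the ones used in this notebook; treat this as a sketch rather than the notebook's actual approach.

```python
import os
from urllib.parse import urlsplit


def s3_kwargs_from_env(env=None):
    """Build keyword arguments for pyarrow.fs.S3FileSystem from AWS_* variables.

    `env` defaults to os.environ; pass a dict to make the function easy to test.
    """
    env = os.environ if env is None else env
    endpoint = urlsplit(env.get("AWS_ENDPOINT_URL", "http://minio:9000"))
    return {
        "endpoint_override": endpoint.netloc,  # host:port without the scheme
        "scheme": endpoint.scheme,             # "http" or "https"
        "access_key": env["AWS_ACCESS_KEY_ID"],
        "secret_key": env["AWS_SECRET_ACCESS_KEY"],
        "region": env.get("AWS_DEFAULT_REGION", "us-east-1"),
    }


# Example with an explicit mapping that matches the values passed to ray.init.
example_env = {
    "AWS_ENDPOINT_URL": "http://minio:9000",
    "AWS_ACCESS_KEY_ID": "minioadmin",
    "AWS_SECRET_ACCESS_KEY": "minioadmin",
    "AWS_DEFAULT_REGION": "us-east-1",
}
kwargs = s3_kwargs_from_env(example_env)
print(kwargs)
```

Inside a Ray task, the result could then be unpacked as `pafs.S3FileSystem(**s3_kwargs_from_env())`, which keeps the credentials in one place.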


In [3]:
# Read and display taxi data via Ray task
@ray.remote
def read_and_show_taxi_data():
    import pyarrow.parquet as pq
    import pyarrow.fs as pafs

    fs = pafs.S3FileSystem(
        endpoint_override="minio:9000",
        access_key="minioadmin",
        secret_key="minioadmin",
        scheme="http",
        region="us-east-1",
    )

    # List only parquet files (exclude CSV and other formats)
    file_info = fs.get_file_info(pafs.FileSelector("lake/taxi/", recursive=False))
    parquet_files = [
        f.path for f in file_info if f.is_file and f.path.endswith(".parquet")
    ]

    if not parquet_files:
        raise ValueError("No parquet files found in lake/taxi/")

    # Read all parquet files
    table = pq.read_table(parquet_files, filesystem=fs)

    # Return summary info
    return {
        "schema": str(table.schema),
        "count": len(table),
        "sample": table.slice(0, 5).to_pylist(),
    }


# Execute and display
result_ref = read_and_show_taxi_data.remote()
result = ray.get(result_ref)

print(f"Schema:\n{result['schema']}\n")
print(f"Count: {result['count']:,}\n")
print("Sample rows:")
for i, row in enumerate(result["sample"], 1):
    print(f"{i}. {row}")

# Only the summary is returned to the client; later cells re-read the data on the cluster
print("\n✓ Taxi data loaded successfully")

Schema:
VendorID: int32
tpep_pickup_datetime: timestamp[us]
tpep_dropoff_datetime: timestamp[us]
passenger_count: int64
trip_distance: double
RatecodeID: int64
store_and_fwd_flag: large_string
PULocationID: int32
DOLocationID: int32
payment_type: int64
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
improvement_surcharge: double
total_amount: double
congestion_surcharge: double
Airport_fee: double

Count: 2,964,624

Sample rows:
1. {'VendorID': 2, 'tpep_pickup_datetime': datetime.datetime(2024, 1, 1, 0, 57, 55), 'tpep_dropoff_datetime': datetime.datetime(2024, 1, 1, 1, 17, 43), 'passenger_count': 1, 'trip_distance': 1.72, 'RatecodeID': 1, 'store_and_fwd_flag': 'N', 'PULocationID': 186, 'DOLocationID': 79, 'payment_type': 2, 'fare_amount': 17.7, 'extra': 1.0, 'mta_tax': 0.5, 'tip_amount': 0.0, 'tolls_amount': 0.0, 'improvement_surcharge': 1.0, 'total_amount': 22.7, 'congestion_surcharge': 2.5, 'Airport_fee': 0.0}
2. {'VendorID': 1, 'tpep_pickup_

## Basic Transformations

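`map_batches` applies a function to each batch of a Dataset; CPU-only transforms need no special configuration. Because this notebook connects in Ray client mode, the cell below applies the same kind of columnar transform inside a single Ray task, using PyArrow compute functions on the whole table.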

In [4]:
# Transformations demo - run on Ray cluster
@ray.remote
def transform_taxi_data():
    import pyarrow.parquet as pq
    import pyarrow.fs as pafs
    import pyarrow.compute as pc

    fs = pafs.S3FileSystem(
        endpoint_override="minio:9000",
        access_key="minioadmin",
        secret_key="minioadmin",
        scheme="http",
        region="us-east-1",
    )

    # Read parquet files
    file_info = fs.get_file_info(pafs.FileSelector("lake/taxi/", recursive=False))
    parquet_files = [
        f.path for f in file_info if f.is_file and f.path.endswith(".parquet")
    ]
    table = pq.read_table(parquet_files, filesystem=fs)

    # Add tip percentage column using PyArrow compute
    fare = table.column("fare_amount")
    tip = table.column("tip_amount")

    # Compute tip_pct: (tip / fare * 100) where fare > 0, else 0
    tip_pct = pc.if_else(
        pc.greater(fare, 0), pc.multiply(pc.divide(tip, fare), 100), 0.0
    )

    # Add column to table
    table = table.append_column("tip_pct", tip_pct)

    # Return sample with selected columns
    sample = table.select(["fare_amount", "tip_amount", "tip_pct"]).slice(0, 10)
    return sample.to_pylist()


result_ref = transform_taxi_data.remote()
transformed_data = ray.get(result_ref)

print("Transformed data with tip percentage:")
for i, row in enumerate(transformed_data, 1):
    print(
        f"{i}. Fare: ${row['fare_amount']:.2f}, Tip: ${row['tip_amount']:.2f}, Tip %: {row['tip_pct']:.1f}%"
    )

Transformed data with tip percentage:
1. Fare: $17.70, Tip: $0.00, Tip %: 0.0%
2. Fare: $10.00, Tip: $3.75, Tip %: 37.5%
3. Fare: $23.30, Tip: $3.00, Tip %: 12.9%
4. Fare: $10.00, Tip: $2.00, Tip %: 20.0%
5. Fare: $7.90, Tip: $3.20, Tip %: 40.5%
6. Fare: $29.60, Tip: $6.90, Tip %: 23.3%
7. Fare: $45.70, Tip: $10.00, Tip %: 21.9%
8. Fare: $25.40, Tip: $0.00, Tip %: 0.0%
9. Fare: $31.00, Tip: $0.00, Tip %: 0.0%
10. Fare: $3.00, Tip: $0.00, Tip %: 0.0%
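
The guarded formula used above (tip divided by fare, times 100, or 0 when the fare is not positive) can be checked by hand with a small pure-Python version. The function names `tip_pct` and `add_tip_pct` are hypothetical, and the batch is modelled as a plain dict of lists rather than an Arrow table, so this is a sketch of the logic only.

```python
def tip_pct(fare: float, tip: float) -> float:
    """Tip as a percentage of the fare; 0.0 when the fare is not positive."""
    return tip / fare * 100 if fare > 0 else 0.0


def add_tip_pct(batch: dict[str, list[float]]) -> dict[str, list[float]]:
    """Return a copy of a columnar batch with a tip_pct column added."""
    out = dict(batch)
    out["tip_pct"] = [
        tip_pct(fare, tip)
        for fare, tip in zip(batch["fare_amount"], batch["tip_amount"])
    ]
    return out


# First two rows come from the sample output; the third exercises the guard.
batch = {"fare_amount": [17.7, 10.0, 0.0], "tip_amount": [0.0, 3.75, 2.0]}
result = add_tip_pct(batch)
print(result["tip_pct"])
```

One difference from the PyArrow cell: `pc.if_else` evaluates the division for every row and then selects, while this version skips the division entirely when the fare is zero or negative.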


## Read Images

In [5]:
# Read images via Ray task
@ray.remote
def list_and_read_images():
    import pyarrow.fs as pafs
    import numpy as np
    from PIL import Image
    import io

    fs = pafs.S3FileSystem(
        endpoint_override="minio:9000",
        access_key="minioadmin",
        secret_key="minioadmin",
        scheme="http",
        region="us-east-1",
    )

    # List all image files
    file_info = fs.get_file_info(pafs.FileSelector("bucket/images/", recursive=True))
    image_files = [
        f.path
        for f in file_info
        if f.is_file and f.path.endswith((".jpg", ".jpeg", ".png"))
    ]

    # Read images (limit for demo)
    images_data = []
    for path in image_files[:100]:
        try:
            with fs.open_input_file(path) as f:
                img_bytes = f.read()
                img = Image.open(io.BytesIO(img_bytes)).convert("RGB")
                img_array = np.array(img)
                images_data.append(
                    {"image": img_array, "path": path, "shape": img_array.shape}
                )
        except Exception as e:
            print(f"Error reading {path}: {e}")

    return images_data


# Get images
images_data_ref = list_and_read_images.remote()
images_data = ray.get(images_data_ref)

print(f"Image count: {len(images_data)}")
print("\nSample images:")
for i, img_info in enumerate(images_data[:2], 1):
    print(f"{i}. Path: {img_info['path']}, Shape: {img_info['shape']}")

print(f"\n✓ Loaded {len(images_data)} images for GPU inference")

Image count: 100

Sample images:
1. Path: bucket/images/food_00000.jpg, Shape: (512, 384, 3)
2. Path: bucket/images/food_00001.jpg, Shape: (512, 512, 3)

✓ Loaded 100 images for GPU inference


## GPU Inference with ActorPoolStrategy

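In Ray Data, pairing `map_batches` with `ActorPoolStrategy` lets a callable class such as `ImageClassifier` load ResNet-50 **once per actor** and reuse it across batches, which avoids reloading the model for every batch.
Cell `In [6]` approximates this with a single `num_gpus=1` Ray task: it loads the model once, then loops over the images in batches of 32.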
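
The "load once, reuse per batch" pattern can be sketched without Ray, torch, or a GPU. Here `FakeModel` is a hypothetical stand-in for ResNet-50 that only counts how often it is constructed, and `ImageClassifier` is an illustrative callable class, not code taken from this notebook.

```python
class FakeModel:
    """Stand-in for ResNet-50 so the sketch runs without torch or a GPU."""

    loads = 0  # class-level counter of how many models were constructed

    def __init__(self):
        FakeModel.loads += 1

    def predict(self, images):
        # A real model would return class labels; this returns a placeholder.
        return ["label" for _ in images]


class ImageClassifier:
    """Callable class: expensive setup in __init__, per-batch work in __call__."""

    def __init__(self):
        self.model = FakeModel()  # runs once per instance

    def __call__(self, batch):
        out = dict(batch)
        out["prediction"] = self.model.predict(batch["image"])
        return out


# One instance handles several batches without reloading the model.
classifier = ImageClassifier()
outputs = [classifier({"image": [0] * n}) for n in (32, 32, 4)]
print(FakeModel.loads, [len(o["prediction"]) for o in outputs])
```

A class of this shape is what one would typically hand to `Dataset.map_batches`, for example `ds.map_batches(ImageClassifier, concurrency=1, num_gpus=1, batch_size=32)` in recent Ray releases, or with `compute=ray.data.ActorPoolStrategy(size=1)` in older ones. The exact arguments depend on the installed Ray version, so check them against its documentation.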

In [6]:
# GPU Inference with Ray remote task
@ray.remote(num_gpus=1)
def classify_images_on_gpu(images_data):
    """Run image classification on GPU using ResNet-50."""
    import torch
    from torchvision.models import ResNet50_Weights, resnet50

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"[GPU Inference] Loading ResNet-50 on {device}")

    weights = ResNet50_Weights.DEFAULT
    model = resnet50(weights=weights).to(device).eval()
    preprocess = weights.transforms()
    categories = weights.meta["categories"]

    print(f"[GPU Inference] Processing {len(images_data)} images...")

    predictions = []
    batch_size = 32

    for i in range(0, len(images_data), batch_size):
        batch = images_data[i : i + batch_size]

        # Preprocess batch
        tensors = torch.stack(
            [
                preprocess(torch.from_numpy(img_data["image"]).permute(2, 0, 1))
                for img_data in batch
            ]
        ).to(device)

        # Run inference
        with torch.no_grad():
            logits = model(tensors)

        # Get predictions
        top_idx = logits.argmax(dim=1).cpu().numpy()
        confidences = logits.softmax(dim=1).max(dim=1).values.cpu().numpy()

        for j, img_data in enumerate(batch):
            predictions.append(
                {
                    "path": img_data["path"],
                    "prediction": categories[top_idx[j]],
                    "confidence": float(confidences[j]),
                }
            )

    return predictions


# Run GPU inference
print("Starting GPU inference...")
predictions_ref = classify_images_on_gpu.remote(images_data)
predictions = ray.get(predictions_ref)

print(f"\n✓ Classified {len(predictions)} images")

Starting GPU inference...

(classify_images_on_gpu pid=3079) [GPU Inference] Loading ResNet-50 on cuda
(classify_images_on_gpu pid=3079) Downloading: "https://download.pytorch.org/models/resnet50-11ad3fa6.pth" to /root/.cache/torch/hub/checkpoints/resnet50-11ad3fa6.pth
(classify_images_on_gpu pid=3079) [GPU Inference] Processing 100 images...

✓ Classified 100 images
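
The `confidence` value produced by the inference cell is the largest softmax probability for each image. A small pure-Python sketch of that calculation follows; `softmax` and `top1` are hypothetical helper names, and the three-class logits are made up for illustration (the real model scores 1,000 ImageNet classes).

```python
import math


def softmax(logits):
    """Numerically stable softmax over a list of floats."""
    m = max(logits)  # subtract the max so exp() cannot overflow
    exps = [math.exp(x - m) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


def top1(logits, categories):
    """Return (label, confidence) for the highest-scoring class."""
    probs = softmax(logits)
    idx = max(range(len(probs)), key=probs.__getitem__)
    return categories[idx], probs[idx]


label, confidence = top1([2.0, 1.0, 0.1], ["plate", "tray", "dough"])
print(label, round(confidence, 4))
```

Since softmax preserves ordering, taking `argmax` of the raw logits (as the cell does) selects the same class as taking it over the probabilities. With many classes sharing the probability mass, a top-1 confidence well below 0.5 is not unusual.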


## Inspect Predictions

In [7]:
# Display predictions
print("Sample predictions:")
for i, pred in enumerate(predictions[:10], 1):
    print(f"{i}. {pred['prediction']:30s} (confidence: {pred['confidence']:.4f})")

# Class distribution
from collections import Counter
import numpy as np

pred_classes = [p["prediction"] for p in predictions]
confidences = [p["confidence"] for p in predictions]

print("\nTop-10 predicted classes:")
for cls, count in Counter(pred_classes).most_common(10):
    print(f"  {cls:30s}: {count}")

print(
    f"\nConfidence — avg: {np.mean(confidences):.4f}, "
    f"min: {np.min(confidences):.4f}, "
    f"max: {np.max(confidences):.4f}"
)

Sample predictions:
1. spatula                        (confidence: 0.2423)
2. French loaf                    (confidence: 0.3737)
3. eggnog                         (confidence: 0.0718)
4. plate                          (confidence: 0.5072)
5. French loaf                    (confidence: 0.1651)
6. tray                           (confidence: 0.1228)
7. chocolate sauce                (confidence: 0.1451)
8. pretzel                        (confidence: 0.1955)
9. pretzel                        (confidence: 0.1525)
10. French loaf                    (confidence: 0.1984)

Top-10 predicted classes:
  French loaf                   : 34
  dough                         : 15
  chocolate sauce               : 9
  eggnog                        : 8
  plate                         : 7
  pretzel                       : 4
  tray                          : 2
  burrito                       : 2
  meat loaf                     : 2
  espresso                      : 2

Confidence — avg: 0.2172, min: 0.0544, 

## Write Results

In [8]:
# Write predictions to S3
@ray.remote
def write_predictions_to_s3(predictions):
    import pyarrow as pa
    import pyarrow.parquet as pq
    import pyarrow.fs as pafs

    fs = pafs.S3FileSystem(
        endpoint_override="minio:9000",
        access_key="minioadmin",
        secret_key="minioadmin",
        scheme="http",
        region="us-east-1",
    )

    # Convert to Arrow table and write
    table = pa.Table.from_pylist(predictions)
    pq.write_to_dataset(table, root_path="bucket/notebook_predictions/", filesystem=fs)
    return len(predictions)


count_ref = write_predictions_to_s3.remote(predictions)
count = ray.get(count_ref)
print(f"Written {count:,} predictions to s3://bucket/notebook_predictions/")


# Read back to verify
@ray.remote
def read_and_show_predictions():
    import pyarrow.parquet as pq
    import pyarrow.fs as pafs

    fs = pafs.S3FileSystem(
        endpoint_override="minio:9000",
        access_key="minioadmin",
        secret_key="minioadmin",
        scheme="http",
        region="us-east-1",
    )

    table = pq.read_table("bucket/notebook_predictions/", filesystem=fs)
    return {"count": len(table), "sample": table.slice(0, 5).to_pylist()}


result_ref = read_and_show_predictions.remote()
result = ray.get(result_ref)

print(f"Read back {result['count']:,} rows")
print("\nSample rows:")
for i, row in enumerate(result["sample"], 1):
    print(f"{i}. {row}")

Written 100 predictions to s3://bucket/notebook_predictions/
Read back 100 rows

Sample rows:
1. {'path': 'bucket/images/food_00000.jpg', 'prediction': 'spatula', 'confidence': 0.24226126074790955}
2. {'path': 'bucket/images/food_00001.jpg', 'prediction': 'French loaf', 'confidence': 0.37366271018981934}
3. {'path': 'bucket/images/food_00002.jpg', 'prediction': 'eggnog', 'confidence': 0.0718255490064621}
4. {'path': 'bucket/images/food_00003.jpg', 'prediction': 'plate', 'confidence': 0.5072240233421326}
5. {'path': 'bucket/images/food_00004.jpg', 'prediction': 'French loaf', 'confidence': 0.16506265103816986}


## Cleanup

In [9]:
ray.shutdown()
print("Ray disconnected.")

Ray disconnected.
