In [1]:
# Full dataset 41000 images

# FILES_DIR = "/home/ec2-user/SageMaker/movie_posters/img_41K"

## CHANGE 1: Read files from S3 instead of locally
FILES_DIR = "s3://air-example-data-2/movie-image-small-filesize-1GB"

# CHANGE 2: Point at a cluster 
%env ANYSCALE_HOST=https://console.anyscale-staging.com
CLUSTER_URL = "anyscale://workspace-project-demo/workspace-cluster-demo"
RUNTIME = {"working_dir": ".", "env_vars": {"RAY_SCHEDULER_EVENTS": "0"}}

# CHANGE 3: Increase workers from 4 to 20. 
NUM_WORKERS=20

import warnings
warnings.filterwarnings('ignore')

import numpy as np
import json

import ray
from ray.train.torch import TorchPredictor, TorchCheckpoint
from ray.train.batch_predictor import BatchPredictor
from ray.data.preprocessors import BatchMapper
from ray.data.datasource import ImageFolderDatasource
import anyscale

from torchvision.models.detection.ssd import ssd300_vgg16

from util import visualize_objects, convert_to_tensor, SSDPredictor

# TODO: Enable auto casting once we resolve call_model() output format
from ray.data.context import DatasetContext

# Disable automatic tensor-extension casting on the DatasetContext returned by
# get_current() (see the TODO above for why it is off for now).
ctx = DatasetContext.get_current()
ctx.enable_tensor_extension_casting = False

env: ANYSCALE_HOST=https://console.anyscale-staging.com


In [2]:
def batch_predict(
    files_dir,
    batch_size=128,
    num_workers=None,
    num_cpus_per_worker=4,
    num_gpus_per_worker=1,
):
    """Run SSD300-VGG16 object detection over the images under ``files_dir``.

    Images are read with ``ImageFolderDatasource`` (resized to 300x300, RGB),
    converted by the ``convert_to_tensor`` BatchMapper preprocessor, and scored
    by a fixed-size pool of ``SSDPredictor`` workers.

    Args:
        files_dir: Root of the image folder (local path or ``s3://`` URI).
        batch_size: Number of images per scoring batch.
        num_workers: Number of scoring workers, used as both the minimum and
            maximum pool size. ``None`` (the default) uses the notebook-level
            ``NUM_WORKERS`` as it is at call time.
        num_cpus_per_worker: CPUs reserved for each scoring worker.
        num_gpus_per_worker: GPUs reserved for each scoring worker.

    Returns:
        The result of ``BatchPredictor.predict``. The input ``image`` column is
        carried through (``keep_columns``) alongside the predictions.
    """
    if num_workers is None:
        # Resolve lazily so later edits to NUM_WORKERS in the notebook take effect.
        num_workers = NUM_WORKERS

    dataset = ray.data.read_datasource(
        ImageFolderDatasource(), root=files_dir, size=(300, 300), mode="RGB"
    )
    preprocessor = BatchMapper(convert_to_tensor)
    # NOTE(review): `pretrained=True` is deprecated in newer torchvision releases
    # in favour of `weights=...`. The deprecation warning is hidden by the blanket
    # warnings filter in the setup cell. Confirm the installed version before switching.
    model = ssd300_vgg16(pretrained=True)
    ckpt = TorchCheckpoint.from_model(model=model, preprocessor=preprocessor)
    predictor = BatchPredictor.from_checkpoint(ckpt, SSDPredictor)

    # min == max pins the scoring pool to exactly `num_workers` actors.
    return predictor.predict(
        dataset,
        batch_size=batch_size,
        min_scoring_workers=num_workers,
        max_scoring_workers=num_workers,
        num_cpus_per_worker=num_cpus_per_worker,
        num_gpus_per_worker=num_gpus_per_worker,
        feature_columns=["image"],
        # Keep the raw image so results can be visualized later.
        keep_columns=["image"],
    )





In [3]:
# Connect to the remote cluster with the configured runtime environment.
# Left as the cell's last expression so the connection summary is displayed.
ray.init(address=CLUSTER_URL, runtime_env=RUNTIME)

Authenticating
Loaded Anyscale authentication token from ~/.anyscale/credentials.json.

Parsing Ray Client arguments
Finished parsing arguments.

Choosing a project
Using the project workspace-project-demo:
  name:               workspace-project-demo
  project id:         prj_cNsZAtGnE6FU5tczBLqVsJv3

Preparing the cluster
Cluster workspace-cluster-demo is currently running.
Connecting to this cluster:
  cluster id:                   ses_Lz9BqdCTSjv99SNRMjPKwTRv
  cluster environment:          apt_9zd5xA9LDSNTibWFSgXmCTkx:3
  cluster environment id:       bld_LuYuxCsYgZG1e2afMLQx9YxE
  cluster compute:              demo_cluster_compute_af9265a4-13b4-4c99-8581-00ba8526b1b3
  cluster compute id:           cpt_YZbKiFjBda9HPxDc44bKRnkj
  idle termination:             120 minutes
  maximum uptime:               disabled
  link:                         https://console.anyscale-staging.com/projects/prj_cNsZAtGnE6FU5tczBLqVsJv3/clusters/ses_Lz9BqdCTSjv99SNRMjPKwTRv

Starting the interactive s

0,1
Python version:,3.8.5
Ray version:,3.0.0.dev0
Dashboard:,https://session-lz9bqdctsjv99snrmjpkwtrv.i.anyscaleuserdata-staging.com/auth/?token=<REDACTED>&redirect_to=dashboard


In [4]:
# Score every image under FILES_DIR on the cluster. The output keeps the
# original `image` column next to the predictions.
prediction_results = batch_predict(files_dir=FILES_DIR)


Read->Map_Batches: 100%|██████████| 480/480 [00:37<00:00, 12.91it/s]
Map Progress (20 actors 0 pending): 100%|██████████| 480/480 [01:18<00:00,  6.14it/s]
(raylet) Spilled 15587 MiB, 658 objects, write throughput 1407 MiB/s. Set RAY_verbose_spill_logs=0 to disable this message.


In [6]:
# Draw the predicted objects on their images using the helper imported from
# util (whether it renders all rows or only a sample is not visible here -- TODO confirm).
visualize_objects(prediction_results)

Map: 100%|██████████| 2/2 [00:00<00:00,  5.20it/s]
