In [None]:
%pip install -U "ray[data]"

In [135]:
import os
import sys
import time
import warnings
from collections import Counter

import ray

In [136]:
# Drop any Ray connection left over from an earlier run in this kernel so the
# ray.init(...) cell below can be re-run with the current runtime_env.
ray.shutdown()

In [137]:
# Hugging Face credentials are read from the driver's environment and forwarded
# to cluster workers through the runtime env. Never hard-code the token here:
# the token previously pasted into this cell was committed in plain text and
# should be revoked.
HF_TOKEN = os.environ.get("HUGGING_FACE_HUB_TOKEN")
if not HF_TOKEN:
    warnings.warn(
        "HUGGING_FACE_HUB_TOKEN is not set; workers may fail to download "
        "meta-llama/Llama-2-7b-hf."
    )

runtime_env = {
    "pip": [
        "accelerate>=0.16.0",
        "transformers>=4.26.0",
        "tokenizers>=0.13.3",
        "numpy<1.24",  # remove when mlflow updates beyond 2.2
    ],
    # Forward the token only when it is set; runtime_env env_vars values must be strings.
    "env_vars": {"HUGGING_FACE_HUB_TOKEN": HF_TOKEN} if HF_TOKEN else {},
}

In [138]:
# Connect to the remote cluster through Ray Client. Setting RAY_ADDRESS in the
# environment overrides the default address so the notebook can target
# another cluster without editing this cell.
RAY_ADDRESS = os.environ.get("RAY_ADDRESS", "ray://10.3.2.51:10001")
ray.init(RAY_ADDRESS, runtime_env=runtime_env)

0,1
Python version:,3.10.8
Ray version:,2.7.0
Dashboard:,http://10.3.2.51:8265


In [139]:
from typing import Dict
import numpy as np

# Three toy prompts as a 1-D string array; the predictor class below reads
# them back from each batch under the "data" key.
ds = ray.data.from_numpy(
    np.array(
        [
            "Complete this",
            "for me",
            "what does cheese smell like?",
        ]
    )
)

In [143]:

class HuggingFacePredictor:
    """Ray Data callable class that runs Hugging Face text generation on prompt batches.

    When used with ``map_batches`` and an ``ActorPoolStrategy``, this class is
    constructed inside each pool actor. The model is therefore loaded once per
    actor in ``__init__`` and reused by every ``__call__``.
    """

    def __init__(
        self,
        model_name: str = "meta-llama/Llama-2-7b-hf",
        max_memory_per_gpu: str = "16GB",
        max_length: int = 200,
    ):
        """Load tokenizer and model and wrap them in a text-generation pipeline.

        Args:
            model_name: Hugging Face Hub id of the causal LM to load.
            max_memory_per_gpu: Memory cap passed to ``from_pretrained`` for each
                GPU Ray assigned to this actor.
            max_length: Value forwarded as ``max_length`` on every pipeline call.
        """
        import os

        import ray
        from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline

        gpu_ids = ray.get_gpu_ids()
        print("ray.get_gpu_ids(): {}".format(gpu_ids))
        # .get(): indexing raises KeyError when the variable is unset (e.g. an
        # actor without GPUs), which would kill the actor during construction.
        print("CUDA_VISIBLE_DEVICES: {}".format(os.environ.get("CUDA_VISIBLE_DEVICES")))

        # Ray limits CUDA_VISIBLE_DEVICES to the GPUs assigned to this actor,
        # so they appear as local devices 0..n-1. Cap each one; with no GPUs,
        # pass None and let accelerate decide placement.
        max_memory = {i: max_memory_per_gpu for i in range(len(gpu_ids))} or None
        # device_map="auto" lets accelerate spread the weights over those
        # devices, so no explicit device is given to pipeline(). No
        # quantization is applied here.
        model = AutoModelForCausalLM.from_pretrained(
            model_name, device_map="auto", max_memory=max_memory
        )
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        # Pass the tokenizer explicitly: pipeline() given only a model object
        # raises "Impossible to guess which tokenizer to use".
        self.model = pipeline(
            "text-generation",
            model=model,
            tokenizer=self.tokenizer,
        )
        self.max_length = max_length
        print("done")

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, list]:
        """Generate one completion per prompt in ``batch["data"]``.

        Args:
            batch: Ray Data batch whose ``"data"`` column holds the prompts.

        Returns:
            The same batch with an added ``"output"`` list containing the
            pipeline's ``generated_text`` for each prompt, in input order.
        """
        print("batch={}\n".format(batch["data"]))
        # .tolist() yields plain Python str objects rather than np.str_.
        prompts = np.asarray(batch["data"]).tolist()
        if not prompts:
            batch["output"] = []
            return batch
        predictions = self.model(
            prompts,
            max_length=self.max_length,
            num_return_sequences=1,
            eos_token_id=self.tokenizer.eos_token_id,
            do_sample=True,
        )
        print(f"predictions={predictions}")
        # A list input yields one list of candidate dicts per prompt; keep the
        # first (and, with num_return_sequences=1, only) candidate.
        batch["output"] = [candidates[0]["generated_text"] for candidates in predictions]
        return batch

In [144]:
@ray.remote
def run_remotely(num_rows: int = 5):
    """Run the batch-inference pipeline inside a Ray task and return its first rows.

    Args:
        num_rows: Maximum number of rows to pull back with ``take_batch``
            (defaults to 5, the value previously hard-coded).

    Returns:
        The batch returned by ``Dataset.take_batch``; each row carries the
        original ``"data"`` prompt plus the predictor's ``"output"`` text.
    """
    predictions = ds.map_batches(
        HuggingFacePredictor,
        # GPUs reserved per actor; HuggingFacePredictor loads the model with
        # device_map="auto" across the GPUs it is given.
        num_gpus=2,
        # Rows per HuggingFacePredictor.__call__. Increase for larger datasets.
        batch_size=1,
        # Number of actors. Each actor reserves num_gpus GPUs, so the largest
        # pool that fits is (total cluster GPUs) // num_gpus, not the total
        # GPU count.
        compute=ray.data.ActorPoolStrategy(size=1),
    )
    return predictions.take_batch(num_rows)

In [145]:
# Submit the pipeline task to the cluster and block until it finishes; the
# fetched batch becomes this cell's displayed output.
ray.get(run_remotely.remote())

[2m[36m(run_remotely pid=508)[0m Executing DAG InputDataBuffer[Input] -> ActorPoolMapOperator[MapBatches(HuggingFacePredictor)] -> LimitOperator[limit=5]
[2m[36m(run_remotely pid=508)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(run_remotely pid=508)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
[2m[36m(run_remotely pid=508)[0m MapBatches(HuggingFacePredictor): Waiting for 1 pool actors to start...
[2m[36m(_MapWorker pid=961, ip=10.3.0.48)[0m 2023-09-28 12:31:07.572618: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/local/nvidia

[2m[36m(_MapWorker pid=961, ip=10.3.0.48)[0m ray.get_gpu_ids(): [0, 1]
[2m[36m(_MapWorker pid=961, ip=10.3.0.48)[0m CUDA_VISIBLE_DEVICES: 0,1


Downloading (…)fetensors.index.json: 100%|██████████| 26.8k/26.8k [00:00<00:00, 21.2MB/s]
Downloading shards:   0%|          | 0/2 [00:00<?, ?it/s]
[2m[36m(_MapWorker pid=961, ip=10.3.0.48)[0m 
Downloading (…)of-00002.safetensors:   0%|          | 0.00/9.98G [00:00<?, ?B/s][A
[2m[36m(_MapWorker pid=961, ip=10.3.0.48)[0m 
Downloading (…)of-00002.safetensors:   0%|          | 10.5M/9.98G [00:00<01:45, 94.0MB/s][A
[2m[36m(_MapWorker pid=961, ip=10.3.0.48)[0m 
Downloading (…)of-00002.safetensors:   0%|          | 41.9M/9.98G [00:00<00:54, 181MB/s] [A
[2m[36m(_MapWorker pid=961, ip=10.3.0.48)[0m 
Downloading (…)of-00002.safetensors:   1%|          | 73.4M/9.98G [00:00<00:47, 207MB/s][A
[2m[36m(_MapWorker pid=961, ip=10.3.0.48)[0m 
Downloading (…)of-00002.safetensors:   1%|          | 105M/9.98G [00:00<00:45, 218MB/s] [A
[2m[36m(_MapWorker pid=961, ip=10.3.0.48)[0m 
Downloading (…)of-00002.safetensors:   1%|▏         | 136M/9.98G [00:00<00:43, 225MB/s][A
[2m[36m(_Ma

RayTaskError: [36mray::run_remotely()[39m (pid=508, ip=10.3.2.51)
  File "/tmp/ipykernel_165227/2514000895.py", line 13, in run_remotely
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/dataset.py", line 2330, in take_batch
    res = next(
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/iterator.py", line 181, in _create_iterator
    block_iterator, stats, blocks_owned_by_consumer = self._to_block_iterator()
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/_internal/iterator/iterator_impl.py", line 32, in _to_block_iterator
    block_iterator, stats, executor = ds._plan.execute_to_iterator()
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/_internal/plan.py", line 538, in execute_to_iterator
    block_iter = itertools.chain([next(gen)], gen)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/_internal/execution/legacy_compat.py", line 51, in execute_to_legacy_block_iterator
    bundle_iter = execute_to_legacy_bundle_iterator(
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/_internal/execution/legacy_compat.py", line 89, in execute_to_legacy_bundle_iterator
    bundle_iter = executor.execute(dag, initial_stats=stats)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py", line 103, in execute
    self._topology, _ = build_streaming_topology(dag, self._options)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 300, in build_streaming_topology
    setup_state(dag)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 291, in setup_state
    parent_state = setup_state(parent)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 297, in setup_state
    op.start(options)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/_internal/execution/operators/actor_pool_map_operator.py", line 109, in start
    ray.get(refs, timeout=DEFAULT_WAIT_FOR_MIN_ACTORS_SEC)
ray.exceptions.RayActorError: The actor died because of an error raised in its creation task, [36mray::_MapWorker.__init__()[39m (pid=961, ip=10.3.0.48, actor_id=80bf182c68c74845a921350d01000000, repr=MapWorker(MapBatches(HuggingFacePredictor)))
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/_internal/execution/operators/actor_pool_map_operator.py", line 366, in __init__
    self._map_transformer.init()
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 113, in init
    self._init_fn()
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 112, in init_fn
    ray.data._cached_fn = op_fn(
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/_internal/execution/util.py", line 67, in __init__
    super().__init__(*args, **kwargs)
  File "/tmp/ipykernel_165227/1899976724.py", line 17, in __init__
  File "/tmp/ray/session_2023-09-28_12-26-30_620027_8/runtime_resources/pip/5a380be25e50ff3ed8d55f2bc9e8376cd1c713c1/virtualenv/lib/python3.10/site-packages/transformers/pipelines/__init__.py", line 904, in pipeline
    raise Exception(
Exception: Impossible to guess which tokenizer to use. Please provide a PreTrainedTokenizer class or a path/identifier to a pretrained tokenizer.