# RAY

In [1]:
import ray
import pandas as pd

# Start (or attach to) a local Ray instance with the dashboard enabled.
# ignore_reinit_error=True keeps this cell safe to re-run in a live kernel:
# without it, a second ray.init() call raises RuntimeError.
ray.init(include_dashboard=True, ignore_reinit_error=True)

2024-05-17 10:48:23,230	INFO worker.py:1740 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m


0,1
Python version:,3.10.12
Ray version:,2.20.0
Dashboard:,http://127.0.0.1:8265


### Управление объектами в Object Store

In [2]:
# Store a Python object in the object store; ray.put returns an ObjectRef handle.
y = "Hello world"
y_ref = ray.put(y)

# First printed line is the reference itself, the second is the value
# resolved from the store via ray.get.
print(y_ref, ray.get(y_ref), sep="\n")

ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000001e1f505)
Hello world


### Tasks

In [3]:
@ray.remote(num_cpus=1)
def func(a: int) -> int:
    """Return ``a`` incremented by one.

    Declared as a Ray remote function with ``num_cpus=1``, so it is invoked
    through ``func.remote(...)`` rather than called directly.
    """
    incremented = a + 1
    return incremented

# A plain argument is added to the object store automatically.
print(ray.get(func.remote(2)))

# It can also be put into the store manually and passed as an ObjectRef.
y = 1
y_ref = ray.put(y)
res_ref = func.remote(y_ref)
print(ray.get(res_ref))

3
2


### Actors

In [4]:
# Use an actor when you need an object that exists as a single instance.
@ray.remote
class Actor:
    """Key-value holder backed by a plain dict."""

    def __init__(self):
        # Mapping from integer key to string value.
        self.d = {}

    def get(self, k: int) -> str:
        """Return the value stored under ``k``; a missing key raises KeyError."""
        value = self.d[k]
        return value

    def set(self, k: int, v: str):
        """Store ``v`` under ``k``, replacing any previous value."""
        self.d[k] = v


# Create the actor and submit a write through its handle.
actor_ref = Actor.remote()
actor_ref.set.remote(k=2, v="Hello")

# Read the value back through the same handle and resolve the result.
res_ref = actor_ref.get.remote(k=2)
print(ray.get(res_ref))

Hello


### Datasets

In [5]:
# A dataset can be generated synthetically; kept under its own name so it is
# not silently overwritten by the CSV dataset below (it is not used further).
ds_range = ray.data.range(10000)

# ... or read from files. The path is relative to the notebook's directory.
ds = ray.data.read_csv("../data/currencies.csv")
ds.show(5)

ds = ds.repartition(2)

# Row-wise transform: the returned dict contains only "USD", so the resulting
# dataset keeps just that column, with its value doubled.
ds_new = ds.map(lambda record: {"USD": 2 * record["USD"]})
ds_new.show(5)

def _mapper(df: pd.DataFrame) -> pd.DataFrame:
    df["USD"] *= 2
    return df

# Batch-wise transform: each batch is handed to _mapper as a pandas DataFrame.
ds_new = ds.map_batches(_mapper, batch_format="pandas")
ds_new.show(5)

2024-05-17 01:26:39,884	INFO dataset.py:2370 -- Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.
2024-05-17 01:26:39,897	INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-05-17_01-25-52_565171_11830/logs/ray-data
2024-05-17 01:26:39,898	INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV] -> LimitOperator[limit=5]


[dataset]: Run `pip install tqdm` to enable progress reporting.


2024-05-17 01:26:40,646	INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-05-17_01-25-52_565171_11830/logs/ray-data
2024-05-17 01:26:40,646	INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV] -> AllToAllOperator[Repartition] -> TaskPoolMapOperator[Map(<lambda>)] -> LimitOperator[limit=5]


{'DATE': datetime.date(2017, 4, 1), 'EUR': 59.8107, 'USD': 55.9606, 'GBP': 69.7605}
{'DATE': datetime.date(2017, 4, 4), 'EUR': 59.8953, 'USD': 56.1396, 'GBP': 70.3429}
{'DATE': datetime.date(2017, 4, 5), 'EUR': 60.2427, 'USD': 56.5553, 'GBP': 70.3548}
{'DATE': datetime.date(2017, 4, 6), 'EUR': 59.6948, 'USD': 55.894, 'GBP': 69.4986}
{'DATE': datetime.date(2017, 4, 7), 'EUR': 60.0827, 'USD': 56.4369, 'GBP': 70.3655}


2024-05-17 01:26:40,911	INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-05-17_01-25-52_565171_11830/logs/ray-data
2024-05-17 01:26:40,912	INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV] -> AllToAllOperator[Repartition] -> TaskPoolMapOperator[MapBatches(_mapper)] -> LimitOperator[limit=5]


{'USD': 111.9212}
{'USD': 112.2792}
{'USD': 113.1106}
{'USD': 111.788}
{'USD': 112.8738}
{'DATE': datetime.date(2017, 4, 1), 'EUR': 59.8107, 'USD': 111.9212, 'GBP': 69.7605}
{'DATE': datetime.date(2017, 4, 4), 'EUR': 59.8953, 'USD': 112.2792, 'GBP': 70.3429}
{'DATE': datetime.date(2017, 4, 5), 'EUR': 60.2427, 'USD': 113.1106, 'GBP': 70.3548}
{'DATE': datetime.date(2017, 4, 6), 'EUR': 59.6948, 'USD': 111.788, 'GBP': 69.4986}
{'DATE': datetime.date(2017, 4, 7), 'EUR': 60.0827, 'USD': 112.8738, 'GBP': 70.3655}
