In [21]:
import ray

ray.shutdown()
ray.init()


2023-06-03 08:48:40,327	INFO worker.py:1616 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m


0,1
Python version:,3.9.13
Ray version:,2.4.0
Dashboard:,http://127.0.0.1:8265


Ray 允许您在集群中将函数作为远程任务运行。使用修饰@ray.remote

[0, 1, 4, 9]


在使用@ray.remote标记函数后，该函数的执行将具有以下影响：

- 远程调用：可以在集群中的任何节点上调用该函数，而不需要在本地（即调用方）执行该函数。这使得分布式计算应用程序更易于编写和部署。

- 并行执行：由于该函数在集群中的某个节点上执行，因此可以并行地执行多个该函数的实例，以提高计算效率。

- 数据传输：由于远程函数的执行发生在不同的节点上，因此需要将数据从调用方节点传输到执行节点，在函数执行完成后再将结果返回到调用方节点。因此，数据传输的成本和延迟可能会对执行时间产生影响。

需要注意的是，将函数标记为远程函数并不一定总是对性能有益，因为远程调用和数据传输都会增加计算的开销。因此，需要根据具体应用场景和问题来考虑是否需要使用远程函数。

### 关键概念
- Tasks：Ray 允许任意函数在单独的 Python worker 上异步执行。这些异步 Ray 函数称为“Tasks”。Ray 使Tasks能够根据 CPU、GPU 和自定义资源指定其资源需求。集群调度程序使用这些资源请求在集群中分配Tasks以并行执行。
- Actor :将 Ray API 从函数（任务）扩展到类。参与者本质上是一个有状态的工作者（或服务）。当一个新的 actor 被实例化时，一个新的 worker 被创建，并且 actor 的方法被安排在那个特定的 worker 上并且可以访问和改变那个 worker 的状态。与任务一样，参与者支持 CPU、GPU 和自定义资源要求。
- Objects： 在 Ray 中，任务和参与者在Objects上创建和计算。我们将这些Objects称为remote objects，因为它们可以存储在 Ray 集群中的任何位置，我们使用Objects引用来引用它们。远程Objects缓存在 Ray 的分布式共享内存 Objects存储中，集群中每个节点都有一个Objects存储。在集群设置中，远程Objects可以存在于一个或多个节点上，而与持有Objects ref(s) 的人无关。
- Placement Groups: Placement Groups允许用户跨多个节点原子地保留资源组（即，组调度）。然后，它们可用于安排 Ray 任务和 actors，这些任务和 actors 尽可能靠近本地 (PACK) 或分散 (SPREAD)。Placement Groups一般用于帮派调度actor，也有辅助任务。

## π 的蒙特卡洛估计

原理:该方法通过在 2x2 正方形内随机采样点来工作。我们可以使用以原点为中心的单位圆内包含的点的比例来估计圆的面积与正方形的面积之比。鉴于我们知道真实的比率是 π/4，我们可以将我们的估计比率乘以 4 来近似 π 的值。我们为计算此近似值而采样的点越多，该值就越接近 π 的真实值。

In [22]:
import math
import time
import random
import psutil # memory usage

In [23]:
#定义 Progress Actor
@ray.remote
class ProgressActor:
    def __init__(self, total_num_samples: int):
        self.total_num_samples = total_num_samples
        self.num_samples_completed_per_task = {}

    def report_progress(self, task_id: int, num_samples_completed: int) -> None:
        self.num_samples_completed_per_task[task_id] = num_samples_completed

    def get_progress(self) -> float:
        return (
            sum(self.num_samples_completed_per_task.values()) / self.total_num_samples
        )
    
#定义 Worker Actor
@ray.remote
def sampling_task(num_samples: int, task_id: int,
                  progress_actor: ray.actor.ActorHandle) -> int:
    num_inside = 0
    for i in range(num_samples):
        x, y = random.uniform(-1, 1), random.uniform(-1, 1)
        if math.hypot(x, y) <= 1:
            num_inside += 1

        # Report progress every 1 million samples.
        if (i + 1) % 1_000_000 == 0:
            # This is async.
            progress_actor.report_progress.remote(task_id, i + 1)

    # Report the final progress.
    progress_actor.report_progress.remote(task_id, num_samples)
    return num_inside



In [28]:
#初始部署参数
#创建进度 Actor
# Change this to match your cluster scale.
NUM_SAMPLING_TASKS = 16
NUM_SAMPLES_PER_TASK = 5_000_000
TOTAL_NUM_SAMPLES = NUM_SAMPLING_TASKS * NUM_SAMPLES_PER_TASK

# Create the progress actor.
progress_actor = ProgressActor.remote(TOTAL_NUM_SAMPLES)

# 创建并执行所有采样任务的时间戳
start_time = time.time()

# Create and execute all sampling tasks in parallel.
results = [
    sampling_task.remote(NUM_SAMPLES_PER_TASK, i, progress_actor)
    for i in range(NUM_SAMPLING_TASKS)
]

#调用 Progress Actor
# Query progress periodically.
# while True:
#     progress = ray.get(progress_actor.get_progress.remote())
#     print(f"Progress: {int(progress * 100)}%")

#     if progress == 1:
#         break

#     time.sleep(1)


# 最后,从远程采样任务中获取圆内的样本数并计算 π。

# Get all the sampling tasks results.
total_num_inside = sum(ray.get(results))
estimate_pi = (total_num_inside * 4) / TOTAL_NUM_SAMPLES
print(f"Estimated value of π is: {estimate_pi}")

# Print the total execution time.
ray.get(results)

# 计算平均任务执行时间
execution_time = time.time() - start_time
average_execution_time = execution_time / NUM_SAMPLING_TASKS

# 计算吞吐量
throughput = TOTAL_NUM_SAMPLES / execution_time

print(f"NUM_SAMPLING_TASKS = {NUM_SAMPLING_TASKS},NUM_SAMPLES_PER_TASK = {NUM_SAMPLES_PER_TASK} 时,")
print("Average task execution time: {average_execution_time} seconds")
print(f"Throughput: {throughput} samples/second")

# 获取当前进程的内存使用情况
memory_usage = psutil.Process().memory_info().rss
print(f"Memory usage: {memory_usage} bytes")


# 计算估计值的相对误差
true_pi = math.pi
relative_error = abs((true_pi - estimate_pi) / true_pi)
print(f"Relative Error: {relative_error}")

Estimated value of π is: 3.1418251
NUM_SAMPLING_TASKS = 16,NUM_SAMPLES_PER_TASK = 5000000 时,Average task execution time: 0.45083485543727875 seconds
Throughput: 11090535.569061857 samples/second
Memory usage: 136962048 bytes
Relative Error: 7.398999037683116e-05
