## 1.函数并行

In [1]:
import time 
import ray
#关闭ray，不关闭直接初始化ray可能会报错
ray.shutdown()
#初始化ray
ray.init()

@ray.remote(num_returns=2)
def ParallelFunction(num):
    time.sleep(5)
    print("the input number:",num)
    return num - 1, 5

r1,r2 = ParallelFunction.remote(10)
r3,r4 = ParallelFunction.remote(9)

task_list = [r1,r2,r3,r4]

r1,r2,r3,r4 = ray.get(task_list)

print("task1_return:",r1,r2)
print("task2_return:",r3,r4)

2021-01-27 13:02:17,279	INFO services.py:1169 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


task1_return: 9 5
task2_return: 8 5
[2m[36m(pid=4480)[0m the input number: 9
[2m[36m(pid=4478)[0m the input number: 10


## 2.类并行

In [1]:
import time 
import ray
#关闭ray
ray.shutdown()
#初始化ray
ray.init()

@ray.remote
class ParallelClass():
    # ray类并行无法获取init方法中的返回值
    def __init__(self,num):
        print("实现分布初始化功能",num)
        self.num = num
        #return num - 1, num + 1
    
    @ray.method(num_returns=2)
    def ClassMethod(self,num):
        num = num+1
        print("这是并行类方法信息:",num)
        print("这是并行类初始化函数信息:",self.num-1)
        return num,self.num
    
C1  = ParallelClass.remote(10)
C2  = ParallelClass.remote(5)

r1,r2 = C1.ClassMethod.remote(100)
r3,r4 = C2.ClassMethod.remote(50)

task = [r1,r2,r3,r4]
#获取并行初始化结果
r1,r2,r3,r4 = ray.get(task)
print(r1,r2,r3,r4)
        
        

2021-01-27 13:19:38,587	INFO services.py:1169 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


101 10 51 5
[2m[36m(pid=5472)[0m 实现分布初始化功能 5
[2m[36m(pid=5472)[0m 这是并行类方法信息: 51
[2m[36m(pid=5472)[0m 这是并行类初始化函数信息: 4
[2m[36m(pid=5473)[0m 实现分布初始化功能 10
[2m[36m(pid=5473)[0m 这是并行类方法信息: 101
[2m[36m(pid=5473)[0m 这是并行类初始化函数信息: 9


## 3.给并行类/函数分配计算资源

In [None]:
import torch 
import ray
import time

ray.shutdown()
ray.init()

print(35*"=","GPUfunction验证",35*"=")

@ray.remote(num_gpus=0.5,num_returns=2,max_calls=1)
def GPUfunction(num):
    device = torch.device('cuda')
    x = torch.tensor([num+1, num+2, num+3], device=device)
    time.sleep(5)
    return x*2, x/2

f1,f2 = GPUfunction.remote(10)
f3,f4 = GPUfunction.remote(5)

task = [f1,f2,f3,f4]
f1,f2,f3,f4 = ray.get(task)
print(f1,f2,f3,f4)

print(35*"=","GPUclass验证",35*"=")

"""
1.num_gpus控制一个远程类使用多少份GPU资源
2.指定cpu资源数量没作用,是不是因为我的服器只有一个CPU,这里的cpu算力划分是按cpu个数划分不是按单个cpu逻辑核心数划分
"""

@ray.remote(num_gpus=0.5)
class GPUclass():
    def __init__(self):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    @ray.method(num_returns=1)
    def GpuCount(self,num):
        count_gpu = torch.tensor([0]).to(self.device)
        num_gpu = torch.tensor([1]).to(self.device)
        count_cpu = 0
        num_cpu = 1
        time1 = time.time()
        for i in  range(num):
            count_gpu = num_gpu + count_gpu
            count_cpu = num_cpu + count_cpu
        time2 = time.time()
        cost = time2 - time1
        return cost


C1 = GPUclass.remote()
C2 = GPUclass.remote()

r1 = C1.GpuCount.remote(10000)
r2 = C2.GpuCount.remote(10000)

task = [r1,r2]

r1,r2 = ray.get(task)

print(r1,r2)


    

2021-01-27 16:29:03,463	INFO services.py:1169 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


0.06608915328979492 0.06443953514099121


. In total there are 1 pending tasks and 0 pending actors on this node. This is likely due to all cluster resources being claimed by actors. To resolve the issue, consider creating fewer actors or increase the resources available to this Ray cluster. You can ignore this message if this Ray cluster is expected to auto-scale.
