# VeRL Ray API Tutorial

## Chapter 1: Ray Basics

In [1]:
import os

# Quiet Megatron and NCCL: turn off Megatron's CUDA/process timers and
# restrict NCCL logging to warnings.
os.environ.update({
    'MEGATRON_USE_CUDA_TIMER': '0',
    'MEGATRON_START_PROCESS_TIMER': 'False',
    'NCCL_DEBUG': 'WARN',
})

In [2]:
import ray
import torch

In [3]:
# Build a local Ray cluster: the head node and the worker node both run on this machine.
# The log output below shows a new local instance was started, with its dashboard on 127.0.0.1:8265.
ray.init()

2024-01-22 14:52:10,398	INFO worker.py:1655 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265


0,1
Python version:,3.9.2
Ray version:,2.3.0
Dashboard:,http://127.0.0.1:8265


[2m[36m(GPUAccumulator pid=1489193)[0m rank 0, value: tensor([1.], device='cuda:0')
[2m[36m(GPUAccumulator pid=1489357)[0m rank 2, value: tensor([3.], device='cuda:0')
[2m[36m(GPUAccumulator pid=1489358)[0m rank 3, value: tensor([4.], device='cuda:0')
[2m[36m(GPUAccumulator pid=1489356)[0m rank 1, value: tensor([2.], device='cuda:0')
[2m[36m(GPUAccumulator pid=1489837)[0m rank 0, value: tensor([2.], device='cuda:0')
[2m[36m(GPUAccumulator pid=1489999)[0m rank 3, value: tensor([5.], device='cuda:0')
[2m[36m(GPUAccumulator pid=1489998)[0m rank 2, value: tensor([4.], device='cuda:0')
[2m[36m(GPUAccumulator pid=1489997)[0m rank 1, value: tensor([3.], device='cuda:0')
[2m[36m(GPUAccumulator pid=1490000)[0m rank 0, value: tensor([3.], device='cuda:0')
[2m[36m(GPUAccumulator pid=1490633)[0m rank 3, value: tensor([6.], device='cuda:0')
[2m[36m(GPUAccumulator pid=1490635)[0m rank 5, value: tensor([8.], device='cuda:0')
[2m[36m(GPUAccumulator pid=1490636)[0m r

[2m[36m(MLPLayerWorker pid=1493490)[0m No modifications detected for re-loaded extension module fused_mix_prec_layer_norm_cuda, skipping build step...
[2m[36m(MLPLayerWorker pid=1493490)[0m Loading extension module fused_mix_prec_layer_norm_cuda...
[2m[36m(MLPLayerWorker pid=1493654)[0m No modifications detected for re-loaded extension module fused_mix_prec_layer_norm_cuda, skipping build step...
[2m[36m(MLPLayerWorker pid=1493654)[0m Loading extension module fused_mix_prec_layer_norm_cuda...
[2m[36m(MLPLayerWorker pid=1493653)[0m No modifications detected for re-loaded extension module fused_mix_prec_layer_norm_cuda, skipping build step...
[2m[36m(MLPLayerWorker pid=1493653)[0m Loading extension module fused_mix_prec_layer_norm_cuda...
[2m[36m(MLPLayerWorker pid=1493652)[0m No modifications detected for re-loaded extension module fused_mix_prec_layer_norm_cuda, skipping build step...
[2m[36m(MLPLayerWorker pid=1493652)[0m Loading extension module fused_mix_prec

Implement an Accumulator class.

In [4]:
@ray.remote
class Accumulator:
    """Minimal Ray actor that keeps a running total."""

    def __init__(self):
        # Running total; every actor starts from zero.
        self.value = 0

    def add(self, x):
        """Add ``x`` to the running total (returns nothing)."""
        self.value += x

    def get_value(self):
        """Return the current running total."""
        current = self.value
        return current

In [5]:
# Instantiate an Accumulator actor. The actor can be viewed as a separate process acting
# as an RPC service; `accumulator` is the handle used to call its methods.
accumulator = Accumulator.remote()

In [6]:
# Ask the actor for its current value. `.remote()` returns an object reference immediately;
# it does not wait for the remote method to finish.
value_ref = accumulator.get_value.remote()
# ray.get blocks until the referenced result is ready and returns the actual value.
value = ray.get(value_ref)
print(value)

0


In [7]:
# Accumulate, then read back the result.
# Like get_value, `add.remote(10)` returns immediately without waiting for the actor.
accumulator.add.remote(10)
# The 10 printed below shows the read observed the add: Ray runs calls made by the same
# caller on one actor in the order they were submitted.
new_value = ray.get(accumulator.get_value.remote())
print(new_value)

10


## Chapter 2: Resource Pool and RayWorkerGroup
The previous example used a simple single-process worker. 
In this example, we implement a GPU worker and group several of them into a RayWorkerGroup, in which each worker performs a simple accumulator operation.

In [8]:
from single_controller.ray.base import RayResourcePool, RayClassWithInitArgs, RayWorkerGroup, merge_resource_pool
from single_controller.base import Worker

In [9]:
# Resource pool with 4 GPU slots (`[4]`, presumably 4 GPUs on a single node);
# worker groups placed on it get world_size 4.
resource_pool = RayResourcePool([4], use_gpu=True)

In [10]:
@ray.remote
class GPUAccumulator(Worker):
    """Per-rank accumulator holding a 1-element CUDA tensor that starts at the worker's rank."""

    def __init__(self) -> None:
        super().__init__()
        # Seed each rank's accumulator with its own rank so the ranks are distinguishable.
        base = torch.zeros(size=(1,), device='cuda')
        self.value = base + self.rank

    def add(self, x):
        """Add ``x`` in place to this rank's value, log it, and return a CPU copy."""
        self.value += x
        print(f'rank {self.rank}, value: {self.value}')
        host_copy = self.value.cpu()
        return host_copy


In [11]:
# Each worker's initial value is its rank; adding 1 on every rank gives [1, 2, 3, 4].
# execute_all_sync sends element i of each list argument to the worker with rank i
# and returns the per-worker results as a list.
class_with_args = RayClassWithInitArgs(cls=GPUAccumulator)
worker_group = RayWorkerGroup(resource_pool, class_with_args)
print(worker_group.execute_all_sync('add', x=[1,1,1,1]))

[tensor([1.]), tensor([2.]), tensor([3.]), tensor([4.])]


Parameter passing works as follows: each input argument is a list of length world_size, and element i of the list is dispatched to the worker with rank i in the RayWorkerGroup. 
The return value is also a list, holding each worker's return value in rank order.

### GPU Resource Sharing

RayWorkerGroups mapped to the same resource pool share its GPUs. In this example we use three resource pools: the first occupies 4 GPUs, the second occupies another 4 GPUs, and the third is the merge of the two and occupies all 8 GPUs. The first pool is the `resource_pool` created above.

In [12]:
# Create a second 4-GPU resource pool (name prefix 'a'), then merge it with the first
# pool into a single 8-GPU pool.
resource_pool_1 = RayResourcePool([4], use_gpu=True, name_prefix='a')
resource_pool_merge = merge_resource_pool(resource_pool, resource_pool_1)

In [13]:
# Build one RayWorkerGroup on the new pool and another on the merged pool.
# Groups mapped onto the same pool's GPUs share those GPUs.
worker_group_1 = RayWorkerGroup(resource_pool_1, class_with_args)
worker_group_merge = RayWorkerGroup(resource_pool_merge, class_with_args)

In [14]:
# Run 'add' on the second set of 4 GPUs. These workers start at their own rank (0-3),
# so adding 2 gives [2, 3, 4, 5].
output_1 = worker_group_1.execute_all_sync('add', x=[2,2,2,2])
print(output_1)

[tensor([2.]), tensor([3.]), tensor([4.]), tensor([5.])]


In [15]:
# Run 'add' on the merged set of 8 GPUs. Its workers have ranks 0-7 and start at their rank,
# so adding 3 gives [3, 4, 5, 6, 7, 8, 9, 10].
output_merge = worker_group_merge.execute_all_sync('add', x=[3,3,3,3,3,3,3,3])
print(output_merge)

[tensor([3.]), tensor([4.]), tensor([5.]), tensor([6.]), tensor([7.]), tensor([8.]), tensor([9.]), tensor([10.])]


In [16]:
# World sizes: 4 for each single pool and 8 for the merged pool.
print(worker_group.world_size, worker_group_1.world_size, worker_group_merge.world_size)

4 4 8


## Chapter 3: Data Dispatch, Execution and Collection

In the example above, we used the RayWorkerGroup's `execute_all_sync` function to dispatch data from the driver to each worker, which is inconvenient to write. 
In this chapter, we use function decorators so that the RayWorkerGroup can directly call methods defined in the Worker, which greatly simplifies parameter passing.

In [17]:
from single_controller.ray.decorator import register, Dispatch, Execute

In [18]:
@ray.remote
class GPUAccumulatorDecorator(Worker):
    """GPU accumulator whose ``add`` is exposed on the worker group through ``@register``."""

    def __init__(self) -> None:
        super().__init__()
        # The initial value of each rank is the same as the rank
        self.value = torch.zeros(size=(1,), device='cuda') + self.rank

    # ONE_TO_ALL: a single argument from the driver is sent to every worker.
    @register(Dispatch.ONE_TO_ALL)
    def add(self, x):
        """Add ``x`` to this rank's value, log it, and return a CPU copy of the result."""
        # Rebinds self.value to a new tensor rather than updating in place.
        self.value = self.value + x
        print(f'rank {self.rank}, value: {self.value}')
        return self.value.cpu()

In [19]:
# Place the decorated accumulator on the merged 8-GPU pool, giving 8 workers.
class_with_args = RayClassWithInitArgs(cls=GPUAccumulatorDecorator)
gpu_accumulator_decorator = RayWorkerGroup(resource_pool_merge, class_with_args)

In [20]:
# Thanks to Dispatch.ONE_TO_ALL, the single value 10 is sent to every worker in this
# RayWorkerGroup, and the per-worker results come back as a list: [10, 11, ..., 17].
print(gpu_accumulator_decorator.add(x=10))

[tensor([10.]), tensor([11.]), tensor([12.]), tensor([13.]), tensor([14.]), tensor([15.]), tensor([16.]), tensor([17.])]


### Custom Dispatch, Collection
Users can customize the `dispatch` and `collect` functions: just write your own `dispatch_fn` and `collect_fn`. We also support executing an RPC only on rank zero; examples of both are given below.

In [21]:
from single_controller.ray.decorator import register, Dispatch, collect_all_to_all, Execute

In [22]:
def two_to_all_dispatch_fn(worker_group, *args, **kwargs):
    """
    Custom dispatch: expand each 2-element argument into one entry per worker.

    Every positional and keyword argument must have exactly 2 elements ``[a, b]``. It is
    tiled in interleaved order ``[a, b, a, b, ...]`` up to ``worker_group.world_size``
    entries. If world_size is smaller than 2, the argument stays at its 2 entries.

    Args:
        worker_group: object exposing an integer ``world_size``.
        *args, **kwargs: 2-element sequences to dispatch.

    Returns:
        ``(args, kwargs)`` holding newly built lists. The caller's inputs are not modified.

    Raises:
        AssertionError: if any argument does not have exactly 2 elements.
    """
    def _tile(pair):
        # Build a fresh list instead of appending to ``pair`` so the caller's list is never
        # mutated (this also lets tuples be passed).
        assert len(pair) == 2
        tiled = list(pair)
        tiled.extend(pair[i % 2] for i in range(worker_group.world_size - 2))
        return tiled

    new_args = tuple(_tile(arg) for arg in args)
    new_kwargs = {k: _tile(v) for k, v in kwargs.items()}
    return new_args, new_kwargs


@ray.remote
class TestActor(Worker):
    """Demo worker showing rank-zero execution and a custom dispatch/collect pair."""
    # TODO: pass *args and **kwargs is bug prone and not very convincing
    def __init__(self, x) -> None:
        super().__init__()
        # Offset that every method adds to its result.
        self._x = x

    def foo(self, y):
        """Return the stored offset plus ``y``."""
        result = self._x + y
        return result

    # ALL_TO_ALL dispatch, but executed only on rank 0.
    @register(dispatch_mode=Dispatch.ALL_TO_ALL, execute_mode=Execute.RANK_ZERO)
    def foo_rank_zero(self, x, y):
        """Return ``self._x + y + x``; runs only on rank 0."""
        partial = self._x + y
        return partial + x

    # Custom dispatch via two_to_all_dispatch_fn, results gathered with collect_all_to_all.
    @register(dispatch_mode={'dispatch_fn': two_to_all_dispatch_fn, 'collect_fn': collect_all_to_all})
    def foo_custom(self, x, y):
        """Return ``self._x + y + x`` for this worker's share of the dispatched inputs."""
        partial = self._x + y
        return partial + x

In [23]:
# Four TestActor workers on the original 4-GPU pool, each constructed with x=2.
# Note: this rebinds `worker_group`, which previously held the GPUAccumulator group.
class_with_args = RayClassWithInitArgs(cls=TestActor, x=2)
worker_group = RayWorkerGroup(resource_pool, class_with_args)

In [24]:
# The custom dispatch tiles x=[1, 2] and y=[5, 6] across the 4 workers; each returns 2 + y + x.
custom_outputs = worker_group.foo_custom(x=[1, 2], y=[5, 6])
assert custom_outputs == [8, 10, 8, 10]

# RANK_ZERO execution yields a single value (2 + 2 + 1) instead of a per-worker list.
rank_zero_output = worker_group.foo_rank_zero(x=1, y=2)
assert rank_zero_output == 5

## Chapter 4: MegatronRayWorkerGroup

Finally, we use a `MegatronRayWorkerGroup`: inside it we initialize Megatron and then run a Llama MLP layer split with tensor parallelism (tp). Here we use a more complex dispatch mode, `Dispatch.MEGATRON_COMPUTE`. This dispatch mode assumes the user passes data that is already partitioned along the data-parallel (dp) dimension. Each dp partition is dispatched to all tp/pp ranks in the same dp group, and output is collected only from tp rank 0 of the last pp stage. As a result, for users who write code only on the driver, the Megatron machinery behind the RPC becomes transparent.

In [25]:
from single_controller.ray import MegatronRayWorkerGroup
from single_controller.megatron.worker import MegatronWorker
from omegaconf import OmegaConf

In [26]:
@ray.remote
class MLPLayerWorker(MegatronWorker):
    """Megatron worker holding this rank's tensor-parallel part of a Llama MLP layer."""

    @register(Dispatch.ONE_TO_ALL)
    def init_model(self, config):
        """Build this rank's ParallelLlamaMLP from ``config``; the same config goes to every worker."""
        from omegaconf import OmegaConf
        from verl.models.llama.megatron.layers import ParallelLlamaMLP
        # Sequence parallelism is disabled for this demo.
        self.parallel_layer = ParallelLlamaMLP(
            config=config,
            megatron_config=OmegaConf.create({'sequence_parallel_enabled': False}),
        )

    @register(Dispatch.ONE_TO_ALL)
    def get_weights(self):
        """Return a ``{name: parameter}`` dict of this rank's layer parameters."""
        return {name: param for name, param in self.parallel_layer.named_parameters()}

    @register(Dispatch.MEGATRON_COMPUTE)
    def run_layer(self, x):
        """Move ``x`` to CUDA, run it through this rank's MLP layer, and return the output."""
        return self.parallel_layer(x.to('cuda'))

In [27]:
# One MLPLayerWorker per slot in `resource_pool` (4 workers), wrapped in a Megatron-aware group.
# With tensor_model_parallel_size=4 and pipeline_model_parallel_size=1, the data-parallel
# size works out to 1 (printed in the next cell).
layer_cls = RayClassWithInitArgs(cls=MLPLayerWorker)
layer_worker_group = MegatronRayWorkerGroup(resource_pool=resource_pool,
                                            ray_cls_with_init=layer_cls,
                                            default_megatron_kwargs={
                                                'tensor_model_parallel_size': 4,
                                                'pipeline_model_parallel_size': 1,
                                                'num_layers_per_virtual_pipeline_stage': None
                                            })


In [28]:
# Expected: world_size 4 = tp 4 x pp 1 x dp 1.
print(layer_worker_group.world_size, layer_worker_group.tp_size, layer_worker_group.pp_size, layer_worker_group.dp_size)

4 4 1 1


In [29]:
# Layer and input sizes. hidden 4096 / intermediate 11008 appear to match the LLaMA-7B MLP.
ffn_hidden_size = 11008
batch_size = 16
seq_len = 2048
hidden_size = 4096

# Config passed to ParallelLlamaMLP in MLPLayerWorker.init_model; 'tp' is taken from
# the worker group's tensor-parallel size.
config = OmegaConf.create({
    'hidden_size': hidden_size,
    'intermediate_size': ffn_hidden_size,
    'hidden_act': 'silu',
    'pretraining_tp': 1,
    'tp': layer_worker_group.tp_size,
})

In [30]:
# Random input laid out as (seq_len, batch_size, hidden_size). It is created on the driver;
# run_layer moves it to CUDA inside each worker. At these sizes it is 2048*16*4096 float32
# values (~512 MiB) that get shipped to the workers.
x = torch.rand(size=(seq_len, batch_size, hidden_size), dtype=torch.float32)

In [31]:
# Build the MLP layer on every worker (ONE_TO_ALL sends the same config to all of them);
# init_model returns nothing, hence one None per worker.
layer_worker_group.init_model(config)

[None, None, None, None]

In [32]:
# MEGATRON_COMPUTE takes one input per data-parallel group, so the list length must equal
# dp_size (1 here). Each input goes to all tp/pp ranks of its dp group, and only the output
# of tp rank 0 on the last pp stage is collected, so `output` has one entry.
output = layer_worker_group.run_layer([x])
print(output[0].shape)

torch.Size([2048, 16, 4096])


In [27]:
# NOTE(review): the saved output below (0) appears stale, since this cell ran out of order (In [27]).
# gpu_accumulator_decorator was built on resource_pool_merge, whose groups report world_size 8
# and which returned 8 results from add. Re-run the notebook top to bottom to refresh it.
print(gpu_accumulator_decorator.world_size)

0


In [33]:
# Shut down the local Ray cluster started by ray.init() at the top of the notebook.
ray.shutdown()