In [4]:
from mpify import in_torchddp, ddp_rank, ddp_worldsize
import os
import torch
import torch.distributed as dist

"""
    Applying mpify to the examples in PyTorch Distributed tutorial at
    https://pytorch.org/tutorials/intermediate/dist_tuto.html
"""

"""Blocking point-to-point communication."""
def run_blocking(rank, size):
    """Blocking point-to-point communication.

    Rank 0 sends a one-element tensor to every other rank in turn,
    incrementing it by one before each send, so rank ``r`` receives the value
    ``r``.  Every other rank blocks in ``dist.recv`` until its value arrives
    from rank 0.  Finally each rank prints the value it holds.

    Args:
        rank: this process's rank in the default process group.
        size: total number of processes (world size).
    """
    buf = torch.zeros(1)
    if rank != 0:
        # Block until rank 0's message has been written into ``buf``.
        dist.recv(tensor=buf, src=0)
    else:
        for peer in range(1, size):
            # In-place update is fine here: dist.send is blocking, so the
            # previous send is no longer using ``buf``.
            buf += 1
            dist.send(tensor=buf, dst=peer)
            print(f'Rank 0 started blocking send to rank {peer}', flush=True)
    print('Rank ', rank, ' has data ', buf[0], flush=True)

"""Non-blocking point-to-point communication."""
def run_nonblocking(rank, size):
    """Non-blocking point-to-point communication.

    Rank 0 posts one ``dist.isend`` per peer rank ``r`` (carrying the value
    ``r``) without waiting in between, then waits for all of them to finish.
    Every other rank posts a ``dist.irecv`` from rank 0 and waits for it.
    Finally each rank prints the value it holds.

    Args:
        rank: this process's rank in the default process group.
        size: total number of processes (world size).
    """
    tensor = torch.zeros(1)
    if rank == 0:
        # Keep each request paired with the buffer it sends from: a send
        # buffer must stay alive and unmodified until its request completes.
        pending = []
        for r in range(1, size):
            # Out-of-place add gives every send its own buffer, so several
            # sends can be in flight at once without corrupting each other.
            tensor = tensor + 1
            req = dist.isend(tensor=tensor, dst=r)
            print(f'Rank 0 started nonblocking send to rank {r}', flush=True)
            pending.append((req, tensor))
        for req, _buf in pending:
            req.wait()
    else:
        req = dist.irecv(tensor=tensor, src=0)
        print(f'Rank {rank} started nonblocking receive', flush=True)
        # The receive buffer must not be read before the request completes.
        req.wait()
    print('Rank ', rank, ' has data ', tensor[0], flush=True)

""" All-Reduce example."""
def run_allreduce(rank, size, group=None):
    """All-Reduce example.

    Every rank contributes a one-element tensor of ones.  After the collective
    ``dist.all_reduce`` with ``ReduceOp.SUM``, each participating rank holds
    the sum of all contributions (i.e. the number of participating ranks), and
    prints it.

    Args:
        rank: this process's rank (used only in the printed message).
        size: total number of processes; unused here, but kept so the
            signature matches the other examples invoked as ``fn(rank, size)``.
        group: optional process group to reduce over (e.g. one created with
            ``dist.new_group([0, 1])``); ``None`` uses the default group.
    """
    tensor = torch.ones(1)
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=group)
    print('AllReduce: Rank ', rank, ' has data ', tensor[0], flush=True)

def runner(fn):
    """Call ``fn(rank, world_size)`` with this process's distributed rank and world size.

    Presumably invoked once per worker by ``in_torchddp``, which is handed
    ``runner`` plus the example function to pass through as ``fn``.

    Returns:
        Whatever ``fn`` returns.
    """
    rank = ddp_rank()
    world_size = ddp_worldsize()
    return fn(rank, world_size)


In [5]:
# Number of worker processes to launch.  On a GPU machine this could instead be:
#   torch.cuda.device_count() if torch.cuda.is_available() else 5
size = 5

# Import statements handed to in_torchddp via ``imports=``.
# NOTE(review): presumably mpify executes these inside each spawned worker so
# the example functions can resolve torch / dist there — confirm in mpify docs.
imports = '''
from mpify import in_torchddp, ddp_rank, ddp_worldsize
import os
import torch
import torch.distributed as dist
'''

# Ideal future API (sketch, not implemented):
#   mpify.env(imports=['torch', 'numpy as np', 'from x import y as z'], need="obja objB objC funcA")
#   ranch(size, run_nonblocking, imports=[import_specs], need={} )

# Launch the workers; each presumably runs runner(run_nonblocking).
in_torchddp(size, runner, run_nonblocking, imports=imports)

Passing env to contextualize(): {}
Rank 0, name: __main__
Rank 0 started nonblocking send to rank 1
Rank 0 started nonblocking send to rank 2
Rank 0 started nonblocking send to rank 3
Rank 0 started nonblocking send to rank 4
Rank  0  has data  tensor(4.)


In [4]:
def foo(*args, **kwargs):
    

'runner'