Merged
2 changes: 1 addition & 1 deletion .gitignore
@@ -154,7 +154,7 @@ RuntimeProfiler*
checkpoint-*/
data*/
!mindnlp/data/
-!mindnlp/core/utils/data/
+!mindtorch/utils/data/
!mindnlp/dataset/
!docs/api/data/
!data2vec/
8 changes: 7 additions & 1 deletion mindtorch/_C/__init__.py
@@ -211,4 +211,10 @@ def _log_api_usage_once(*args):
    pass

ScriptDict = dict
-ScriptList = list
+ScriptList = list
+
+class _DistStoreError(RuntimeError): pass
+
+def _get_accelerator():
+    device_target = mindspore.get_context("device_target")
+    return device_(DEVICE_MAP[device_target])
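
Note: `_get_accelerator` resolves MindSpore's active `device_target` to a mindtorch device. A minimal usage sketch (an illustration, not part of the diff), assuming `DEVICE_MAP` and `device_` are defined elsewhere in mindtorch/_C/__init__.py and map target strings such as "CPU", "GPU", or "Ascend" to mindtorch device objects:

import mindspore
from mindtorch import _C

mindspore.set_context(device_target="CPU")  # select the MindSpore backend
dev = _C._get_accelerator()                 # e.g. a CPU device object
print(dev)
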
117 changes: 117 additions & 0 deletions mindtorch/_C/_distributed_c10d.py
@@ -0,0 +1,117 @@
import pickle
from typing import List, Any
from datetime import timedelta

import mindtorch
from mindtorch import Tensor
from mindtorch.distributed import Store, TCPStore
from mindtorch.distributed.c10d import Backend, ReduceOp


class ProcessGroup:
    pass

class ProcessGroupGloo(Backend):
    def __init__(
        self,
        store: Store,
        rank: int,
        size: int,
        timeout: timedelta
    ) -> None:
        super().__init__(rank, size)
        self.store = store
        self.ranks = []
        self.pg = None

    def name(self) -> str:
        return 'gloo'

    def allreduce(self, tensors: List[Tensor], opts: Any) -> Any:
        if mindtorch.distributed.is_initialized():
            self._allreduce_new_pg(tensors[0], opts)
        else:
            self._allreduce_use_store(tensors, opts)

    def _allreduce_new_pg(self, tensor, opts):
        # Get all global ranks
        if len(self.ranks) == 0:
            rank_bytes = pickle.dumps(mindtorch.distributed.get_rank())
            self.store.set(f'__ar_rank_local_to_global_{self.rank_}', rank_bytes)
            for local_rank in range(self.size_):
                global_rank = pickle.loads(self.store.get(f'__ar_rank_local_to_global_{local_rank}'))
                self.ranks.append(global_rank)

        if self.pg is None:
            self.pg = mindtorch.distributed.new_group(self.ranks, backend='gloo')

        mindtorch.distributed.all_reduce(tensor, op=opts.reduceOp, group=self.pg, async_op=False)

    def _allreduce_use_store(self, tensors: List[Tensor], opts: Any) -> Any:
        tensor = tensors[0]
        tensor_bytes = pickle.dumps(tensor)
        self.store.set(f'__ar_data_{self.rank_}', tensor_bytes)

        # Gather all tensors
        gathered = []
        for i in range(self.size_):
            data = self.store.get(f'__ar_data_{i}')
            gathered.append(pickle.loads(data))
        stacked = mindtorch.stack(gathered)

        reduce_op = opts.reduceOp
        if reduce_op == ReduceOp.SUM:
            result = stacked.sum(dim=0)
        elif reduce_op == ReduceOp.MAX:
            if stacked.dtype == mindtorch.int32:
                result = stacked.to(mindtorch.int64).max(dim=0).values.to(mindtorch.int32)
            else:
                result = stacked.max(dim=0).values
        elif reduce_op == ReduceOp.MIN:
            if stacked.dtype == mindtorch.int32:
                result = stacked.to(mindtorch.int64).min(dim=0)[0].to(mindtorch.int32)
            else:
                result = stacked.min(dim=0)[0]
        elif reduce_op == ReduceOp.PRODUCT:
            result = stacked.prod(dim=0)
        else:
            raise ValueError(f'Unsupported reduce operation: {reduce_op}')

        tensors[0].copy_(result)
        self._synchronize_and_cleanup()

    def _synchronize_and_cleanup(self):
        if self.rank_ == 0:
            # Wait for the completion of allreduce() execution for other ranks and remove the tensor_i key
            # to prevent subsequent allreduce() exceptions.
            for i in range(1, self.size_):
                self.store.get(f'__ar_finish_1_{i}')
            for i in range(self.size_):
                self.store.delete_key(f'__ar_data_{i}')
                self.store.delete_key(f'__ar_finish_1_{i}')

            # Ensure that other ranks wait for the deletion of tensor_i key to complete.
            self.store.set('__ar_finish_all', '')

            # Ensure that rank 0 exits last to prevent errors in other ranks.
            for i in range(1, self.size_):
                self.store.get(f'__ar_finish_2_{i}')
                self.store.delete_key(f'__ar_finish_2_{i}')
            self.store.delete_key('__ar_finish_all')
        else:
            self.store.set(f'__ar_finish_1_{self.rank_}', '')
            self.store.get('__ar_finish_all')
            self.store.set(f'__ar_finish_2_{self.rank_}', '')

    def _set_sequence_number_for_group(self):
        pass


class ProcessGroupHCCL:
    def __init__(self, group_name):
        self.group_name = group_name

    def get_hccl_comm_name(self, global_rank):
        return self.group_name

class Options: ...
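
Note on the Gloo fallback: when `mindtorch.distributed` has not been initialized, `allreduce` runs entirely over the store — each rank pickles its tensor under `__ar_data_<rank>`, reads every other rank's entry, reduces locally, and then performs the `__ar_finish_*` handshake so rank 0 can delete the keys before the next call. A minimal driver sketch of that path (an illustration, not part of the diff; the RANK/WORLD_SIZE environment variables, the 127.0.0.1:29500 endpoint, the `_Opts` stand-in, and `mindtorch.ones` are assumptions):

import os
from datetime import timedelta

import mindtorch
from mindtorch.distributed import TCPStore
from mindtorch._C._distributed_c10d import ProcessGroupGloo
from mindtorch.distributed.c10d import ReduceOp

rank = int(os.environ.get("RANK", "0"))
world_size = int(os.environ.get("WORLD_SIZE", "1"))

# Rank 0 hosts the TCP store; the other ranks connect as clients.
store = TCPStore("127.0.0.1", 29500, world_size, is_master=(rank == 0),
                 timeout=timedelta(seconds=300))
pg = ProcessGroupGloo(store, rank, world_size, timedelta(seconds=300))

class _Opts:                        # hypothetical stand-in: allreduce() only reads .reduceOp
    reduceOp = ReduceOp.SUM

t = mindtorch.ones(4) * (rank + 1)  # assumes a torch-like ones() factory
pg.allreduce([t], _Opts())          # store-based path, since init_process_group was never called
print(rank, t)                      # every rank now holds the element-wise sum
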
2 changes: 1 addition & 1 deletion mindtorch/distributed/__init__.py
@@ -55,7 +55,7 @@ def is_available() -> bool:
    # set_debug_level,
    # set_debug_level_from_env,
    Store,
-    # TCPStore,
+    TCPStore,
    Work as _Work,
)

2 changes: 1 addition & 1 deletion mindtorch/distributed/c10d/__init__.py
@@ -1,4 +1,4 @@
-from .store import Store
+from .store import Store, TCPStore, FileStore
from .prefix_store import PrefixStore
from .types import *
from .process_group import ProcessGroup
45 changes: 44 additions & 1 deletion mindtorch/distributed/c10d/store.py
@@ -1,6 +1,10 @@
import time
from typing import List, Optional, Callable
from abc import ABC, abstractmethod
from datetime import timedelta
try:
    from mindspore.mint.distributed.distributed import TCPStore as MsTCPStore
except:
    MsTCPStore = None

class Store:
    kDefaultTimeout = 300 # in seconds
@@ -98,3 +102,42 @@ def __copy__(self):

    def __move__(self):
        raise NotImplementedError("Moving not allowed")

class TCPStore(Store):
    def __init__(
        self,
        host_name: str,
        port: int,
        world_size: Optional[int] = None,
        is_master: bool = False,
        timeout: timedelta = timedelta(seconds=300),
        wait_for_workers: bool = True,
        multi_tenant: bool = False,
        master_listen_fd: Optional[int] = None,
        use_libuv: bool = True
    ) -> None:
        super().__init__(timeout)
        self.ms_store = MsTCPStore(host_name, port, world_size, is_master, timeout, wait_for_workers, multi_tenant, master_listen_fd, use_libuv)

    @property
    def host(self) -> str:
        return self.ms_store.host

    @property
    def port(self) -> int:
        return self.ms_store.port

    def set(self, key: str, value: str) -> None:
        self.ms_store.set(key, value)

    def add(self, key: str, value: int) -> int:
        return self.ms_store.add(key, value)

    def get(self, key: str) -> bytes:
        return self.ms_store.get(key)

    def delete_key(self, key: str) -> bool:
        return self.ms_store.delete_key(key)

class FileStore(Store):
    def __init__(self, path: str, numWorkers: int = ...): ...
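
Note: this `TCPStore` is a thin delegate over MindSpore's `mint` TCPStore, so the blocking `get()` and atomic `add()` semantics that `ProcessGroupGloo` relies on come from that backend. A hedged per-rank sketch of the key/value API (illustration only; the host/port and the RANK/WORLD_SIZE environment variables are assumptions about the launch setup):

import os
from datetime import timedelta
from mindtorch.distributed.c10d.store import TCPStore

rank = int(os.environ.get("RANK", "0"))
world_size = int(os.environ.get("WORLD_SIZE", "1"))

store = TCPStore("127.0.0.1", 29501, world_size,
                 is_master=(rank == 0), timeout=timedelta(seconds=60))

store.set(f"ready_{rank}", "1")   # publish a per-rank flag
for r in range(world_size):
    store.get(f"ready_{r}")       # assumed to block until the key is published
n = store.add("barrier", 1)       # atomic shared counter
print(f"rank {rank}: {n} ranks have arrived")
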
2 changes: 1 addition & 1 deletion mindtorch/distributed/distributed_c10d.py
@@ -1639,7 +1639,7 @@ def _new_process_group_helper(
"created, please use a different group name"
)

if device_id is not None and (device_id.index is None or device_id.type != "cuda"):
if device_id is not None and (device_id.index is None):
raise ValueError(
"init_process_group device_id parameter must be a cuda device with an "
"id, e.g. cuda:0, not just cuda or cpu"
77 changes: 0 additions & 77 deletions mindtorch/distributed/elastic/__init__.py

This file was deleted.

41 changes: 0 additions & 41 deletions mindtorch/distributed/elastic/agent/server/__init__.py

This file was deleted.
