Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZeRO-Infinity XPU support #4130

Merged
19 commits merged on Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion accelerator/abstract_accelerator.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,11 @@ def LongTensor(self):
...

@abc.abstractmethod
def pin_memory(self, tensor):
def pin_memory(self, tensor, align_bytes=1):
    """Return a pinned (page-locked) host copy of ``tensor``.

    Args:
        tensor: Host tensor to pin.
        align_bytes: Requested byte alignment of the pinned buffer;
            ``1`` means no extra alignment. NOTE(review): swap-tensor
            callers pass ``align_bytes=0`` — presumably "use the
            backend's default I/O alignment"; confirm against the
            concrete accelerator implementations.
    """
    ...

@abc.abstractmethod
def is_pinned(self, tensor):
    """Return True if ``tensor`` resides in pinned (page-locked) host memory."""
    ...

@abc.abstractmethod
Expand Down
5 changes: 4 additions & 1 deletion accelerator/cpu_accelerator.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,12 @@ def IntTensor(self):
def LongTensor(self):
return torch.LongTensor

def pin_memory(self, tensor):
def pin_memory(self, tensor, align_bytes=1):
    """CPU backend: pinning is a no-op; the tensor is returned as-is.

    ``align_bytes`` is accepted only for interface compatibility with
    the abstract accelerator API and is ignored here.
    """
    return tensor

def is_pinned(self, tensor):
    """Report whether ``tensor`` already lives in page-locked host memory."""
    return tensor.is_pinned()

def op_builder_dir(self):
try:
# is op_builder from deepspeed or a 3p version? this should only succeed if it's deepspeed
Expand Down
5 changes: 4 additions & 1 deletion accelerator/cuda_accelerator.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,12 @@ def IntTensor(self):
def LongTensor(self):
return torch.cuda.LongTensor

def pin_memory(self, tensor):
def pin_memory(self, tensor, align_bytes=1):
    """Return a pinned (page-locked) copy of ``tensor``.

    ``align_bytes`` is unused here — torch's pinned-memory allocator
    decides the actual alignment; the parameter exists for interface
    compatibility with the abstract accelerator API.
    """
    return tensor.pin_memory()

def is_pinned(self, tensor):
    """True when ``tensor`` is backed by page-locked host memory."""
    return tensor.is_pinned()

def on_accelerator(self, tensor):
device_str = str(tensor.device)
if device_str.startswith('cuda:'):
Expand Down
5 changes: 4 additions & 1 deletion accelerator/mps_accelerator.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,12 @@ def IntTensor(self):
def LongTensor(self):
return

def pin_memory(self, tensor):
def pin_memory(self, tensor, align_bytes=1):
    """Return a pinned (page-locked) copy of ``tensor``.

    ``align_bytes`` is unused here (interface compatibility only);
    torch controls the alignment of the pinned allocation.
    """
    return tensor.pin_memory()

def is_pinned(self, tensor):
    """Delegate the pinned-memory check to torch."""
    return tensor.is_pinned()

def on_accelerator(self, tensor):
device_str = str(tensor.device)
if device_str.startswith("mps"):
Expand Down
5 changes: 4 additions & 1 deletion accelerator/npu_accelerator.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,12 @@ def IntTensor(self):
def LongTensor(self):
return torch.npu.LongTensor

def pin_memory(self, tensor):
def pin_memory(self, tensor, align_bytes=1):
    """Return a pinned (page-locked) copy of ``tensor``.

    ``align_bytes`` is unused here (interface compatibility only);
    the torch/NPU runtime controls alignment of the pinned buffer.
    """
    return tensor.pin_memory()

def is_pinned(self, tensor):
    """Ask torch whether ``tensor`` occupies page-locked host memory."""
    return tensor.is_pinned()

def on_accelerator(self, tensor):
device_str = str(tensor.device)
if device_str.startswith('npu:'):
Expand Down
9 changes: 6 additions & 3 deletions csrc/aio/common/deepspeed_aio_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,13 @@ static int _do_io_complete(const long long int min_completes,
std::vector<std::chrono::duration<double>>& reap_times)
{
const auto start_time = std::chrono::high_resolution_clock::now();
const auto n_completes = io_getevents(
aio_ctxt->_io_ctxt, min_completes, max_completes, aio_ctxt->_io_events.data(), nullptr);
long long int n_completes = io_pgetevents(aio_ctxt->_io_ctxt,
min_completes,
max_completes,
aio_ctxt->_io_events.data(),
nullptr,
nullptr);
reap_times.push_back(std::chrono::high_resolution_clock::now() - start_time);

assert(n_completes >= min_completes);
return n_completes;
}
Expand Down
4 changes: 3 additions & 1 deletion csrc/aio/py_lib/deepspeed_aio_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ io_op_desc_t::io_op_desc_t(const bool read_op,
_num_bytes(num_bytes),
_validate(validate)
{
_cpu_buffer = _buffer.is_cuda() ? _buffer.to(torch::kCPU).pin_memory() : _buffer;
_cpu_buffer = (_buffer.is_cuda() || _buffer.is_xpu()) ? _buffer.to(torch::kCPU).pin_memory()
: _buffer;
_contiguous_buffer = _cpu_buffer.contiguous();
}

Expand All @@ -33,6 +34,7 @@ char* io_op_desc_t::data_ptr() const { return (char*)_contiguous_buffer.data_ptr
// Finalize a completed I/O operation. Reads into device-resident tensors
// are staged through the pinned _cpu_buffer, so on completion the staged
// data must be copied back to the original device buffer.
void io_op_desc_t::fini()
{
    // Only read ops need the copy-back; writes already consumed the buffer.
    // A tensor is at most one of CUDA/XPU, so at most one branch fires.
    if (_read_op && _buffer.is_cuda()) { _buffer.copy_(_cpu_buffer.to(torch::kCUDA)); }
    if (_read_op && _buffer.is_xpu()) { _buffer.copy_(_cpu_buffer.to(torch::kXPU)); }
}

deepspeed_aio_thread_t::deepspeed_aio_thread_t(const int tid, deepspeed_aio_config_t& aio_config)
Expand Down
3 changes: 2 additions & 1 deletion deepspeed/runtime/swap_tensor/async_swapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from deepspeed import comm as dist
from deepspeed.utils.logging import logger
from deepspeed.runtime.swap_tensor.utils import swap_out_tensors, SwapBuffer
from deepspeed.accelerator import get_accelerator

INVALID_BUFFER_INDEX = -1
ASYNC_SWAPPER_WAIT_TIMER = 'async_swap_gradient_wait'
Expand Down Expand Up @@ -37,7 +38,7 @@ def has_buffers(self):

def add_buffers(self, buffer_list):
assert len(self.all_buffers) == 0
assert all([buffer.is_pinned() for buffer in buffer_list])
assert all([get_accelerator().is_pinned(buffer) for buffer in buffer_list])
dtype = buffer_list[0].dtype
assert all([buffer.dtype == dtype for buffer in buffer_list])

Expand Down
5 changes: 3 additions & 2 deletions deepspeed/runtime/swap_tensor/optimizer_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from deepspeed.runtime.swap_tensor.utils import swap_in_tensors, swap_out_tensors, \
MIN_AIO_BYTES, AIO_ALIGNED_BYTES, get_sized_buffers
from deepspeed.runtime.swap_tensor.utils import SwapBufferManager, SwapBufferPool
from deepspeed.accelerator import get_accelerator


class FlattenedTensorSwapInfo(object):
Expand Down Expand Up @@ -90,7 +91,7 @@ def get_swap_gradient_paths(self):
return [grad.path for grad in self.swapped_gradients.values()]

def get_unpinned_state_tensors(self):
return [t for t in self.tensors if not t.is_pinned()]
return [t for t in self.tensors if not get_accelerator().is_pinned(t)]

def read_unswapped_gradients(self, dest_buffer):
num_elem_count = 0
Expand Down Expand Up @@ -216,7 +217,7 @@ def _initialize_from_swapped_fp16_params(self, aio_handle, fp16_partitions_info,
fp16_pinned_buffers, fp32_parameters):
assert len(fp32_parameters) == len(fp16_partitions_info)
assert len(fp32_parameters) == len(fp16_num_elems)
assert all([buffer.is_pinned() for buffer in fp16_pinned_buffers])
assert all([get_accelerator().is_pinned(buffer) for buffer in fp16_pinned_buffers])

fp32_swap_paths = self._get_swap_paths(parameters=fp32_parameters, num_elems=fp16_num_elems)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
get_sized_buffers
from deepspeed.runtime.swap_tensor.async_swapper import AsyncTensorSwapper
from deepspeed.runtime.swap_tensor.optimizer_utils import OptimizerSwapper
from deepspeed.accelerator import get_accelerator

DEBUG_MODE = False

Expand Down Expand Up @@ -174,7 +175,7 @@ def _separate_pinned_tensors(self, swap_info):
unpinned_paths = []

for tensor, path in zip(swap_info.tensors, swap_info.swap_paths):
if tensor.is_pinned():
if get_accelerator().is_pinned(tensor):
pinned_tensors.append(tensor)
pinned_paths.append(path)
else:
Expand Down Expand Up @@ -206,7 +207,7 @@ def _swap_in_gradients(self, aio_handle, parameter, dest_buffer):
if not (swap_info and swap_info.has_gradients()):
return

assert dest_buffer.is_pinned()
assert get_accelerator().is_pinned(dest_buffer)
assert parameter.numel() <= dest_buffer.numel()

parameter.grad = dest_buffer.narrow(0, 0, parameter.numel())
Expand Down
18 changes: 11 additions & 7 deletions deepspeed/runtime/swap_tensor/partitioned_param_swapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,11 @@ def _configure_aio(self, ds_config):

self.available_buffer_ids = [i for i in range(self.param_buffer_count)]
self.reserved_buffer_ids = []
self.buffers = get_accelerator().pin_memory(
torch.empty(int(self.aligned_elements_per_buffer * self.param_buffer_count),
dtype=self.dtype,
requires_grad=False))
self.buffers = get_accelerator().pin_memory(torch.empty(int(self.aligned_elements_per_buffer *
self.param_buffer_count),
dtype=self.dtype,
requires_grad=False),
align_bytes=0)

self.aio_read_handle = self.aio_handle(self.aio_config[AIO_BLOCK_SIZE], self.aio_config[AIO_QUEUE_DEPTH],
self.aio_config[AIO_SINGLE_SUBMIT], self.aio_config[AIO_OVERLAP_EVENTS],
Expand Down Expand Up @@ -313,7 +314,8 @@ def swap_in(self, params, async_op=True, swap_in_buffers=None):
def swap_into_buffer(self, param, dest_buffer):
assert param.ds_tensor.status == PartitionedParamStatus.NOT_AVAILABLE, f"param {param.ds_id} is already available or inflight"

require_swap_buffer = not (dest_buffer.is_pinned() and self._is_io_aligned(dest_buffer.numel()))
require_swap_buffer = not (get_accelerator().is_pinned(dest_buffer)
and self._is_io_aligned(dest_buffer.numel()))

if require_swap_buffer:
assert len(self.available_buffer_ids) > 0, f"No buffer available to swap param {param.ds_id}."
Expand Down Expand Up @@ -378,8 +380,10 @@ def _is_io_aligned(self, numel):

def reserve_partitioned_swap_space(self, partition_num_elems):
aligned_numel = sum([self._io_aligned_numel(numel) for numel in partition_num_elems])
self.partitioned_swap_buffer = get_accelerator().pin_memory(
torch.zeros(aligned_numel, device='cpu', dtype=self.dtype))
self.partitioned_swap_buffer = get_accelerator().pin_memory(torch.zeros(aligned_numel,
device='cpu',
dtype=self.dtype),
align_bytes=0)
self.partitioned_swap_pool = SwapBufferPool([self.partitioned_swap_buffer])

def swap_out_partitioned_params(self, dst_fp16_params, src_fp32_params):
Expand Down
5 changes: 3 additions & 2 deletions deepspeed/runtime/swap_tensor/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def get_swap_path(self, offset):
class SwapBufferPool(object):

def __init__(self, buffers):
assert all([buf.is_pinned() for buf in buffers])
assert all([get_accelerator().is_pinned(buf) for buf in buffers])
self.buffers = [SwapBuffer(buf) for buf in buffers]
self.current_index = 0

Expand Down Expand Up @@ -184,7 +184,8 @@ def __init__(self, num_elems, count, dtype):
self.count = count
self.dtype = dtype
self.all_buffers = [
get_accelerator().pin_memory(torch.zeros(num_elems, device='cpu', dtype=dtype)) for _ in range(count)
get_accelerator().pin_memory(torch.zeros(num_elems, device='cpu', dtype=dtype), align_bytes=0)
for _ in range(count)
]
self.free_buffer_index = [i for i in range(count)]
self.used_buffer_index = {}
Expand Down
2 changes: 1 addition & 1 deletion op_builder/async_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def is_compatible(self, verbose=True):
# which is a function provided by libaio that is used in the async_io op.
# If needed, one can define -I and -L entries in CFLAGS and LDFLAGS
# respectively to specify the directories for libaio.h and libaio.so.
aio_compatible = self.has_function('io_submit', ('aio', ))
aio_compatible = self.has_function('io_pgetevents', ('aio', ))
if verbose and not aio_compatible:
self.warning(f"{self.NAME} requires the dev libaio .so object and headers but these were not found.")

Expand Down
Loading