From 6fc7fcb4b8a0a296b9513590d49be4ae2a33096d Mon Sep 17 00:00:00 2001 From: Liangliang Ma Date: Wed, 9 Aug 2023 22:14:23 -0700 Subject: [PATCH 01/13] zero infinity xpu support --- csrc/aio/common/deepspeed_aio_common.cpp | 13 +++++-- csrc/aio/py_lib/deepspeed_aio_thread.cpp | 35 ++++++++++++++++--- csrc/aio/py_lib/deepspeed_aio_thread.h | 3 ++ csrc/aio/py_lib/deepspeed_py_aio_handle.cpp | 2 +- .../runtime/swap_tensor/async_swapper.py | 3 +- .../runtime/swap_tensor/optimizer_utils.py | 8 ++--- .../partitioned_optimizer_swapper.py | 5 +-- .../swap_tensor/partitioned_param_swapper.py | 2 +- deepspeed/runtime/swap_tensor/utils.py | 2 +- 9 files changed, 57 insertions(+), 16 deletions(-) diff --git a/csrc/aio/common/deepspeed_aio_common.cpp b/csrc/aio/common/deepspeed_aio_common.cpp index f35760a99a5c..ce9ce640ab15 100644 --- a/csrc/aio/common/deepspeed_aio_common.cpp +++ b/csrc/aio/common/deepspeed_aio_common.cpp @@ -109,16 +109,25 @@ static void _do_io_submit_block(const long long int n_iocbs, assert(submit_ret > 0); } +const char *ds_acc_env_var = "DS_ACCELERATOR"; static int _do_io_complete(const long long int min_completes, const long long int max_completes, std::unique_ptr& aio_ctxt, std::vector>& reap_times) { const auto start_time = std::chrono::high_resolution_clock::now(); - const auto n_completes = io_getevents( + long long int n_completes; + if(std::getenv(ds_acc_env_var) == "xpu") + { + n_completes = io_pgetevents( + aio_ctxt->_io_ctxt, min_completes, max_completes, aio_ctxt->_io_events.data(), NULL, NULL); + } + else + { + n_completes = io_getevents( aio_ctxt->_io_ctxt, min_completes, max_completes, aio_ctxt->_io_events.data(), nullptr); + } reap_times.push_back(std::chrono::high_resolution_clock::now() - start_time); - assert(n_completes >= min_completes); return n_completes; } diff --git a/csrc/aio/py_lib/deepspeed_aio_thread.cpp b/csrc/aio/py_lib/deepspeed_aio_thread.cpp index 055db8798a6b..a77e5d28b2a9 100644 --- a/csrc/aio/py_lib/deepspeed_aio_thread.cpp +++ b/csrc/aio/py_lib/deepspeed_aio_thread.cpp @@ -22,17 +22,44 @@ io_op_desc_t::io_op_desc_t(const bool read_op, _fd(fd), _filename(filename), _num_bytes(num_bytes), - _validate(validate) + _validate(validate), + _use_aligned_buffer(false) { - _cpu_buffer = _buffer.is_cuda() ? _buffer.to(torch::kCPU).pin_memory() : _buffer; - _contiguous_buffer = _cpu_buffer.contiguous(); + const char *ds_acc_env_var = "DS_ACCELERATOR"; + if (_buffer.is_cuda()) + { + _cpu_buffer = _buffer.to(torch::kCPU).pin_memory(); + _contiguous_buffer = _cpu_buffer.contiguous(); + } + else if (std::getenv(ds_acc_env_var) == "xpu") + { + _cpu_buffer = _buffer.to(torch::kCPU); + _contiguous_buffer = _cpu_buffer.contiguous(); + posix_memalign((void**)&_aligned_buffer, (size_t)sysconf(_SC_PAGESIZE), _num_bytes); + memcpy(_aligned_buffer, _contiguous_buffer.data_ptr(), _num_bytes); + _use_aligned_buffer = true; + } + else + { + _cpu_buffer = _buffer; + _contiguous_buffer = _cpu_buffer.contiguous(); + } } -char* io_op_desc_t::data_ptr() const { return (char*)_contiguous_buffer.data_ptr(); } +char* io_op_desc_t::data_ptr() const { + if (_use_aligned_buffer) + return _aligned_buffer; + else (char*)_contiguous_buffer.data_ptr(); +} void io_op_desc_t::fini() { if (_read_op && _buffer.is_cuda()) { _buffer.copy_(_cpu_buffer.to(torch::kCUDA)); } + if (_read_op && _use_aligned_buffer) + { + memcpy(_contiguous_buffer.data_ptr(), _aligned_buffer, _num_bytes); + _buffer.copy_(_contiguous_buffer.to(torch::kXPU)); + } } deepspeed_aio_thread_t::deepspeed_aio_thread_t(const int tid, deepspeed_aio_config_t& aio_config) diff --git a/csrc/aio/py_lib/deepspeed_aio_thread.h b/csrc/aio/py_lib/deepspeed_aio_thread.h index 20799ecbb018..66fcccd52498 100644 --- a/csrc/aio/py_lib/deepspeed_aio_thread.h +++ b/csrc/aio/py_lib/deepspeed_aio_thread.h @@ -22,6 +22,9 @@ struct io_op_desc_t { torch::Tensor _contiguous_buffer; const bool _validate; + char* _aligned_buffer; + bool _use_aligned_buffer; + io_op_desc_t(const bool read_op, const torch::Tensor& buffer, const int fd, diff --git a/csrc/aio/py_lib/deepspeed_py_aio_handle.cpp b/csrc/aio/py_lib/deepspeed_py_aio_handle.cpp index c21e92de9449..d12d9f8242ff 100644 --- a/csrc/aio/py_lib/deepspeed_py_aio_handle.cpp +++ b/csrc/aio/py_lib/deepspeed_py_aio_handle.cpp @@ -173,7 +173,7 @@ void deepspeed_aio_handle_t::_stop_threads() int deepspeed_aio_handle_t::wait() { - assert(_num_pending_ops > 0); + // assert(_num_pending_ops > 0); auto num_completed_ops = 0; while (_num_pending_ops > 0) { diff --git a/deepspeed/runtime/swap_tensor/async_swapper.py b/deepspeed/runtime/swap_tensor/async_swapper.py index 152b5b70a515..b808721537fe 100644 --- a/deepspeed/runtime/swap_tensor/async_swapper.py +++ b/deepspeed/runtime/swap_tensor/async_swapper.py @@ -10,6 +10,7 @@ from deepspeed import comm as dist from deepspeed.utils.logging import logger from deepspeed.runtime.swap_tensor.utils import swap_out_tensors, SwapBuffer +from deepspeed.accelerator import get_accelerator INVALID_BUFFER_INDEX = -1 ASYNC_SWAPPER_WAIT_TIMER = 'async_swap_gradient_wait' @@ -37,7 +38,7 @@ def has_buffers(self): def add_buffers(self, buffer_list): assert len(self.all_buffers) == 0 - assert all([buffer.is_pinned() for buffer in buffer_list]) + assert all([get_accelerator().is_pinned(buffer) for buffer in buffer_list]) dtype = buffer_list[0].dtype assert all([buffer.dtype == dtype for buffer in buffer_list]) diff --git a/deepspeed/runtime/swap_tensor/optimizer_utils.py b/deepspeed/runtime/swap_tensor/optimizer_utils.py index 12be256f8055..a01742f6a1f2 100644 --- a/deepspeed/runtime/swap_tensor/optimizer_utils.py +++ b/deepspeed/runtime/swap_tensor/optimizer_utils.py @@ -15,7 +15,7 @@ from deepspeed.runtime.swap_tensor.utils import swap_in_tensors, swap_out_tensors, \ MIN_AIO_BYTES, AIO_ALIGNED_BYTES, get_sized_buffers from deepspeed.runtime.swap_tensor.utils import SwapBufferManager, SwapBufferPool - +from deepspeed.accelerator import get_accelerator class FlattenedTensorSwapInfo(object): @@ -90,7 +90,7 @@ def get_swap_gradient_paths(self): return [grad.path for grad in self.swapped_gradients.values()] def get_unpinned_state_tensors(self): - return [t for t in self.tensors if not t.is_pinned()] + return [t for t in self.tensors if not get_accelerator().is_pinned(t)] def read_unswapped_gradients(self, dest_buffer): num_elem_count = 0 @@ -216,7 +216,7 @@ def _initialize_from_swapped_fp16_params(self, aio_handle, fp16_partitions_info, fp16_pinned_buffers, fp32_parameters): assert len(fp32_parameters) == len(fp16_partitions_info) assert len(fp32_parameters) == len(fp16_num_elems) - assert all([buffer.is_pinned() for buffer in fp16_pinned_buffers]) + assert all([get_accelerator().is_pinned(buffer) for buffer in fp16_pinned_buffers]) fp32_swap_paths = self._get_swap_paths(parameters=fp32_parameters, num_elems=fp16_num_elems) @@ -363,7 +363,7 @@ def _swap_out_unpinned_tensors(self, aio_handle, unpinned_tensors, dest_paths, p for dst, src in zip(compute_buffers, src_tensors): dst.data.copy_(src.data) - swap_lengths = [self._io_aligned_numel(t.numel()) for t in src_tensors] + swap_lengths = [self._io_aligned_numel(unpinned_tensors[-1].numel())] * len(src_tensors) swap_buffers = get_sized_buffers(pinned_buffers, swap_lengths) swap_paths = dest_paths[i:(i + swap_tensor_count)] diff --git a/deepspeed/runtime/swap_tensor/partitioned_optimizer_swapper.py b/deepspeed/runtime/swap_tensor/partitioned_optimizer_swapper.py index 677bc2aa4a8e..e7bf06043fd7 100644 --- a/deepspeed/runtime/swap_tensor/partitioned_optimizer_swapper.py +++ b/deepspeed/runtime/swap_tensor/partitioned_optimizer_swapper.py @@ -17,6 +17,7 @@ get_sized_buffers from deepspeed.runtime.swap_tensor.async_swapper import AsyncTensorSwapper from deepspeed.runtime.swap_tensor.optimizer_utils import OptimizerSwapper +from deepspeed.accelerator import get_accelerator DEBUG_MODE = False @@ -174,7 +175,7 @@ def _separate_pinned_tensors(self, swap_info): unpinned_paths = [] for tensor, path in zip(swap_info.tensors, swap_info.swap_paths): - if tensor.is_pinned(): + if get_accelerator().is_pinned(tensor): pinned_tensors.append(tensor) pinned_paths.append(path) else: @@ -206,7 +207,7 @@ def _swap_in_gradients(self, aio_handle, parameter, dest_buffer): if not (swap_info and swap_info.has_gradients()): return - assert dest_buffer.is_pinned() + assert get_accelerator().is_pinned(dest_buffer) assert parameter.numel() <= dest_buffer.numel() parameter.grad = dest_buffer.narrow(0, 0, parameter.numel()) diff --git a/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py b/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py index 4109e0954148..b68fc2c5f088 100644 --- a/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py +++ b/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py @@ -313,7 +313,7 @@ def swap_in(self, params, async_op=True, swap_in_buffers=None): def swap_into_buffer(self, param, dest_buffer): assert param.ds_tensor.status == PartitionedParamStatus.NOT_AVAILABLE, f"param {param.ds_id} is already available or inflight" - require_swap_buffer = not (dest_buffer.is_pinned() and self._is_io_aligned(dest_buffer.numel())) + require_swap_buffer = not (get_accelerator().is_pinned(dest_buffer) and self._is_io_aligned(dest_buffer.numel())) if require_swap_buffer: assert len(self.available_buffer_ids) > 0, f"No buffer available to swap param {param.ds_id}." diff --git a/deepspeed/runtime/swap_tensor/utils.py b/deepspeed/runtime/swap_tensor/utils.py index 50a88f74351a..43bf9ab75bf0 100644 --- a/deepspeed/runtime/swap_tensor/utils.py +++ b/deepspeed/runtime/swap_tensor/utils.py @@ -96,7 +96,7 @@ def get_swap_path(self, offset): class SwapBufferPool(object): def __init__(self, buffers): - assert all([buf.is_pinned() for buf in buffers]) + assert all([get_accelerator().is_pinned(buf) for buf in buffers]) self.buffers = [SwapBuffer(buf) for buf in buffers] self.current_index = 0 From f5f9fc2c81552f345751d62deaf36b5d8cf18207 Mon Sep 17 00:00:00 2001 From: Liangliang Ma Date: Mon, 28 Aug 2023 03:53:47 -0700 Subject: [PATCH 02/13] remove env var depends --- csrc/aio/common/deepspeed_aio_common.cpp | 11 +------ csrc/aio/py_lib/deepspeed_aio_thread.cpp | 29 +++++-------------- csrc/aio/py_lib/deepspeed_py_aio_handle.cpp | 2 +- .../runtime/swap_tensor/optimizer_utils.py | 7 +++-- .../partitioned_optimizer_swapper.py | 4 ++- 5 files changed, 18 insertions(+), 35 deletions(-) diff --git a/csrc/aio/common/deepspeed_aio_common.cpp b/csrc/aio/common/deepspeed_aio_common.cpp index ce9ce640ab15..28315ae429e9 100644 --- a/csrc/aio/common/deepspeed_aio_common.cpp +++ b/csrc/aio/common/deepspeed_aio_common.cpp @@ -116,17 +116,8 @@ static int _do_io_complete(const long long int min_completes, std::vector>& reap_times) { const auto start_time = std::chrono::high_resolution_clock::now(); - long long int n_completes; - if(std::getenv(ds_acc_env_var) == "xpu") - { - n_completes = io_pgetevents( + long long int n_completes = io_pgetevents( aio_ctxt->_io_ctxt, min_completes, max_completes, aio_ctxt->_io_events.data(), NULL, NULL); - } - else - { - n_completes = io_getevents( - aio_ctxt->_io_ctxt, min_completes, max_completes, aio_ctxt->_io_events.data(), nullptr); - } reap_times.push_back(std::chrono::high_resolution_clock::now() - start_time); assert(n_completes >= min_completes); return n_completes; diff --git a/csrc/aio/py_lib/deepspeed_aio_thread.cpp b/csrc/aio/py_lib/deepspeed_aio_thread.cpp index a77e5d28b2a9..e09248bf9ebd 100644 --- a/csrc/aio/py_lib/deepspeed_aio_thread.cpp +++ b/csrc/aio/py_lib/deepspeed_aio_thread.cpp @@ -25,41 +25,28 @@ io_op_desc_t::io_op_desc_t(const bool read_op, _validate(validate), _use_aligned_buffer(false) { - const char *ds_acc_env_var = "DS_ACCELERATOR"; - if (_buffer.is_cuda()) + _cpu_buffer = (_buffer.is_cuda() | _buffer.is_xpu()) ? _buffer.to(torch::kCPU).pin_memory() : _buffer; + _contiguous_buffer = _cpu_buffer.contiguous(); + if (at::detail::getXPUHooks().hasXPU()) { - _cpu_buffer = _buffer.to(torch::kCPU).pin_memory(); - _contiguous_buffer = _cpu_buffer.contiguous(); - } - else if (std::getenv(ds_acc_env_var) == "xpu") - { - _cpu_buffer = _buffer.to(torch::kCPU); - _contiguous_buffer = _cpu_buffer.contiguous(); posix_memalign((void**)&_aligned_buffer, (size_t)sysconf(_SC_PAGESIZE), _num_bytes); memcpy(_aligned_buffer, _contiguous_buffer.data_ptr(), _num_bytes); _use_aligned_buffer = true; } - else - { - _cpu_buffer = _buffer; - _contiguous_buffer = _cpu_buffer.contiguous(); - } } -char* io_op_desc_t::data_ptr() const { +char* io_op_desc_t::data_ptr() const +{ if (_use_aligned_buffer) return _aligned_buffer; - else (char*)_contiguous_buffer.data_ptr(); + else return (char*)_contiguous_buffer.data_ptr(); } void io_op_desc_t::fini() { if (_read_op && _buffer.is_cuda()) { _buffer.copy_(_cpu_buffer.to(torch::kCUDA)); } - if (_read_op && _use_aligned_buffer) - { - memcpy(_contiguous_buffer.data_ptr(), _aligned_buffer, _num_bytes); - _buffer.copy_(_contiguous_buffer.to(torch::kXPU)); - } + if (_read_op && _use_aligned_buffer) { memcpy(_contiguous_buffer.data_ptr(), _aligned_buffer, _num_bytes); } + if (_read_op && _buffer.is_xpu()){ _buffer.copy_(_cpu_buffer.to(torch::kXPU)); } } deepspeed_aio_thread_t::deepspeed_aio_thread_t(const int tid, deepspeed_aio_config_t& aio_config) diff --git a/csrc/aio/py_lib/deepspeed_py_aio_handle.cpp b/csrc/aio/py_lib/deepspeed_py_aio_handle.cpp index d12d9f8242ff..c21e92de9449 100644 --- a/csrc/aio/py_lib/deepspeed_py_aio_handle.cpp +++ b/csrc/aio/py_lib/deepspeed_py_aio_handle.cpp @@ -173,7 +173,7 @@ void deepspeed_aio_handle_t::_stop_threads() int deepspeed_aio_handle_t::wait() { - // assert(_num_pending_ops > 0); + assert(_num_pending_ops > 0); auto num_completed_ops = 0; while (_num_pending_ops > 0) { diff --git a/deepspeed/runtime/swap_tensor/optimizer_utils.py b/deepspeed/runtime/swap_tensor/optimizer_utils.py index a01742f6a1f2..28e4e69aff7b 100644 --- a/deepspeed/runtime/swap_tensor/optimizer_utils.py +++ b/deepspeed/runtime/swap_tensor/optimizer_utils.py @@ -348,7 +348,7 @@ def _get_swap_paths(self, parameters, num_elems): swap_paths = [info.swap_paths[0] for info in swap_info_list] return swap_paths - def _swap_out_unpinned_tensors(self, aio_handle, unpinned_tensors, dest_paths, pinned_buffers): + def _swap_out_unpinned_tensors(self, aio_handle, unpinned_tensors, dest_paths, pinned_buffers, aligned_numel=None): swap_buffer_count = len(pinned_buffers) unpinned_tensor_count = len(unpinned_tensors) @@ -363,7 +363,10 @@ def _swap_out_unpinned_tensors(self, aio_handle, unpinned_tensors, dest_paths, p for dst, src in zip(compute_buffers, src_tensors): dst.data.copy_(src.data) - swap_lengths = [self._io_aligned_numel(unpinned_tensors[-1].numel())] * len(src_tensors) + if aligned_numel is not None: + swap_lengths = [aligned_numel] * len(src_tensors) + else: + swap_lengths = [self._io_aligned_numel(t.numel()) for t in src_tensors] swap_buffers = get_sized_buffers(pinned_buffers, swap_lengths) swap_paths = dest_paths[i:(i + swap_tensor_count)] diff --git a/deepspeed/runtime/swap_tensor/partitioned_optimizer_swapper.py b/deepspeed/runtime/swap_tensor/partitioned_optimizer_swapper.py index e7bf06043fd7..a2e04bd39900 100644 --- a/deepspeed/runtime/swap_tensor/partitioned_optimizer_swapper.py +++ b/deepspeed/runtime/swap_tensor/partitioned_optimizer_swapper.py @@ -108,10 +108,12 @@ def swap_out_optimizer_state(self, parameter, async_swap=False): if len(unpinned_tensors) > 0: pinned_buffers = self.swap_buffer_manager.allocate_all(num_elems=self.largest_numel, dtype=self.dtype) + swap_info_numel = self._io_aligned_numel(swap_info.numel()) self._swap_out_unpinned_tensors(aio_handle=self.aio_handle, unpinned_tensors=unpinned_tensors, dest_paths=unpinned_paths, - pinned_buffers=pinned_buffers) + pinned_buffers=pinned_buffers, + aligned_numel=swap_info_numel) self.allocated_swap_buffers += pinned_buffers for t in unpinned_tensors: From 033df2d48e6ce9c2ba976b9f8e77194a9b19f775 Mon Sep 17 00:00:00 2001 From: Liangliang Ma Date: Tue, 19 Sep 2023 00:56:18 -0700 Subject: [PATCH 03/13] client align mem --- csrc/aio/common/deepspeed_aio_common.cpp | 1 - csrc/aio/py_lib/deepspeed_aio_thread.cpp | 17 ++--------------- csrc/aio/py_lib/deepspeed_aio_thread.h | 3 --- deepspeed/runtime/swap_tensor/async_swapper.py | 2 +- .../runtime/swap_tensor/optimizer_utils.py | 11 ++++------- .../partitioned_optimizer_swapper.py | 8 +++----- .../swap_tensor/partitioned_param_swapper.py | 10 +++++----- deepspeed/runtime/swap_tensor/utils.py | 4 ++-- 8 files changed, 17 insertions(+), 39 deletions(-) diff --git a/csrc/aio/common/deepspeed_aio_common.cpp b/csrc/aio/common/deepspeed_aio_common.cpp index 28315ae429e9..2e37e8851e4f 100644 --- a/csrc/aio/common/deepspeed_aio_common.cpp +++ b/csrc/aio/common/deepspeed_aio_common.cpp @@ -109,7 +109,6 @@ static void _do_io_submit_block(const long long int n_iocbs, assert(submit_ret > 0); } -const char *ds_acc_env_var = "DS_ACCELERATOR"; static int _do_io_complete(const long long int min_completes, const long long int max_completes, std::unique_ptr& aio_ctxt, diff --git a/csrc/aio/py_lib/deepspeed_aio_thread.cpp b/csrc/aio/py_lib/deepspeed_aio_thread.cpp index e09248bf9ebd..4d189464e6a2 100644 --- a/csrc/aio/py_lib/deepspeed_aio_thread.cpp +++ b/csrc/aio/py_lib/deepspeed_aio_thread.cpp @@ -22,30 +22,17 @@ io_op_desc_t::io_op_desc_t(const bool read_op, _fd(fd), _filename(filename), _num_bytes(num_bytes), - _validate(validate), - _use_aligned_buffer(false) + _validate(validate) { _cpu_buffer = (_buffer.is_cuda() | _buffer.is_xpu()) ? _buffer.to(torch::kCPU).pin_memory() : _buffer; _contiguous_buffer = _cpu_buffer.contiguous(); - if (at::detail::getXPUHooks().hasXPU()) - { - posix_memalign((void**)&_aligned_buffer, (size_t)sysconf(_SC_PAGESIZE), _num_bytes); - memcpy(_aligned_buffer, _contiguous_buffer.data_ptr(), _num_bytes); - _use_aligned_buffer = true; - } } -char* io_op_desc_t::data_ptr() const -{ - if (_use_aligned_buffer) - return _aligned_buffer; - else return (char*)_contiguous_buffer.data_ptr(); -} +char* io_op_desc_t::data_ptr() const { return (char*)_contiguous_buffer.data_ptr(); } void io_op_desc_t::fini() { if (_read_op && _buffer.is_cuda()) { _buffer.copy_(_cpu_buffer.to(torch::kCUDA)); } - if (_read_op && _use_aligned_buffer) { memcpy(_contiguous_buffer.data_ptr(), _aligned_buffer, _num_bytes); } if (_read_op && _buffer.is_xpu()){ _buffer.copy_(_cpu_buffer.to(torch::kXPU)); } } diff --git a/csrc/aio/py_lib/deepspeed_aio_thread.h b/csrc/aio/py_lib/deepspeed_aio_thread.h index 66fcccd52498..20799ecbb018 100644 --- a/csrc/aio/py_lib/deepspeed_aio_thread.h +++ b/csrc/aio/py_lib/deepspeed_aio_thread.h @@ -22,9 +22,6 @@ struct io_op_desc_t { torch::Tensor _contiguous_buffer; const bool _validate; - char* _aligned_buffer; - bool _use_aligned_buffer; - io_op_desc_t(const bool read_op, const torch::Tensor& buffer, const int fd, diff --git a/deepspeed/runtime/swap_tensor/async_swapper.py b/deepspeed/runtime/swap_tensor/async_swapper.py index b808721537fe..965bbd1839a5 100644 --- a/deepspeed/runtime/swap_tensor/async_swapper.py +++ b/deepspeed/runtime/swap_tensor/async_swapper.py @@ -38,7 +38,7 @@ def has_buffers(self): def add_buffers(self, buffer_list): assert len(self.all_buffers) == 0 - assert all([get_accelerator().is_pinned(buffer) for buffer in buffer_list]) + assert all([get_accelerator().is_pinned(buffer) or get_accelerator().is_aligned(buffer) for buffer in buffer_list]) dtype = buffer_list[0].dtype assert all([buffer.dtype == dtype for buffer in buffer_list]) diff --git a/deepspeed/runtime/swap_tensor/optimizer_utils.py b/deepspeed/runtime/swap_tensor/optimizer_utils.py index 28e4e69aff7b..539d50f85964 100644 --- a/deepspeed/runtime/swap_tensor/optimizer_utils.py +++ b/deepspeed/runtime/swap_tensor/optimizer_utils.py @@ -90,7 +90,7 @@ def get_swap_gradient_paths(self): return [grad.path for grad in self.swapped_gradients.values()] def get_unpinned_state_tensors(self): - return [t for t in self.tensors if not get_accelerator().is_pinned(t)] + return [t for t in self.tensors if not (get_accelerator().is_pinned(t) or get_accelerator().is_aligned(t))] def read_unswapped_gradients(self, dest_buffer): num_elem_count = 0 @@ -216,7 +216,7 @@ def _initialize_from_swapped_fp16_params(self, aio_handle, fp16_partitions_info, fp16_pinned_buffers, fp32_parameters): assert len(fp32_parameters) == len(fp16_partitions_info) assert len(fp32_parameters) == len(fp16_num_elems) - assert all([get_accelerator().is_pinned(buffer) for buffer in fp16_pinned_buffers]) + assert all([get_accelerator().is_pinned(buffer) or get_accelerator().is_aligned(buffer)for buffer in fp16_pinned_buffers]) fp32_swap_paths = self._get_swap_paths(parameters=fp32_parameters, num_elems=fp16_num_elems) @@ -348,7 +348,7 @@ def _get_swap_paths(self, parameters, num_elems): swap_paths = [info.swap_paths[0] for info in swap_info_list] return swap_paths - def _swap_out_unpinned_tensors(self, aio_handle, unpinned_tensors, dest_paths, pinned_buffers, aligned_numel=None): + def _swap_out_unpinned_tensors(self, aio_handle, unpinned_tensors, dest_paths, pinned_buffers): swap_buffer_count = len(pinned_buffers) unpinned_tensor_count = len(unpinned_tensors) @@ -363,10 +363,7 @@ def _swap_out_unpinned_tensors(self, aio_handle, unpinned_tensors, dest_paths, p for dst, src in zip(compute_buffers, src_tensors): dst.data.copy_(src.data) - if aligned_numel is not None: - swap_lengths = [aligned_numel] * len(src_tensors) - else: - swap_lengths = [self._io_aligned_numel(t.numel()) for t in src_tensors] + swap_lengths = [self._io_aligned_numel(t.numel()) for t in src_tensors] swap_buffers = get_sized_buffers(pinned_buffers, swap_lengths) swap_paths = dest_paths[i:(i + swap_tensor_count)] diff --git a/deepspeed/runtime/swap_tensor/partitioned_optimizer_swapper.py b/deepspeed/runtime/swap_tensor/partitioned_optimizer_swapper.py index a2e04bd39900..31cb374cbe1d 100644 --- a/deepspeed/runtime/swap_tensor/partitioned_optimizer_swapper.py +++ b/deepspeed/runtime/swap_tensor/partitioned_optimizer_swapper.py @@ -108,12 +108,10 @@ def swap_out_optimizer_state(self, parameter, async_swap=False): if len(unpinned_tensors) > 0: pinned_buffers = self.swap_buffer_manager.allocate_all(num_elems=self.largest_numel, dtype=self.dtype) - swap_info_numel = self._io_aligned_numel(swap_info.numel()) self._swap_out_unpinned_tensors(aio_handle=self.aio_handle, unpinned_tensors=unpinned_tensors, dest_paths=unpinned_paths, - pinned_buffers=pinned_buffers, - aligned_numel=swap_info_numel) + pinned_buffers=pinned_buffers) self.allocated_swap_buffers += pinned_buffers for t in unpinned_tensors: @@ -177,7 +175,7 @@ def _separate_pinned_tensors(self, swap_info): unpinned_paths = [] for tensor, path in zip(swap_info.tensors, swap_info.swap_paths): - if get_accelerator().is_pinned(tensor): + if get_accelerator().is_pinned(tensor) or get_accelerator().is_aligned(tensor): pinned_tensors.append(tensor) pinned_paths.append(path) else: @@ -209,7 +207,7 @@ def _swap_in_gradients(self, aio_handle, parameter, dest_buffer): if not (swap_info and swap_info.has_gradients()): return - assert get_accelerator().is_pinned(dest_buffer) + assert get_accelerator().is_pinned(dest_buffer) or get_accelerator().is_aligned(dest_buffer) assert parameter.numel() <= dest_buffer.numel() parameter.grad = dest_buffer.narrow(0, 0, parameter.numel()) diff --git a/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py b/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py index b68fc2c5f088..13c8e2ca3223 100644 --- a/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py +++ b/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py @@ -104,10 +104,10 @@ def _configure_aio(self, ds_config): self.available_buffer_ids = [i for i in range(self.param_buffer_count)] self.reserved_buffer_ids = [] - self.buffers = get_accelerator().pin_memory( + self.buffers = get_accelerator().align_memory(get_accelerator().pin_memory( torch.empty(int(self.aligned_elements_per_buffer * self.param_buffer_count), dtype=self.dtype, - requires_grad=False)) + requires_grad=False))) self.aio_read_handle = self.aio_handle(self.aio_config[AIO_BLOCK_SIZE], self.aio_config[AIO_QUEUE_DEPTH], self.aio_config[AIO_SINGLE_SUBMIT], self.aio_config[AIO_OVERLAP_EVENTS], @@ -313,7 +313,7 @@ def swap_in(self, params, async_op=True, swap_in_buffers=None): def swap_into_buffer(self, param, dest_buffer): assert param.ds_tensor.status == PartitionedParamStatus.NOT_AVAILABLE, f"param {param.ds_id} is already available or inflight" - require_swap_buffer = not (get_accelerator().is_pinned(dest_buffer) and self._is_io_aligned(dest_buffer.numel())) + require_swap_buffer = not ((get_accelerator().is_pinned(dest_buffer) or get_accelerator().is_aligned(dest_buffer)) and self._is_io_aligned(dest_buffer.numel())) if require_swap_buffer: assert len(self.available_buffer_ids) > 0, f"No buffer available to swap param {param.ds_id}." @@ -378,8 +378,8 @@ def _is_io_aligned(self, numel): def reserve_partitioned_swap_space(self, partition_num_elems): aligned_numel = sum([self._io_aligned_numel(numel) for numel in partition_num_elems]) - self.partitioned_swap_buffer = get_accelerator().pin_memory( - torch.zeros(aligned_numel, device='cpu', dtype=self.dtype)) + self.partitioned_swap_buffer = get_accelerator().align_memory(get_accelerator().pin_memory( + torch.zeros(aligned_numel, device='cpu', dtype=self.dtype))) self.partitioned_swap_pool = SwapBufferPool([self.partitioned_swap_buffer]) def swap_out_partitioned_params(self, dst_fp16_params, src_fp32_params): diff --git a/deepspeed/runtime/swap_tensor/utils.py b/deepspeed/runtime/swap_tensor/utils.py index 43bf9ab75bf0..a446ad09dbd5 100644 --- a/deepspeed/runtime/swap_tensor/utils.py +++ b/deepspeed/runtime/swap_tensor/utils.py @@ -96,7 +96,7 @@ def get_swap_path(self, offset): class SwapBufferPool(object): def __init__(self, buffers): - assert all([get_accelerator().is_pinned(buf) for buf in buffers]) + assert all([get_accelerator().is_pinned(buf) or get_accelerator().is_aligned(buf) for buf in buffers]) self.buffers = [SwapBuffer(buf) for buf in buffers] self.current_index = 0 @@ -184,7 +184,7 @@ def __init__(self, num_elems, count, dtype): self.count = count self.dtype = dtype self.all_buffers = [ - get_accelerator().pin_memory(torch.zeros(num_elems, device='cpu', dtype=dtype)) for _ in range(count) + get_accelerator().align_memory(get_accelerator().pin_memory(torch.zeros(num_elems, device='cpu', dtype=dtype))) for _ in range(count) ] self.free_buffer_index = [i for i in range(count)] self.used_buffer_index = {} From acbe13c9ca8afa2581310e130d5f092ba20f32c2 Mon Sep 17 00:00:00 2001 From: Liangliang Ma Date: Tue, 19 Sep 2023 01:08:17 -0700 Subject: [PATCH 04/13] sync with all accelerators' --- accelerator/abstract_accelerator.py | 12 ++++++++++++ accelerator/cpu_accelerator.py | 9 +++++++++ accelerator/cuda_accelerator.py | 9 +++++++++ accelerator/mps_accelerator.py | 9 +++++++++ accelerator/npu_accelerator.py | 9 +++++++++ 5 files changed, 48 insertions(+) diff --git a/accelerator/abstract_accelerator.py b/accelerator/abstract_accelerator.py index fe0e66768d45..c4e2b3ef7fa5 100644 --- a/accelerator/abstract_accelerator.py +++ b/accelerator/abstract_accelerator.py @@ -224,6 +224,18 @@ def LongTensor(self): @abc.abstractmethod def pin_memory(self, tensor): ... + + @abc.abstractmethod + def is_pinned(self, tensor): + ... + + @abc.abstractmethod + def align_memory(self, tensor): + ... + + @abc.abstractmethod + def is_aligned(self, tensor): + ... @abc.abstractmethod def on_accelerator(self, tensor): diff --git a/accelerator/cpu_accelerator.py b/accelerator/cpu_accelerator.py index 11518d31e069..8c39d8fb51b4 100644 --- a/accelerator/cpu_accelerator.py +++ b/accelerator/cpu_accelerator.py @@ -224,6 +224,15 @@ def LongTensor(self): def pin_memory(self, tensor): return tensor + + def is_pinned(self, tensor): + return tensor.is_pinned() + + def align_memory(self, tensor): + return tensor + + def is_aligned(self, tensor): + return False def op_builder_dir(self): try: diff --git a/accelerator/cuda_accelerator.py b/accelerator/cuda_accelerator.py index 9c1e0d22785e..87474204f867 100644 --- a/accelerator/cuda_accelerator.py +++ b/accelerator/cuda_accelerator.py @@ -205,6 +205,15 @@ def LongTensor(self): def pin_memory(self, tensor): return tensor.pin_memory() + + def is_pinned(self, tensor): + return tensor.is_pinned() + + def align_memory(self, tensor): + return tensor + + def is_aligned(self, tensor): + return False def on_accelerator(self, tensor): device_str = str(tensor.device) diff --git a/accelerator/mps_accelerator.py b/accelerator/mps_accelerator.py index 68e3b6df05c9..56ba3e18e54b 100644 --- a/accelerator/mps_accelerator.py +++ b/accelerator/mps_accelerator.py @@ -188,6 +188,15 @@ def LongTensor(self): def pin_memory(self, tensor): return tensor.pin_memory() + + def is_pinned(self, tensor): + return tensor.is_pinned() + + def align_memory(self, tensor): + return tensor + + def is_aligned(self, tensor): + return False def on_accelerator(self, tensor): device_str = str(tensor.device) diff --git a/accelerator/npu_accelerator.py b/accelerator/npu_accelerator.py index 5678a0266386..dd00884aaa74 100644 --- a/accelerator/npu_accelerator.py +++ b/accelerator/npu_accelerator.py @@ -190,6 +190,15 @@ def LongTensor(self): def pin_memory(self, tensor): return tensor.pin_memory() + + def is_pinned(self, tensor): + return tensor.is_pinned() + + def align_memory(self, tensor): + return tensor + + def is_aligned(self, tensor): + return False def on_accelerator(self, tensor): device_str = str(tensor.device) From 9ee2dae37876dd449e972a7c01f9458c734c01b5 Mon Sep 17 00:00:00 2001 From: Liangliang Ma Date: Tue, 19 Sep 2023 01:24:23 -0700 Subject: [PATCH 05/13] format fix --- accelerator/abstract_accelerator.py | 6 +++--- accelerator/cpu_accelerator.py | 6 +++--- accelerator/cuda_accelerator.py | 6 +++--- accelerator/mps_accelerator.py | 6 +++--- accelerator/npu_accelerator.py | 6 +++--- deepspeed/runtime/swap_tensor/async_swapper.py | 3 ++- deepspeed/runtime/swap_tensor/optimizer_utils.py | 6 +++++- deepspeed/runtime/swap_tensor/partitioned_param_swapper.py | 6 ++++-- deepspeed/runtime/swap_tensor/utils.py | 3 ++- 9 files changed, 28 insertions(+), 20 deletions(-) diff --git a/accelerator/abstract_accelerator.py b/accelerator/abstract_accelerator.py index c4e2b3ef7fa5..b5a5b0659895 100644 --- a/accelerator/abstract_accelerator.py +++ b/accelerator/abstract_accelerator.py @@ -224,15 +224,15 @@ def LongTensor(self): @abc.abstractmethod def pin_memory(self, tensor): ... - + @abc.abstractmethod def is_pinned(self, tensor): ... - + @abc.abstractmethod def align_memory(self, tensor): ... - + @abc.abstractmethod def is_aligned(self, tensor): ... diff --git a/accelerator/cpu_accelerator.py b/accelerator/cpu_accelerator.py index 8c39d8fb51b4..c09366be055d 100644 --- a/accelerator/cpu_accelerator.py +++ b/accelerator/cpu_accelerator.py @@ -224,13 +224,13 @@ def LongTensor(self): def pin_memory(self, tensor): return tensor - + def is_pinned(self, tensor): return tensor.is_pinned() - + def align_memory(self, tensor): return tensor - + def is_aligned(self, tensor): return False diff --git a/accelerator/cuda_accelerator.py b/accelerator/cuda_accelerator.py index 87474204f867..4947beabb752 100644 --- a/accelerator/cuda_accelerator.py +++ b/accelerator/cuda_accelerator.py @@ -205,13 +205,13 @@ def LongTensor(self): def pin_memory(self, tensor): return tensor.pin_memory() - + def is_pinned(self, tensor): return tensor.is_pinned() - + def align_memory(self, tensor): return tensor - + def is_aligned(self, tensor): return False diff --git a/accelerator/mps_accelerator.py b/accelerator/mps_accelerator.py index 56ba3e18e54b..01bb0d1e2360 100644 --- a/accelerator/mps_accelerator.py +++ b/accelerator/mps_accelerator.py @@ -188,13 +188,13 @@ def LongTensor(self): def pin_memory(self, tensor): return tensor.pin_memory() - + def is_pinned(self, tensor): return tensor.is_pinned() - + def align_memory(self, tensor): return tensor - + def is_aligned(self, tensor): return False diff --git a/accelerator/npu_accelerator.py b/accelerator/npu_accelerator.py index dd00884aaa74..a98535ea0968 100644 --- a/accelerator/npu_accelerator.py +++ b/accelerator/npu_accelerator.py @@ -190,13 +190,13 @@ def LongTensor(self): def pin_memory(self, tensor): return tensor.pin_memory() - + def is_pinned(self, tensor): return tensor.is_pinned() - + def align_memory(self, tensor): return tensor - + def is_aligned(self, tensor): return False diff --git a/deepspeed/runtime/swap_tensor/async_swapper.py b/deepspeed/runtime/swap_tensor/async_swapper.py index 965bbd1839a5..e1ce40856988 100644 --- a/deepspeed/runtime/swap_tensor/async_swapper.py +++ b/deepspeed/runtime/swap_tensor/async_swapper.py @@ -38,7 +38,8 @@ def has_buffers(self): def add_buffers(self, buffer_list): assert len(self.all_buffers) == 0 - assert all([get_accelerator().is_pinned(buffer) or get_accelerator().is_aligned(buffer) for buffer in buffer_list]) + assert all( + [get_accelerator().is_pinned(buffer) or get_accelerator().is_aligned(buffer) for buffer in buffer_list]) dtype = buffer_list[0].dtype assert all([buffer.dtype == dtype for buffer in buffer_list]) diff --git a/deepspeed/runtime/swap_tensor/optimizer_utils.py b/deepspeed/runtime/swap_tensor/optimizer_utils.py index 539d50f85964..0d1a2586b100 100644 --- a/deepspeed/runtime/swap_tensor/optimizer_utils.py +++ b/deepspeed/runtime/swap_tensor/optimizer_utils.py @@ -17,6 +17,7 @@ from deepspeed.runtime.swap_tensor.utils import SwapBufferManager, SwapBufferPool from deepspeed.accelerator import get_accelerator + class FlattenedTensorSwapInfo(object): def __init__(self, path, length, offset): @@ -216,7 +217,10 @@ def _initialize_from_swapped_fp16_params(self, aio_handle, fp16_partitions_info, fp16_pinned_buffers, fp32_parameters): assert len(fp32_parameters) == len(fp16_partitions_info) assert len(fp32_parameters) == len(fp16_num_elems) - assert all([get_accelerator().is_pinned(buffer) or get_accelerator().is_aligned(buffer)for buffer in fp16_pinned_buffers]) + assert all([ + get_accelerator().is_pinned(buffer) or get_accelerator().is_aligned(buffer) + for buffer in fp16_pinned_buffers + ]) fp32_swap_paths = self._get_swap_paths(parameters=fp32_parameters, num_elems=fp16_num_elems) diff --git a/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py b/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py index 13c8e2ca3223..70c59ca798d2 100644 --- a/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py +++ b/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py @@ -107,7 +107,7 @@ def _configure_aio(self, ds_config): self.buffers = get_accelerator().align_memory(get_accelerator().pin_memory( torch.empty(int(self.aligned_elements_per_buffer * self.param_buffer_count), dtype=self.dtype, - requires_grad=False))) + requires_grad=False))) self.aio_read_handle = self.aio_handle(self.aio_config[AIO_BLOCK_SIZE], self.aio_config[AIO_QUEUE_DEPTH], self.aio_config[AIO_SINGLE_SUBMIT], self.aio_config[AIO_OVERLAP_EVENTS], @@ -313,7 +313,9 @@ def swap_in(self, params, async_op=True, swap_in_buffers=None): def swap_into_buffer(self, param, dest_buffer): assert param.ds_tensor.status == PartitionedParamStatus.NOT_AVAILABLE, f"param {param.ds_id} is already available or inflight" - require_swap_buffer = not ((get_accelerator().is_pinned(dest_buffer) or get_accelerator().is_aligned(dest_buffer)) and self._is_io_aligned(dest_buffer.numel())) + require_swap_buffer = not ( + (get_accelerator().is_pinned(dest_buffer) or get_accelerator().is_aligned(dest_buffer)) + and self._is_io_aligned(dest_buffer.numel())) if require_swap_buffer: assert len(self.available_buffer_ids) > 0, f"No buffer available to swap param {param.ds_id}." diff --git a/deepspeed/runtime/swap_tensor/utils.py b/deepspeed/runtime/swap_tensor/utils.py index a446ad09dbd5..2947aaaa09c5 100644 --- a/deepspeed/runtime/swap_tensor/utils.py +++ b/deepspeed/runtime/swap_tensor/utils.py @@ -184,7 +184,8 @@ def __init__(self, num_elems, count, dtype): self.count = count self.dtype = dtype self.all_buffers = [ - get_accelerator().align_memory(get_accelerator().pin_memory(torch.zeros(num_elems, device='cpu', dtype=dtype))) for _ in range(count) + get_accelerator().align_memory(get_accelerator().pin_memory( + torch.zeros(num_elems, device='cpu', dtype=dtype))) for _ in range(count) ] self.free_buffer_index = [i for i in range(count)] self.used_buffer_index = {} From 0e8ea36571dd45abd5cf1bbb8bc9853bc9f7d250 Mon Sep 17 00:00:00 2001 From: Liangliang Ma Date: Tue, 19 Sep 2023 08:03:28 -0700 Subject: [PATCH 06/13] add align in pin_memory api --- accelerator/abstract_accelerator.py | 10 +--------- accelerator/cpu_accelerator.py | 8 +------- accelerator/cuda_accelerator.py | 8 +------- accelerator/mps_accelerator.py | 8 +------- accelerator/npu_accelerator.py | 8 +------- csrc/aio/py_lib/deepspeed_aio_thread.cpp | 5 +++-- deepspeed/runtime/swap_tensor/async_swapper.py | 3 +-- deepspeed/runtime/swap_tensor/optimizer_utils.py | 7 ++----- .../swap_tensor/partitioned_optimizer_swapper.py | 4 ++-- .../runtime/swap_tensor/partitioned_param_swapper.py | 12 +++++------- deepspeed/runtime/swap_tensor/utils.py | 2 +- 11 files changed, 19 insertions(+), 56 deletions(-) diff --git a/accelerator/abstract_accelerator.py b/accelerator/abstract_accelerator.py index b5a5b0659895..1583a097712a 100644 --- a/accelerator/abstract_accelerator.py +++ b/accelerator/abstract_accelerator.py @@ -222,21 +222,13 @@ def LongTensor(self): ... @abc.abstractmethod - def pin_memory(self, tensor): + def pin_memory(self, tensor, align=False): ... @abc.abstractmethod def is_pinned(self, tensor): ... - @abc.abstractmethod - def align_memory(self, tensor): - ... - - @abc.abstractmethod - def is_aligned(self, tensor): - ... - @abc.abstractmethod def on_accelerator(self, tensor): ... diff --git a/accelerator/cpu_accelerator.py b/accelerator/cpu_accelerator.py index c09366be055d..e9f5677d5657 100644 --- a/accelerator/cpu_accelerator.py +++ b/accelerator/cpu_accelerator.py @@ -222,18 +222,12 @@ def IntTensor(self): def LongTensor(self): return torch.LongTensor - def pin_memory(self, tensor): + def pin_memory(self, tensor, align=False): return tensor def is_pinned(self, tensor): return tensor.is_pinned() - def align_memory(self, tensor): - return tensor - - def is_aligned(self, tensor): - return False - def op_builder_dir(self): try: # is op_builder from deepspeed or a 3p version? this should only succeed if it's deepspeed diff --git a/accelerator/cuda_accelerator.py b/accelerator/cuda_accelerator.py index 4947beabb752..1344f3f4970c 100644 --- a/accelerator/cuda_accelerator.py +++ b/accelerator/cuda_accelerator.py @@ -203,18 +203,12 @@ def IntTensor(self): def LongTensor(self): return torch.cuda.LongTensor - def pin_memory(self, tensor): + def pin_memory(self, tensor, align=False): return tensor.pin_memory() def is_pinned(self, tensor): return tensor.is_pinned() - def align_memory(self, tensor): - return tensor - - def is_aligned(self, tensor): - return False - def on_accelerator(self, tensor): device_str = str(tensor.device) if device_str.startswith('cuda:'): diff --git a/accelerator/mps_accelerator.py b/accelerator/mps_accelerator.py index 01bb0d1e2360..b8b7a71b4ecf 100644 --- a/accelerator/mps_accelerator.py +++ b/accelerator/mps_accelerator.py @@ -186,18 +186,12 @@ def IntTensor(self): def LongTensor(self): return - def pin_memory(self, tensor): + def pin_memory(self, tensor, align=False): return tensor.pin_memory() def is_pinned(self, tensor): return tensor.is_pinned() - def align_memory(self, tensor): - return tensor - - def is_aligned(self, tensor): - return False - def on_accelerator(self, tensor): device_str = str(tensor.device) if device_str.startswith("mps"): diff --git a/accelerator/npu_accelerator.py b/accelerator/npu_accelerator.py index a98535ea0968..e8e3b2176309 100644 --- a/accelerator/npu_accelerator.py +++ b/accelerator/npu_accelerator.py @@ -188,18 +188,12 @@ def IntTensor(self): def LongTensor(self): return torch.npu.LongTensor - def pin_memory(self, tensor): + def pin_memory(self, tensor, align=False): return tensor.pin_memory() def is_pinned(self, tensor): return tensor.is_pinned() - def align_memory(self, tensor): - return tensor - - def is_aligned(self, tensor): - return False - def on_accelerator(self, tensor): device_str = str(tensor.device) if device_str.startswith('npu:'): diff --git a/csrc/aio/py_lib/deepspeed_aio_thread.cpp b/csrc/aio/py_lib/deepspeed_aio_thread.cpp index 4d189464e6a2..e9c6a8505858 100644 --- a/csrc/aio/py_lib/deepspeed_aio_thread.cpp +++ b/csrc/aio/py_lib/deepspeed_aio_thread.cpp @@ -24,7 +24,8 @@ io_op_desc_t::io_op_desc_t(const bool read_op, _num_bytes(num_bytes), _validate(validate) { - _cpu_buffer = (_buffer.is_cuda() | _buffer.is_xpu()) ? _buffer.to(torch::kCPU).pin_memory() : _buffer; + _cpu_buffer = (_buffer.is_cuda() || _buffer.is_xpu()) ? _buffer.to(torch::kCPU).pin_memory() + : _buffer; _contiguous_buffer = _cpu_buffer.contiguous(); } @@ -33,7 +34,7 @@ char* io_op_desc_t::data_ptr() const { return (char*)_contiguous_buffer.data_ptr void io_op_desc_t::fini() { if (_read_op && _buffer.is_cuda()) { _buffer.copy_(_cpu_buffer.to(torch::kCUDA)); } - if (_read_op && _buffer.is_xpu()){ _buffer.copy_(_cpu_buffer.to(torch::kXPU)); } + if (_read_op && _buffer.is_xpu()) { _buffer.copy_(_cpu_buffer.to(torch::kXPU)); } } deepspeed_aio_thread_t::deepspeed_aio_thread_t(const int tid, deepspeed_aio_config_t& aio_config) diff --git a/deepspeed/runtime/swap_tensor/async_swapper.py b/deepspeed/runtime/swap_tensor/async_swapper.py index e1ce40856988..b808721537fe 100644 --- a/deepspeed/runtime/swap_tensor/async_swapper.py +++ b/deepspeed/runtime/swap_tensor/async_swapper.py @@ -38,8 +38,7 @@ def has_buffers(self): def add_buffers(self, buffer_list): assert len(self.all_buffers) == 0 - assert all( - [get_accelerator().is_pinned(buffer) or get_accelerator().is_aligned(buffer) for buffer in buffer_list]) + assert all([get_accelerator().is_pinned(buffer) for buffer in buffer_list]) dtype = buffer_list[0].dtype assert all([buffer.dtype == dtype for buffer in buffer_list]) diff --git a/deepspeed/runtime/swap_tensor/optimizer_utils.py b/deepspeed/runtime/swap_tensor/optimizer_utils.py index 0d1a2586b100..86e43c98e7e5 100644 --- a/deepspeed/runtime/swap_tensor/optimizer_utils.py +++ b/deepspeed/runtime/swap_tensor/optimizer_utils.py @@ -91,7 +91,7 @@ def get_swap_gradient_paths(self): return [grad.path for grad in self.swapped_gradients.values()] def get_unpinned_state_tensors(self): - return [t for t in self.tensors if not (get_accelerator().is_pinned(t) or get_accelerator().is_aligned(t))] + return [t for t in self.tensors if not get_accelerator().is_pinned(t)] def read_unswapped_gradients(self, dest_buffer): num_elem_count = 0 @@ -217,10 +217,7 @@ def _initialize_from_swapped_fp16_params(self, aio_handle, fp16_partitions_info, fp16_pinned_buffers, fp32_parameters): assert len(fp32_parameters) == len(fp16_partitions_info) assert len(fp32_parameters) == len(fp16_num_elems) - assert all([ - get_accelerator().is_pinned(buffer) or get_accelerator().is_aligned(buffer) - for buffer in fp16_pinned_buffers - ]) + assert all([get_accelerator().is_pinned(buffer) for buffer in fp16_pinned_buffers]) fp32_swap_paths = self._get_swap_paths(parameters=fp32_parameters, num_elems=fp16_num_elems) diff --git a/deepspeed/runtime/swap_tensor/partitioned_optimizer_swapper.py b/deepspeed/runtime/swap_tensor/partitioned_optimizer_swapper.py index 31cb374cbe1d..e7bf06043fd7 100644 --- a/deepspeed/runtime/swap_tensor/partitioned_optimizer_swapper.py +++ b/deepspeed/runtime/swap_tensor/partitioned_optimizer_swapper.py @@ -175,7 +175,7 @@ def _separate_pinned_tensors(self, swap_info): unpinned_paths = [] for tensor, path in zip(swap_info.tensors, swap_info.swap_paths): - if get_accelerator().is_pinned(tensor) or get_accelerator().is_aligned(tensor): + if get_accelerator().is_pinned(tensor): pinned_tensors.append(tensor) pinned_paths.append(path) else: @@ -207,7 +207,7 @@ def _swap_in_gradients(self, aio_handle, parameter, dest_buffer): if not (swap_info and swap_info.has_gradients()): return - assert get_accelerator().is_pinned(dest_buffer) or get_accelerator().is_aligned(dest_buffer) + assert get_accelerator().is_pinned(dest_buffer) assert parameter.numel() <= dest_buffer.numel() parameter.grad = dest_buffer.narrow(0, 0, parameter.numel()) diff --git a/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py b/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py index 70c59ca798d2..3f1872f14a1d 100644 --- a/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py +++ b/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py @@ -104,10 +104,10 @@ def _configure_aio(self, ds_config): self.available_buffer_ids = [i for i in range(self.param_buffer_count)] self.reserved_buffer_ids = [] - self.buffers = get_accelerator().align_memory(get_accelerator().pin_memory( + self.buffers = get_accelerator().pin_memory( torch.empty(int(self.aligned_elements_per_buffer * self.param_buffer_count), dtype=self.dtype, - requires_grad=False))) + requires_grad=False)) self.aio_read_handle = self.aio_handle(self.aio_config[AIO_BLOCK_SIZE], self.aio_config[AIO_QUEUE_DEPTH], self.aio_config[AIO_SINGLE_SUBMIT], self.aio_config[AIO_OVERLAP_EVENTS], @@ -313,9 +313,7 @@ def swap_in(self, params, async_op=True, swap_in_buffers=None): def swap_into_buffer(self, param, dest_buffer): assert param.ds_tensor.status == PartitionedParamStatus.NOT_AVAILABLE, f"param {param.ds_id} is already available or inflight" - require_swap_buffer = not ( - (get_accelerator().is_pinned(dest_buffer) or get_accelerator().is_aligned(dest_buffer)) - and self._is_io_aligned(dest_buffer.numel())) + require_swap_buffer = not get_accelerator().is_pinned(dest_buffer) and self._is_io_aligned(dest_buffer.numel()) if require_swap_buffer: assert len(self.available_buffer_ids) > 0, f"No buffer available to swap param {param.ds_id}." @@ -380,8 +378,8 @@ def _is_io_aligned(self, numel): def reserve_partitioned_swap_space(self, partition_num_elems): aligned_numel = sum([self._io_aligned_numel(numel) for numel in partition_num_elems]) - self.partitioned_swap_buffer = get_accelerator().align_memory(get_accelerator().pin_memory( - torch.zeros(aligned_numel, device='cpu', dtype=self.dtype))) + self.partitioned_swap_buffer = get_accelerator().pin_memory( + torch.zeros(aligned_numel, device='cpu', dtype=self.dtype)) self.partitioned_swap_pool = SwapBufferPool([self.partitioned_swap_buffer]) def swap_out_partitioned_params(self, dst_fp16_params, src_fp32_params): diff --git a/deepspeed/runtime/swap_tensor/utils.py b/deepspeed/runtime/swap_tensor/utils.py index 2947aaaa09c5..e22613919dc5 100644 --- a/deepspeed/runtime/swap_tensor/utils.py +++ b/deepspeed/runtime/swap_tensor/utils.py @@ -96,7 +96,7 @@ def get_swap_path(self, offset): class SwapBufferPool(object): def __init__(self, buffers): - assert all([get_accelerator().is_pinned(buf) or get_accelerator().is_aligned(buf) for buf in buffers]) + assert all([get_accelerator().is_pinned(buf) for buf in buffers]) self.buffers = [SwapBuffer(buf) for buf in buffers] self.current_index = 0 From 418030f1b5edabb6d1038b220317fa5fcb1f7389 Mon Sep 17 00:00:00 2001 From: Liangliang Ma Date: Tue, 19 Sep 2023 08:08:08 -0700 Subject: [PATCH 07/13] add missing brackets --- deepspeed/runtime/swap_tensor/partitioned_param_swapper.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py b/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py index 3f1872f14a1d..11974df2689a 100644 --- a/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py +++ b/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py @@ -313,7 +313,8 @@ def swap_in(self, params, async_op=True, swap_in_buffers=None): def swap_into_buffer(self, param, dest_buffer): assert param.ds_tensor.status == PartitionedParamStatus.NOT_AVAILABLE, f"param {param.ds_id} is already available or inflight" - require_swap_buffer = not get_accelerator().is_pinned(dest_buffer) and self._is_io_aligned(dest_buffer.numel()) + require_swap_buffer = not (get_accelerator().is_pinned(dest_buffer) + and self._is_io_aligned(dest_buffer.numel())) if require_swap_buffer: assert len(self.available_buffer_ids) > 0, f"No buffer available to swap param {param.ds_id}." From de74ad9ab45684224f28500f0a036e73a73b7724 Mon Sep 17 00:00:00 2001 From: Liangliang Ma Date: Tue, 19 Sep 2023 08:11:49 -0700 Subject: [PATCH 08/13] remove align --- deepspeed/runtime/swap_tensor/utils.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/deepspeed/runtime/swap_tensor/utils.py b/deepspeed/runtime/swap_tensor/utils.py index e22613919dc5..43bf9ab75bf0 100644 --- a/deepspeed/runtime/swap_tensor/utils.py +++ b/deepspeed/runtime/swap_tensor/utils.py @@ -184,8 +184,7 @@ def __init__(self, num_elems, count, dtype): self.count = count self.dtype = dtype self.all_buffers = [ - get_accelerator().align_memory(get_accelerator().pin_memory( - torch.zeros(num_elems, device='cpu', dtype=dtype))) for _ in range(count) + get_accelerator().pin_memory(torch.zeros(num_elems, device='cpu', dtype=dtype)) for _ in range(count) ] self.free_buffer_index = [i for i in range(count)] self.used_buffer_index = {} From 1b9cdf19c8204bef620f9c1363898b1490615797 Mon Sep 17 00:00:00 2001 From: Liangliang Ma Date: Tue, 19 Sep 2023 08:55:10 -0700 Subject: [PATCH 09/13] modify pin_memory api --- accelerator/abstract_accelerator.py | 2 +- accelerator/cpu_accelerator.py | 2 +- accelerator/cuda_accelerator.py | 2 +- accelerator/mps_accelerator.py | 2 +- accelerator/npu_accelerator.py | 2 +- .../swap_tensor/partitioned_param_swapper.py | 15 +++++++++------ deepspeed/runtime/swap_tensor/utils.py | 3 ++- 7 files changed, 16 insertions(+), 12 deletions(-) diff --git a/accelerator/abstract_accelerator.py b/accelerator/abstract_accelerator.py index 1583a097712a..6fdde79d2a71 100644 --- a/accelerator/abstract_accelerator.py +++ b/accelerator/abstract_accelerator.py @@ -222,7 +222,7 @@ def LongTensor(self): ... @abc.abstractmethod - def pin_memory(self, tensor, align=False): + def pin_memory(self, tensor, align_bytes=1, sc_page_align=False): ... @abc.abstractmethod diff --git a/accelerator/cpu_accelerator.py b/accelerator/cpu_accelerator.py index e9f5677d5657..36cd779968f3 100644 --- a/accelerator/cpu_accelerator.py +++ b/accelerator/cpu_accelerator.py @@ -222,7 +222,7 @@ def IntTensor(self): def LongTensor(self): return torch.LongTensor - def pin_memory(self, tensor, align=False): + def pin_memory(self, tensor, align_bytes=1, sc_page_align=False): return tensor def is_pinned(self, tensor): diff --git a/accelerator/cuda_accelerator.py b/accelerator/cuda_accelerator.py index 1344f3f4970c..98d7915b0f52 100644 --- a/accelerator/cuda_accelerator.py +++ b/accelerator/cuda_accelerator.py @@ -203,7 +203,7 @@ def IntTensor(self): def LongTensor(self): return torch.cuda.LongTensor - def pin_memory(self, tensor, align=False): + def pin_memory(self, tensor, align_bytes=1, sc_page_align=False): return tensor.pin_memory() def is_pinned(self, tensor): diff --git a/accelerator/mps_accelerator.py b/accelerator/mps_accelerator.py index b8b7a71b4ecf..4e811338d419 100644 --- a/accelerator/mps_accelerator.py +++ b/accelerator/mps_accelerator.py @@ -186,7 +186,7 @@ def IntTensor(self): def LongTensor(self): return - def pin_memory(self, tensor, align=False): + def pin_memory(self, tensor, align_bytes=1, sc_page_align=False): return tensor.pin_memory() def is_pinned(self, tensor): diff --git a/accelerator/npu_accelerator.py b/accelerator/npu_accelerator.py index e8e3b2176309..dcff8dc02181 100644 --- a/accelerator/npu_accelerator.py +++ b/accelerator/npu_accelerator.py @@ -188,7 +188,7 @@ def IntTensor(self): def LongTensor(self): return torch.npu.LongTensor - def pin_memory(self, tensor, align=False): + def pin_memory(self, tensor, align_bytes=1, sc_page_align=False): return tensor.pin_memory() def is_pinned(self, tensor): diff --git a/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py b/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py index 11974df2689a..e6f9b52839f7 100644 --- a/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py +++ b/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py @@ -104,10 +104,11 @@ def _configure_aio(self, ds_config): self.available_buffer_ids = [i for i in range(self.param_buffer_count)] self.reserved_buffer_ids = [] - self.buffers = get_accelerator().pin_memory( - torch.empty(int(self.aligned_elements_per_buffer * self.param_buffer_count), - dtype=self.dtype, - requires_grad=False)) + self.buffers = get_accelerator().pin_memory(torch.empty(int(self.aligned_elements_per_buffer * + self.param_buffer_count), + dtype=self.dtype, + requires_grad=False), + sc_page_align=True) self.aio_read_handle = self.aio_handle(self.aio_config[AIO_BLOCK_SIZE], self.aio_config[AIO_QUEUE_DEPTH], self.aio_config[AIO_SINGLE_SUBMIT], self.aio_config[AIO_OVERLAP_EVENTS], @@ -379,8 +380,10 @@ def _is_io_aligned(self, numel): def reserve_partitioned_swap_space(self, partition_num_elems): aligned_numel = sum([self._io_aligned_numel(numel) for numel in partition_num_elems]) - self.partitioned_swap_buffer = get_accelerator().pin_memory( - torch.zeros(aligned_numel, device='cpu', dtype=self.dtype)) + self.partitioned_swap_buffer = get_accelerator().pin_memory(torch.zeros(aligned_numel, + device='cpu', + dtype=self.dtype), + sc_page_align=True) self.partitioned_swap_pool = SwapBufferPool([self.partitioned_swap_buffer]) def swap_out_partitioned_params(self, dst_fp16_params, src_fp32_params): diff --git a/deepspeed/runtime/swap_tensor/utils.py b/deepspeed/runtime/swap_tensor/utils.py index 43bf9ab75bf0..b11597c19592 100644 --- a/deepspeed/runtime/swap_tensor/utils.py +++ b/deepspeed/runtime/swap_tensor/utils.py @@ -184,7 +184,8 @@ def __init__(self, num_elems, count, dtype): self.count = count self.dtype = dtype self.all_buffers = [ - get_accelerator().pin_memory(torch.zeros(num_elems, device='cpu', dtype=dtype)) for _ in range(count) + get_accelerator().pin_memory(torch.zeros(num_elems, device='cpu', dtype=dtype), sc_page_align=True) + for _ in range(count) ] self.free_buffer_index = [i for i in range(count)] self.used_buffer_index = {} From 7eb65c97fb2696ebedaf7c62440bc1f6e4777f4d Mon Sep 17 00:00:00 2001 From: Liangliang Ma Date: Tue, 19 Sep 2023 08:59:08 -0700 Subject: [PATCH 10/13] modify pin_memory api to use only on align para --- accelerator/abstract_accelerator.py | 2 +- accelerator/cpu_accelerator.py | 2 +- accelerator/cuda_accelerator.py | 2 +- accelerator/mps_accelerator.py | 2 +- accelerator/npu_accelerator.py | 2 +- deepspeed/runtime/swap_tensor/partitioned_param_swapper.py | 4 ++-- deepspeed/runtime/swap_tensor/utils.py | 2 +- 7 files changed, 8 insertions(+), 8 deletions(-) diff --git a/accelerator/abstract_accelerator.py b/accelerator/abstract_accelerator.py index 6fdde79d2a71..ab300f3c3051 100644 --- a/accelerator/abstract_accelerator.py +++ b/accelerator/abstract_accelerator.py @@ -222,7 +222,7 @@ def LongTensor(self): ... @abc.abstractmethod - def pin_memory(self, tensor, align_bytes=1, sc_page_align=False): + def pin_memory(self, tensor, align_bytes=1): ... @abc.abstractmethod diff --git a/accelerator/cpu_accelerator.py b/accelerator/cpu_accelerator.py index 36cd779968f3..84e6c855cb51 100644 --- a/accelerator/cpu_accelerator.py +++ b/accelerator/cpu_accelerator.py @@ -222,7 +222,7 @@ def IntTensor(self): def LongTensor(self): return torch.LongTensor - def pin_memory(self, tensor, align_bytes=1, sc_page_align=False): + def pin_memory(self, tensor, align_bytes=1): return tensor def is_pinned(self, tensor): diff --git a/accelerator/cuda_accelerator.py b/accelerator/cuda_accelerator.py index 98d7915b0f52..8518f1ccc66d 100644 --- a/accelerator/cuda_accelerator.py +++ b/accelerator/cuda_accelerator.py @@ -203,7 +203,7 @@ def IntTensor(self): def LongTensor(self): return torch.cuda.LongTensor - def pin_memory(self, tensor, align_bytes=1, sc_page_align=False): + def pin_memory(self, tensor, align_bytes=1): return tensor.pin_memory() def is_pinned(self, tensor): diff --git a/accelerator/mps_accelerator.py b/accelerator/mps_accelerator.py index 4e811338d419..0f028982857c 100644 --- a/accelerator/mps_accelerator.py +++ b/accelerator/mps_accelerator.py @@ -186,7 +186,7 @@ def IntTensor(self): def LongTensor(self): return - def pin_memory(self, tensor, align_bytes=1, sc_page_align=False): + def pin_memory(self, tensor, align_bytes=1): return tensor.pin_memory() def is_pinned(self, tensor): diff --git a/accelerator/npu_accelerator.py b/accelerator/npu_accelerator.py index dcff8dc02181..0a1098c21452 100644 --- a/accelerator/npu_accelerator.py +++ b/accelerator/npu_accelerator.py @@ -188,7 +188,7 @@ def IntTensor(self): def LongTensor(self): return torch.npu.LongTensor - def pin_memory(self, tensor, align_bytes=1, sc_page_align=False): + def pin_memory(self, tensor, align_bytes=1): return tensor.pin_memory() def is_pinned(self, tensor): diff --git a/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py b/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py index e6f9b52839f7..5df089f86020 100644 --- a/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py +++ b/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py @@ -108,7 +108,7 @@ def _configure_aio(self, ds_config): self.param_buffer_count), dtype=self.dtype, requires_grad=False), - sc_page_align=True) + align_bytes=-1) self.aio_read_handle = self.aio_handle(self.aio_config[AIO_BLOCK_SIZE], self.aio_config[AIO_QUEUE_DEPTH], self.aio_config[AIO_SINGLE_SUBMIT], self.aio_config[AIO_OVERLAP_EVENTS], @@ -383,7 +383,7 @@ def reserve_partitioned_swap_space(self, partition_num_elems): self.partitioned_swap_buffer = get_accelerator().pin_memory(torch.zeros(aligned_numel, device='cpu', dtype=self.dtype), - sc_page_align=True) + align_bytes=-1) self.partitioned_swap_pool = SwapBufferPool([self.partitioned_swap_buffer]) def swap_out_partitioned_params(self, dst_fp16_params, src_fp32_params): diff --git a/deepspeed/runtime/swap_tensor/utils.py b/deepspeed/runtime/swap_tensor/utils.py index b11597c19592..918af3f6cb15 100644 --- a/deepspeed/runtime/swap_tensor/utils.py +++ b/deepspeed/runtime/swap_tensor/utils.py @@ -184,7 +184,7 @@ def __init__(self, num_elems, count, dtype): self.count = count self.dtype = dtype self.all_buffers = [ - get_accelerator().pin_memory(torch.zeros(num_elems, device='cpu', dtype=dtype), sc_page_align=True) + get_accelerator().pin_memory(torch.zeros(num_elems, device='cpu', dtype=dtype), align_bytes=-1) for _ in range(count) ] self.free_buffer_index = [i for i in range(count)] From c481d5d416c8f240bc0d60c06b76a37e5674090f Mon Sep 17 00:00:00 2001 From: Liangliang Ma Date: Sun, 24 Sep 2023 18:11:27 -0700 Subject: [PATCH 11/13] change value of align bytes --- deepspeed/runtime/swap_tensor/partitioned_param_swapper.py | 4 ++-- deepspeed/runtime/swap_tensor/utils.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py b/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py index 5df089f86020..fcc6a272883f 100644 --- a/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py +++ b/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py @@ -108,7 +108,7 @@ def _configure_aio(self, ds_config): self.param_buffer_count), dtype=self.dtype, requires_grad=False), - align_bytes=-1) + align_bytes=0) self.aio_read_handle = self.aio_handle(self.aio_config[AIO_BLOCK_SIZE], self.aio_config[AIO_QUEUE_DEPTH], self.aio_config[AIO_SINGLE_SUBMIT], self.aio_config[AIO_OVERLAP_EVENTS], @@ -383,7 +383,7 @@ def reserve_partitioned_swap_space(self, partition_num_elems): self.partitioned_swap_buffer = get_accelerator().pin_memory(torch.zeros(aligned_numel, device='cpu', dtype=self.dtype), - align_bytes=-1) + align_bytes=0) self.partitioned_swap_pool = SwapBufferPool([self.partitioned_swap_buffer]) def swap_out_partitioned_params(self, dst_fp16_params, src_fp32_params): diff --git a/deepspeed/runtime/swap_tensor/utils.py b/deepspeed/runtime/swap_tensor/utils.py index 918af3f6cb15..90b2d9b8bd31 100644 --- a/deepspeed/runtime/swap_tensor/utils.py +++ b/deepspeed/runtime/swap_tensor/utils.py @@ -184,7 +184,7 @@ def __init__(self, num_elems, count, dtype): self.count = count self.dtype = dtype self.all_buffers = [ - get_accelerator().pin_memory(torch.zeros(num_elems, device='cpu', dtype=dtype), align_bytes=-1) + get_accelerator().pin_memory(torch.zeros(num_elems, device='cpu', dtype=dtype), align_bytes=0) for _ in range(count) ] self.free_buffer_index = [i for i in range(count)] From 8c4ffb8c1707e674a3d05ffc45786a3ac125803c Mon Sep 17 00:00:00 2001 From: Olatunji Ruwase Date: Wed, 27 Sep 2023 07:28:23 -0700 Subject: [PATCH 12/13] Update csrc/aio/common/deepspeed_aio_common.cpp --- csrc/aio/common/deepspeed_aio_common.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/csrc/aio/common/deepspeed_aio_common.cpp b/csrc/aio/common/deepspeed_aio_common.cpp index 2e37e8851e4f..a18aea3c0652 100644 --- a/csrc/aio/common/deepspeed_aio_common.cpp +++ b/csrc/aio/common/deepspeed_aio_common.cpp @@ -116,7 +116,7 @@ static int _do_io_complete(const long long int min_completes, { const auto start_time = std::chrono::high_resolution_clock::now(); long long int n_completes = io_pgetevents( - aio_ctxt->_io_ctxt, min_completes, max_completes, aio_ctxt->_io_events.data(), NULL, NULL); + aio_ctxt->_io_ctxt, min_completes, max_completes, aio_ctxt->_io_events.data(), nullptr, nullptr); reap_times.push_back(std::chrono::high_resolution_clock::now() - start_time); assert(n_completes >= min_completes); return n_completes; From 87aa0885131974050861cbc3a71cb20e94825700 Mon Sep 17 00:00:00 2001 From: Liangliang Ma Date: Thu, 28 Sep 2023 08:58:17 -0700 Subject: [PATCH 13/13] add version check and change format --- csrc/aio/common/deepspeed_aio_common.cpp | 8 ++++++-- op_builder/async_io.py | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/csrc/aio/common/deepspeed_aio_common.cpp b/csrc/aio/common/deepspeed_aio_common.cpp index a18aea3c0652..32b0e8a32394 100644 --- a/csrc/aio/common/deepspeed_aio_common.cpp +++ b/csrc/aio/common/deepspeed_aio_common.cpp @@ -115,8 +115,12 @@ static int _do_io_complete(const long long int min_completes, std::vector>& reap_times) { const auto start_time = std::chrono::high_resolution_clock::now(); - long long int n_completes = io_pgetevents( - aio_ctxt->_io_ctxt, min_completes, max_completes, aio_ctxt->_io_events.data(), nullptr, nullptr); + long long int n_completes = io_pgetevents(aio_ctxt->_io_ctxt, + min_completes, + max_completes, + aio_ctxt->_io_events.data(), + nullptr, + nullptr); reap_times.push_back(std::chrono::high_resolution_clock::now() - start_time); assert(n_completes >= min_completes); return n_completes; diff --git a/op_builder/async_io.py b/op_builder/async_io.py index 084cb10864cf..2db18e3629a1 100644 --- a/op_builder/async_io.py +++ b/op_builder/async_io.py @@ -79,7 +79,7 @@ def is_compatible(self, verbose=True): # which is a function provided by libaio that is used in the async_io op. # If needed, one can define -I and -L entries in CFLAGS and LDFLAGS # respectively to specify the directories for libaio.h and libaio.so. - aio_compatible = self.has_function('io_submit', ('aio', )) + aio_compatible = self.has_function('io_pgetevents', ('aio', )) if verbose and not aio_compatible: self.warning(f"{self.NAME} requires the dev libaio .so object and headers but these were not found.")