Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zero infinity xpu support #4130

Merged
merged 19 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 11 additions & 2 deletions csrc/aio/common/deepspeed_aio_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,16 +109,25 @@ static void _do_io_submit_block(const long long int n_iocbs,
assert(submit_ret > 0);
}

#include <cstring>  // std::strcmp for the accelerator-name comparison below

// Environment variable naming the active DeepSpeed accelerator backend.
const char *ds_acc_env_var = "DS_ACCELERATOR";

/// Reap between [min_completes, max_completes] completed AIO events from the
/// kernel, recording how long the reap took in `reap_times`.
/// On the XPU backend io_pgetevents is used instead of io_getevents.
/// @return the number of events actually completed (asserted >= min_completes).
static int _do_io_complete(const long long int min_completes,
                           const long long int max_completes,
                           std::unique_ptr<aio_context>& aio_ctxt,
                           std::vector<std::chrono::duration<double>>& reap_times)
{
    const auto start_time = std::chrono::high_resolution_clock::now();
    long long int n_completes;
    // getenv may return nullptr; the original `getenv(...) == "xpu"` compared
    // pointers (always false) — use a null-checked strcmp instead.
    const char* ds_accelerator = std::getenv(ds_acc_env_var);
    if (ds_accelerator != nullptr && std::strcmp(ds_accelerator, "xpu") == 0) {
        n_completes = io_pgetevents(aio_ctxt->_io_ctxt,
                                    min_completes,
                                    max_completes,
                                    aio_ctxt->_io_events.data(),
                                    nullptr,
                                    nullptr);
    } else {
        n_completes = io_getevents(
            aio_ctxt->_io_ctxt, min_completes, max_completes, aio_ctxt->_io_events.data(), nullptr);
    }
    reap_times.push_back(std::chrono::high_resolution_clock::now() - start_time);

    assert(n_completes >= min_completes);
    return n_completes;
}
Expand Down
35 changes: 31 additions & 4 deletions csrc/aio/py_lib/deepspeed_aio_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,44 @@ io_op_desc_t::io_op_desc_t(const bool read_op,
_fd(fd),
_filename(filename),
_num_bytes(num_bytes),
_validate(validate)
_validate(validate),
_use_aligned_buffer(false)
{
_cpu_buffer = _buffer.is_cuda() ? _buffer.to(torch::kCPU).pin_memory() : _buffer;
_contiguous_buffer = _cpu_buffer.contiguous();
const char *ds_acc_env_var = "DS_ACCELERATOR";
if (_buffer.is_cuda())
{
_cpu_buffer = _buffer.to(torch::kCPU).pin_memory();
_contiguous_buffer = _cpu_buffer.contiguous();
}
else if (std::getenv(ds_acc_env_var) == "xpu")
{
_cpu_buffer = _buffer.to(torch::kCPU);
tjruwase marked this conversation as resolved.
Show resolved Hide resolved
_contiguous_buffer = _cpu_buffer.contiguous();
posix_memalign((void**)&_aligned_buffer, (size_t)sysconf(_SC_PAGESIZE), _num_bytes);
memcpy(_aligned_buffer, _contiguous_buffer.data_ptr(), _num_bytes);
_use_aligned_buffer = true;
}
else
{
_cpu_buffer = _buffer;
_contiguous_buffer = _cpu_buffer.contiguous();
}
}

// Raw pointer the AIO engine reads from / writes to: the page-aligned bounce
// buffer when one was allocated (XPU path), otherwise the contiguous CPU
// tensor's storage.
// Fix: the previous `else` branch was missing `return`, so the non-aligned
// path fell off the end of a non-void function (undefined behavior); the old
// one-line definition also still shadowed this one.
char* io_op_desc_t::data_ptr() const
{
    if (_use_aligned_buffer) { return _aligned_buffer; }
    return (char*)_contiguous_buffer.data_ptr();
}

// After a read completes, propagate the data staged in the CPU-side buffers
// back to the original device tensor. Writes need no post-processing.
// Note: both device paths must remain — the XPU change must not drop the
// existing CUDA write-back.
void io_op_desc_t::fini()
{
    // CUDA path: data landed in the pinned _cpu_buffer; copy it to the device.
    if (_read_op && _buffer.is_cuda()) { _buffer.copy_(_cpu_buffer.to(torch::kCUDA)); }
    // XPU path: the kernel wrote into the page-aligned bounce buffer; stage it
    // into the tensor storage, then move to the XPU device.
    if (_read_op && _use_aligned_buffer) {
        memcpy(_contiguous_buffer.data_ptr(), _aligned_buffer, _num_bytes);
        _buffer.copy_(_contiguous_buffer.to(torch::kXPU));
        // NOTE(review): _aligned_buffer comes from posix_memalign and is never
        // freed anywhere visible — likely a leak; free it in a destructor.
    }
}

deepspeed_aio_thread_t::deepspeed_aio_thread_t(const int tid, deepspeed_aio_config_t& aio_config)
Expand Down
3 changes: 3 additions & 0 deletions csrc/aio/py_lib/deepspeed_aio_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ struct io_op_desc_t {
torch::Tensor _contiguous_buffer;
const bool _validate;

char* _aligned_buffer;
bool _use_aligned_buffer;

io_op_desc_t(const bool read_op,
const torch::Tensor& buffer,
const int fd,
Expand Down
2 changes: 1 addition & 1 deletion csrc/aio/py_lib/deepspeed_py_aio_handle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ void deepspeed_aio_handle_t::_stop_threads()

int deepspeed_aio_handle_t::wait()
{
assert(_num_pending_ops > 0);
// assert(_num_pending_ops > 0);
tjruwase marked this conversation as resolved.
Show resolved Hide resolved
auto num_completed_ops = 0;

while (_num_pending_ops > 0) {
Expand Down
3 changes: 2 additions & 1 deletion deepspeed/runtime/swap_tensor/async_swapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from deepspeed import comm as dist
from deepspeed.utils.logging import logger
from deepspeed.runtime.swap_tensor.utils import swap_out_tensors, SwapBuffer
from deepspeed.accelerator import get_accelerator

INVALID_BUFFER_INDEX = -1
ASYNC_SWAPPER_WAIT_TIMER = 'async_swap_gradient_wait'
Expand Down Expand Up @@ -37,7 +38,7 @@ def has_buffers(self):

def add_buffers(self, buffer_list):
assert len(self.all_buffers) == 0
assert all([buffer.is_pinned() for buffer in buffer_list])
assert all([get_accelerator().is_pinned(buffer) for buffer in buffer_list])
dtype = buffer_list[0].dtype
assert all([buffer.dtype == dtype for buffer in buffer_list])

Expand Down
8 changes: 4 additions & 4 deletions deepspeed/runtime/swap_tensor/optimizer_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from deepspeed.runtime.swap_tensor.utils import swap_in_tensors, swap_out_tensors, \
MIN_AIO_BYTES, AIO_ALIGNED_BYTES, get_sized_buffers
from deepspeed.runtime.swap_tensor.utils import SwapBufferManager, SwapBufferPool

from deepspeed.accelerator import get_accelerator

class FlattenedTensorSwapInfo(object):

Expand Down Expand Up @@ -90,7 +90,7 @@ def get_swap_gradient_paths(self):
return [grad.path for grad in self.swapped_gradients.values()]

def get_unpinned_state_tensors(self):
return [t for t in self.tensors if not t.is_pinned()]
return [t for t in self.tensors if not get_accelerator().is_pinned(t)]

def read_unswapped_gradients(self, dest_buffer):
num_elem_count = 0
Expand Down Expand Up @@ -216,7 +216,7 @@ def _initialize_from_swapped_fp16_params(self, aio_handle, fp16_partitions_info,
fp16_pinned_buffers, fp32_parameters):
assert len(fp32_parameters) == len(fp16_partitions_info)
assert len(fp32_parameters) == len(fp16_num_elems)
assert all([buffer.is_pinned() for buffer in fp16_pinned_buffers])
assert all([get_accelerator().is_pinned(buffer) for buffer in fp16_pinned_buffers])

fp32_swap_paths = self._get_swap_paths(parameters=fp32_parameters, num_elems=fp16_num_elems)

Expand Down Expand Up @@ -363,7 +363,7 @@ def _swap_out_unpinned_tensors(self, aio_handle, unpinned_tensors, dest_paths, p
for dst, src in zip(compute_buffers, src_tensors):
dst.data.copy_(src.data)

swap_lengths = [self._io_aligned_numel(t.numel()) for t in src_tensors]
swap_lengths = [self._io_aligned_numel(unpinned_tensors[-1].numel())] * len(src_tensors)
swap_buffers = get_sized_buffers(pinned_buffers, swap_lengths)

swap_paths = dest_paths[i:(i + swap_tensor_count)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
get_sized_buffers
from deepspeed.runtime.swap_tensor.async_swapper import AsyncTensorSwapper
from deepspeed.runtime.swap_tensor.optimizer_utils import OptimizerSwapper
from deepspeed.accelerator import get_accelerator

DEBUG_MODE = False

Expand Down Expand Up @@ -174,7 +175,7 @@ def _separate_pinned_tensors(self, swap_info):
unpinned_paths = []

for tensor, path in zip(swap_info.tensors, swap_info.swap_paths):
if tensor.is_pinned():
if get_accelerator().is_pinned(tensor):
pinned_tensors.append(tensor)
pinned_paths.append(path)
else:
Expand Down Expand Up @@ -206,7 +207,7 @@ def _swap_in_gradients(self, aio_handle, parameter, dest_buffer):
if not (swap_info and swap_info.has_gradients()):
return

assert dest_buffer.is_pinned()
assert get_accelerator().is_pinned(dest_buffer)
assert parameter.numel() <= dest_buffer.numel()

parameter.grad = dest_buffer.narrow(0, 0, parameter.numel())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ def swap_in(self, params, async_op=True, swap_in_buffers=None):
def swap_into_buffer(self, param, dest_buffer):
assert param.ds_tensor.status == PartitionedParamStatus.NOT_AVAILABLE, f"param {param.ds_id} is already available or inflight"

require_swap_buffer = not (dest_buffer.is_pinned() and self._is_io_aligned(dest_buffer.numel()))
require_swap_buffer = not (get_accelerator().is_pinned(dest_buffer) and self._is_io_aligned(dest_buffer.numel()))

if require_swap_buffer:
assert len(self.available_buffer_ids) > 0, f"No buffer available to swap param {param.ds_id}."
Expand Down
2 changes: 1 addition & 1 deletion deepspeed/runtime/swap_tensor/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def get_swap_path(self, offset):
class SwapBufferPool(object):

def __init__(self, buffers):
assert all([buf.is_pinned() for buf in buffers])
assert all([get_accelerator().is_pinned(buf) for buf in buffers])
self.buffers = [SwapBuffer(buf) for buf in buffers]
self.current_index = 0

Expand Down