Skip to content

Commit

Permalink
[Storage_ipc] Option II: Provides IPC extensions for 3rd devices. pyt…
Browse files Browse the repository at this point in the history
  • Loading branch information
mengpenghui committed May 20, 2024
1 parent b24ad7e commit b7ead53
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 25 deletions.
19 changes: 19 additions & 0 deletions aten/src/ATen/detail/PrivateUse1HooksInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,25 @@ struct TORCH_API PrivateUse1HooksInterface : AcceleratorHooksInterface {
false,
"You should register `PrivateUse1HooksInterface` for PrivateUse1 before call `resizePrivateUse1Bytes`.");
}

// Hook for `UntypedStorage._share_device_` on a PrivateUse1 backend: exports
// a storage's IPC sharing metadata. `self`/`noargs` are PyObject* smuggled
// through void* so this ATen-level interface stays Python-agnostic.
// Default implementation: no backend hooks registered, so fail loudly.
virtual void* Storage_shareDevice(void* self, void* noargs) const {
  TORCH_CHECK_NOT_IMPLEMENTED(
      false,
      // The name here must match the method callers are told to register;
      // it previously read `Storage_sharedevice` (wrong casing).
      "You should register `PrivateUse1HooksInterface` for PrivateUse1 before call `Storage_shareDevice`.");
}

// Hook for `UntypedStorage._new_shared_device` on a PrivateUse1 backend:
// reconstructs a storage from IPC sharing metadata in the receiving process.
// `_unused`/`args` are PyObject* passed as void* to keep this ATen-level
// interface free of Python types (the caller lives in StorageSharing.cpp).
// Default implementation: no backend hooks registered, so fail loudly.
virtual void* Storage_newSharedDevice(void* _unused, void* args) const {
TORCH_CHECK_NOT_IMPLEMENTED(
false,
"You should register `PrivateUse1HooksInterface` for PrivateUse1 before call `Storage_newSharedDevice`.");
}

// Hook for `UntypedStorage._release_ipc_counter_device` on a PrivateUse1
// backend: releases the producer's IPC ref-counters for a received storage.
// `_unused`/`args` are PyObject* passed as void* (Python-agnostic interface).
// Default implementation: no backend hooks registered, so fail loudly.
virtual void* Storage_releaseIPCCounterDevice(void* _unused, void* args) const {
  TORCH_CHECK_NOT_IMPLEMENTED(
      false,
      // The name here must match this method exactly; it previously read
      // `releaseIPCCounterDevice`, dropping the `Storage_` prefix used by
      // this method and its sibling hooks.
      "You should register `PrivateUse1HooksInterface` for PrivateUse1 before call `Storage_releaseIPCCounterDevice`.");
}

};

struct TORCH_API PrivateUse1HooksArgs {};
Expand Down
12 changes: 6 additions & 6 deletions c10/core/StorageImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ struct C10_API StorageImpl : public c10::intrusive_ptr_target {
size_bytes_(std::move(size_bytes)),
size_bytes_is_heap_allocated_(size_bytes_.is_heap_allocated()),
resizable_(resizable),
received_cuda_(false),
received_device_(false),
allocator_(allocator) {
if (resizable) {
TORCH_INTERNAL_ASSERT(
Expand Down Expand Up @@ -232,12 +232,12 @@ struct C10_API StorageImpl : public c10::intrusive_ptr_target {

// This method can be used only after storage construction and cannot be used
// to modify storage status
void set_received_cuda(bool received_cuda) {
received_cuda_ = received_cuda;
void set_received_device(bool received_device) {
received_device_ = received_device;
}

bool received_cuda() {
return received_cuda_;
bool received_device() {
return received_device_;
}

impl::PyObjectSlot* pyobj_slot() {
Expand Down Expand Up @@ -294,7 +294,7 @@ struct C10_API StorageImpl : public c10::intrusive_ptr_target {
bool resizable_;
// Identifies that Storage was received from another process and doesn't have
// local to process cuda memory allocation
bool received_cuda_;
bool received_device_;
// All special checks in data/data_ptr calls are guarded behind this single
// boolean. This is for performance: .data/.data_ptr calls are commonly in the
// hot-path.
Expand Down
2 changes: 1 addition & 1 deletion test/test_torch.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ def test_storage_meta_errors(self, device, dtype):
s0.cuda()

with self.assertRaisesRegex(RuntimeError, r'only available on CUDA'):
s0._share_cuda_()
s0._share_device_()

with self.assertRaisesRegex(TypeError, r"cannot pin 'torch.storage.UntypedStorage' only CPU memory can be pinned"):
s0.pin_memory()
Expand Down
70 changes: 65 additions & 5 deletions torch/csrc/StorageSharing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include <ATen/MapAllocator.h>
#include <ATen/StorageUtils.h>
#include <ATen/DeviceAccelerator.h>
#include <torch/csrc/utils/python_numbers.h>
#include <atomic>
#include <string>
Expand Down Expand Up @@ -377,7 +378,7 @@ static PyObject* THPStorage_shareCuda(PyObject* self, PyObject* noargs) {
END_HANDLE_TH_ERRORS
}

static PyObject* THPStorage_releaseIPCCounter(
static PyObject* THPStorage_releaseIPCCounterCuda(
PyObject* _unused,
PyObject* args) {
HANDLE_TH_ERRORS
Expand Down Expand Up @@ -651,18 +652,77 @@ PyObject* THPStorage_isShared(PyObject* self, PyObject* noargs) {
}
}

// Python binding for `UntypedStorage._share_device_`: exports a storage's IPC
// sharing metadata for the currently active accelerator.
// - CUDA: delegates to the existing THPStorage_shareCuda implementation.
// - PrivateUse1: lazily initializes the backend, then dispatches to the
//   backend-registered hook, which returns a PyObject* through void*.
// Any other accelerator type raises. NOTE(review): getAccelerator(true) is
// assumed to throw (rather than return nullopt) when no accelerator is
// present, making .value() safe — confirm against DeviceAccelerator.h.
static PyObject* THPStorage_shareDevice(PyObject* self, PyObject* noargs) {
  HANDLE_TH_ERRORS
  c10::DeviceType device_type = at::getAccelerator(true).value();
  if (device_type == at::kCUDA) {
    return THPStorage_shareCuda(self, noargs);
  } else if (device_type == at::kPrivateUse1) {
    at::globalContext().lazyInitPrivateUse1();
    // static_cast instead of a C-style cast: the hook's void* return is a
    // PyObject* by contract, and the named cast makes the conversion explicit.
    return static_cast<PyObject*>(
        at::detail::getPrivateUse1Hooks().Storage_shareDevice(self, noargs));
  } else {
    TORCH_CHECK(
        false, "The device ", device_type, " does not support _share_device_");
  }
  // Unreachable (all branches return or throw); kept so every control path
  // has an explicit return value for the compiler.
  Py_RETURN_NONE;
  END_HANDLE_TH_ERRORS
}

// Python binding for the static method `UntypedStorage._new_shared_device`:
// rebuilds a storage in the receiving process from IPC sharing metadata.
// - CUDA: delegates to the existing THPStorage_newSharedCuda implementation.
// - PrivateUse1: lazily initializes the backend, then dispatches to the
//   backend-registered hook, which returns a PyObject* through void*.
// Any other accelerator type raises. NOTE(review): getAccelerator(true) is
// assumed to throw when no accelerator is present — confirm.
static PyObject* THPStorage_newSharedDevice(PyObject* _unused, PyObject* args) {
  HANDLE_TH_ERRORS
  c10::DeviceType device_type = at::getAccelerator(true).value();
  if (device_type == at::kCUDA) {
    return THPStorage_newSharedCuda(_unused, args);
  } else if (device_type == at::kPrivateUse1) {
    at::globalContext().lazyInitPrivateUse1();
    // static_cast instead of a C-style cast: the hook's void* return is a
    // PyObject* by contract, and the named cast makes the conversion explicit.
    return static_cast<PyObject*>(
        at::detail::getPrivateUse1Hooks().Storage_newSharedDevice(
            _unused, args));
  } else {
    TORCH_CHECK(
        false,
        "The device ",
        device_type,
        " does not support _new_shared_device");
  }
  // Unreachable (all branches return or throw); kept so every control path
  // has an explicit return value for the compiler.
  Py_RETURN_NONE;
  END_HANDLE_TH_ERRORS
}

// Python binding for the static method
// `UntypedStorage._release_ipc_counter_device`: releases the producer's IPC
// ref-counters once the consumer has taken its own reference.
// - CUDA: delegates to THPStorage_releaseIPCCounterCuda (the renamed original
//   implementation).
// - PrivateUse1: lazily initializes the backend, then dispatches to the
//   backend-registered hook, which returns a PyObject* through void*.
// Any other accelerator type raises. NOTE(review): getAccelerator(true) is
// assumed to throw when no accelerator is present — confirm.
static PyObject* THPStorage_releaseIPCCounter(
    PyObject* _unused,
    PyObject* args) {
  HANDLE_TH_ERRORS
  c10::DeviceType device_type = at::getAccelerator(true).value();
  if (device_type == at::kCUDA) {
    return THPStorage_releaseIPCCounterCuda(_unused, args);
  } else if (device_type == at::kPrivateUse1) {
    at::globalContext().lazyInitPrivateUse1();
    // static_cast instead of a C-style cast: the hook's void* return is a
    // PyObject* by contract, and the named cast makes the conversion explicit.
    return static_cast<PyObject*>(
        at::detail::getPrivateUse1Hooks().Storage_releaseIPCCounterDevice(
            _unused, args));
  } else {
    TORCH_CHECK(
        false,
        "The device ",
        device_type,
        " does not support _release_ipc_counter_device");
  }
  // Unreachable (all branches return or throw); kept so every control path
  // has an explicit return value for the compiler.
  Py_RETURN_NONE;
  END_HANDLE_TH_ERRORS
}

// NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays,cppcoreguidelines-avoid-non-const-global-variables)
static PyMethodDef THPStorage_sharingMethods[] = {
{"_new_with_weak_ptr",
THPStorage_newWithWeakPtr,
METH_O | METH_CLASS,
nullptr},
{"_share_cuda_", THPStorage_shareCuda, METH_NOARGS, nullptr},
{"_new_shared_cuda",
THPStorage_newSharedCuda,
{"_share_device_", THPStorage_shareDevice, METH_NOARGS, nullptr},
{"_new_shared_device",
THPStorage_newSharedDevice,
METH_VARARGS | METH_STATIC,
nullptr},
{"_release_ipc_counter_cuda",
{"_release_ipc_counter_device",
THPStorage_releaseIPCCounter,
METH_VARARGS | METH_STATIC,
nullptr},
Expand Down
6 changes: 3 additions & 3 deletions torch/multiprocessing/reductions.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def rebuild_cuda_tensor(
)
if storage is None:
torch.cuda._lazy_init()
storage = storage_cls._new_shared_cuda(
storage = storage_cls._new_shared_device(
storage_device,
storage_handle,
storage_size_bytes,
Expand All @@ -161,7 +161,7 @@ def rebuild_cuda_tensor(
else:
# We already ref counting this Storage, but producer needs new ref-counters to be released.
storage_cls._release_ipc_counter(
ref_counter_handle, ref_counter_offset, device=storage_device
ref_counter_handle, ref_counter_offset
)

_storage = (
Expand Down Expand Up @@ -318,7 +318,7 @@ def reduce_tensor(tensor):
ref_counter_offset,
event_handle,
event_sync_required,
) = storage._share_cuda_()
) = storage._share_device_()
tensor_offset = tensor.storage_offset()
shared_cache[handle] = StorageWeakRef(storage)
# _backward_hooks purposely omitted here, see
Expand Down
20 changes: 10 additions & 10 deletions torch/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def from_buffer(cls: Type[T], *args, **kwargs) -> T: ... # type: ignore[empty-b
@classmethod
def _new_shared_filename_cpu(cls: Type[T], manager, obj, size, *, device=None, dtype=None) -> T: ... # type: ignore[empty-body] # noqa: E704
@classmethod
def _release_ipc_counter_cuda(cls: Type[T], *args, **kwargs) -> T: ... # type: ignore[empty-body] # noqa: E704
def _release_ipc_counter_device(cls: Type[T], *args, **kwargs) -> T: ... # type: ignore[empty-body] # noqa: E704
@classmethod
def _new_with_weak_ptr(cls: Type[T], *args, **kwargs) -> T: ... # type: ignore[empty-body] # noqa: E704
def _shared_decref(self) -> T: ... # type: ignore[empty-body, misc, type-var] # noqa: E704
Expand All @@ -70,10 +70,10 @@ def resize_(self, size: int): ... # noqa: E704
def _weak_ref(self, *args, **kwargs) -> T: ... # type: ignore[empty-body, misc, type-var] # noqa: E704
def _set_from_file(self, *args, **kwargs): ... # noqa: E704
def _set_cdata(self, *args, **kwargs): ... # noqa: E704
def _share_cuda_(self, *args, **kwargs): ... # noqa: E704
def _share_device_(self, *args, **kwargs): ... # noqa: E704
def is_shared(self) -> bool: ... # type: ignore[empty-body] # noqa: E704
@classmethod
def _new_shared_cuda(cls: Type[T], *args, **kwargs) -> T: ... # type: ignore[empty-body] # noqa: E704
def _new_shared_device(cls: Type[T], *args, **kwargs) -> T: ... # type: ignore[empty-body] # noqa: E704
def _shared_incref(self, *args, **kwargs): ... # noqa: E704
@classmethod
def _free_weak_ref(cls, *args, **kwargs): ... # noqa: E704
Expand Down Expand Up @@ -1158,8 +1158,8 @@ def _set_from_file(self, *args, **kwargs):
def _set_cdata(self, *args, **kwargs):
return self._untyped_storage._set_cdata(*args, **kwargs)

def _share_cuda_(self, *args, **kwargs):
return self._untyped_storage._share_cuda_(*args, **kwargs)
def _share_device_(self, *args, **kwargs):
return self._untyped_storage._share_device_(*args, **kwargs)

def is_shared(self):
_warn_typed_storage_removal()
Expand All @@ -1170,8 +1170,8 @@ def _is_shared(self):
return self._untyped_storage.is_shared()

@classmethod
def _new_shared_cuda(cls, *args, **kwargs):
return torch.UntypedStorage._new_shared_cuda(*args, **kwargs)
def _new_shared_device(cls, *args, **kwargs):
return torch.UntypedStorage._new_shared_device(*args, **kwargs)

def _share_filename_cpu_(self, *args, **kwargs):
manager_handle, storage_handle, size = self._untyped_storage._share_filename_cpu_(*args, **kwargs)
Expand All @@ -1182,8 +1182,8 @@ def _shared_decref(self):
return self

@classmethod
def _release_ipc_counter(cls, *args, device=None, **kwargs):
return torch.UntypedStorage._release_ipc_counter_cuda(*args, **kwargs)
def _release_ipc_counter(cls, *args, **kwargs):
return torch.UntypedStorage._release_ipc_counter_device(*args, **kwargs)

def _shared_incref(self, *args, **kwargs):
return self._untyped_storage._shared_incref(*args, **kwargs)
Expand Down Expand Up @@ -1230,7 +1230,7 @@ def _new_shared(cls, size):

@classmethod
def _release_ipc_counter(cls, *args, **kwargs):
return torch.UntypedStorage._release_ipc_counter_cuda(*args, **kwargs)
return torch.UntypedStorage._release_ipc_counter_device(*args, **kwargs)

@classmethod
def _new_shared_filename(cls, manager, obj, size):
Expand Down

0 comments on commit b7ead53

Please sign in to comment.