Skip to content

Commit

Permalink
Review comments. Passing ShmFile pointer instead of void
Browse files Browse the repository at this point in the history
  • Loading branch information
fpetrini15 committed Apr 11, 2024
1 parent 955ecb9 commit a5b6b7e
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 113 deletions.
4 changes: 2 additions & 2 deletions docs/protocol/extension_shared_memory.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ memory. These parameters and their type are:
registered shared memory region. Region names share a namespace for
system-shared-memory regions and CUDA-shared-memory regions.

- "shared_memory_offset" : unsigned integer value is the offset, in bytes, into
- "shared_memory_offset" : size_t value is the offset, in bytes, into
the region where the data for the tensor starts.

- "shared_memory_byte_size" : unsigned integer value is the size, in bytes, of
- "shared_memory_byte_size" : size_t value is the size, in bytes, of
the data.

The “shared_memory_offset” parameter is optional and defaults to
Expand Down
31 changes: 30 additions & 1 deletion qa/L0_shared_memory/shared_memory_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ def test_invalid_create_shm(self, protocol):
)
print(f"*\n*\n*\nStarting Test:test_invalid_create_shm.{protocol}\n*\n*\n*\n")
try:
shm_op0_handle = shm.create_shared_memory_region(
shm.create_shared_memory_region(
"dummy_data", (self._shm_key_prefix + "dummy_data"), -1
)
except Exception as ex:
Expand Down Expand Up @@ -305,6 +305,35 @@ def test_valid_create_set_register(self, protocol):
shm.destroy_shared_memory_region(shm_op0_handle)
self._test_passed = True

@parameterized.expand([("grpc"), ("http")])
def test_different_name_same_key(self, protocol):
    # Register a system shared memory key under one name, then verify that
    # registering the same key again under a second name fails with the
    # "registering an active shared memory key" error.
    self._setUp(
        protocol,
        Path(os.getcwd()) / f"test_different_name_same_key.{protocol}.server.log",
    )
    print(
        f"*\n*\n*\nStarting Test:test_different_name_same_key.{protocol}\n*\n*\n*\n"
    )

    shm_key = self._shm_key_prefix + "dummy_data"
    shm_op0_handle = shm.create_shared_memory_region("dummy", shm_key, 8)
    try:
        shm.set_shared_memory_region(
            shm_op0_handle, [np.array([1, 2], dtype=np.float32)]
        )
        self._triton_client.register_system_shared_memory("dummy", shm_key, 8)

        # The second registration MUST raise. A bare try/except would let the
        # test pass silently if the call unexpectedly succeeded.
        with self.assertRaises(Exception) as ctx:
            self._triton_client.register_system_shared_memory(
                "dummy2", shm_key, 8
            )
        self.assertIn(
            "registering an active shared memory key", str(ctx.exception)
        )
    finally:
        # Always release the region, even when an assertion above fails, so
        # the key does not linger for later tests.
        shm.destroy_shared_memory_region(shm_op0_handle)
    self._test_passed = True

@parameterized.expand([("grpc"), ("http")])
def test_unregister_before_register(self, protocol):
# Create a valid system shared memory region and unregister before register
Expand Down
6 changes: 5 additions & 1 deletion qa/common/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ def kill_server(server_process):
server_process.kill()
else:
server_process.send_signal(signal.SIGINT)
server_process.wait()
try:
server_process.wait(timeout=30)
except subprocess.TimeoutExpired:
server_process.kill()
raise Exception("Server did not shutdown properly")


def stream_to_log(client_log):
Expand Down
183 changes: 94 additions & 89 deletions src/shared_memory_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,44 +59,107 @@ GetWindowsError()
#endif

TRITONSERVER_Error*
OpenSharedMemoryRegion(const std::string& shm_key, void** shm_file)
UnmapSharedMemory(void* mapped_addr, size_t byte_size)
{
#ifdef _WIN32
bool success = UnmapViewOfFile(mapped_addr);
if (!success) {
return TRITONSERVER_ErrorNew(
TRITONSERVER_ERROR_INTERNAL,
std::string(
"unable to unmap shared memory region, error code: " +
GetWindowsError())
.c_str());
}
#else
int status = munmap(mapped_addr, byte_size);
if (status == -1) {
return TRITONSERVER_ErrorNew(
TRITONSERVER_ERROR_INTERNAL,
std::string(
"unable to munmap shared memory region, errno: " +
std::string(std::strerror(errno)))
.c_str());
}
#endif
return nullptr;
}

#ifdef TRITON_ENABLE_GPU
// Opens a CUDA IPC memory handle and returns the resulting pointer through
// 'data_ptr'.
//
// \param cuda_shm_handle The CUDA IPC memory handle to open.
// \param data_ptr Receives the address produced by cudaIpcOpenMemHandle.
// \param device_id The device that is made current before opening the handle.
// \return nullptr on success. On Windows always returns a
// TRITONSERVER_ERROR_UNSUPPORTED error. Otherwise returns a
// TRITONSERVER_ERROR_INTERNAL error if the device cannot be selected or the
// handle cannot be opened.
TRITONSERVER_Error*
OpenCudaIPCRegion(
    const cudaIpcMemHandle_t* cuda_shm_handle, void** data_ptr, int device_id)
{
#ifdef _WIN32
  return TRITONSERVER_ErrorNew(
      TRITONSERVER_ERROR_UNSUPPORTED,
      std::string(
          "GPU shared memory features are currently not supported on Windows")
          .c_str());
#else
  // Make 'device_id' the current device. The result is checked so that an
  // invalid device is reported instead of silently opening the handle against
  // whichever device happens to be current.
  cudaError_t err = cudaSetDevice(device_id);
  if (err != cudaSuccess) {
    return TRITONSERVER_ErrorNew(
        TRITONSERVER_ERROR_INTERNAL,
        std::string(
            "failed to set CUDA device " + std::to_string(device_id) + ": " +
            std::string(cudaGetErrorString(err)))
            .c_str());
  }

  // Open CUDA IPC handle and read data from it
  err = cudaIpcOpenMemHandle(
      data_ptr, *cuda_shm_handle, cudaIpcMemLazyEnablePeerAccess);
  if (err != cudaSuccess) {
    return TRITONSERVER_ErrorNew(
        TRITONSERVER_ERROR_INTERNAL, std::string(
                                         "failed to open CUDA IPC handle: " +
                                         std::string(cudaGetErrorString(err)))
                                         .c_str());
  }

  return nullptr;
#endif
}

#endif // TRITON_ENABLE_GPU

} // namespace

TRITONSERVER_Error*
SharedMemoryManager::OpenSharedMemoryRegion(
const std::string& shm_key, ShmFile** shm_file)
{
#ifdef _WIN32
HANDLE* shm_handle = static_cast<HANDLE*>(shm_file);
*shm_handle = OpenFileMapping(
HANDLE shm_handle = OpenFileMapping(
FILE_MAP_ALL_ACCESS, // read/write access
FALSE, // cannot inherit handle
shm_key.c_str()); // name of mapping object

if (*shm_handle == NULL) {
if (shm_handle == NULL) {
LOG_VERBOSE(1) << "OpenFileMapping failed with error code: "
<< GetWindowsError();
return TRITONSERVER_ErrorNew(
TRITONSERVER_ERROR_INTERNAL,
std::string("Unable to open shared memory region: '" + shm_key + "'")
.c_str());
}
// Dynamic memory will eventually be owned by unique_ptr
*shm_file = new ShmFile(shm_handle);
#else
// get shared memory region descriptor
int* shm_fd = *reinterpret_cast<int**>(shm_file);
*shm_fd = shm_open(shm_key.c_str(), O_RDWR, S_IRUSR | S_IWUSR);
if (*shm_fd == -1) {
int shm_fd = shm_open(shm_key.c_str(), O_RDWR, S_IRUSR | S_IWUSR);
if (shm_fd == -1) {
LOG_VERBOSE(1) << "shm_open failed, errno: " << errno;
return TRITONSERVER_ErrorNew(
TRITONSERVER_ERROR_INTERNAL,
std::string("Unable to open shared memory region: '" + shm_key + "'")
.c_str());
}
// Dynamic memory will eventually be owned by unique_ptr
*shm_file = new ShmFile(shm_fd);
#endif
return nullptr;
}

TRITONSERVER_Error*
CloseSharedMemoryRegion(void* shm_file)
SharedMemoryManager::CloseSharedMemoryRegion(ShmFile* shm_file)
{
#ifdef _WIN32
HANDLE shm_handle = static_cast<HANDLE>(shm_file);
bool success = CloseHandle(shm_handle);
bool success = CloseHandle(shm_file->shm_handle_);
if (!success) {
return TRITONSERVER_ErrorNew(
TRITONSERVER_ERROR_INTERNAL,
Expand All @@ -106,8 +169,7 @@ CloseSharedMemoryRegion(void* shm_file)
.c_str());
}
#else
int shm_fd = *static_cast<int*>(shm_file);
int status = close(shm_fd);
int status = close(shm_file->shm_fd_);
if (status == -1) {
return TRITONSERVER_ErrorNew(
TRITONSERVER_ERROR_INTERNAL,
Expand All @@ -122,12 +184,11 @@ CloseSharedMemoryRegion(void* shm_file)
}

TRITONSERVER_Error*
MapSharedMemory(
void* shm_file, const size_t offset, const size_t byte_size,
SharedMemoryManager::MapSharedMemory(
ShmFile* shm_file, const size_t offset, const size_t byte_size,
void** mapped_addr)
{
#ifdef _WIN32
HANDLE shm_handle = static_cast<HANDLE>(shm_file);
// The MapViewOfFile function takes a high-order and low-order DWORD (4 bytes
// each) for offset. 'size_t' can either be 4 or 8 bytes depending on the
// operating system. To handle both cases agnostically, we cast 'offset' to
Expand All @@ -138,10 +199,10 @@ MapSharedMemory(
DWORD low_order_offset = upperbound_offset & 0xFFFFFFFF;
// map shared memory to process address space
*mapped_addr = MapViewOfFile(
shm_handle, // handle to map object
FILE_MAP_ALL_ACCESS, // read/write permission
high_order_offset, // offset (high-order DWORD)
low_order_offset, // offset (low-order DWORD)
shm_file->shm_handle_, // handle to map object
FILE_MAP_ALL_ACCESS, // read/write permission
high_order_offset, // offset (high-order DWORD)
low_order_offset, // offset (low-order DWORD)
byte_size);

if (*mapped_addr == NULL) {
Expand All @@ -153,10 +214,10 @@ MapSharedMemory(
.c_str());
}
#else
int shm_fd = *static_cast<int*>(shm_file);
// map shared memory to process address space
*mapped_addr =
mmap(NULL, byte_size, PROT_WRITE | PROT_READ, MAP_SHARED, shm_fd, offset);
*mapped_addr = mmap(
NULL, byte_size, PROT_WRITE | PROT_READ, MAP_SHARED, shm_file->shm_fd_,
offset);
if (*mapped_addr == MAP_FAILED) {
return TRITONSERVER_ErrorNew(
TRITONSERVER_ERROR_INTERNAL, std::string(
Expand All @@ -168,67 +229,6 @@ MapSharedMemory(
return nullptr;
}

// Removes a previously established shared memory mapping.
//
// \param mapped_addr The address of the mapping to remove.
// \param byte_size The length of the mapping; only consulted by the
// non-Windows (munmap) path.
// \return nullptr on success, otherwise a TRITONSERVER_ERROR_INTERNAL error
// describing the failure.
TRITONSERVER_Error*
UnmapSharedMemory(void* mapped_addr, size_t byte_size)
{
#ifdef _WIN32
  if (!UnmapViewOfFile(mapped_addr)) {
    // NOTE(review): this concatenation assumes GetWindowsError() returns a
    // std::string; confirm against its definition.
    const std::string message(
        "unable to unmap shared memory region, error code: " +
        GetWindowsError());
    return TRITONSERVER_ErrorNew(TRITONSERVER_ERROR_INTERNAL, message.c_str());
  }
#else
  if (munmap(mapped_addr, byte_size) == -1) {
    // Report the failure using the textual form of errno.
    const std::string message =
        std::string("unable to munmap shared memory region, errno: ") +
        std::strerror(errno);
    return TRITONSERVER_ErrorNew(TRITONSERVER_ERROR_INTERNAL, message.c_str());
  }
#endif
  return nullptr;
}

#ifdef TRITON_ENABLE_GPU
// Opens a CUDA IPC memory handle and returns the resulting pointer through
// 'data_ptr'.
//
// \param cuda_shm_handle The CUDA IPC memory handle to open.
// \param data_ptr Receives the address produced by cudaIpcOpenMemHandle.
// \param device_id The device that is made current before opening the handle.
// \return nullptr on success. On Windows always returns a
// TRITONSERVER_ERROR_UNSUPPORTED error. Otherwise returns a
// TRITONSERVER_ERROR_INTERNAL error if the device cannot be selected or the
// handle cannot be opened.
TRITONSERVER_Error*
OpenCudaIPCRegion(
    const cudaIpcMemHandle_t* cuda_shm_handle, void** data_ptr, int device_id)
{
#ifdef _WIN32
  return TRITONSERVER_ErrorNew(
      TRITONSERVER_ERROR_UNSUPPORTED,
      std::string(
          "GPU shared memory features are currently not supported on Windows")
          .c_str());
#else
  // Make 'device_id' the current device. The result is checked so that an
  // invalid device is reported instead of silently opening the handle against
  // whichever device happens to be current.
  cudaError_t err = cudaSetDevice(device_id);
  if (err != cudaSuccess) {
    return TRITONSERVER_ErrorNew(
        TRITONSERVER_ERROR_INTERNAL,
        std::string(
            "failed to set CUDA device " + std::to_string(device_id) + ": " +
            std::string(cudaGetErrorString(err)))
            .c_str());
  }

  // Open CUDA IPC handle and read data from it
  err = cudaIpcOpenMemHandle(
      data_ptr, *cuda_shm_handle, cudaIpcMemLazyEnablePeerAccess);
  if (err != cudaSuccess) {
    return TRITONSERVER_ErrorNew(
        TRITONSERVER_ERROR_INTERNAL, std::string(
                                         "failed to open CUDA IPC handle: " +
                                         std::string(cudaGetErrorString(err)))
                                         .c_str());
  }

  return nullptr;
#endif
}

#endif // TRITON_ENABLE_GPU

} // namespace

SharedMemoryManager::~SharedMemoryManager()
{
UnregisterAll(TRITONSERVER_MEMORY_CPU);
Expand All @@ -253,17 +253,14 @@ SharedMemoryManager::RegisterSystemSharedMemory(

// register
void* mapped_addr;
// Safe initializer for Unix case where shm_file must be dereferenced to
// base in order to store file descriptor.
int shm_safe_initializer = -1;
void* shm_file = &shm_safe_initializer;
ShmFile* shm_file = nullptr;
bool shm_file_exists = false;

// don't re-open if shared memory is already open
for (auto itr = shared_memory_map_.begin(); itr != shared_memory_map_.end();
++itr) {
if (itr->second->shm_key_ == shm_key) {
shm_file = itr->second->platform_handle_->GetShmFile();
shm_file = itr->second->platform_handle_.get();
shm_file_exists = true;
break;
}
Expand All @@ -272,8 +269,16 @@ SharedMemoryManager::RegisterSystemSharedMemory(
// open and set new shm_file if new shared memory key
if (!shm_file_exists) {
RETURN_IF_ERR(OpenSharedMemoryRegion(shm_key, &shm_file));
} else {
// FIXME: DLIS-6448 - We should allow users the flexibility to register
// the same key under different names with different attributes.
return TRITONSERVER_ErrorNew(
TRITONSERVER_ERROR_INVALID_ARG,
std::string(
"registering an active shared memory key, \"" + shm_key +
"\", under a different name is not currently supported")
.c_str());
}

// Mmap and then close the shared memory descriptor
TRITONSERVER_Error* err_map =
MapSharedMemory(shm_file, offset, byte_size, &mapped_addr);
Expand Down

0 comments on commit a5b6b7e

Please sign in to comment.