Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 28 additions & 43 deletions parla/cuda.py
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@

import logging
from functools import wraps, lru_cache
import os, sys
from typing import Dict, List, Optional, Collection

from parla import array
@@ -15,6 +16,14 @@

logger = logging.getLogger(__name__)

if 'cupy_backends' in sys.modules:
# TODO: This should be dynamically configurable. That needs to be fixed upstream though.
raise ImportError("cupy must be imported after parla.gpu for per-thread default stream configuration to work properly.")

os.environ["CUPY_CUDA_PER_THREAD_DEFAULT_STREAM"] = "1"
# numba responds to this env var even if it has already been imported.
os.environ["NUMBA_CUDA_PER_THREAD_DEFAULT_STREAM"] = "1"

try:
import cupy
import cupy.cuda
@@ -167,33 +176,19 @@ def get_array_module(self, a):
# Integration with parla.environments

class _GPUStacksLocal(threading.local):
_stream_stack: List[cupy.cuda.Stream]
_device_stack: List[cupy.cuda.Device]

def __init__(self):
super(_GPUStacksLocal, self).__init__()
self._stream_stack = []
self._device_stack = []

def push_stream(self, stream):
self._stream_stack.append(stream)

def pop_stream(self) -> cupy.cuda.Stream:
return self._stream_stack.pop()

def push_device(self, dev):
self._device_stack.append(dev)

def pop_device(self) -> cupy.cuda.Device:
return self._device_stack.pop()

@property
def stream(self):
if self._stream_stack:
return self._stream_stack[-1]
else:
return None
@property
def device(self):
if self._device_stack:
return self._device_stack[-1]
@@ -213,30 +208,20 @@ def __init__(self, descriptor: "GPUComponent", env: TaskEnvironment):
# Use a stack per thread per GPU component just in case.
self._stack = _GPUStacksLocal()

def _make_stream(self):
with self.gpu.cupy_device:
return cupy.cuda.Stream(null=False, non_blocking=True)

def __enter__(self):
_gpu_locals._gpus = self.gpus
dev = self.gpu.cupy_device
dev.__enter__()
self._stack.push_device(dev)
stream = self._make_stream()
stream.__enter__()
self._stack.push_stream(stream)
return self

def __exit__(self, exc_type, exc_val, exc_tb):
dev = self._stack.device
stream = self._stack.stream
try:
stream.synchronize()
stream.__exit__(exc_type, exc_val, exc_tb)
cupy.cuda.get_current_stream().synchronize()
_gpu_locals._gpus = None
ret = dev.__exit__(exc_type, exc_val, exc_tb)
finally:
self._stack.pop_stream()
self._stack.pop_device()
return ret

@@ -245,15 +230,15 @@ def initialize_thread(self) -> None:
# Trigger cuBLAS/etc. initialization for this GPU in this thread.
with cupy.cuda.Device(gpu.index) as device:
a = cupy.asarray([2.])
cupy.cuda.get_current_stream().synchronize()
with cupy.cuda.Stream(False, True) as stream:
cupy.asnumpy(cupy.sqrt(a))
device.cublas_handle
device.cusolver_handle
device.cusolver_sp_handle
device.cusparse_handle
stream.synchronize()
device.synchronize()
stream = cupy.cuda.get_current_stream()
stream.synchronize()
cupy.asnumpy(cupy.sqrt(a))
device.cublas_handle
device.cusolver_handle
device.cusolver_sp_handle
device.cusparse_handle
stream.synchronize()
device.synchronize()

class GPUComponent(EnvironmentComponentDescriptor):
"""A single GPU CUDA component which configures the environment to use the specific GPU using a single
@@ -312,15 +297,15 @@ def initialize_thread(self) -> None:
# Trigger cuBLAS/etc. initialization for this GPU in this thread.
with cupy.cuda.Device(gpu.index) as device:
a = cupy.asarray([2.])
cupy.cuda.get_current_stream().synchronize()
with cupy.cuda.Stream(False, True) as stream:
cupy.asnumpy(cupy.sqrt(a))
device.cublas_handle
device.cusolver_handle
device.cusolver_sp_handle
device.cusparse_handle
stream.synchronize()
device.synchronize()
stream = cupy.cuda.get_current_stream()
stream.synchronize()
cupy.asnumpy(cupy.sqrt(a))
device.cublas_handle
device.cusolver_handle
device.cusolver_sp_handle
device.cusparse_handle
stream.synchronize()
device.synchronize()


class MultiGPUComponent(EnvironmentComponentDescriptor):