style: fix cython lint
wlruys committed Nov 10, 2023
1 parent 9f0b7e2 commit 6c7d015
Showing 13 changed files with 41 additions and 96 deletions.
3 changes: 0 additions & 3 deletions src/python/parla/cython/core.pxd
@@ -29,8 +29,6 @@ cdef extern from "include/runtime.hpp" nogil:
void stop_callback(stopfunc_t func, void* scheduler) noexcept

void create_parray(InnerPArray* parray, int parray_dev_id) noexcept
#ctypedef void* Ptr_t
#ctypedef InnerTask* InnerTaskPtr_t

cdef cppclass _StatusFlags "TaskStatusFlags":
bool spawnable
@@ -153,7 +151,6 @@ cdef extern from "include/runtime.hpp" nogil:
void increase_num_active_tasks()
void decrease_num_active_tasks()

#int get_num_active_workers()
int get_num_active_tasks()
int get_num_running_tasks()
int get_num_ready_tasks()
16 changes: 4 additions & 12 deletions src/python/parla/cython/core.pyx
@@ -7,17 +7,13 @@

import cython

from ..common.parray.core import PArray
from ..common.dataflow import Dataflow
from ..common.globals import AccessMode, cupy
from ..common.globals import cupy

from .device cimport Device
from .cyparray cimport CyPArray
from .device_manager cimport CyDeviceManager, DeviceManager

from .mm cimport CyMM
import threading
from enum import IntEnum, auto
from libc.stdint cimport uintptr_t

LOG_TRACE = 0
@@ -627,7 +623,6 @@ cdef class PyInnerScheduler:
cdef InnerScheduler* c_self = self.inner_scheduler
c_self.remove_parray_from_tracker(cy_parray.get_cpp_parray(), dev_id)

#TODO(wlr): Should we release the GIL here? Or is it better to keep it?
cpdef task_cleanup(self, PyInnerWorker worker, PyInnerTask task, int state):
cdef InnerScheduler* c_self = self.inner_scheduler
cdef InnerWorker* c_worker = worker.inner_worker
@@ -689,13 +684,11 @@ cdef class PyInnerScheduler:
cdef InnerScheduler* c_self = self.inner_scheduler
c_self.create_parray(cy_parray.get_cpp_parray(), parray_dev_id)

cpdef get_mapped_parray_state(\
self, int global_dev_id, long long int parray_parent_id):
cpdef get_mapped_parray_state(self, int global_dev_id, long long int parray_parent_id):
cdef InnerScheduler* c_self = self.inner_scheduler
return c_self.get_mapped_parray_state(global_dev_id, parray_parent_id)

cpdef get_reserved_parray_state(\
self, int global_dev_id, long long int parray_parent_id):
cpdef get_reserved_parray_state(self, int global_dev_id, long long int parray_parent_id):
cdef InnerScheduler* c_self = self.inner_scheduler
return c_self.get_reserved_parray_state(global_dev_id, parray_parent_id)

@@ -744,8 +737,7 @@ class DataMovementTaskAttributes:
This is declared to avoid circular imports that could happen
when we import tasks.pyx in here.
"""
def __init__(self, name, py_parray, access_mode, assigned_devices, \
c_attrs: CyDataMovementTaskAttributes, dev_id):
def __init__(self, name, py_parray, access_mode, assigned_devices, c_attrs: CyDataMovementTaskAttributes, dev_id):
self.name = name
self.parray = py_parray
self.access_mode = access_mode
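The docstring above notes that DataMovementTaskAttributes exists only to avoid a circular import with tasks.pyx. A minimal sketch of that pattern, with hypothetical module and class names that are not taken from this repository: the lower-level module defines a plain attribute holder, and only the higher-level module imports across the boundary.

# movement_attrs.py (hypothetical stand-in for core.pyx): plain data only,
# so it never needs to import the task module.
class MovementAttributes:
    def __init__(self, name, parray, access_mode, devices):
        self.name = name
        self.parray = parray
        self.access_mode = access_mode
        self.devices = devices

# movement_task.py (hypothetical stand-in for tasks.pyx): imports the
# attributes module, never the other way around, so no import cycle forms.
from movement_attrs import MovementAttributes

class MovementTask:
    @classmethod
    def from_attributes(cls, attrs: MovementAttributes) -> "MovementTask":
        task = cls()
        task.name = attrs.name
        task.parray = attrs.parray
        task.access_mode = attrs.access_mode
        task.devices = attrs.devices
        return task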
1 change: 0 additions & 1 deletion src/python/parla/cython/cyparray.pyx
@@ -7,7 +7,6 @@

from .cyparray cimport InnerPArray
from .cyparray_state cimport CyPArrayState
from ..common.parray.core import PArray

# a Cython wrapper class around C++ PArray
cdef class CyPArray:
4 changes: 2 additions & 2 deletions src/python/parla/cython/device.pxd
@@ -1,5 +1,5 @@
#cython: language_level=3
#cython: language=c++
# cython: language_level=3
# cython: language=c++
from ..cython.resources cimport Resource

import cython
14 changes: 4 additions & 10 deletions src/python/parla/cython/device.pyx
@@ -35,8 +35,7 @@ cdef class CyGPUDevice(CyDevice):
def __cinit__(self, int dev_id, long mem_sz, long num_vcus, py_device):
# C++ device object.
# This object is deallocated by the C++ device manager.
self._cpp_device = new GPUDevice(dev_id, mem_sz, num_vcus, \
<void *> py_device)
self._cpp_device = new GPUDevice(dev_id, mem_sz, num_vcus, <void *> py_device)

def __init__(self, int dev_id, long mem_sz, long num_vcus, py_device):
pass
@@ -197,19 +196,13 @@ class PyDevice:
return self._device_id


"""
Device instances in Python manage resource status.
TODO(hc): the device configuration will be packed in a data class soon.
"""

class PyGPUDevice(PyDevice):
"""
An inherited class from `PyDevice` for a device object specialized to CUDA.
"""

def __init__(self, dev_id: int = 0, mem_sz: long = 0, num_vcus: long = 1):
super().__init__(DeviceType.GPU, "GPU", dev_id)
#TODO(wlr): If we ever support VECs, we might need to move this device initialization
self._cy_device = CyGPUDevice(dev_id, int(mem_sz*0.8), num_vcus, self)

@property
@@ -391,6 +384,7 @@ class PyGPUArchitecture(PyArchitecture):
def __init__(self):
super().__init__("GPUArch", DeviceType.GPU)


class ImportableGPUArchitecture(PyGPUArchitecture, ImportableArchitecture):
def __init__(self):
ImportableArchitecture.__init__(self, "GPUArch", DeviceType.GPU)
@@ -496,11 +490,11 @@ class CupyStream(Stream):
return self.__repr__()

def __enter__(self):
#Set the device to the stream's device.
# Set the device to the stream's device.
self.active_device = cupy.cuda.Device(self._device_id)
self.active_device.__enter__()

#Set the stream to the current stream.
# Set the stream to the current stream.
self._stream.__enter__()

Locals.push_stream(self)
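The CupyStream.__enter__ hunk above first makes the stream's device current, then makes the stream itself current, then records it on Parla's Locals stack. A rough standalone sketch of the underlying CuPy pattern, using only public CuPy APIs (the device index and the work inside the block are placeholders):

import cupy

device_id = 0  # placeholder device index
with cupy.cuda.Device(device_id):                 # make this GPU the current device
    stream = cupy.cuda.Stream(non_blocking=True)  # create a non-default stream
    with stream:                                  # make it the current stream
        x = cupy.arange(1_000_000)
        y = x * 2                                 # kernels launch on `stream`
    stream.synchronize()                          # wait for the queued work to finish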
3 changes: 1 addition & 2 deletions src/python/parla/cython/device_manager.pxd
@@ -20,8 +20,7 @@ cdef extern from "include/device_manager.hpp" nogil:
void free_memory_by_parray_id(int parray_dev_id, unsigned long memory_size)
void free_memory(unsigned int global_dev_id, unsigned long memory_size)




cdef class CyDeviceManager:
cdef DeviceManager* cpp_device_manager_
cpdef register_device(self, CyDevice cy_device)
9 changes: 5 additions & 4 deletions src/python/parla/cython/device_manager.pyx
@@ -34,7 +34,6 @@ gpu = ImportableGPUArchitecture()
cpu = ImportableCPUArchitecture()

from . import stream_pool
from .stream_pool cimport CyStreamPool

cdef class CyDeviceManager:
"""
@@ -246,9 +245,11 @@ class PyDeviceManager:
for dev_id in range(num_of_gpus):

if self.num_real_gpus > 0:
py_cuda_device = PyGPUDevice(dev_id % self.num_real_gpus, \
gpu_mem_sizes[dev_id], \
VCU_BASELINE)
py_cuda_device = PyGPUDevice(
dev_id % self.num_real_gpus,
gpu_mem_sizes[dev_id],
VCU_BASELINE
)

else:
py_cuda_device = PyCPUDevice(
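In the hunk above, each virtual GPU device is mapped onto a physical GPU round-robin via dev_id % self.num_real_gpus, so Parla can expose more logical devices than the machine actually has. A tiny illustration of that mapping with made-up counts:

num_real_gpus = 2     # physical GPUs (assumed value)
num_virtual_gpus = 5  # logical devices exposed (assumed value)

for dev_id in range(num_virtual_gpus):
    real_id = dev_id % num_real_gpus
    print(f"virtual GPU {dev_id} -> physical GPU {real_id}")
# virtual GPU 0 -> physical GPU 0
# virtual GPU 1 -> physical GPU 1
# virtual GPU 2 -> physical GPU 0
# virtual GPU 3 -> physical GPU 1
# virtual GPU 4 -> physical GPU 0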
6 changes: 2 additions & 4 deletions src/python/parla/cython/mm.pyx
@@ -3,7 +3,7 @@ from parla.cython import device_manager

from parla.cython.core cimport LRUGlobalEvictionManager
from parla.cython cimport device_manager
#from parla.cython.core import LRUGlobalEvictionManager


class PyMM:
def __init__(self, dm: device_manager.PyDeviceManager):
@@ -20,13 +20,11 @@ class PyMM:
return self._cy_mm

def print_memory_stats(self, device_id, label: str):
import psutil
import os
print(f"[{label}] Memory tracking", flush=True)
try:
import cupy
mempool = cupy.get_default_memory_pool()
pinned_mempool = cupy.get_default_pinned_memory_pool()
_pinned_mempool = cupy.get_default_pinned_memory_pool()
print((
f"\t GPU{device_id} {label} CuPy used bytes: {mempool.used_bytes()} \n"
f"\t GPU{device_id} {label} Free bytes: {mempool.free_bytes()} \n"
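print_memory_stats above reads counters from CuPy's default memory pools. A standalone sketch of the same queries against CuPy's public memory-pool API (the device index and labels are arbitrary):

import cupy

device_id = 0
with cupy.cuda.Device(device_id):
    mempool = cupy.get_default_memory_pool()
    pinned_mempool = cupy.get_default_pinned_memory_pool()
    print(f"GPU{device_id} CuPy used bytes:  {mempool.used_bytes()}")
    print(f"GPU{device_id} CuPy free bytes:  {mempool.free_bytes()}")
    print(f"GPU{device_id} CuPy total bytes: {mempool.total_bytes()}")
    print(f"Pinned pool free blocks: {pinned_mempool.n_free_blocks()}")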
56 changes: 16 additions & 40 deletions src/python/parla/cython/scheduler.pyx
@@ -8,7 +8,6 @@
from abc import abstractmethod
import threading
import inspect
import os
from ..common.globals import DeviceType, cupy, CUPY_ENABLED
from ..common.globals import SynchronizationType as SyncType
from ..common.globals import _Locals as Locals
@@ -26,9 +25,6 @@ from . import tasks
from . cimport core
from . import core
from .cyparray import CyPArray
from .mm import PyMM

from ..common.parray.core import PArray
from ..utility.tracer import NVTXTracer

Task = tasks.Task
@@ -208,12 +204,8 @@ class WorkerThread(ControllableThread, SchedulerContext):
self.task = self.inner_worker.get_task()
if(self.task is None):
self.status = "Waiting"
# print("WAITING", flush=True)
#with self._monitor:
# if not self.task:
# self._monitor.wait()
nvtx.push_range(message="worker::wait", domain="Python Runtime", color="blue")
self.inner_worker.wait_for_task() # GIL Release
self.inner_worker.wait_for_task() # GIL Release
self.task = self.inner_worker.get_task()
if isinstance(self.task, core.DataMovementTaskAttributes):
self.task_attrs = self.task
@@ -237,13 +229,11 @@ class WorkerThread(ControllableThread, SchedulerContext):
# Save device_context with task object
active_task.environment = device_context

#Writes all 'default' streams and event pointers to c++ task
#This allows their synchronization without the GIL and faster iteration over them
#(only saves initial runtime ones, TODO(wlr): save any user added events or streams after body returns)
# Writes all 'default' streams and event pointers to c++ task
# This allows their synchronization without the GIL and faster iteration over them
# (only saves initial runtime ones, TODO(wlr): save any user added events or streams after body returns)
device_context.write_to_task(active_task)

#print("Wrote enviornment to task", active_task, flush=True)

# Wait / Enqueue event for dependencies to complete
if USE_PYTHON_RUNAHEAD:
active_task.py_handle_runahead_dependencies()
@@ -296,38 +286,37 @@ class WorkerThread(ControllableThread, SchedulerContext):
elif isinstance(final_state, tasks.TaskRunahead):
core.binlog_2("Worker", "Runahead task: ", active_task.inner_task, " on worker: ", self.inner_worker)


if USE_PYTHON_RUNAHEAD:
#Handle synchronization in Python (for debugging, works!)
#print("Should run before cleanup_and_wait", self._should_run, active_task.inner_task, flush=True)
# Handle synchronization in Python (for debugging, works!)
# print("Should run before cleanup_and_wait", self._should_run, active_task.inner_task, flush=True)
if self._should_run:
#print("In if", flush=True)
# print("In if", flush=True)
self.status = "Waiting"
nvtx.push_range(message="worker::wait::2", domain="Python Runtime", color="red")
self.scheduler.inner_scheduler.task_cleanup_and_wait_for_task(self.inner_worker, active_task.inner_task, active_task.state.value)
else:
#print("In else", flush=True)
# print("In else", flush=True)
self.scheduler.inner_scheduler.task_cleanup_presync(self.inner_worker, active_task.inner_task, active_task.state.value)
if active_task.runahead != SyncType.NONE:
device_context.synchronize(events=True)
self.scheduler.inner_scheduler.task_cleanup_postsync(self.inner_worker, active_task.inner_task, active_task.state.value)
else:
#Handle synchronization in C++
# Handle synchronization in C++
# self.scheduler.inner_scheduler.task_cleanup(self.inner_worker, active_task.inner_task, active_task.state.value)
# Adding wait here to reduce context switch between GIL
print("Should run before cleanup_and_wait", self._should_run, active_task.inner_task, flush=True)
if self._should_run:
self.status = "Waiting"
nvtx.push_range(message="worker::wait::2", domain="Python Runtime", color="red")
self.scheduler.inner_scheduler.task_cleanup_and_wait_for_task(self.inner_worker, active_task.inner_task, active_task.state.value)
#self.task = self.inner_worker.get_task()
# self.task = self.inner_worker.get_task()
else:
self.scheduler.inner_scheduler.task_cleanup(self.inner_worker, active_task.inner_task, active_task.state.value)
# print("Finished Cleaning up Task", active_task, flush=True)
#print("Should run before device_context", self._should_run, task, flush=True)
# print("Should run before device_context", self._should_run, task, flush=True)
if active_task.runahead != SyncType.NONE:
device_context.return_streams()
#print("Should run before final_state cleanup", self._should_run, task, flush=True)
# print("Should run before final_state cleanup", self._should_run, task, flush=True)
if isinstance(final_state, tasks.TaskRunahead):
final_state = tasks.TaskCompleted(final_state.return_value)
active_task.cleanup()
@@ -339,7 +328,6 @@ class WorkerThread(ControllableThread, SchedulerContext):
self.task = None
nvtx.pop_range(domain="Python Runtime")


# Adding wait here to reduce context switch between GIL
# if self._should_run:
# self.status = "Waiting"
@@ -442,7 +430,7 @@ class Scheduler(ControllableThread, SchedulerContext):

def parray_eviction(self):
py_mm = self.memory_manager
print("Eviction policy is activated")
# print("Eviction policy is activated")
for cuda_device in self.device_manager.get_devices(DeviceType.CUDA):
global_id = cuda_device.get_global_id()
parray_id = self.device_manager.globalid_to_parrayid(global_id)
@@ -453,31 +441,19 @@ class Scheduler(ControllableThread, SchedulerContext):
# from Python eviction manager.
num_evictable_parray = py_mm.size(global_id)
# TODO(hc): remove this. this is for test.
#import cupy
# import cupy
for i in range(0, num_evictable_parray):
try:
# Get a PArray from a memory manager to evict.
evictable_parray = \
py_mm.remove_and_return_head_from_zrlist(global_id)
if evictable_parray is not None:
# TODO(hc): remove this. this is for test.
#for k in range(0, 4):
# with cupy.cuda.Device(k):
# mempool = cupy.get_default_memory_pool()
# print(f"\t OK? {k} Used GPU{k}: {mempool.used_bytes()}, Free Mmeory: {mempool.free_bytes()}", flush=True)

evictable_parray.evict(parray_id)

# TODO(hc): remove this. this is for test.
#for k in range(0, 4):
# with cupy.cuda.Device(k):
# mempool = cupy.get_default_memory_pool()
# print(f"\t OK {k} Used GPU{k}: {mempool.used_bytes()}, Free Mmeory: {mempool.free_bytes()}", flush=True)

# Repeat eviction until it gets enough memory.
memory_size_to_evict -= \
evictable_parray.nbytes_at(parray_id)
#print("\t Remaining size to evict:", memory_size_to_evict, flush=True)
# print("\t Remaining size to evict:", memory_size_to_evict, flush=True)
if memory_size_to_evict <= 0:
break
except Exception as e:
@@ -490,7 +466,7 @@ class Scheduler(ControllableThread, SchedulerContext):
print("Scheduler: Running", flush=True)
self.inner_scheduler.run()
should_run = self.inner_scheduler.get_should_run()
if should_run == False:
if should_run is False:
break
# This case is executed if PArray eviction
# mechanism was invoked by C++ scheduler.
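The parray_eviction hunk above pops zero-referenced PArrays from a per-device LRU list and evicts them one by one until the requested amount of memory has been released. A simplified, framework-free sketch of that loop; the manager and array methods mirror the names visible in the diff (size, remove_and_return_head_from_zrlist, evict, nbytes_at), but the single device_id argument is a simplification of Parla's separate global and parray device ids:

def evict_until_freed(mm, device_id, bytes_needed):
    """Evict least-recently-used, zero-referenced arrays until enough memory is freed."""
    remaining = bytes_needed
    for _ in range(mm.size(device_id)):           # at most one pass over the evictable list
        candidate = mm.remove_and_return_head_from_zrlist(device_id)
        if candidate is None:
            break                                 # nothing evictable is left
        candidate.evict(device_id)                # drop this array's copy on the device
        remaining -= candidate.nbytes_at(device_id)
        if remaining <= 0:
            break                                 # freed enough memory
    return bytes_needed - remaining               # bytes actually reclaimed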
2 changes: 0 additions & 2 deletions src/python/parla/cython/stream_pool.pxd
@@ -10,12 +10,10 @@ cdef extern from "include/device_contexts.hpp":

void push_event(int device_id, uintptr_t event)
uintptr_t pop_event(int device_id)



cdef class CyStreamPool:
cdef InnerStreamPool* _c_pool
cdef dict _pool
cdef int _per_device
cdef list _device_list
