Skip to content

Commit

Permalink
Fine grained Numpy Array locks.
Browse files Browse the repository at this point in the history
  • Loading branch information
sdiehl committed Sep 18, 2012
1 parent 7f53dc2 commit b1b6f8c
Show file tree
Hide file tree
Showing 9 changed files with 392 additions and 78 deletions.
11 changes: 2 additions & 9 deletions numpush/getenv.py
Expand Up @@ -6,7 +6,7 @@
from numpy.distutils.cpuinfo import cpuinfo
from zmq import zmq_version

CPU = cpuinfo
CPU = cpuinfo()
PLATFORM = platform.system()
ARCH = platform.architecture()
MAX_PROCS = resource.getrlimit(resource.RLIMIT_NPROC)[1]
Expand All @@ -16,13 +16,7 @@
PYPY = hasattr(sys, 'pypy_version_info')
CPYTHON = not PYPY
ZMQ = zmq_version()
SSE2 = 'sse2' in open('/proc/cpuinfo','r').read()

try:
socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE);
UDPLITE = True
except:
UDPLITE = False
SSE2 = CPU._has_sse2()

try:
socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
Expand All @@ -35,4 +29,3 @@
PTHREADS = True
except:
PTHREADS = False
pass
16 changes: 11 additions & 5 deletions numpush/reductor.py
Expand Up @@ -176,11 +176,12 @@ def bitey_reconstruct(md, bitcode):

LLVM = 0
UFUNC = 1
GUFUNC = 1

# This is a little hairy since Numba is still experimental.
def numba_reduce(tr):
func = tr.func # PyCodeObject
ret_type = tr.ret_type # str
func = tr.func # PyCodeObject
ret_type = tr.ret_type # str
arg_types = tr.arg_types # str

return (func, ret_type, arg_types)
Expand All @@ -196,23 +197,28 @@ def numba_reconstruct(md, func, otype=LLVM):
tr = Translate(func, ret_type, arg_types)
tr.translate()

if otype == 'llvm':
if otype == LLVM:
return tr.get_ctypes_func(llvm=True)
elif otype == 'ufunc':
elif otype == UFUNC:
return tr.make_ufunc(llvm=True)
elif otype == GUFUNC:
pass
else:
raise Exception("Unknown numba cast")

# NumExpr
# =======

def numexpr_reduce(ne):
# Metadata
inputsig = ne.signature
tempsig = ne.tempsig
bytecode = ne.program
input_names = ne.input_names
constants = ne.constants

# Bytecode
bytecode = ne.program

return (inputsig, tempsig, bytecode, constants, input_names)

def numexpr_reconstruct(inputsig, tempsig, bitcode, constants, input_names):
Expand Down
77 changes: 14 additions & 63 deletions numpush/shmem.py
@@ -1,10 +1,16 @@
from numpy import ndarray
from numpy import ndarray, byte_bounds
from pandas import DataFrame
from functools import wraps

from multiprocessing import RLock
from multiprocessing.heap import BufferWrapper
from multiprocessing.sharedctypes import SynchronizedBase

def lock(arr):
arr.flags.writable = False
return byte_bounds(arr)

def unlock(arr):
arr.flags.writable = True
return byte_bounds(arr)

def put_on_heap(na):
size = na.nbytes
Expand All @@ -25,12 +31,6 @@ def RawNumpy(array):
offset=0,
order='C'
)
# Warning, this is a copy operation
# ---------------------------------
# Copy the values from the passed array into shared memory
# arena.

# blosc?
mmap_nd[:] = array[:]
assert mmap_nd.ctypes.data == address
return mmap_nd
Expand All @@ -45,12 +45,9 @@ def SynchronizedNumpy(array, lock=None):
offset = 0,
order = 'C'
)
# Warning, this is a copy operation
# ---------------------------------
# Copy the values from the passed array into shared memory
# arena.
mmap_nd[:] = array[:]
assert mmap_nd.ctypes.data == address
# TODO: agnostic backend
return SynchronizedArray(mmap_nd, lock=lock)

def sync(f):
Expand All @@ -62,52 +59,6 @@ def wrapper(*args, **kwargs):
self.release()
return wrapper

class SynchronizedArray(SynchronizedBase):

def __init__(self, obj, lock=None):
self._obj = obj
self._lock = lock or RLock()
self.acquire = self._lock.acquire
self.release = self._lock.release

def __len__(self):
return len(self._obj)

def __getitem__(self, i):
self.acquire()
try:
return self._obj[i]
finally:
self.release()

def __setitem__(self, i, value):
self.acquire()
try:
self._obj[i] = value
finally:
self.release()

def __getslice__(self, start, stop):
self.acquire()
try:
return self._obj[start:stop]
finally:
self.release()

def __setslice__(self, start, stop, values):
self.acquire()
try:
self._obj[start:stop] = values
finally:
self.release()

def __iadd__(self, other):
with self._lock:
return self._obj.__iadd__(other)

def __imul__(self, other):
with self._lock:
return self._obj.__imul__(other)

# Shared Memory Instances
# -----------------------
Expand All @@ -122,14 +73,14 @@ def SDataFrame(df, mutex=False, lock=None):
columns = df.columns.tolist() # list
return DataFrame(data=snd, index=index, columns=columns, dtype=None, copy=False)

def SDiGraph(graph, mutex=False):
def STensor(tensor, mutex=False):
'''
Shared memory NetworkX graph.
Shared memory Theano tensor.
'''
pass

def STensor(tensor, mutex=False):
def SDiGraph(graph, mutex=False):
'''
Shared memory Theano tensor.
Shared memory NetworkX graph.
'''
pass
93 changes: 93 additions & 0 deletions numpush/shmem_sync.py
@@ -0,0 +1,93 @@
import numpy as np
from threading import currentThread
from contextlib import contextmanager

from _multiprocessing import SemLock
from multiprocessing import BoundedSemaphore, Semaphore, Condition, Event
from multiprocessing.util import register_after_fork
from multiprocessing.forking import assert_spawning, Popen

# The counter is decremented when the semaphore is acquired, and
# incremented when the semaphore is released. If the counter reaches zero
# when acquired, the acquiring thread will block.

class SharedExclusiveLock(object):

def __init__(self, maxreaders=120):
# Linux max semaphore sets is 120
self.max = 120
self._reader = Semaphore(120)
self._writer = Semaphore(1)
self._sleeping = Event()

# Does this process hold the write?
self.localwrite = False
self.thread_id = currentThread()

self.create_methods()

def after_fork(obj):
obj._reader._after_fork()
obj._writer._after_fork()
obj._sleeping._after_fork()

register_after_fork(self, after_fork)

def create_methods(self):
self.acquire = self._reader.acquire
self.release = self._reader.release

@property
def ismine(self):
return self.localwrite and (currentThread() == self.thread_id)

# you can nest write calls
def wait_noreaders(self):
if self.ismine:
return
while self._reader.get_value() < self.max:
self._sleeping.set()
# twiddle the futex
self._sleeping.wait()

@contextmanager
def writing(self):
if self.ismine:
yield
else:
self.wait_noreaders()
self._writer.acquire()
self.localwrite = True
yield
self.localwrite = False
self._writer.release()

def __enter__(self, blocking=True, timeout=None):
# prevent writer starvation
if self.ismine:
return
else:
self._reader.acquire()

def __exit__(self, *args):
if self.ismine:
return
else:
self._reader.release()
if self._sleeping.is_set():
# twiddle the futex
self._sleeping.clear()

def __getstate__(self):
assert_spawning(self)
r = self._reader._semlock
w = self._writer._semlock

reader = Popen.duplicate_for_child(r.handle), r.kind, r.maxvalue
writer = Popen.duplicate_for_child(w.handle), w.kind, w.maxvalue
return (reader, writer)

def __setstate__(self, state):
reader, writer = state
self._reader = SemLock._rebuild(*reader)
self._writer = SemLock._rebuild(*writer)

0 comments on commit b1b6f8c

Please sign in to comment.