Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Support for pthreads.

  • Loading branch information...
commit 6eed2cd27b55936e32cf13bd7a0786508e133df3 1 parent 6013847
@sdiehl authored
View
0  examples/numba_recv.py
No changes.
View
0  examples/numba_send.py
No changes.
View
9 numpush/getenv.py
@@ -1,3 +1,4 @@
+import os
import sys
import socket
import platform
@@ -15,6 +16,7 @@
PYPY = hasattr(sys, 'pypy_version_info')
CPYTHON = not PYPY
ZMQ = zmq_version()
+SSE2 = 'sse2' in open('/proc/cpuinfo','r').read()
try:
socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE);
@@ -27,3 +29,10 @@
TIPC = True
except:
TIPC = False
+
+try:
+ os.stat('/usr/include/pthread.h')
+ PTHREADS = True
+except:
+ PTHREADS = False
+ pass
View
76 numpush/posix_io/splice.pyx
@@ -2,11 +2,23 @@ import os
from libc.stdint cimport uint32_t, uint64_t
from mmap import PAGESIZE
+cdef extern from "pthread.h" nogil:
+ ctypedef unsigned long int pthread_t
+ ctypedef union pthread_attr_t:
+ char __size[56]
+ long int __align
+
+ int pthread_create(pthread_t *thread, pthread_attr_t *attr,
+ void *(*start_routine) (void *), void *arg)
+ int pthread_tryjoin_np (pthread_t __th, void **__thread_return)
+ int pthread_cancel (pthread_t __th)
+
cdef extern from "fcntl.h" nogil:
ctypedef unsigned size_t
ctypedef signed ssize_t
- ssize_t splice(int fd_in, uint64_t *off_in, int fd_out, uint64_t *off_out, size_t len, unsigned int flags)
+ ssize_t splice(int fd_in, uint64_t *off_in, int fd_out, uint64_t *off_out,
+ size_t len, unsigned int flags)
enum: SPLICE_F_MOVE
enum: SPLICE_F_NONBLOCK
@@ -19,7 +31,8 @@ cdef extern from "errno.h" nogil:
enum: SF_SYNC
int errno
-cdef int _posix_splice(int fd_in, uint64_t *off_in, int fd_out, uint64_t *off_out, size_t nbytes, unsigned int flags):
+cdef int _posix_splice(int fd_in, uint64_t *off_in, int fd_out,
+ uint64_t *off_out, size_t nbytes, unsigned int flags):
cdef int sent
cdef int err
global errno
@@ -36,9 +49,25 @@ cdef int _posix_splice(int fd_in, uint64_t *off_in, int fd_out, uint64_t *off_ou
sent = sts
return sent
+cdef extern from "Python.h" nogil:
+ ctypedef struct PyThreadState
+ void PyEval_InitThreads()
+ PyThreadState* PyEval_SaveThread
+ void PyEval_RestoreThread(PyThreadState *tstate)
+
+ctypedef struct spliceinfo:
+ int fd1
+ int fd2
+ uint64_t fd1_offset
+ uint64_t fd2_offset
+ size_t nbytes
+ int flags
+
# Changed the signature a bit to match our common use case of 0,0
-# offsets
-def posix_splice(fd1, fd2, fd1_offset=0, fd2_offset=0, nbytes=PAGESIZE, flags=0):
+# offsets. If you need SPLICE_F_MORE then you probably just want
+# to use the sendfile implementation.
+def posix_splice(fd1, fd2, fd1_offset=0, fd2_offset=0,
+ nbytes=PAGESIZE, flags=SPLICE_F_MOVE):
if type(fd1) is int:
fd1 = os.fdopen(fd1, 'r')
@@ -52,9 +81,46 @@ def posix_splice(fd1, fd2, fd1_offset=0, fd2_offset=0, nbytes=PAGESIZE, flags=0)
cdef int c_flags = flags
cdef int rc
- rc = _posix_splice(fd1.fileno(), &c_fd1offset, fd2.fileno(), &c_fd2offset, nbytes, 0)
+ rc = _posix_splice(
+ fd1.fileno(),
+ &c_fd1offset,
+ fd2.fileno(),
+ &c_fd2offset,
+ nbytes,
+ 0
+ )
if rc < 0:
raise OSError(os.strerror(-rc))
else:
return rc
+
+cdef void* pthread_splice(void *p) nogil:
+ # cython *grumble grumble*
+ cdef spliceinfo *pms = <spliceinfo*>(p)
+ cdef int fd1 = pms.fd1
+ cdef int fd2 = pms.fd2
+ cdef uint64_t fd1_offset = pms.fd1_offset
+ cdef uint64_t fd2_offset = pms.fd2_offset
+ cdef size_t nbytes = pms.nbytes
+ cdef int flags = pms.flags
+ splice(fd1, &fd1_offset, fd2, &fd2_offset, nbytes, flags)
+
+# Spin a raw OS thread (no GIL!) to do background continuous data
+# transfer between two file descriptors. The POSIX thread also
+# shares the same file descriptor as the Python process so we
+# have to be careful
+cdef void spawn(int fd1, int fd2, uint32_t fd1_offset, uint32_t fd2_offset, int nbytes, int flags=0):
+ # cython *grumble grumble*
+ #cdef sockinfo* params = {fd1, fd2, fd1_offset, fd2_offset, nbytes, flags}
+ cdef pthread_t thread
+ cdef spliceinfo *pms = NULL
+ pms.fd1 = fd1
+ pms.fd2 = fd2
+ pms.fd1_offset = fd1_offset
+ pms.fd2_offset = fd2_offset
+ pms.nbytes = nbytes
+ pms.flags = flags
+
+ with nogil:
+ pthread_create(&thread, NULL , pthread_splice, <void*>pms)
View
22 numpush/zmq_net.py
@@ -1,12 +1,14 @@
import zmq
+import struct
import ctypes
import reductor
+from ctypes import sizeof
import numpy as np
from numpy import ndarray, array
from pandas import DataFrame
-#from theano.tensor import Tensor
+from theano.tensor import Tensor
from collections import namedtuple
from datastructures import ReverseLookupDict
@@ -15,6 +17,12 @@
except ImportError:
import cPickle as srl
+# ============
+# Socket Flags
+# ============
+
+COMPRESS = 0x01
+
numpy_metadata = namedtuple('metadata', 'shape dtype')
tensor_metadata = namedtuple('metadata', 'shape dtype')
pandas_metadata = namedtuple('metadata', 'shape dtype index columns')
@@ -47,11 +55,13 @@
NETWORKX = b'\x00\x06'
THEANO = b'\x00\x07'
+PYTHONBYTECODE = b'\x01\x01'
+
type_coercions = {
array : NUMPYND,
ndarray : NUMPYND,
DataFrame : PANDAS,
- #Tensor : THEANO,
+ Tensor : THEANO,
}
class CannotCoerce(Exception):
@@ -61,6 +71,10 @@ def __init__(self, obj):
def __str__(self):
return "Don't know how encode type %s over ZMQ" % ( self.unknown_type)
+# ===============
+# Data Structures
+# ===============
+
def send_numpy(self, magic, obj, flags=0):
numpy_metadata, narray = reductor.numpy_reduce(obj)
self.send(magic, flags|zmq.SNDMORE)
@@ -97,6 +111,10 @@ def recv_tensor(self, flags=0, copy=True, track=False):
nd = self.recv(flags=flags)
return reductor.tensor_reconstruct(md, nd)
+# ====
+# Code
+# ====
+
# Polymorphic ZMQ socket mixins for all supported scientific types
def numsend(self, obj, **kwargs):
magic = type_coercions.get(type(obj))
Please sign in to comment.
Something went wrong with that request. Please try again.