
Initial commit.

0 parents commit 456b5040e2c9ac9f45db5890e189905f164de238 @sdiehl committed Aug 13, 2012
29 .gitignore
@@ -0,0 +1,29 @@
+*.py[co]
+
+# Packages
+*.egg
+*.egg-info
+dist
+build
+eggs
+parts
+bin
+var
+sdist
+develop-eggs
+.installed.cfg
+
+# Installer logs
+pip-log.txt
+
+# Unit test / coverage reports
+.coverage
+.tox
+
+# Translations
+*.mo
+*.swp
+*.swo
+
+*.so
+*.c
20 LICENSE
@@ -0,0 +1,20 @@
+Copyright (c) 2012 Stephen Diehl, <stephen.m.diehl@gmail.com>
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
8 Makefile
@@ -0,0 +1,8 @@
+all:
+ python setup.py build_ext --inplace
+
+valgrind:
+ valgrind python runtests.py
+
+clean:
+ python setup.py clean
39 NOTICE
@@ -0,0 +1,39 @@
+MooseFS bindings are adapted from python-moosefs by Joseph Hall
+( perlhoser@gmail.com ), released under the GPLv3. The derivative bindings
+are also released under the GPLv3, separately from the license of the
+main code.
+
+datastructures
+==============
+
+Copyright (c) 2011 by the Werkzeug Team, see AUTHORS for more details.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+
+ * Redistributions in binary form must reproduce the above
+ copyright notice, this list of conditions and the following
+ disclaimer in the documentation and/or other materials provided
+ with the distribution.
+
+ * The names of the contributors may not be used to endorse or
+ promote products derived from this software without specific
+ prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
178 README.md
@@ -0,0 +1,178 @@
+numpush
+=======
+
+Efficient distribution for scientific Python data structures and code.
+
+Numpush is not a clustering or map-reduce solution; it is merely
+a library of common functions that are often needed when building
+distributed scientific Python applications.
+
+
+Target Infrastructure
+--------------------
+
+* Network transports
+ * TIPC
+ * ZeroMQ
+ * TCP
+* IO Operations
+ * sendfile
+ * splice
+ * mmap
+ * read/write
+* Libraries
+ * numpy
+ * pandas
+ * numba
+ * bitey
+ * llvm-py
+ * theano
+ * numexpr
+ * pytables
+ * carray
+* Datastore Backends
+ * MooseFS
+ * Redis
+
+Data
+----
+
+The core idea is based on the observation that most scientific Python
+data structures can be represented by two parts:
+
+1. Large block of indexable memory
+2. Small bits of metadata
+
+(1) is a heavy object to move around, so we use fast transports (e.g.
+ZeroMQ, TIPC, UDP-Lite) that all act on a universal memoryview
+interface backed by low-level POSIX IO calls (mmap, splice, sendfile),
+which avoid needless copying in userspace when sending data and
+can delegate scheduling to the Linux kernel.
+
+(2) is trivial to move around: these bits are pickled or msgpacked and
+moved around easily, since they are most often PyObjects or can be
+encoded as PyObjects.
+
+For example, a Pandas DataFrame is a Numpy array with some extra
+metadata and logical methods. The logic exists wherever the library is
+installed, so to move the dataframe we need only encode the metadata
+and buffer the internal array.
+
+```python
+>>> import numpy as np
+>>> from pandas import DataFrame
+
+>>> df = DataFrame({'a': [1,2,3], 'b': [4,5,6]})
+>>> df
+   a  b
+0  1  4
+1  2  5
+2  3  6
+
+>>> index = df.index.tolist()
+>>> columns = df.columns.tolist()
+>>> dtype = df.values.dtype
+>>> shape = df.values.shape
+
+>>> nd = buffer(df.values)
+
+>>> df2 = np.frombuffer(nd, dtype=dtype).reshape(shape)
+```
+
+Code
+----
+
+The code scenario is slightly different, but usually the idea
+is that we have:
+
+1. Chunk of bytecode/bitcode that is moveable between homogeneous computational architectures.
+2. Some associated "state" that is needed to bootstrap the code
+
+(1) is most often simply a string buffer, and moving it around the
+network is trivial. Quite often (2) never comes up at all, since we
+are just moving pure objects around.
+
+For example, a slug of Numba-generated LLVM that does some simple
+numerical operations, like:
+
+```llvm
+define i32 @myfunc(i32, i32, i32) {
+ %3 = add i32 %0, %1
+ %4 = mul i32 %3, %2
+ ret i32 %4
+}
+```
+can easily be serialized over the network, recompiled at the
+target, and translated into a Numpy ufunc by the Numba
+infrastructure.
+
+```python
+'BC\xc0\xde!\x0c\x00\x00z\x00\x00\x00\x01\x10\x00\x00\x12\x00\x00\x00\x07\x81#\x91A'
+```
+
+Shared Memory
+-------------
+
+Once data is at a remote target one often wants to farm it out to as
+many cores as possible (e.g. through multiprocessing), all against the
+same shared memory source. For numpy-derived data structures this is
+not difficult (though largely undocumented!). To that end some
+*simple* shared memory methods are included for doing *simple* shared
+memory array operations under a mutex.
+
+For example, one might want a shared memory-backed DataFrame and to
+process it across 8 cores.
+
+This could be done with ctypes hacks around
+``multiprocessing.RawArray``, but that is an ugly hack. The
+``numpush.shmem`` module has a cleaner implementation.
+
+
+```python
+from multiprocessing import Pool
+from numpush.shmem import SDataFrame
+
+df = SDataFrame(.. your enormous dataframe .. )
+
+def f(df):
+    ... modify shared memory dataframe ...
+
+pool = Pool(processes=8)
+pool.apply(f, (df,))
+pool.close()
+pool.join()
+```
+
+Data Stores
+-----------
+
+We are not going to try to solve the problem of distributed,
+fault-tolerant datastores; both MooseFS and Redis work nearly out of
+the box and can address most use cases. The goal is simply to make
+them interface with scientific Python libraries more easily.
+
+MooseFS is distributed, fault-tolerant, and quite easy to
+bootstrap on small- to medium-size clusters, especially on EC2. It
+works well with data that can be manipulated as file-like objects.
+
+Redis is local, fast, and simple, and with hiredis it can work
+efficiently with large buffers. It works well for small, simple data
+that fits into a key-value model.
+
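+For illustration, here is a minimal sketch of the Redis side (the key
+names are arbitrary and the stock ``redis-py`` client is assumed; none
+of this is part of numpush itself):
+
+```python
+import redis
+import numpy as np
+
+r = redis.StrictRedis()
+nd = np.arange(100, dtype='float64')
+
+# Ship the raw buffer plus the minimal metadata needed to rebuild it
+r.set('nd:data', nd.tostring())
+r.set('nd:meta', '%s|%s' % (nd.dtype.str, ','.join(map(str, nd.shape))))
+
+# Rebuild on the consuming side
+dtstr, shape = r.get('nd:meta').split('|')
+shape = tuple(int(i) for i in shape.split(','))
+nd2 = np.frombuffer(r.get('nd:data'), dtype=dtstr).reshape(shape)
+assert (nd == nd2).all()
+```
+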
+Required
+--------
+
+* libzmq
+* Python 2.7.x ( memoryview is used copiously )
+* pyzmq 2.1.11
+* msgpack
+* Cython 0.16
+
+Optional
+--------
+
+* https://github.com/techhat/python-moosefs
+* redis & hiredis
+
+License
+-------
+
+Released under the MIT license.
+
+```
+Copyright (c) 2012 Stephen Diehl, <stephen.m.diehl@gmail.com>
+See LICENSE and NOTICE for details.
+```
2 examples/Makefile
@@ -0,0 +1,2 @@
+all:
+ llvm-ld -o example bitcode.ll
0 examples/__init__.py
No changes.
13 examples/bitcode.ll
@@ -0,0 +1,13 @@
+; ModuleID = 'haskell_codegen.o'
+
+define internal cc10 i32 @plus(i32, i32) {
+_L1:
+ %2 = add i32 %0, %1
+ ret i32 %2
+}
+
+define i32 @test(i32, i32) {
+_L1:
+ %2 = add i32 %0, %1
+ ret i32 %2
+}
15 examples/bitey_recv.py
@@ -0,0 +1,15 @@
+import sys
+import zmq
+from types import ModuleType
+from numpush.zmq_net import numrecv
+
+ctx = zmq.Context.instance()
+sock = ctx.socket(zmq.PULL)
+sock.connect('tcp://127.0.0.1:5555')
+
+module = numrecv(sock)
+assert type(module) is ModuleType
+assert 'example' in sys.modules
+
+import example
+assert example.test(1,2) == 3
14 examples/bitey_send.py
@@ -0,0 +1,14 @@
+import zmq
+from llvm.core import Module
+from llvm.ee import ExecutionEngine
+
+from numpush.zmq_net import numsend
+
+import bitey
+example = bitey.load_library('./example')
+
+ctx = zmq.Context.instance()
+sock = ctx.socket(zmq.PUSH)
+sock.bind('tcp://127.0.0.1:5555')
+
+numsend(sock, example)
44 examples/shared_pandas.py
@@ -0,0 +1,44 @@
+from os import getpid
+
+from numpy.random import randn
+from numpush.shmem import SDataFrame
+from pandas import DataFrame
+from multiprocessing import Process
+
+N, M = 10, 10
+df = DataFrame(randn(N,M))
+sdf = SDataFrame(df)
+
+assert (df == sdf).all().all()
+assert df is not sdf
+
+def modify(shared_df, start):
+ # Modify the column of the dataframe with the pid of the process
+ # operating on it. It's stupid but it illustrates that the DataFrame
+ # is truly shared memory instead of copy-on-write.
+ for i in xrange(start, N, 2):
+ shared_df[i] = getpid()
+
+# Shared Memory
+# =============
+
+p1 = Process(target=modify, args=(sdf,0))
+p2 = Process(target=modify, args=(sdf,1))
+p1.start()
+p2.start()
+p1.join()
+p2.join()
+
+print sdf.to_string()
+
+# Copy on Write
+# =============
+
+p1 = Process(target=modify, args=(df,0))
+p2 = Process(target=modify, args=(df,1))
+p1.start()
+p2.start()
+p1.join()
+p2.join()
+
+print df.to_string()
19 examples/test_sendfile.py
@@ -0,0 +1,19 @@
+import socket
+import numpy as np
+import os.path as path
+from tempfile import mkdtemp
+from numpush.posix_io.sendfile import posix_sendfile
+
+# Just vanilla TCP over IPV4
+sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+sock.connect(("127.0.0.1", 8000))
+
+filename = path.join(mkdtemp(), 'map')
+data = np.linspace(0, 1000)
+
+fp = np.memmap(filename, dtype=data.dtype, shape=data.shape, mode='w+')
+fp[:] = data[:]
+fp.flush()
+
+fd = open(filename, 'rb')
+sent = posix_sendfile(sock, fd, nbytes=1024)
+assert sent == data.nbytes
20 examples/zmq_recv.py
@@ -0,0 +1,20 @@
+import zmq
+from numpy import ndarray
+from pandas import DataFrame
+#from theano.tensor import Tensor
+
+from numpush.zmq_net import numrecv
+
+ctx = zmq.Context.instance()
+sock = ctx.socket(zmq.PULL)
+sock.connect('tcp://127.0.0.1:5555')
+
+# ------------------
+nd = numrecv(sock)
+assert type(nd) is ndarray
+print nd
+# ------------------
+df = numrecv(sock)
+assert type(df) is DataFrame
+print df
+# ------------------
22 examples/zmq_send.py
@@ -0,0 +1,22 @@
+import zmq
+from numpy import array
+from pandas import DataFrame
+#from theano.tensor import Tensor
+
+from numpush.zmq_net import numsend
+
+ctx = zmq.Context.instance()
+sock = ctx.socket(zmq.PUSH)
+sock.bind('tcp://127.0.0.1:5555')
+
+# ------------------
+nd = array([
+ [1,2,3],
+ [2,3,4],
+ [5,8,9],
+])
+numsend(sock, nd)
+# ------------------
+df = DataFrame({'a': [1,2,3], 'b': [4,5,6]})
+numsend(sock, df)
+# ------------------
0 numpush/__init__.py
No changes.
71 numpush/datastructures.py
@@ -0,0 +1,71 @@
+from collections import MutableMapping
+
+class UpdateDictMixin(object):
+ """Makes dicts call `self.on_update` on modifications.
+ """
+ on_update = None
+
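+    # Decorator factory: wraps the named dict method so that
+    # self.on_update fires after each successful modification.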
+ def calls_update(name):
+ def oncall(self, *args, **kw):
+ rv = getattr(super(UpdateDictMixin, self), name)(*args, **kw)
+ if self.on_update is not None:
+ self.on_update(self)
+ return rv
+ oncall.__name__ = name
+ return oncall
+
+ __setitem__ = calls_update('__setitem__')
+ __delitem__ = calls_update('__delitem__')
+ clear = calls_update('clear')
+ pop = calls_update('pop')
+ popitem = calls_update('popitem')
+ setdefault = calls_update('setdefault')
+ update = calls_update('update')
+ del calls_update
+
+class CallbackDict(UpdateDictMixin, dict):
+
+ def __init__(self, initial=None, on_update=None):
+ dict.__init__(self, initial or ())
+ self.on_update = on_update
+
+ def __repr__(self):
+ return '<%s %s>' % (
+ self.__class__.__name__,
+ dict.__repr__(self)
+ )
+
+class TypeConversionDict(dict):
+ def get(self, key, default=None, type=None):
+ try:
+ rv = self[key]
+ if type is not None:
+ rv = type(rv)
+ except (KeyError, ValueError):
+ rv = default
+ return rv
+
+class ReverseLookupDict(MutableMapping):
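+    # A bijective map: every key maps to its value and every value maps
+    # back to its key, i.e. d['a'] = 1 also makes d[1] == 'a'.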
+ def __init__(self, inits):
+ self._map = {}
+ self.update(inits)
+
+ def __getitem__(self, key):
+ return self._map.__getitem__(key)
+
+ def __setitem__(self, key, val):
+ self._map.__setitem__(key, val)
+ self._map.__setitem__(val, key)
+
+ def __delitem__(self, key):
+ self._map.__delitem__(self[key])
+ self._map.__delitem__(key)
+
+ def __iter__(self):
+ return self._map.__iter__()
+
+ def __len__(self):
+        return len(self._map)
+
+class NumpyProxy(object):
+ pass
29 numpush/getenv.py
@@ -0,0 +1,29 @@
+import sys
+import socket
+import platform
+import resource
+from numpy.distutils.cpuinfo import cpuinfo
+from zmq import zmq_version
+
+CPU = cpuinfo
+PLATFORM = platform.system()
+ARCH = platform.architecture()
+MAX_PROCS = resource.getrlimit(resource.RLIMIT_NPROC)[1]
+MAX_FDS = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
+PAGESIZE = resource.getpagesize()
+HOSTNAME = socket.gethostname()
+PYPY = hasattr(sys, 'pypy_version_info')
+CPYTHON = not PYPY
+ZMQ = zmq_version()
+
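+# Feature-detect optional kernel transports by simply attempting to
+# open a socket of each kind.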
+try:
+    socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE)
+    UDPLITE = True
+except (socket.error, AttributeError):
+    UDPLITE = False
+
+try:
+    socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
+    TIPC = True
+except (socket.error, AttributeError):
+    TIPC = False
0 numpush/moose_store/__init__.py
No changes.
832 numpush/moose_store/moose.pyx
@@ -0,0 +1,832 @@
+import sys
+import time
+import struct
+import socket
+import traceback
+
+cdef enum ERRORS:
+ STATUS_OK
+ ERROR_EPERM # Operation not permitted
+ ERROR_ENOTDIR # Not a directory
+ ERROR_ENOENT # No such file or directory
+ ERROR_EACCES # Permission denied
+ ERROR_EEXIST # File exists
+ ERROR_EINVAL # Invalid argument
+ ERROR_ENOTEMPTY # Directory not empty
+ ERROR_CHUNKLOST # Chunk lost
+ ERROR_OUTOFMEMORY # Out of memory
+ ERROR_INDEXTOOBIG # Index too big
+ ERROR_LOCKED # Chunk locked
+ ERROR_NOCHUNKSERVERS # No chunk servers
+ ERROR_NOCHUNK # No such chunk
+ ERROR_CHUNKBUSY # Chunk is busy
+ ERROR_REGISTER # Incorrect register BLOB
+ ERROR_NOTDONE # None of chunk servers performed requested operation
+ ERROR_NOTOPENED # File not opened
+ ERROR_NOTSTARTED # Write not started
+ ERROR_WRONGVERSION # Wrong chunk version
+ ERROR_CHUNKEXIST # Chunk already exists
+ ERROR_NOSPACE # No space left
+ ERROR_IO # IO error
+ ERROR_BNUMTOOBIG # Incorrect block number
+ ERROR_WRONGSIZE # Incorrect size
+ ERROR_WRONGOFFSET # Incorrect offset
+ ERROR_CANTCONNECT # Can't connect
+ ERROR_WRONGCHUNKID # Incorrect chunk id
+ ERROR_DISCONNECTED # Disconnected
+ ERROR_CRC # CRC error
+ ERROR_DELAYED # Operation delayed
+ ERROR_CANTCREATEPATH # Can't create path
+ ERROR_MISMATCH # Data mismatch
+ ERROR_EROFS # Read-only file system
+ ERROR_QUOTA # Quota exceeded
+ ERROR_BADSESSIONID # Bad session id
+ ERROR_NOPASSWORD # Password is needed
+ ERROR_BADPASSWORD # Incorrect password
+ ERROR_MAX
+
+errtab = [
+ "OK",
+ "Operation not permitted",
+ "Not a directory",
+ "No such file or directory",
+ "Permission denied",
+ "File exists",
+ "Invalid argument",
+ "Directory not empty",
+ "Chunk lost",
+ "Out of memory",
+ "Index too big",
+ "Chunk locked",
+ "No chunk servers",
+ "No such chunk",
+ "Chunk is busy",
+ "Incorrect register BLOB",
+ "None of chunk servers performed requested operation",
+ "File not opened",
+ "Write not started",
+ "Wrong chunk version",
+ "Chunk already exists",
+ "No space left",
+ "IO error",
+ "Incorrect block number",
+ "Incorrect size",
+ "Incorrect offset",
+ "Can't connect",
+ "Incorrect chunk id",
+ "Disconnected",
+ "CRC error",
+ "Operation delayed",
+ "Can't create path",
+ "Data mismatch",
+ "Read-only file system",
+ "Quota exceeded",
+ "Bad session id",
+ "Password is needed",
+ "Incorrect password",
+ "Unknown MFS error",
+]
+
+strerror = lambda errcode: errtab[errcode]
+
+DEFAULT_HOST = 'mfsmaster'
+DEFAULT_PORT = 9421
+
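+# Every master query below uses the same framing: send a big-endian
+# (command, payload length) pair of uint32s, read back an 8-byte reply
+# header of the same shape, then read `length` payload bytes.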
+class MooseFS(object):
+
+ def __init__(self, masterhost=DEFAULT_HOST, masterport=DEFAULT_PORT ):
+ self.masterhost = masterhost
+ self.masterport = masterport
+ self.masterversion = self.check_master_version()
+
+    def mysend(self, sock, msg):
+        totalsent = 0
+        while totalsent < len(msg):
+            sent = sock.send(msg[totalsent:])
+            if sent == 0:
+                raise RuntimeError("socket connection broken")
+            totalsent = totalsent + sent
+
+    def myrecv(self, sock, leng):
+        msg = ''
+        while len(msg) < leng:
+            chunk = sock.recv(leng-len(msg))
+            if chunk == '':
+                raise RuntimeError("socket connection broken")
+            msg = msg + chunk
+        return msg
+
+ def check_master_version(self):
+ masterversion = (0,0,0)
+ s = socket.socket()
+ s.connect((self.masterhost, self.masterport))
+ self.mysend(s,struct.pack(">LL",510,0))
+ header = self.myrecv(s,8)
+ cmd,length = struct.unpack(">LL", header)
+ data = self.myrecv(s, length)
+ if cmd==511:
+ if length==52:
+ masterversion = (1,4,0)
+ elif length==60:
+ masterversion = (1,5,0)
+ elif length==68:
+ masterversion = struct.unpack(">HBB", data[:4])
+ return masterversion
+
+ def mfs_info(self, INmatrix=0):
+ # For INmatrix, 0 means all, 1 means regular
+ info = {}
+ try:
+ s = socket.socket()
+ s.connect((self.masterhost, self.masterport))
+ self.mysend(s, struct.pack(">LL", 510, 0))
+ header = self.myrecv(s, 8)
+ cmd, length = struct.unpack(">LL", header)
+ if cmd==511 and length==52:
+ data = self.myrecv(s, length)
+ total, avail, trspace, trfiles, respace, refiles, nodes, chunks, tdcopies = struct.unpack(">QQQLQLLLL", data)
+ info = {
+ 'total_space': total,
+ 'avail_space': avail,
+ 'trash_space': trspace,
+ 'trash_files': trfiles,
+ 'reserved_space': respace,
+ 'reserved_files': refiles,
+ 'all_fs_objects': nodes,
+ 'chunks': chunks,
+ 'regular_chunk_copies': tdcopies,
+ }
+ elif cmd==511 and length==60:
+ data = self.myrecv(s, length)
+ total, avail, trspace, trfiles, respace, refiles, nodes, dirs, files, chunks, tdcopies = struct.unpack(">QQQLQLLLLLL", data)
+ info = {
+ 'total_space': total,
+ 'avail_space': avail,
+ 'trash_space': trspace,
+ 'trash_files': trfiles,
+ 'reserved_space': respace,
+ 'reserved_files': refiles,
+ 'all_fs_objects': nodes,
+ 'directories': dirs,
+ 'files': files,
+ 'chunks': chunks,
+ 'regular_chunk_copies': tdcopies,
+ }
+ elif cmd==511 and length==68:
+ data = self.myrecv(s, length)
+ v1, v2, v3, total, avail, trspace, trfiles, respace, refiles, nodes, dirs, files, chunks, allcopies, tdcopies = struct.unpack(">HBBQQQLQLLLLLLL", data)
+ ver = '.'.join([str(v1), str(v2), str(v3)])
+ info = {
+ 'version': ver,
+ 'total_space': total,
+ 'avail_space': avail,
+ 'trash_space': trspace,
+ 'trash_files': trfiles,
+ 'reserved_space': respace,
+ 'reserved_files': refiles,
+ 'all_fs_objects': nodes,
+ 'directories': dirs,
+ 'files': files,
+ 'chunks': chunks,
+ 'all_chunk_copies': allcopies,
+ 'regular_chunk_copies': tdcopies,
+ }
+ else:
+ info = {
+ 'error': 'unrecognized answer from MFSmaster',
+ }
+ s.close()
+ except Exception:
+ traceback.print_exc(file=sys.stdout)
+
+ # All chunks state matrix
+ matrix = []
+ if self.masterversion>=(1, 5, 13):
+ try:
+ s = socket.socket()
+ s.connect((self.masterhost, self.masterport))
+ if self.masterversion>=(1, 6, 10):
+ self.mysend(s, struct.pack(">LLB", 516, 1, INmatrix))
+ else:
+ self.mysend(s, struct.pack(">LL", 516, 0))
+ header = self.myrecv(s, 8)
+ cmd, length = struct.unpack(">LL", header)
+ if cmd==517 and length==484:
+ # This will generate a matrix of goals, from 0 to 10+
+ # for both rows and columns. It does not include totals.
+ for i in xrange(11):
+ data = self.myrecv(s, 44)
+ matrix.append(list(struct.unpack(">LLLLLLLLLLL", data)))
+ s.close()
+ except Exception:
+ traceback.print_exc(file=sys.stdout)
+
+ # Chunk operations info
+ chunk_info = {}
+ try:
+ s = socket.socket()
+ s.connect((self.masterhost, self.masterport))
+ self.mysend(s, struct.pack(">LL", 514, 0))
+ header = self.myrecv(s, 8)
+ cmd, length = struct.unpack(">LL", header)
+ if cmd==515 and length==52:
+ data = self.myrecv(s, length)
+                loopstart, loopend, del_invalid, ndel_invalid, del_unused, ndel_unused, del_dclean, ndel_dclean, del_ogoal, ndel_ogoal, rep_ugoal, nrep_ugoal, rebalance = struct.unpack(">LLLLLLLLLLLLL", data[:52])
+ chunk_info = {
+ 'loop_start': loopstart,
+ 'loop_end' : loopend,
+ 'invalid_deletions' : del_invalid,
+ 'invalid_deletions_out_of' : del_invalid+ndel_invalid,
+ 'unused_deletions' : del_unused,
+ 'unused_deletions_out_of' : del_unused+ndel_unused,
+ 'disk_clean_deletions' : del_dclean,
+ 'disk_clean_deletions_out_of' : del_dclean+ndel_dclean,
+ 'over_goal_deletions' : del_ogoal,
+ 'over_goal_deletions_out_of' : del_ogoal+ndel_ogoal,
+ 'replications_under_goal' : rep_ugoal,
+ 'replications_under_goal_out_of' : rep_ugoal+nrep_ugoal,
+                    'replications_rebalance' : rebalance,
+ }
+ s.close()
+ except Exception:
+ traceback.print_exc(file=sys.stdout)
+
+ # Filesystem check info
+ check_info = {}
+ try:
+ out = []
+ s = socket.socket()
+ s.connect((self.masterhost, self.masterport))
+ self.mysend(s, struct.pack(">LL", 512, 0))
+ header = self.myrecv(s, 8)
+ cmd, length = struct.unpack(">LL", header)
+ if cmd==513 and length>=36:
+ data = self.myrecv(s, length)
+ loopstart, loopend, files, ugfiles, mfiles, chunks, ugchunks, \
+ mchunks, msgbuffleng = struct.unpack(">LLLLLLLLL", data[:36])
+ messages = ''
+ truncated = ''
+ if loopstart>0:
+ if msgbuffleng>0:
+ if msgbuffleng==100000:
+ truncated = 'first 100k'
+ else:
+ truncated = 'no'
+ messages = data[36:]
+ else:
+ messages = 'no data'
+ check_info = {
+ 'check_loop_start_time' : loopstart,
+ 'check_loop_end_time' : loopend,
+ 'files' : files,
+ 'under_goal_files' : ugfiles,
+ 'missing_files' : mfiles,
+ 'chunks' : chunks,
+ 'under_goal_chunks' : ugchunks,
+ 'missing_chunks' : mchunks,
+ 'msgbuffleng' : msgbuffleng,
+ 'important_messages' : messages,
+ 'truncated' : truncated,
+ }
+ s.close()
+ except Exception:
+ traceback.print_exc(file=sys.stdout)
+
+ ret = {
+ 'info' : info,
+ 'matrix' : matrix,
+ 'chunk_info' : chunk_info,
+ 'check_info' : check_info,
+ }
+ return ret
+
+ def mfs_servers(self):
+ servers = []
+ try:
+ s = socket.socket()
+ s.connect((self.masterhost, self.masterport))
+ self.mysend(s, struct.pack(">LL", 500, 0))
+ header = self.myrecv(s, 8)
+ cmd, length = struct.unpack(">LL", header)
+ if cmd==501 and self.masterversion>=(1, 5, 13) and (length%54)==0:
+ data = self.myrecv(s, length)
+ n = length/54
+ for i in xrange(n):
+ d = data[i*54:(i+1)*54]
+ v1, v2, v3, ip1, ip2, ip3, ip4, port, used, total, chunks, tdused, tdtotal, tdchunks, errcnt = struct.unpack(">HBBBBBBHQQLQQLL", d)
+ host = ''
+ try:
+ host = (socket.gethostbyaddr("%u.%u.%u.%u" % (ip1, ip2, ip3, ip4)))[0]
+ except Exception:
+ host = "(unresolved)"
+ ip = '.'.join([str(ip1), str(ip2), str(ip3), str(ip4)])
+ ver = '.'.join([str(v1), str(v2), str(v3)])
+ percent_used = ''
+ if (total>0):
+ percent_used = (used*100.0)/total
+ else:
+ percent_used = '-'
+ tdpercent_used = ''
+ if (tdtotal>0):
+ tdpercent_used = (tdused*100.0)/tdtotal
+ else:
+ tdpercent_used = ''
+ servers.append({
+ 'host' : host,
+ 'ip' : ip,
+ 'version' : ver,
+ 'port' : port,
+ 'used' : used,
+ 'total' : total,
+ 'chunks' : chunks,
+ 'percent_used' : percent_used,
+ 'tdused' : tdused,
+ 'tdtotal' : tdtotal,
+                        'tdchunks' : tdchunks,
+ 'tdpercent_used' : tdpercent_used,
+ 'errcount' : errcnt,
+ })
+ elif cmd==501 and self.masterversion<(1, 5, 13) and (length%50)==0:
+ data = self.myrecv(s, length)
+ n = length/50
+ for i in xrange(n):
+ d = data[i*50:(i+1)*50]
+ ip1, ip2, ip3, ip4, port, used, total, chunks, tdused, \
+ tdtotal, tdchunks, errcnt = struct.unpack(">BBBBHQQLQQLL", d)
+ try:
+ host = (socket.gethostbyaddr("%u.%u.%u.%u" % (ip1, ip2, ip3, ip4)))[0]
+ except Exception:
+ host = "(unresolved)"
+ ip = '.'.join([str(ip1), str(ip2), str(ip3), str(ip4)])
+ percent_used = ''
+ if (total>0):
+ percent_used = (used*100.0)/total
+ else:
+ percent_used = '-'
+ tdpercent_used = ''
+ if (tdtotal>0):
+ tdpercent_used = (tdused*100.0)/tdtotal
+ else:
+ tdpercent_used = ''
+ servers.append({
+ 'host': host,
+ 'ip': ip,
+ 'port': port,
+ 'used': used,
+ 'total': total,
+ 'chunks': chunks,
+ 'percent_used': percent_used,
+ 'tdused': tdused,
+ 'tdtotal': tdtotal,
+                        'tdchunks': tdchunks,
+ 'tdpercent_used': tdpercent_used,
+ 'errcount': errcnt,
+ })
+ s.close()
+ except Exception:
+ traceback.print_exc(file=sys.stdout)
+
+ # Metadata backup loggers
+ mbloggers = []
+ if self.masterversion>=(1, 6, 5):
+ try:
+ s = socket.socket()
+ s.connect((self.masterhost, self.masterport))
+ self.mysend(s, struct.pack(">LL", 522, 0))
+ header = self.myrecv(s, 8)
+ cmd, length = struct.unpack(">LL", header)
+ if cmd==523 and (length%8)==0:
+ data = self.myrecv(s, length)
+ n = length/8
+ for i in xrange(n):
+ d = data[i*8:(i+1)*8]
+ v1, v2, v3, ip1, ip2, ip3, ip4 = struct.unpack(">HBBBBBB", d)
+ ip = '.'.join([str(ip1), str(ip2), str(ip3), str(ip4)])
+ ver = '.'.join([str(v1), str(v2), str(v3)])
+ try:
+ host = (socket.gethostbyaddr("%u.%u.%u.%u" % (ip1, ip2, ip3, ip4)))[0]
+ except Exception:
+ host = "(unresolved)"
+ mbloggers.append((host, ip, ver))
+ s.close()
+ except Exception:
+ traceback.print_exc(file=sys.stdout)
+ ret = {
+ 'servers': servers,
+ 'metadata_backup_loggers': mbloggers,
+ }
+ return ret
+
+    def mfs_disks(self, HDtime='max', HDperiod='min'):
+        # HDtime can be 'avg' or 'max'
+        # HDperiod can be 'min', 'hour' or 'day'
+ try:
+ # get cs list
+ hostlist = []
+ s = socket.socket()
+ s.connect((self.masterhost, self.masterport))
+ self.mysend(s, struct.pack(">LL", 500, 0))
+ header = self.myrecv(s, 8)
+ cmd, length = struct.unpack(">LL", header)
+ if cmd==501 and self.masterversion>=(1, 5, 13) and (length%54)==0:
+ data = self.myrecv(s, length)
+ n = length/54
+ servers = []
+ for i in xrange(n):
+ d = data[i*54:(i+1)*54]
+ v1, v2, v3, ip1, ip2, ip3, ip4, port, used, total, chunks, tdused, tdtotal, tdchunks, errcnt = struct.unpack(">HBBBBBBHQQLQQLL", d)
+ hostlist.append((v1, v2, v3, ip1, ip2, ip3, ip4, port))
+ elif cmd==501 and self.masterversion<(1, 5, 13) and (length%50)==0:
+ data = self.myrecv(s, length)
+ n = length/50
+ servers = []
+ for i in xrange(n):
+ d = data[i*50:(i+1)*50]
+ ip1, ip2, ip3, ip4, port, used, total, chunks, tdused, tdtotal, tdchunks, errcnt = struct.unpack(">BBBBHQQLQQLL", d)
+ hostlist.append((1, 5, 0, ip1, ip2, ip3, ip4, port))
+ s.close()
+
+ # get hdd lists one by one
+ hdd = []
+ for v1, v2, v3, ip1, ip2, ip3, ip4, port in hostlist:
+ hostip = "%u.%u.%u.%u" % (ip1, ip2, ip3, ip4)
+ try:
+ hoststr = (socket.gethostbyaddr(hostip))[0]
+ except Exception:
+ hoststr = "(unresolved)"
+ if port>0:
+ if (v1, v2, v3) <= (1, 6, 8):
+ s = socket.socket()
+ s.connect((hostip, port))
+ self.mysend(s, struct.pack(">LL", 502, 0))
+ header = self.myrecv(s,8)
+ cmd, length = struct.unpack(">LL", header)
+ if cmd==503:
+ data = self.myrecv(s, length)
+ while length>0:
+ plen = ord(data[0])
+ host_path = "%s:%u:%s" % (hoststr, port, data[1:plen+1])
+ ip_path = "%s:%u:%s" % (hostip, port, data[1:plen+1])
+ flags, errchunkid, errtime, used, total, chunkscnt = struct.unpack(">BQLQQL", data[plen+1:plen+34])
+ length -= plen+34
+ data = data[plen+34:]
+ hdd.append((ip_path, host_path, flags, errchunkid, errtime, used, total, chunkscnt, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0))
+ s.close()
+ else:
+ s = socket.socket()
+ s.connect((hostip, port))
+ self.mysend(s, struct.pack(">LL", 600, 0))
+ header = self.myrecv(s, 8)
+ cmd, length = struct.unpack(">LL", header)
+ if cmd==601:
+ data = self.myrecv(s, length)
+ while length>0:
+ entrysize = struct.unpack(">H", data[:2])[0]
+ entry = data[2:2+entrysize]
+ data = data[2+entrysize:]
+ length -= 2+entrysize;
+
+ plen = ord(entry[0])
+ host_path = "%s:%u:%s" % (hoststr, port, entry[1:plen+1])
+ ip_path = "%s:%u:%s" % (hostip, port, entry[1:plen+1])
+ flags, errchunkid, errtime, used, total, chunkscnt = struct.unpack(">BQLQQL", entry[plen+1:plen+34])
+ rbytes, wbytes, usecreadsum, usecwritesum, usecfsyncsum, rops, wops, fsyncops, usecreadmax, usecwritemax, usecfsyncmax = (0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
+ if entrysize==plen+34+144:
+ if HDperiod == 'min':
+ rbytes, wbytes, usecreadsum, usecwritesum, rops, wops, usecreadmax, usecwritemax = struct.unpack(">QQQQLLLL", entry[plen+34:plen+34+48])
+ elif HDperiod == 'hour':
+ rbytes, wbytes, usecreadsum, usecwritesum, rops, wops, usecreadmax, usecwritemax = struct.unpack(">QQQQLLLL", entry[plen+34+48:plen+34+96])
+ elif HDperiod == 'day':
+ rbytes, wbytes, usecreadsum, usecwritesum, rops, wops, usecreadmax, usecwritemax = struct.unpack(">QQQQLLLL", entry[plen+34+96:plen+34+144])
+ elif entrysize==plen+34+192:
+ if HDperiod == 'min':
+ rbytes, wbytes, usecreadsum, usecwritesum, usecfsyncsum, rops, wops, fsyncops, usecreadmax, usecwritemax, usecfsyncmax = struct.unpack(">QQQQQLLLLLL", entry[plen+34:plen+34+64])
+ elif HDperiod == 'hour':
+ rbytes, wbytes, usecreadsum, usecwritesum, usecfsyncsum, rops, wops, fsyncops, usecreadmax, usecwritemax, usecfsyncmax = struct.unpack(">QQQQQLLLLLL", entry[plen+34+64:plen+34+128])
+ elif HDperiod == 'day':
+ rbytes, wbytes, usecreadsum, usecwritesum, usecfsyncsum, rops, wops, fsyncops, usecreadmax, usecwritemax, usecfsyncmax = struct.unpack(">QQQQQLLLLLL", entry[plen+34+128:plen+34+192])
+ if usecreadsum>0:
+ rbw = rbytes*1000000/usecreadsum
+ else:
+ rbw = 0
+ if usecwritesum+usecfsyncsum>0:
+ wbw = wbytes*1000000/(usecwritesum+usecfsyncsum)
+ else:
+ wbw = 0
+ if HDtime == 'avg':
+ if rops>0:
+ rtime = usecreadsum/rops
+ else:
+ rtime = 0
+ if wops>0:
+ wtime = usecwritesum/wops
+ else:
+ wtime = 0
+ if fsyncops>0:
+ fsynctime = usecfsyncsum/fsyncops
+ else:
+ fsynctime = 0
+ else:
+ rtime = usecreadmax
+ wtime = usecwritemax
+ fsynctime = usecfsyncmax
+ if flags == 1:
+ if self.masterversion>=(1, 6, 10):
+ status = 'marked for removal'
+ else:
+ status = 'to be empty'
+ elif flags == 2:
+ status = 'damaged'
+ elif flags == 3:
+ if self.masterversion>=(1, 6, 10):
+ status = 'damaged, marked for removal'
+ else:
+ status = 'damaged, to be empty'
+ else:
+ status = 'ok'
+ if errtime==0 and errchunkid==0:
+ lerror = 'no errors'
+ else:
+ lerror = time.localtime(errtime)
+ if rops>0:
+ rbsize = rbytes/rops
+ else:
+ rbsize = 0
+ if wops>0:
+ wbsize = wbytes/wops
+ else:
+ wbsize = 0
+ if (total>0):
+ percent_used = (used*100.0)/total
+ else:
+ percent_used = '-'
+ hdd.append({
+ 'ip_path': ip_path,
+ 'host_path': host_path,
+ 'flags': flags,
+ 'errchunkid': errchunkid,
+ 'errtime': errtime,
+ 'used': used,
+ 'total': total,
+ 'chunkscount': chunkscnt,
+ 'rbw': rbw,
+ 'wbw': wbw,
+ 'rtime': rtime,
+ 'wtime': wtime,
+ 'fsynctime': fsynctime,
+ 'read_ops': rops,
+ 'write_ops': wops,
+ 'fsyncops': fsyncops,
+ 'read_bytes': rbytes,
+ 'write_bytes': wbytes,
+ 'usecreadsum': usecreadsum,
+ 'usecwritesum': usecwritesum,
+ 'status': status,
+ 'lerror': lerror,
+ 'rbsize': rbsize,
+ 'wbsize': wbsize,
+ 'percent_used': percent_used,
+ })
+ s.close()
+ except Exception:
+ traceback.print_exc(file=sys.stdout)
+
+ return hdd
+
+ def mfs_exports(self):
+ servers = []
+ try:
+ s = socket.socket()
+ s.connect((self.masterhost, self.masterport))
+ self.mysend(s, struct.pack(">LL", 520, 0))
+ header = self.myrecv(s, 8)
+ cmd, length = struct.unpack(">LL", header)
+ if cmd==521 and self.masterversion>=(1, 5, 14):
+ data = self.myrecv(s, length)
+ pos = 0
+ while pos<length:
+ fip1, fip2, fip3, fip4, tip1, tip2, tip3, tip4, pleng = struct.unpack(">BBBBBBBBL", data[pos:pos+12])
+ ipfrom = "%d.%d.%d.%d" % (fip1, fip2, fip3, fip4)
+ ipto = "%d.%d.%d.%d" % (tip1, tip2, tip3, tip4)
+ pos+=12
+ path = data[pos:pos+pleng]
+ pos+=pleng
+ if self.masterversion>=(1, 6, 1):
+ v1, v2, v3, exportflags, sesflags, rootuid, rootgid, mapalluid, mapallgid = struct.unpack(">HBBBBLLLL", data[pos:pos+22])
+ pos+=22
+ else:
+ v1, v2, v3, exportflags, sesflags, rootuid, rootgid = struct.unpack(">HBBBBLL", data[pos:pos+14])
+ mapalluid = 0
+ mapallgid = 0
+ pos+=14
+ ver = "%d.%d.%d" % (v1, v2, v3)
+ if path=='.':
+ meta=1
+ else:
+ meta=0
+ servers.append( {
+ 'ip_range_from': ipfrom,
+ 'ip_range_to': ipto,
+ 'path': path,
+ 'meta': meta,
+ 'version': ver,
+ 'export_flags': exportflags,
+ 'ses_flags': sesflags,
+ 'root_uid': rootuid,
+ 'root_gid': rootgid,
+ 'all_users_uid': mapalluid,
+ 'all_users_gid': mapallgid,
+ } )
+ s.close()
+ except Exception:
+ traceback.print_exc(file=sys.stdout)
+
+ return servers
+
+ def mfs_mountl(self):
+ # This section appeared in the original mfs.cgi,
+ # but doesn't actually show up in the browser.
+ # An old version of section MS, perhaps?
+ # I have left it here for historical reasons.
+ servers = []
+
+ try:
+ s = socket.socket()
+ s.connect((self.masterhost, self.masterport))
+ self.mysend(s, struct.pack(">LL", 508, 0))
+ header = self.myrecv(s, 8)
+ cmd, length = struct.unpack(">LL", header)
+ if cmd==509 and self.masterversion<=(1, 5, 13) and (length%136)==0:
+ data = self.myrecv(s, length)
+ n = length/136
+ for i in xrange(n):
+ d = data[i*136:(i+1)*136]
+ addrdata = d[0:8]
+ stats_c = []
+ stats_l = []
+ ip1, ip2, ip3, ip4, spare, v1, v2, v3 = struct.unpack(">BBBBBBBB", addrdata)
+ ipnum = "%d.%d.%d.%d" % (ip1, ip2, ip3, ip4)
+ if v1==0 and v2==0:
+ if v3==2:
+ ver = "1.3.x"
+ elif v3==3:
+ ver = "1.4.x"
+ else:
+ ver = "unknown"
+ else:
+ ver = "%d.%d.%d" % (v1, v2, v3)
+ for i in xrange(16):
+ stats_c.append(struct.unpack(">L", d[i*4+8:i*4+12]))
+ stats_l.append(struct.unpack(">L", d[i*4+72:i*4+76]))
+ try:
+ host = (socket.gethostbyaddr(ipnum))[0]
+ except Exception:
+ host = "(unresolved)"
+ servers.append((host, ipnum, ver, stats_c, stats_l))
+ s.close()
+ except Exception:
+ traceback.print_exc(file=sys.stdout)
+
+ return servers
+
+ def mfs_mounts(self):
+ servers = []
+
+ try:
+ s = socket.socket()
+ s.connect((self.masterhost, self.masterport))
+ self.mysend(s, struct.pack(">LL", 508, 0))
+ header = self.myrecv(s, 8)
+ cmd, length = struct.unpack(">LL", header)
+ if cmd==509 and self.masterversion>=(1, 5, 14):
+ data = self.myrecv(s, length)
+ pos = 0
+ while pos<length:
+ sessionid, ip1, ip2, ip3, ip4, v1, v2, v3, ileng = struct.unpack(">LBBBBHBBL", data[pos:pos+16])
+ ipnum = "%d.%d.%d.%d" % (ip1, ip2, ip3, ip4)
+ ver = "%d.%d.%d" % (v1, v2, v3)
+ pos+=16
+ info = data[pos:pos+ileng]
+ pos+=ileng
+ pleng = struct.unpack(">L", data[pos:pos+4])[0]
+ pos+=4
+ path = data[pos:pos+pleng]
+ pos+=pleng
+ if self.masterversion>=(1, 6, 1):
+ sesflags, rootuid, rootgid, mapalluid, mapallgid = struct.unpack(">BLLLL", data[pos:pos+17])
+ pos+=145 # 17+64+64 - skip stats
+ else:
+ sesflags, rootuid, rootgid = struct.unpack(">BLL", data[pos:pos+9])
+ mapalluid = 0
+ mapallgid = 0
+ pos+=137 # 9+64+64 - skip stats
+ if path=='.':
+ meta=1
+ else:
+ meta=0
+ try:
+ host = (socket.gethostbyaddr(ipnum))[0]
+ except Exception:
+ host = "(unresolved)"
+ servers.append( {
+ 'sessionid': sessionid,
+ 'host': host,
+ 'ip': ipnum,
+ 'mount': info,
+ 'version': ver,
+ 'meta': meta,
+ 'moose_path': path,
+ 'ses_flags': sesflags,
+ 'root_uid': rootuid,
+ 'root_gid': rootgid,
+ 'all_users_uid': mapalluid,
+ 'all_users_gid': mapallgid,
+ } )
+ s.close()
+ except Exception:
+ traceback.print_exc(file=sys.stdout)
+
+ return servers
+
+ def mfs_operations(self):
+ servers = []
+
+ try:
+ s = socket.socket()
+ s.connect((self.masterhost, self.masterport))
+ self.mysend(s, struct.pack(">LL", 508, 0))
+ header = self.myrecv(s, 8)
+ cmd, length = struct.unpack(">LL", header)
+ if cmd == 509 and self.masterversion >= (1, 5, 14):
+ data = self.myrecv(s, length)
+ pos = 0
+ while pos<length:
+ sessionid, ip1, ip2, ip3, ip4, v1, v2, v3, ileng = struct.unpack(">LBBBBHBBL", data[pos:pos+16])
+ ipnum = "%d.%d.%d.%d" % (ip1, ip2, ip3, ip4)
+ ver = "%d.%d.%d" % (v1, v2, v3)
+ pos += 16
+ info = data[pos:pos+ileng]
+ pos += ileng
+ pleng = struct.unpack(">L", data[pos:pos+4])[0]
+ pos += 4
+ path = data[pos:pos+pleng]
+ pos += pleng
+ if self.masterversion >= (1, 6, 0):
+ pos += 17
+ else:
+ pos += 9
+ stats_c = struct.unpack(">LLLLLLLLLLLLLLLL", data[pos:pos+64])
+ pos += 64
+ stats_l = struct.unpack(">LLLLLLLLLLLLLLLL", data[pos:pos+64])
+ pos += 64
+ try:
+ host = (socket.gethostbyaddr(ipnum))[0]
+ except Exception:
+ host = "(unresolved)"
+ if path != '.':
+ servers.append( {
+ 'host': host,
+ 'ip': ipnum,
+ 'info': info,
+ 'stats_current': {
+ 'statfs' : stats_c[0],
+ 'getattr' : stats_c[1],
+ 'setattr' : stats_c[2],
+ 'lookup' : stats_c[3],
+ 'mkdir' : stats_c[4],
+ 'rmdir' : stats_c[5],
+ 'symlink' : stats_c[6],
+ 'readlink' : stats_c[7],
+ 'mknod' : stats_c[8],
+ 'unlink' : stats_c[9],
+ 'rename' : stats_c[10],
+ 'link' : stats_c[11],
+ 'readdir' : stats_c[12],
+ 'open' : stats_c[13],
+ 'read' : stats_c[14],
+ 'write' : stats_c[15],
+ },
+ 'stats_lasthour': {
+ 'statfs' : stats_l[0],
+ 'getattr' : stats_l[1],
+ 'setattr' : stats_l[2],
+ 'lookup' : stats_l[3],
+ 'mkdir' : stats_l[4],
+ 'rmdir' : stats_l[5],
+ 'symlink' : stats_l[6],
+ 'readlink' : stats_l[7],
+ 'mknod' : stats_l[8],
+ 'unlink' : stats_l[9],
+ 'rename' : stats_l[10],
+ 'link' : stats_l[11],
+ 'readdir' : stats_l[12],
+ 'open' : stats_l[13],
+ 'read' : stats_l[14],
+ 'write' : stats_l[15],
+ },
+ } )
+ s.close()
+ except Exception:
+ traceback.print_exc(file=sys.stdout)
+
+ return servers
4 numpush/posix_io/__init__.py
@@ -0,0 +1,4 @@
+#from .sendfile import posix_sendfile
+#from .splice import posix_splice
+
+#__all__ = [posix_sendfile, posix_splice]
27 numpush/posix_io/platform_sendfile.h
@@ -0,0 +1,27 @@
+#if !defined(__APPLE__) && \
+ !defined(__FreeBSD__) && \
+ !defined(__linux__)
+#error invalid platform
+#endif
+
+#include <sys/uio.h>
+
+struct sf_hdtr {
+ struct iovec *headers;
+ int hdr_cnt;
+ struct iovec *trailers;
+ int trl_cnt;
+};
+
+#if defined(__FreeBSD__)
+#include <sys/types.h>
+#include <sys/socket.h>
+#define sendfile_bsd sendfile
+#define sendfile_linux (void *)
+#endif
+
+#if defined(__linux__)
+#include <sys/sendfile.h>
+#define sendfile_bsd (void *)
+#define sendfile_linux sendfile
+#endif
80 numpush/posix_io/sendfile.pyx
@@ -0,0 +1,80 @@
+#from libc.stdio cimport printf
+from libc.stdint cimport uint32_t, uint64_t
+
+import os
+import sys
+from mmap import PAGESIZE
+
+cdef extern from "platform_sendfile.h" nogil:
+ ctypedef signed off_t
+ ctypedef unsigned size_t
+ ctypedef signed ssize_t
+ struct sf_hdtr
+
+ ssize_t sendfile_linux(int out_fd, int in_fd, uint64_t *offset, size_t nbytes)
+ ssize_t sendfile_bsd(int fd, int s, uint64_t *offset, size_t len, sf_hdtr *hdtr, int flags)
+
+cdef extern from "errno.h" nogil:
+ enum: EAGAIN
+ #enum: EWOULDBLOCK
+ enum: EBADF
+ enum: EFAULT
+ enum: EINVAL
+ enum: EIO
+ enum: ENOMEM
+ enum: EBUSY
+
+ enum: SF_NODISKIO
+ enum: SF_MNOWAIT
+ enum: SF_SYNC
+ int errno
+
+
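+# Dispatch to the platform sendfile(2). On failure, return the negated
+# errno so the Python-level wrapper can raise a matching OSError.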
+cdef int _posix_sendfile(int fdout, int fdin, uint64_t *offset, size_t nbytes):
+
+    cdef ssize_t ret
+    cdef int sent
+    cdef int err
+ global errno
+
+ if sys.platform == 'win32':
+ raise SystemError("Windows not supported")
+
+ # FreeBSD
+    if sys.platform.startswith('freebsd') or sys.platform == 'darwin':
+ with nogil:
+ ret = sendfile_bsd(fdout, fdin, offset, nbytes, NULL, 0)
+
+ if ret == -1:
+ err = 1
+ sent = -errno
+ errno = 0
+ else:
+ err = 0
+ sent = ret
+ return sent
+
+ ## Linux
+ else:
+ with nogil:
+ ret = sendfile_linux(fdout, fdin, offset, nbytes)
+
+ if ret == -1:
+ err = 1
+ sent = -errno
+ errno = 0
+ else:
+ err = 0
+ sent = ret
+ return sent
+
+def posix_sendfile(sock, fd, offset=0, nbytes=PAGESIZE):
+ cdef uint64_t c_offset = offset
+ cdef size_t c_count = nbytes
+ cdef int rc
+
+    rc = _posix_sendfile(sock.fileno(), fd.fileno(), &c_offset, c_count)
+
+ if rc < 0:
+ raise OSError(os.strerror(-rc))
+ else:
+ return rc
60 numpush/posix_io/splice.pyx
@@ -0,0 +1,60 @@
+import os
+from libc.stdint cimport uint32_t, uint64_t
+from mmap import PAGESIZE
+
+cdef extern from "fcntl.h" nogil:
+ ctypedef unsigned size_t
+ ctypedef signed ssize_t
+
+ ssize_t splice(int fd_in, uint64_t *off_in, int fd_out, uint64_t *off_out, size_t len, unsigned int flags)
+
+ enum: SPLICE_F_MOVE
+ enum: SPLICE_F_NONBLOCK
+ enum: SPLICE_F_MORE
+ enum: SPLICE_F_GIFT
+
+cdef extern from "errno.h" nogil:
+ enum: SF_NODISKIO
+ enum: SF_MNOWAIT
+ enum: SF_SYNC
+ int errno
+
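+# Thin wrapper over splice(2): move up to `nbytes` bytes between two
+# file descriptors entirely in kernel space. Returns the byte count
+# moved, or the negated errno on failure.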
+cdef int _posix_splice(int fd_in, uint64_t *off_in, int fd_out, uint64_t *off_out, size_t nbytes, unsigned int flags):
+    cdef ssize_t sts
+    cdef int sent
+    cdef int err
+ global errno
+
+ with nogil:
+ sts = splice(fd_in, off_in, fd_out, off_out, nbytes, flags)
+
+ if sts == -1:
+ err = 1
+ sent = -errno
+ errno = 0
+ else:
+ err = 0
+ sent = sts
+ return sent
+
+# Changed the signature a bit to match our common use case of 0,0
+# offsets
+def posix_splice(fd1, fd2, fd1_offset=0, fd2_offset=0, nbytes=PAGESIZE, flags=0):
+
+ if type(fd1) is int:
+ fd1 = os.fdopen(fd1, 'r')
+
+ if type(fd2) is int:
+ fd2 = os.fdopen(fd2, 'w')
+
+ cdef uint64_t c_fd1offset = fd1_offset
+ cdef uint64_t c_fd2offset = fd2_offset
+ cdef size_t c_count = nbytes
+ cdef int c_flags = flags
+ cdef int rc
+
+    rc = _posix_splice(fd1.fileno(), &c_fd1offset, fd2.fileno(), &c_fd2offset, c_count, c_flags)
+
+ if rc < 0:
+ raise OSError(os.strerror(-rc))
+ else:
+ return rc
0 numpush/redis_store/__init__.py
No changes.
228 numpush/reductor.py
@@ -0,0 +1,228 @@
+from io import BytesIO
+from array import array
+from imp import new_module
+from StringIO import StringIO
+
+try:
+    from numpy import dtype, frombuffer
+    have_numpy = True
+except ImportError:
+    have_numpy = False
+
+try:
+    from pandas import DataFrame, TimeSeries
+    have_pandas = True
+except ImportError:
+    have_pandas = False
+
+try:
+ from theano.tensor import Tensor
+ have_theano = True
+except ImportError:
+ have_theano = False
+
+try:
+ from carray import carray
+ have_carray = True
+except ImportError:
+ have_carray = False
+
+try:
+    from numexpr.necompiler import NumExpr
+ have_numexpr = True
+except ImportError:
+ have_numexpr = False
+
+try:
+ from numba.translate import Translate
+ have_numba = True
+except ImportError:
+ have_numba = False
+
+try:
+ import cython
+ have_cython = True
+except ImportError:
+ have_cython = False
+
+try:
+ from llvm.core import Module
+ from llvm.ee import ExecutionEngine
+ from bitey.bind import make_all_wrappers
+ have_bitey = True
+except ImportError:
+ have_bitey = False
+
+
+# Numpy
+# =====
+
+def numpy_reduce(nd):
+    dtype_name = nd.dtype.name # string
+    shape = nd.shape # tuple
+
+    md = (shape, dtype_name)
+    return md, nd
+
+def numpy_reconstruct(md, nd):
+    shape, dtype_name = md
+    buf = buffer(nd)
+
+    ndarray = frombuffer(buf, dtype=dtype(dtype_name)).reshape(shape)
+    return ndarray
+
+# Pandas
+# ======
+
+# You take a Panda and you squeeze it into a zero-copy buffer, it ain't
+# pretty.
+def pandas_reduce(df):
+ index = df.index.tolist() # list
+ columns = df.columns.tolist() # list
+ dtype_name = df.values.dtype.name # string
+ shape = df.values.shape # tuple
+
+ # Pandas ndarray
+ nd = df.values
+ md = (shape, dtype_name, index, columns)
+ return md, nd
+
+def pandas_reconstruct(md, nd):
+ shape, ndtype, index, columns = md
+ ndarray = frombuffer(nd, dtype=dtype(ndtype)).reshape(shape)
+ return DataFrame(data=ndarray, index=index,
+ columns=columns, dtype=None)
+
+def pandasts_reduce(df):
+ index = df.index.tolist() # list
+ ndtype = df.values.dtype.name # string
+ shape = df.values.shape # tuple
+
+ # Pandas ndarray
+ nd = buffer(df.values)
+ md = (shape, ndtype, index)
+ return md, nd
+
+def pandasts_reconstruct(md, nd):
+    shape, dtype_name, index = md
+    buf = buffer(nd)
+
+    ndarray = frombuffer(buf, dtype=dtype(dtype_name)).reshape(shape)
+    return TimeSeries(data=ndarray, index=index)
+
+# Array
+# =====
+
+def array_reduce(pd):
+ typecode = pd.typecode
+
+    # stdlib array.array buffer
+ nd = buffer(pd)
+ md = (typecode,)
+ return md, nd
+
+def array_reconstruct(md, nd):
+ typecode, = md
+ buf = buffer(nd)
+ return array(typecode, buf)
+
+# Theano
+# ======
+
+def tensor_reduce(ts):
+ pass
+
+def tensor_reconstruct(md, nd):
+ pass
+
+# NetworkX
+# ========
+
+def digraph_reduce(ts):
+ pass
+
+def digraph_reconstruct(md, nd):
+ pass
+
+# ============
+# --- Code ---
+# ============
+
+# LLVM
+# =====
+
+def bitey_reduce(module):
+    if hasattr(module, '_llvm_module'):
+        name = module.__name__
+        bitcode = StringIO()
+        module._llvm_module.to_bitcode(bitcode)
+        return (name,), bitcode.getvalue()
+    else:
+        raise TypeError("Are you sure that's an LLVM-backed Bitey module?")
+
+def bitey_reconstruct(md, bitcode):
+ name, = md
+
+ mod = new_module(name)
+ llvm_module = Module.from_bitcode(BytesIO(bitcode))
+ engine = ExecutionEngine.new(llvm_module)
+ make_all_wrappers(llvm_module, engine, mod)
+
+ return mod
+
+# Numba
+# =====
+
+LLVM = 0
+UFUNC = 1
+
+# This is a little hairy since Numba is still experimental.
+def numba_reduce(tr):
+    func = tr.func # PyCodeObject
+    ret_type = tr.ret_type # str
+    arg_types = tr.arg_types # str
+
+    return (ret_type, arg_types), func
+
+def numba_reduce_bitcode(tr):
+    out = StringIO()
+    tr.mod.to_bitcode(out)
+    return (out.getvalue(),)
+
+# Returns a CFunctionType or numpy.ufunc
+def numba_reconstruct(md, func, otype=LLVM):
+    ret_type, arg_types = md
+    tr = Translate(func, ret_type, arg_types)
+    tr.translate()
+
+    if otype == LLVM:
+        return tr.get_ctypes_func(llvm=True)
+    elif otype == UFUNC:
+        return tr.make_ufunc(llvm=True)
+    else:
+        raise Exception("Unknown numba cast")
+
+# NumExpr
+# =======
+
+def numexpr_reduce(ne):
+ inputsig = ne.signature
+ tempsig = ne.tempsig
+ bytecode = ne.program
+ input_names = ne.input_names
+ constants = ne.constants
+
+ return (inputsig, tempsig, bytecode, constants, input_names)
+
+def numexpr_reconstruct(inputsig, tempsig, bytecode, constants, input_names):
+    return NumExpr(inputsig, tempsig, bytecode, constants, input_names)
+
+# Cython
+# ======
+
+def cython_reduce(cf):
+ pass
+
+def cython_reconstruct(cf):
+ pass
124 numpush/shmem.py
@@ -0,0 +1,124 @@
+from numpy import ndarray
+from pandas import DataFrame
+from functools import wraps
+
+from multiprocessing import RLock
+from multiprocessing.heap import BufferWrapper
+from multiprocessing.sharedctypes import SynchronizedBase
+
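+# BufferWrapper hands back a block carved from multiprocessing's
+# anonymous mmap-backed heap, so the buffer lives in memory that forked
+# child processes inherit and share rather than copy.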
+def put_on_heap(na):
+ size = na.nbytes
+ wrapper = BufferWrapper(size)
+ address = wrapper.get_address()
+ block, size = wrapper._state
+ arena, start, new_stop = block
+
+ return (arena.buffer, address)
+
+def RawNumpy(array):
+ mmap, address = put_on_heap(array)
+ mmap_nd = ndarray.__new__(ndarray, array.shape, dtype=array.dtype,
+ buffer=mmap, offset=0, order='C')
+ mmap_nd[:] = array[:]
+ assert mmap_nd.ctypes.data == address
+ return mmap_nd
+
+def SynchronizedNumpy(array, lock=None):
+ mmap, address = put_on_heap(array)
+ mmap_nd = ndarray.__new__(
+ ndarray,
+ array.shape,
+ dtype = array.dtype,
+ buffer = mmap,
+ offset = 0,
+ order = 'C'
+ )
+ # Warning, this is a copy operation
+ # ---------------------------------
+ # Copy the values from the passed array into shared memory
+ # arena.
+ mmap_nd[:] = array[:]
+
+ assert mmap_nd.ctypes.data == address
+ return SynchronizedArray(mmap_nd, lock=lock)
+
+def sync(f):
+    @wraps(f)
+    def wrapper(*args, **kwargs):
+        self = args[0]
+        self.acquire()
+        try:
+            return f(*args, **kwargs)
+        finally:
+            self.release()
+    return wrapper
+
+class SynchronizedArray(SynchronizedBase):
+
+ def __init__(self, obj, lock=None):
+ self._obj = obj
+ self._lock = lock or RLock()
+ self.acquire = self._lock.acquire
+ self.release = self._lock.release
+
+ def __len__(self):
+ return len(self._obj)
+
+ def __getitem__(self, i):
+ self.acquire()
+ try:
+ return self._obj[i]
+ finally:
+ self.release()
+
+ def __setitem__(self, i, value):
+ self.acquire()
+ try:
+ self._obj[i] = value
+ finally:
+ self.release()
+
+ def __getslice__(self, start, stop):
+ self.acquire()
+ try:
+ return self._obj[start:stop]
+ finally:
+ self.release()
+
+ def __setslice__(self, start, stop, values):
+ self.acquire()
+ try:
+ self._obj[start:stop] = values
+ finally:
+ self.release()
+
+ def __iadd__(self, other):
+ with self._lock:
+ return self._obj.__iadd__(other)
+
+ def __imul__(self, other):
+ with self._lock:
+ return self._obj.__imul__(other)
+
+# Shared Memory Instances
+# -----------------------
+
+def SDataFrame(df, mutex=False, lock=None):
+ '''
+ Shared memory Pandas dataframe. Any number of processes can
+ write on it and access it across multiple cores.
+ '''
+ snd = RawNumpy(df.values)
+ index = df.index.tolist() # list
+ columns = df.columns.tolist() # list
+ return DataFrame(data=snd, index=index, columns=columns, dtype=None, copy=False)
+
+def SDiGraph(graph, mutex=False):
+ '''
+ Shared memory NetworkX graph.
+ '''
+ pass
+
+def STensor(tensor, mutex=False):
+ '''
+ Shared memory Theano tensor.
+ '''
+ pass
20 numpush/shmem_stat.py
@@ -0,0 +1,20 @@
+def shm_counters():
+ f = open("/proc/sysvipc/shm", "r")
+ try:
+ lines = f.readlines()
+ finally:
+ f.close()
+
+ retdict = {}
+ for line in lines[2:]:
+ fields = line.split()
+ shmid = int(fields[1])
+ perms = int(fields[2])
+ size = int(fields[3])
+ cpid = int(fields[4])
+ lpid = int(fields[5])
+ nattach = int(fields[6])
+ swap = int(fields[11])
+
+ retdict[shmid] = (perms, size, cpid, lpid, nattach, swap)
+ return retdict
82 numpush/tipc_net/tipc.pyx
@@ -0,0 +1,82 @@
+from libc.stdint cimport uint32_t, uint64_t
+
+cdef extern from "sys/socket.h" nogil:
+    enum: AF_TIPC
+    int socket(int domain, int type, int protocol)
+
+cdef extern from "linux/tipc.h":
+
+ enum: TIPC_OK
+ enum: TIPC_ERR_NO_NAM
+ enum: TIPC_ERR_NO_PORT
+ enum: TIPC_ERR_NO_NODE
+ enum: TIPC_ERR_OVERLOAD
+ enum: TIPC_CONN_SHUTDOWN
+
+ enum: TIPC_ADDR_NAMESEQ
+ enum: TIPC_ADDR_NAME
+ enum: TIPC_ADDR_ID
+ enum: TIPC_ZONE_SCOPE
+ enum: TIPC_CLUSTER_SCOPE
+ enum: TIPC_NODE_SCOPE
+ enum: TIPC_IMPORTANCE
+ enum: TIPC_SRC_DROPPABLE
+ enum: TIPC_DEST_DROPPABLE
+ enum: TIPC_CONN_TIMEOUT
+ enum: TIPC_LOW_IMPORTANCE
+ enum: TIPC_MEDIUM_IMPORTANCE
+ enum: TIPC_HIGH_IMPORTANCE
+ enum: TIPC_CRITICAL_IMPORTANCE
+ enum: TIPC_SUB_PORTS
+ enum: TIPC_SUB_SERVICE
+ enum: TIPC_SUB_CANCEL
+ enum: TIPC_WAIT_FOREVER
+ enum: TIPC_PUBLISHED
+ enum: TIPC_WITHDRAWN
+ enum: TIPC_SUBSCR_TIMEOUT
+ enum: TIPC_CFG_SRV
+ enum: TIPC_TOP_SRV
+
+ ctypedef struct tipc_portid:
+ uint32_t ref
+ uint32_t node
+
+ ctypedef struct tipc_name:
+ uint32_t type
+ uint32_t instance
+
+ ctypedef struct tipc_name_seq:
+ uint32_t type
+ uint32_t lower
+ uint32_t upper
+
+    ctypedef struct tipc_subscr:
+        tipc_name_seq seq
+        uint32_t timeout
+        uint32_t filter
+        char usr_handle[8]
+
+cdef extern from "arpa/inet.h" nogil:
+ pass
+
+cdef extern from "errno.h" nogil:
+ enum: EAGAIN
+ enum: EWOULDBLOCK
+ enum: EBADF
+ enum: EFAULT
+ enum: EINVAL
+ enum: EIO
+ enum: ENOMEM
+ enum: EBUSY
+
+ enum: SF_NODISKIO
+ enum: SF_MNOWAIT
+ enum: SF_SYNC
+ int errno
+
+ctypedef struct sockaddr:
+ pass
+
+cdef int tipc_socket_bind(sockaddr *address):
+ cdef int fd
+ sa_tipc.addrtype = TIPC_ADDR_NAMESEQ
+
+ fd = socket(AF_TIPC, opts.socktype, opts.protocol)
+ return fd
117 numpush/zmq_net.py
@@ -0,0 +1,117 @@
+import zmq
+import ctypes
+import reductor
+
+import numpy as np
+from numpy import ndarray, array
+
+from pandas import DataFrame
+#from theano.tensor import Tensor
+from collections import namedtuple
+from datastructures import ReverseLookupDict
+
+try:
+ import msgpack as srl
+except ImportError:
+ import cPickle as srl
+
+numpy_metadata = namedtuple('metadata', 'shape dtype')
+tensor_metadata = namedtuple('metadata', 'shape dtype')
+pandas_metadata = namedtuple('metadata', 'shape dtype index columns')
+
+# Pad the wire protocol so that we can recognize types on the receiving
+# side. The most common numpy data transfers are likely 1 or 2
+# dimensional arrays, so we have special bit flags for those to avoid
+# serializing metadata.
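+#
+# On the wire, a numsend() message is a three-frame ZMQ multipart
+# message:
+#
+#   frame 0: two-byte magic type tag (e.g. PANDAS = b'\x00\x04')
+#   frame 1: the serialized (msgpack or pickle) metadata tuple
+#   frame 2: the raw array buffer, sent zero-copy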
+
+ctypes_numpy = ReverseLookupDict({
+ ctypes.c_char : np.int8,
+ ctypes.c_wchar : np.int16,
+ ctypes.c_byte : np.int8,
+ ctypes.c_ubyte : np.uint8,
+ ctypes.c_short : np.int16,
+ ctypes.c_ushort : np.uint16,
+ ctypes.c_int : np.int32,
+    ctypes.c_uint : np.uint32,
+ ctypes.c_long : np.int32,
+ ctypes.c_ulong : np.int32,
+ ctypes.c_float : np.float32,
+ ctypes.c_double : np.float64
+})
+
+NUMPYND = b'\x00\x01'
+NUMPY1D = b'\x00\x02'
+NUMPY2D = b'\x00\x03'
+PANDAS = b'\x00\x04'
+PANDASTS = b'\x00\x05'
+NETWORKX = b'\x00\x06'
+THEANO = b'\x00\x07'
+
+type_coercions = {
+ array : NUMPYND,
+ ndarray : NUMPYND,
+ DataFrame : PANDAS,
+ #Tensor : THEANO,
+}
+
+class CannotCoerce(Exception):
+ def __init__(self, obj):
+ self.unknown_type = type(obj)
+
+ def __str__(self):
+        return "Don't know how to encode type %s over ZMQ" % self.unknown_type
+
+def send_numpy(self, magic, obj, flags=0):
+    md, narray = reductor.numpy_reduce(obj)
+    self.send(magic, flags|zmq.SNDMORE)
+    self.send(srl.dumps(md), flags|zmq.SNDMORE)
+    return self.send(narray, flags, copy=False, track=False)
+
+def recv_numpy(self, flags=0, copy=True, track=False):
+ mdload = srl.loads(self.recv(flags=flags))
+ md = numpy_metadata(*mdload)
+ nd = self.recv(flags=flags)
+ return reductor.numpy_reconstruct(md, nd)
+
+def send_pandas(self, magic, obj, flags=0):
+    md, narray = reductor.pandas_reduce(obj)
+    self.send(magic, flags|zmq.SNDMORE)
+    self.send(srl.dumps(md), flags|zmq.SNDMORE)
+    return self.send(narray, flags, copy=False, track=False)
+
+def recv_pandas(self, flags=0, copy=True, track=False):
+ mdload = srl.loads(self.recv(flags=flags))
+ md = pandas_metadata(*mdload)
+ nd = self.recv(flags=flags)
+    return reductor.pandas_reconstruct(md, nd)
+
+def send_tensor(self, magic, obj, flags=0):
+    md, narray = reductor.tensor_reduce(obj)
+    self.send(magic, flags|zmq.SNDMORE)
+    self.send(srl.dumps(md), flags|zmq.SNDMORE)
+    return self.send(narray, flags, copy=False, track=False)
+
+def recv_tensor(self, flags=0, copy=True, track=False):
+ mdload = srl.loads(self.recv(flags=flags))
+ md = tensor_metadata(*mdload)
+ nd = self.recv(flags=flags)
+ return reductor.tensor_reconstruct(md, nd)
+
+# Polymorphic ZMQ socket mixins for all supported scientific types
+def numsend(self, obj, **kwargs):
+ magic = type_coercions.get(type(obj))
+ if magic == NUMPYND:
+ send_numpy(self, magic, obj, **kwargs)
+ elif magic == PANDAS:
+ send_pandas(self, magic, obj, **kwargs)
+ else:
+ raise CannotCoerce(obj)
+
+def numrecv(self, **kwargs):
+ magic = self.recv()
+ if magic in [NUMPYND, NUMPY1D, NUMPY2D]:
+ return recv_numpy(self, **kwargs)
+ elif magic == PANDAS:
+ return recv_pandas(self, **kwargs)
+ else:
+ raise Exception("Unknown wire protocol")
5 requirements.txt
@@ -0,0 +1,5 @@
+nose==1.1.2
+numpy>=1.6.2
+Cython==0.16
+pyzmq==2.1.11
+msgpack-python==0.1.12
102 setup.py
@@ -0,0 +1,102 @@
+import os
+import shutil
+import numpy as np
+from glob import glob
+from os.path import join as pjoin
+
+from distutils.core import setup, Command
+from distutils.extension import Extension
+from Cython.Distutils import build_ext
+
+extensions = [
+ Extension(
+ "numpush.posix_io.splice",
+ ["numpush/posix_io/splice.pyx"],
+ include_dirs=[],
+ ),
+ Extension(
+ "numpush.posix_io.sendfile",
+ ["numpush/posix_io/sendfile.pyx"],
+ include_dirs=[],
+ ),
+ Extension("numpush.moose_store.moose",
+ ["numpush/moose_store/moose.pyx"],
+ include_dirs=[],
+ ),
+]
+
+def find_packages():
+ packages = []
+ for dir,subdirs,files in os.walk('numpush'):
+ package = dir.replace(os.path.sep, '.')
+ if '__init__.py' not in files:
+ # not a package
+ continue
+ packages.append(package)
+ return packages
+
+class CleanCommand(Command):
+ """Custom distutils command to clean the .so and .pyc files."""
+
+ user_options = [ ]
+
+ def initialize_options(self):
+ self._clean_me = []
+ self._clean_trees = []
+ for root, dirs, files in list(os.walk('numpush')):
+ for f in files:
+ if os.path.splitext(f)[-1] in ('.pyc', '.so', '.o', '.pyd'):
+ self._clean_me.append(pjoin(root, f))
+ for d in dirs:
+ if d == '__pycache__':
+ self._clean_trees.append(pjoin(root, d))
+
+ for d in ('build',):
+ if os.path.exists(d):
+ self._clean_trees.append(d)
+
+ def finalize_options(self):
+ pass
+
+ def run(self):
+ for clean_me in self._clean_me:
+ try:
+ os.unlink(clean_me)
+ except Exception:
+ pass
+ for clean_tree in self._clean_trees:
+ try:
+ shutil.rmtree(clean_tree)
+ except Exception:
+ pass
+
+#-----------------------------------------------------------------------------
+# Main setup
+#-----------------------------------------------------------------------------
+
+long_desc = \
+"""
+"""
+
+setup(
+ name = "numpush",
+ version = '0.0.1dev',
+ packages = find_packages(),
+ ext_modules = extensions,
+ package_data = {},
+ author = "Stephen Diehl",
+ author_email = "stephen.m.diehl@gmail.com",
+ url = 'http://github.com/sdiehl/numpush',
+ download_url = 'http://github.com/sdiehl/numpush/downloads',
+ description = "Distributed data/code push for Numpy derivative structures",
+ long_description = long_desc,
+ license = "MIT",
+ cmdclass = {'build_ext': build_ext, 'clean': CleanCommand},
+ classifiers = [
+ 'Intended Audience :: Developers',
+ 'Intended Audience :: Science/Research',
+ 'Operating System :: POSIX',
+ 'Topic :: System :: Networking',
+ 'Programming Language :: Python :: 2.7',
+ ]
+)
