Remove redundant code to increase coverage (#57)
wjsi authored and qinxuye committed Dec 20, 2018
1 parent c09c37b commit 2ff2cfe
Showing 5 changed files with 25 additions and 238 deletions.
2 changes: 1 addition & 1 deletion mars/compat/__init__.py
@@ -251,7 +251,7 @@ def queue_get(q):
def queue_get(q):
return q.get()

from ..lib.lib_utils import isvalidattr, dir2, raise_exc, getargspec, getfullargspec
from ..lib.lib_utils import isvalidattr, dir2, getargspec, getfullargspec

from ..lib.six.moves import reduce, zip_longest
from ..lib.six.moves import reload_module
5 changes: 2 additions & 3 deletions mars/lib/futures/_base.py
@@ -8,8 +8,6 @@
import itertools
import time

from ..lib_utils import raise_exc

__author__ = 'Brian Quinlan (brian@sweetapp.com)'

FIRST_COMPLETED = 'FIRST_COMPLETED'
@@ -357,8 +355,9 @@ def done(self):
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]

def __get_result(self):
from ...compat import six
if self._exception:
raise_exc(type(self._exception), self._exception, self._traceback)
six.reraise(type(self._exception), self._exception, self._traceback)
else:
return self._result

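Context for the change above: the removed raise_exc helper re-raised a captured exception with its original traceback, which six.reraise already does on both Python 2 and 3. A minimal standalone sketch of the same pattern (illustration only, not part of this commit):

import sys
import six

def might_fail():
    raise ValueError('boom')

try:
    might_fail()
except ValueError:
    exc_type, exc_value, tb = sys.exc_info()

try:
    # Re-raise later with the original traceback intact (same call on Python 2 and 3).
    six.reraise(exc_type, exc_value, tb)
except ValueError as err:
    assert str(err) == 'boom'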
15 changes: 3 additions & 12 deletions mars/lib/lib_utils.py
@@ -1,11 +1,11 @@
# Copyright 1999-2018 Alibaba Group Holding Ltd.
#
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -92,15 +92,6 @@ def dir2(obj):
return sorted(list(attrs))


if tuple(sys.version_info[:2]) < (3, 5):
def raise_exc(ex_type, ex_value, tb):
glb = dict(ex_type=ex_type, ex=ex_value, tb=tb)
six.exec_('raise ex_type, ex, tb', glb, locals())
else:
def raise_exc(ex_type, ex_value, tb):
raise ex_value


if hasattr(inspect, 'signature'):
FullArgSpec = namedtuple('FullArgSpec',
'args, varargs, varkw, defaults, kwonlyargs, kwonlydefaults, annotations')
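The tail of this hunk shows the surviving getargspec/getfullargspec shim, which keys off inspect.signature. As a hedged illustration only (this is not the project's implementation), a signature-based FullArgSpec builder could look roughly like:

import inspect
from collections import namedtuple

FullArgSpec = namedtuple('FullArgSpec',
                         'args, varargs, varkw, defaults, kwonlyargs, kwonlydefaults, annotations')

def getfullargspec_sketch(func):
    # Map inspect.signature onto the older FullArgSpec-style tuple.
    sig = inspect.signature(func)
    args, defaults, kwonlyargs, kwonlydefaults, annotations = [], [], [], {}, {}
    varargs = varkw = None
    for name, p in sig.parameters.items():
        if p.annotation is not p.empty:
            annotations[name] = p.annotation
        if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD):
            args.append(name)
            if p.default is not p.empty:
                defaults.append(p.default)
        elif p.kind is p.VAR_POSITIONAL:
            varargs = name
        elif p.kind is p.KEYWORD_ONLY:
            kwonlyargs.append(name)
            if p.default is not p.empty:
                kwonlydefaults[name] = p.default
        elif p.kind is p.VAR_KEYWORD:
            varkw = name
    if sig.return_annotation is not sig.empty:
        annotations['return'] = sig.return_annotation
    return FullArgSpec(args, varargs, varkw, tuple(defaults) or None,
                       kwonlyargs, kwonlydefaults or None, annotations)

spec = getfullargspec_sketch(lambda a, b=1, *args, c, **kw: None)
assert spec.args == ['a', 'b'] and spec.kwonlyargs == ['c']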
211 changes: 4 additions & 207 deletions mars/utils.py
@@ -15,7 +15,6 @@
# limitations under the License.

import base64
import collections
import functools
import inspect
import json
@@ -25,58 +24,24 @@
import random
import socket
import struct
import subprocess
import sys
import time
import uuid
from collections import deque
from hashlib import md5
from datetime import date, datetime, timedelta

import numpy as np

from .compat import six, irange, functools32, getargspec
from .utils_c import *
from .compat import irange, functools32, getargspec
from .utils_c import to_binary, to_str, to_text, tokenize

logger = logging.getLogger(__name__)
random.seed(int(time.time()) * os.getpid())


if 'to_binary' not in globals():
def to_binary(text, encoding='utf-8'):
if text is None:
return text
if isinstance(text, six.text_type):
return text.encode(encoding)
elif isinstance(text, (six.binary_type, bytearray)):
return bytes(text)
else:
return str(text).encode(encoding) if six.PY3 else str(text)


if 'to_text' not in globals():
def to_text(binary, encoding='utf-8'):
if binary is None:
return binary
if isinstance(binary, (six.binary_type, bytearray)):
return binary.decode(encoding)
elif isinstance(binary, six.text_type):
return binary
else:
return str(binary) if six.PY3 else str(binary).decode(encoding)


if 'to_str' not in globals():
def to_str(text, encoding='utf-8'):
return to_text(text, encoding=encoding) if six.PY3 else to_binary(text, encoding=encoding)


def build_id(prefix=''):
return prefix + '-' + str(uuid.uuid1())
tokenize = tokenize


# fix encoding conversion problem under windows
if sys.platform == 'win32':
if sys.platform == 'win32': # pragma: no cover
def _replace_default_encoding(func):
def _fun(s, encoding=None):
encoding = encoding or getattr(sys.stdout, 'encoding', None) or 'mbcs'
@@ -91,105 +56,6 @@ def _fun(s, encoding=None):
to_str = _replace_default_encoding(to_str)


if 'tokenize' not in globals():
def tokenize(*args, **kwargs):
try:
import numpy as np

def h_numpy(ob):
if not ob.shape:
return str(ob), ob.dtype
if hasattr(ob, 'mode') and getattr(ob, 'filename', None):
if hasattr(ob.base, 'ctypes'):
offset = (ob.ctypes.get_as_parameter().value -
ob.base.ctypes.get_as_parameter().value)
else:
offset = 0 # root memmap's have mmap object as base
return (ob.filename, os.path.getmtime(ob.filename), ob.dtype,
ob.shape, ob.strides, offset)
if ob.dtype.hasobject:
try:
data = md5('-'.join(ob.flat).encode('utf-8')).hexdigest()
except TypeError:
data = md5(b'-'.join([six.text_type(item).encode('utf-8') for item in
ob.flat])).hexdigest()
else:
try:
data = md5(ob.ravel().view('i1').data).hexdigest()
except (BufferError, AttributeError, ValueError):
data = md5(ob.copy().ravel().view('i1').data).hexdigest()
return data, ob.dtype, ob.shape, ob.strides
except ImportError:
np = None
h_numpy = None

def h(ob):
try:
return h_non_iterative(ob)
except TypeError:
if isinstance(ob, dict):
return list, h_iterative(sorted(list(ob.items()), key=str))
if isinstance(ob, set):
return list, h_iterative(sorted(ob, key=str))
if isinstance(ob, (tuple, list)):
return type(ob), h_iterative(ob)
raise TypeError('Cannot generate token for %s, type: %s' % (ob, type(ob)))

def h_iterative(ob):
nested = deque(ob)
h_list = []
dq = deque()
while nested or dq:
x = dq.pop() if dq else nested.popleft()
if isinstance(x, (list, tuple)):
dq.extend(reversed(x))
else:
h_list.append(h_non_iterative(x))
return h_list

def h_non_iterative(ob):
if isinstance(ob, (int, float, str, six.text_type, bytes,
type(None), type, slice, date, datetime, timedelta)):
return ob
if isinstance(ob, six.integer_types+(complex,)):
return ob
if hasattr(ob, 'key'):
return ob.key
# numpy relative
if h_numpy and isinstance(ob, np.ndarray):
return h_numpy(ob)
if np and isinstance(ob, (np.dtype, np.generic)):
return repr(ob)

raise TypeError

def h_old(ob):
if isinstance(ob, (int, float, str, six.text_type, bytes,
type(None), type, slice, date, datetime, timedelta)):
return ob
if isinstance(ob, six.integer_types+(complex,)):
return ob
if isinstance(ob, dict):
return h_old(sorted(list(ob.items()), key=str))
if isinstance(ob, (tuple, list)):
return type(ob), list(h_old(it) for it in ob)
if isinstance(ob, set):
return h_old(sorted(ob, key=str))
if hasattr(ob, 'key'):
return ob.key
# numpy relative
if h_numpy and isinstance(ob, np.ndarray):
return h_numpy(ob)
if np and isinstance(ob, (np.dtype, np.generic)):
return repr(ob)

raise TypeError('Cannot generate token for %s, type: %s' % (ob, type(ob)))

if kwargs:
args = args + (kwargs,)
return md5(str([h_old(arg) for arg in args]).encode('utf-8')).hexdigest()


class AttributeDict(dict):
def __getattr__(self, item):
try:
@@ -199,25 +65,6 @@ def __getattr__(self, item):
"'AttributeDict' object has no attribute {0}".format(item))


def hashable(obj):
if isinstance(obj, six.string_types):
items = obj
elif isinstance(obj, slice):
items = (obj.start, obj.stop, obj.step)
elif isinstance(obj, collections.Mapping):
items = type(obj)((k, hashable(v)) for k, v in six.iteritems(obj))
elif isinstance(obj, collections.Iterable):
items = tuple(hashable(item) for item in obj)
elif isinstance(obj, collections.Hashable):
items = obj
elif hasattr(obj, 'key'):
items = obj.key
else:
raise TypeError(type(obj))

return items


def on_serialize_shape(shape):
if shape:
return tuple(s if not np.isnan(s) else -1 for s in shape)
@@ -344,56 +191,6 @@ def __get__(self, obj, owner):
return self.f(owner)


class PlasmaProcessHelper(object):
def __init__(self, base_directory=None, proc_name=None, size=1024 ** 3,
socket='/tmp/plasma', one_mapped_file=False, mount_point=None, huge_pages=False):
import pyarrow

self._proc_name = proc_name
self._size = size
self._socket = socket
self._mount_point = mount_point
self._enable_huge_pages = huge_pages
self._one_mapped_file = one_mapped_file

if not base_directory:
base_directory = pyarrow.__path__[0]
self._base_dir = base_directory

if proc_name is None:
for pname in ('plasma_store', 'plasma_store_server'):
if sys.platform == 'win32':
pname += '.exe'
if os.path.exists(os.path.join(self._base_dir, pname)):
self._proc_name = pname
break

self._process = None

def run(self, proc_args=None):
proc_args = proc_args or []
if self._proc_name is None:
raise RuntimeError('Plasma store not found.')
args = [os.path.join(self._base_dir, self._proc_name), '-m', str(self._size),
'-s', self._socket]
if self._mount_point:
args.extend(['-d', self._mount_point])
if self._enable_huge_pages:
args.append('-h')
if self._one_mapped_file:
args.append('-f')
args.extend(proc_args)

daemon = subprocess.Popen(args)
logger.debug('Started %d' % daemon.pid)
logger.debug('Params: %s' % args)
time.sleep(2)
self._process = daemon

def stop(self):
self._process.kill()


def serialize_graph(graph):
return base64.b64encode(graph.to_pb().SerializeToString())

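For orientation on the mars/utils.py changes above: the pure-Python fallbacks for to_binary, to_text, to_str and tokenize are gone, so those names are now expected to come solely from the compiled mars.utils_c module, and the `# pragma: no cover` added to the win32 branch excludes that platform-specific block from coverage measurement, consistent with the commit's goal. A hedged usage sketch, assuming the compiled helpers behave like the removed fallbacks:

from mars.utils import to_binary, to_str, to_text, tokenize

# Text/bytes conversion helpers (behaviour mirrored from the removed fallbacks).
assert to_binary(u'abc') == b'abc'
assert to_text(b'abc') == u'abc'
assert isinstance(to_str('abc'), str)   # native str on either Python version

# tokenize hashes its arguments deterministically, so equal inputs
# produce equal keys (e.g. for caching or chunk identity).
assert tokenize(1, 'a', (2, 3)) == tokenize(1, 'a', (2, 3))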
