Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions README.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
===========
mog-commons
===========
==================
mog-commons-python
==================

Common utility library for Python

Expand Down Expand Up @@ -30,8 +30,6 @@ Dependencies

* Python: 2.6 / 2.7 / 3.2 / 3.3 / 3.4
* six
* python-dateutil
* pytz
* unittest2

------------
Expand Down
3 changes: 0 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ def get_version():
url='https://github.com/mogproject/mog-commons-python',
install_requires=[
'six',
'python-dateutil',
'pytz',
'unittest2',
# 'mock',
],
tests_require=[
],
Expand Down
76 changes: 76 additions & 0 deletions src/mog_commons/case_class.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from __future__ import division, print_function, absolute_import, unicode_literals

import six


class CaseClass(object):
"""
Implementation like 'case class' in Scala language

This class can order if all the element can order.

Example:
class Coord(CaseClass):
def __init__(self, x, y):
super(Coord, self).__init__(('x', x), ('y', y))

a = Coord(123, 45)
a.x # 123
a.y # 45
str(a) # 'Coord(x=123, y=45)'
b = a.copy(y=54)
str(b) # 'Coord(x=123, y=54)'
a < b # True
"""

def __init__(self, *args, **kwargs):
"""
:param args: list of tuple of field key and value
:param kwargs: specify field key with value
Note that keys do NOT keep order. The order is rearranged in lexicographical.
"""
keys = []
for k, v in list(args) + sorted(kwargs.items()):
if not isinstance(k, six.string_types):
raise TypeError('Field key must be a string: %s' % k)
if k in keys:
raise ValueError('Found duplicate key name: %s' % k)
keys.append(k)
setattr(self, k, v)
self._keys = keys

def __cmp__(self, other):
if not isinstance(other, self.__class__):
raise TypeError('unorderable types: %s() < %s()' % (self.__class__.__name__, other.__class__.__name__))

for k in self._keys:
a, b = getattr(self, k), getattr(other, k)
if a is not None or b is not None:
if a < b:
return -1
if a > b:
return 1
return 0

def __lt__(self, other):
return self.__cmp__(other) < 0

def __eq__(self, other):
if not isinstance(other, self.__class__):
return False

for k in self._keys:
a, b = getattr(self, k), getattr(other, k)
if a is not None or b is not None:
if a != b:
return False
return True

def __repr__(self):
return '%s(%s)' % (self.__class__.__name__, ', '.join('%s=%r' % (k, getattr(self, k)) for k in self._keys))

def values(self):
"""
:return: key-value dict : { string: any }
"""
return dict((k, getattr(self, k)) for k in self._keys)
21 changes: 21 additions & 0 deletions src/mog_commons/collection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from __future__ import division, print_function, absolute_import, unicode_literals

import six


def get_single_item(d):
"""Get an item from a dict which contains just one item."""
assert len(d) == 1, 'Single-item dict must have just one item, not %d.' % len(d)
return next(six.iteritems(d))


def get_single_key(d):
"""Get a key from a dict which contains just one item."""
assert len(d) == 1, 'Single-item dict must have just one item, not %d.' % len(d)
return next(six.iterkeys(d))


def get_single_value(d):
"""Get a value from a dict which contains just one item."""
assert len(d) == 1, 'Single-item dict must have just one item, not %d.' % len(d)
return next(six.itervalues(d))
111 changes: 111 additions & 0 deletions src/mog_commons/command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from __future__ import division, print_function, absolute_import, unicode_literals

import sys
import os
import errno
import subprocess
from mog_commons.string import is_unicode
from mog_commons.functional import oget


#
# Process operations
#
def __convert_args(args, shell, cmd_encoding):
xs = []
if shell:
args = [subprocess.list2cmdline(args)]
if shell and sys.version_info[:2] == (3, 2) and not sys.platform == 'win32':
# Note: workaround for http://bugs.python.org/issue8513
xs = ['/bin/sh', '-c']
shell = False
for a in args:
assert is_unicode(a), 'cmd must be unicode string, not %s' % type(a).__name__
xs.append(a.encode(cmd_encoding))
return xs, shell


def execute_command(args, shell=False, cwd=None, env=None, stdin=None, stdout=None, stderr=None, cmd_encoding='utf-8'):
"""
Execute external command
:param args: command line arguments : [unicode]
:param shell: True when using shell : boolean
:param cwd: working directory : string
:param env: environment variables : dict
:param stdin: standard input
:param stdout: standard output
:param stderr: standard error
:param cmd_encoding: command line encoding: string
:return: return code
"""
args, shell = __convert_args(args, shell, cmd_encoding)
return subprocess.call(args=args, shell=shell, cwd=cwd, env=dict(os.environ, **(oget(env, {}))),
stdin=stdin, stdout=stdout, stderr=stderr)


def capture_command(args, shell=False, cwd=None, env=None, stdin=None, cmd_encoding='utf-8'):
"""
Execute external command and capture output
:param args: command line arguments : [string]
:param shell: True when using shell : boolean
:param cwd: working directory : string
:param env: environment variables : dict
:param stdin: standard input
:param cmd_encoding: command line encoding: string
:return: tuple of return code, stdout data and stderr data
"""
args, shell = __convert_args(args, shell, cmd_encoding)
p = subprocess.Popen(
args, shell=shell, cwd=cwd, env=dict(os.environ, **(oget(env, {}))),
stdin=stdin, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout_data, stderr_data = p.communicate()
return p.returncode, stdout_data, stderr_data


def execute_command_with_pid(args, pid_file=None, shell=False, cwd=None, env=None,
stdin=None, stdout=None, stderr=None, cmd_encoding='utf-8'):
if pid_file is None:
return execute_command(args, shell, cwd, env, stdin, stdout, stderr, cmd_encoding)
else:
try:
args, shell = __convert_args(args, shell, cmd_encoding)
p = subprocess.Popen(
args, shell=shell, cwd=cwd, env=dict(os.environ, **(oget(env, {}))),
stdin=stdin, stdout=stdout, stderr=stderr)
with open(pid_file, 'w') as f:
f.write(str(p.pid))
ret = p.wait()
finally:
# clean up pid file
if pid_file is not None and os.path.exists(pid_file):
os.remove(pid_file)
return ret


def pid_exists(pid):
# stole from https://github.com/giampaolo/psutil/blob/master/psutil/_psposix.py
"""Check whether pid exists in the current process table."""
if pid == 0:
# According to "man 2 kill" PID 0 has a special meaning:
# it refers to <<every process in the process group of the
# calling process>> so we don't want to go any further.
# If we get here it means this UNIX platform *does* have
# a process with id 0.
return True
try:
os.kill(pid, 0)
except OSError as err:
if err.errno == errno.ESRCH:
# ESRCH == No such process
return False
elif err.errno == errno.EPERM:
# EPERM clearly means there's a process to deny access to
return True
else:
# According to "man 2 kill" possible error values are
# (EINVAL, EPERM, ESRCH) therefore we should never get
# here. If we do let's be explicit in considering this
# an error.
raise err
else:
return True
19 changes: 19 additions & 0 deletions src/mog_commons/functional.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from __future__ import division, print_function, absolute_import, unicode_literals


#
# functions for handling optional values
#
def omap(function, optional):
"""Map optional value"""
return None if optional is None else function(optional)


def oget(optional, default=None):
"""Get optional value or default value"""
return default if optional is None else optional


def ozip(*optionals):
"""Zip optional values. Return None if one value or the other is None."""
return None if any(x is None for x in optionals) else tuple(optionals)
23 changes: 23 additions & 0 deletions src/mog_commons/io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from __future__ import division, print_function, absolute_import, unicode_literals

import sys
from mog_commons.string import is_unicode, to_bytes, to_unicode


def print_safe(str_or_bytes, encoding='utf-8', errors='ignore', output=sys.stdout, newline='\n'):
"""
Print unicode or bytes universally.

:param str_or_bytes: string
:param encoding: encoding
:param output: output file handler
:param errors: error handling scheme. Refer to codecs.register_error.
"""
writer = output.buffer if hasattr(output, 'buffer') else output

# When the input type is bytes, verify it can be decoded with the specified encoding.
decoded = str_or_bytes if is_unicode(str_or_bytes) else to_unicode(str_or_bytes, encoding, errors)
encoded = to_bytes(decoded, encoding, errors)

writer.write(encoded + to_bytes(newline, encoding, errors))
output.flush()
34 changes: 34 additions & 0 deletions src/mog_commons/string.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,40 @@ def to_unicode(s, encoding=None, errors='strict'):
return str(s)


def to_str(s, encoding=None, errors='strict'):
"""
Make str from any value
:param s:
:param encoding:
:param errors:
:return: str (not unicode in Python2, nor bytes in Python3)
"""
encoding = encoding or 'utf-8'

if is_strlike(s):
if six.PY2:
return s.encode(encoding, errors) if isinstance(s, unicode) else s
else:
return s.decode(encoding, errors) if isinstance(s, bytes) else s
else:
return str(s)


def to_bytes(s, encoding=None, errors='strict'):
"""Convert string to bytes."""
encoding = encoding or 'utf-8'

if is_unicode(s):
return s.encode(encoding, errors)
elif is_strlike(s):
return s
else:
if six.PY2:
return str(s)
else:
return str(s).encode(encoding, errors)


def edge_just(left, right, width, fillchar=' '):
padding = fillchar * max(1, width - unicode_width(left + right))
return left + padding + right
Expand Down
32 changes: 28 additions & 4 deletions src/mog_commons/unittest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,42 @@
else:
import unittest as base_unittest

from mog_commons.string import to_bytes


class StringBuffer(object):
"""
Replace for six.StringIO

We don't use StringIO because there are many differences between PY2 and PY3.
"""
def __init__(self, init_buffer=None):
self._buffer = init_buffer or b''

def write(self, s, encoding='utf-8', errors='strict'):
self._buffer += to_bytes(s, encoding, errors)

def writelines(self, lines, encoding='utf-8', errors='strict'):
self._buffer += b''.join(to_bytes(s, encoding, errors) for s in lines)

def flush(self):
"""do nothing"""

def getvalue(self, encoding='utf-8', errors='strict'):
return self._buffer.decode(encoding, errors)


class TestCase(base_unittest.TestCase):
def assertRaisesRegexp(self, expected_exception, expected_regexp, callable_obj=None, *args, **kwargs):
"""Accept difference of the function name between PY2 and PY3."""
f = base_unittest.TestCase.assertRaisesRegex if six.PY3 else base_unittest.TestCase.assertRaisesRegexp
f(self, expected_exception, expected_regexp, callable_obj, *args, **kwargs)

def assertOutput(self, expected_stdout, expected_stderr, function):
def assertOutput(self, expected_stdout, expected_stderr, function, encoding='utf-8'):
with self.withOutput() as (out, err):
function()
self.assertMultiLineEqual(out.getvalue(), expected_stdout)
self.assertMultiLineEqual(err.getvalue(), expected_stderr)
self.assertMultiLineEqual(out.getvalue(encoding), expected_stdout)
self.assertMultiLineEqual(err.getvalue(encoding), expected_stderr)

@contextmanager
def withOutput(self):
Expand All @@ -32,7 +56,7 @@ def withOutput(self):
(do main logic)
(verify out.getvalue() or err.getvalue())
"""
new_out, new_err = six.StringIO(), six.StringIO()
new_out, new_err = StringBuffer(), StringBuffer()
old_out, old_err = sys.stdout, sys.stderr

try:
Expand Down
Loading