Skip to content
Branch: master
Find file Copy path
Find file Copy path
1 contributor

Users who have contributed to this file

442 lines (365 sloc) 14.2 KB
from __future__ import print_function
import os
import re
import math
import pathlib
import logging
import argparse
import warnings
import magic
import functools
from ..utils.log import getLogger
from .. import LOCAL_FS_ENCODING
# Pick the directory-walking primitive once, at import time.  ``os_walk``
# follows symbolic links in both variants, and ``os_walk_unpack`` normalises
# each yielded item to a ``(root, dirs, files)`` triple.
if hasattr(os, "fwalk"):
    os_walk = functools.partial(os.fwalk, follow_symlinks=True)

    def os_walk_unpack(w):
        """Return the first three items of ``w``, dropping ``os.fwalk``'s
        trailing directory file descriptor."""
        return w[0:3]

else:
    # Without this ``else`` the assignments below would unconditionally
    # replace the ``os.fwalk`` variant chosen above.
    os_walk = functools.partial(os.walk, followlinks=True)

    def os_walk_unpack(w):
        """Return ``w`` unchanged; ``os.walk`` already yields 3-tuples."""
        return w
# Module-level logger, obtained from the project's ``getLogger`` helper using
# this module's name.
log = getLogger(__name__)
# MIME type reported for files whose extension is listed in
# ID3_MIME_TYPE_EXTENSIONS (see MagicTypes.guess_type).
ID3_MIME_TYPE = "application/x-id3"
# File extensions (including the leading dot) that are reported as
# ID3_MIME_TYPE without inspecting the file contents.
ID3_MIME_TYPE_EXTENSIONS = (".id3", ".tag")
class MagicTypes(magic.Magic):
    """``magic.Magic`` configured to report MIME types, with a shortcut for
    ID3 tag files identified purely by their file extension."""

    def __init__(self):
        # NOTE(review): the source of this call was cut off after
        # ``mime_encoding=False,``; confirm whether further keyword
        # arguments were passed originally.
        magic.Magic.__init__(self, mime=True, mime_encoding=False)

    def guess_type(self, filename):
        """Return the MIME type string for ``filename``.

        Files whose extension is in ``ID3_MIME_TYPE_EXTENSIONS`` are reported
        as ``ID3_MIME_TYPE`` without being opened; everything else is passed
        to ``from_file``.
        """
        if os.path.splitext(filename)[1] in ID3_MIME_TYPE_EXTENSIONS:
            return ID3_MIME_TYPE
        try:
            return self.from_file(filename)
        except UnicodeEncodeError:
            # The name cannot be encoded as-is (e.g. it carries surrogate
            # escapes); retry with explicitly encoded bytes.
            return self.from_file(filename.encode("utf-8", "surrogateescape"))


# Shared instance used by guessMimetype().
_mime_types = MagicTypes()
def guessMimetype(filename, with_encoding=False):
    """Return the mime-type for ``filename``.

    :param filename: Path of the file to inspect; ``pathlib.Path`` values are
        converted to ``str`` first.
    :param with_encoding: When True a 2-tuple ``(mime, enc)`` is returned for
        backwards compatibility. The encoding is no longer determined, so
        ``enc`` is always ``None`` and a ``UserWarning`` is issued.
    :return: The mime-type, or ``(mime, None)`` when ``with_encoding`` is
        True.
    """
    if isinstance(filename, pathlib.Path):
        filename = str(filename)
    mime = _mime_types.guess_type(filename)
    if not with_encoding:
        return mime

    # stacklevel=2 attributes the warning to the caller of this function.
    warnings.warn("File character encoding is no longer returned, "
                  "value is None",
                  UserWarning, stacklevel=2)
    return mime, None
def walk(handler, path, excludes=None, fs_encoding=LOCAL_FS_ENCODING):
    """A wrapper around os.walk which handles exclusion patterns and multiple
    path types (str, pathlib.Path, bytes).

    :param handler: Object providing ``handleFile(path)`` and
        ``handleDirectory(root, files)`` (see :class:`FileHandler`).
    :param path: File or directory to walk.
    :param excludes: Optional iterable of regular expression strings; an
        absolute file path that any pattern matches (``re.match``) is not
        passed to ``handleFile``.
    :param fs_encoding: Encoding used to decode ``bytes`` paths.
    :raises IOError: If ``path`` does not exist.

    NOTE(review): the handler invocations, the pattern compilation and the
    early returns were missing from the damaged source and are reconstructed
    from the surviving comment and ``except`` clause; confirm against the
    upstream file.
    """
    if isinstance(path, pathlib.Path):
        path = str(path)
    elif not isinstance(path, str):
        path = str(path, fs_encoding)

    excludes_re = [re.compile(e) for e in (excludes or [])]

    def _isExcluded(_p):
        return any(ex.match(_p) for ex in excludes_re)

    if not os.path.exists(path):
        raise IOError("file not found: %s" % path)
    elif os.path.isfile(path) and not _isExcluded(path):
        # If not given a directory, invoke the handler and return
        handler.handleFile(os.path.abspath(path))
        return

    # Iterate lazily so that an abort (StopIteration from the handler) does
    # not pay for walking the remainder of the tree.
    for w in os_walk(path):
        root, dirs, files = os_walk_unpack(w)
        root = root if isinstance(root, str) else str(root, fs_encoding)
        for f in files:
            f = f if isinstance(f, str) else str(f, fs_encoding)
            f = os.path.abspath(os.path.join(root, f))
            if not _isExcluded(f):
                try:
                    handler.handleFile(f)
                except StopIteration:
                    # The handler asked for the walk to be aborted.
                    return

        if files:
            handler.handleDirectory(root, files)
class FileHandler(object):
    """A handler interface for :func:`eyed3.utils.walk` callbacks.

    Every method is a no-op that returns ``None``; subclasses override the
    callbacks they are interested in.
    """

    def handleFile(self, f):
        """Called for each file walked. The file ``f`` is the full path and
        the return value is ignored. If the walk should abort the method should
        raise a ``StopIteration`` exception."""

    def handleDirectory(self, d, files):
        """Called for each directory ``d`` **after** ``handleFile`` has been
        called for each file in ``files``. ``StopIteration`` may be raised to
        halt iteration."""

    def handleDone(self):
        """Called when there are no more files to handle."""
def _requireArgType(arg_type, *args):
arg_indices = []
kwarg_names = []
for a in args:
if type(a) is int:
assert(arg_indices or kwarg_names)
def wrapper(fn):
def wrapped_fn(*args, **kwargs):
for i in arg_indices:
if i >= len(args):
# The ith argument is not there, as in optional arguments
if args[i] is not None and not isinstance(args[i], arg_type):
raise TypeError("%s(argument %d) must be %s" %
(fn.__name__, i, str(arg_type)))
for name in kwarg_names:
if (name in kwargs and kwargs[name] is not None and
not isinstance(kwargs[name], arg_type)):
raise TypeError("%s(argument %s) must be %s" %
(fn.__name__, name, str(arg_type)))
return fn(*args, **kwargs)
return wrapped_fn
return wrapper
def requireUnicode(*args):
    """Function decorator to enforce str/unicode argument types.

    ``None`` is a valid argument value, in all cases, regardless of not being
    unicode. ``*args`` may be numeric positional argument index values
    (``requireUnicode(1, 3)`` requires that arguments 1 and 3 are unicode),
    keyword argument names (``requireUnicode("title")``), or a combination
    of both.
    """
    return _requireArgType(str, *args)
def requireBytes(*args):
    """Function decorator to enforce byte string argument types.

    ``*args`` selects the arguments to check: ``int`` values are positional
    argument indices and other values are keyword argument names.
    """
    return _requireArgType(bytes, *args)
def formatTime(seconds, total=None, short=False):
    """
    Format ``seconds`` (number of seconds) as a string representation.

    When ``short`` is False (the default) the format is ``MM:SS``, or
    ``HH:MM:SS`` once there is at least one whole hour.

    Otherwise, the format is exactly 6 characters long (for values below
    100 years) and of the form:

        01w03d
        02d04h
        01h05m
        01m04s
           15s

    If ``total`` is given and non-zero it is also formatted (long format
    only) and appended to the result separated by ' / '.
    """
    seconds = round(seconds)

    def time_tuple(ts):
        # Missing or negative values are clamped to zero.
        if ts is None or ts < 0:
            ts = 0
        # Whole-number arithmetic; fractional seconds are truncated, which
        # is what the ``%d`` formatting did with the float quotients before.
        hours, remainder = divmod(int(ts), 3600)
        mins, secs = divmod(remainder, 60)
        tstr = '%02d:%02d' % (mins, secs)
        if hours:
            tstr = '%02d:%s' % (hours, tstr)
        return (hours, mins, secs, tstr)

    if not short:
        retval = time_tuple(seconds)[3]
        if total:
            retval += ' / %s' % time_tuple(total)[3]
        return retval

    units = [
        (u'y', 60 * 60 * 24 * 7 * 52),
        (u'w', 60 * 60 * 24 * 7),
        (u'd', 60 * 60 * 24),
        (u'h', 60 * 60),
        (u'm', 60),
        (u's', 1),
    ]

    seconds = int(seconds)

    if seconds < 60:
        # Left-padded so the result keeps the documented 6 character width.
        # NOTE(review): the whitespace inside this literal and the '~inf'
        # literal was collapsed in the damaged source; the padding is
        # restored from the documented width.
        return u'   {0:02d}s'.format(seconds)
    # Report the largest unit that fits, together with the next smaller one.
    for (unit1, limit1), (unit2, limit2) in zip(units, units[1:]):
        if seconds >= limit1:
            return u'{0:02d}{1}{2:02d}{3}'.format(
                seconds // limit1, unit1,
                (seconds % limit1) // limit2, unit2)
    return u'  ~inf'
KB_BYTES = 1024
"""Number of bytes per KB (2^10)"""
MB_BYTES = 1048576
"""Number of bytes per MB (2^20)"""
GB_BYTES = 1073741824
"""Number of bytes per GB (2^30)"""
# NOTE(review): the three *_UNIT assignments were missing from the damaged
# source (only their docstrings survived). They are restored because
# formatSize() references them; confirm the exact values.
KB_UNIT = "KB"
"""Kilobytes abbreviation"""
MB_UNIT = "MB"
"""Megabytes abbreviation"""
GB_UNIT = "GB"
"""Gigabytes abbreviation"""
def formatSize(size, short=False):
    """
    Format ``size`` (number of bytes) into string format doing KB, MB, or GB
    conversion where necessary.

    When ``short`` is False (the default) the value is printed with two
    decimals followed by the unit, the smallest unit being bytes and the
    largest gigabytes; e.g. '234.00 GB'. This form uses binary (1024 based)
    units.

    The short version is a 3 character wide value followed by a one
    character decimal (1000 based) magnitude suffix, which is a space for
    plain bytes and '?' beyond the known suffixes; e.g. '256 ', ' 64k',
    '1.1G'.
    """
    if not short:
        unit = "Bytes"
        if size >= GB_BYTES:
            size = float(size) / float(GB_BYTES)
            unit = GB_UNIT
        elif size >= MB_BYTES:
            size = float(size) / float(MB_BYTES)
            unit = MB_UNIT
        elif size >= KB_BYTES:
            size = float(size) / float(KB_BYTES)
            unit = KB_UNIT
        return "%.2f %s" % (size, unit)

    suffixes = u' kMGTPEH'
    # Determine the power of 1000 by comparison instead of a quotient of
    # logarithms: this is exact at the boundaries and cannot fail for
    # values below 1.
    num_scale = 0
    while num_scale < len(suffixes) and size >= 1000 ** (num_scale + 1):
        num_scale += 1
    suffix = suffixes[num_scale] if num_scale < len(suffixes) else '?'

    value = size / (1000 ** num_scale)
    str_value = str(value)
    # Keep at most three characters, dropping a dangling decimal point.
    if len(str_value) >= 3 and str_value[2] == '.':
        str_value = str_value[:2]
    else:
        str_value = str_value[:3]
    return "{0:>3s}{1}".format(str_value, suffix)
def formatTimeDelta(td):
    """Format a timedelta object ``td`` into a string.

    The result is ``HH:MM:SS`` built from ``td.seconds``, prefixed with
    ``"<n> days "`` when ``td.days`` is non-zero. Microseconds are ignored.
    """
    # Integer arithmetic; the previous float quotients were truncated by the
    # ``%d`` formatting anyway.
    hours, remainder = divmod(td.seconds, 3600)
    mins, secs = divmod(remainder, 60)
    tstr = "%02d:%02d:%02d" % (hours, mins, secs)
    if td.days:
        tstr = "%d days %s" % (td.days, tstr)
    return tstr
def chunkCopy(src_fp, dest_fp, chunk_sz=(1024 * 512)):
    """Copy ``src_fp`` to ``dest_fp`` in ``chunk_sz`` byte increments.

    Both arguments are open file-like objects; reading starts at the current
    position of ``src_fp`` and stops at the first empty read. Neither object
    is closed or repositioned.

    NOTE(review): the read and write calls were missing from the damaged
    source and are reconstructed from the docstring and the surviving
    ``if data`` / ``done`` logic.
    """
    while True:
        data = src_fp.read(chunk_sz)
        if not data:
            break
        dest_fp.write(data)
class ArgumentParser(argparse.ArgumentParser):
    """Subclass of argparse.ArgumentParser that adds version and log level
    options.

    Two extra keyword arguments are accepted and removed before the base
    class is initialised: ``main_logger`` (logger name used by
    ``--log-level`` when no logger is given) and ``version`` (string shown
    by ``--version``). A missing or falsy value selects the eyed3 default.
    """

    def __init__(self, *args, **kwargs):
        # Imported here rather than at module level.
        # NOTE(review): presumably to avoid a circular import with the
        # eyed3 package; confirm.
        from eyed3 import version as VERSION
        from eyed3.utils.log import LEVELS
        from eyed3.utils.log import MAIN_LOGGER

        def pop_kwarg(name, default):
            # Absent and falsy values both fall back to the default.
            return kwargs.pop(name, None) or default

        main_logger = pop_kwarg("main_logger", MAIN_LOGGER)
        version = pop_kwarg("version", VERSION)

        self.log_levels = [logging.getLevelName(level).lower()
                           for level in LEVELS]

        formatter = argparse.RawDescriptionHelpFormatter
        # NOTE(review): this call was truncated in the damaged source;
        # forwarding the remaining ``kwargs`` is assumed.
        super(ArgumentParser, self).__init__(*args, formatter_class=formatter,
                                             **kwargs)

        self.add_argument("--version", action="version", version=version,
                          help="Display version information and exit")

        debug_group = self.add_argument_group("Debugging")
        debug_group.add_argument(
            "-l", "--log-level", metavar="LEVEL[:LOGGER]",
            action=LoggingAction, main_logger=main_logger,
            help="Set a log level. This option may be specified multiple "
                 "times. If a logger name is specified than the level "
                 "applies only to that logger, otherwise the level is set "
                 "on the top-level logger. Acceptable levels are %s. " %
                 (", ".join("'%s'" % level for level in self.log_levels)))
        debug_group.add_argument("--profile", action="store_true",
                                 default=False, dest="debug_profile",
                                 help="Run using python profiler.")
        debug_group.add_argument("--pdb", action="store_true", dest="debug_pdb",
                                 help="Drop into 'pdb' when errors occur.")
class LoggingAction(argparse._AppendAction):
    """argparse action for ``LEVEL[:LOGGER]`` options.

    Each occurrence sets ``LEVEL`` on the named logger (or on the
    ``main_logger`` given at construction when no logger is named) and then
    appends the split value to the destination list.
    """

    def __init__(self, *args, **kwargs):
        # ``main_logger`` is our own option; the base class must not see it.
        self.main_logger = kwargs.pop("main_logger")
        super(LoggingAction, self).__init__(*args, **kwargs)

    def __call__(self, parser, namespace, values, option_string=None):
        # Split on the first colon only, so a spec with several colons no
        # longer breaks the two-name unpacking below.
        values = values.split(':', 1)
        level, logger = values if len(values) > 1 else (values[0],
                                                        self.main_logger)

        logger = logging.getLogger(logger)
        # NOTE(review): the statement applying the level was missing from the
        # damaged source. ``Logger.setLevel`` accepts level names and raises
        # ValueError for unknown ones; KeyError is still caught as the
        # original handler did.
        try:
            logger.setLevel(level.upper())
        except (KeyError, ValueError):
            msg = "invalid level choice: %s (choose from %s)" % \
                   (level, parser.log_levels)
            raise argparse.ArgumentError(self, msg)

        super(LoggingAction, self).__call__(parser, namespace, values,
                                            option_string)
def datePicker(thing, prefer_recording_date=False):
    """This function returns a date of some sort, amongst all the possible
    dates (members called release_date, original_release_date,
    and recording_date of type eyed3.core.Date).

    The order of preference is:
    1) date of original release
    2) date of this versions release
    3) the recording date.

    Unless ``prefer_recording_date`` is ``True`` in which case the order is
    3, 1, 2.

    ``None`` will be returned if no dates are available."""
    # NOTE(review): the last operand of each chain was missing from the
    # damaged source; both are restored from the order documented above.
    if prefer_recording_date:
        return (thing.recording_date or
                thing.original_release_date or
                thing.release_date)
    return (thing.original_release_date or
            thing.release_date or
            thing.recording_date)
def makeUniqueFileName(file_path, uniq=u''):
    """The ``file_path`` is the desired file name, and it is returned if the
    file does not exist. In the case that it already exists the path is
    adjusted to be unique. First, the ``uniq`` string is added, and then
    a counter is used to find a unique name.

    The adjustment is inserted before the extension, separated by ``_``
    (``a.txt`` -> ``a_<uniq>.txt`` -> ``a_<uniq>_1.txt`` ...). Only existence
    is checked; no file is created.
    """
    directory, file_name = os.path.split(file_path)
    stem, ext = os.path.splitext(file_name)
    count = 1
    while os.path.exists(os.path.join(directory, file_name)):
        if uniq:
            # Try the caller supplied marker once; it becomes part of the
            # stem that later counter suffixes are appended to.
            stem = "%s_%s" % (stem, uniq)
            file_name = "".join([stem, ext])
            uniq = u''
        else:
            file_name = "".join(["%s_%s" % (stem, count), ext])
            count += 1
    return os.path.join(directory, file_name)
def b(x, encoder=None):
    """Converts `x` to a bytes string if not already.

    :param x: The string.
    :param encoder: Optional codec encoder to perform the conversion. It is
                    called with ``x`` and must return a sequence whose first
                    item is the encoded bytes. The default is
                    `codecs.latin_1_encode`.
    :return: ``x`` itself when it already is ``bytes``, otherwise the
             encoded byte string.
    """
    if isinstance(x, bytes):
        return x

    import codecs
    encoder = encoder or codecs.latin_1_encode
    # Codec encoders return ``(encoded, length_consumed)``.
    return encoder(x)[0]
You can’t perform that action at this time.