diff --git a/docs/source/configuration.rst b/docs/source/configuration.rst index e4d8bcbcb..4651e5ac6 100644 --- a/docs/source/configuration.rst +++ b/docs/source/configuration.rst @@ -155,6 +155,17 @@ Common configuration is a subdictionary that is a value of ``common`` key in :ref:`module segment option `. Paths defined here have priority when searching for modules. +``log_file`` + Defines path which will hold powerline logs. If not present, logging will be + done to stderr. + +``log_level`` + String, determines logging level. Defaults to ``WARNING``. + +``log_format`` + String, determines format of the log messages. Defaults to + ``'%(asctime)s:%(level)s:%(message)s'``. + Extension-specific configuration -------------------------------- diff --git a/powerline/__init__.py b/powerline/__init__.py index 18779189f..cd21a94e8 100644 --- a/powerline/__init__.py +++ b/powerline/__init__.py @@ -4,6 +4,7 @@ import json import os import sys +import logging from powerline.colorscheme import Colorscheme @@ -25,6 +26,42 @@ def load_json_config(search_paths, config_file, load=json.load, open_file=open_f raise IOError('Config file not found in search path: {0}'.format(config_file)) +class PowerlineState(object): + def __init__(self, logger, environ, getcwd, home): + self.environ = environ + self.getcwd = getcwd + self.home = home or environ.get('HOME', None) + self.logger = logger + self.prefix = None + self.last_msgs = {} + + def _log(self, attr, msg, *args, **kwargs): + prefix = kwargs.get('prefix') or self.prefix + msg = ((prefix + ':') if prefix else '') + msg.format(*args, **kwargs) + key = attr+':'+prefix + if msg != self.last_msgs.get(key): + getattr(self.logger, attr)(msg) + self.last_msgs[key] = msg + + def critical(self, msg, *args, **kwargs): + self._log('critical', msg, *args, **kwargs) + + def exception(self, msg, *args, **kwargs): + self._log('exception', msg, *args, **kwargs) + + def info(self, msg, *args, **kwargs): + self._log('info', msg, *args, **kwargs) + + def error(self, msg, *args, **kwargs): + self._log('error', msg, *args, **kwargs) + + def warn(self, msg, *args, **kwargs): + self._log('warning', msg, *args, **kwargs) + + def debug(self, msg, *args, **kwargs): + self._log('debug', msg, *args, **kwargs) + + class Powerline(object): '''Main powerline class, entrance point for all powerline uses. Sets powerline up and loads the configuration. @@ -37,9 +74,29 @@ class Powerline(object): :param str renderer_module: Overrides renderer module (defaults to ``ext``). Should be the name of the package imported like this: ``powerline.renders.{render_module}``. + :param bool run_once: + Determines whether .renderer.render() method will be run only once + during python session. + :param Logger logger: + If present, no new logger will be created and this logger will be used. + :param dict environ: + Object with ``.__getitem__`` and ``.get`` methods used to obtain + environment variables. Defaults to ``os.environ``. + :param func getcwd: + Function used to get current working directory. Defaults to + ``os.getcwdu`` or ``os.getcwd``. + :param str home: + Home directory. Defaults to ``environ.get('HOME')``. ''' - def __init__(self, ext, renderer_module=None, run_once=False): + def __init__(self, + ext, + renderer_module=None, + run_once=False, + logger=None, + environ=os.environ, + getcwd=getattr(os, 'getcwdu', os.getcwd), + home=None): self.config_paths = self.get_config_paths() # Load main config file @@ -78,7 +135,40 @@ def __init__(self, ext, renderer_module=None, run_once=False): 'tmux_escape': common_config.get('additional_escapes') == 'tmux', 'screen_escape': common_config.get('additional_escapes') == 'screen', } - self.renderer = Renderer(theme_config, local_themes, theme_kwargs, colorscheme, **options) + + # Create logger + if not logger: + log_format = common_config.get('format', '%(asctime)s:%(level)s:%(message)s') + formatter = logging.Formatter(log_format) + + level = getattr(logging, common_config.get('log_level', 'WARNING')) + handler = self.get_log_handler(common_config) + handler.setLevel(level) + + logger = logging.getLogger('powerline') + logger.setLevel(level) + logger.addHandler(handler) + + pl = PowerlineState(logger=logger, environ=environ, getcwd=getcwd, home=home) + + self.renderer = Renderer(theme_config, local_themes, theme_kwargs, colorscheme, pl, **options) + + def get_log_handler(self, common_config): + '''Get log handler. + + :param dict common_config: + Common configuration. + + :return: logging.Handler subclass. + ''' + log_file = common_config.get('file', None) + if log_file: + log_dir = os.path.dirname(log_file) + if not os.path.isdir(log_dir): + os.mkdir(log_dir) + return logging.FileHandler(log_file) + else: + return logging.StreamHandler() @staticmethod def get_config_paths(): diff --git a/powerline/bindings/zsh/__init__.py b/powerline/bindings/zsh/__init__.py index 8be9f6f89..295cd699b 100644 --- a/powerline/bindings/zsh/__init__.py +++ b/powerline/bindings/zsh/__init__.py @@ -45,6 +45,22 @@ def config_path(self): return None +class Environment(object): + @staticmethod + def __getitem__(key): + try: + return zsh.getvalue(key) + except IndexError as e: + raise KeyError(*e.args) + + @staticmethod + def get(key, default=None): + try: + return zsh.getvalue(key) + except IndexError: + return default + + class Prompt(object): __slots__ = ('render', 'side', 'savedpsvar', 'savedps') @@ -53,9 +69,10 @@ def __init__(self, powerline, side, savedpsvar=None, savedps=None): self.side = side self.savedpsvar = savedpsvar self.savedps = savedps + self.args = powerline.args def __str__(self): - return self.render(width=zsh.columns(), side=self.side).encode('utf-8') + return self.render(width=zsh.columns(), side=self.side, segment_info=args).encode('utf-8') def __del__(self): if self.savedps: @@ -71,6 +88,6 @@ def set_prompt(powerline, psvar, side): def setup(): - powerline = ShellPowerline(Args()) + powerline = ShellPowerline(Args(), environ=Environment(), getcwd=lambda: zsh.getvalue('PWD')) set_prompt(powerline, 'PS1', 'left') set_prompt(powerline, 'RPS1', 'right') diff --git a/powerline/lib/threaded.py b/powerline/lib/threaded.py index eedfc142a..1b74d2575 100644 --- a/powerline/lib/threaded.py +++ b/powerline/lib/threaded.py @@ -23,15 +23,24 @@ def __init__(self): self.did_set_interval = False self.thread = None - def __call__(self, **kwargs): + def __call__(self, update_first=True, **kwargs): if self.run_once: + self.pl = kwargs['pl'] self.set_state(**kwargs) self.update() elif not self.is_alive(): - self.startup(**kwargs) + # Without this we will not have to wait long until receiving bug “I + # opened vim, but branch information is only shown after I move + # cursor”. + # + # If running once .update() is called in __call__. + if update_first and self.update_first: + self.update_first = False + self.update() + self.start() with self.write_lock: - return self.render(**kwargs) + return self.render(update_first=update_first, **kwargs) def is_alive(self): return self.thread and self.thread.is_alive() @@ -49,7 +58,10 @@ def run(self): start_time = monotonic() with self.update_lock: - self.update() + try: + self.update() + except Exception as e: + self.error('Exception while updating: {0}', str(e)) self.sleep(monotonic() - start_time) @@ -67,28 +79,27 @@ def set_interval(self, interval=None): self.interval = interval self.has_set_interval = True - def set_state(self, update_first=True, interval=None, **kwargs): + def set_state(self, interval=None, **kwargs): if not self.did_set_interval or interval: self.set_interval(interval) - # Without this we will not have to wait long until receiving bug “I - # opened vim, but branch information is only shown after I move cursor”. - # - # If running once .update() is called in __call__. - if update_first and self.update_first and not self.run_once: - self.update_first = False - self.update() - def startup(self, **kwargs): - # Normally .update() succeeds to run before value is requested, meaning - # that user is getting values he needs directly at vim startup. Without - # .startup() we will not have to wait long until receiving bug “I opened - # vim, but branch information is only shown after I move cursor”. + def startup(self, pl, **kwargs): self.run_once = False + self.pl = pl if not self.is_alive(): self.set_state(**kwargs) self.start() + def error(self, *args, **kwargs): + self.pl.error(prefix=self.__class__.__name__, *args, **kwargs) + + def warn(self, *args, **kwargs): + self.pl.warn(prefix=self.__class__.__name__, *args, **kwargs) + + def debug(self, *args, **kwargs): + self.pl.debug(prefix=self.__class__.__name__, *args, **kwargs) + def printed(func): def f(*args, **kwargs): @@ -109,12 +120,14 @@ def __init__(self): def key(**kwargs): return frozenset(kwargs.items()) - def render(self, **kwargs): + def render(self, update_first, **kwargs): key = self.key(**kwargs) try: update_state = self.queries[key][1] except KeyError: - update_state = self.compute_state(key) if self.update_first or self.run_once else None + # Allow only to forbid to compute missing values: in either user + # configuration or in subclasses. + update_state = self.compute_state(key) if update_first and self.update_first or self.run_once else None # No locks: render method is already running with write_lock acquired. self.queries[key] = (monotonic(), update_state) return self.render_one(update_state, **kwargs) @@ -124,7 +137,10 @@ def update(self): removes = [] for key, (last_query_time, state) in list(self.queries.items()): if last_query_time < monotonic() < last_query_time + self.drop_interval: - updates[key] = (last_query_time, self.compute_state(key)) + try: + updates[key] = (last_query_time, self.compute_state(key)) + except Exception as e: + self.error('Exception while computing state for {0}: {1}', repr(key), str(e)) else: removes.append(key) with self.write_lock: @@ -132,19 +148,13 @@ def update(self): for key in removes: self.queries.pop(key) - def set_state(self, update_first=True, interval=None, **kwargs): + def set_state(self, interval=None, **kwargs): if not self.did_set_interval or (interval < self.interval): self.set_interval(interval) - # Allow only to forbid to compute missing values: in either user - # configuration or in subclasses. if self.update_first: self.update_first = update_first - key = self.key(**kwargs) - if not self.run_once and key not in self.queries: - self.queries[key] = (monotonic(), self.compute_state(key) if self.update_first else None) - @staticmethod def render_one(update_state, **kwargs): return update_state diff --git a/powerline/lint/__init__.py b/powerline/lint/__init__.py index 0ab981e4e..91078e26d 100644 --- a/powerline/lint/__init__.py +++ b/powerline/lint/__init__.py @@ -8,6 +8,7 @@ import re from collections import defaultdict from copy import copy +import logging try: @@ -408,6 +409,11 @@ def check_config(d, theme, data, context, echoerr): # only for existence of the path, not for it being a directory paths=Spec().list((lambda value, *args: (True, True, not os.path.exists(value.value))), lambda value: 'path does not exist: {0}'.format(value)).optional(), + log_file=Spec().type(str).func(lambda value, *args: (True, True, not os.path.isdir(os.path.dirname(value))), + lambda value: 'directory does not exist: {0}'.format(os.path.dirname(value))).optional(), + log_level=Spec().re('^[A-Z]+$').func(lambda value, *args: (True, True, not hasattr(logging, value)), + lambda value: 'unknown debugging level {0}'.format(value)).optional(), + log_format=Spec().type(str).optional(), ).context_message('Error while loading common configuration (key {key})'), ext=Spec( vim=Spec( @@ -786,6 +792,11 @@ def check_segment_data_key(key, data, context, echoerr): return True, False, False +# FIXME More checks, limit existing to ThreadedSegment instances only +args_spec = Spec( + interval=Spec().either(Spec().type(float), Spec().type(int)).optional(), + update_first=Spec().type(bool).optional(), +).unknown_spec(Spec(), Spec()).optional().copy highlight_group_spec = Spec().type(unicode).copy segment_module_spec = Spec().type(unicode).func(check_segment_module).optional().copy segments_spec = Spec().optional().list( @@ -801,8 +812,7 @@ def check_segment_data_key(key, data, context, echoerr): before=Spec().type(unicode).optional(), width=Spec().either(Spec().unsigned(), Spec().cmp('eq', 'auto')).optional(), align=Spec().oneof(set('lr')).optional(), - # FIXME Check args - args=Spec().type(dict).optional(), + args=args_spec(), contents=Spec().type(unicode).optional(), highlight_group=Spec().list( highlight_group_spec().re('^(?:(?!:divider$).)+$', @@ -819,8 +829,7 @@ def check_segment_data_key(key, data, context, echoerr): Spec( after=Spec().type(unicode).optional(), before=Spec().type(unicode).optional(), - # FIXME Check args - args=Spec().type(dict).optional(), + args=args_spec(), contents=Spec().type(unicode).optional(), ), ).optional().context_message('Error while loading segment data (key {key})'), diff --git a/powerline/renderer.py b/powerline/renderer.py index 4fdcff72f..7e6433fef 100644 --- a/powerline/renderer.py +++ b/powerline/renderer.py @@ -18,9 +18,11 @@ def construct_returned_value(rendered_highlighted, segments, output_raw): class Renderer(object): - def __init__(self, theme_config, local_themes, theme_kwargs, colorscheme, **options): + def __init__(self, theme_config, local_themes, theme_kwargs, colorscheme, pl, **options): self.__dict__.update(options) self.theme_config = theme_config + theme_kwargs['pl'] = pl + self.pl = pl self.theme = Theme(theme_config=theme_config, **theme_kwargs) self.local_themes = local_themes self.theme_kwargs = theme_kwargs diff --git a/powerline/renderers/vim.py b/powerline/renderers/vim.py index 3cf71dfa8..59f230d1c 100644 --- a/powerline/renderers/vim.py +++ b/powerline/renderers/vim.py @@ -66,12 +66,7 @@ def strwidth(string): return vim.strwidth(string) def render(self, window_id, winidx, current): - '''Render all segments. - - This method handles replacing of the percent placeholder for vim - statuslines, and it caches segment contents which are retrieved and - used in non-current windows. - ''' + '''Render all segments.''' if current: mode = vim_mode(1) mode = mode_translations.get(mode, mode) diff --git a/powerline/segment.py b/powerline/segment.py index 8f37702c9..8f1660131 100644 --- a/powerline/segment.py +++ b/powerline/segment.py @@ -64,6 +64,7 @@ def get(segment, side): contents, contents_func, module = get_segment_info(data, segment) highlight_group = segment_type != 'function' and segment.get('highlight_group') or segment.get('name') return { + 'name': segment.get('name'), 'type': segment_type, 'highlight_group': highlight_group, 'divider_highlight_group': None, diff --git a/powerline/segments/common.py b/powerline/segments/common.py index fdcd8cf5b..e58e3d1d2 100644 --- a/powerline/segments/common.py +++ b/powerline/segments/common.py @@ -18,13 +18,13 @@ from collections import namedtuple -def hostname(only_if_ssh=False): +def hostname(pl, only_if_ssh=False): '''Return the current hostname. :param bool only_if_ssh: only return the hostname if currently in an SSH session ''' - if only_if_ssh and not os.environ.get('SSH_CLIENT'): + if only_if_ssh and not pl.environ.get('SSH_CLIENT'): return None return socket.gethostname() @@ -35,8 +35,8 @@ def __init__(self): self.directories = {} @staticmethod - def key(**kwargs): - return os.path.abspath(os.getcwd()) + def key(pl, **kwargs): + return os.path.abspath(pl.getcwd()) def update(self): # .compute_state() is running only in this method, and only in one @@ -82,16 +82,16 @@ def render_one(branch, status_colors=False, **kwargs): if branch and status_colors: return [{ 'contents': branch, - 'highlight_group': ['branch_dirty' if repository_status() else 'branch_clean', 'branch'], + 'highlight_group': ['branch_dirty' if repository_status(**kwargs) else 'branch_clean', 'branch'], }] else: return branch def startup(self, status_colors=False, **kwargs): - super(BranchSegment, self).startup() + super(BranchSegment, self).startup(**kwargs) if status_colors: self.started_repository_status = True - repository_status.startup() + repository_status.startup(**kwargs) def shutdown(self): if self.started_repository_status: @@ -109,7 +109,7 @@ def shutdown(self): ''') -def cwd(dir_shorten_len=None, dir_limit_depth=None): +def cwd(pl, dir_shorten_len=None, dir_limit_depth=None): '''Return the current working directory. Returns a segment list to create a breadcrumb-like effect. @@ -126,18 +126,16 @@ def cwd(dir_shorten_len=None, dir_limit_depth=None): ''' import re try: - try: - cwd = os.getcwdu() - except AttributeError: - cwd = os.getcwd() + cwd = pl.getcwd() except OSError as e: if e.errno == 2: # user most probably deleted the directory # this happens when removing files from Mercurial repos for example + pl.warn('Current directory not found') cwd = "[not found]" else: raise - home = os.environ.get('HOME') + home = pl.home if home: cwd = re.sub('^' + re.escape(home), '~', cwd, 1) cwd_split = cwd.split(os.sep) @@ -160,7 +158,7 @@ def cwd(dir_shorten_len=None, dir_limit_depth=None): return ret -def date(format='%Y-%m-%d', istime=False): +def date(pl, format='%Y-%m-%d', istime=False): '''Return the current date. :param str format: @@ -177,7 +175,7 @@ def date(format='%Y-%m-%d', istime=False): }] -def fuzzy_time(): +def fuzzy_time(pl): '''Display the current time as fuzzy time, e.g. "quarter past six".''' hour_str = ['twelve', 'one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine', 'ten', 'eleven'] minute_str = { @@ -241,7 +239,7 @@ def update(self): with self.write_lock: self.ip = ip - def render(self): + def render(self, **kwargs): if not hasattr(self, 'ip'): return None return [{'contents': self.ip, 'divider_highlight_group': 'background:divider'}] @@ -373,7 +371,8 @@ def update(self): self.location = ','.join([location_data['city'], location_data['region_name'], location_data['country_name']]) - except (TypeError, ValueError): + except (TypeError, ValueError) as e: + self.error('Failed to get location: {0}', str(e)) return query_data = { 'q': @@ -389,13 +388,19 @@ def update(self): condition = response['query']['results']['weather']['rss']['channel']['item']['condition'] condition_code = int(condition['code']) temp = float(condition['temp']) - except (KeyError, TypeError, ValueError): + except (KeyError, TypeError, ValueError) as e: + self.error('Failed to get weather conditions: {0}', str(e)) return try: icon_names = weather_conditions_codes[condition_code] - except IndexError: - icon_names = (('not_available' if condition_code == 3200 else 'unknown'),) + except IndexError as e: + if condition_code == 3200: + icon_names = ('not_available',) + self.warn('Weather is not available for location {0}', self.location) + else: + icon_names = ('unknown',) + self.error('Unknown condition code: {0}', condition_code) with self.write_lock: self.temp = temp @@ -472,7 +477,7 @@ def render(self, icons=None, unit='C', temp_format=None, temp_coldest=-30, temp_ ''') -def system_load(format='{avg:.1f}', threshold_good=1, threshold_bad=2): +def system_load(pl, format='{avg:.1f}', threshold_good=1, threshold_bad=2): '''Return system load average. Highlights using ``system_load_good``, ``system_load_bad`` and @@ -500,6 +505,7 @@ def system_load(format='{avg:.1f}', threshold_good=1, threshold_bad=2): try: cpu_num = cpu_count() except NotImplementedError: + pl.warn('Unable to get CPU count: method is not implemented') return None ret = [] for avg in os.getloadavg(): @@ -539,10 +545,10 @@ def _get_interfaces(): if data: yield interface, data.bytes_recv, data.bytes_sent - def _get_user(): + def _get_user(pl): return psutil.Process(os.getpid()).username - def cpu_load_percent(measure_interval=.5): + def cpu_load_percent(pl, measure_interval=.5): '''Return the average CPU load as a percentage. Requires the ``psutil`` module. @@ -569,10 +575,10 @@ def _get_interfaces(): if x is not None: yield interface, x[0], x[1] - def _get_user(): # NOQA - return os.environ.get('USER', None) + def _get_user(pl): # NOQA + return pl.environ.get('USER', None) - def cpu_load_percent(measure_interval=.5): # NOQA + def cpu_load_percent(pl, measure_interval=.5): # NOQA '''Return the average CPU load as a percentage. Requires the ``psutil`` module. @@ -580,13 +586,14 @@ def cpu_load_percent(measure_interval=.5): # NOQA :param float measure_interval: interval used to measure CPU load (in seconds) ''' + pl.warn('psutil package is not installed, thus CPU load is not available') return None username = False -def user(): +def user(pl): '''Return the current user. Highlights the user with the ``superuser`` if the effective user ID is 0. @@ -595,8 +602,9 @@ def user(): ''' global username if username is False: - username = _get_user() + username = _get_user(pl) if username is None: + pl.warn('Failed to get username') return None try: euid = os.geteuid() @@ -625,7 +633,7 @@ def _get_uptime(): # NOQA @add_divider_highlight_group('background:divider') -def uptime(format='{days}d {hours:02d}h {minutes:02d}m'): +def uptime(pl, format='{days}d {hours:02d}h {minutes:02d}m'): '''Return system uptime. :param str format: @@ -636,7 +644,11 @@ def uptime(format='{days}d {hours:02d}h {minutes:02d}m'): ''' try: seconds = _get_uptime() - except (IOError, NotImplementedError): + except IOError as e: + pl.error('Failed to get uptime: {0}', e) + return None + except NotImplementedError: + pl.warn('Unable to get uptime. You should install psutil package') return None minutes, seconds = divmod(seconds, 60) hours, minutes = divmod(minutes, 60) @@ -705,7 +717,10 @@ def render_one(self, idata, recv_format='⬇ {value:>8}', sent_format='⬆ {valu t2, b2 = idata['last'] measure_interval = t2 - t1 - if None in (b1, b2) or measure_interval == 0: + if None in (b1, b2): + return None + if measure_interval == 0: + self.error('Measure interval is zero. This should not happen') return None r = [] @@ -762,9 +777,9 @@ def render_one(self, idata, recv_format='⬇ {value:>8}', sent_format='⬆ {valu ''') -def virtualenv(): +def virtualenv(pl): '''Return the name of the current Python virtualenv.''' - return os.path.basename(os.environ.get('VIRTUAL_ENV', '')) or None + return os.path.basename(pl.environ.get('VIRTUAL_ENV', '')) or None _IMAPKey = namedtuple('Key', 'username password server port folder') @@ -774,12 +789,12 @@ class EmailIMAPSegment(KwThreadedSegment): interval = 60 @staticmethod - def key(username, password, server='imap.gmail.com', port=993, folder='INBOX'): + def key(username, password, server='imap.gmail.com', port=993, folder='INBOX', **kwargs): return _IMAPKey(username, password, server, port, folder) - @staticmethod - def compute_state(key): + def compute_state(self, key): if not key.username or not key.password: + self.warn('Username and password are not configured') return None try: import imaplib @@ -789,8 +804,6 @@ def compute_state(key): rc, message = mail.status(key.folder, '(UNSEEN)') unread_str = message[0].decode('utf-8') unread_count = int(re.search('UNSEEN (\d+)', unread_str).group(1)) - except socket.gaierror: - return None except imaplib.IMAP4.error as e: unread_count = str(e) return unread_count @@ -842,7 +855,7 @@ class NowPlayingSegment(object): 'stop': '■', } - def __call__(self, player='mpd', format='{state_symbol} {artist} - {title} ({total})', *args, **kwargs): + def __call__(self, player='mpd', format='{state_symbol} {artist} - {title} ({total})', **kwargs): player_func = getattr(self, 'player_{0}'.format(player)) stats = { 'state': None, @@ -853,7 +866,7 @@ def __call__(self, player='mpd', format='{state_symbol} {artist} - {title} ({tot 'elapsed': None, 'total': None, } - func_stats = player_func(*args, **kwargs) + func_stats = player_func(**kwargs) if not func_stats: return None stats.update(func_stats) @@ -884,7 +897,7 @@ def _convert_state(state): def _convert_seconds(seconds): return '{0:.0f}:{1:02.0f}'.format(*divmod(float(seconds), 60)) - def player_cmus(self): + def player_cmus(self, pl): '''Return cmus player information. cmus-remote -Q returns data with multi-level information i.e. @@ -922,7 +935,7 @@ def player_cmus(self): 'total': self._convert_seconds(now_playing.get('duration', 0)), } - def player_mpd(self, host='localhost', port=6600): + def player_mpd(self, pl, host='localhost', port=6600): try: import mpd client = mpd.MPDClient() @@ -954,7 +967,7 @@ def player_mpd(self, host='localhost', port=6600): 'total': now_playing[3], } - def player_spotify(self): + def player_spotify(self, pl): try: import dbus except ImportError: @@ -982,7 +995,7 @@ def player_spotify(self): 'total': self._convert_seconds(info.get('mpris:length') / 1e6), } - def player_rhythmbox(self): + def player_rhythmbox(self, pl): now_playing = self._run_cmd(['rhythmbox-client', '--no-start', '--no-present', '--print-playing-format', '%at\n%aa\n%tt\n%te\n%td']) if not now_playing: return diff --git a/powerline/segments/ipython.py b/powerline/segments/ipython.py index c1b626153..4946c4c3e 100644 --- a/powerline/segments/ipython.py +++ b/powerline/segments/ipython.py @@ -3,5 +3,5 @@ from powerline.theme import requires_segment_info @requires_segment_info -def prompt_count(segment_info): +def prompt_count(pl, segment_info): return str(segment_info.prompt_count) diff --git a/powerline/segments/shell.py b/powerline/segments/shell.py index 5bcbe5ac0..acad81a76 100644 --- a/powerline/segments/shell.py +++ b/powerline/segments/shell.py @@ -4,7 +4,7 @@ @requires_segment_info -def last_status(segment_info): +def last_status(pl, segment_info): '''Return last exit code. Highlight groups used: ``exit_fail`` @@ -15,7 +15,7 @@ def last_status(segment_info): @requires_segment_info -def last_pipe_status(segment_info): +def last_pipe_status(pl, segment_info): '''Return last pipe status. Highlight groups used: ``exit_fail``, ``exit_success`` diff --git a/powerline/segments/vim.py b/powerline/segments/vim.py index 94d0ec81d..19e7ebfaa 100644 --- a/powerline/segments/vim.py +++ b/powerline/segments/vim.py @@ -94,7 +94,7 @@ def ret(segment_info, **kwargs): @requires_segment_info -def mode(segment_info, override=None): +def mode(pl, segment_info, override=None): '''Return the current vim mode. :param dict override: @@ -112,7 +112,7 @@ def mode(segment_info, override=None): @requires_segment_info -def modified_indicator(segment_info, text='+'): +def modified_indicator(pl, segment_info, text='+'): '''Return a file modified indicator. :param string text: @@ -122,7 +122,7 @@ def modified_indicator(segment_info, text='+'): @requires_segment_info -def paste_indicator(segment_info, text='PASTE'): +def paste_indicator(pl, segment_info, text='PASTE'): '''Return a paste mode indicator. :param string text: @@ -132,7 +132,7 @@ def paste_indicator(segment_info, text='PASTE'): @requires_segment_info -def readonly_indicator(segment_info, text=''): +def readonly_indicator(pl, segment_info, text=''): '''Return a read-only indicator. :param string text: @@ -142,7 +142,7 @@ def readonly_indicator(segment_info, text=''): @requires_segment_info -def file_directory(segment_info, shorten_user=True, shorten_cwd=True, shorten_home=False): +def file_directory(pl, segment_info, shorten_user=True, shorten_cwd=True, shorten_home=False): '''Return file directory (head component of the file path). :param bool shorten_user: @@ -165,13 +165,15 @@ def file_directory(segment_info, shorten_user=True, shorten_cwd=True, shorten_ho @requires_segment_info -def file_name(segment_info, display_no_file=False, no_file_text='[No file]'): +def file_name(pl, segment_info, display_no_file=False, no_file_text='[No file]'): '''Return file name (tail component of the file path). :param bool display_no_file: display a string if the buffer is missing a file name :param str no_file_text: the string to display if the buffer is missing a file name + + Highlight groups used: ``file_name_no_file`` or ``file_name``, ``file_name``. ''' name = segment_info['buffer'].name if not name: @@ -187,7 +189,7 @@ def file_name(segment_info, display_no_file=False, no_file_text='[No file]'): @window_cached -def file_size(suffix='B', si_prefix=False): +def file_size(pl, suffix='B', si_prefix=False): '''Return file size in &encoding. :param str suffix: @@ -204,7 +206,7 @@ def file_size(suffix='B', si_prefix=False): @requires_segment_info @add_divider_highlight_group('background:divider') -def file_format(segment_info): +def file_format(pl, segment_info): '''Return file format (i.e. line ending type). :return: file format or None if unknown or missing file format @@ -216,7 +218,7 @@ def file_format(segment_info): @requires_segment_info @add_divider_highlight_group('background:divider') -def file_encoding(segment_info): +def file_encoding(pl, segment_info): '''Return file encoding/character set. :return: file encoding/character set or None if unknown or missing file encoding @@ -228,7 +230,7 @@ def file_encoding(segment_info): @requires_segment_info @add_divider_highlight_group('background:divider') -def file_type(segment_info): +def file_type(pl, segment_info): '''Return file type. :return: file type or None if unknown file type @@ -239,7 +241,7 @@ def file_type(segment_info): @requires_segment_info -def line_percent(segment_info, gradient=False): +def line_percent(pl, segment_info, gradient=False): '''Return the cursor position in the file as a percentage. :param bool gradient: @@ -260,20 +262,20 @@ def line_percent(segment_info, gradient=False): @requires_segment_info -def line_current(segment_info): +def line_current(pl, segment_info): '''Return the current cursor line.''' return str(segment_info['window'].cursor[0]) @requires_segment_info -def col_current(segment_info): +def col_current(pl, segment_info): '''Return the current cursor column. ''' return str(segment_info['window'].cursor[1] + 1) @window_cached -def virtcol_current(): +def virtcol_current(pl): '''Return current visual column with concealed characters ingored Highlight groups used: ``virtcol_current`` or ``col_current``. @@ -282,7 +284,7 @@ def virtcol_current(): 'highlight_group': ['virtcol_current', 'col_current']}] -def modified_buffers(text='+ ', join_str=','): +def modified_buffers(pl, text='+ ', join_str=','): '''Return a comma-separated list of modified buffers. :param str text: @@ -366,16 +368,16 @@ def render_one(self, update_state, segment_info, status_colors=False, **kwargs): return [{ 'contents': update_state, - 'highlight_group': (['branch_dirty' if repository_status(segment_info=segment_info) else 'branch_clean'] + 'highlight_group': (['branch_dirty' if repository_status(segment_info=segment_info, **kwargs) else 'branch_clean'] if status_colors else []) + ['branch'], 'divider_highlight_group': 'branch:divider', }] def startup(self, status_colors=False, **kwargs): - super(BranchSegment, self).startup() + super(BranchSegment, self).startup(**kwargs) if status_colors: self.started_repository_status = True - repository_status.startup() + repository_status.startup(**kwargs) def shutdown(self): if self.started_repository_status: diff --git a/powerline/shell.py b/powerline/shell.py index 388a5e7b7..91e7538cd 100644 --- a/powerline/shell.py +++ b/powerline/shell.py @@ -19,9 +19,6 @@ def __init__(self, args, run_once=False): self.theme_option = mergeargs(args.theme_option) or {} super(ShellPowerline, self).__init__(args.ext[0], args.renderer_module, run_once=run_once) - def get_segment_info(self): - return self.args - def load_main_config(self): r = super(ShellPowerline, self).load_main_config() if self.args.config: diff --git a/powerline/theme.py b/powerline/theme.py index 9b84ba5f2..266b351e0 100644 --- a/powerline/theme.py +++ b/powerline/theme.py @@ -24,7 +24,7 @@ def requires_segment_info(func): class Theme(object): - def __init__(self, ext, theme_config, common_config, top_theme_config=None, run_once=False): + def __init__(self, ext, theme_config, common_config, pl, top_theme_config=None, run_once=False): self.dividers = theme_config.get('dividers', common_config['dividers']) self.spaces = theme_config.get('spaces', common_config['spaces']) self.segments = { @@ -35,16 +35,22 @@ def __init__(self, ext, theme_config, common_config, top_theme_config=None, run_ 'contents': None, 'highlight': {'fg': False, 'bg': False, 'attr': 0} } + self.pl = pl theme_configs = [theme_config] if top_theme_config: theme_configs.append(top_theme_config) get_segment = gen_segment_getter(ext, common_config['paths'], theme_configs, theme_config.get('default_module')) for side in ['left', 'right']: - self.segments[side].extend((get_segment(segment, side) for segment in theme_config['segments'].get(side, []))) - if not run_once: - for segment in self.segments[side]: + for segment in theme_config['segments'].get(side, []): + segment = get_segment(segment, side) + if not run_once: if segment['startup']: - segment['startup'](**segment['args']) + try: + segment['startup'](pl=pl, **segment['args']) + except Exception as e: + pl.error('Exception during {0} startup: {1}', segment['name'], str(e)) + continue + self.segments[side].append(segment) def shutdown(self): for segments in self.segments.values(): @@ -71,11 +77,17 @@ def get_segments(self, side=None, segment_info=None): parsed_segments = [] for segment in self.segments[side]: if segment['type'] == 'function': - if (hasattr(segment['contents_func'], 'powerline_requires_segment_info') - and segment['contents_func'].powerline_requires_segment_info): - contents = segment['contents_func'](segment_info=segment_info, **segment['args']) - else: - contents = segment['contents_func'](**segment['args']) + self.pl.prefix = segment['name'] + try: + if (hasattr(segment['contents_func'], 'powerline_requires_segment_info') + and segment['contents_func'].powerline_requires_segment_info): + contents = segment['contents_func'](pl=self.pl, segment_info=segment_info, **segment['args']) + else: + contents = segment['contents_func'](pl=self.pl, **segment['args']) + except Exception as e: + self.pl.exception('Exception while computing segment: {0}', str(e)) + continue + if contents is None: continue if isinstance(contents, list): diff --git a/tests/lib/__init__.py b/tests/lib/__init__.py index 8d72fb4eb..a32e4294f 100644 --- a/tests/lib/__init__.py +++ b/tests/lib/__init__.py @@ -4,6 +4,28 @@ import os +class Pl(object): + def __init__(self): + self.errors = [] + self.warns = [] + self.debugs = [] + self._cwd = None + self.prefix = None + self.environ = {} + self.home = None + + def getcwd(self): + if isinstance(self._cwd, Exception): + raise self._cwd + else: + return self._cwd + + for meth in ('error', 'warn', 'debug'): + exec ('''def {0}(self, msg, *args, **kwargs): + self.{0}s.append((kwargs.get('prefix') or self.prefix, msg, args, kwargs)) + ''').format(meth) + + class Args(object): theme_option = None config = None @@ -63,50 +85,58 @@ def new_module(name, **kwargs): return module -class ModuleAttrReplace(object): - def __init__(self, module, attr, new): - self.module = module +class AttrReplace(object): + def __init__(self, obj, attr, new): + self.obj = obj self.attr = attr self.new = new def __enter__(self): try: - self.old = getattr(self.module, self.attr) + self.old = getattr(self.obj, self.attr) except AttributeError: pass - setattr(self.module, self.attr, self.new) + setattr(self.obj, self.attr, self.new) def __exit__(self, *args): try: - setattr(self.module, self.attr, self.old) + setattr(self.obj, self.attr, self.old) except AttributeError: - delattr(self.module, self.attr) + delattr(self.obj, self.attr) -replace_module_attr = ModuleAttrReplace +replace_attr = AttrReplace def replace_module_module(module, name, **kwargs): - return replace_module_attr(module, name, new_module(name, **kwargs)) + return replace_attr(module, name, new_module(name, **kwargs)) -class EnvReplace(object): - def __init__(self, name, new): - self.name = name +class ItemReplace(object): + def __init__(self, d, key, new, r=None): + self.key = key self.new = new + self.d = d + self.r = r def __enter__(self): - self.old = os.environ.get(self.name) - os.environ[self.name] = self.new + self.old = self.d.get(self.key) + self.d[self.key] = self.new + return self.r def __exit__(self, *args): if self.old is None: try: - os.environ.pop(self.name) + self.d.pop(self.key) except KeyError: pass else: - os.environ[self.name] = self.old + self.d[self.key] = self.old -replace_env = EnvReplace +def replace_env(key, new, d=None): + r = None + if not d: + r = Pl() + d = r.environ + return ItemReplace(d, key, new, r) diff --git a/tests/test_configuration.py b/tests/test_configuration.py index d2c0f5a6d..ec6b29176 100644 --- a/tests/test_configuration.py +++ b/tests/test_configuration.py @@ -7,7 +7,7 @@ import sys import os import json -from tests.lib import Args, urllib_read, replace_module_attr +from tests.lib import Args, urllib_read, replace_attr from tests import TestCase @@ -67,7 +67,7 @@ def test_tmux(self): from imp import reload reload(common) from powerline.shell import ShellPowerline - with replace_module_attr(common, 'urllib_read', urllib_read): + with replace_attr(common, 'urllib_read', urllib_read): powerline = ShellPowerline(Args(ext=['tmux']), run_once=False) powerline.renderer.render() powerline = ShellPowerline(Args(ext=['tmux']), run_once=False) @@ -112,7 +112,7 @@ def test_wm(self): from imp import reload reload(common) from powerline import Powerline - with replace_module_attr(common, 'urllib_read', urllib_read): + with replace_attr(common, 'urllib_read', urllib_read): Powerline(ext='wm', renderer_module='pango_markup', run_once=True).renderer.render() reload(common) diff --git a/tests/test_segments.py b/tests/test_segments.py index 629782946..478566ab7 100644 --- a/tests/test_segments.py +++ b/tests/test_segments.py @@ -4,7 +4,7 @@ import tests.vim as vim_module import sys import os -from tests.lib import Args, urllib_read, replace_module_attr, new_module, replace_module_module, replace_env +from tests.lib import Args, urllib_read, replace_attr, new_module, replace_module_module, replace_env, Pl from tests import TestCase @@ -13,14 +13,16 @@ class TestShell(TestCase): def test_last_status(self): - self.assertEqual(shell.last_status(Args(last_exit_code=10)), + pl = Pl() + self.assertEqual(shell.last_status(pl=pl, segment_info=Args(last_exit_code=10)), [{'contents': '10', 'highlight_group': 'exit_fail'}]) - self.assertEqual(shell.last_status(Args(last_exit_code=None)), None) + self.assertEqual(shell.last_status(pl=pl, segment_info=Args(last_exit_code=None)), None) def test_last_pipe_status(self): - self.assertEqual(shell.last_pipe_status(Args(last_pipe_status=[])), None) - self.assertEqual(shell.last_pipe_status(Args(last_pipe_status=[0, 0, 0])), None) - self.assertEqual(shell.last_pipe_status(Args(last_pipe_status=[0, 2, 0])), + pl = Pl() + self.assertEqual(shell.last_pipe_status(pl=pl, segment_info=Args(last_pipe_status=[])), None) + self.assertEqual(shell.last_pipe_status(pl=pl, segment_info=Args(last_pipe_status=[0, 0, 0])), None) + self.assertEqual(shell.last_pipe_status(pl=pl, segment_info=Args(last_pipe_status=[0, 2, 0])), [{'contents': '0', 'highlight_group': 'exit_success'}, {'contents': '2', 'highlight_group': 'exit_fail'}, {'contents': '0', 'highlight_group': 'exit_success'}]) @@ -28,220 +30,227 @@ def test_last_pipe_status(self): class TestCommon(TestCase): def test_hostname(self): - with replace_env('SSH_CLIENT', '192.168.0.12 40921 22'): + with replace_env('SSH_CLIENT', '192.168.0.12 40921 22') as pl: with replace_module_module(common, 'socket', gethostname=lambda: 'abc'): - self.assertEqual(common.hostname(), 'abc') - self.assertEqual(common.hostname(only_if_ssh=True), 'abc') - os.environ.pop('SSH_CLIENT') - self.assertEqual(common.hostname(), 'abc') - self.assertEqual(common.hostname(only_if_ssh=True), None) + self.assertEqual(common.hostname(pl=pl), 'abc') + self.assertEqual(common.hostname(pl=pl, only_if_ssh=True), 'abc') + pl.environ.pop('SSH_CLIENT') + self.assertEqual(common.hostname(pl=pl), 'abc') + self.assertEqual(common.hostname(pl=pl, only_if_ssh=True), None) def test_user(self): - new_os = new_module('os', environ={'USER': 'def'}, getpid=lambda: 1) + new_os = new_module('os', getpid=lambda: 1) new_psutil = new_module('psutil', Process=lambda pid: Args(username='def')) - with replace_module_attr(common, 'os', new_os): - with replace_module_attr(common, 'psutil', new_psutil): - self.assertEqual(common.user(), [{'contents': 'def', 'highlight_group': 'user'}]) - new_os.geteuid = lambda: 1 - self.assertEqual(common.user(), [{'contents': 'def', 'highlight_group': 'user'}]) - new_os.geteuid = lambda: 0 - self.assertEqual(common.user(), [{'contents': 'def', 'highlight_group': ['superuser', 'user']}]) + with replace_env('USER', 'def') as pl: + with replace_attr(common, 'os', new_os): + with replace_attr(common, 'psutil', new_psutil): + self.assertEqual(common.user(pl=pl), [{'contents': 'def', 'highlight_group': 'user'}]) + new_os.geteuid = lambda: 1 + self.assertEqual(common.user(pl=pl), [{'contents': 'def', 'highlight_group': 'user'}]) + new_os.geteuid = lambda: 0 + self.assertEqual(common.user(pl=pl), [{'contents': 'def', 'highlight_group': ['superuser', 'user']}]) def test_branch(self): - with replace_module_attr(common, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: None, directory='/tmp/tests')): - self.assertEqual(common.branch(status_colors=False), 'tests') - self.assertEqual(common.branch(status_colors=True), + pl = Pl() + pl._cwd = os.getcwd() + with replace_attr(common, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: None, directory='/tmp/tests')): + self.assertEqual(common.branch(pl=pl, status_colors=False), 'tests') + self.assertEqual(common.branch(pl=pl, status_colors=True), [{'contents': 'tests', 'highlight_group': ['branch_clean', 'branch']}]) - with replace_module_attr(common, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: 'D ', directory='/tmp/tests')): - self.assertEqual(common.branch(status_colors=False), 'tests') - self.assertEqual(common.branch(status_colors=True), + with replace_attr(common, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: 'D ', directory='/tmp/tests')): + self.assertEqual(common.branch(pl=pl, status_colors=False), 'tests') + self.assertEqual(common.branch(pl=pl, status_colors=True), [{'contents': 'tests', 'highlight_group': ['branch_dirty', 'branch']}]) - self.assertEqual(common.branch(), 'tests') - with replace_module_attr(common, 'guess', lambda path: None): - self.assertEqual(common.branch(), None) + self.assertEqual(common.branch(pl=pl), 'tests') + with replace_attr(common, 'guess', lambda path: None): + self.assertEqual(common.branch(pl=pl), None) def test_cwd(self): - new_os = new_module('os', path=os.path, environ={}, sep='/') - new_os.getcwd = lambda: '/abc/def/ghi/foo/bar' - with replace_module_attr(common, 'os', new_os): - self.assertEqual(common.cwd(), + new_os = new_module('os', path=os.path, sep='/') + pl = Pl() + with replace_attr(common, 'os', new_os): + pl._cwd = '/abc/def/ghi/foo/bar' + self.assertEqual(common.cwd(pl=pl), [{'contents': '/', 'divider_highlight_group': 'cwd:divider'}, {'contents': 'abc', 'divider_highlight_group': 'cwd:divider'}, {'contents': 'def', 'divider_highlight_group': 'cwd:divider'}, {'contents': 'ghi', 'divider_highlight_group': 'cwd:divider'}, {'contents': 'foo', 'divider_highlight_group': 'cwd:divider'}, {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}]) - new_os.getcwdu = lambda: '/abc/def/ghi/foo/bar' - self.assertEqual(common.cwd(), - [{'contents': '/', 'divider_highlight_group': 'cwd:divider'}, - {'contents': 'abc', 'divider_highlight_group': 'cwd:divider'}, - {'contents': 'def', 'divider_highlight_group': 'cwd:divider'}, - {'contents': 'ghi', 'divider_highlight_group': 'cwd:divider'}, - {'contents': 'foo', 'divider_highlight_group': 'cwd:divider'}, - {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}]) - new_os.environ['HOME'] = '/abc/def/ghi' - self.assertEqual(common.cwd(), + pl.home = '/abc/def/ghi' + self.assertEqual(common.cwd(pl=pl), [{'contents': '~', 'divider_highlight_group': 'cwd:divider'}, {'contents': 'foo', 'divider_highlight_group': 'cwd:divider'}, {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}]) - self.assertEqual(common.cwd(dir_limit_depth=3), + self.assertEqual(common.cwd(pl=pl, dir_limit_depth=3), [{'contents': '~', 'divider_highlight_group': 'cwd:divider'}, {'contents': 'foo', 'divider_highlight_group': 'cwd:divider'}, {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}]) - self.assertEqual(common.cwd(dir_limit_depth=1), + self.assertEqual(common.cwd(pl=pl, dir_limit_depth=1), [{'contents': '⋯', 'divider_highlight_group': 'cwd:divider'}, {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}]) - self.assertEqual(common.cwd(dir_limit_depth=2, dir_shorten_len=2), + self.assertEqual(common.cwd(pl=pl, dir_limit_depth=2, dir_shorten_len=2), [{'contents': '~', 'divider_highlight_group': 'cwd:divider'}, {'contents': 'fo', 'divider_highlight_group': 'cwd:divider'}, {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}]) ose = OSError() ose.errno = 2 - - def raises(exc): - raise exc - - new_os.getcwdu = lambda: raises(ose) - self.assertEqual(common.cwd(dir_limit_depth=2, dir_shorten_len=2), + pl._cwd = ose + self.assertEqual(common.cwd(pl=pl, dir_limit_depth=2, dir_shorten_len=2), [{'contents': '[not found]', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}]) - new_os.getcwdu = lambda: raises(OSError()) - self.assertRaises(OSError, common.cwd, tuple(), {'dir_limit_depth': 2, 'dir_shorten_len': 2}) - new_os.getcwdu = lambda: raises(ValueError()) - self.assertRaises(ValueError, common.cwd, tuple(), {'dir_limit_depth': 2, 'dir_shorten_len': 2}) + pl._cwd = OSError() + self.assertRaises(OSError, common.cwd, pl=pl, dir_limit_depth=2, dir_shorten_len=2) + pl._cwd = ValueError() + self.assertRaises(ValueError, common.cwd, pl=pl, dir_limit_depth=2, dir_shorten_len=2) def test_date(self): - with replace_module_attr(common, 'datetime', Args(now=lambda: Args(strftime=lambda fmt: fmt))): - self.assertEqual(common.date(), [{'contents': '%Y-%m-%d', 'highlight_group': ['date'], 'divider_highlight_group': None}]) - self.assertEqual(common.date(format='%H:%M', istime=True), [{'contents': '%H:%M', 'highlight_group': ['time', 'date'], 'divider_highlight_group': 'time:divider'}]) + pl = Pl() + with replace_attr(common, 'datetime', Args(now=lambda: Args(strftime=lambda fmt: fmt))): + self.assertEqual(common.date(pl=pl), [{'contents': '%Y-%m-%d', 'highlight_group': ['date'], 'divider_highlight_group': None}]) + self.assertEqual(common.date(pl=pl, format='%H:%M', istime=True), [{'contents': '%H:%M', 'highlight_group': ['time', 'date'], 'divider_highlight_group': 'time:divider'}]) def test_fuzzy_time(self): time = Args(hour=0, minute=45) - with replace_module_attr(common, 'datetime', Args(now=lambda: time)): - self.assertEqual(common.fuzzy_time(), 'quarter to one') + pl = Pl() + with replace_attr(common, 'datetime', Args(now=lambda: time)): + self.assertEqual(common.fuzzy_time(pl=pl), 'quarter to one') time.hour = 23 time.minute = 59 - self.assertEqual(common.fuzzy_time(), 'round about midnight') + self.assertEqual(common.fuzzy_time(pl=pl), 'round about midnight') time.minute = 33 - self.assertEqual(common.fuzzy_time(), 'twenty-five to twelve') + self.assertEqual(common.fuzzy_time(pl=pl), 'twenty-five to twelve') time.minute = 60 - self.assertEqual(common.fuzzy_time(), 'twelve o\'clock') + self.assertEqual(common.fuzzy_time(pl=pl), 'twelve o\'clock') def test_external_ip(self): - with replace_module_attr(common, 'urllib_read', urllib_read): - self.assertEqual(common.external_ip(), [{'contents': '127.0.0.1', 'divider_highlight_group': 'background:divider'}]) + pl = Pl() + with replace_attr(common, 'urllib_read', urllib_read): + self.assertEqual(common.external_ip(pl=pl), [{'contents': '127.0.0.1', 'divider_highlight_group': 'background:divider'}]) def test_uptime(self): - with replace_module_attr(common, '_get_uptime', lambda: 65536): - self.assertEqual(common.uptime(), [{'contents': '0d 18h 12m', 'divider_highlight_group': 'background:divider'}]) + pl = Pl() + with replace_attr(common, '_get_uptime', lambda: 65536): + self.assertEqual(common.uptime(pl=pl), [{'contents': '0d 18h 12m', 'divider_highlight_group': 'background:divider'}]) def _get_uptime(): raise NotImplementedError - with replace_module_attr(common, '_get_uptime', _get_uptime): - self.assertEqual(common.uptime(), None) + with replace_attr(common, '_get_uptime', _get_uptime): + self.assertEqual(common.uptime(pl=pl), None) def test_weather(self): - with replace_module_attr(common, 'urllib_read', urllib_read): - self.assertEqual(common.weather(), [ + pl = Pl() + with replace_attr(common, 'urllib_read', urllib_read): + self.assertEqual(common.weather(pl=pl), [ {'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': '☁ '}, {'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9°C', 'gradient_level': 30.0} ]) - self.assertEqual(common.weather(temp_coldest=0, temp_hottest=100), [ + self.assertEqual(common.weather(pl=pl, temp_coldest=0, temp_hottest=100), [ {'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': '☁ '}, {'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9°C', 'gradient_level': 0} ]) - self.assertEqual(common.weather(temp_coldest=-100, temp_hottest=-50), [ + self.assertEqual(common.weather(pl=pl, temp_coldest=-100, temp_hottest=-50), [ {'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': '☁ '}, {'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9°C', 'gradient_level': 100} ]) - self.assertEqual(common.weather(icons={'cloudy': 'o'}), [ + self.assertEqual(common.weather(pl=pl, icons={'cloudy': 'o'}), [ {'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': 'o '}, {'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9°C', 'gradient_level': 30.0} ]) - self.assertEqual(common.weather(icons={'partly_cloudy_day': 'x'}), [ + self.assertEqual(common.weather(pl=pl, icons={'partly_cloudy_day': 'x'}), [ {'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': 'x '}, {'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9°C', 'gradient_level': 30.0} ]) - self.assertEqual(common.weather(unit='F'), [ + self.assertEqual(common.weather(pl=pl, unit='F'), [ {'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': '☁ '}, {'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '16°F', 'gradient_level': 30.0} ]) - self.assertEqual(common.weather(unit='K'), [ + self.assertEqual(common.weather(pl=pl, unit='K'), [ {'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': '☁ '}, {'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '264K', 'gradient_level': 30.0} ]) - self.assertEqual(common.weather(temp_format='{temp:.1e}C'), [ + self.assertEqual(common.weather(pl=pl, temp_format='{temp:.1e}C'), [ {'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': '☁ '}, {'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9.0e+00C', 'gradient_level': 30.0} ]) def test_system_load(self): + pl = Pl() with replace_module_module(common, 'os', getloadavg=lambda: (7.5, 3.5, 1.5)): - with replace_module_attr(common, 'cpu_count', lambda: 2): - self.assertEqual(common.system_load(), + with replace_attr(common, 'cpu_count', lambda: 2): + self.assertEqual(common.system_load(pl=pl), [{'contents': '7.5 ', 'highlight_group': ['system_load_gradient', 'system_load'], 'draw_divider': True, 'divider_highlight_group': 'background:divider', 'gradient_level': 100}, {'contents': '3.5 ', 'highlight_group': ['system_load_gradient', 'system_load'], 'draw_divider': False, 'divider_highlight_group': 'background:divider', 'gradient_level': 75.0}, {'contents': '1.5', 'highlight_group': ['system_load_gradient', 'system_load'], 'draw_divider': False, 'divider_highlight_group': 'background:divider', 'gradient_level': 0}]) - self.assertEqual(common.system_load(format='{avg:.0f}', threshold_good=0, threshold_bad=1), + self.assertEqual(common.system_load(pl=pl, format='{avg:.0f}', threshold_good=0, threshold_bad=1), [{'contents': '8 ', 'highlight_group': ['system_load_gradient', 'system_load'], 'draw_divider': True, 'divider_highlight_group': 'background:divider', 'gradient_level': 100}, {'contents': '4 ', 'highlight_group': ['system_load_gradient', 'system_load'], 'draw_divider': False, 'divider_highlight_group': 'background:divider', 'gradient_level': 100}, {'contents': '2', 'highlight_group': ['system_load_gradient', 'system_load'], 'draw_divider': False, 'divider_highlight_group': 'background:divider', 'gradient_level': 75.0}]) def test_cpu_load_percent(self): + pl = Pl() with replace_module_module(common, 'psutil', cpu_percent=lambda **kwargs: 52.3): - self.assertEqual(common.cpu_load_percent(), '52%') + self.assertEqual(common.cpu_load_percent(pl=pl), '52%') def test_network_load(self): - def _get_bytes(interface): + def gb(interface): return None - with replace_module_attr(common, '_get_bytes', _get_bytes): - self.assertEqual(common.network_load(), None) - l = [0, 0] - - def _get_bytes2(interface): - l[0] += 1200 - l[1] += 2400 - return tuple(l) - - from imp import reload - reload(common) - with replace_module_attr(common, '_get_bytes', _get_bytes2): - common.network_load.startup() + + f = [gb] + def _get_bytes(interface): + return f[0](interface) + + pl = Pl() + + with replace_attr(common, '_get_bytes', _get_bytes): + common.network_load.startup(pl=pl) + self.assertEqual(common.network_load(pl=pl), None) + common.network_load.sleep(0) + self.assertEqual(common.network_load(pl=pl), None) + + l = [0, 0] + + def gb2(interface): + l[0] += 1200 + l[1] += 2400 + return tuple(l) + f[0] = gb2 + common.network_load.sleep(0) common.network_load.sleep(0) - self.assertEqual(common.network_load(), [ + self.assertEqual(common.network_load(pl=pl), [ {'divider_highlight_group': 'background:divider', 'contents': '⬇ 1 KiB/s', 'highlight_group': ['network_load_recv', 'network_load']}, {'divider_highlight_group': 'background:divider', 'contents': '⬆ 2 KiB/s', 'highlight_group': ['network_load_sent', 'network_load']}, ]) - self.assertEqual(common.network_load(recv_format='r {value}', sent_format='s {value}'), [ + self.assertEqual(common.network_load(pl=pl, recv_format='r {value}', sent_format='s {value}'), [ {'divider_highlight_group': 'background:divider', 'contents': 'r 1 KiB/s', 'highlight_group': ['network_load_recv', 'network_load']}, {'divider_highlight_group': 'background:divider', 'contents': 's 2 KiB/s', 'highlight_group': ['network_load_sent', 'network_load']}, ]) - self.assertEqual(common.network_load(recv_format='r {value}', sent_format='s {value}', suffix='bps'), [ + self.assertEqual(common.network_load(pl=pl, recv_format='r {value}', sent_format='s {value}', suffix='bps'), [ {'divider_highlight_group': 'background:divider', 'contents': 'r 1 Kibps', 'highlight_group': ['network_load_recv', 'network_load']}, {'divider_highlight_group': 'background:divider', 'contents': 's 2 Kibps', 'highlight_group': ['network_load_sent', 'network_load']}, ]) - self.assertEqual(common.network_load(recv_format='r {value}', sent_format='s {value}', si_prefix=True), [ + self.assertEqual(common.network_load(pl=pl, recv_format='r {value}', sent_format='s {value}', si_prefix=True), [ {'divider_highlight_group': 'background:divider', 'contents': 'r 1 kB/s', 'highlight_group': ['network_load_recv', 'network_load']}, {'divider_highlight_group': 'background:divider', 'contents': 's 2 kB/s', 'highlight_group': ['network_load_sent', 'network_load']}, ]) - self.assertEqual(common.network_load(recv_format='r {value}', sent_format='s {value}', recv_max=0), [ + self.assertEqual(common.network_load(pl=pl, recv_format='r {value}', sent_format='s {value}', recv_max=0), [ {'divider_highlight_group': 'background:divider', 'contents': 'r 1 KiB/s', 'highlight_group': ['network_load_recv_gradient', 'network_load_gradient', 'network_load_recv', 'network_load'], 'gradient_level': 100}, {'divider_highlight_group': 'background:divider', 'contents': 's 2 KiB/s', 'highlight_group': ['network_load_sent', 'network_load']}, ]) class ApproxEqual(object): def __eq__(self, i): return abs(i - 50.0) < 1 - self.assertEqual(common.network_load(recv_format='r {value}', sent_format='s {value}', sent_max=4800), [ + self.assertEqual(common.network_load(pl=pl, recv_format='r {value}', sent_format='s {value}', sent_max=4800), [ {'divider_highlight_group': 'background:divider', 'contents': 'r 1 KiB/s', 'highlight_group': ['network_load_recv', 'network_load']}, {'divider_highlight_group': 'background:divider', 'contents': 's 2 KiB/s', 'highlight_group': ['network_load_sent_gradient', 'network_load_gradient', 'network_load_sent', 'network_load'], 'gradient_level': ApproxEqual()}, ]) def test_virtualenv(self): - with replace_env('VIRTUAL_ENV', '/abc/def/ghi'): - self.assertEqual(common.virtualenv(), 'ghi') - os.environ.pop('VIRTUAL_ENV') - self.assertEqual(common.virtualenv(), None) + with replace_env('VIRTUAL_ENV', '/abc/def/ghi') as pl: + self.assertEqual(common.virtualenv(pl=pl), 'ghi') + pl.environ.pop('VIRTUAL_ENV') + self.assertEqual(common.virtualenv(pl=pl), None) def test_email_imap_alert(self): # TODO @@ -254,131 +263,145 @@ def test_now_playing(self): class TestVim(TestCase): def test_mode(self): + pl = Pl() segment_info = vim_module._get_segment_info() - self.assertEqual(vim.mode(segment_info=segment_info), 'NORMAL') - self.assertEqual(vim.mode(segment_info=segment_info, override={'i': 'INS'}), 'NORMAL') - self.assertEqual(vim.mode(segment_info=segment_info, override={'n': 'NORM'}), 'NORM') + self.assertEqual(vim.mode(pl=pl, segment_info=segment_info), 'NORMAL') + self.assertEqual(vim.mode(pl=pl, segment_info=segment_info, override={'i': 'INS'}), 'NORMAL') + self.assertEqual(vim.mode(pl=pl, segment_info=segment_info, override={'n': 'NORM'}), 'NORM') with vim_module._with('mode', 'i') as segment_info: - self.assertEqual(vim.mode(segment_info=segment_info), 'INSERT') + self.assertEqual(vim.mode(pl=pl, segment_info=segment_info), 'INSERT') with vim_module._with('mode', chr(ord('V') - 0x40)) as segment_info: - self.assertEqual(vim.mode(segment_info=segment_info), 'V·BLCK') - self.assertEqual(vim.mode(segment_info=segment_info, override={'^V': 'VBLK'}), 'VBLK') + self.assertEqual(vim.mode(pl=pl, segment_info=segment_info), 'V·BLCK') + self.assertEqual(vim.mode(pl=pl, segment_info=segment_info, override={'^V': 'VBLK'}), 'VBLK') def test_modified_indicator(self): + pl = Pl() segment_info = vim_module._get_segment_info() - self.assertEqual(vim.modified_indicator(segment_info=segment_info), None) + self.assertEqual(vim.modified_indicator(pl=pl, segment_info=segment_info), None) segment_info['buffer'][0] = 'abc' try: - self.assertEqual(vim.modified_indicator(segment_info=segment_info), '+') - self.assertEqual(vim.modified_indicator(segment_info=segment_info, text='-'), '-') + self.assertEqual(vim.modified_indicator(pl=pl, segment_info=segment_info), '+') + self.assertEqual(vim.modified_indicator(pl=pl, segment_info=segment_info, text='-'), '-') finally: vim_module._bw(segment_info['bufnr']) def test_paste_indicator(self): + pl = Pl() segment_info = vim_module._get_segment_info() - self.assertEqual(vim.paste_indicator(segment_info=segment_info), None) + self.assertEqual(vim.paste_indicator(pl=pl, segment_info=segment_info), None) with vim_module._with('options', paste=1): - self.assertEqual(vim.paste_indicator(segment_info=segment_info), 'PASTE') - self.assertEqual(vim.paste_indicator(segment_info=segment_info, text='P'), 'P') + self.assertEqual(vim.paste_indicator(pl=pl, segment_info=segment_info), 'PASTE') + self.assertEqual(vim.paste_indicator(pl=pl, segment_info=segment_info, text='P'), 'P') def test_readonly_indicator(self): + pl = Pl() segment_info = vim_module._get_segment_info() - self.assertEqual(vim.readonly_indicator(segment_info=segment_info), None) + self.assertEqual(vim.readonly_indicator(pl=pl, segment_info=segment_info), None) with vim_module._with('bufoptions', readonly=1): - self.assertEqual(vim.readonly_indicator(segment_info=segment_info), '') - self.assertEqual(vim.readonly_indicator(segment_info=segment_info, text='L'), 'L') + self.assertEqual(vim.readonly_indicator(pl=pl, segment_info=segment_info), '') + self.assertEqual(vim.readonly_indicator(pl=pl, segment_info=segment_info, text='L'), 'L') def test_file_directory(self): + pl = Pl() segment_info = vim_module._get_segment_info() - self.assertEqual(vim.file_directory(segment_info=segment_info), None) - with replace_env('HOME', '/home/foo'): + self.assertEqual(vim.file_directory(pl=pl, segment_info=segment_info), None) + with replace_env('HOME', '/home/foo', os.environ): with vim_module._with('buffer', '/tmp/abc') as segment_info: - self.assertEqual(vim.file_directory(segment_info=segment_info), '/tmp/') + self.assertEqual(vim.file_directory(pl=pl, segment_info=segment_info), '/tmp/') os.environ['HOME'] = '/tmp' - self.assertEqual(vim.file_directory(segment_info=segment_info), '~/') + self.assertEqual(vim.file_directory(pl=pl, segment_info=segment_info), '~/') def test_file_name(self): + pl = Pl() segment_info = vim_module._get_segment_info() - self.assertEqual(vim.file_name(segment_info=segment_info), None) - self.assertEqual(vim.file_name(segment_info=segment_info, display_no_file=True), + self.assertEqual(vim.file_name(pl=pl, segment_info=segment_info), None) + self.assertEqual(vim.file_name(pl=pl, segment_info=segment_info, display_no_file=True), [{'contents': '[No file]', 'highlight_group': ['file_name_no_file', 'file_name']}]) - self.assertEqual(vim.file_name(segment_info=segment_info, display_no_file=True, no_file_text='X'), + self.assertEqual(vim.file_name(pl=pl, segment_info=segment_info, display_no_file=True, no_file_text='X'), [{'contents': 'X', 'highlight_group': ['file_name_no_file', 'file_name']}]) with vim_module._with('buffer', '/tmp/abc') as segment_info: - self.assertEqual(vim.file_name(segment_info=segment_info), 'abc') + self.assertEqual(vim.file_name(pl=pl, segment_info=segment_info), 'abc') with vim_module._with('buffer', '/tmp/’’') as segment_info: - self.assertEqual(vim.file_name(segment_info=segment_info), '’’') + self.assertEqual(vim.file_name(pl=pl, segment_info=segment_info), '’’') def test_file_size(self): + pl = Pl() segment_info = vim_module._get_segment_info() - self.assertEqual(vim.file_size(segment_info=segment_info), '0 B') + self.assertEqual(vim.file_size(pl=pl, segment_info=segment_info), '0 B') with vim_module._with('buffer', os.path.join(os.path.dirname(__file__), 'empty')) as segment_info: - self.assertEqual(vim.file_size(segment_info=segment_info), '0 B') + self.assertEqual(vim.file_size(pl=pl, segment_info=segment_info), '0 B') def test_file_opts(self): + pl = Pl() segment_info = vim_module._get_segment_info() - self.assertEqual(vim.file_format(segment_info=segment_info), + self.assertEqual(vim.file_format(pl=pl, segment_info=segment_info), [{'divider_highlight_group': 'background:divider', 'contents': 'unix'}]) - self.assertEqual(vim.file_encoding(segment_info=segment_info), + self.assertEqual(vim.file_encoding(pl=pl, segment_info=segment_info), [{'divider_highlight_group': 'background:divider', 'contents': 'utf-8'}]) - self.assertEqual(vim.file_type(segment_info=segment_info), None) + self.assertEqual(vim.file_type(pl=pl, segment_info=segment_info), None) with vim_module._with('bufoptions', filetype='python'): - self.assertEqual(vim.file_type(segment_info=segment_info), + self.assertEqual(vim.file_type(pl=pl, segment_info=segment_info), [{'divider_highlight_group': 'background:divider', 'contents': 'python'}]) def test_line_percent(self): + pl = Pl() segment_info = vim_module._get_segment_info() segment_info['buffer'][0:-1] = [str(i) for i in range(100)] try: - self.assertEqual(vim.line_percent(segment_info=segment_info), '1') + self.assertEqual(vim.line_percent(pl=pl, segment_info=segment_info), '1') vim_module._set_cursor(50, 0) - self.assertEqual(vim.line_percent(segment_info=segment_info), '50') - self.assertEqual(vim.line_percent(segment_info=segment_info, gradient=True), + self.assertEqual(vim.line_percent(pl=pl, segment_info=segment_info), '50') + self.assertEqual(vim.line_percent(pl=pl, segment_info=segment_info, gradient=True), [{'contents': '50', 'highlight_group': ['line_percent_gradient', 'line_percent'], 'gradient_level': 50 * 100.0 / 101}]) finally: vim_module._bw(segment_info['bufnr']) def test_cursor_current(self): + pl = Pl() segment_info = vim_module._get_segment_info() - self.assertEqual(vim.line_current(segment_info=segment_info), '1') - self.assertEqual(vim.col_current(segment_info=segment_info), '1') - self.assertEqual(vim.virtcol_current(segment_info=segment_info), + self.assertEqual(vim.line_current(pl=pl, segment_info=segment_info), '1') + self.assertEqual(vim.col_current(pl=pl, segment_info=segment_info), '1') + self.assertEqual(vim.virtcol_current(pl=pl, segment_info=segment_info), [{'highlight_group': ['virtcol_current', 'col_current'], 'contents': '1'}]) def test_modified_buffers(self): - self.assertEqual(vim.modified_buffers(), None) + pl = Pl() + self.assertEqual(vim.modified_buffers(pl=pl), None) def test_branch(self): + pl = Pl() with vim_module._with('buffer', '/foo') as segment_info: - with replace_module_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: None, directory=path)): - self.assertEqual(vim.branch(segment_info=segment_info), + with replace_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: None, directory=path)): + self.assertEqual(vim.branch(pl=pl, segment_info=segment_info), [{'divider_highlight_group': 'branch:divider', 'highlight_group': ['branch'], 'contents': 'foo'}]) - self.assertEqual(vim.branch(segment_info=segment_info, status_colors=True), + self.assertEqual(vim.branch(pl=pl, segment_info=segment_info, status_colors=True), [{'divider_highlight_group': 'branch:divider', 'highlight_group': ['branch_clean', 'branch'], 'contents': 'foo'}]) - with replace_module_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: 'DU', directory=path)): - self.assertEqual(vim.branch(segment_info=segment_info), + with replace_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: 'DU', directory=path)): + self.assertEqual(vim.branch(pl=pl, segment_info=segment_info), [{'divider_highlight_group': 'branch:divider', 'highlight_group': ['branch'], 'contents': 'foo'}]) - self.assertEqual(vim.branch(segment_info=segment_info, status_colors=True), + self.assertEqual(vim.branch(pl=pl, segment_info=segment_info, status_colors=True), [{'divider_highlight_group': 'branch:divider', 'highlight_group': ['branch_dirty', 'branch'], 'contents': 'foo'}]) def test_file_vcs_status(self): + pl = Pl() with vim_module._with('buffer', '/foo') as segment_info: - with replace_module_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda file: 'M', directory=path)): - self.assertEqual(vim.file_vcs_status(segment_info=segment_info), + with replace_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda file: 'M', directory=path)): + self.assertEqual(vim.file_vcs_status(pl=pl, segment_info=segment_info), [{'highlight_group': ['file_vcs_status_M', 'file_vcs_status'], 'contents': 'M'}]) - with replace_module_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda file: None, directory=path)): - self.assertEqual(vim.file_vcs_status(segment_info=segment_info), None) + with replace_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda file: None, directory=path)): + self.assertEqual(vim.file_vcs_status(pl=pl, segment_info=segment_info), None) with vim_module._with('buffer', '/bar') as segment_info: with vim_module._with('bufoptions', buftype='nofile'): - with replace_module_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda file: 'M', directory=path)): - self.assertEqual(vim.file_vcs_status(segment_info=segment_info), None) + with replace_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda file: 'M', directory=path)): + self.assertEqual(vim.file_vcs_status(pl=pl, segment_info=segment_info), None) def test_repository_status(self): + pl = Pl() segment_info = vim_module._get_segment_info() - with replace_module_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: None, directory=path)): - self.assertEqual(vim.repository_status(segment_info=segment_info), None) - with replace_module_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: 'DU', directory=path)): - self.assertEqual(vim.repository_status(segment_info=segment_info), 'DU') + with replace_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: None, directory=path)): + self.assertEqual(vim.repository_status(pl=pl, segment_info=segment_info), None) + with replace_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: 'DU', directory=path)): + self.assertEqual(vim.repository_status(pl=pl, segment_info=segment_info), 'DU') old_cwd = None