Skip to content

Commit

Permalink
Extract progress bars from clustershell event handling.
Browse files Browse the repository at this point in the history
Change-Id: If60faa08b185e779c6b4fa9fb1fe3bf5e33fb3ce
  • Loading branch information
gehel committed Aug 2, 2018
1 parent cf1cbea commit b8431a7
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 53 deletions.
33 changes: 16 additions & 17 deletions cumin/tests/unit/transports/test_clustershell.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pytest

from cumin import CuminError, nodeset
from cumin.transports import BaseWorker, Command, clustershell, State, Target, WorkerError
from cumin.transports import BaseWorker, Command, clustershell, ProgressBars, State, Target, WorkerError


def test_node_class_instantiation():
Expand Down Expand Up @@ -206,8 +206,7 @@ class ConcreteBaseEventHandler(clustershell.BaseEventHandler):
def __init__(self, nodes, commands, **kwargs):
"""Initialize progress bars."""
super().__init__(nodes, commands, **kwargs)
self.pbar_ok = mock.Mock()
self.pbar_ko = mock.Mock()
self.progress = mock.Mock(spec_set=ProgressBars)

def close(self, task):
"""Required by the BaseEventHandler class."""
Expand Down Expand Up @@ -243,7 +242,7 @@ def test_on_timeout(self, tqdm):

assert not self.handler.global_timedout
self.handler.on_timeout(self.worker.task)
assert self.handler.pbar_ko.update.called
assert self.handler.progress.update_failed.called
assert self.handler.global_timedout
assert tqdm.write.called

Expand Down Expand Up @@ -293,6 +292,7 @@ def setup_method(self, _, tqdm, colorama, logger): # pylint: disable=arguments-
"""Initialize default properties and instances."""
super().setup_method()
self.handler = clustershell.SyncEventHandler(self.target, self.commands, success_threshold=1)
self.handler.progress = mock.Mock(spec_set=ProgressBars)
self.worker.eh = self.handler
self.colorama = colorama
self.logger = logger
Expand All @@ -302,16 +302,14 @@ def test_instantiation(self):
"""An instance of SyncEventHandler should be an instance of BaseEventHandler."""
assert isinstance(self.handler, clustershell.BaseEventHandler)

@mock.patch('cumin.transports.clustershell.tqdm')
def test_start_command_no_schedule(self, tqdm):
def test_start_command_no_schedule(self):
"""Calling start_command() should reset the success counter and initialize the progress bars."""
self.handler.start_command()
assert tqdm.called
assert self.handler.progress.init.called
assert self.handler.counters['success'] == 0

@mock.patch('cumin.transports.clustershell.Task.task_self')
@mock.patch('cumin.transports.clustershell.tqdm')
def test_start_command_schedule(self, tqdm, task_self):
def test_start_command_schedule(self, task_self):
"""Calling start_command() with schedule should also change the state of the first batch nodes."""
# Reset the state of nodes to pending
for node in self.handler.nodes.values():
Expand All @@ -320,7 +318,7 @@ def test_start_command_schedule(self, tqdm, task_self):
node.state.update(clustershell.State.pending)

self.handler.start_command(schedule=True)
assert tqdm.called
assert self.handler.progress.init.called
assert self.handler.counters['success'] == 0
scheduled_nodes = sorted(node.name for node in self.handler.nodes.values() if node.state.is_scheduled)
assert scheduled_nodes == sorted(['node1', 'node2'])
Expand Down Expand Up @@ -357,7 +355,7 @@ def test_ev_hup_ok(self, timer):
"""Calling ev_hup with a worker that has exit status zero should update the success progress bar."""
self.handler.ev_pickup(self.worker, self.worker.current_node)
self.handler.ev_hup(self.worker, self.worker.current_node, 100)
assert self.handler.pbar_ok.update.called
assert self.handler.progress.update_success.called
assert not timer.called
assert self.handler.nodes[self.worker.current_node].state.is_success

Expand All @@ -366,7 +364,7 @@ def test_ev_hup_ko(self, timer):
"""Calling ev_hup with a worker that has exit status non-zero should update the failed progress bar."""
self.handler.ev_pickup(self.worker, self.worker.current_node)
self.handler.ev_hup(self.worker, self.worker.current_node, 1)
assert self.handler.pbar_ko.update.called
assert self.handler.progress.update_failed.called
assert not timer.called
assert self.handler.nodes[self.worker.current_node].state.is_failed

Expand All @@ -381,10 +379,11 @@ def test_close(self, tqdm): # pylint: disable=arguments-differ
class TestAsyncEventHandler(TestBaseEventHandler):
"""AsyncEventHandler test class."""

@mock.patch('cumin.transports.clustershell.ProgressBars')
@mock.patch('cumin.transports.clustershell.logging')
@mock.patch('cumin.transports.clustershell.colorama')
@mock.patch('cumin.transports.clustershell.tqdm')
def setup_method(self, _, tqdm, colorama, logger): # pylint: disable=arguments-differ
def setup_method(self, _, tqdm, colorama, logger, progress): # pylint: disable=arguments-differ,unused-argument
"""Initialize default properties and instances."""
super().setup_method()
self.handler = clustershell.AsyncEventHandler(self.target, self.commands)
Expand All @@ -396,7 +395,7 @@ def setup_method(self, _, tqdm, colorama, logger): # pylint: disable=arguments-
def test_instantiation(self):
"""An instance of AsyncEventHandler should be an instance of BaseEventHandler and initialize progress bars."""
assert isinstance(self.handler, clustershell.BaseEventHandler)
assert self.handler.pbar_ok.refresh.called
assert self.handler.progress.init.called

def test_ev_hup_ok(self):
"""Calling ev_hup with a worker that has zero exit status should enqueue the next command."""
Expand All @@ -410,13 +409,13 @@ def test_ev_hup_ok(self):
self.handler.ev_pickup(self.worker, self.worker.current_node)
self.handler.ev_hup(self.worker, self.worker.current_node, 0)
assert self.handler.counters['success'] == 1
assert self.handler.pbar_ok.update.called
assert self.handler.progress.update_success.called

def test_ev_hup_ko(self):
"""Calling ev_hup with a worker that has non-zero exit status should not enqueue the next command."""
self.handler.ev_pickup(self.worker, self.worker.current_node)
self.handler.ev_hup(self.worker, self.worker.current_node, 1)
assert self.handler.pbar_ko.update.called
assert self.handler.progress.update_failed.called

def test_ev_timer(self):
"""Calling ev_timer() should schedule the execution of the next node/command."""
Expand All @@ -429,5 +428,5 @@ def test_close(self, tqdm): # pylint: disable=arguments-differ
self.worker.task.iter_buffers = TestClusterShellWorker.iter_buffers
self.worker.num_timeout.return_value = 0
self.handler.close(self.worker)
assert self.handler.pbar_ok.close.called
assert self.handler.progress.close.called
assert tqdm.write.called
43 changes: 43 additions & 0 deletions cumin/tests/unit/transports/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import cumin # noqa: F401 (dynamically used in TestCommand)

from cumin import transports
from cumin.transports import ProgressBars


class ConcreteBaseWorker(transports.BaseWorker):
Expand Down Expand Up @@ -533,3 +534,45 @@ def test_raise_error(self):
"""Should raise a WorkerError."""
with pytest.raises(transports.WorkerError, match='Test message'):
transports.raise_error('Test', 'message', 'value')


@mock.patch('cumin.transports.tqdm')
class TestProgressBars:
"""A class that tests ProgressBars"""

def test_init_intialize_progress_bars_with_correct_size(self, tqdm):
"""Progress bars are initialized at the correct size"""
progress = ProgressBars()
progress.init(10)

assert tqdm.call_count == 2
_, kwargs = tqdm.call_args
assert kwargs['total'] == 10

def test_progress_bars_are_closed(self, tqdm): # pylint: disable=unused-argument
"""Progress bars are closed"""
progress = ProgressBars()
progress.init(10)

progress.close()

assert progress.pbar_ok.close.called # pylint: disable=no-member
assert progress.pbar_ko.close.called # pylint: disable=no-member

def test_progress_bar_is_updated_on_success(self, tqdm): # pylint: disable=unused-argument
"""Progress bar is updated on success"""
progress = ProgressBars()
progress.init(10)

progress.update_success(5)

assert progress.pbar_ok.update.called_once_with(5) # pylint: disable=no-member

def test_progress_bar_is_updated_on_failure(self, tqdm): # pylint: disable=unused-argument
"""Progress bar is updated on failure"""
progress = ProgressBars()
progress.init(10)

progress.update_failed(3)

assert progress.pbar_ko.update.called_once_with(3) # pylint: disable=no-member
73 changes: 73 additions & 0 deletions cumin/transports/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@
import logging
import os
import shlex
import sys

from abc import ABCMeta, abstractmethod, abstractproperty

import colorama

from ClusterShell.NodeSet import NodeSet
from tqdm import tqdm

from cumin import CuminError, nodeset_fromlist

Expand Down Expand Up @@ -715,3 +719,72 @@ def raise_error(property_name, message, value):
"""
raise WorkerError("{property_name} {message}, got '{value_type}': {value}".format(
property_name=property_name, message=message, value_type=type(value), value=value))


class ProgressBars:
"""Progress bars for the status of successful / failed hosts.
The ProgressBars needs to be notified of the total number of hosts when the
operation starts, and then notified of successes and failures.
"""

def __init__(self):
"""Create the progress bars.
Note that the progress bars themselves are not initalized at object
creation. `init()` needs to be called before using the progress bars.
"""
self.pbar_ok = None
self.pbar_ko = None
self.bar_format = ('{desc} |{bar}| {percentage:3.0f}% ({n_fmt}/{total_fmt}) '
'[{elapsed}<{remaining}, {rate_fmt}]')

def init(self, num_hosts):
"""Initialize the progress bars.
Arguments:
num_hosts (int): the total number of hosts
"""
self.pbar_ok = tqdm(desc='PASS', total=num_hosts, leave=True, unit='hosts', dynamic_ncols=True,
bar_format=colorama.Fore.GREEN + self.bar_format, file=sys.stderr)
self.pbar_ok.refresh()
self.pbar_ko = tqdm(desc='FAIL', total=num_hosts, leave=True, unit='hosts', dynamic_ncols=True,
bar_format=colorama.Fore.RED + self.bar_format, file=sys.stderr)
self.pbar_ko.refresh()

def close(self):
"""Closes the progress bars."""
self.pbar_ok.close()
self.pbar_ko.close()

def update_success(self, num_hosts=1):
"""Updates the number of successful hosts.
Arguments:
num_hosts (int): increment to the number of hosts that have completed successfully
"""
self.pbar_ok.update(num_hosts)

def update_failed(self, num_hosts=1):
"""Updates the number of failed hosts.
Arguments:
num_hosts (int): increment to the number of hosts that have completed in error
"""
self.pbar_ko.update(num_hosts)


class NoProgress:
"""Used as a null object to disable the display of progress bars."""

def init(self, num_hosts):
"""Does nothing"""

def close(self):
"""Does nothing"""

def update_success(self, num_hosts=1):
"""Does nothing"""

def update_failed(self, num_hosts=1):
"""Does nothing"""

0 comments on commit b8431a7

Please sign in to comment.