Skip to content

Commit

Permalink
Merge pull request #417 from ufo-kit/add-progressbar
Browse files Browse the repository at this point in the history
Add progressbar
  • Loading branch information
tfarago committed Oct 4, 2018
2 parents 8bcb269 + 1f29b6c commit 899a805
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 11 deletions.
12 changes: 12 additions & 0 deletions bin/concert
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import os
import re
import sys
import shutil
import signal
import subprocess
import tempfile
import traceback
Expand Down Expand Up @@ -372,6 +373,8 @@ class StartCommand(Command):
def run(self, session=None, filename=None,
non_interactive=False,
logto='file', logfile=None, loglevel=None):
from concert.session.utils import abort

if session:
cs.exit_if_not_exists(session)

Expand All @@ -395,6 +398,15 @@ class StartCommand(Command):
# Add session path, so that sessions can import other sessions
sys.path.append(cs.path())

# ctrl-c handling
orig_sigint_handler = signal.getsignal(signal.SIGINT)

def concert_sigint_handler(sig, frame):
abort()
orig_sigint_handler(sig, frame)

signal.signal(signal.SIGINT, concert_sigint_handler)

if non_interactive:
if session:
execfile(cs.path(session), globals())
Expand Down
5 changes: 5 additions & 0 deletions concert/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@
Turn on gevent support. If geven is not available, fall back to
ThreadPoolExecutor approach.
.. data:: PROGRESS_BAR
Turn on progress bar by long-lasting operations if tqdm package is present
"""

ENABLE_ASYNC = True
ENABLE_GEVENT = False
# Prints the exception source by fake futures
PRINT_NOASYNC_EXCEPTION = True
PROGRESS_BAR = True
33 changes: 30 additions & 3 deletions concert/experiments/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
"""

import logging
import time
from concert.async import async
from concert.coroutines.base import broadcast, inject
from concert.progressbar import wrap_iterable


LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -37,14 +38,25 @@ def __init__(self, name, producer, consumers=None, acquire=None):
self.consumers = [] if consumers is None else consumers
# Don't bother with checking this for None later
self.acquire = acquire if acquire else lambda: None
self._aborted = False

def connect(self):
"""Connect producer with consumers."""
self._aborted = False
started = []
for not_started in self.consumers:
started.append(not_started())

inject(self.producer(), broadcast(*started))
for item in self.producer():
if self._aborted:
LOG.info("Acquisition '%s' aborted", self.name)
break
for consumer in started:
consumer.send(item)

@async
def abort(self):
self._aborted = True

def __call__(self):
"""Run the acquisition, i.e. acquire the data and connect the producer and consumers."""
Expand Down Expand Up @@ -95,6 +107,7 @@ def __init__(self, acquisitions, walker=None, separate_scans=True, name_fmt='sca
self.separate_scans = separate_scans
self.name_fmt = name_fmt
self.iteration = 1
self._aborted = False

if self.separate_scans and self.walker:
# The data is not supposed to be overwritten, so find an iteration which
Expand Down Expand Up @@ -159,13 +172,22 @@ def get_acquisition(self, name):
return acq
raise ExperimentError("Acquisition with name `{}' not found".format(name))

@async
def abort(self):
LOG.info('Experiment aborted')
self._aborted = True
for acq in self.acquisitions:
acq.abort().join()

def acquire(self):
"""
Acquire data by running the acquisitions. This is the method which implements
the data acquisition and should be overriden if more functionality is required,
unlike :meth:`~.Experiment.run`.
"""
for acq in self._acquisitions:
for acq in wrap_iterable(self._acquisitions):
if self._aborted:
break
acq()

@async
Expand All @@ -175,6 +197,9 @@ def run(self):
Compute the next iteration and run the :meth:`~.base.Experiment.acquire`.
"""
start_time = time.time()
self._aborted = False
LOG.debug('Experiment iteration %d start', self.iteration)
if self.separate_scans and self.walker:
self.walker.descend(self.name_fmt.format(self.iteration))

Expand All @@ -188,6 +213,8 @@ def run(self):
finally:
if self.separate_scans and self.walker:
self.walker.ascend()
LOG.debug('Experiment iteration %d duration: %.2f s',
self.iteration, time.time() - start_time)
self.iteration += 1


Expand Down
71 changes: 71 additions & 0 deletions concert/experiments/dummy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"""Dummy experiments."""

import os
from concert.experiments.base import Acquisition, Experiment
from concert.devices.cameras.dummy import FileCamera
from concert.progressbar import wrap_iterable


class ImagingFileExperiment(Experiment):

"""
A typical imaging experiment which consists of acquiring dark, flat and radiographic images, in
this case located on a disk.
.. py:attribute:: directory
Top directory with subdirectories containing the individual images
.. py:attribute:: num_darks
Number of dark images (no beam, just dark current)
.. py:attribute:: num_flats
Number of flat images (beam present, sample not)
.. py:attribute:: num_radios
Number of radiographic images
.. py:attribute:: darks_dir
Subdirectory name with dark images
.. py:attribute:: flats_dir
Subdirectory name with flat images
.. py:attribute:: radio_dir
Subdirectory name with radiographic images
"""

def __init__(self, directory, num_darks, num_flats, num_radios, darks_dir='darks',
flats_dir='flats', radios_dir='projections', walker=None, separate_scans=True,
name_fmt='scan_{:>04}'):
self.directory = directory
self.num_darks = num_darks
self.num_flats = num_flats
self.num_radios = num_radios
self.darks_dir = darks_dir
self.flats_dir = flats_dir
self.radios_dir = radios_dir
darks = Acquisition('darks', self.take_darks)
flats = Acquisition('flats', self.take_flats)
radios = Acquisition('radios', self.take_radios)
super(ImagingFileExperiment, self).__init__([darks, flats, radios], walker=walker)

def _produce_images(self, subdirectory, num):
camera = FileCamera(os.path.join(self.directory, subdirectory))
for i in wrap_iterable(range(num)):
yield camera.grab()

def take_darks(self):
return self._produce_images(self.darks_dir, self.num_darks)

def take_flats(self):
return self._produce_images(self.flats_dir, self.num_flats)

def take_radios(self):
return self._produce_images(self.radios_dir, self.num_radios)
3 changes: 2 additions & 1 deletion concert/processes/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from concert.helpers import expects, Numeric
from concert.devices.motors.base import LinearMotor, RotationMotor
from concert.devices.cameras.base import Camera
from concert.progressbar import wrap_iterable


LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -103,7 +104,7 @@ def get_value(index, tup, previous):

future = None

for i, tup in enumerate(product(*regions)):
for i, tup in wrap_iterable(enumerate(product(*regions))):
future = get_value(i, tup, future)
yield future

Expand Down
17 changes: 17 additions & 0 deletions concert/progressbar.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
"""Progress bar allows you to see the progress of long-term operations."""

import concert.config as cfg


try:
import tqdm
except:
cfg.PROGRESS_BAR = False


def wrap_iterable(iterable, **kwargs):
"""Wrap *iterable* so that a progress bar will be shown on iteration."""
if cfg.PROGRESS_BAR:
iterable = tqdm.tqdm(iterable, **kwargs)

return iterable
9 changes: 8 additions & 1 deletion concert/session/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,15 @@ def code_of(func):
def abort():
"""Abort all actions related with parameters on all devices."""
from concert.devices.base import Device
from concert.experiments.base import Acquisition, Experiment

return device_abort((device for (name, device) in _current_instances(Device)))
# First abort experiments
futures = [ex.abort() for (name, ex) in _current_instances(Experiment)]
# Then acquisitions, in case there are some standalone ones
futures += [acq.abort() for (name, acq) in _current_instances(Acquisition)]
futures += device_abort((device for (name, device) in _current_instances(Device)))

return futures


@threaded
Expand Down
16 changes: 10 additions & 6 deletions docs/user/getstarted.rst
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,13 @@ Aborting

In case the devices in the session support aborting (stopping whatever they are
currently doing) you can abort all of them at once by :func:`.abort`. This may
be useful for emergency aborts. If you want to abort all devices automatically
you can use :func:`.check_emergency_stop`, which takes a callable as its first
argument and aborts all the devices if the callable evaluates to ``True``. You
can even exit the session afterwards to make sure there are no more actions,
like pending device movements, to make sure nothing can be damaged after the
emergency stop occurs.
be useful for emergency aborts. Experiments and acquisisions within them also
support aborting and are included if you call :func:`.abort`. You can either
call this function directly or press ctrl-c, which has the same effect.

If you want to abort all devices automatically you can use
:func:`.check_emergency_stop`, which takes a callable as its first argument and
aborts all the devices if the callable evaluates to ``True``. You can even exit
the session afterwards to make sure there are no more actions, like pending
device movements, to make sure nothing can be damaged after the emergency stop
occurs.

0 comments on commit 899a805

Please sign in to comment.