Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First attempt to use more gpus #339

Merged
merged 86 commits into from
Jan 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
86 commits
Select commit Hold shift + click to select a range
be980b2
First attempt to use more gpus
tfarago Oct 21, 2014
3f865d7
Don't mess with arch and gpu inside spawned thread
Oct 22, 2014
6d005d8
Support more outputs
tfarago Mar 4, 2015
4573273
Enable specifying output's num_dims in constructor
tfarago Mar 4, 2015
22ef1b0
Simplify multi-GPU output
tfarago Jan 16, 2018
42aca1b
InjectProcess: Add scheduler to constructor
tfarago Jan 16, 2018
266b3a8
Add universal reconstructor prototype
tfarago Jan 16, 2018
f755556
Write volume asynchronously
tfarago Jan 18, 2018
d253b88
Create reconstructors when coroutine starts
tfarago Jan 18, 2018
be30524
Delete reconstructors once they are finished
tfarago Jan 18, 2018
268c1d5
InjectProcess: Don't require new inputs
tfarago Jan 18, 2018
4cb760a
Convert dark and flat to float32
tfarago Jan 18, 2018
44d97af
Optimie projection processing
tfarago Jan 18, 2018
d6cff85
Enable starting the graph immediately
tfarago Jan 18, 2018
1f369aa
Reuse resources
tfarago Jan 20, 2018
8d4030e
Start with cleanup
tfarago Jan 20, 2018
e181587
Add a simple online reconstruction addon
tfarago Jan 20, 2018
e080a93
Fix center-position-x optimization
tfarago Jan 22, 2018
2b3f99b
Optimize manager coroutine
tfarago Jan 22, 2018
c514b30
backproject: Cosmetic changes
tfarago Jan 22, 2018
24fa683
Make deepcopy and option in the queue coroutine
tfarago Jan 22, 2018
de16daa
manager: fix number of reconstructors
tfarago Jan 22, 2018
53183db
reco addon: Enable dark and flat processing
tfarago Jan 22, 2018
88bfa15
writer addon: Don't make deepcopy
tfarago Jan 22, 2018
9042472
Don't rely on 0-based regions from tofu
tfarago Jan 22, 2018
3c18440
manager: Wait for init thread if block is True
tfarago Jan 22, 2018
cfccbc8
Enbable delayed execution
tfarago Jan 24, 2018
dcddc0d
Enable delayed execution in online reco
tfarago Jan 24, 2018
42a4d89
InjectProcess: enable data copying
MarcusZuber Jan 25, 2018
6f44ed6
Enable sleep time specification
MarcusZuber Jan 26, 2018
0aeba48
Enable regions specification
MarcusZuber Jan 26, 2018
8c28e64
Manager constructor takes normalization images
MarcusZuber Jan 26, 2018
a569f84
Adjust online reco for a new manger coroutine
MarcusZuber Jan 26, 2018
80b97cf
Prepare backprojectors in one thread
MarcusZuber Jan 26, 2018
d672030
manager: cleanup the coroutine
MarcusZuber Jan 26, 2018
da2db1b
manager: Fix gpu index
MarcusZuber Jan 26, 2018
2bc29e2
manager: Enable more threads per GPU
MarcusZuber Jan 26, 2018
3e77ab6
queue coroutine: Make sure consumer stops
MarcusZuber Jan 28, 2018
85be134
manager: Use numpy array to store projections
tfarago Jan 29, 2018
e325a88
manager: Enable parallel slice writing
tfarago Jan 29, 2018
0cfbaf7
online: Add walker
tfarago Jan 29, 2018
0bbef78
online: Use numpy to store normalization images
tfarago Jan 29, 2018
60bfa31
online: clean up averaging coroutine
tfarago Jan 29, 2018
c6b269e
online: save the flats and darks correctly
tfarago Jan 29, 2018
dc6131b
online: Enable slice writing
tfarago Jan 31, 2018
04964af
reco: Wait for event before slice download starts
tfarago Jan 31, 2018
8d7aaa5
manager: Rework synchronization
tfarago Jan 31, 2018
3d94736
manager: Allow execution abort
tfarago Feb 2, 2018
6fc5582
manager: raname _in_index
tfarago Feb 2, 2018
7d0a34b
args: Add region_overlap
tfarago Feb 2, 2018
58a8d11
Normalize projections only if supported
tfarago May 2, 2018
7bc13e6
Use tofu's CTGeometry to copy projection region
tfarago May 3, 2018
2b31c8a
Enable more batches
tfarago May 3, 2018
2dd83f3
storage: Check for non-None im_writer
tfarago May 30, 2018
5eb46a4
reco: Enable slice directory specification
tfarago May 30, 2018
b536898
reco: Enable manager to receive more projections
tfarago May 30, 2018
2399444
manager: log number of received projections
tfarago May 30, 2018
3c54f38
Add parameter finding
tfarago May 30, 2018
f5c8bfb
Make z parameter setting safer
tfarago May 30, 2018
4ad839a
Rename center_* to center_position_*
tfarago Jun 1, 2018
af40e3a
find_parameter: don't force splittin policy 'one'
tfarago Jun 1, 2018
cc05850
find_parameter: generalize setting found parameter
tfarago Jun 1, 2018
cd4f314
find_parameter: Enable maximization
tfarago Jun 1, 2018
8ebed28
Document find_parameter
tfarago Jun 1, 2018
4e29e52
Add z_parameters and slice_metrics attributes
tfarago Jun 1, 2018
c7d18af
Rename Universal to General
tfarago Jun 1, 2018
88b54ae
find_parameter: make value storing optional
tfarago Jun 1, 2018
25e2909
manager: enable gpus specification
tfarago Jun 6, 2018
38d1dd2
Enable finding of more parameters at once
tfarago Jun 7, 2018
6cbd4d5
Don't flip the metric by default
tfarago Jun 8, 2018
640c9c4
Use straightforward sag instead of msag
tfarago Jun 8, 2018
a8c705b
imageprocessing: add filter_low_frequencies
tfarago Jun 8, 2018
f90379d
Add fwhm to find_parameters
tfarago Jun 8, 2018
a181273
Enable input specification in find_parameter
tfarago Jun 8, 2018
ccf5d3d
reco addon: Use first normalization image
tfarago Jun 19, 2018
42bc524
Enable OutputTask to return NULL
tfarago Jun 28, 2018
bcd9686
Make manager abortable
tfarago Jun 28, 2018
3544207
Fix flake8
tfarago Jun 28, 2018
e7326bd
Add dummy ImagingExperiment
tfarago Oct 4, 2018
c8b09ba
Simplify normalization computation in online reco
tfarago Oct 15, 2018
ec90dcd
ufo: Add FlatCorrect class
tfarago Oct 15, 2018
8784ec1
Online reco: don't broadcast result if unnecessary
tfarago Nov 12, 2018
d414b3e
Don't require width and height by reco args
tfarago Nov 12, 2018
4c4a048
Allow projection crop disabling
tfarago Nov 12, 2018
acad4df
Add number of projections info to reco manager
tfarago Nov 12, 2018
9ea89fb
Simplify source creation by general reco graph
tfarago Mar 4, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions concert/coroutines/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,16 @@ def check_and_create(sl):


@coroutine
def queue(consumer, process_all=True, block=False):
def queue(consumer, process_all=True, block=False, make_deepcopy=True):
"""
queue(consumer, process_all=True, block=False)

Store the incoming data in a queue and dispatch to the *consumer* in a separate thread which
prevents the stalling on the "main" data stream. If *process_all* is True the serve loop may
exit only when all items are sent to *consumer*. If *block* is True this coroutine blocks until
all items in the serve loop are processed, *process_all* must be True as well for this to take
effect.
effect. If *make_deepcopy* is True, insert a deep copy of an item into the queue, otherwise just
a reference.
"""
from concert.async import HAVE_GEVENT
if cfg.ENABLE_GEVENT and HAVE_GEVENT:
Expand All @@ -73,23 +74,27 @@ def queue(consumer, process_all=True, block=False):

@threaded
def serve():
while serve.run or (process_all and not item_queue.empty()):
while True:
item = item_queue.get()
consumer.send(item)
if item is not None:
consumer.send(item)
elif not process_all or item_queue.empty():
break
item_queue.task_done()
serve.stopped.set()
LOG.debug("queue's serve loop stopped")

serve.run = True
serve.stopped = Event()
serve()

try:
while True:
item = yield
item_queue.put(deepcopy(item))
if make_deepcopy:
item = deepcopy(item)
item_queue.put(item)
except GeneratorExit:
serve.run = False
item_queue.put(None)
if block:
serve.stopped.wait()

Expand Down
111 changes: 109 additions & 2 deletions concert/experiments/addons.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
the acquired data, e.g. write images to disk, do tomographic reconstruction etc.
"""
import logging
import numpy as np
from concert.async import threaded
from concert.coroutines.base import broadcast, coroutine
from concert.coroutines.filters import queue
from concert.coroutines.sinks import Accumulate
from concert.coroutines.sinks import Accumulate, Result


LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -166,16 +169,120 @@ def wrapped_writer():
self.walker.descend(acquisition.name)
coro = self.walker.write()
if self._async:
coro = queue(coro, process_all=True, block=block)
coro = queue(coro, process_all=True, block=block, make_deepcopy=False)
return coro
finally:
self.walker.ascend()

return wrapped_writer


class OnlineReconstruction(Addon):
def __init__(self, experiment, reco_args, process_normalization=False,
consumer=None, block=False, wait_for_projections=False,
walker=None, slice_directory='online-slices'):
from multiprocessing.pool import ThreadPool
from threading import Event
from concert.ext.ufo import GeneralBackprojectManager

self.experiment = experiment
self.dark_result = Result()
self.flat_result = Result()
self.manager = GeneralBackprojectManager(reco_args)
self.walker = walker
self.slice_directory = slice_directory
self.process_normalization = process_normalization
self._pool = ThreadPool(processes=2)
self._events = {'darks': Event(), 'flats': Event()}
self.consumer = consumer
self.block = block
self.wait_for_projections = wait_for_projections
self._consumers = {}
super(OnlineReconstruction, self).__init__(experiment.acquisitions)

@threaded
def _average_images(self, queue, im_type):
average = None
i = 0
while True:
image = queue.get()
if image is None:
if im_type == 'darks':
self.manager.dark = average
else:
self.manager.flat = average
self._events[im_type].set()
LOG.debug('%s pre-processing done', im_type)
break

if self.process_normalization:
if average is None:
average = np.zeros_like(image, dtype=np.float32)
average = (average * i + image) / (i + 1)
i += 1
else:
average = image

def _create_averaging(self, im_type):
@coroutine
def create_averaging_coro():
try:
import Queue as queue_module
except ImportError:
import queue as queue_module

self._events[im_type].clear()
queue = queue_module.Queue()
self._average_images(queue, im_type)
try:
while True:
image = yield
queue.put(image)
except GeneratorExit:
queue.put(None)

return create_averaging_coro

def _reconstruct(self):
if hasattr(self.experiment, 'darks') and hasattr(self.experiment, 'flats'):
events = self._events.values() if self.process_normalization else None
else:
events = None
consumers = []
write_coro = None

if self.consumer:
consumers.append(self.consumer)
if self.walker:
self.walker.descend(self.slice_directory)
write_coro = self.walker.write(dsetname='slice_{:>04}.tif')
self.walker.ascend()
consumers.append(write_coro)
consumer = broadcast(*consumers) if consumers else None

return self.manager(consumer=consumer, block=self.block, wait_for_events=events,
wait_for_projections=self.wait_for_projections)

def _attach(self):
if hasattr(self.experiment, 'darks') and hasattr(self.experiment, 'flats'):
self._consumers[self.experiment.darks] = self._create_averaging('darks')
self._consumers[self.experiment.flats] = self._create_averaging('flats')
self._consumers[self.experiment.radios] = self._reconstruct

for acq, consumer in self._consumers.items():
acq.consumers.append(consumer)

def _detach(self):
for acq, consumer in self._consumers.items():
acq.consumers.remove(consumer)


class AddonError(Exception):

"""Addon errors."""

pass


class OnlineReconstructionError(Exception):
pass
65 changes: 65 additions & 0 deletions concert/experiments/dummy.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,76 @@
"""Dummy experiments."""

import os
import numpy as np
from concert.experiments.base import Acquisition, Experiment
from concert.devices.cameras.dummy import FileCamera
from concert.progressbar import wrap_iterable


class ImagingExperiment(Experiment):

"""
A typical imaging experiment which consists of acquiring dark, flat and radiographic images, in
this case zeros or random data.

.. py:attribute:: num_darks

Number of dark images (no beam, just dark current)

.. py:attribute:: num_flats

Number of flat images (beam present, sample not)

.. py:attribute:: num_radios

Number of radiographic images

.. py:attribute:: shape

Shape of the generated images (H x W) (default: 1024 x 1024)

.. py:attribute:: random

If True, one random image is created and re-used, otherwise zeros

.. py:attribute:: dtype

Data type of the generated images (default: unsigned short)
"""

def __init__(self, num_darks, num_flats, num_radios, shape=(1024, 1024), walker=None,
random=False, dtype=np.ushort, separate_scans=True, name_fmt='scan_{:>04}'):
self.num_darks = num_darks
self.num_flats = num_flats
self.num_radios = num_radios
self.shape = shape
self.random = random
self.dtype = dtype
darks = Acquisition('darks', self.take_darks)
flats = Acquisition('flats', self.take_flats)
radios = Acquisition('radios', self.take_radios)
super(ImagingExperiment, self).__init__([darks, flats, radios], walker=walker)

def _produce_images(self, num):
if self.random:
image = np.random.normal(128., 10., size=self.shape)
else:
image = np.zeros(self.shape)

image = image.astype(self.dtype)
for i in wrap_iterable(range(num)):
yield image

def take_darks(self):
return self._produce_images(self.num_darks)

def take_flats(self):
return self._produce_images(self.num_flats)

def take_radios(self):
return self._produce_images(self.num_radios)


class ImagingFileExperiment(Experiment):

"""
Expand Down
Loading