Permalink
Browse files

use multiprocessing instead of parallelpython, even on windows

  • Loading branch information...
1 parent 527eaca commit 58e3f0307490340ec8ba4f39e02a5e8d2aeec02b @ddale ddale committed Dec 18, 2011
View
48 doc/users/installation.rst
@@ -4,19 +4,18 @@ Installation
Praxes depends on a few python packages, most of which can be installed on
Linux using the distribution's package manager, or on Mac OS X with MacPorts.
-Note to windows users: you currently need to install the 32-bit python and
-32-bit package installers, and you may need to run .exe installers as
-administrator by right-clicking on them and choose "run as Administrator").
-Check that you have the following installed:
-
-#. Python_ (version 2.7)
-#. Cython_ (version 0.13 or later, only required for Mac OS X and Linux)
-#. NumPy_ (version 1.5.1 or later)
-#. PyQt4_ (version 4.5.2 or later) [#f1]_
-#. PyMca_ (version 4.4.0 or later, Windows users, please see footnote) [#f2]_
-#. matplotlib_ (0.98.3 or later)
-#. h5py_ (1.3.0 or later) [#f3]_
-#. parallelpython_ (1.6.1 or later, only required for Windows)
+Note to windows users: you may need to run .exe installers as administrator by
+right-clicking on them and choose "run as Administrator"). Check that you have
+the following installed:
+
+#. Python_ (version 2.6.x or 2.7.x. May already be installed on Linux)
+#. Cython_ (version 0.15 or later, only required for Mac OS X and Linux)
+#. NumPy_ (version 1.5.1 or later) [#f1]_
+#. PyQt4_ (version 4.5.2 or later) [#f2]_
+#. PyMca_ (version 4.4.0 or later) [#f3]_
+#. matplotlib_ (1.1.0 or later) [#f1]_
+#. h5py_ (2.1.0 or later) [#f1]_, [#f4]_
+#. quantities_ (optional, only required by physref package)
#. Praxes_
To install Praxes on OS X or Linux, download the source tar.gz file, unpack it,
@@ -27,25 +26,28 @@ and run the following in the source directory::
.. rubric:: Footnotes
-.. [#f1] May require installing Qt_ on Mac, and development tools
+
+.. [#f1] Windows installers for 64-bit Python environments can be found
+ `here <http://www.lfd.uci.edu/~gohlke/pythonlibs>`_
+.. [#f2] May require installing Qt_ on Mac, and development tools
like pyqt4-dev and pyqt4-dev-tools through the package manager on
Linux.
-.. [#f2] Windows users, please install
- the file that includes the python version in the name: e.g.
- PyMca-4.4.0.win32-py2.6.exe. Mac and linux users please install
- from source: e.g. pymca4.4.0-src.tgz.
-.. [#f3] May require installing hdf5 on Linux and OS X, and development
+.. [#f3] Mac and linux users please install from source: e.g.
+ pymca4.4.1-src.tgz. Windows users should follow the `PythonPackages` link,
+ and download the file that includes the platform and python version in the
+ name: e.g. PyMca-4.4.1.win-amd64-py2.7.exe.
+.. [#f4] May require installing hdf5 on Linux and OS X, and development
libraries like libhdf5-dev if installing with a packager manager on
- some linux distributions. hdf5-1.8.4-patch1 is the recommended version.
+ some linux distributions.
-.. _Python: http://www.python.org/
+.. _Python: http://www.python.org/download/releases/2.7.2/
.. _Cython: http://pypi.python.org/pypi/Cython
.. _NumPy: http://pypi.python.org/pypi/numpy
.. _PyQt4: http://pypi.python.org/pypi/PyQt
.. _Qt: http://qt.nokia.com/
.. _matplotlib: http://pypi.python.org/pypi/matplotlib
-.. _PyMca: http://pypi.python.org/pypi/PyMca
+.. _PyMca: http://sourceforge.net/projects/pymca/files/pymca/PyMca4.4.1
.. _h5py: http://pypi.python.org/pypi/h5py
+.. _quantities: http://pypi.python.org/pypi/quantities
.. _Praxes: http://github.com/praxes/praxes/downloads
-.. _parallelpython: http://www.parallelpython.com
View
168 praxes/dispatch/pptaskmanager.py
@@ -1,168 +0,0 @@
-"""
-"""
-from __future__ import with_statement
-
-import copy
-import gc
-import hashlib
-import logging
-import threading
-import time
-
-import pp
-#from PyQt4 import QtCore
-import numpy as np
-np.seterr(all='ignore')
-
-
-logger = logging.getLogger(__file__)
-DEBUG = False
-
-
-class TaskManager(threading.Thread):
-
- @property
- def job_server(self):
- return self._job_server
-
- @property
- def lock(self):
- return self.__lock
-
- @property
- def n_cpus(self):
- with self.lock:
- return copy.copy(self._n_cpus)
-
- @property
- def n_points(self):
- with self.lock:
- return copy.copy(self._n_points)
-
- @property
- def n_processed(self):
- with self.lock:
- return copy.copy(self._n_processed)
- @n_processed.setter
- def n_processed(self, val):
- with self.lock:
- self._n_processed = copy.copy(val)
-
- @property
- def n_submitted(self):
- with self.lock:
- return copy.copy(self._n_submitted)
- @n_submitted.setter
- def n_submitted(self, val):
- with self.lock:
- self._n_submitted = copy.copy(val)
-
- @property
- def scan(self):
- return self._scan
-
- @property
- def stopped(self):
- with self.lock:
- return copy.copy(self.__stopped)
- @stopped.setter
- def stopped(self, val):
- with self.lock:
- self.__stopped = copy.copy(val)
-
- def __init__(self, scan, progress_queue, **kwargs):
- super(TaskManager, self).__init__()
-
- self.__lock = threading.RLock()
-
- self._scan = scan
- self._n_points = scan.entry.npoints
-
- self.progress_queue = progress_queue
- self.job_queue = []
-
- self._job_server = pp.Server(ppservers=('*',))
-
- n_cpus = self.job_server.get_ncpus()
- n_local_processes = kwargs.get('n_local_processes', n_cpus)
- self.job_server.set_ncpus(n_local_processes)
-
- # total cpus, including local and remote:
- self._n_cpus = np.sum(
- [i for i in self.job_server.get_active_nodes().values()]
- )
-
- self.__stopped = False
-
- self._next_index = 0
- self._n_processed = 0
- self._n_submitted = 0
-
- def __iter__(self):
- return self
-
- def next(self):
- """
- This needs to be reimplemented to return either:
-
- * (func, args) tuple such that func(*args) will work
- * 0 if the point was masked
- * None if the data is not yet available
- """
- raise NotImplementedError
-
- def flush(self):
- #self.job_server.wait()
- while True:
- try:
- res = self.job_queue.pop(0)()
- if res is not None:
- self.update_records(res)
- except IndexError:
- break
- self.n_submitted = 0
- self.queue_results()
-
- def process_data(self):
- for item in self:
- if self.stopped:
- break
-
- if item is None:
- # next data point is not yet available
- if self.n_submitted > 0:
- self.flush()
- else:
- time.sleep(0.1)
- continue
-
- if item:
- f, args = item
- job = self.job_server.submit(
- f, args, modules=("time", )
- )
- self.job_queue.append(job)
-
- self.n_processed += 1
- self.n_submitted += 1
-
- if self.n_submitted >= self.n_cpus*3:
- self.flush()
-
- if self.n_submitted > 0:
- self.flush()
-
- def queue_results(self):
- stats = copy.deepcopy(self.job_server.get_stats())
- stats['n_processed'] = self.n_processed
- self.progress_queue.put(stats)
-
- def run(self):
- self.process_data()
- self.scan.file.flush()
-
- def stop(self):
- self.stopped = True
-
- def update_records(self, data):
- raise NotImplementedError()
View
11 praxes/fluorescence/mcaanalysiswindow.py
@@ -342,10 +342,7 @@ def processComplete(self):
self._results.flush()
def processData(self):
- if sys.platform.startswith('win'):
- from .pptaskmanager import XfsTaskManager
- else:
- from .mptaskmanager import XfsTaskManager
+ from .mptaskmanager import XfsTaskManager
self.setMenuToolsActionsEnabled(False)
@@ -411,9 +408,11 @@ def setMenuToolsActionsEnabled(self, enabled=True):
@classmethod
def offersService(cls, h5Node):
- return isinstance(
+ if isinstance(
h5Node, (phynx.Entry, phynx.Measurement, phynx.MultiChannelAnalyzer)
- )
+ ):
+ return len(h5Node.entry.measurement.mcas) > 0
+ return False
#if __name__ == "__main__":
View
178 praxes/fluorescence/pptaskmanager.py
@@ -1,178 +0,0 @@
-"""
-"""
-from __future__ import with_statement
-
-import copy
-import logging
-import sys
-import time
-
-import numpy as np
-import pp
-from PyMca import ClassMcaTheory
-from PyMca.ConcentrationsTool import ConcentrationsTool
-import numpy as np
-np.seterr(all='ignore')
-
-from praxes.dispatch.pptaskmanager import TaskManager
-
-
-logger = logging.getLogger(__file__)
-DEBUG = False
-
-def analyze_spectrum(index, spectrum, advanced_fit, mass_fraction_tool):
- start = time.time()
- advanced_fit.config['fit']['use_limit'] = 1
- # TODO: get the channels from the controller
- advanced_fit.setdata(y=spectrum)
- advanced_fit.estimate()
- estimate = time.time()
- if ('concentrations' in advanced_fit.config) and \
- (advanced_fit._fluoRates is None):
- fitresult, result = advanced_fit.startfit(digest=1)
- else:
- fitresult = advanced_fit.startfit(digest=0)
- result = advanced_fit.imagingDigestResult()
- result['index'] = index
- fit = time.time()
-
- if mass_fraction_tool:
- temp = {}
- temp['fitresult'] = fitresult
- temp['result'] = result
- temp['result']['config'] = advanced_fit.config
- conc = mass_fraction_tool.processFitResult(
- fitresult=temp,
- elementsfrommatrix=False,
- fluorates=advanced_fit._fluoRates
- )
- result['concentrations'] = conc
- fitconc = time.time()
- report = {'estimate':estimate-start,
- 'fit': fit-estimate,
- 'fitconc': fitconc-fit}
-
- return {
- 'index': index,
- 'result': result,
- 'advanced_fit': advanced_fit,
- 'report': report
- }
-
-
-class XfsTaskManager(TaskManager):
-
- @property
- def advanced_fit(self):
- with self.lock:
- return copy.deepcopy(self._advanced_fit)
- @advanced_fit.setter
- def advanced_fit(self, val):
- with self.lock:
- self._advanced_fit = val
-
- @property
- def mass_fraction_tool(self):
- with self.lock:
- return copy.copy(self._mass_fraction_tool)
- @mass_fraction_tool.setter
- def mass_fraction_tool(self, val):
- with self.lock:
- self._mass_fraction_tool = val
-
- def __init__(self, scan, config, progress_queue, **kwargs):
- super(XfsTaskManager, self).__init__(scan, progress_queue, **kwargs)
-
- with scan:
- self._measurement = scan.entry.measurement
- self._indices = self._measurement.scalar_data['i']
- self._masked = self._measurement.masked
- try:
- # are we processing a group of mca elements...
- mcas = scan.mcas.values()
- self._counts = [mca['counts'] for mca in mcas]
- self._monitor = mcas[0].monitor.corrected_value
- except AttributeError:
- # or a single element?
- self._counts = [scan['counts']]
- self._monitor = scan.monitor.corrected_value
-
- self._advanced_fit = ClassMcaTheory.McaTheory(config=config)
- self._advanced_fit.enableOptimizedLinearFit()
- self._mass_fraction_tool = None
- if 'concentrations' in config:
- self._mass_fraction_tool = ConcentrationsTool(config['concentrations'])
- self._mass_fraction_tool.config['time'] = 1
-
- def next(self):
- i = self._next_index
- if i >= self.n_points:
- raise StopIteration()
-
- with self.scan:
- try:
- suspect = i != self._indices[i]
- except IndexError:
- suspect = True
- if suspect:
- if i >= self._measurement.entry.npoints:
- # entry.npoints has changed, scan was aborted
- raise StopIteration()
- # expected the datapoint, but not yet acquired
- return None
-
- self._next_index = i + 1
-
- if self._masked[i]:
- return 0
-
- cts = [counts.corrected_value[i] for counts in self._counts]
-
- spectrum = np.sum(cts, 0)
- mft = self.mass_fraction_tool
- if self._monitor is not None:
- mft.config['flux'] = self._monitor[i]
- args = (
- i, spectrum, self.advanced_fit, self.mass_fraction_tool
- )
- return analyze_spectrum, args
-
- def update_element_map(self, element, map_type, index, val):
- with self.scan:
- try:
- entry = '%s_%s'%(element, map_type)
- self.scan['element_maps'][entry][index] = val
- except ValueError:
- print "index %d out of range for %s" % (index, entry)
- except KeyError:
- print "%s not found in element_maps" % entry
- except TypeError:
- print entry, index, val
-
- def update_records(self, data):
- if data is None:
- return
-
- index = data['index']
- self.advanced_fit = data['advanced_fit']
-
- result = data['result']
- for group in result['groups']:
- g = group.replace(' ', '_')
-
- fit_area = result[group]['fitarea']
- if fit_area:
- sigma_area = result[group]['sigmaarea']/fit_area
- else:
- sigma_area = np.nan
-
- self.update_element_map(g, 'fit', index, fit_area)
- self.update_element_map(g, 'fit_error', index, sigma_area)
-
- try:
- mass_fractions = result['concentrations']['mass fraction']
- for key, val in mass_fractions.items():
- k = key.replace(' ', '_')
- self.update_element_map(k, 'mass_fraction', index, val)
- except KeyError:
- pass

0 comments on commit 58e3f03

Please sign in to comment.