Skip to content

Commit

Permalink
#126 update mosaic.Layer to work in multiprocessing
Browse files Browse the repository at this point in the history
  • Loading branch information
akorosov committed Jun 29, 2015
1 parent 51e2d8e commit 49fdb34
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 41 deletions.
78 changes: 37 additions & 41 deletions nansat/mosaic.py
Expand Up @@ -89,20 +89,22 @@ class Layer:
def __init__(self, fileName, bands=[1],
opener=Nansat, maskName='mask',
doReproject=True, eResampleAlg=0,
period=(None,None)):
''' Set parameters of processing '''
period=(None,None),
logLevel=30):
# Set parameters of processing
self.fileName = fileName
self.bands = bands
self.opener = opener
self.maskName = maskName
self.doReproject = doReproject
self.period = period
self.n = None
self.eResampleAlg = eResampleAlg
self.logLevel = logLevel

def make_nansat_object(self, domain=None):
''' Open self.fileName with self.opener '''
self.n = self.opener(self.fileName)
def make_nansat_object(self, domain):
# Open self.fileName with self.opener
print 'Layer', self.fileName, self.logLevel
self.n = self.opener(self.fileName, logLevel=self.logLevel)
if self.doReproject:
self.n.reproject(domain, eResampleAlg=self.eResampleAlg)

Expand Down Expand Up @@ -133,29 +135,6 @@ def get_mask_array(self):
return mask



"""
def _get_layer_image(self):
'''Get nansat object from the specifed file
Open file with Nansat
Return, if it is within a given period,
Returns:
--------
Nansat object or None
'''
# open file using Nansat or its child class
n = self.nClass(self.fileName, logLevel=self.logger.level)
# check if image is out of period
self.logger.info('Try to get time from %s' % f)
if n is not None:
return n
"""

class Mosaic(Nansat):
'''Container for mosaicing methods
Expand Down Expand Up @@ -215,17 +194,18 @@ def average(self, files=[], bands=[1], doReproject=True, maskName='mask',
self.logger.error('No input files given!')
return

# create list of layers
domain = Nansat(domain=self)
layers = [Layer(ifile, bands, opener, maskName, doReproject,
eResampleAlg, period) for ifile in files]

# get desired shape
dstShape = self.shape()

# preallocate shared mem array
sharedArray = mp.Array(ctypes.c_float, [0]*(2+len(bands)+len(bands)) * dstShape[0] * dstShape[1])

# create list of layers
domain = Nansat(domain=self)
layers = [Layer(ifile, bands, opener, maskName, doReproject,
eResampleAlg, period, self.logger.level)
for ifile in files]

# test in debug
#sumup(layers[0])

# prepare pool of processors
Expand All @@ -244,6 +224,13 @@ def average(self, files=[], bands=[1], doReproject=True, maskName='mask',
(2+len(bands)*2, dstShape[0], dstShape[1]),
'float32')

# cleanup
pool.terminate()
pool = None
layers = None
metadata = None
sharedArray = None

cntMat = sharedNDArray[0]
maskMat = sharedNDArray[1]
avgMat = sharedNDArray[2:2+len(bands)]
Expand Down Expand Up @@ -281,7 +268,9 @@ def average(self, files=[], bands=[1], doReproject=True, maskName='mask',

def _get_cube(self, files, band, doReproject, maskName, opener,
eResampleAlg,
period):
period,
vmin=-np.inf,
vmax=np.inf):
'''Make cube with data from one band of input files
Open files, reproject, get band, insert into cube
Expand Down Expand Up @@ -317,17 +306,23 @@ def _get_cube(self, files, band, doReproject, maskName, opener,
# for all input files
for i, f in enumerate(files):
self.logger.info('Processing %s' % f)
layer = Layer(f, [band], opener, maskName, doReproject, eResampleAlg, period)

layer = Layer(f, [band], opener, maskName, doReproject,
eResampleAlg, period, logLevel=self.logger.level)
# get nansat from the input Layer
layer.make_nansat_object(domain)

# if not in the period, quit
if not layer.within_period():
continue
# get mask
mask = layer.get_mask_array()
# get arrays with data
bandArray = layer.n[band]
bandArray = layer.n[band].astype('float32')
# remove invalid data
bandArray[mask < 64] = np.nan
bandArray[bandArray < vmin] = np.nan
bandArray[bandArray > vmax] = np.nan

# get metadata
bandMetadata = layer.n.get_metadata(bandID=band)

Expand All @@ -341,7 +336,8 @@ def _get_cube(self, files, band, doReproject, maskName, opener,
return dataCube, maskMat.max(0), bandMetadata

def median(self, files=[], bands=[1], doReproject=True, maskName='mask',
opener=Nansat, threads=1, eResampleAlg=0, period=(None,None)):
opener=Nansat, eResampleAlg=0, period=(None,None),
vmin=-np.inf, vmax=np.inf):
'''Calculate median of input bands
Memory and CPU greedy method. Generates 3D cube from bands of
Expand Down Expand Up @@ -378,7 +374,7 @@ def median(self, files=[], bands=[1], doReproject=True, maskName='mask',
maskName,
opener,
eResampleAlg,
period)
period, vmin, vmax)
median = st.nanmedian(cube, axis=0)

# add band and std with metadata
Expand Down
1 change: 1 addition & 0 deletions nansat/nansat.py
Expand Up @@ -1727,6 +1727,7 @@ def _get_mapper(self, mapperName, **kwargs):
tmpVRT._create_band({'SourceFilename': self.fileName,
'SourceBand': iBand + 1})
tmpVRT.dataset.FlushCache()
self.mapper = 'gdal_bands'

# if GDAL cannot open the file, and no mappers exist which can make VRT
if tmpVRT is None and gdalDataset is None:
Expand Down

0 comments on commit 49fdb34

Please sign in to comment.