Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel #22

Merged
merged 6 commits into from Feb 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Expand Up @@ -19,6 +19,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [0.12.1-beta.1] - 2020-01-16

### Added
- new branch spatial added

### Changed

- SequentialFeatureSelection parameter order changed: *scoring* now comes before *standardize*.
Expand Down Expand Up @@ -96,3 +99,4 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- update rasterMath to prevent a bug when the user's osgeo/gdal version is lower than 2.1.
- prevent a bug in rasterMath when the processor has only 1 core.
- minor fixes and doc update

3 changes: 2 additions & 1 deletion museotoolbox/ai/__init__.py
Expand Up @@ -342,7 +342,8 @@ def predict_image(
higher_confidence=False,
in_image_mask=False,
out_nodata=0,
compress=True):
compress=True,
n_jobs = 1):
"""
Predict label from raster using previous learned model.
This function will call self.predictArray(X).
Expand Down
225 changes: 217 additions & 8 deletions museotoolbox/processing/__init__.py
@@ -1,4 +1,4 @@
#!/usr/bin/env python3
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# =============================================================================
# ___ ___ _____ _______
Expand All @@ -23,9 +23,13 @@
# spatial libraries
from osgeo import __version__ as osgeo_version
from osgeo import gdal, ogr

from joblib import Parallel, delayed, cpu_count

from ..internal_tools import ProgressBar, push_feedback
from time import time, sleep

from multiprocessing import Pool # Process pool
from multiprocessing import sharedctypes


def image_mask_from_vector(
Expand Down Expand Up @@ -760,7 +764,6 @@ def add_function(
if self.options == []:
self._init_raster_parameters(compress=compress)
else:
params = self.get_raster_parameters()
params = self.get_raster_parameters()
arg_pos = next(
(x for x in params if x.startswith('compress')), None)
Expand Down Expand Up @@ -1035,6 +1038,65 @@ def get_block(self, block_number=0):
tmp = self._manage_2d_mask(tmp)
tmp = np.ma.copy(tmp)
return tmp



def get_block_coords(self, block_number=0):
    """
    Get the position of a block, ordered as [x, y, width, height].

    Blocks are numbered row by row: ``block_number`` is split into a row
    and a column index using ``self.n_x_blocks``. Only the coordinates are
    computed, no pixel is read from the image.

    Parameters
    -----------
    block_number : int, optional (default=0).
        Position of the desired block.

    Returns
    --------
    list
        Position of the block as [x, y, width, height], where x and y are
        the column and line offsets of the block, and width and height
        are clipped to the image extent for blocks on the right and
        bottom edges.

    Raises
    ------
    ValueError
        If ``block_number`` is not lower than ``self.n_blocks``.
    """
    # Valid block numbers are 0 .. n_blocks - 1, hence '>=' and not '>'.
    if block_number >= self.n_blocks:
        raise ValueError(
            'There are only {} blocks in your image.'.format(
                self.n_blocks))

    row_number = int(block_number / self.n_x_blocks)
    col_number = int(block_number % self.n_x_blocks)

    # A range can be indexed directly, no need to build the full list of
    # offsets just to pick one of them.
    x_offset = range(0, self.n_columns, self.x_block_size)[col_number]
    y_offset = range(0, self.n_lines, self.y_block_size)[row_number]

    width = min(self.n_columns - x_offset, self.x_block_size)
    height = min(self.n_lines - y_offset, self.y_block_size)

    return [x_offset, y_offset, width, height]









def get_random_block(self, random_state=None):
"""
Expand Down Expand Up @@ -1207,21 +1269,23 @@ def run(self):
get_block=True):

if isinstance(X, list):
X_ = [np.ma.copy(arr) for arr in X]
X_ = [np.ma.copy(arr) for arr in X] #de type numpy mask
X = X_[0] # X_[0] is used to get mask
else:
X_ = np.ma.copy(X)

if self.verbose:
self.pb.add_position(self._position)

for idx, fun in enumerate(self.functions):
for idx, fun in enumerate(self.functions):# idx : index et fun les fonctions
maxBands = self.outputs[idx].RasterCount

if not np.all(X.mask == 1):
# if all the block is not masked
if not self.return_3d:
if isinstance(X_, list):
X__ = [arr[~X.mask[:, 0], ...].data for arr in X_]

if isinstance(X_, list): # to manage several input raster
X__ = [self.reshape_ndim(arr[~self.reshape_ndim(X).mask[:, 0], ...].data) for arr in X_]
else:
X = self.reshape_ndim(X)
Expand Down Expand Up @@ -1279,7 +1343,7 @@ def run(self):
\n Please give a nodata value when adding the function.')
if np.__version__ >= '1.17' and self.outputNoData[idx] is not False:
resFun = np.nan_to_num(resFun, nan=self.outputNoData[idx])

for ind in range(maxBands):
# write result band per band
indGdal = ind + 1
Expand All @@ -1293,7 +1357,6 @@ def run(self):

curBand.WriteArray(resToWrite, col, line)
curBand.FlushCache()

self._position += 1

self.pb.add_position(self.n_blocks)
Expand All @@ -1313,6 +1376,145 @@ def run(self):
self.outputs[idx] = None




def run_parallel(self, n_jobs=1, size='1G'):
    """
    Apply every added function to the image in parallel, using joblib.

    Function under construction and used for tests.

    The blocks are processed by chunks. For each chunk, each function is
    applied in parallel to the data of the non-empty blocks and the
    results are written in the matching output with
    ``_write_few_blocks``. Blocks whose array is empty are filled with
    the nodata value of the output.

    Parameters
    ----------
    n_jobs : int, optional (default=1)
        Number of workers that will work in parallel. Any negative value
        uses ``cpu_count()`` workers.
    size : str, optional (default='1G')
        Maximum size of RAM the program can use to temporarily store the
        results, written as an integer followed by 'M', 'G' or 'T'
        (for example '100G', '10M', '1T').

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If ``n_jobs`` is 0 or if ``size`` is not a valid value.
    """
    # Validate the arguments first, before touching the image.
    if n_jobs == 0:
        raise ValueError(' {} is not a valid value.'.format(n_jobs))
    if n_jobs < 0:
        n_jobs = cpu_count()

    unit_bytes = {'M': 1024 ** 2, 'G': 1024 ** 3, 'T': 1024 ** 4}
    try:
        size_value = unit_bytes[size[-1]] * int(size[:-1])
    except (KeyError, IndexError, ValueError, TypeError):
        raise ValueError(
            ' {} is not a valid value. use for example 100G, 10M, 1T '.format(size)) from None

    # Number of blocks per chunk: the memory budget divided by the number
    # of bytes of one block (a random block is used as a sample).
    sample = self.get_random_block()
    block_nbytes = sample.size * sample.itemsize
    if block_nbytes > 0:
        length = size_value // block_nbytes
    else:
        length = self.n_blocks
    # At least one block per chunk (a step of 0 is not valid for range)
    # and never more than the number of blocks of the image.
    length = max(1, min(length, self.n_blocks))

    self.pb = ProgressBar(self.n_blocks, message=self.message)

    for start in range(0, self.n_blocks, length):
        stop = min(start + length, self.n_blocks)

        # Split the chunk between blocks with data and empty blocks.
        idx_blocks = []
        idx_masked = []
        for block_idx in range(start, stop):
            if self.get_block(block_idx).size == 0:
                idx_masked.append(block_idx)
            else:
                idx_blocks.append(block_idx)

        self.pb.add_position(self._position)

        for idx, fun in enumerate(self.functions):
            kwargs = self.functionsKwargs[idx]
            if kwargs is False:
                # False means no keyword argument was given for this function
                kwargs = {}

            res = Parallel(n_jobs, verbose=self.verbose)(
                delayed(fun)(self.get_block(block_idx).data, **kwargs)
                for block_idx in idx_blocks)
            self._write_few_blocks(idx_blocks, res, idx)

            if idx_masked:
                # _write_block indexes the last axis band per band, so a
                # full (height, width, bands) array of nodata is needed.
                # NOTE(review): outputNoData[idx] may be False when no
                # nodata was given - confirm what should be written then.
                n_bands = self.outputs[idx].RasterCount
                nodata_blocks = []
                for block_idx in idx_masked:
                    _, _, width, height = self.get_block_coords(block_idx)
                    nodata_blocks.append(
                        np.full((height, width, n_bands),
                                self.outputNoData[idx]))
                self._write_few_blocks(idx_masked, nodata_blocks, idx)

        # The progress bar is sized with n_blocks, so the position is
        # counted in blocks.
        self._position += stop - start

    # Drop the references to the outputs (presumably lets GDAL flush and
    # close the datasets - confirm).
    for idx in range(len(self.outputs)):
        self.outputs[idx] = None
    self.pb.add_position(self.n_blocks)

def _write_few_blocks(self, idx_blocks, tab_blocks, idx_func):
    """
    Write several blocks to an output raster, one ``_write_block`` call
    per block.

    Parameters
    ----------
    idx_blocks : list.
        Indexes of the blocks to write.
    tab_blocks : list.
        Values to write; ``tab_blocks[k]`` is written at the position of
        block ``idx_blocks[k]``.
    idx_func : int
        Index of the function, forwarded as is to ``_write_block``.

    Returns
    -------
    None.

    """
    for position, block_index in enumerate(idx_blocks):
        self._write_block(block_index, tab_blocks[position], idx_func)


def _write_block(self, idx_block, tab_block, idx_func):
    """
    Write one block of values at its position in an output image.

    Parameters
    ----------
    idx_block : int.
        Index of the block; its position in the image is given by
        ``get_block_coords``.
    tab_block : numpy array or scalar.
        Values to write, with the bands on the last axis. A scalar fills
        every band of the whole block with this value.
    idx_func : int
        Index of the function, used to select the output in
        ``self.outputs``.

    Returns
    -------
    None.
    """
    output = self.outputs[idx_func]
    n_bands = output.RasterCount

    # The position does not depend on the band: look it up only once.
    x_offset, y_offset, width, height = self.get_block_coords(idx_block)

    if np.ndim(tab_block) == 0:
        # A single value (e.g. nodata) cannot be indexed band per band,
        # so expand it to a full block first.
        tab_block = np.full((height, width, n_bands), tab_block)

    for ind in range(n_bands):
        # write result band per band (band numbers start at 1 in GDAL)
        cur_band = output.GetRasterBand(ind + 1)

        res_to_write = tab_block[..., ind]
        if self.return_3d is False:
            # need to reshape as block
            res_to_write = self.reshape_ndim(
                res_to_write.reshape(height, width))

        cur_band.WriteArray(res_to_write, x_offset, y_offset)
        cur_band.FlushCache()





def sample_extraction(
in_image,
in_vector,
Expand Down Expand Up @@ -1884,3 +2086,10 @@ def _reshape_ndim(X):
if X.ndim == 1:
X = X.reshape(-1, 1)
return X