/
_core.py
538 lines (478 loc) · 18.3 KB
/
_core.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
"""Main module managing processes."""
from cachetools import LRUCache
import concurrent.futures
import logging
import multiprocessing
import threading
from mapchete.config import MapcheteConfig
from mapchete.errors import MapcheteNodataTile
from mapchete._processing import _run_on_single_tile, _run_area, ProcessInfo, TileProcess
from mapchete.tile import count_tiles
from mapchete._timer import Timer
from mapchete.validate import validate_tile, validate_zooms
logger = logging.getLogger(__name__)
def open(
    config, mode="continue", zoom=None, bounds=None, single_input_file=None,
    with_cache=False, debug=False
):
    """
    Open a Mapchete process.

    Parameters
    ----------
    config : MapcheteConfig object, config dict or path to mapchete file
        Mapchete process configuration
    mode : string
        * ``memory``: Generate process output on demand without reading
          pre-existing data or writing new data.
        * ``readonly``: Just read data without processing new data.
        * ``continue``: (default) Don't overwrite existing output.
        * ``overwrite``: Overwrite existing output.
    zoom : list or integer
        process zoom level or a pair of minimum and maximum zoom level
    bounds : tuple
        left, bottom, right, top process boundaries in output pyramid
    single_input_file : string
        single input file if supported by process
    with_cache : bool
        process output data cached in memory
    debug : bool
        forwarded to MapcheteConfig

    Returns
    -------
    Mapchete
        a Mapchete process object
    """
    # parse/validate the configuration first, then wrap it in a process object
    parsed_config = MapcheteConfig(
        config,
        mode=mode,
        zoom=zoom,
        bounds=bounds,
        single_input_file=single_input_file,
        debug=debug,
    )
    return Mapchete(parsed_config, with_cache=with_cache)
class Mapchete(object):
    """
    Main entry point to every processing job.

    From here, the process tiles can be determined and executed.

    Parameters
    ----------
    config : MapcheteConfig
        Mapchete process configuration
    with_cache : bool
        cache processed output data in memory (default: False)

    Attributes
    ----------
    config : MapcheteConfig
        Mapchete process configuration
    with_cache : bool
        process output data cached in memory
    """

    def __init__(self, config, with_cache=False):
        """
        Initialize Mapchete processing endpoint.

        Parameters
        ----------
        config : MapcheteConfig
            Mapchete process configuration
        with_cache : bool
            cache processed output data in memory (default: False)

        Raises
        ------
        TypeError
            if config is not a MapcheteConfig object
        """
        logger.info("initialize process")
        if not isinstance(config, MapcheteConfig):
            raise TypeError("config must be MapcheteConfig object")
        self.config = config
        self.process_name = self.config.process_name
        # in "memory" mode output is never written to disk, so the in-memory
        # cache is mandatory regardless of the with_cache flag
        self.with_cache = True if self.config.mode == "memory" else with_cache
        if self.with_cache:
            self.process_tile_cache = LRUCache(maxsize=512)
            # maps tile ID -> threading.Event for tiles currently in flight
            self.current_processes = {}
            self.process_lock = threading.Lock()
        # maps (minzoom, maxzoom) -> tile count; filled lazily by count_tiles()
        self._count_tiles_cache = {}

    def get_process_tiles(self, zoom=None):
        """
        Yield process tiles.

        Tiles intersecting with the input data bounding boxes as well as
        process bounds, if provided, are considered process tiles. This is to
        avoid iterating through empty tiles.

        Parameters
        ----------
        zoom : integer
            zoom level process tiles should be returned from; if none is given,
            return all process tiles

        Yields
        ------
        BufferedTile objects
        """
        # "zoom or zoom == 0" treats zoom level 0 as a valid single zoom
        if zoom or zoom == 0:
            for tile in self.config.process_pyramid.tiles_from_geom(
                self.config.area_at_zoom(zoom), zoom
            ):
                yield tile
        else:
            # no zoom given: yield tiles from all zoom levels, highest first
            for zoom in reversed(self.config.zoom_levels):
                for tile in self.config.process_pyramid.tiles_from_geom(
                    self.config.area_at_zoom(zoom), zoom
                ):
                    yield tile

    def skip_tiles(self, tiles=None):
        """
        Quickly determine whether tiles can be skipped for processing.

        The skip value is True if process mode is 'continue' and process output
        already exists. In all other cases, skip is False.

        Parameters
        ----------
        tiles : list of process tiles

        Yields
        ------
        tuples : (tile, skip)
        """
        def _skip(config, tile):
            return tile, config.output_reader.tiles_exist(tile)

        # only check for existing output in "continue" mode; existence checks
        # are I/O bound, so run them concurrently in a thread pool
        if self.config.mode == "continue":
            with concurrent.futures.ThreadPoolExecutor() as executor:
                for future in concurrent.futures.as_completed(
                    (executor.submit(_skip, self.config, tile) for tile in tiles)
                ):
                    yield future.result()
        else:
            for tile in tiles:
                yield (tile, False)

    def batch_process(
        self,
        zoom=None,
        tile=None,
        multi=None,
        max_chunksize=1,
        multiprocessing_module=None,
        multiprocessing_start_method="fork",
        skip_output_check=False
    ):
        """
        Process a large batch of tiles.

        Parameters
        ----------
        zoom : list or int
            either single zoom level or list of minimum and maximum zoom level;
            None processes all (default: None)
        tile : tuple
            zoom, row and column of tile to be processed (cannot be used with
            zoom)
        multi : int
            number of workers (default: number of CPU cores)
        max_chunksize : int
            maximum number of process tiles to be queued for each worker;
            (default: 1)
        multiprocessing_module : module
            either Python's standard 'multiprocessing' or Celery's 'billiard'
            module (default: multiprocessing)
        multiprocessing_start_method : str
            "fork", "forkserver" or "spawn"
            (default: "fork")
        skip_output_check : bool
            skip checking whether process tiles already have existing output
            before starting to process
        """
        # exhaust the generator; per-tile report messages are discarded
        list(self.batch_processor(
            zoom=zoom,
            tile=tile,
            multi=multi or multiprocessing.cpu_count(),
            max_chunksize=max_chunksize,
            multiprocessing_module=multiprocessing_module or multiprocessing,
            multiprocessing_start_method=multiprocessing_start_method,
            skip_output_check=skip_output_check
        ))

    def batch_processor(
        self,
        zoom=None,
        tile=None,
        multi=None,
        max_chunksize=1,
        multiprocessing_module=None,
        multiprocessing_start_method="fork",
        skip_output_check=False
    ):
        """
        Process a large batch of tiles and yield report messages per tile.

        Parameters
        ----------
        zoom : list or int
            either single zoom level or list of minimum and maximum zoom level;
            None processes all (default: None)
        tile : tuple
            zoom, row and column of tile to be processed (cannot be used with
            zoom)
        multi : int
            number of workers (default: number of CPU cores)
        max_chunksize : int
            maximum number of process tiles to be queued for each worker;
            (default: 1)
        multiprocessing_module : module
            either Python's standard 'multiprocessing' or Celery's 'billiard'
            module (default: multiprocessing)
        multiprocessing_start_method : str
            "fork", "forkserver" or "spawn"
            (default: "fork")
        skip_output_check : bool
            skip checking whether process tiles already have existing output
            before starting to process

        Yields
        ------
        ProcessInfo objects

        Raises
        ------
        ValueError
            if both zoom and tile are given
        """
        # NOTE(review): a falsy zoom value (e.g. 0) combined with tile does not
        # trigger this guard; kept as-is to preserve existing behavior
        if zoom and tile:
            raise ValueError("use either zoom or tile")
        # run single tile
        if tile:
            yield _run_on_single_tile(
                process=self,
                tile=self.config.process_pyramid.tile(*tuple(tile))
            )
        # run area
        else:
            for process_info in _run_area(
                process=self,
                zoom_levels=list(_get_zoom_level(zoom, self)),
                multi=multi or multiprocessing.cpu_count(),
                max_chunksize=max_chunksize,
                multiprocessing_module=multiprocessing_module or multiprocessing,
                multiprocessing_start_method=multiprocessing_start_method,
                skip_output_check=skip_output_check
            ):
                yield process_info

    def count_tiles(self, minzoom, maxzoom, init_zoom=0):
        """
        Count number of tiles intersecting with process area at given zoom levels.

        Results are cached per (minzoom, maxzoom) pair, so repeated calls are
        cheap.

        Parameters
        ----------
        minzoom : int
            minimum zoom level
        maxzoom : int
            maximum zoom level
        init_zoom : int
            zoom level from which tile counting is initialized (default: 0)

        Returns
        -------
        number of tiles
        """
        if (minzoom, maxzoom) not in self._count_tiles_cache:
            self._count_tiles_cache[(minzoom, maxzoom)] = count_tiles(
                self.config.area_at_zoom(), self.config.process_pyramid,
                # bug fix: was hard-coded init_zoom=0, silently ignoring the
                # caller-provided init_zoom parameter
                minzoom, maxzoom, init_zoom=init_zoom
            )
        return self._count_tiles_cache[(minzoom, maxzoom)]

    def execute(self, process_tile, raise_nodata=False):
        """
        Run Mapchete process on a tile.

        Execute, write and return data.

        Parameters
        ----------
        process_tile : Tile or tile index tuple
            Member of the process tile pyramid (not necessarily the output
            pyramid, if output has a different metatiling setting)
        raise_nodata : bool
            re-raise MapcheteNodataTile instead of returning empty output
            (default: False)

        Returns
        -------
        data : NumPy array or features
            process output
        """
        process_tile = validate_tile(process_tile, self.config.process_pyramid)
        try:
            return self.config.output.streamline_output(
                TileProcess(tile=process_tile, config=self.config).execute()
            )
        except MapcheteNodataTile:
            if raise_nodata:
                raise
            else:
                return self.config.output.empty(process_tile)

    def read(self, output_tile):
        """
        Read from written process output.

        Parameters
        ----------
        output_tile : BufferedTile or tile index tuple
            Member of the output tile pyramid (not necessarily the process
            pyramid, if output has a different metatiling setting)

        Returns
        -------
        data : NumPy array or features
            process output

        Raises
        ------
        ValueError
            if process mode does not allow reading
        """
        output_tile = validate_tile(output_tile, self.config.output_pyramid)
        if self.config.mode not in ["readonly", "continue", "overwrite"]:
            raise ValueError("process mode must be readonly, continue or overwrite")
        return self.config.output.read(output_tile)

    def write(self, process_tile, data):
        """
        Write data into output format.

        Parameters
        ----------
        process_tile : BufferedTile or tile index tuple
            process tile
        data : NumPy array or features
            data to be written

        Returns
        -------
        ProcessInfo
            report on whether and why data was or was not written

        Raises
        ------
        ValueError
            if process mode does not allow writing
        """
        process_tile = validate_tile(process_tile, self.config.process_pyramid)
        if self.config.mode not in ["continue", "overwrite"]:
            raise ValueError("cannot write output in current process mode")

        # in "continue" mode existing output must not be overwritten
        if self.config.mode == "continue" and (
            self.config.output.tiles_exist(process_tile)
        ):
            message = "output exists, not overwritten"
            logger.debug((process_tile.id, message))
            return ProcessInfo(
                tile=process_tile,
                processed=False,
                process_msg=None,
                written=False,
                write_msg=message
            )
        elif data is None:
            message = "output empty, nothing written"
            logger.debug((process_tile.id, message))
            return ProcessInfo(
                tile=process_tile,
                processed=False,
                process_msg=None,
                written=False,
                write_msg=message
            )
        else:
            with Timer() as t:
                self.config.output.write(process_tile=process_tile, data=data)
            message = "output written in %s" % t
            logger.debug((process_tile.id, message))
            return ProcessInfo(
                tile=process_tile,
                processed=False,
                process_msg=None,
                written=True,
                write_msg=message
            )

    def get_raw_output(self, tile, _baselevel_readonly=False):
        """
        Get output raw data.

        This function won't work with multiprocessing, as it uses the
        ``threading.Lock()`` class.

        Parameters
        ----------
        tile : tuple, Tile or BufferedTile
            If a tile index is given, a tile from the output pyramid will be
            assumed. Tile cannot be bigger than process tile!

        Returns
        -------
        data : NumPy array or features
            process output
        """
        tile = validate_tile(tile, self.config.output_pyramid)
        tile = (
            self.config.baselevels["tile_pyramid"].tile(*tile.id)
            if _baselevel_readonly
            else tile
        )

        # Return empty data if zoom level is outside of process zoom levels.
        if tile.zoom not in self.config.zoom_levels:
            return self.config.output.empty(tile)

        # TODO implement reprojection
        if tile.crs != self.config.process_pyramid.crs:
            raise NotImplementedError(
                "reprojection between processes not yet implemented"
            )

        if self.config.mode == "memory":
            # Determine affected process Tile and check whether it is already
            # cached.
            process_tile = self.config.process_pyramid.intersecting(tile)[0]
            return self._extract(
                in_tile=process_tile,
                in_data=self._execute_using_cache(process_tile),
                out_tile=tile
            )

        # TODO: cases where tile intersects with multiple process tiles
        process_tile = self.config.process_pyramid.intersecting(tile)[0]

        # get output_tiles that intersect with current tile
        if tile.pixelbuffer > self.config.output.pixelbuffer:
            output_tiles = list(self.config.output_pyramid.tiles_from_bounds(
                tile.bounds, tile.zoom
            ))
        else:
            output_tiles = self.config.output_pyramid.intersecting(tile)

        if self.config.mode == "readonly" or _baselevel_readonly:
            if self.config.output.tiles_exist(process_tile):
                return self._read_existing_output(tile, output_tiles)
            else:
                return self.config.output.empty(tile)
        elif self.config.mode == "continue" and not _baselevel_readonly:
            if self.config.output.tiles_exist(process_tile):
                return self._read_existing_output(tile, output_tiles)
            else:
                return self._process_and_overwrite_output(tile, process_tile)
        elif self.config.mode == "overwrite" and not _baselevel_readonly:
            return self._process_and_overwrite_output(tile, process_tile)

    def _process_and_overwrite_output(self, tile, process_tile):
        # Execute process tile (optionally via the in-memory cache), write the
        # result and return the subset covering the requested output tile.
        if self.with_cache:
            output = self._execute_using_cache(process_tile)
        else:
            output = self.execute(process_tile)
        self.write(process_tile, output)
        return self._extract(
            in_tile=process_tile,
            in_data=output,
            out_tile=tile
        )

    def _read_existing_output(self, tile, output_tiles):
        # Read all intersecting output tiles and merge them into the subset
        # covering the requested tile.
        return self.config.output.extract_subset(
            input_data_tiles=[
                (output_tile, self.read(output_tile))
                for output_tile in output_tiles
            ],
            out_tile=tile,
        )

    def _execute_using_cache(self, process_tile):
        # Return cached process output for a tile, executing it at most once
        # even when multiple threads ask for the same tile concurrently.
        try:
            return self.process_tile_cache[process_tile.id]
        except KeyError:
            # Lock process for Tile or wait.
            with self.process_lock:
                process_event = self.current_processes.get(process_tile.id)
                if not process_event:
                    # register this tile as "in progress" so other threads wait
                    self.current_processes[process_tile.id] = threading.Event()
            # Wait and return.
            if process_event:
                # another thread is already processing this tile
                process_event.wait()
                return self.process_tile_cache[process_tile.id]
            else:
                try:
                    output = self.execute(process_tile)
                    self.process_tile_cache[process_tile.id] = output
                    if self.config.mode in ["continue", "overwrite"]:
                        self.write(process_tile, output)
                    return self.process_tile_cache[process_tile.id]
                finally:
                    # wake up waiting threads even if execution failed
                    with self.process_lock:
                        process_event = self.current_processes.get(
                            process_tile.id)
                        del self.current_processes[process_tile.id]
                    process_event.set()

    def _extract(self, in_tile=None, in_data=None, out_tile=None):
        """Extract data from tile."""
        return self.config.output.extract_subset(
            input_data_tiles=[(in_tile, in_data)],
            out_tile=out_tile
        )

    def __enter__(self):
        """Enable context manager."""
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Cleanup on close."""
        # run input drivers cleanup
        for ip in self.config.input.values():
            if ip is not None:
                ip.cleanup()
        # run output driver cleanup
        self.config.output.close(
            exc_type=exc_type, exc_value=exc_value, exc_traceback=exc_traceback
        )
        # clean up internal cache
        if self.with_cache:
            self.process_tile_cache = None
            self.current_processes = None
            self.process_lock = None
def _get_zoom_level(zoom, process):
"""Determine zoom levels."""
return reversed(process.config.zoom_levels) if zoom is None else validate_zooms(zoom)