/
areas.py
717 lines (576 loc) · 31.1 KB
/
areas.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
"""
Module for working with large geographical areas
"""
from __future__ import annotations
import itertools
import json
import math
import os
from abc import ABCMeta, abstractmethod
from typing import Any, ClassVar, Iterable, TypeVar, cast
import shapely
import shapely.geometry
import shapely.ops
from shapely.geometry import GeometryCollection, MultiPolygon, Polygon
from shapely.geometry.base import BaseGeometry
from .api import BatchRequest, SentinelHubBatch, SentinelHubCatalog
from .config import SHConfig
from .constants import CRS
from .data_collections import DataCollection
from .geo_utils import transform_point
from .geometry import BBox, Geometry, _BaseGeometry
from .types import JsonDict
T = TypeVar("T", float, int)
class AreaSplitter(metaclass=ABCMeta):
"""Abstract class for splitter classes. It implements common methods used for splitting large area into smaller
parts.
"""
def __init__(
self,
shape_list: Iterable[Polygon | MultiPolygon | _BaseGeometry],
crs: CRS,
reduce_bbox_sizes: bool = False,
):
"""
:param shape_list: A list of geometrical shapes describing the area of interest
:param crs: Coordinate reference system of the shapes in `shape_list`
:param reduce_bbox_sizes: If `True` it will reduce the sizes of bounding boxes so that they will tightly fit
the given geometry in `shape_list`.
"""
self.crs = CRS(crs)
self.shape_list = [self._parse_shape(shape, crs) for shape in shape_list]
self.area_shape = self._join_shape_list(self.shape_list)
self.reduce_bbox_sizes = reduce_bbox_sizes
self.area_bbox = self.get_area_bbox()
self.bbox_list, self.info_list = self._make_split()
@staticmethod
def _parse_shape(shape: Polygon | MultiPolygon | _BaseGeometry, crs: CRS) -> Polygon | MultiPolygon:
"""Helper method for parsing input shapes"""
if isinstance(shape, (Polygon, MultiPolygon)):
return shape
if isinstance(shape, _BaseGeometry):
return shape.transform(crs).geometry
raise ValueError(
f"The list of shapes must contain shapes of types {Polygon}, {MultiPolygon} or subtype of {_BaseGeometry}"
)
@staticmethod
def _join_shape_list(shape_list: list[Polygon | MultiPolygon]) -> MultiPolygon:
"""Joins a list of shapes together into one shape
:param shape_list: A list of geometrical shapes describing the area of interest
:return: A multipolygon which is a union of shapes in given list
"""
if shapely.__version__ >= "1.8.0":
return shapely.ops.unary_union(shape_list)
return shapely.ops.cascaded_union(shape_list)
@abstractmethod
def _make_split(self) -> tuple[list[BBox], list[dict[str, object]]]:
"""The abstract method where the splitting will happen"""
def get_bbox_list(
self,
crs: CRS | None = None,
buffer: None | float | tuple[float, float] = None,
reduce_bbox_sizes: bool | None = None,
) -> list[BBox]:
"""Returns a list of bounding boxes that are the result of the split
:param crs: Coordinate reference system in which the bounding boxes should be returned. If `None` the CRS will
be the default CRS of the splitter.
:param buffer: A percentage of each BBox size increase. This will cause neighbouring bounding boxes to overlap.
:param reduce_bbox_sizes: If `True` it will reduce the sizes of bounding boxes so that they will tightly
fit the given geometry in `shape_list`. This overrides the same parameter from constructor
:return: List of bounding boxes
"""
bbox_list = self.bbox_list
if buffer:
bbox_list = [bbox.buffer(buffer, relative=True) for bbox in bbox_list]
if reduce_bbox_sizes is None:
reduce_bbox_sizes = self.reduce_bbox_sizes
if reduce_bbox_sizes:
bbox_list = self._reduce_sizes(bbox_list)
if crs:
return [bbox.transform(crs) for bbox in bbox_list]
return bbox_list
def get_geometry_list(self) -> list[Polygon | MultiPolygon]:
"""For each bounding box an intersection with the shape of entire given area is calculated. CRS of the returned
shapes is the same as CRS of the given area.
:return: List of polygons or multipolygons corresponding to the order of bounding boxes
"""
return [self._intersection_area(bbox) for bbox in self.bbox_list]
def get_info_list(self) -> list[dict[str, object]]:
"""Returns a list of dictionaries containing information about bounding boxes obtained in split. The order in
the list matches the order of the list of bounding boxes.
:return: List of dictionaries
"""
return self.info_list
def get_area_shape(self) -> MultiPolygon:
"""Returns a single shape of entire area described with `shape_list` parameter
:return: A multipolygon which is a union of shapes describing the area
"""
return self.area_shape
def get_area_bbox(self, crs: CRS | None = None) -> BBox:
"""Returns a bounding box of the entire area
:param crs: Coordinate reference system in which the bounding box should be returned. If `None` the CRS will
be the default CRS of the splitter.
:return: A bounding box of the area defined by the `shape_list`
"""
bbox_list = [BBox(shape.bounds, crs=self.crs) for shape in self.shape_list]
area_min_x = min(bbox.lower_left[0] for bbox in bbox_list)
area_min_y = min(bbox.lower_left[1] for bbox in bbox_list)
area_max_x = max(bbox.upper_right[0] for bbox in bbox_list)
area_max_y = max(bbox.upper_right[1] for bbox in bbox_list)
bbox = BBox((area_min_x, area_min_y, area_max_x, area_max_y), crs=self.crs)
if crs is None:
return bbox
return bbox.transform(crs)
def _intersects_area(self, bbox: BBox) -> bool:
"""Checks if the bounding box intersects the entire area
:param bbox: A bounding box
:return: `True` if bbox intersects the entire area else False
"""
return self._bbox_to_area_polygon(bbox).intersects(self.area_shape)
def _intersection_area(self, bbox: BBox) -> Polygon | MultiPolygon:
"""Calculates the intersection of a given bounding box and the entire area
:param bbox: A bounding box
:return: A shape of intersection
"""
return self._bbox_to_area_polygon(bbox).intersection(self.area_shape)
def _bbox_to_area_polygon(self, bbox: BBox) -> Polygon:
"""Transforms bounding box into a polygon object in the area CRS.
:param bbox: A bounding box
:return: A polygon
"""
projected_bbox = bbox.transform(self.crs)
return projected_bbox.geometry
def _reduce_sizes(self, bbox_list: list[BBox]) -> list[BBox]:
"""Reduces sizes of bounding boxes"""
return [BBox(self._intersection_area(bbox).bounds, self.crs).transform(bbox.crs) for bbox in bbox_list]
class BBoxSplitter(AreaSplitter):
"""A tool that splits the given area into smaller parts. Given the area it calculates its bounding box and splits
it into smaller bounding boxes of equal size. Then it filters out the bounding boxes that do not intersect the
area. If specified by user it can also reduce the sizes of the remaining bounding boxes to best fit the area.
"""
def __init__(
self,
shape_list: Iterable[Polygon | MultiPolygon | _BaseGeometry],
crs: CRS,
split_shape: None | int | tuple[int, int] = None,
split_size: None | int | tuple[int, int] = None,
**kwargs: Any,
):
"""
:param shape_list: A list of geometrical shapes describing the area of interest
:param crs: Coordinate reference system of the shapes in `shape_list`
:param split_shape: Parameter that describes the shape in which the area bounding box will be split.
It can be a tuple of the form `(n, m)` which means the area bounding box will be split into `n` columns
and `m` rows. It can also be a single integer `n` which is the same as `(n, n)`.
:param split_size: Parameter that describes the size of patches (in the same Unit of Measure of the CRS)
into which the area bounding box will be split. It can be a tuple of the form `(width, height)` which means
the area bounding box will be split into patches of size (width, height). It can also be a single integer
`size` which is the same as `(size, size)`.
:param reduce_bbox_sizes: If `True` it will reduce the sizes of bounding boxes so that they will tightly fit
the given area geometry from `shape_list`.
"""
if (split_shape is not None) and (split_size is None):
self.split_params = ("shape", _parse_to_pair(split_shape, allowed_types=(int,), param_name="split_shape"))
elif (split_shape is None) and (split_size is not None):
self.split_params = ("size", _parse_to_pair(split_size, allowed_types=(int,), param_name="split_size"))
else:
raise ValueError("Exactly one of 'split_shape' or 'split_size' needs to be specified.")
super().__init__(shape_list, crs, **kwargs)
def _make_split(self) -> tuple[list[BBox], list[dict[str, object]]]:
mode, split_params = self.split_params
if mode == "shape":
columns, rows = split_params
bbox_partition = self.area_bbox.get_partition(num_x=columns, num_y=rows)
else:
width, height = split_params
bbox_partition = self.area_bbox.get_partition(size_x=width, size_y=height)
columns = len(bbox_partition)
rows = len(bbox_partition[0])
bbox_list, info_list = [], []
for i, j in itertools.product(range(columns), range(rows)):
if self._intersects_area(bbox_partition[i][j]):
bbox_list.append(bbox_partition[i][j])
info = {"parent_bbox": self.area_bbox, "index_x": i, "index_y": j}
info_list.append(info)
return bbox_list, info_list
class OsmSplitter(AreaSplitter):
"""A tool that splits the given area into smaller parts. For the splitting it uses Open Street Map (OSM) grid on
the specified zoom level. It calculates bounding boxes of all OSM tiles that intersect the area. If specified by
user it can also reduce the sizes of the remaining bounding boxes to best fit the area.
"""
def __init__(
self,
shape_list: Iterable[Polygon | MultiPolygon | _BaseGeometry],
crs: CRS,
zoom_level: int,
**kwargs: Any,
):
"""
:param shape_list: A list of geometrical shapes describing the area of interest
:param crs: Coordinate reference system of the shapes in `shape_list`
:param zoom_level: A zoom level defined by OSM. Level 0 is entire world, level 1 splits the world into
4 parts, etc.
:param reduce_bbox_sizes: If `True` it will reduce the sizes of bounding boxes so that they will tightly fit
the given area geometry from `shape_list`.
"""
self._POP_WEB_MAX = transform_point((180, 0), CRS.WGS84, CRS.POP_WEB)[0] # pylint: disable=invalid-name
self.zoom_level = zoom_level
super().__init__(shape_list, crs, **kwargs)
def _make_split(self) -> tuple[list[BBox], list[dict[str, object]]]:
self.area_bbox = self.get_area_bbox(CRS.POP_WEB)
self._check_area_bbox()
bbox_list, info_list = self._recursive_split(self.get_world_bbox(), 0, 0, 0)
for i, bbox in enumerate(bbox_list):
bbox_list[i] = bbox.transform(self.crs)
return bbox_list, info_list
def _check_area_bbox(self) -> None:
"""The method checks if the area bounding box is completely inside the OSM grid. That means that its latitudes
must be contained in the interval (-85.0511, 85.0511)
:raises: ValueError
"""
for coord in self.area_bbox:
if abs(coord) > self._POP_WEB_MAX:
raise ValueError(
"OsmTileSplitter only works for areas which have latitude in interval (-85.0511, 85.0511)"
)
def get_world_bbox(self) -> BBox:
"""Creates a bounding box of the entire world in EPSG: 3857
:return: Bounding box of entire world
"""
return BBox((-self._POP_WEB_MAX, -self._POP_WEB_MAX, self._POP_WEB_MAX, self._POP_WEB_MAX), crs=CRS.POP_WEB)
def _recursive_split(
self,
bbox: BBox,
zoom_level: int,
column: int,
row: int,
) -> tuple[list[BBox], list[dict[str, object]]]:
"""Method that recursively creates bounding boxes of OSM grid that intersect the area.
:param bbox: Bounding box
:param zoom_level: OSM zoom level
:param column: Column in the OSM grid
:param row: Row in the OSM grid
"""
if zoom_level == self.zoom_level:
return [bbox], [{"zoom_level": zoom_level, "index_x": column, "index_y": row}]
bbox_list, info_list = [], []
bbox_partition = bbox.get_partition(num_x=2, num_y=2)
for i, j in itertools.product(range(2), range(2)):
if self._intersects_area(bbox_partition[i][j]):
bboxes, infos = self._recursive_split(
bbox_partition[i][j], zoom_level + 1, 2 * column + i, 2 * row + 1 - j
)
bbox_list.extend(bboxes)
info_list.extend(infos)
return bbox_list, info_list
class TileSplitter(AreaSplitter):
"""A splitter that uses Sentinel Hub Catalog API to obtain geometries of the original tiling grid of a given
data collection. Additionally, it can further split these geometries into smaller parts.
"""
_CATALOG_FILTER: ClassVar[dict[str, list[str]]] = {
"include": ["id", "geometry", "properties.datetime", "properties.proj:bbox", "properties.proj:epsg"],
"exclude": [],
}
def __init__(
self,
shape_list: Iterable[Polygon | MultiPolygon | _BaseGeometry],
crs: CRS,
time_interval: tuple[str, str],
data_collection: DataCollection,
tile_split_shape: int | tuple[int, int] = 1,
config: SHConfig | None = None,
**kwargs: Any,
):
"""
:param shape_list: A list of geometrical shapes describing the area of interest
:param crs: Coordinate reference system of the shapes in `shape_list`
:param time_interval: Interval with start and end date of the form YYYY-MM-DDThh:mm:ss or YYYY-MM-DD
:param data_collection: A satellite data collection
:param tile_split_shape: Parameter that describes the shape in which the satellite tile bounding boxes will be
split. It can be a tuple of the form `(n, m)` which means the tile bounding boxes will be
split into `n` columns and `m` rows. It can also be a single integer `n` which is the same
as `(n, n)`.
:param config: A custom instance of config class to override parameters from the saved configuration.
:param kwargs: Parameters that are propagated to the base `AreaSplitter` class
"""
self.time_interval = time_interval
self.tile_split_shape = tile_split_shape
self.data_collection = data_collection
sh_config = config or SHConfig()
if data_collection.service_url:
sh_config = sh_config.copy()
sh_config.sh_base_url = data_collection.service_url
self.catalog = SentinelHubCatalog(config=sh_config)
super().__init__(shape_list, crs, **kwargs)
def _make_split(self) -> tuple[list[BBox], list[dict[str, object]]]:
tile_dict: dict[tuple[tuple[float, ...], int], dict[str, Any]] = {}
search_iterator = self.catalog.search(
self.data_collection, time=self.time_interval, bbox=self.area_bbox, fields=self._CATALOG_FILTER
)
timestamps = search_iterator.get_timestamps()
geometry_list = search_iterator.get_geometries()
for tile_info, (timestamp, geometry) in zip(search_iterator, zip(timestamps, geometry_list)):
tile_properties = tile_info["properties"]
bbox = BBox(tile_properties["proj:bbox"], crs=tile_properties["proj:epsg"])
bbox_hash = tuple(bbox), bbox.crs.epsg
if bbox_hash not in tile_dict:
tile_dict[bbox_hash] = {"bbox": bbox, "timestamps": [], "ids": [], "geometries": []}
tile_dict[bbox_hash]["timestamps"].append(timestamp)
tile_dict[bbox_hash]["ids"].append(tile_info["id"])
tile_dict[bbox_hash]["geometries"].append(geometry)
bbox_list, info_list = [], []
for tile_info in tile_dict.values():
if not self._intersects_area(tile_info["bbox"]):
continue
tile_bbox: BBox = tile_info["bbox"]
bbox_splitter = BBoxSplitter([tile_bbox.geometry], tile_bbox.crs, split_shape=self.tile_split_shape)
for bbox, info in zip(bbox_splitter.get_bbox_list(), bbox_splitter.get_info_list()):
if self._intersects_area(bbox):
bbox_list.append(bbox)
info["ids"] = tile_info["ids"]
info["timestamps"] = tile_info["timestamps"]
info_list.append(info)
return bbox_list, info_list
class CustomGridSplitter(AreaSplitter):
"""Splitting class which can split according to given custom collection of bounding boxes"""
def __init__(
self,
shape_list: Iterable[Polygon | MultiPolygon | _BaseGeometry],
crs: CRS,
bbox_grid: Iterable[BBox],
bbox_split_shape: int | tuple[int, int] = 1,
**kwargs: Any,
):
"""
:param shape_list: A list of geometrical shapes describing the area of interest
:param crs: Coordinate reference system of the shapes in `shape_list`
:param bbox_grid: A collection of bounding boxes defining a grid of splitting. All of them have to be in the
same CRS.
:param bbox_split_shape: Parameter that describes the shape in which each of the bounding boxes in the given
grid will be split. It can be a tuple of the form `(n, m)` which means the tile bounding boxes will be
split into `n` columns and `m` rows. It can also be a single integer `n` which is the same as `(n, n)`.
:param reduce_bbox_sizes: If `True` it will reduce the sizes of bounding boxes so that they will tightly fit
the given geometry in `shape_list`.
"""
self.bbox_grid = list(bbox_grid)
self.bbox_split_shape = bbox_split_shape
super().__init__(shape_list, crs, **kwargs)
def _make_split(self) -> tuple[list[BBox], list[dict[str, object]]]:
bbox_list: list[BBox] = []
info_list: list[dict[str, object]] = []
for grid_idx, grid_bbox in enumerate(self.bbox_grid):
if self._intersects_area(grid_bbox):
bbox_splitter = BBoxSplitter([grid_bbox.geometry], grid_bbox.crs, split_shape=self.bbox_split_shape)
for bbox, info in zip(bbox_splitter.get_bbox_list(), bbox_splitter.get_info_list()):
if self._intersects_area(bbox):
info["grid_index"] = grid_idx
bbox_list.append(bbox)
info_list.append(info)
return bbox_list, info_list
class BaseUtmSplitter(AreaSplitter, metaclass=ABCMeta):
"""Base splitter that returns bboxes of fixed size aligned to UTM zones or UTM grid tiles as defined by the MGRS
The generated bounding box grid will have coordinates in form of
`(N * bbox_size_x + offset_x, M * bbox_size_y + offset_y)`
"""
def __init__(
self,
shape_list: Iterable[Polygon | MultiPolygon | _BaseGeometry],
crs: CRS,
bbox_size: float | tuple[float, float],
offset: tuple[float, float] | None = None,
):
"""
:param shape_list: A list of geometrical shapes describing the area of interest
:param crs: Coordinate reference system of the shapes in `shape_list`
:param bbox_size: A size of generated bounding boxes in horizontal and vertical directions in meters. If a
single value is given that will be interpreted as (value, value).
:param offset: Bounding box offset in horizontal and vertical directions in meters.
"""
self.bbox_size = _parse_to_pair(bbox_size, allowed_types=(int, float), param_name="bbox_size")
self.offset = _parse_to_pair(offset or 0.0, allowed_types=(int, float), param_name="offset")
self.utm_grid = self._get_utm_polygons()
super().__init__(shape_list, crs)
@abstractmethod
def _get_utm_polygons(self) -> list[tuple[BaseGeometry, dict[str, Any]]]:
"""Find UTM grid zones overlapping with input area shape."""
@staticmethod
def _get_utm_from_props(utm_dict: dict[str, Any]) -> CRS:
"""Return the UTM CRS corresponding to the UTM described by the properties dictionary
:param utm_dict: Dictionary reporting name of the UTM zone and MGRS grid
:return: UTM coordinate reference system
"""
hemisphere_digit = 6 if utm_dict["direction"] == "N" else 7
zone_number = utm_dict["zone"]
return CRS(f"32{hemisphere_digit}{zone_number:02d}")
def _align_bbox_to_size(self, bbox: BBox) -> BBox:
"""Align input bbox coordinates to be multiples of the bbox size
:param bbox: Bounding box in UTM coordinates
:return: BBox objects with coordinates multiples of the bbox size
"""
size_x, size_y = self.bbox_size
offset_x, offset_y = self.offset
lower_left_x, lower_left_y = bbox.lower_left
aligned_x = math.floor((lower_left_x - offset_x) / size_x) * size_x + offset_x
aligned_y = math.floor((lower_left_y - offset_y) / size_y) * size_y + offset_y
return BBox(((aligned_x, aligned_y), bbox.upper_right), crs=bbox.crs)
def _make_split(self) -> tuple[list[BBox], list[dict[str, object]]]:
"""Split each UTM grid into equally sized bboxes in correct UTM zone"""
size_x, size_y = self.bbox_size
bbox_list: list[BBox] = []
info_list: list[dict[str, object]] = []
index = 0
shape_geometry = Geometry(self.area_shape, self.crs).transform(CRS.WGS84)
for utm_cell in self.utm_grid:
utm_cell_geom, utm_cell_prop = utm_cell
# the UTM MGRS grid definition contains four 0 zones at the poles (0A, 0B, 0Y, 0Z)
if utm_cell_prop["zone"] == 0:
continue
utm_crs = self._get_utm_from_props(utm_cell_prop)
cell_info = dict(
crs=utm_crs.name,
utm_zone=str(utm_cell_prop["zone"]).zfill(2),
utm_row=utm_cell_prop["row"],
direction=utm_cell_prop["direction"],
)
intersection = utm_cell_geom.intersection(shape_geometry.geometry)
if not intersection.is_empty and isinstance(intersection, GeometryCollection):
intersection = MultiPolygon(
geo_object for geo_object in intersection if isinstance(geo_object, (Polygon, MultiPolygon))
)
if not intersection.is_empty:
intersection = Geometry(intersection, CRS.WGS84).transform(utm_crs)
bbox_partition = self._align_bbox_to_size(intersection.bbox).get_partition(size_x=size_x, size_y=size_y)
for i, column in enumerate(bbox_partition):
for j, part in enumerate(column):
if part.geometry.intersects(intersection.geometry):
bbox_list.append(part)
info_list.append(dict(**cell_info, index=index, index_x=i, index_y=j))
index += 1
return bbox_list, info_list
def get_bbox_list(self, buffer: None | float | tuple[float, float] = None) -> list[BBox]: # type: ignore[override]
"""Get list of bounding boxes.
The CRS is fixed to the computed UTM CRS. This BBox splitter does not support reducing size of output
bounding boxes
:param buffer: A percentage of each BBox size increase. This will cause neighbouring bounding boxes to overlap.
:return: List of bounding boxes
"""
return super().get_bbox_list(buffer=buffer)
class UtmGridSplitter(BaseUtmSplitter):
"""Splitter that returns bounding boxes of fixed size aligned to the UTM MGRS grid"""
def _get_utm_polygons(self) -> list[tuple[BaseGeometry, dict[str, Any]]]:
"""Find UTM grid zones overlapping with input area shape
:return: List of geometries and properties of UTM grid zones overlapping with input area shape
"""
# file downloaded from faculty.baruch.cuny.edu/geoportal/data/esri/world/utmzone.zip
utm_grid_filename = os.path.join(os.path.dirname(__file__), ".utmzones.geojson")
if not os.path.isfile(utm_grid_filename):
raise OSError(f"UTM grid definition file does not exist: {os.path.abspath(utm_grid_filename)}")
with open(utm_grid_filename) as utm_grid_file:
utm_grid = json.load(utm_grid_file)["features"]
utm_geom_list = [shapely.geometry.shape(utm_zone["geometry"]) for utm_zone in utm_grid]
utm_prop_list = [
dict(
zone=utm_zone["properties"]["ZONE"],
row=utm_zone["properties"]["ROW_"],
direction="N" if utm_zone["properties"]["ROW_"] >= "N" else "S",
)
for utm_zone in utm_grid
]
return list(zip(utm_geom_list, utm_prop_list))
class UtmZoneSplitter(BaseUtmSplitter):
"""Splitter that returns bounding boxes of fixed size aligned to the equator and the UTM zones."""
LNG_MIN, LNG_MAX, LNG_UTM = -180, 180, 6
LAT_MIN, LAT_MAX, LAT_EQ = -80, 84, 0
def _get_utm_polygons(self) -> list[tuple[BaseGeometry, dict[str, Any]]]:
"""Find UTM zones overlapping with input area shape
The returned geometry corresponds to a triangle ranging from the equator to the North/South Pole
:return: List of geometries and properties of UTM zones overlapping with input area shape
"""
utm_geom_list = []
for lat in [(self.LAT_EQ, self.LAT_MAX), (self.LAT_MIN, self.LAT_EQ)]:
for lng in range(self.LNG_MIN, self.LNG_MAX, self.LNG_UTM):
# A new point is added per each degree - this is inline with geometries used by UtmGridSplitter
# In the future the number of points will be calculated according to bbox_size parameter
points = (
[(lng, degree) for degree in range(lat[0], lat[1])]
+ [(degree, lat[1]) for degree in range(lng, lng + self.LNG_UTM)]
+ [(lng + self.LNG_UTM, degree) for degree in range(lat[1], lat[0], -1)]
+ [(degree, lat[0]) for degree in range(lng + self.LNG_UTM, lng, -1)]
)
utm_geom_list.append(Polygon(points))
utm_prop_list = [
dict(zone=zone, row="", direction=direction) for direction in ["N", "S"] for zone in range(1, 61)
]
return list(zip(utm_geom_list, utm_prop_list))
class BatchSplitter(AreaSplitter):
"""A splitter that obtains split bounding boxes from Sentinel Hub Batch API"""
def __init__(
self,
*,
request_id: str | None = None,
batch_request: BatchRequest | None = None,
config: SHConfig | None = None,
):
"""
:param request_id: An ID of a batch request
:param batch_request: A batch request object. It is an alternative to the `request_id` parameter
:param config: A configuration object with credentials and information about which service deployment to
use.
"""
self.batch_client = SentinelHubBatch(config=config)
if batch_request is None:
if request_id is None:
raise ValueError("One of the parameters request_id and batch_request has to be given")
batch_request = self.batch_client.get_request(request_id)
self.batch_request = batch_request
self.tile_size = self._get_tile_size()
self.tile_buffer = self._get_tile_buffer()
batch_geometry: _BaseGeometry | None = batch_request.geometry or batch_request.bbox
if batch_geometry is None:
raise ValueError("Batch request has both `bbox` and `geometry` set to `None`, which is invalid.")
super().__init__([batch_geometry.geometry], batch_geometry.crs)
def _get_tile_size(self) -> tuple[float, float]:
"""Collects a tile size from the tiling grid info in units of the grid CRS."""
tiling_grid_id = self.batch_request.tiling_grid["id"]
grid_info = self.batch_client.get_tiling_grid(tiling_grid_id)
return grid_info["properties"]["tileWidth"], grid_info["properties"]["tileHeight"]
def _get_tile_buffer(self) -> tuple[float, float]:
"""Calculates tile buffer in units of the grid CRS."""
grid_info = self.batch_request.tiling_grid
resolution = grid_info["resolution"]
return grid_info.get("bufferX", 0) * resolution, grid_info.get("bufferY", 0) * resolution
def _make_split(self) -> tuple[list[BBox], list[dict[str, object]]]:
"""This method actually loads bounding boxes from the service and prepares the lists"""
tile_info_list = list(self.batch_client.iter_tiles(self.batch_request))
bbox_list = [self._reconstruct_bbox(tile_info) for tile_info in tile_info_list]
info_list = [
{key: value for key, value in tile_info.items() if key != "geometry"} for tile_info in tile_info_list
]
return bbox_list, info_list
def _reconstruct_bbox(self, tile_info: JsonDict) -> BBox:
"""Reconstructs a bounding box from tile and grid properties."""
tile_crs = CRS(tile_info["origin"]["crs"]["properties"]["name"])
upper_left_corner = tile_info["origin"]["coordinates"]
width, height = self.tile_size
return BBox(
(
upper_left_corner[0] - self.tile_buffer[0],
upper_left_corner[1] - height - self.tile_buffer[1],
upper_left_corner[0] + width + self.tile_buffer[0],
upper_left_corner[1] + self.tile_buffer[1],
),
tile_crs,
)
def _parse_to_pair(parameter: T | tuple[T, T], allowed_types: tuple[type, ...], param_name: str = "") -> tuple[T, T]:
"""Parses the parameters defining the splitting of the BBox."""
if isinstance(parameter, (tuple, list)) and len(parameter) == 2:
split_x, split_y = parameter
if isinstance(split_x, allowed_types) and isinstance(split_y, allowed_types):
return split_x, split_y
parameter = cast(T, parameter)
if isinstance(parameter, allowed_types):
return parameter, parameter # type: ignore[return-value] # mypy 1.6 fails to do type narrowing here
raise ValueError(
f"Parameter {param_name} must be a single instance or a pair, with allowed types {allowed_types}, but"
f" {parameter} was given"
)