-
Notifications
You must be signed in to change notification settings - Fork 0
/
reproject.py
234 lines (198 loc) · 9.26 KB
/
reproject.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
from rasterio import warp
from rasterio.crs import CRS
from shapely.geometry import MultiPolygon, Polygon, mapping, shape
DEFAULT_PRECISION = 3
def reproject_geometry(
geojson: Dict[str, Any],
src_crs: str,
dst_crs: str,
dst_tolerance: Optional[float] = None,
precision: int = DEFAULT_PRECISION,
) -> Dict[str, Any]:
"""Reprojects a GeoJSON-like geometry from a source to destination CRS with
the option to limit reprojection error to approximately dst_tolerance.
If dst_tolerance is specified, additional vertices are inserted into the
geometry polygon(s) prior to reprojection to capture projection distortion.
The projected geometries are then simplified by removing vertices until the
simplificaton imparts an error that exceeds dst_tolerance. The supplied
dst_tolerance must be in the linear unit (e.g., meters, feet, degrees) of
the destination CRS.
Args:
geojson (Dict[str, Any]): A GeoJSON-like dictionary containing a
polygon or multipolygon to be reprojected.
src_crs (str): Source CRS string, e.g., an EPSG code or WKT2.
dst_crs (str): Destination CRS string.
dst_tolerance (float, optional): Maximum acceptable "error" of the
reprojected Polygon(s) in the destination CRS. Must be in the linear
unit of the destination CRS.
precision (int, optional): The number of decimal places to include in
the coordinates for the reprojected geometry. Defaults to 3 decimal
places.
Returns:
Dict[str, Any]: A GeoJSON-like dictionary containg the reprojected
Polygon(s)
"""
geometry = shape(geojson)
if isinstance(geometry, Polygon):
multipolygon = False
elif isinstance(geometry, MultiPolygon):
multipolygon = True
else:
raise ValueError(
f"Can only reproject Polygons or MultiPolygons, geometry={geometry}"
)
if multipolygon:
reprojected_geometry = reproject_multipolygon(
geometry, src_crs, dst_crs, dst_tolerance, precision
)
else:
reprojected_geometry = reproject_polygon(
geometry, src_crs, dst_crs, dst_tolerance, precision
)
result: Dict[str, Any] = mapping(reprojected_geometry)
return result
def reproject_multipolygon(
multipolygon: MultiPolygon,
src_crs: str,
dst_crs: str,
dst_tolerance: Optional[float] = None,
precision: int = DEFAULT_PRECISION,
) -> MultiPolygon:
"""Reprojects each polygon in a multipolygon from a source to destination
CRS with the option to limit reprojection error to approximately
dst_tolerance.
If dst_tolerance is specified, additional vertices are inserted into the
geometry polygon(s) prior to reprojection to capture projection distortion.
The projected geometries are then simplified by removing vertices until the
simplificaton imparts an error that exceeds dst_tolerance. The supplied
dst_tolerance must be in the linear unit (e.g., meters, feet, degrees) of
the destination CRS.
Args:
multipolygon (MultiPolygon): The multipolygon to be reprojected.
src_crs (str): Source CRS string, e.g., an EPSG code or WKT2.
dst_crs (str): Destination CRS string.
dst_tolerance (float, optional): Maximum acceptable "error" of the
reprojected Polygon(s) in the destination CRS. Must be in the linear
unit of the destination CRS.
precision (int, optional): The number of decimal places to include in
the coordinates for the reprojected geometry. Defaults to 3 decimal
places.
Returns:
MultiPolygon: The reprojected multipolygon
"""
polygons = []
for polygon in multipolygon.geoms:
polygons.extend(
reproject_polygon(polygon, src_crs, dst_crs, dst_tolerance, precision)
)
return MultiPolygon(polygons)
def reproject_polygon(
polygon: Polygon,
src_crs: str,
dst_crs: str,
dst_tolerance: Optional[float] = None,
precision: int = DEFAULT_PRECISION,
) -> Polygon:
"""Reprojects a polygon from a source to destination CRS with the option to
limit reprojection error to approximately dst_tolerance.
If dst_tolerance is specified, additional vertices are inserted into the
geometry polygon(s) prior to reprojection to capture projection distortion.
The projected geometries are then simplified by removing vertices until the
simplificaton imparts an error that exceeds dst_tolerance. The supplied
dst_tolerance must be in the linear unit (e.g., meters, feet, degrees) of
the destination CRS.
Args:
polygon (Polygon): The polygon to be reprojected.
src_crs (str): Source CRS string, e.g., an EPSG code or WKT2.
dst_crs (str): Destination CRS string.
dst_tolerance (float, optional): Maximum acceptable "error" of the
reprojected Polygon(s) in the destination CRS. Must be in the linear
unit of the destination CRS.
precision (int, optional): The number of decimal places to include in
the coordinates for the reprojected geometry. Defaults to 3 decimal
places.
Returns:
Polygon: The reprojected polygon
"""
if dst_tolerance is not None:
src_bbox = polygon.bounds
src_tolerance = _src_tol(src_crs, src_bbox, dst_crs, dst_tolerance)
polygon = Polygon(_densify_by_distance(polygon.exterior.coords, src_tolerance))
polygon = shape(warp.transform_geom(src_crs, dst_crs, polygon, precision=precision))
if dst_tolerance is not None:
return polygon.simplify(dst_tolerance).simplify(0)
else:
return polygon.simplify(0)
def _src_tol(
src_crs: str, src_bbox: List[float], dst_crs: str, dst_tol: float
) -> float:
"""Converts a tolerance (a distance) from a source CRS linear unit to a
destination CRS linear unit.
Longitudinal ground distances vary with latitude, which means the conversion
of the tolerance (distance) between destination and source units also varies
with latitude. A mid-latitude value of the bounding box is therefore used to
'average' this discrepancy when converting between geographic and projected
units. Since this is not an exact computation, a spherical approximation is
used for the shape of the Earth for simplicity.
Args:
src_crs (str): CRS of the source geometry.
src_bbox (List[float]): Bounding box of the geometry in the source CRS.
dst_crs (str): CRS of the destination geometry.
dst_tolerance (float, optional): Maximum acceptable "error" of the
reprojected Polygon(s) in the destination CRS. Must be in the linear
unit of the destination CRS.
Returns:
float: The destination tolerance (distance) in the source CRS linear
units.
"""
d_crs = CRS.from_string(dst_crs)
s_crs = CRS.from_string(src_crs)
src_tol: float
if s_crs.is_geographic and d_crs.is_geographic:
src_tol = dst_tol
elif s_crs.is_projected and d_crs.is_projected:
src_tol = (
d_crs.linear_units_factor[1] / s_crs.linear_units_factor[1]
) * dst_tol
elif s_crs.is_projected and d_crs.is_geographic:
dst_bbox = warp.transform_bounds(src_crs, dst_crs, *src_bbox)
mid_latitude = (dst_bbox[1] + dst_bbox[3]) / 2
meters_per_degree = 111320 * np.cos(np.deg2rad(mid_latitude))
meters_per_src_unit = s_crs.linear_units_factor[1]
src_units_per_degree = meters_per_degree / meters_per_src_unit
tol_src_units = src_units_per_degree * dst_tol
src_tol = tol_src_units
elif s_crs.is_geographic and d_crs.is_projected:
mid_latitude = (src_bbox[1] + src_bbox[3]) / 2
meters_per_degree = 111320 * np.cos(np.deg2rad(mid_latitude))
meters_per_dst_unit = d_crs.linear_units_factor[1]
tol_meters = meters_per_dst_unit * dst_tol
tol_degree = tol_meters / meters_per_degree
src_tol = tol_degree
return src_tol
def _densify_by_distance(
point_list: List[Tuple[float, float]], densification_length: float
) -> List[Tuple[float, float]]:
"""Densifies the number of points in a list of points by inserting points
at densification_length intervals along the polygon formed by the points.
Args:
point_list (List[Tuple[float, float]]): The list of points to be
densified.
densification_length (int): The interval at which to insert additional
points.
Returns:
List[Tuple[float, float]]: The list of densified points.
"""
points: Any = np.asarray(point_list)
dxdy = points[1:, :] - points[:-1, :]
segment_lengths = np.sqrt(np.sum(np.square(dxdy), axis=1))
total_length = np.sum(segment_lengths)
cum_segment_lengths = np.cumsum(segment_lengths)
cum_segment_lengths = np.insert(cum_segment_lengths, 0, [0])
cum_interp_lengths = np.arange(0, total_length, densification_length)
cum_interp_lengths = np.append(cum_interp_lengths, [total_length])
interp_x = np.interp(cum_interp_lengths, cum_segment_lengths, points[:, 0])
interp_y = np.interp(cum_interp_lengths, cum_segment_lengths, points[:, 1])
return [(x, y) for x, y in zip(interp_x, interp_y)]