diff --git a/momepy/functional/_dimension.py b/momepy/functional/_dimension.py index 566e9cbd..ff05cc39 100644 --- a/momepy/functional/_dimension.py +++ b/momepy/functional/_dimension.py @@ -1,9 +1,12 @@ +import math + import numpy as np +import pandas as pd import shapely from geopandas import GeoDataFrame, GeoSeries from libpysal.graph import Graph from numpy.typing import NDArray -from pandas import Series +from pandas import DataFrame, Series __all__ = [ "volume", @@ -11,9 +14,18 @@ "courtyard_area", "longest_axis_length", "perimeter_wall", + "street_profile", "weighted_character", ] +try: + from numba import njit + + HAS_NUMBA = True +except (ModuleNotFoundError, ImportError): + HAS_NUMBA = False + from libpysal.common import jit as njit + def volume( area: NDArray[np.float_] | Series, @@ -201,3 +213,208 @@ def weighted_character( agg_area = graph.describe(area, statistics=["sum"])["sum"] return stats / agg_area + + +def street_profile( + streets: GeoDataFrame, + buildings: GeoDataFrame, + distance: float = 10, + tick_length: float = 50, + height: None | Series = None, +) -> DataFrame: + """Calculates the street profile characters. + + This functions returns a DataFrame with widths, standard deviation of width, + openness, heights, standard deviation of height and + ratio height/width. The algorithm generates perpendicular lines to the ``streets`` + dataframe features every ``distance`` and measures values on intersections with + features of ``buildings``. If no feature is reached within ``tick_length`` its value + is set as width (being a theoretical maximum). + + Derived from :cite:`araldi2019`. + + Parameters + ---------- + streets : GeoDataFrame + A GeoDataFrame containing streets to analyse. + buildings : GeoDataFrame + A GeoDataFrame containing buildings along the streets. + Only Polygon geometries are currently supported. + distance : int (default 10) + The distance between perpendicular ticks. + tick_length : int (default 50) + The length of ticks. + height: pd.Series (default None) + The ``pd.Series`` where building height are stored. If set to ``None``, + height and ratio height/width will not be calculated. + + Returns + ------- + DataFrame + + Examples + -------- + >>> street_prof = momepy.street_profile(streets_df, + ... buildings_df, height=buildings_df['height']) + >>> streets_df['width'] = street_prof['width'] + >>> streets_df['deviations'] = street_prof['width_deviation'] + """ + + # filter relevant buildings and streets + inp, res = shapely.STRtree(streets.geometry).query( + buildings.geometry, predicate="dwithin", distance=tick_length // 2 + ) + buildings_near_streets = np.unique(inp) + streets_near_buildings = np.unique(res) + relevant_buildings = buildings.iloc[buildings_near_streets].reset_index(drop=True) + relevant_streets = streets.iloc[streets_near_buildings].reset_index(drop=True) + if height is not None: + height = height.iloc[buildings_near_streets].reset_index(drop=True) + + # calculate street profile on a subset of the data + partial_res = _street_profile( + relevant_streets, + relevant_buildings, + distance=distance, + tick_length=tick_length, + height=height, + ) + + # return full result with defaults + final_res = pd.DataFrame(np.nan, index=streets.index, columns=partial_res.columns) + final_res.iloc[streets_near_buildings[partial_res.index.values]] = ( + partial_res.values + ) + ## streets with no buildings get the theoretical width and max openness + final_res.loc[final_res["width"].isna(), "width"] = tick_length + final_res.loc[final_res["openness"].isna(), "openness"] = 1 + + return final_res + + +def _street_profile( + streets: GeoDataFrame, + buildings: GeoDataFrame, + distance: float = 10, + tick_length: float = 50, + height: None | Series = None, +) -> DataFrame: + """Helper function that actually calculates the street profile characters.""" + + ## generate points for every street at `distance` intervals + segments = streets.segmentize(distance) + coords, coord_indxs = shapely.get_coordinates(segments, return_index=True) + starts = ~pd.Series(coord_indxs).duplicated(keep="first") + ends = ~pd.Series(coord_indxs).duplicated(keep="last") + end_markers = starts | ends + + ## generate tick streings + njit_ticks = generate_ticks(coords, end_markers.values, tick_length) + ticks = shapely.linestrings(njit_ticks.reshape(-1, 2, 2)) + + ## find the length of intersection of the nearest building for every tick + inp, res = buildings.geometry.sindex.query(ticks, predicate="intersects") + intersections = shapely.intersection(ticks[inp], buildings.geometry.array[res]) + distances = shapely.distance(intersections, shapely.points(coords[inp // 2])) + + # streets which intersect buildings have 0 distance to them + distances[np.isnan(distances)] = 0 + min_distances = pd.Series(distances).groupby(inp).min() + dists = np.full((len(ticks),), np.nan) + dists[min_distances.index.values] = min_distances.values + + ## generate tick values and groupby street + tick_coords = np.repeat(coord_indxs, 2) + ## multiple agg to avoid custom apply + left_ticks = ( + pd.Series(dists[::2]) + .groupby(tick_coords[::2]) + .mean() + .replace(np.nan, tick_length // 2) + ) + right_ticks = ( + pd.Series(dists[1::2]) + .groupby(tick_coords[1::2]) + .mean() + .replace(np.nan, tick_length // 2) + ) + w = left_ticks + right_ticks + + grouper = pd.Series(dists).groupby(tick_coords) + openness_agg = grouper.agg(["size", "count"]) + # proportion of NAs + o = (openness_agg["size"] - openness_agg["count"]) / openness_agg["size"] + # needs to be seperate to pass ddof + wd = grouper.std(ddof=0) + + final_result = pd.DataFrame( + np.nan, columns=["width", "openness", "width_deviation"], index=streets.index + ) + final_result["width"] = w + final_result["openness"] = o + final_result["width_deviation"] = wd + + ## if heights are available add heights stats to the result + if height is not None: + min_heights = height.loc[res].groupby(inp).min() + tick_heights = np.full((len(ticks),), np.nan) + tick_heights[min_heights.index.values] = min_heights.values + heights_res = pd.Series(tick_heights).groupby(tick_coords).agg(["mean", "std"]) + final_result["height"] = heights_res["mean"] + final_result["height_deviation"] = heights_res["std"] + final_result["hw_ratio"] = final_result["height"] / final_result[ + "width" + ].replace(0, np.nan) + + return final_result + + +# angle between two points +@njit +def _get_angle_njit(x1, y1, x2, y2): + """ + pt1, pt2 : tuple + """ + x_diff = x2 - x1 + y_diff = y2 - y1 + return math.degrees(math.atan2(y_diff, x_diff)) + + +# get the second end point of a tick +# p1 = bearing + 90 +@njit +def _get_point_njit(x1, y1, bearing, dist): + bearing = math.radians(bearing) + x = x1 + dist * math.cos(bearing) + y = y1 + dist * math.sin(bearing) + return np.array((x, y)) + + +@njit +def generate_ticks(list_points, end_markers, tick_length): + ticks = np.empty((len(list_points) * 2, 4), dtype=np.float64) + + for i in range(len(list_points)): + tick_pos = i * 2 + end = end_markers[i] + pt = list_points[i] + + if end: + ticks[tick_pos, :] = np.array([pt[0], pt[1], pt[0], pt[1]]) + ticks[tick_pos + 1, :] = np.array([pt[0], pt[1], pt[0], pt[1]]) + else: + next_pt = list_points[i + 1] + njit_angle1 = _get_angle_njit(pt[0], pt[1], next_pt[0], next_pt[1]) + njit_end_1 = _get_point_njit( + pt[0], pt[1], njit_angle1 + 90, tick_length / 2 + ) + njit_angle2 = _get_angle_njit(njit_end_1[0], njit_end_1[1], pt[0], pt[1]) + njit_end_2 = _get_point_njit( + njit_end_1[0], njit_end_1[1], njit_angle2, tick_length + ) + ticks[tick_pos, :] = np.array([njit_end_1[0], njit_end_1[1], pt[0], pt[1]]) + ticks[tick_pos + 1, :] = np.array( + [njit_end_2[0], njit_end_2[1], pt[0], pt[1]] + ) + + return ticks diff --git a/momepy/functional/tests/test_dimension.py b/momepy/functional/tests/test_dimension.py index 35435298..da91ebd0 100644 --- a/momepy/functional/tests/test_dimension.py +++ b/momepy/functional/tests/test_dimension.py @@ -93,6 +93,79 @@ def test_perimeter_wall(self): pd.testing.assert_series_equal(result, result_given_graph) assert result[0] == pytest.approx(137.210, rel=1e-3) + @pytest.mark.skipif(not GPD_013, reason="no attribute 'segmentize'") + def test_street_profile(self): + sp = mm.street_profile( + self.df_streets, + self.df_buildings, + tick_length=50, + distance=1, + height=self.df_buildings["height"], + ) + + expected_w = { + "count": 35, + "mean": 43.39405649921903, + "min": 31.017731525484447, + "max": 50.0, + } + expected_wd = { + "count": 22, + "mean": 1.1977356963898373, + "min": 0.09706119360586668, + "max": 5.154163996499861, + } + expected_o = { + "count": 35, + "mean": 0.7186966927475066, + "min": 0.15270935960591134, + "max": 1.0, + } + expected_h = { + "count": 22, + "mean": 16.72499857958264, + "min": 10.0, + "max": 28.1969381969382, + } + expected_hd = { + "count": 22, + "mean": 1.2372251098113227, + "min": 0.0, + "max": 7.947097088834963, + } + expected_p = { + "count": 22, + "mean": 0.43257459410448046, + "min": 0.20379941361273096, + "max": 0.7432052069071473, + } + + assert_result(sp["width"], expected_w, self.df_streets) + assert_result(sp["width_deviation"], expected_wd, self.df_streets) + assert_result(sp["openness"], expected_o, self.df_streets) + assert_result(sp["height"], expected_h, self.df_streets) + assert_result(sp["height_deviation"], expected_hd, self.df_streets) + assert_result(sp["hw_ratio"], expected_p, self.df_streets) + + @pytest.mark.skipif(not GPD_013, reason="no attribute 'segmentize'") + def test_street_profile_infinity(self): + # avoid infinity + from shapely import LineString, Point + + blg = gpd.GeoDataFrame( + {"height": [2, 5]}, + geometry=[ + Point(0, 0).buffer(10, cap_style=3), + Point(30, 0).buffer(10, cap_style=3), + ], + ) + lines = gpd.GeoDataFrame( + geometry=[LineString([(-8, -8), (8, 8)]), LineString([(15, -10), (15, 10)])] + ) + assert mm.street_profile(lines, blg, height=blg["height"], distance=2)[ + "hw_ratio" + ].equals(pd.Series([np.nan, 0.35])) + def test_weighted_char(self): weighted = mm.weighted_character( self.df_buildings.height, self.df_buildings.area, self.graph @@ -117,8 +190,8 @@ def setup_method(self): test_file_path = mm.datasets.get_path("bubenec") self.df_buildings = gpd.read_file(test_file_path, layer="buildings") self.df_streets = gpd.read_file(test_file_path, layer="streets") - self.df_tessellation = gpd.read_file(test_file_path, layer="tessellation") self.df_buildings["height"] = np.linspace(10.0, 30.0, 144) + self.df_tessellation = gpd.read_file(test_file_path, layer="tessellation") self.sw = mm.sw_high(k=3, gdf=self.df_tessellation, ids="uID") self.graph = ( Graph.build_contiguity(self.df_tessellation) @@ -126,6 +199,45 @@ def setup_method(self): .assign_self_weight() ) + @pytest.mark.skipif(not GPD_013, reason="no attribute 'segmentize'") + def test_street_profile(self): + sp_new = mm.street_profile( + self.df_streets, + self.df_buildings, + tick_length=50, + distance=1, + height=self.df_buildings["height"], + ) + sp_old = mm.StreetProfile( + self.df_streets, + self.df_buildings, + tick_length=50, + distance=1, + heights=self.df_buildings["height"], + ) + + ## needs tolerance because the generate ticks are different + assert_series_equal(sp_new["width"], sp_old.w, rtol=1e-2, check_names=False) + assert_series_equal( + sp_new["width_deviation"].replace(np.nan, 0), + sp_old.wd, + rtol=1, + check_names=False, + ) + assert_series_equal(sp_new["openness"], sp_old.o, rtol=1e-1, check_names=False) + assert_series_equal( + sp_new["height"].replace(np.nan, 0), sp_old.h, check_names=False, rtol=1e-1 + ) + assert_series_equal( + sp_new["height_deviation"].replace(np.nan, 0), + sp_old.hd, + check_names=False, + rtol=1e-1, + ) + assert_series_equal( + sp_new["hw_ratio"].replace(np.nan, 0), sp_old.p, check_names=False, rtol=1 + ) + def test_covered_area(self): covered_sw_new = self.graph.describe( self.df_tessellation.area, statistics=["sum"]