-
Notifications
You must be signed in to change notification settings - Fork 58
/
intensity.py
704 lines (599 loc) · 22.9 KB
/
intensity.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# intensity.py
# definitions of intensity characters
import collections
import warnings
import numpy as np
import pandas as pd
from tqdm.auto import tqdm # progress bar
# Public API of this module: the intensity characters exported via ``import *``.
__all__ = [
    "AreaRatio",
    "Count",
    "Courtyards",
    "BlocksCount",
    "Reached",
    "NodeDensity",
    "Density",
]
class AreaRatio:
    """
    Calculate covered area ratio or floor area ratio of objects.

    Either ``unique_id`` or both ``left_unique_id`` and ``right_unique_id``
    are required.

    .. math::
        \\textit{covering object area} \\over \\textit{covered object area}

    Adapted from :cite:`schirmer2015`.

    Parameters
    ----------
    left : GeoDataFrame
        GeoDataFrame containing objects being covered (e.g. land unit)
    right : GeoDataFrame
        GeoDataFrame with covering objects (e.g. building)
    left_areas : str, list, np.array, pd.Series
        the name of the left dataframe column, ``np.array``, or ``pd.Series`` where
        area value is stored
    right_areas : str, list, np.array, pd.Series
        the name of the right dataframe column, ``np.array``, or ``pd.Series`` where
        area value is stored, representing either projected or floor area.
    unique_id : str (default None)
        name of the column with unique id shared amongst left and right gdfs.
        If there is none, it could be generated by :py:func:`momepy.unique_id`.
    left_unique_id : str, list, np.array, pd.Series (default None)
        the name of the left dataframe column, ``np.array``, or ``pd.Series`` where
        shared unique ID is stored
    right_unique_id : str, list, np.array, pd.Series (default None)
        the name of the right dataframe column, ``np.array``, or ``pd.Series`` where
        shared unique ID is stored

    Attributes
    ----------
    series : Series
        Series containing resulting values
    left : GeoDataFrame
        original left GeoDataFrame
    right : GeoDataFrame
        original right GeoDataFrame
    left_areas : Series
        Series containing used left areas
    right_areas : Series
        Series containing used right areas
    left_unique_id : Series
        Series containing used left ID
    right_unique_id : Series
        Series containing used right ID

    Examples
    --------
    >>> tessellation_df['CAR'] = mm.AreaRatio(tessellation_df,
    ...                                       buildings_df,
    ...                                       'area',
    ...                                       'area',
    ...                                       'uID').series
    """

    def __init__(
        self,
        left,
        right,
        left_areas,
        right_areas,
        unique_id=None,
        left_unique_id=None,
        right_unique_id=None,
    ):
        self.left = left
        self.right = right

        # work on copies so temporary helper columns never leak to the caller
        left = left.copy()
        right = right.copy()

        if unique_id:
            left_unique_id = unique_id
            right_unique_id = unique_id
        else:
            if left_unique_id is None or right_unique_id is None:
                # fixed message: the parameter is `unique_id` (not "network_id")
                # and the concatenated parts need a separating space
                raise ValueError(
                    "Unique ID not correctly set. Use either unique_id or both "
                    "left_unique_id and right_unique_id."
                )
        self.left_unique_id = left_unique_id
        self.right_unique_id = right_unique_id

        # materialise array-like area inputs as temporary columns
        if not isinstance(left_areas, str):
            left["mm_a"] = left_areas
            left_areas = "mm_a"
        self.left_areas = left[left_areas]
        if not isinstance(right_areas, str):
            right["mm_a"] = right_areas
            right_areas = "mm_a"
        self.right_areas = right[right_areas]

        look_for = right[
            [right_unique_id, right_areas]
        ].copy()  # keeping only necessary columns
        look_for.rename(index=str, columns={right_areas: "lf_area"}, inplace=True)
        # total covering area per shared ID (several buildings may share one unit)
        look_for = look_for.groupby(right_unique_id).sum().reset_index()

        objects_merged = left[[left_unique_id, left_areas]].merge(
            look_for, left_on=left_unique_id, right_on=right_unique_id, how="left"
        )
        # merge resets the index; restore it so the result aligns with `left`
        objects_merged.index = left.index

        self.series = objects_merged["lf_area"] / objects_merged[left_areas]
class Count:
    """
    Calculate the number of elements within an aggregated structure.

    Aggregated structure can be typically block, street segment or street node (their
    snapped objects). Right gdf has to have unique id of aggregated structure
    assigned before hand (e.g. using :py:func:`momepy.get_network_id`).

    If ``weighted=True``, number of elements will be divided by the area or
    length (based on geometry type) of aggregated element, to return relative value.

    .. math::
        \\sum_{i \\in aggr} (n_i);\\space \\frac{\\sum_{i \\in aggr} (n_i)}{area_{aggr}}

    Adapted from :cite:`hermosilla2012` and :cite:`feliciotti2018`.

    Parameters
    ----------
    left : GeoDataFrame
        GeoDataFrame containing aggregation to analyse
    right : GeoDataFrame
        GeoDataFrame containing objects to analyse
    left_id : str
        name of the column where unique ID in left gdf is stored
    right_id : str
        name of the column where unique ID of aggregation in right gdf is stored
    weighted : bool (default False)
        if ``True``, count will be divided by the area or length

    Attributes
    ----------
    series : Series
        Series containing resulting values
    left : GeoDataFrame
        original left GeoDataFrame
    right : GeoDataFrame
        original right GeoDataFrame
    left_id : Series
        Series containing used left ID
    right_id : Series
        Series containing used right ID
    weighted : bool
        used weighted value

    Examples
    --------
    >>> blocks_df['buildings_count'] = mm.Count(blocks_df,
    ...                                         buildings_df,
    ...                                         'bID',
    ...                                         'bID',
    ...                                         weighted=True).series
    """

    def __init__(self, left, right, left_id, right_id, weighted=False):
        self.left = left
        self.right = right
        self.left_id = left[left_id]
        self.right_id = right[right_id]
        self.weighted = weighted

        # frequency of each aggregation id among the counted elements
        count = collections.Counter(right[right_id])
        df = pd.DataFrame.from_dict(count, orient="index", columns=["mm_count"])
        joined = left[[left_id, left.geometry.name]].join(df["mm_count"], on=left_id)
        # aggregations containing no element get 0 instead of NaN
        joined.loc[joined["mm_count"].isna(), "mm_count"] = 0

        if weighted:
            # probe geometry type positionally (iloc) so a non-default index does
            # not break here; geom_type is the shapely 2.x compatible accessor
            geom_type = left.geometry.iloc[0].geom_type
            if geom_type in ["Polygon", "MultiPolygon"]:
                joined["mm_count"] = joined["mm_count"] / left.geometry.area
            elif geom_type in ["LineString", "MultiLineString"]:
                joined["mm_count"] = joined["mm_count"] / left.geometry.length
            else:
                raise TypeError("Geometry type does not support weighting.")

        self.series = joined["mm_count"]
class Courtyards:
    """
    Calculate the number of courtyards within the joined structure.

    Buildings connected through ``spatial_weights`` are dissolved into a single
    footprint; the number of interior rings of that footprint is the courtyard
    count, and every building of the component receives the same value.

    Adapted from :cite:`schirmer2015`.

    Parameters
    ----------
    gdf : GeoDataFrame
        GeoDataFrame containing objects to analyse
    spatial_weights : libpysal.weights, optional
        spatial weights matrix - If None, Queen contiguity matrix will be calculated
        based on objects. It is to denote adjacent buildings
        (note: based on integer index).
    verbose : bool (default True)
        if True, shows progress bars in loops and indication of steps

    Attributes
    ----------
    series : Series
        Series containing resulting values
    gdf : GeoDataFrame
        original GeoDataFrame
    sw : libpysal.weights
        spatial weights matrix

    Examples
    --------
    >>> buildings_df['courtyards'] = mm.Courtyards(buildings_df).series
    Calculating spatial weights...
    """

    def __init__(self, gdf, spatial_weights=None, verbose=True):
        self.gdf = gdf
        # NOTE(review): overwritten by the list comprehension at the end; kept as-is
        results_list = []
        gdf = gdf.copy()
        # if weights matrix is not passed, generate it from objects
        if spatial_weights is None:
            print("Calculating spatial weights...") if verbose else None
            from libpysal.weights import Queen
            spatial_weights = Queen.from_dataframe(gdf, silence_warnings=True)
        self.sw = spatial_weights
        # dict to store nr of courtyards for each uID
        courtyards = {}
        # connected-component label of each building, aligned with gdf index
        components = pd.Series(spatial_weights.component_labels, index=gdf.index)
        for i, index in tqdm(
            enumerate(gdf.index), total=gdf.shape[0], disable=not verbose
        ):
            # if the id is already present in courtyards, continue (avoid repetition)
            if index in courtyards:
                continue
            else:
                # dissolve the whole connected component this building belongs to
                # (positional `i` indexes component_labels; `index` labels gdf rows)
                comp = spatial_weights.component_labels[i]
                to_join = components[components == comp].index
                joined = gdf.loc[to_join]
                # buffer to avoid multipolygons where buildings touch by corners only
                dissolved = joined.geometry.buffer(0.01).unary_union
                # each interior ring of the dissolved footprint is one courtyard
                interiors = len(list(dissolved.interiors))
                for b in to_join:
                    courtyards[b] = interiors  # fill dict with values
        # re-emit the per-component values in the original row order
        results_list = [courtyards[index] for index in gdf.index]
        self.series = pd.Series(results_list, index=gdf.index)
class BlocksCount:
    """
    Calculates the weighted number of blocks

    Number of blocks within neighbours defined in ``spatial_weights`` divided by
    the area covered by the neighbours.

    .. math::

    Adapted from :cite:`dibble2017`.

    Parameters
    ----------
    gdf : GeoDataFrame
        GeoDataFrame containing morphological tessellation
    block_id : str, list, np.array, pd.Series
        the name of the objects dataframe column, ``np.array``, or ``pd.Series``
        where block ID is stored
    spatial_weights : libpysal.weights
        spatial weights matrix
    unique_id : str
        name of the column with unique id used as ``spatial_weights`` index
    weighted : bool, default True
        return value weighted by the analysed area (``True``) or pure count (``False``)
    verbose : bool (default True)
        if True, shows progress bars in loops and indication of steps

    Attributes
    ----------
    series : Series
        Series containing resulting values
    gdf : GeoDataFrame
        original GeoDataFrame
    block_id : Series
        Series containing used block ID
    sw : libpysal.weights
        spatial weights matrix
    id : Series
        Series containing used unique ID
    weighted : bool
        used weighted value

    Examples
    --------
    >>> sw4 = mm.sw_high(k=4, gdf='tessellation_df', ids='uID')
    >>> tessellation_df['blocks_within_4'] = mm.BlocksCount(tessellation_df,
    ...                                                     'bID',
    ...                                                     sw4,
    ...                                                     'uID').series
    """

    def __init__(
        self, gdf, block_id, spatial_weights, unique_id, weighted=True, verbose=True
    ):
        self.gdf = gdf
        self.sw = spatial_weights
        self.id = gdf[unique_id]
        self.weighted = weighted
        # define empty list for results
        results_list = []
        data = gdf.copy()
        # materialise array-like block ids as a temporary column
        if not isinstance(block_id, str):
            data["mm_bid"] = block_id
            block_id = "mm_bid"
        self.block_id = data[block_id]
        data = data.set_index(unique_id)

        # validate `weighted` once up front; previously an invalid value only
        # raised mid-loop (or never, if no id was present in the weights)
        if weighted is True:
            areas = data.geometry.area
        elif weighted is not False:
            raise ValueError("Attribute 'weighted' needs to be True or False.")

        for index in tqdm(data.index, total=data.shape[0], disable=not verbose):
            if index in spatial_weights.neighbors:
                neighbours = [index]
                neighbours += spatial_weights.neighbors[index]
                vicinity = data.loc[neighbours]
                if weighted is True:
                    # unique block count normalised by total neighbourhood area
                    results_list.append(
                        vicinity[block_id].unique().shape[0]
                        / sum(areas.loc[neighbours])
                    )
                else:
                    results_list.append(vicinity[block_id].unique().shape[0])
            else:
                # id not present in the weights matrix
                results_list.append(np.nan)

        self.series = pd.Series(results_list, index=gdf.index)
class Reached:
    """
    Calculates the number of objects reached within neighbours on street network

    Number of elements within neighbourhood defined in ``spatial_weights``. If
    ``spatial_weights`` are ``None``, it will assume topological distance ``0``
    (element itself). Requires unique_id of network assigned beforehand
    (e.g. using :py:func:`momepy.get_network_id`).

    .. math::

    Parameters
    ----------
    left : GeoDataFrame
        GeoDataFrame containing streets (either segments or nodes)
    right : GeoDataFrame
        GeoDataFrame containing elements to be counted
    left_id : str, list, np.array, pd.Series (default None)
        the name of the left dataframe column, ``np.array``, or ``pd.Series`` where
        ID of streets (segments or nodes) is stored.
    right_id : str, list, np.array, pd.Series (default None)
        the name of the right dataframe column, ``np.array``, or ``pd.Series`` where
        ID of streets (segments or nodes) is stored.
    spatial_weights : libpysal.weights (default None)
        spatial weights matrix
    mode : str (default 'count')
        mode of calculation. If ``'count'`` function will return
        the count of reached elements.
        If ``'sum'``, it will return sum of ``'values'``. If ``'mean'`` it will
        return mean value of ``'values'``. If ``'std'`` it will return standard
        deviation of ``'values'``. If ``'values'`` not set, modes other than
        ``'count'`` use areas of reached elements.
    values : str (default None)
        the name of the objects dataframe column with values used for calculations
    verbose : bool (default True)
        if True, shows progress bars in loops and indication of steps

    Attributes
    ----------
    series : Series
        Series containing resulting values
    left : GeoDataFrame
        original left GeoDataFrame
    right : GeoDataFrame
        original right GeoDataFrame
    left_id : Series
        Series containing used left ID
    right_id : Series
        Series containing used right ID
    mode : str
        mode of calculation
    sw : libpysal.weights
        spatial weights matrix (if set)

    Examples
    --------
    >>> streets_df['reached'] = mm.Reached(streets_df, buildings_df, 'uID').series
    """

    # TODO: allow all modes
    def __init__(
        self,
        left,
        right,
        left_id,
        right_id,
        spatial_weights=None,
        mode="count",
        values=None,
        verbose=True,
    ):
        self.left = left
        self.right = right
        self.sw = spatial_weights
        self.mode = mode
        # define empty list for results
        results_list = []
        # materialise array-like ids as temporary columns on copies
        if not isinstance(right_id, str):
            right = right.copy()
            right["mm_id"] = right_id
            right_id = "mm_id"
        self.right_id = right[right_id]
        if not isinstance(left_id, str):
            left = left.copy()
            left["mm_lid"] = left_id
            left_id = "mm_lid"
        self.left_id = left[left_id]

        # resolve the aggregation once, before the loop; an unknown mode now
        # raises immediately instead of failing with NameError mid-loop
        if mode == "count":
            count = collections.Counter(right[right_id])
        elif mode == "sum":
            func = sum
        elif mode == "mean":
            func = np.nanmean
        elif mode == "std":
            func = np.nanstd
        else:
            raise ValueError(
                "Mode '{}' is not supported. Use 'count', 'sum', 'mean' or "
                "'std'.".format(mode)
            )

        # iterating over rows one by one
        # (.items() replaces .iteritems(), which was removed in pandas 2.0)
        for index, lid in tqdm(
            left[left_id].items(), total=left.shape[0], disable=not verbose
        ):
            if spatial_weights is None:
                ids = [lid]
            else:
                neighbours = [index]
                neighbours += spatial_weights.neighbors[index]
                # NOTE(review): neighbours are used positionally (iloc); this
                # assumes a default RangeIndex on `left` — confirm with callers
                ids = left.iloc[neighbours][left_id]
            if mode == "count":
                results_list.append(sum(count[nid] for nid in ids))
            else:
                mask = right[right_id].isin(ids)
                if mask.any():
                    if values:
                        results_list.append(func(right.loc[mask][values]))
                    else:
                        results_list.append(func(right.loc[mask].geometry.area))
                else:
                    # nothing reached
                    results_list.append(np.nan)

        self.series = pd.Series(results_list, index=left.index)
class NodeDensity:
    """
    Calculate the density of nodes neighbours on street network
    defined in ``spatial_weights``.

    Calculated as number of neighbouring nodes / cumulative length
    of street network within neighbours.

    ``node_start`` and ``node_end`` is standard output of :py:func:`momepy.nx_to_gdf`
    and is compulsory.

    Adapted from :cite:`dibble2017`.

    Parameters
    ----------
    left : GeoDataFrame
        GeoDataFrame containing nodes of street network
    right : GeoDataFrame
        GeoDataFrame containing edges of street network
    spatial_weights : libpysal.weights
        spatial weights matrix capturing relationship between nodes
    weighted : bool (default False)
        if True density will take into account node degree as ``k-1``
    node_degree : str (optional)
        name of the column of left gdf containing node degree. Used if ``weighted=True``
    node_start : str (default 'node_start')
        name of the column of right gdf containing id of starting node
    node_end : str (default 'node_end')
        name of the column of right gdf containing id of ending node
    verbose : bool (default True)
        if True, shows progress bars in loops and indication of steps

    Attributes
    ----------
    series : Series
        Series containing resulting values
    left : GeoDataFrame
        original left GeoDataFrame
    right : GeoDataFrame
        original right GeoDataFrame
    node_start : Series
        Series containing used ids of starting node
    node_end : Series
        Series containing used ids of ending node
    sw : libpysal.weights
        spatial weights matrix
    weighted : bool
        used weighted value
    node_degree : Series
        Series containing used node degree values

    Examples
    --------
    >>> nodes['density'] = mm.NodeDensity(nodes, edges, sw).series
    """

    def __init__(
        self,
        left,
        right,
        spatial_weights,
        weighted=False,
        node_degree=None,
        node_start="node_start",
        node_end="node_end",
        verbose=True,
    ):
        self.left = left
        self.right = right
        self.sw = spatial_weights
        self.weighted = weighted
        if weighted:
            self.node_degree = left[node_degree]
        self.node_start = right[node_start]
        self.node_end = right[node_end]
        # define empty list for results
        results_list = []
        # hoist edge lengths out of the loop
        lengths = right.geometry.length
        # iterating over rows one by one
        for index in tqdm(left.index, total=left.shape[0], disable=not verbose):
            neighbours = [index]
            neighbours += spatial_weights.neighbors[index]
            if weighted:
                # degree-weighted count: each node contributes (degree - 1)
                # NOTE(review): neighbours are used positionally (iloc) here but
                # as labels in isin() below; assumes a default RangeIndex — confirm
                neighbour_nodes = left.iloc[neighbours]
                number_nodes = sum(neighbour_nodes[node_degree] - 1)
            else:
                number_nodes = len(neighbours)
            # bug fix: use the configured column names instead of the hard-coded
            # defaults 'node_start'/'node_end', which ignored custom columns
            length = lengths.loc[
                right[node_start].isin(neighbours) & right[node_end].isin(neighbours)
            ].sum()
            if length > 0:
                results_list.append(number_nodes / length)
            else:
                # isolated node / zero-length neighbourhood
                results_list.append(0)

        self.series = pd.Series(results_list, index=left.index)
class Density:
    """
    Calculate the gross density

    .. math::
        \\frac{\\sum \\text {values}}{\\sum \\text {areas}}

    Adapted from :cite:`dibble2017`.

    Parameters
    ----------
    gdf : GeoDataFrame
        GeoDataFrame containing objects to analyse
    values : str, list, np.array, pd.Series
        the name of the dataframe column, ``np.array``, or ``pd.Series`` where
        character value is stored. Required.
    spatial_weights : libpysal.weight
        spatial weights matrix
    unique_id : str
        name of the column with unique id used as ``spatial_weights`` index
    areas : str, list, np.array, pd.Series (optional)
        the name of the dataframe column, ``np.array``, or ``pd.Series`` where
        area value is stored. If None, gdf.geometry.area will be used.
    verbose : bool (default True)
        if True, shows progress bars in loops and indication of steps

    Attributes
    ----------
    series : Series
        Series containing resulting values
    gdf : GeoDataFrame
        original GeoDataFrame
    values : Series
        Series containing used values
    sw : libpysal.weights
        spatial weights matrix
    id : Series
        Series containing used unique ID
    areas : Series
        Series containing used area values

    Examples
    --------
    >>> tessellation_df['floor_area_dens'] = mm.Density(tessellation_df,
    ...                                                 'floor_area',
    ...                                                 sw,
    ...                                                 'uID').series
    """

    def __init__(
        self, gdf, values, spatial_weights, unique_id, areas=None, verbose=True
    ):
        self.gdf = gdf
        self.sw = spatial_weights
        self.id = gdf[unique_id]
        # define empty list for results
        results_list = []
        data = gdf.copy()
        # materialise array-like values as a temporary column; values is
        # required — passing None fails below when the column is looked up
        if values is not None:
            if not isinstance(values, str):
                data["mm_v"] = values
                values = "mm_v"
            self.values = data[values]
        # areas default to geometry area when not supplied
        if areas is not None:
            if not isinstance(areas, str):
                data["mm_a"] = areas
                areas = "mm_a"
        else:
            data["mm_a"] = data.geometry.area
            areas = "mm_a"
        self.areas = data[areas]
        data = data.set_index(unique_id)

        # iterating over rows one by one
        for index in tqdm(data.index, total=data.shape[0], disable=not verbose):
            if index in spatial_weights.neighbors:
                neighbours = [index]
                neighbours += spatial_weights.neighbors[index]
                subset = data.loc[neighbours]
                # gross density over the neighbourhood: sum(values) / sum(areas)
                results_list.append(np.sum(subset[values]) / np.sum(subset[areas]))
            else:
                # id not present in the weights matrix
                results_list.append(np.nan)

        self.series = pd.Series(results_list, index=gdf.index)