/
trajectory_aggregator.py
243 lines (208 loc) · 9.23 KB
/
trajectory_aggregator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
# -*- coding: utf-8 -*-
from pandas import DataFrame
from geopandas import GeoDataFrame
from shapely.geometry import LineString
from shapely.ops import nearest_points
from movingpandas.point_clusterer import PointClusterer
from .geometry_utils import azimuth, angular_difference, measure_distance_geodesic, measure_distance_euclidean
class TrajectoryCollectionAggregator:
def __init__(self, traj_collection, max_distance, min_distance, min_stop_duration, min_angle=45):
"""
Aggregates trajectories by extracting significant points, clustering those points, and extracting
flows between clusters.
Based on the algorithm described by Andrienko, N., & Andrienko, G. (2011). Spatial generalization and
aggregation of massive movement data. IEEE Transactions on visualization and computer graphics, 17(2), 205-219.
Parameters
----------
traj_collection : TrajectoryCollection
TrajectoryCollection to be aggregated
max_distance : float
Maximum distance between significant points (distance is calculated in CRS units, except if the CRS
is geographic, e.g. EPSG:4326 WGS84, then distance is calculated in meters)
min_distance : float
Minimum distance between significant points
min_stop_duration : integer
Minimum duration required for stop detection (in seconds)
min_angle : float
Minimum angle for significant point extraction
"""
self.traj_collection = traj_collection
if self.traj_collection.trajectories:
self._crs = traj_collection.trajectories[0].crs
else:
self._crs = None
self.max_distance = max_distance
self.min_distance = min_distance
self.min_stop_duration = min_stop_duration
self.min_angle = min_angle
self.is_latlon = self.traj_collection.trajectories[0].is_latlon
#print('Extracting significant points ...')
self.significant_points = self._extract_significant_points()
#print(' No. significant points: {}'.format(len(self.significant_points)))
#print('Clustering significant points ...')
self.clusters = PointClusterer(self.significant_points, self.max_distance, self.is_latlon).get_clusters()
#print(' No. clusters: {}'.format(len(self.clusters)))
#print('Computing flows ...')
self.flows = self._compute_flows_between_clusters()
#print('Flows ready!')
def get_significant_points_gdf(self):
"""
Return the extracted significant points
Returns
-------
GeoDataFrame
Significant points
"""
if not self.significant_points:
self._extract_significant_points()
df = DataFrame(self.significant_points, columns=['geometry'])
return GeoDataFrame(df, crs=self._crs)
def get_clusters_gdf(self):
"""
Return the extracted cluster centroids
Returns
-------
GeoDataFrame
Cluster centroids, incl. the number of clustered significant points (n)
"""
if not self.clusters:
self._cluster_significant_points()
df = DataFrame([cluster.centroid for cluster in self.clusters], columns=['geometry'])
df['n'] = [len(cluster.points) for cluster in self.clusters]
return GeoDataFrame(df, crs=self._crs)
def get_flows_gdf(self):
"""
Return the extracted flows
Returns
-------
GeoDataFrame
Flow lines, incl. the number of trajectories summarized in the flow (weight)
"""
if not self.flows:
self._compute_flows_between_clusters()
return GeoDataFrame(self.flows, crs=self._crs)
def _extract_significant_points(self):
significant_points = []
for traj in self.traj_collection:
a = _PtsExtractor(traj, self.max_distance, self.min_distance, self.min_stop_duration, self.min_angle)
significant_points = significant_points + a.find_significant_points()
return significant_points
def _compute_flows_between_clusters(self):
sg = _SequenceGenerator(self.get_clusters_gdf(), self.traj_collection)
return sg.create_flow_lines()
class _PtsExtractor:
def __init__(self, traj, max_distance, min_distance, min_stop_duration, min_angle=45):
self.traj = traj
self.n = self.traj.df.geometry.count()
self.max_distance = max_distance
self.min_distance = min_distance
self.min_stop_duration = min_stop_duration
self.min_angle = min_angle
self.significant_points = [self.traj.get_start_location(), self.traj.get_end_location()]
def find_significant_points(self):
i = 0
j = 1
while j < self.n - 1:
if self.is_significant_distance(i, j):
self.add_point(j)
i = j
j = i + 1
continue
k, has_points = self.locate_points_beyond_min_distance(j)
if has_points:
#print(f"has points (i={i} | j={j} | k={k})")
if k > j + 1:
if self.is_significant_stop(j, k):
self.add_point(j)
i = j
j = k
continue
else: # compute the average spatial position to represent the similar points
m = j + (k - 1 - j) / 2
j = int(m)
if self.is_significant_turn(i, j, k):
self.add_point(j)
i = j
j = k
else:
j += 1
else:
return self.significant_points
return self.significant_points
def is_significant_turn(self, i, j, k):
turn_angle = self.compute_angle_between_vectors(i, j, k)
return turn_angle >= self.min_angle and turn_angle <= (360 - self.min_angle)
def is_significant_stop(self, j, k):
delta_t = self.traj.df.iloc[k - 1].name - self.traj.df.iloc[j].name
return delta_t >= self.min_stop_duration
def add_point(self, j):
pt = self.get_pt(j)
self.append_point(pt)
def append_point(self, pt):
if pt != self.traj.get_start_location():
self.significant_points.append(pt)
def compute_angle_between_vectors(self, i, j, k):
p_i = self.get_pt(i)
p_j = self.get_pt(j)
p_k = self.get_pt(k)
azimuth_ij = azimuth(p_i, p_j)
azimuth_jk = azimuth(p_j, p_k)
return angular_difference(azimuth_ij, azimuth_jk)
def locate_points_beyond_min_distance(self, j):
for k in range(j + 1, self.n):
if self.distance_greater_than(j, k, self.min_distance):
return k, True
return k, False
def distance_greater_than(self, loc1, loc2, dist):
pt1 = self.get_pt(loc1)
pt2 = self.get_pt(loc2)
if self.traj.is_latlon:
d = measure_distance_geodesic(pt1, pt2)
else:
d = measure_distance_euclidean(pt1, pt2)
return d >= dist
def get_pt(self, the_loc):
return self.traj.df.iloc[the_loc][self.traj.get_geom_column_name()]
def is_significant_distance(self, i, j):
if self.distance_greater_than(i, j, self.max_distance):
self.append_point(self.get_pt(i))
return True
return False
class _SequenceGenerator:
def __init__(self, cells, traj_collection):
self.cells = cells
self.id_to_centroid = {i: [f, [0, 0, 0, 0, 0]] for i, f in cells.iterrows()}
self.sequences = {}
for traj in traj_collection:
self.evaluate_trajectory(traj)
def evaluate_trajectory(self, trajectory):
this_sequence = []
prev_cell_id = None
for t, row in trajectory.df.iterrows():
nearest_id = self.get_nearest(row[trajectory.get_geom_column_name()])
nearest_cell = self.id_to_centroid[nearest_id][0]
nearest_cell_id = nearest_cell.name
if len(this_sequence) >= 1:
prev_cell_id = this_sequence[-1]
if nearest_cell_id != prev_cell_id:
if (prev_cell_id, nearest_cell_id) in self.sequences:
self.sequences[(prev_cell_id, nearest_cell_id)] += 1
else:
self.sequences[(prev_cell_id, nearest_cell_id)] = 1
if nearest_cell_id != prev_cell_id:
# we have changed to a new cell --> up the counter
h = t.hour
self.id_to_centroid[nearest_id][1][0] += 1
self.id_to_centroid[nearest_id][1][int(h / 6 + 1)] += 1
this_sequence.append(nearest_cell_id)
def create_flow_lines(self):
lines = []
for key, value in self.sequences.items():
p1 = self.id_to_centroid[key[0]][0].geometry
p2 = self.id_to_centroid[key[1]][0].geometry
lines.append({'geometry': LineString([p1, p2]), 'weight': value})
return lines
def get_nearest(self, pt):
pts = self.cells.geometry.unary_union
nearest = self.cells.geometry.geom_equals(nearest_points(pt, pts)[1])
return self.cells[nearest].iloc[0].name