forked from UW-THINKlab/MAW_v2
-
Notifications
You must be signed in to change notification settings - Fork 5
/
incremental_clustering.py
412 lines (340 loc) · 15.8 KB
/
incremental_clustering.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
"""
======================
Incremental Clustering
======================
Perform clustering on the multiple locations of one user based on a spatial threshold.
Each cluster of locations represents a potential stay. This method identifies groups of spatially close points
that are likely to represent a physical location where the user stayed for longer than the duration threshold.
input:
gps stay information / cellular stay information
spatial threshold
duration constraint threshold (for detecting common stay)
output:
potential stays represented by stay locations
"""
import logging
import numpy as np
import pandas as pd
from sklearn.cluster import KMeans
from mawpy.constants import (USER_ID, STAY_DUR, ORIG_LAT, STAY_LAT, STAY_LONG, STAY_UNC, ORIG_LONG, ORIG_UNC, STAY,
UNIX_START_T, IC_COLUMNS)
from mawpy.distance import distance
from mawpy.utilities import (
Cluster,
get_combined_stay,
get_stay_groups,
get_preprocessed_dataframe,
get_list_of_chunks_by_column,
execute_parallel,
validate_input_args
)
logger = logging.getLogger(__name__)
def _get_cluster_center(row: pd.Series, df_columns: list, mapping: dict, dur_constr: float) -> tuple[str, str, str]:
    """
    Resolve the cluster center assigned to a single trace.

    When a duration constraint is active, the trace's coordinates are read
    from the stay columns; otherwise from the original columns. The trace's
    uncertainty comes from the matching uncertainty column, or -1 when that
    column is absent from the DataFrame.

    Parameters
    ----------
    row : pd.Series
        A single trace record.
    df_columns : list
        Column names present in the user's DataFrame.
    mapping : dict
        Maps a (lat, long) tuple to (center_lat, center_long, radius).
    dur_constr : float
        Duration constraint; truthy selects the stay columns.

    Returns
    -------
    tuple[str, str, str]
        (latitude, longitude, uncertainty) of the cluster center. Falls back
        to the trace's own coordinates when it belongs to no cluster.
    """
    if dur_constr:
        lat_col, long_col, unc_col = STAY_LAT, STAY_LONG, STAY_UNC
    else:
        lat_col, long_col, unc_col = ORIG_LAT, ORIG_LONG, ORIG_UNC

    lat_long = (row[lat_col], row[long_col])
    unc = row[unc_col] if unc_col in df_columns else -1

    center = mapping.get(lat_long)
    if center is None:
        # Trace was never clustered: keep its own coordinates and uncertainty.
        return lat_long[0], lat_long[1], unc
    cluster_lat, cluster_long, cluster_radius = center
    # Report the larger of the trace uncertainty and the cluster radius.
    return cluster_lat, cluster_long, max(unc, cluster_radius)
def _k_means_cluster_lloyd(cluster_list: list[Cluster]) -> list[Cluster]:
    """
    Re-partition the points of existing clusters with K-Means (Lloyd's algorithm).

    Incremental clustering is sensitive to the order in which traces arrive;
    this pass corrects it by re-assigning every point with K-Means, seeding
    the algorithm with the current cluster means and k equal to the current
    number of clusters.

    Parameters
    ----------
    cluster_list : list[Cluster]
        Clusters produced by the incremental pass; each `pList` holds the
        unique (lat, long) tuples belonging to that cluster.

    Returns
    -------
    list[Cluster]
        New Cluster objects built from the K-Means label assignment.
    """
    all_points = []
    for each_cluster in cluster_list:
        all_points.extend(each_cluster.pList)
    k = len(cluster_list)

    # Project (lat, long) onto a local plane centered at the mean coordinate:
    # each point becomes signed (north-south, east-west) great-arc distances
    # from the center, so K-Means operates on approximately planar km units.
    y_center = np.mean([point[0] for point in all_points])
    x_center = np.mean([point[1] for point in all_points])
    distance_coord = np.empty((0, 2))
    for point in all_points:
        x_distance = distance(y_center, x_center, y_center, point[1])
        y_distance = distance(y_center, x_center, point[0], x_center)
        if point[0] < y_center:  # point lies south of the center
            y_distance = -y_distance
        if point[1] < x_center:  # point lies west of the center
            x_distance = -x_distance
        distance_coord = np.append(
            distance_coord, np.array([[y_distance, x_distance]]), axis=0
        )

    # Initial centers: the mean of each incoming cluster's projected points.
    # Points were appended cluster-by-cluster, so slices of distance_coord
    # line up with cluster_list.
    initial_centers = np.empty((0, 2))
    i = 0
    for each_cluster in cluster_list:
        num_point = len(each_cluster.pList)
        ctr = np.mean(distance_coord[i:(i + num_point)], axis=0, keepdims=True)
        initial_centers = np.append(initial_centers, ctr, axis=0)
        i += num_point

    # Bug fix: with an explicit init array there is exactly one deterministic
    # initialization; sklearn warns (and newer versions error) when n_init > 1
    # is combined with an array init, so request a single run explicitly.
    kmeans = KMeans(n_clusters=k, init=initial_centers, n_init=1).fit(distance_coord)

    # Group the original (lat, long) points by their K-Means label.
    membership: dict[int, list] = {label: [] for label in set(kmeans.labels_)}
    for label, point in zip(kmeans.labels_, all_points):
        membership[label].append(point)

    # Wrap each label's point list back into a Cluster object.
    cluster_list_new = []
    for points in membership.values():
        new_cluster = Cluster()
        new_cluster.add_points(points)
        cluster_list_new.append(new_cluster)
    return cluster_list_new
def _get_clusters(locations_for_clustering: list[tuple[float, float]], spat_constr: float) -> list[Cluster]:
    """
    Incrementally cluster trace locations under a spatial constraint.

    Each location is first tested against the most recently used cluster,
    then against every existing cluster, and a fresh cluster is started only
    when no existing one is within the spatial constraint.

    Parameters
    ----------
    locations_for_clustering : list[tuple[float, float]]
        Latitude/longitude tuples to cluster (must be non-empty).
    spat_constr : float
        Maximum distance from a cluster center for a point to join it.

    Returns
    -------
    list[Cluster]
        The clusters built from the locations.
    """
    # Seed with a cluster containing the first location.
    first = Cluster()
    first.add_point(locations_for_clustering[0])
    clusters_list = [first]
    current_cluster = first

    # NOTE(review): clusters ignore time of day — there is no record of how
    # often the device visited a cluster location.
    for loc in locations_for_clustering[1:]:
        # Fast path: the point fits the most recently used cluster.
        if current_cluster.get_distance_from_center(loc) < spat_constr:
            current_cluster.add_point(loc)
            continue
        # Otherwise scan all existing clusters for one within the constraint.
        current_cluster = None
        for candidate in clusters_list:
            if candidate.get_distance_from_center(loc) < spat_constr:
                candidate.add_point(loc)
                current_cluster = candidate
                break
        # No cluster accepted the point: start a new one.
        if current_cluster is None:
            current_cluster = Cluster()
            current_cluster.add_point(loc)
            clusters_list.append(current_cluster)
    return clusters_list
"""
Note: Do not put the duration constraint parameter if you are running incremental clustering as the first
step since there would be no stay points if this is used as the first step.
Step1: Form clusters based on spatial constraint. If duration constraint is provided,
then work with only stay points else all.
Step2: Perform k means to apply the correction on Step1, since Step1 does the clustering based on
order of traces.
Step3: Once clusters are prepared in Step2, add traces to clusters which are within 0.2 km of cluster centre.
To Do: Figure out the data structure used to perform all actions.
"""
# if a duration constraint is provided, then get loc4cluster as latitude and longitude coordinates
def _get_locations_to_cluster_center_map(clusters_list: list[Cluster]) -> dict:
    """
    Map every clustered point to its cluster's center and radius.

    Parameters
    ----------
    clusters_list : list[Cluster]
        Cluster objects whose `pList` holds (lat, long) tuples.

    Returns
    -------
    dict
        (lat, long) -> (center_lat, center_long, radius) for every point
        in every cluster. The center is the mean of the cluster's points.
    """
    locations_to_cluster_center_map = {}
    for this_cluster in clusters_list:
        # presumably get_cluster_radius() returns km and this converts to
        # integer meters — TODO confirm against Cluster's implementation
        cluster_radius = int(1000 * this_cluster.get_cluster_radius())
        center_lat = np.mean([each_point[0] for each_point in this_cluster.pList])
        center_long = np.mean([each_point[1] for each_point in this_cluster.pList])
        for each_point in this_cluster.pList:
            key = (each_point[0], each_point[1])
            locations_to_cluster_center_map[key] = (center_lat, center_long, cluster_radius)
    return locations_to_cluster_center_map
def _filter_by_durations_constraint(df_by_user: pd.DataFrame, duration_constraint: float) -> pd.DataFrame:
    """
    Keep only the traces belonging to stays longer than the duration constraint.

    A stay's duration is the span between the earliest and latest trace
    timestamps (UNIX_START_T) within its STAY group.

    Parameters
    ----------
    df_by_user : pd.DataFrame
        Stay information for a single user.
    duration_constraint : float
        Minimum duration a stay must exceed to be kept.

    Returns
    -------
    pd.DataFrame
        Rows of `df_by_user` whose stay duration strictly exceeds the
        constraint.
    """
    # Broadcast each stay's duration onto its member rows, then filter rows
    # directly (equivalent to aggregating per group and using isin).
    group_times = df_by_user.groupby(STAY)[UNIX_START_T]
    stay_durations = group_times.transform("max") - group_times.transform("min")
    return df_by_user[stay_durations > duration_constraint]
def _run_for_user(df_by_user: pd.DataFrame, df_columns: list,
                  spat_constr: float, dur_constr: float | None = None) -> pd.DataFrame:
    """
    Run incremental clustering over the traces of a single user.

    Pipeline: cluster the user's unique original coordinates under the
    spatial constraint, correct the order-dependent result with a K-Means
    pass, overwrite each trace's stay columns with its cluster center,
    regroup traces into stays, and — when a duration constraint is given —
    drop stays that are too short.

    Parameters
    ----------
    df_by_user : pd.DataFrame
        Trace data for a single user.
    df_columns : list
        Column names present in the DataFrame.
    spat_constr : float
        Spatial constraint for clustering.
    dur_constr : float | None, optional
        Duration constraint for clustering, by default None.

    Returns
    -------
    pd.DataFrame
        The user's DataFrame with updated clustering information.
    """
    # Cluster the distinct original (lat, long) coordinates.
    unique_locations = list(set(zip(df_by_user[ORIG_LAT], df_by_user[ORIG_LONG])))
    if not unique_locations:
        return df_by_user

    # Incremental pass, then K-Means to correct its order sensitivity.
    clusters_list = _k_means_cluster_lloyd(_get_clusters(unique_locations, spat_constr))

    # Every clustered coordinate -> (center_lat, center_long, radius).
    center_map = _get_locations_to_cluster_center_map(clusters_list)

    # Rewrite each trace with its cluster center and max(radius, uncertainty).
    df_by_user[[STAY_LAT, STAY_LONG, STAY_UNC]] = df_by_user.apply(
        lambda row: _get_cluster_center(row, df_columns, center_map, dur_constr),
        axis=1, result_type='expand')

    df_by_user[STAY] = get_stay_groups(df_by_user)
    df_by_user = get_combined_stay(df_by_user)

    if dur_constr is not None:
        df_by_user = _filter_by_durations_constraint(df_by_user, dur_constr)
    return df_by_user
def _run(df_by_user_chunk: pd.DataFrame, args: tuple) -> pd.DataFrame:
    """
    Run the clustering process on a chunk of user data.

    Parameters
    ----------
    df_by_user_chunk : pd.DataFrame
        Input DataFrame containing the traces for one chunk of user ids.
    args : tuple
        (spatial_constraint, duration_constraint).

    Returns
    -------
    pd.DataFrame
        The chunk with clustering information applied per user.
    """
    spatial_constraint, dur_constraint = args
    df_columns = df_by_user_chunk.columns
    # Bug fix: test for None BEFORE the numeric comparison. The original
    # condition `dur_constraint <= 0 or dur_constraint is None` raised
    # TypeError (None <= 0) whenever dur_constraint was None.
    if dur_constraint is None or dur_constraint <= 0:
        # No usable duration constraint: cluster on all traces.
        return (df_by_user_chunk.groupby(USER_ID)
                .apply(lambda x: _run_for_user(x, df_columns, spatial_constraint)))
    return (df_by_user_chunk.groupby(USER_ID)
            .apply(lambda x: _run_for_user(x, df_columns, spatial_constraint, dur_constraint)))
def incremental_clustering(output_file: str, spatial_constraint: float,
                           dur_constraint: float,
                           input_df: pd.DataFrame | None = None,
                           input_file: str | None = None) -> pd.DataFrame | None:
    """
    Main function to perform incremental clustering on location data.

    Parameters
    ----------
    output_file : str
        The file path to save the output DataFrame (written as CSV).
    spatial_constraint : float
        The spatial constraint for clustering.
    dur_constraint : float
        The duration constraint for clustering.
    input_df : pd.DataFrame | None, optional
        The input DataFrame, by default None.
    input_file : str | None, optional
        The file path to the input data, by default None.

    Returns
    -------
    pd.DataFrame | None
        The output DataFrame after clustering, or None if neither an input
        DataFrame nor an input file is provided.
    """
    if input_df is None and input_file is None:
        logger.error("At least one of input file path or input dataframe is required")
        return None

    # Fail fast on invalid constraints before reading or chunking any data.
    validate_input_args(duration_constraint=dur_constraint, spatial_constraint=spatial_constraint)

    if input_df is None:
        input_df = get_preprocessed_dataframe(input_file)

    user_id_chunks = get_list_of_chunks_by_column(input_df, USER_ID)
    args = (spatial_constraint, dur_constraint)
    output_df = execute_parallel(user_id_chunks, input_df, _run, args)

    # Keep only the expected output columns that actually exist.
    output_columns = list(set(IC_COLUMNS) & set(output_df.columns))
    output_df = output_df[output_columns]
    # Bug fix: dropna returns a new frame; the original call discarded the
    # result, so all-NaN rows were never removed.
    output_df = output_df.dropna(how="all")
    output_df.to_csv(output_file, columns=sorted(output_df.columns), index=False)
    return output_df