-
Notifications
You must be signed in to change notification settings - Fork 3
/
clustering_base.py
174 lines (143 loc) · 6.85 KB
/
clustering_base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
import logging
from abc import ABC, abstractmethod
from typing import Union, Set, Callable, Iterable, Optional
import numpy as np
import pandas as pd
from scipy.spatial import distance_matrix
from ..util.cache import PickleLoadSaveMixin
log = logging.getLogger(__name__)
# TODO at some point in the future: generalize to other input and deal with algorithms that allow prediction of labels
class EuclideanClusterer(PickleLoadSaveMixin, ABC):
"""
Base class for all clustering algorithms. Supports noise clusters and relabelling of identified clusters as noise
based on their size.
:param noise_label: label that is associated with the noise cluster or None
:param min_cluster_size: if not None, clusters below this size will be labeled as noise
:param max_cluster_size: if not None, clusters above this size will be labeled as noise
"""
def __init__(self, noise_label=-1, min_cluster_size: int = None, max_cluster_size: int = None):
self._datapoints: Optional[np.ndarray] = None
self._labels: Optional[np.ndarray] = None
self._clusterIdentifiers: Optional[Set[int]] = None
self._nonNoiseClusterIdentifiers: Optional[Set[int]] = None
if min_cluster_size is not None or max_cluster_size is not None:
if noise_label is None:
raise ValueError("the noise label has to be not None for non-trivial bounds on cluster sizes")
self.noiseLabel = noise_label
self.maxClusterSize = max_cluster_size if max_cluster_size is not None else np.inf
self.minClusterSize = min_cluster_size if min_cluster_size is not None else -np.inf
self._clusterDict = {}
self._numClusters: Optional[int] = None
class Cluster:
def __init__(self, datapoints: np.ndarray, identifier: Union[int, str]):
self.datapoints = datapoints
self.identifier = identifier
self._radius: Optional[float] = None
self._centroid: Optional[np.ndarray] = None
def __len__(self):
return len(self.datapoints)
def __str__(self):
return f"{self.__class__.__name__}_{self.identifier}"
def _compute_radius(self):
return np.max(distance_matrix([self.centroid()], self.datapoints))
def _compute_centroid(self):
return np.mean(self.datapoints, axis=0)
def centroid(self):
if self._centroid is None:
self._centroid = self._compute_centroid()
return self._centroid
def radius(self):
if self._radius is None:
self._radius = self._compute_radius()
return self._radius
def summary_dict(self):
"""
:return: dictionary containing coarse information about the cluster (e.g. num_members and centroid)
"""
return {
"identifier": self.identifier,
"centroid": self.centroid(),
"numMembers": len(self),
"radius": self.radius()
}
@classmethod
def __str__(cls):
return cls.__name__
def clusters(self, condition: Callable[[Cluster], bool] = None) -> Iterable[Cluster]:
"""
:param condition: if provided, only clusters fulfilling the condition will be included
:return: generator of clusters
"""
percentage_to_log = 0
for i, clusterId in enumerate(self._nonNoiseClusterIdentifiers):
# logging process through the loop
percentage_generated = int(100 * i / self.num_clusters)
if percentage_generated == percentage_to_log:
log.info(f"Processed {percentage_to_log}% of clusters")
percentage_to_log += 5
cluster = self.get_cluster(clusterId)
if condition is None or condition(cluster):
yield cluster
def noise_cluster(self):
if self.noiseLabel is None:
raise NotImplementedError(f"The algorithm {self} does not provide a noise cluster")
return self.get_cluster(self.noiseLabel)
def summary_df(self, condition: Callable[[Cluster], bool] = None):
"""
:param condition: if provided, only clusters fulfilling the condition will be included
:return: pandas DataFrame containing coarse information about the clusters
"""
summary_dicts = [cluster.summary_dict() for cluster in self.clusters(condition=condition)]
return pd.DataFrame(summary_dicts).set_index("identifier", drop=True)
def fit(self, data: np.ndarray) -> None:
log.info(f"Fitting {self} to {len(data)} coordinate datapoints.")
labels = self._compute_labels(data)
if len(labels) != len(data):
raise Exception(f"Bad Implementation: number of labels does not match number of datapoints")
# Relabel clusters that do not fulfill size bounds as noise
if self.minClusterSize != -np.inf or self.maxClusterSize != np.inf:
for clusterId, clusterSize in zip(*np.unique(labels, return_counts=True)):
if not self.minClusterSize <= clusterSize <= self.maxClusterSize:
labels[labels == clusterId] = self.noiseLabel
self._datapoints = data
self._clusterIdentifiers = set(labels)
self._labels = labels
if self.noiseLabel is not None:
self._nonNoiseClusterIdentifiers = self._clusterIdentifiers.difference({self.noiseLabel})
log.info(f"{self} found {self.num_clusters} clusters")
@property
def is_fitted(self):
return self._datapoints is not None
@property
def datapoints(self) -> np.ndarray:
assert self.is_fitted
return self._datapoints
@property
def labels(self) -> np.ndarray:
assert self.is_fitted
return self._labels
@property
def cluster_identifiers(self) -> Set[int]:
assert self.is_fitted
return self._clusterIdentifiers
# unfortunately, there seems to be no way to annotate the return type correctly
# https://github.com/python/mypy/issues/3993
def get_cluster(self, cluster_id: int) -> Cluster:
if cluster_id not in self.labels:
raise KeyError(f"no cluster for id {cluster_id}")
result = self._clusterDict.get(cluster_id)
if result is None:
result = self.Cluster(self.datapoints[self.labels == cluster_id], identifier=cluster_id)
self._clusterDict[cluster_id] = result
return result
@property
def num_clusters(self) -> int:
return len(self._nonNoiseClusterIdentifiers)
@abstractmethod
def _compute_labels(self, x: np.ndarray) -> np.ndarray:
"""
Fit the clustering model and return an array of integer cluster labels
:param x: the datapoints
:return: list of the same length as the input datapoints; it represents the mapping coordinate -> cluster_id
"""
pass