/
pro_curve_util.py
215 lines (167 loc) · 7.67 KB
/
pro_curve_util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
"""
Utility functions that compute a PRO curve and its definite integral, given
pairs of anomaly and ground truth maps.
The PRO curve can also be integrated up to a constant integration limit.
"""
import numpy as np
import tqdm
from scipy.ndimage.measurements import label
from generic_util import trapezoid, generate_toy_dataset
class GroundTruthComponent:
"""
Stores sorted anomaly scores of a single ground truth component.
Used to efficiently compute the region overlap for many increasing
thresholds.
"""
def __init__(self, anomaly_scores):
"""
Initialize the module.
Args:
anomaly_scores: List of all anomaly scores within the ground truth
component as numpy array.
"""
# Keep a sorted list of all anomaly scores within the component.
self.anomaly_scores = anomaly_scores.copy()
self.anomaly_scores.sort()
# Pointer to the anomaly score where the current threshold divides
# the component into OK / NOK pixels.
self.index = 0
# The last evaluated threshold.
self.last_threshold = None
def compute_overlap(self, threshold):
"""
Compute the region overlap for a specific threshold.
Thresholds must be passed in increasing order.
Args:
threshold: Threshold to compute the region overlap.
Returns:
Region overlap for the specified threshold.
"""
if self.last_threshold is not None:
assert self.last_threshold <= threshold
# Increase the index until it points to an anomaly score that is just
# above the specified threshold.
while (self.index < len(self.anomaly_scores) and
self.anomaly_scores[self.index] <= threshold):
self.index += 1
# Compute the fraction of component pixels that are correctly segmented
# as anomalous.
return 1.0 - self.index / len(self.anomaly_scores)
def collect_anomaly_scores(anomaly_maps, ground_truth_maps):
"""
Extract anomaly scores for each ground truth connected component
as well as anomaly scores for each potential false positive pixel from
anomaly maps.
Args:
anomaly_maps: List of anomaly maps (2D numpy arrays) that contain a
real-valued anomaly score at each pixel.
ground_truth_maps: List of ground truth maps (2D numpy arrays) that
contain binary-valued ground truth labels for each
pixel.
0 indicates that a pixel is anomaly-free.
1 indicates that a pixel contains an anomaly.
Returns:
ground_truth_components: A list of all ground truth connected components
that appear in the dataset. For each component,
a sorted list of its anomaly scores is stored.
anomaly_scores_ok_pixels: A sorted list of anomaly scores of all
anomaly-free pixels of the dataset. This list
can be used to quickly select thresholds that
fix a certain false positive rate.
"""
# Make sure an anomaly map is present for each ground truth map.
assert len(anomaly_maps) == len(ground_truth_maps)
# Initialize ground truth components and scores of potential fp pixels.
ground_truth_components = []
anomaly_scores_ok_pixels = np.zeros(
len(ground_truth_maps) * ground_truth_maps[0].size)
# Structuring element for computing connected components.
structure = np.ones((3, 3), dtype=int)
# Collect anomaly scores within each ground truth region and for all
# potential fp pixels.
print("Collect anomaly scores ..")
ok_index = 0
for gt_map, prediction in tqdm.tqdm(zip(ground_truth_maps, anomaly_maps),
total=len(ground_truth_maps)):
# Compute the connected components in the ground truth map.
labeled, n_components = label(gt_map, structure)
# Store all potential fp scores.
num_ok_pixels = len(prediction[labeled == 0])
anomaly_scores_ok_pixels[ok_index:ok_index + num_ok_pixels] = \
prediction[labeled == 0].copy()
ok_index += num_ok_pixels
# Fetch anomaly scores within each GT component.
for k in range(n_components):
component_scores = prediction[labeled == (k + 1)]
ground_truth_components.append(
GroundTruthComponent(component_scores))
# Sort all potential false positive scores.
anomaly_scores_ok_pixels = np.resize(anomaly_scores_ok_pixels, ok_index)
print(f"Sort {len(anomaly_scores_ok_pixels)} anomaly scores ..")
anomaly_scores_ok_pixels.sort()
return ground_truth_components, anomaly_scores_ok_pixels
def compute_pro(anomaly_maps, ground_truth_maps, num_thresholds):
"""
Compute the PRO curve at equidistant interpolation points for a set of
anomaly maps with corresponding ground truth maps. The number of
interpolation points can be set manually.
Args:
anomaly_maps: List of anomaly maps (2D numpy arrays) that contain a
real-valued anomaly score at each pixel.
ground_truth_maps: List of ground truth maps (2D numpy arrays) that
contain binary-valued ground truth labels for each
pixel.
0 indicates that a pixel is anomaly-free.
1 indicates that a pixel contains an anomaly.
num_thresholds: Number of thresholds to compute the PRO curve.
Returns:
fprs: List of false positive rates.
pros: List of correspoding PRO values.
"""
# Fetch sorted anomaly scores.
ground_truth_components, anomaly_scores_ok_pixels = \
collect_anomaly_scores(anomaly_maps, ground_truth_maps)
# Select equidistant thresholds.
threshold_positions = np.linspace(0, len(anomaly_scores_ok_pixels) - 1,
num=num_thresholds, dtype=int)
print("Compute PRO curve..")
fprs = [1.0]
pros = [1.0]
for pos in tqdm.tqdm(threshold_positions):
threshold = anomaly_scores_ok_pixels[pos]
# Compute the false positive rate for this threshold.
fpr = 1.0 - (pos + 1) / len(anomaly_scores_ok_pixels)
# Compute the PRO value for this threshold.
pro = 0.0
for component in ground_truth_components:
pro += component.compute_overlap(threshold)
pro /= len(ground_truth_components)
fprs.append(fpr)
pros.append(pro)
# Return (FPR/PRO) pairs in increasing FPR order.
fprs = fprs[::-1]
pros = pros[::-1]
return fprs, pros
def main():
"""
Compute the area under the PRO curve for a toy dataset and an algorithm
that randomly assigns anomaly scores to each pixel. The integration
limit can be specified.
"""
# Fix a random seed for reproducibility.
np.random.seed(1338)
integration_limit = 0.3
num_thresholds = 5000
# Generate a toy dataset.
anomaly_maps, ground_truth_maps = generate_toy_dataset(
num_images=200, image_width=500, image_height=300, gt_size=10)
# Compute the PRO curve for this dataset.
all_fprs, all_pros = compute_pro(
anomaly_maps=anomaly_maps,
ground_truth_maps=ground_truth_maps,
num_thresholds=num_thresholds)
au_pro = trapezoid(all_fprs, all_pros, x_max=integration_limit)
au_pro /= integration_limit
print(f"AU-PRO (FPR limit: {integration_limit}: {au_pro}")
if __name__ == "__main__":
main()