-
Notifications
You must be signed in to change notification settings - Fork 44
/
stat_counter.py
600 lines (498 loc) · 19.7 KB
/
stat_counter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This file is based on the PySpark version which in turn
# was ported from spark/util/StatCounter.scala
from __future__ import division
import copy
import math
import numbers
from collections import namedtuple
from pysparkling.sql.functions import parse
from pysparkling.sql.types import row_from_keyed_values
try:
from numpy import maximum, minimum, sqrt
except ImportError:
maximum = max
minimum = min
sqrt = math.sqrt
class StatCounter(object):
    """Running statistics (count, mean, variance, min, max) over merged values.

    Uses Welford's online update for the variance numerator so values can be
    folded in one at a time or combined from other counters.
    """

    def __init__(self, values=None):
        self.n = 0     # number of values folded in so far
        self.mu = 0.0  # running mean
        self.m2 = 0.0  # running sum of squared deviations from the mean
        self.maxValue = float("-inf")
        self.minValue = float("inf")
        if values:
            for value in values:
                self.merge(value)

    def merge(self, value):
        """Fold a single value into the running statistics and return self."""
        diff = value - self.mu
        self.n += 1
        self.mu += diff / self.n
        # diff uses the old mean and (value - mu) the new one: Welford update.
        self.m2 += diff * (value - self.mu)
        self.maxValue = maximum(self.maxValue, value)
        self.minValue = minimum(self.minValue, value)
        return self

    def mergeStats(self, other):
        """Fold another StatCounter into this one and return self."""
        if other is self:
            # Merge a copy so we never read fields we have already overwritten.
            return self.mergeStats(other.copy())
        if self.n == 0:
            self.mu = other.mu
            self.m2 = other.m2
            self.n = other.n
            self.maxValue = other.maxValue
            self.minValue = other.minValue
        elif other.n != 0:
            diff = other.mu - self.mu
            # Choose the numerically safer form of the combined mean depending
            # on which side dominates the sample count.
            if other.n * 10 < self.n:
                self.mu += (diff * other.n) / (self.n + other.n)
            elif self.n * 10 < other.n:
                self.mu = other.mu - (diff * self.n) / (self.n + other.n)
            else:
                self.mu = (self.mu * self.n + other.mu * other.n) / (self.n + other.n)
            self.maxValue = maximum(self.maxValue, other.maxValue)
            self.minValue = minimum(self.minValue, other.minValue)
            # Must use the pre-merge self.n here, so n is updated last.
            self.m2 += other.m2 + (diff * diff * self.n * other.n) / (self.n + other.n)
            self.n += other.n
        return self

    def copy(self):
        """Return an independent deep copy of this counter."""
        return copy.deepcopy(self)

    def count(self):
        return int(self.n)

    def mean(self):
        return self.mu

    def sum(self):
        return self.n * self.mu

    def min(self):
        return self.minValue

    def max(self):
        return self.maxValue

    def variance(self):
        """Population variance; NaN when no values were merged."""
        return self.m2 / self.n if self.n else float('nan')

    def sampleVariance(self):
        """Sample variance (N-1 denominator); NaN with fewer than two values."""
        return self.m2 / (self.n - 1) if self.n > 1 else float('nan')

    def stdev(self):
        """Population standard deviation."""
        return sqrt(self.variance())

    def sampleStdev(self):
        """Sample standard deviation (N-1 denominator)."""
        return sqrt(self.sampleVariance())

    def __repr__(self):
        return ("(count: %s, mean: %s, stdev: %s, max: %s, min: %s)" %
                (self.count(), self.mean(), self.stdev(), self.max(), self.min()))
# One sample of the Greenwald-Khanna quantile summary: `value` is the sampled
# element, `g` the rank gap from the previous kept sample (minimum ranks are
# recovered by summing g), and `delta` the extra uncertainty on this sample's
# rank (see ColumnStatHelper.get_quantile, which uses them exactly this way).
PercentileStats = namedtuple("PercentileStats", ["value", "g", "delta"])
class ColumnStatHelper(object):
    """
    This class is used to compute statistics on a column.

    It computes basic statistics such as count, min, max, average, sample
    variance and sample standard deviation (plus skewness and kurtosis via
    the higher central moments).

    It also approximates percentiles with an implementation based, like Spark,
    on the algorithm proposed in the paper "Space-efficient Online Computation
    of Quantile Summaries" by Greenwald, Michael and Khanna, Sanjeev.
    (https://doi.org/10.1145/375663.375670)

    The result of this algorithm has the following deterministic bound:
    if the DataFrame has N elements and if we request the quantile at
    probability `p` up to error `err`, then the algorithm will return
    a sample `x` from the DataFrame so that the *exact* rank of `x` is
    close to (p * N). More precisely:

        floor((p - err) * N) <= rank(x) <= ceil((p + err) * N).

    Its implementation for simple metrics differs from
    pysparkling.stat_counter.StatCounter in order to match Spark behaviour
    when computing stats on a dataset, among the discrepancies:

    - handle min and max for strings
    - return min and max as int when computed on int
    - name "stddev" the *sample* standard deviation metric
    """

    def __init__(self, column, percentiles_relative_error=1 / 10000):
        self.column = column
        self.count = 0
        # Running sum and central-moment numerators; set to None as soon as a
        # value is not summable (e.g. a string column).
        self.sum_of_values = 0
        self.m2 = 0  # 2nd central moment numerator (variance)
        self.m3 = 0  # 3rd central moment numerator (skewness)
        self.m4 = 0  # 4th central moment numerator (kurtosis)
        self.min_value = None
        self.max_value = None
        # Percentiles-related fields
        self.sampled = []  # List of PercentileStats
        self.head_sampled = []  # List acting as a buffer of the last added values
        self.head_max_size = 50000  # Buffer size after which buffer is added to sampled
        self.compression_threshold = 10000  # When to compress sampled
        self.percentiles_relative_error = percentiles_relative_error
        self.compressed = True

    def merge(self, row, schema):
        """Evaluate the column against `row` and fold the value into the stats."""
        value = self.column.eval(row, schema)
        if value is not None:
            self.update_counters(value)
            if isinstance(value, numbers.Number):
                # Only numeric values take part in the percentile summary.
                self.update_sample(value)
        return self

    def update_counters(self, value):
        """Update count, min/max and the running sum/moments with one value."""
        if self.count != 0:
            # min/max also work for non-numeric orderable values such as str.
            self.min_value = min(self.min_value, value)
            self.max_value = max(self.max_value, value)
        else:
            self.min_value = value
            self.max_value = value
        try:
            self.update_moments(value)
            self.sum_of_values += value
        except TypeError:
            # Non-summable column: sum and moments are undefined from now on.
            self.sum_of_values = None
            self.m2 = None
            self.m3 = None
            self.m4 = None
        self.count += 1

    def update_moments(self, value):
        """Online update of the 2nd/3rd/4th central-moment numerators."""
        delta = value - self.mean if self.count > 0 else 0
        deltaN = delta / (self.count + 1)
        self.m2 = self.m2 + delta * (delta - deltaN)
        delta2 = delta * delta
        deltaN2 = deltaN * deltaN
        # m3/m4 formulas intentionally use the already-updated lower moments.
        self.m3 = self.m3 - 3 * deltaN * self.m2 + delta * (delta2 - deltaN2)
        self.m4 = (self.m4 -
                   4 * deltaN * self.m3 -
                   6 * deltaN2 * self.m2 +
                   delta * (delta * delta2 - deltaN * deltaN2))

    def update_sample(self, value):
        """Buffer a numeric value; flush and compress the summary in batches."""
        self.head_sampled.append(value)
        self.compressed = False
        if len(self.head_sampled) >= self.head_max_size:
            self.add_head_to_sample()
            if len(self.sampled) >= self.compression_threshold:
                self.compress()

    def add_head_to_sample(self):
        """Merge the sorted buffer into the sampled summary (GK insertion)."""
        if not self.head_sampled:
            return
        count_without_head = self.count - len(self.head_sampled)
        sorted_head = sorted(self.head_sampled)
        new_samples = []
        sample_idx = 0
        for ops_idx, current_sample in enumerate(sorted_head):
            # Copy over existing samples that sort before the incoming value.
            for s in self.sampled[sample_idx:]:
                if s.value <= current_sample:
                    new_samples.append(s)
                    sample_idx += 1
                else:
                    break
            count_without_head += 1
            # Extremities keep delta == 0 so min/max stay exact.
            if not new_samples or (
                    sample_idx == len(self.sampled) and ops_idx == len(sorted_head) - 1
            ):
                delta = 0
            else:
                delta = math.floor(2 * self.percentiles_relative_error * count_without_head)
            new_samples.append(PercentileStats(value=current_sample, g=1, delta=delta))
        new_samples += self.sampled[sample_idx:]
        self.sampled = new_samples
        self.head_sampled = []
        self.compressed = False

    def compress(self):
        """Merge neighbouring samples whose rank uncertainty stays bounded."""
        if not self.sampled:
            # Nothing to compress. This guard also makes mergeStats safe when
            # neither helper has sampled any numeric value (previously this
            # raised IndexError on self.sampled[-1]).
            self.compressed = True
            return
        merge_threshold = self.merge_threshold()
        reverse_compressed_sample = []
        head = self.sampled[-1]
        # Walk from the tail towards (but excluding) the first sample.
        for sample1 in self.sampled[-2:0:-1]:
            if sample1.g + head.g + head.delta < merge_threshold:
                head = PercentileStats(value=head.value, g=head.g + sample1.g, delta=head.delta)
            else:
                reverse_compressed_sample.append(head)
                head = sample1
        reverse_compressed_sample.append(head)
        # The first sample is always kept so the minimum stays exact.
        current_head = self.sampled[0]
        if current_head.value <= head.value and len(self.sampled) > 1:
            reverse_compressed_sample.append(current_head)
        self.sampled = reverse_compressed_sample[::-1]
        self.compressed = True

    def merge_threshold(self):
        """Maximum combined rank uncertainty two samples may have to be merged."""
        return 2 * self.percentiles_relative_error * self.count

    def finalize(self):
        """Flush the buffer and compress so the summary is ready for queries."""
        if self.head_sampled:
            self.add_head_to_sample()
        if not self.compressed:
            self.compress()

    def mergeStats(self, other):
        """Merge another helper's statistics into this one and return self.

        :type other: ColumnStatHelper
        """
        self.finalize()
        other.finalize()
        self.sampled = sorted(self.sampled + other.sampled)
        self.compress()
        if self.count == 0:
            # Adopt the other helper's aggregates wholesale: this helper has
            # no data and merge_moments cannot run against an undefined mean.
            # (Previously sum/moments were dropped here, corrupting mean and
            # variance when merging into a fresh helper.)
            self.max_value = other.max_value
            self.min_value = other.min_value
            self.sum_of_values = other.sum_of_values
            self.m2 = other.m2
            self.m3 = other.m3
            self.m4 = other.m4
        elif other.count != 0:
            self.max_value = max(self.max_value, other.max_value)
            self.min_value = min(self.min_value, other.min_value)
            try:
                self.merge_moments(other)
                self.sum_of_values += other.sum_of_values
            except TypeError:
                # At least one side is non-numeric: aggregates are undefined.
                self.sum_of_values = None
                self.m2 = None
                self.m3 = None
                self.m4 = None
        self.count += other.count
        return self

    def merge_moments(self, other):
        """Combine central-moment numerators of two partitions (Chan et al.)."""
        n1 = self.count
        n2 = other.count
        new_count = n1 + n2
        delta = other.mean - self.mean
        deltaN = delta / new_count if new_count != 0 else 0
        # m4 must be computed from the *old* m2/m3, hence the temporaries.
        new_m2 = self.m2 + other.m2 + delta * deltaN * n1 * n2
        new_m3 = (self.m3 + other.m3 +
                  deltaN * deltaN * delta * n1 * n2 * (n1 - n2) +
                  3 * deltaN * (n1 * other.m2 - n2 * self.m2))
        self.m4 = (self.m4 + other.m4 +
                   deltaN * deltaN * deltaN * delta * n1 * n2 * (n1 * n1 - n1 * n2 + n2 * n2) +
                   6 * deltaN * deltaN * (n1 * n1 * other.m2 + n2 * n2 * self.m2) +
                   4 * deltaN * (n1 * other.m3 - n2 * self.m3))
        self.m2 = new_m2
        self.m3 = new_m3

    def get_quantile(self, quantile):
        """Return the approximate value at `quantile` (in [0, 1]).

        Returns None when no numeric value was sampled; raises ValueError for
        a quantile outside [0, 1].
        """
        self.finalize()
        if not 0 <= quantile <= 1:
            raise ValueError("quantile must be between 0 and 1")
        if not self.sampled:
            return None
        # Within the error bound of the extremities return exact min/max.
        if quantile <= self.percentiles_relative_error:
            return self.sampled[0].value
        if quantile >= 1 - self.percentiles_relative_error:
            return self.sampled[-1].value
        rank = math.ceil(quantile * self.count)
        target_error = self.percentiles_relative_error * self.count
        min_rank = 0
        for cur_sample in self.sampled:
            min_rank += cur_sample.g
            max_rank = min_rank + cur_sample.delta
            if max_rank - target_error <= rank <= min_rank + target_error:
                return cur_sample.value
        return self.sampled[-1].value

    @property
    def mean(self):
        """Arithmetic mean, or None when empty or non-numeric."""
        if self.count == 0 or self.sum_of_values is None:
            return None
        return self.sum_of_values / self.count

    @property
    def variance_pop(self):
        """Population variance (N denominator), or None when undefined."""
        if self.count == 0 or self.sum_of_values is None:
            return None
        return self.m2 / self.count

    @property
    def variance_samp(self):
        """Sample variance (N-1 denominator), or None when undefined."""
        if self.count <= 1 or self.sum_of_values is None:
            return None
        return self.m2 / (self.count - 1)

    @property
    def variance(self):
        """Alias of the *sample* variance, matching Spark naming."""
        if self.count == 0:
            return None
        return self.variance_samp

    @property
    def stddev_pop(self):
        """Population standard deviation, or None when undefined."""
        if self.count == 0 or self.sum_of_values is None:
            return None
        return math.sqrt(self.variance_pop)

    @property
    def stddev_samp(self):
        """Sample standard deviation, or None when undefined."""
        if self.count <= 1 or self.sum_of_values is None:
            return None
        return math.sqrt(self.variance_samp)

    @property
    def stddev(self):
        """Alias of the *sample* standard deviation, matching Spark naming."""
        if self.count == 0:
            return None
        return self.stddev_samp

    @property
    def min(self):
        if self.count == 0:
            return None
        return self.min_value

    @property
    def max(self):
        if self.count == 0:
            return None
        return self.max_value

    @property
    def sum(self):
        if self.count == 0:
            return None
        return self.sum_of_values

    @property
    def skewness(self):
        """Population skewness; NaN on a constant column, None when empty."""
        if self.count == 0:
            return None
        if self.m2 == 0:
            return float("nan")
        return math.sqrt(self.count) * self.m3 / math.sqrt(self.m2 * self.m2 * self.m2)

    @property
    def kurtosis(self):
        """Excess kurtosis; NaN on a constant column, None when empty."""
        if self.count == 0:
            return None
        if self.m2 == 0:
            return float("nan")
        return self.count * self.m4 / (self.m2 * self.m2) - 3
class RowStatHelper(object):
    """
    Maintains one ColumnStatHelper per column encountered while aggregating
    a list of Rows.
    """

    def __init__(self, exprs, percentiles_relative_error=1 / 10000):
        self.percentiles_relative_error = percentiles_relative_error
        self.column_stat_helpers = {}
        self.cols = [parse(e) for e in exprs] if exprs else [parse("*")]
        # Column order is tracked explicitly because dict ordering is not
        # guaranteed before Python 3.6.
        self.col_names = []

    def merge(self, row, schema):
        """Feed one row to every per-column helper, creating them on demand."""
        for col in self.cols:
            for field in col.output_fields(schema):
                name = field.name
                helper = self.column_stat_helpers.get(name)
                if helper is None:
                    helper = ColumnStatHelper(
                        parse(name),
                        self.percentiles_relative_error
                    )
                    self.column_stat_helpers[name] = helper
                    self.col_names.append(name)
                helper.merge(row, schema)
        return self

    def mergeStats(self, other):
        """
        :type other: RowStatHelper
        """
        for name in other.col_names:
            other_helper = other.column_stat_helpers[name]
            mine = self.column_stat_helpers.get(name)
            if mine is not None:
                self.column_stat_helpers[name] = mine.mergeStats(other_helper)
            else:
                self.column_stat_helpers[name] = other_helper
                self.col_names.append(name)
        return self

    def get_as_rows(self, stats=("count", "mean", "stddev", "min", "max")):
        """
        Provide a list of Row with the same format as the one in the
        Dataset returned by Dataset.stats()
        """
        rows = []
        for stat in stats:
            keyed_values = [("summary", stat)]
            for name in self.col_names:
                keyed_values.append(
                    (name, self.get_stat(self.column_stat_helpers[name], stat))
                )
            rows.append(row_from_keyed_values(keyed_values))
        return rows

    @staticmethod
    def get_stat(stats_counter, stat):
        """Return the formatted value of `stat` for one column helper."""
        if stat in ("count", "mean", "stddev", "min", "max"):
            value = getattr(stats_counter, stat)
        elif stat.endswith("%"):
            # "12.5%" style statistics are resolved as quantiles.
            try:
                percentile = float(stat[:-1]) / 100
            except ValueError:
                raise ValueError("Unable to parse {0} as a percentile".format(stat))
            value = stats_counter.get_quantile(percentile)
        else:
            raise ValueError("{0} is not a recognised statistic".format(stat))
        return RowStatHelper.format_stat(value)

    def get_col_quantile(self, col, quantile):
        """Return the approximate quantile of `col` as a float (None if empty)."""
        helper = self.column_stat_helpers.get(str(col))
        if helper is None:
            raise Exception("Unable to get quantile for {0}".format(quantile))
        result = helper.get_quantile(quantile)
        return float(result) if result is not None else None

    @staticmethod
    def format_stat(stat):
        """Stats are rendered as strings; None stays None."""
        return str(stat) if stat is not None else None
class CovarianceCounter(object):
    """Online accumulator of the co-moment between two columns.

    Only the Pearson correlation method is supported.
    """

    def __init__(self, method):
        if method != "pearson":
            raise ValueError(
                "Currently only the calculation of the Pearson Correlation "
                "coefficient is supported."
            )
        self.xAvg = 0.0  # running mean of the first column
        self.yAvg = 0.0  # running mean of the second column
        self.Ck = 0.0    # running co-moment sum((x - xAvg) * (y - yAvg))
        self.MkX = 0.0   # running sum of squared deviations for the first column
        self.MkY = 0.0   # running sum of squared deviations for the second column
        self.count = 0   # number of (x, y) pairs observed

    def add(self, x, y):
        """Fold one (x, y) observation into the running moments; return self."""
        dx = x - self.xAvg
        dy = y - self.yAvg
        self.count += 1
        self.xAvg += dx / self.count
        self.yAvg += dy / self.count
        # dx/dy use the old means and (x - xAvg)/(y - yAvg) the new ones.
        self.Ck += dx * (y - self.yAvg)
        self.MkX += dx * (x - self.xAvg)
        self.MkY += dy * (y - self.yAvg)
        return self

    def merge(self, other):
        """
        Merge counters from other partitions. Formula can be found at:
        http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
        """
        if other.count > 0:
            total = self.count + other.count
            dx = self.xAvg - other.xAvg
            dy = self.yAvg - other.yAvg
            self.Ck += other.Ck + dx * dy * self.count / total * other.count
            self.xAvg = (self.xAvg * self.count + other.xAvg * other.count) / total
            self.yAvg = (self.yAvg * self.count + other.yAvg * other.count) / total
            self.MkX += other.MkX + dx * dx * self.count / total * other.count
            self.MkY += other.MkY + dy * dy * self.count / total * other.count
            self.count = total
        return self

    @property
    def covar_samp(self):
        """Sample covariance (N-1 denominator); None with fewer than two pairs."""
        if self.count <= 1:
            return None
        return self.Ck / (self.count - 1)

    @property
    def covar_pop(self):
        """Population covariance (N denominator); None when no pair was seen."""
        if self.count == 0:
            return None
        return self.Ck / self.count

    @property
    def pearson_correlation(self):
        # NOTE(review): divides by sqrt(MkX * MkY) with no zero guard — a
        # constant column raises ZeroDivisionError; confirm callers avoid it.
        return self.Ck / math.sqrt(self.MkX * self.MkY)