-
Notifications
You must be signed in to change notification settings - Fork 2.8k
/
_qn.pyx
135 lines (128 loc) · 4.42 KB
/
_qn.pyx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#!python
#cython: wraparound=False, boundscheck=False, cdivision=True
import numpy as np
def _high_weighted_median(double[::1] a, int[::1] weights):
    """
    Computes a weighted high median of a. This is defined as the
    smallest a[j] such that the sum of the weights of all a[i] <= a[j]
    is strictly greater than half the total sum of the weights.

    Parameters
    ----------
    a : double[::1]
        Candidate values (need not be sorted).
    weights : int[::1]
        Integer weight for each entry of ``a``; same length as ``a``.

    Returns
    -------
    The weighted high median of ``a`` (always one of its elements).
    """
    cdef:
        # Work on copies: the candidate set is repeatedly shrunk in place
        # below, and the caller's buffers must stay untouched.
        double[::1] a_cp = np.copy(a)
        int[::1] weights_cp = np.copy(weights)
        int n = a_cp.shape[0]
        double[::1] sorted_a = np.zeros((n,), dtype=np.double)
        double[::1] a_cand = np.zeros((n,), dtype=np.double)
        int[::1] weights_cand = np.zeros((n,), dtype=np.intc)
        int kcand = 0
        # NOTE: only ``wrest`` is initialized to 0 here; the others are
        # assigned inside the loop before being read.
        int wleft, wright, wmid, wtot, wrest = 0
        double trial = 0
    wtot = np.sum(weights_cp)
    while True:
        wleft = 0
        wmid = 0
        wright = 0
        # Refresh the scratch buffer with the current candidates; the
        # trial value is their (unweighted) middle element, found via a
        # partial sort around position n//2.
        for i in range(n):
            sorted_a[i] = a_cp[i]
        sorted_a = np.partition(sorted_a, kth=n//2)
        trial = sorted_a[n//2]
        # Total weight strictly below, equal to, and strictly above the
        # trial value.
        for i in range(n):
            if a_cp[i] < trial:
                wleft = wleft + weights_cp[i]
            elif a_cp[i] > trial:
                wright = wright + weights_cp[i]
            else:
                wmid = wmid + weights_cp[i]
        kcand = 0
        if 2 * (wrest + wleft) > wtot:
            # More than half the total weight lies strictly below the
            # trial value: the answer is among the smaller candidates.
            for i in range(n):
                if a_cp[i] < trial:
                    a_cand[kcand] = a_cp[i]
                    weights_cand[kcand] = weights_cp[i]
                    kcand = kcand + 1
        elif 2 * (wrest + wleft + wmid) <= wtot:
            # Weight up to and including the trial value is not strictly
            # more than half the total: the answer is among the larger
            # candidates.  Fold the discarded weight into ``wrest``.
            for i in range(n):
                if a_cp[i] > trial:
                    a_cand[kcand] = a_cp[i]
                    weights_cand[kcand] = weights_cp[i]
                    kcand = kcand + 1
            wrest = wrest + wleft + wmid
        else:
            # Neither side dominates: the trial value itself is the
            # weighted high median.
            break
        # Shrink the candidate set to the surviving side and iterate.
        n = kcand
        for i in range(n):
            a_cp[i] = a_cand[i]
            weights_cp[i] = weights_cand[i]
    return trial
def _qn(double[:] a, double c):
    """
    Computes the Qn robust estimator of scale, a more efficient alternative
    to the MAD. The implementation follows the algorithm described in Croux
    and Rousseeuw (1992).

    Parameters
    ----------
    a : array_like
        Input array.
    c : float
        The normalization constant, used to get consistent estimates of the
        standard deviation at the normal distribution. Defined as
        1/(np.sqrt(2) * scipy.stats.norm.ppf(5/8)), which is 2.219144.

    Returns
    -------
    The Qn robust estimator of scale
    """
    cdef:
        int n = a.shape[0]
        # Qn is (up to the constant c) the k-th smallest of the pairwise
        # differences of the sorted data, with h = floor(n/2) + 1 and
        # k = h*(h-1)/2.  ``/`` here is C truncating division
        # (cdivision=True in the file header).
        int h = n/2 + 1
        int k = h * (h - 1) / 2
        # n_left / n_right bound the rank of the search window inside the
        # implicit n x n matrix of differences a_sorted[i] - a_sorted[n-j]
        # (columns are 1-based, following the original Fortran code).
        int n_left = n * (n + 1) / 2
        int n_right = n * n
        int k_new = k + n_left
        Py_ssize_t i, j, jh, l = 0
        int sump, sumq = 0
        double trial, output = 0
        double[::1] a_sorted = np.sort(a)
        # Per-row [left, right] column bounds of the candidate region of
        # the implicit difference matrix.
        int[::1] left = np.array([n - i + 1 for i in range(0, n)], dtype=np.intc)
        int[::1] right = np.array([n if i <= h else n - (i - h) for i in range(0, n)], dtype=np.intc)
        int[::1] weights = np.zeros((n,), dtype=np.intc)
        double[::1] work = np.zeros((n,), dtype=np.double)
        int[::1] p = np.zeros((n,), dtype=np.intc)
        int[::1] q = np.zeros((n,), dtype=np.intc)
    # Shrink the candidate region until at most n differences remain.
    while n_right - n_left > n:
        j = 0
        # Trial value: weighted median of the rows' middle candidates,
        # each row weighted by the number of candidates it still holds.
        for i in range(1, n):
            if left[i] <= right[i]:
                weights[j] = right[i] - left[i] + 1
                jh = left[i] + weights[j] // 2
                work[j] = a_sorted[i] - a_sorted[n - jh]
                j = j + 1
        trial = _high_weighted_median(work[:j], weights[:j])
        # p[i]: count of differences in row i strictly below ``trial``
        # (j only increases across rows since rows are ordered).
        j = 0
        for i in range(n - 1, -1, -1):
            while j < n and (a_sorted[i] - a_sorted[n - j - 1]) < trial:
                j = j + 1
            p[i] = j
        # q[i]: 1-based column bound per row beyond which differences
        # exceed ``trial`` (j only decreases across rows).
        j = n + 1
        for i in range(n):
            while (a_sorted[i] - a_sorted[n - j + 1]) > trial:
                j = j - 1
            q[i] = j
        sump = np.sum(p)
        sumq = np.sum(q) - n
        if k_new <= sump:
            # Target rank lies below ``trial``: tighten the right bounds.
            right = np.copy(p)
            n_right = sump
        elif k_new > sumq:
            # Target rank lies above ``trial``: tighten the left bounds.
            left = np.copy(q)
            n_left = sumq
        else:
            # ``trial`` is exactly the k-th smallest difference.
            output = c * trial
            return output
    # Few enough candidates remain: enumerate them explicitly, sort, and
    # take the (k_new - n_left - 1)-th smallest (0-based after the shift).
    j = 0
    for i in range(1, n):
        for l in range(left[i], right[i] + 1):
            work[j] = a_sorted[i] - a_sorted[n - l]
            j = j + 1
    k_new = k_new - (n_left + 1)
    output = c * np.sort(work[:j])[k_new]
    return output