/
power_transforms.py
168 lines (122 loc) · 5.34 KB
/
power_transforms.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
"""Simple power transformation classes."""
# pylint: disable=unused-variable
from ..numerical_libs import sync_numerical_libs, xp
# TODO this could be better organized...
@sync_numerical_libs
def yeojohnson(y, lam):
    """Apply the Yeo-Johnson power transform, batched in the first dimension.

    ``lam`` is broadcast to one lambda per batch row. Returns the transformed
    array along with the per-row lambdas as a (batch, 1) column.
    """
    data = y.astype(xp.float64)
    lams = xp.broadcast_to(lam, (data.shape[0], 1)).astype(xp.float64)

    # Rows whose (rounded) lambda lands on a singular point of the piecewise
    # definition (0 for the nonnegative branch, 2 for the negative branch).
    rounded = xp.around(xp.ravel(lams), 4)
    nonneg = data >= 0.0
    is_zero = xp.broadcast_to((rounded == 0.0)[:, None], nonneg.shape)
    is_two = xp.broadcast_to((rounded == 2.0)[:, None], nonneg.shape)
    lams_full = xp.broadcast_to(lams, nonneg.shape)

    out = xp.empty(y.shape)
    # Generic power branches first; singular-lambda entries are patched after.
    out[nonneg] = ((data[nonneg] + 1.0) ** lams_full[nonneg] - 1.0) / lams_full[nonneg]
    out[nonneg & is_zero] = xp.log(data[nonneg & is_zero] + 1.0)
    neg = ~nonneg
    out[neg] = ((1.0 - data[neg]) ** (2.0 - lams_full[neg]) - 1.0) / (lams_full[neg] - 2.0)
    out[neg & is_two] = -xp.log(1.0 - data[neg & is_two])
    return out, lams_full[:, 0][..., None]
@sync_numerical_libs
def inv_yeojohnson(y, lam):
    """Invert the Yeo-Johnson transform, batched in the first dimension."""
    data = y.astype(xp.float64)
    lams = xp.broadcast_to(lam, (data.shape[0], 1)).astype(xp.float64)

    # Singular-lambda rows (0 and 2) get the log/exp forms of the inverse.
    rounded = xp.around(xp.ravel(lams), 4)
    nonneg = data >= 0.0
    is_zero = xp.broadcast_to((rounded == 0.0)[:, None], nonneg.shape)
    is_two = xp.broadcast_to((rounded == 2.0)[:, None], nonneg.shape)
    lams_full = xp.broadcast_to(lams, nonneg.shape)

    out = xp.empty(y.shape)
    out[nonneg] = (lams_full[nonneg] * data[nonneg] + 1.0) ** (1.0 / lams_full[nonneg]) - 1.0
    out[nonneg & is_zero] = xp.exp(data[nonneg & is_zero]) - 1.0
    neg = ~nonneg
    out[neg] = -(((lams_full[neg] - 2.0) * data[neg] + 1.0) ** (1.0 / (2.0 - lams_full[neg]))) + 1.0
    out[neg & is_two] = -xp.exp(-data[neg & is_two]) + 1.0
    return out
@sync_numerical_libs
def boxcox(y, lam, lam2=None):
    """Two-parameter Box-Cox transform, batched in the first dimension.

    Returns the transformed data plus the (batch, 1) lam1 column and the
    shift lam2 used, so the transform can be inverted later.
    """
    # TODO add an axis param instead of always transforming the last axis
    axis = y.ndim - 1
    data = y.astype(xp.float64)
    lam1 = xp.broadcast_to(lam, (data.shape[0], 1)).astype(xp.float64)
    if lam2 is None:
        # Default shift makes every shifted value strictly positive.
        lam2 = 1.0 - xp.min(data, axis=axis, keepdims=True)

    out = xp.empty(y.shape)
    log_rows = xp.around(xp.ravel(lam1), 4) == 0.0
    pow_rows = ~log_rows
    out[log_rows] = xp.log(data[log_rows] + lam2[log_rows])
    out[pow_rows] = ((data[pow_rows] + lam2[pow_rows]) ** lam1[pow_rows] - 1.0) / lam1[pow_rows]
    return out, lam1, lam2
@sync_numerical_libs
def inv_boxcox(y, lam1, lam2):
    """Invert the two-parameter Box-Cox transform, batched in the first dimension."""
    data = y.astype(xp.float64)
    out = xp.empty(y.shape)
    # lambda == 0 rows were log-transformed; the rest used the power form.
    log_rows = xp.around(xp.ravel(lam1), 4) == 0.0
    pow_rows = ~log_rows
    out[log_rows] = xp.exp(data[log_rows]) - lam2[log_rows]
    out[pow_rows] = (lam1[pow_rows] * data[pow_rows] + 1.0) ** (1.0 / lam1[pow_rows]) - lam2[pow_rows]
    return out
def norm_cdf(x, mu, sigma):
    """CDF of a normal distribution, one (mu, sigma) pair per batch row."""
    z = (x - mu[:, None]) / (sigma[:, None] * xp.sqrt(2.0))
    cdf = 0.5 * xp.special.erfc(-z)  # pylint: disable=no-member
    # Guard against round-off pushing the CDF above 1.
    cdf[cdf > 1.0] = 1.0
    return cdf
@sync_numerical_libs
def fit_lam(y, yj=False, lam_range=(-2, 2, 0.1)):
    """Fit lambda of a power transform by grid search, batched in the first dimension.

    For each candidate lambda on the grid, transform the data and score how
    close the result is to normally distributed via a KS-like statistic,
    keeping the best lambda per batch row.

    Args:
        y: (batch, n) array; a 1d array is promoted to a single batch row.
        yj: if True use the Yeo-Johnson transform, otherwise Box-Cox.
        lam_range: (start, stop, step) for the candidate-lambda grid.

    Returns:
        Tuple of (best KS statistic, best lambda), each of shape (batch,).
    """
    # TODO currently this just minimizes the KS-stat; shapiro-wilk or
    # 'normaltest' might be better but we'd need a batched version.
    y_in = xp.atleast_2d(y)
    batch_size = y_in.shape[0]
    best_ks = xp.full(batch_size, xp.inf)
    best_ks_lam = xp.empty(batch_size)
    for lam in xp.around(xp.arange(*lam_range), 6):
        if yj:
            # BUG FIX: transform the 2d-promoted y_in (was raw `y`), so 1d
            # inputs batch consistently with the Box-Cox branch below.
            yp, _ = yeojohnson(y_in, lam)
        else:
            yp, _, _ = boxcox(y_in, lam)
        ys = xp.sort(yp, axis=1)
        # NOTE(review): this "empirical CDF" is a normalized cumsum of the
        # sorted values, not arange(1, n+1)/n; it's non-monotone when yp has
        # negative entries — confirm this is intentional before changing.
        cdf = xp.cumsum(ys, axis=1) / xp.sum(yp, axis=1, keepdims=True)
        # BUG FIX: norm_cdf expects a standard deviation (it divides by
        # sigma*sqrt(2)); previously the variance was passed, skewing the
        # KS statistic for every candidate lambda.
        ks = xp.max(xp.abs(cdf - norm_cdf(ys, xp.mean(yp, axis=1), xp.std(yp, axis=1))), axis=1)
        improved = ks < best_ks
        best_ks[improved] = ks[improved]
        best_ks_lam[improved] = lam
    return (best_ks, best_ks_lam)
class BoxCox:
    """Stateful wrapper around the batched Box-Cox transform."""

    def __init__(self):
        """Initialize empty lambda storage; populated by fit()."""
        # Fitted per-row lambda and shift, required by inv().
        self.lam1 = None
        self.lam2 = None

    def fit(self, y):
        """Fit the batched 1d variables in y, store the lambdas for the inv transform."""
        _, best_lam = fit_lam(y, yj=False)
        transformed, self.lam1, self.lam2 = boxcox(y, best_lam[:, None])
        return transformed

    def inv(self, y):
        """Inverse transform using the fitted lambda values."""
        return inv_boxcox(y, self.lam1, self.lam2)
class YeoJohnson:
    """Stateful wrapper around the batched Yeo-Johnson transform."""

    def __init__(self):
        """Initialize empty lambda storage; populated by fit()."""
        # Fitted per-row lambda, required by inv().
        self.lam1 = None

    def fit(self, y):
        """Fit the batched 1d variables in y, store the lambdas for the inv transform."""
        _, best_lam = fit_lam(y, yj=True)
        transformed, self.lam1 = yeojohnson(y, best_lam[:, None])
        return transformed

    def inv(self, y):
        """Inverse transform using the fitted lambda values."""
        return inv_yeojohnson(y, self.lam1)