forked from koaning/scikit-lego
-
Notifications
You must be signed in to change notification settings - Fork 0
/
gmm_regressor.py
222 lines (183 loc) · 9.22 KB
/
gmm_regressor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
import numpy as np
from sklearn.base import BaseEstimator, RegressorMixin, MultiOutputMixin
from sklearn.mixture import GaussianMixture
from sklearn.mixture._gaussian_mixture import _compute_precision_cholesky
from sklearn.utils import check_X_y
from sklearn.utils.validation import check_is_fitted, check_array, FLOAT_DTYPES
class GMMRegressor(MultiOutputMixin, RegressorMixin, BaseEstimator):
    """Regressor based on a joint Gaussian Mixture Model over ``[X | y]``.

    ``fit`` trains a :class:`~sklearn.mixture.GaussianMixture` on the
    column-stacked matrix ``[X, y]``; predictions condition each fitted
    Gaussian component on the observed X columns.
    """

    def __init__(
        self,
        n_components=1,
        covariance_type="full",
        tol=1e-3,
        reg_covar=1e-6,
        max_iter=100,
        n_init=1,
        init_params="kmeans",
        weights_init=None,
        means_init=None,
        precisions_init=None,
        random_state=None,
        warm_start=False,
        verbose=0,
        verbose_interval=10,
    ):
        """
        The GMMRegressor trains a Gaussian Mixture Model on a dataset containing both X and y columns.
        Predictions are evaluated conditioning the fitted Multivariate Gaussian Mixture on the known
        X variables. All parameters of the model are an exact copy of the parameters in scikit-learn.
        """
        # sklearn convention: __init__ only stores hyperparameters verbatim,
        # no validation or state creation happens before fit().
        self.n_components = n_components
        self.covariance_type = covariance_type
        self.tol = tol
        self.reg_covar = reg_covar
        self.max_iter = max_iter
        self.n_init = n_init
        self.init_params = init_params
        self.weights_init = weights_init
        self.means_init = means_init
        self.precisions_init = precisions_init
        self.random_state = random_state
        self.warm_start = warm_start
        self.verbose = verbose
        self.verbose_interval = verbose_interval

    def fit(self, X: np.ndarray, y: np.ndarray) -> "GMMRegressor":
        """
        Fit the model using X, y as training data.

        :param X: array-like, shape=(n_samples, n_features) training data.
        :param y: array-like, shape=(n_samples,) or (n_samples, n_targets) target values.
        :return: Returns an instance of self.
        """
        X, y = check_X_y(X, y, estimator=self, dtype=FLOAT_DTYPES, multi_output=True)
        if y.ndim == 1:
            # Work with 2-D targets internally so X and y can be column-stacked.
            y = np.expand_dims(y, 1)
        self.joint_gmm_ = GaussianMixture(
            n_components=self.n_components,
            covariance_type=self.covariance_type,
            tol=self.tol,
            reg_covar=self.reg_covar,
            max_iter=self.max_iter,
            n_init=self.n_init,
            init_params=self.init_params,
            weights_init=self.weights_init,
            means_init=self.means_init,
            precisions_init=self.precisions_init,
            random_state=self.random_state,
            warm_start=self.warm_start,
            verbose=self.verbose,
            verbose_interval=self.verbose_interval,
        )
        # Column slices of the joint [X | y] feature space.
        id_X = slice(0, X.shape[1])
        id_y = slice(X.shape[1], None)
        self.joint_gmm_.fit(np.hstack((X, y)))
        # Expand to dense (n_components, d, d) stacks so the block slicing below
        # works uniformly for every covariance_type.
        covariances = _get_full_matrix(self.joint_gmm_.covariances_, self.covariance_type, self.n_components, self.joint_gmm_.n_features_in_)
        precisions_cholesky = _get_full_matrix(self.joint_gmm_.precisions_cholesky_, self.covariance_type, self.n_components, self.joint_gmm_.n_features_in_)
        covYX = covariances[:, id_y, id_X]
        # NOTE(review): relies on sklearn storing precisions_cholesky_ as the
        # upper-triangular factor U with precision = U @ U.T; then the x-block
        # satisfies U_xx @ U_xx.T == Sigma_xx^-1 (inverse of the X-marginal
        # covariance) — confirm if sklearn internals change.
        precXX = np.einsum("klm,knm->kln", precisions_cholesky[:, id_X, id_X], precisions_cholesky[:, id_X, id_X])
        # Per-component regression coefficients: coef_[k] = Sigma_yx @ Sigma_xx^-1.
        self.coef_ = np.einsum("klm,knm->kln", covYX, precXX)
        # intercept_[k] = mu_y - coef_[k] @ mu_x, so that the conditional mean
        # of component k at x is intercept_[k] + coef_[k] @ x.
        self.intercept_ = self.joint_gmm_.means_[:, id_y] - np.einsum(
            "klm,km->kl", self.coef_, self.joint_gmm_.means_[:, id_X]
        )
        return self

    def predict(self, X):
        """
        Predict posterior mean.

        :param X: array-like, shape=(n_samples, n_features) data.
        :return: Returns posterior mean, array-like, shape=(n_samples, n_targets).
        """
        weights, posterior_means = self.condition(X, covariances=False)
        # Mixture mean = responsibility-weighted sum of per-component
        # conditional means; transpose back to (n_samples, n_targets).
        return (posterior_means * weights[:, np.newaxis]).sum(axis=0).T

    def predict_proba(self, X):
        """
        Predict posterior probability of each component given the data.

        :param X: array-like, shape=(n_samples, n_features) data.
        :return: Returns the probability of each Gaussian (state) in the model given each
            sample, array-like, shape=(n_samples, n_components).
        """
        check_is_fitted(self, ["joint_gmm_", "coef_", "intercept_"])
        X = check_array(X, estimator=self, dtype=FLOAT_DTYPES)
        id_X = slice(0, X.shape[1])
        covariances = _get_full_matrix(self.joint_gmm_.covariances_, self.covariance_type, self.n_components, self.joint_gmm_.n_features_in_)
        precisions = _get_full_matrix(self.joint_gmm_.precisions_, self.covariance_type, self.n_components, self.joint_gmm_.n_features_in_)
        precisions_cholesky = _get_full_matrix(self.joint_gmm_.precisions_cholesky_, self.covariance_type, self.n_components, self.joint_gmm_.n_features_in_)
        # evaluate weights based on N(X|mean_x,sigma_x) for each component:
        # build an unfitted GaussianMixture and inject the X-marginal parameters
        # of the joint model, then reuse sklearn's predict_proba machinery.
        # NOTE(review): marginal blocks of the dense precision / precision-cholesky
        # are used directly; for non-full covariance types these are diagonal, so
        # the block slice is exact — confirm for the 'full'/'tied' case.
        marginal_x_gmm = GaussianMixture(n_components=self.n_components)
        marginal_x_gmm.weights_ = self.joint_gmm_.weights_
        marginal_x_gmm.means_ = self.joint_gmm_.means_[:, id_X]
        marginal_x_gmm.covariances_ = covariances[:, id_X, id_X]
        marginal_x_gmm.precisions_ = precisions[:, id_X, id_X]
        marginal_x_gmm.precisions_cholesky_ = precisions_cholesky[:, id_X, id_X]
        return marginal_x_gmm.predict_proba(X)

    def condition(self, X, covariances=False):
        """
        Condition joint distribution on X.

        :param X: array-like, shape=(n_samples, n_features) data.
        :param covariances: bool, return posterior covariances? (default=False)
        :return: Returns posterior_weights, shape=(n_components, n_samples);
            posterior_means, shape=(n_components, n_targets, n_samples); and, when
            ``covariances=True``, posterior_covariances,
            shape=(n_components, n_targets, n_targets).
        """
        check_is_fitted(self, ["joint_gmm_", "coef_", "intercept_"])
        X = check_array(X, estimator=self, dtype=FLOAT_DTYPES)
        id_X = slice(0, X.shape[1])
        id_y = slice(X.shape[1], None)
        # Component responsibilities, transposed to (n_components, n_samples).
        posterior_weights = self.predict_proba(X).T
        # posterior_means = mean_y + sigma_yx . sigma_xx^-1 . (x - mean_x)
        posterior_means = self.joint_gmm_.means_[:, id_y][:, :, np.newaxis] + np.einsum(
            "ijk,lik->ijl", self.coef_, (X[:, np.newaxis] - self.joint_gmm_.means_[:, id_X]),
        )
        if not covariances:
            return posterior_weights, posterior_means
        else:
            full_covariances = _get_full_matrix(self.joint_gmm_.covariances_, self.covariance_type, self.n_components, self.joint_gmm_.n_features_in_)
            # posterior_covariances = sigma_yy - sigma_yx . sigma_xx^-1 . sigma_xy
            # (independent of x, hence no per-sample axis).
            posterior_covariances = full_covariances[:, id_y, id_y] - np.einsum(
                "klm,kmn->kln", self.coef_, full_covariances[:, id_X, id_y]
            )
            return posterior_weights, posterior_means, posterior_covariances

    def sample(self, X, n_samples=1):
        """
        Sample conditional/posterior distribution given X.

        :param X: array-like, shape=(n_rows, n_features) data to condition on.
        :param n_samples: int, number of samples to draw per row of X (default=1).
        :return: Returns samples, shape=(n_rows, n_samples, n_targets), and
            component labels, shape=(n_rows, n_samples).
        """
        X = check_array(X, estimator=self, dtype=FLOAT_DTYPES)
        posterior_weights, posterior_means, posterior_covariances = self.condition(X, covariances=True)
        # Rebuild precision factors for the conditional covariances so the
        # injected GaussianMixture below is internally consistent.
        posterior_precisions_cholesky = _compute_precision_cholesky(posterior_covariances, 'full')
        posterior_precisions = np.einsum("klm,knm->kln", posterior_precisions_cholesky, posterior_precisions_cholesky)
        y = np.zeros((X.shape[0], n_samples, posterior_means.shape[1]))
        c = np.zeros((X.shape[0], n_samples))
        for ix, _ in enumerate(X):
            # One mixture per input row: weights/means vary with x, but the
            # Gaussian conditional covariances do not, so they are shared.
            posterior_gmm = GaussianMixture(n_components=self.n_components)
            posterior_gmm.weights_ = posterior_weights[:, ix]
            posterior_gmm.means_ = posterior_means[:, :, ix]
            posterior_gmm.covariances_ = posterior_covariances
            posterior_gmm.precisions_ = posterior_precisions
            posterior_gmm.precisions_cholesky_ = posterior_precisions_cholesky
            y[ix], c[ix] = posterior_gmm.sample(n_samples)
        return y, c

    def aic(self, X, y):
        """Akaike information criterion of the joint GMM on the stacked [X | y] data."""
        check_is_fitted(self, ["joint_gmm_", "coef_", "intercept_"])
        X = check_array(X, estimator=self, dtype=FLOAT_DTYPES)
        if y.ndim == 1:
            y = np.expand_dims(y, 1)
        return self.joint_gmm_.aic(np.hstack((X, y)))

    def bic(self, X, y):
        """Bayesian information criterion of the joint GMM on the stacked [X | y] data."""
        check_is_fitted(self, ["joint_gmm_", "coef_", "intercept_"])
        X = check_array(X, estimator=self, dtype=FLOAT_DTYPES)
        if y.ndim == 1:
            y = np.expand_dims(y, 1)
        return self.joint_gmm_.bic(np.hstack((X, y)))
def _get_full_matrix(m, covariance_type, n_components_, n_features_in_):
if covariance_type == 'full':
return m
elif covariance_type == 'tied':
return np.repeat(m[np.newaxis, :], n_components_, axis=0)
elif covariance_type == 'diag':
full_m = np.zeros((n_components_, n_features_in_, n_features_in_))
for k in range(n_components_):
full_m[k] = np.diag(m[k])
return full_m
elif covariance_type == 'spherical':
full_m = np.zeros((n_components_, n_features_in_, n_features_in_))
for k in range(n_components_):
full_m[k] = np.diag(np.repeat(m[k], n_features_in_))
return full_m