-
Notifications
You must be signed in to change notification settings - Fork 342
/
_archesmixin.py
332 lines (291 loc) · 11.8 KB
/
_archesmixin.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
import logging
import warnings
from copy import deepcopy
from typing import Optional, Union
import anndata
import numpy as np
import pandas as pd
import torch
from anndata import AnnData
from scipy.sparse import csr_matrix
from scvi import REGISTRY_KEYS, settings
from scvi.data import _constants
from scvi.data._constants import _MODEL_NAME_KEY, _SETUP_ARGS_KEY
from scvi.model._utils import parse_device_args
from scvi.nn import FCLayers
from scvi.utils._docstrings import devices_dsp
from ._base_model import BaseModelClass
from ._utils import _initialize_model, _load_saved_files, _validate_var_names
logger = logging.getLogger(__name__)
MIN_VAR_NAME_RATIO = 0.8
class ArchesMixin:
"""Universal scArches implementation."""
@classmethod
@devices_dsp.dedent
def load_query_data(
cls,
adata: AnnData,
reference_model: Union[str, BaseModelClass],
inplace_subset_query_vars: bool = False,
accelerator: str = "auto",
device: Union[int, str] = "auto",
unfrozen: bool = False,
freeze_dropout: bool = False,
freeze_expression: bool = True,
freeze_decoder_first_layer: bool = True,
freeze_batchnorm_encoder: bool = True,
freeze_batchnorm_decoder: bool = False,
freeze_classifier: bool = True,
):
"""Online update of a reference model with scArches algorithm :cite:p:`Lotfollahi21`.
Parameters
----------
adata
AnnData organized in the same way as data used to train model.
It is not necessary to run setup_anndata,
as AnnData is validated against the ``registry``.
reference_model
Either an already instantiated model of the same class, or a path to
saved outputs for reference model.
inplace_subset_query_vars
Whether to subset and rearrange query vars inplace based on vars used to
train reference model.
%(param_accelerator)s
%(param_device)s
unfrozen
Override all other freeze options for a fully unfrozen model
freeze_dropout
Whether to freeze dropout during training
freeze_expression
Freeze neurons corersponding to expression in first layer
freeze_decoder_first_layer
Freeze neurons corersponding to first layer in decoder
freeze_batchnorm_encoder
Whether to freeze batchnorm weight and bias during training for encoder
freeze_batchnorm_decoder
Whether to freeze batchnorm weight and bias during training for decoder
freeze_classifier
Whether to freeze classifier completely. Only applies to `SCANVI`.
"""
_, _, device = parse_device_args(
accelerator=accelerator,
devices=device,
return_device="torch",
validate_single_device=True,
)
attr_dict, var_names, load_state_dict = _get_loaded_data(reference_model, device=device)
if inplace_subset_query_vars:
logger.debug("Subsetting query vars to reference vars.")
adata._inplace_subset_var(var_names)
_validate_var_names(adata, var_names)
registry = attr_dict.pop("registry_")
if _MODEL_NAME_KEY in registry and registry[_MODEL_NAME_KEY] != cls.__name__:
raise ValueError("It appears you are loading a model from a different class.")
if _SETUP_ARGS_KEY not in registry:
raise ValueError(
"Saved model does not contain original setup inputs. "
"Cannot load the original setup."
)
cls.setup_anndata(
adata,
source_registry=registry,
extend_categories=True,
allow_missing_labels=True,
**registry[_SETUP_ARGS_KEY],
)
model = _initialize_model(cls, adata, attr_dict)
adata_manager = model.get_anndata_manager(adata, required=True)
if REGISTRY_KEYS.CAT_COVS_KEY in adata_manager.data_registry:
raise NotImplementedError(
"scArches currently does not support models with extra categorical covariates."
)
version_split = adata_manager.registry[_constants._SCVI_VERSION_KEY].split(".")
if int(version_split[1]) < 8 and int(version_split[0]) == 0:
warnings.warn(
"Query integration should be performed using models trained with "
"version >= 0.8",
UserWarning,
stacklevel=settings.warnings_stacklevel,
)
model.to_device(device)
# model tweaking
new_state_dict = model.module.state_dict()
for key, load_ten in load_state_dict.items():
new_ten = new_state_dict[key]
if new_ten.size() == load_ten.size():
continue
# new categoricals changed size
else:
dim_diff = new_ten.size()[-1] - load_ten.size()[-1]
fixed_ten = torch.cat([load_ten, new_ten[..., -dim_diff:]], dim=-1)
load_state_dict[key] = fixed_ten
model.module.load_state_dict(load_state_dict)
model.module.eval()
_set_params_online_update(
model.module,
unfrozen=unfrozen,
freeze_decoder_first_layer=freeze_decoder_first_layer,
freeze_batchnorm_encoder=freeze_batchnorm_encoder,
freeze_batchnorm_decoder=freeze_batchnorm_decoder,
freeze_dropout=freeze_dropout,
freeze_expression=freeze_expression,
freeze_classifier=freeze_classifier,
)
model.is_trained_ = False
return model
@staticmethod
def prepare_query_anndata(
adata: AnnData,
reference_model: Union[str, BaseModelClass],
return_reference_var_names: bool = False,
inplace: bool = True,
) -> Optional[Union[AnnData, pd.Index]]:
"""Prepare data for query integration.
This function will return a new AnnData object with padded zeros
for missing features, as well as correctly sorted features.
Parameters
----------
adata
AnnData organized in the same way as data used to train model.
It is not necessary to run setup_anndata,
as AnnData is validated against the ``registry``.
reference_model
Either an already instantiated model of the same class, or a path to
saved outputs for reference model.
return_reference_var_names
Only load and return reference var names if True.
inplace
Whether to subset and rearrange query vars inplace or return new AnnData.
Returns
-------
Query adata ready to use in `load_query_data` unless `return_reference_var_names`
in which case a pd.Index of reference var names is returned.
"""
_, var_names, _ = _get_loaded_data(reference_model, device="cpu")
var_names = pd.Index(var_names)
if return_reference_var_names:
return var_names
intersection = adata.var_names.intersection(var_names)
inter_len = len(intersection)
if inter_len == 0:
raise ValueError(
"No reference var names found in query data. "
"Please rerun with return_reference_var_names=True "
"to see reference var names."
)
ratio = inter_len / len(var_names)
logger.info(f"Found {ratio * 100}% reference vars in query data.")
if ratio < MIN_VAR_NAME_RATIO:
warnings.warn(
f"Query data contains less than {MIN_VAR_NAME_RATIO:.0%} of reference "
"var names. This may result in poor performance.",
UserWarning,
stacklevel=settings.warnings_stacklevel,
)
genes_to_add = var_names.difference(adata.var_names)
needs_padding = len(genes_to_add) > 0
if needs_padding:
padding_mtx = csr_matrix(np.zeros((adata.n_obs, len(genes_to_add))))
adata_padding = AnnData(
X=padding_mtx.copy(),
layers={layer: padding_mtx.copy() for layer in adata.layers},
)
adata_padding.var_names = genes_to_add
adata_padding.obs_names = adata.obs_names
# Concatenate object
adata_out = anndata.concat(
[adata, adata_padding],
axis=1,
join="outer",
index_unique=None,
merge="unique",
)
else:
adata_out = adata
# also covers the case when new adata has more var names than old
if not var_names.equals(adata_out.var_names):
adata_out._inplace_subset_var(var_names)
if inplace:
if adata_out is not adata:
adata._init_as_actual(adata_out)
else:
return adata_out
def _set_params_online_update(
module,
unfrozen,
freeze_decoder_first_layer,
freeze_batchnorm_encoder,
freeze_batchnorm_decoder,
freeze_dropout,
freeze_expression,
freeze_classifier,
):
"""Freeze parts of network for scArches."""
# do nothing if unfrozen
if unfrozen:
return
mod_inference_mode = {"encoder_z2_z1", "decoder_z1_z2"}
mod_no_hooks_yes_grad = {"l_encoder"}
if not freeze_classifier:
mod_no_hooks_yes_grad.add("classifier")
parameters_yes_grad = {"background_pro_alpha", "background_pro_log_beta"}
def no_hook_cond(key):
one = (not freeze_expression) and "encoder" in key
two = (not freeze_decoder_first_layer) and "px_decoder" in key
return one or two
def requires_grad(key):
mod_name = key.split(".")[0]
# linear weights and bias that need grad
one = "fc_layers" in key and ".0." in key and mod_name not in mod_inference_mode
# modules that need grad
two = mod_name in mod_no_hooks_yes_grad
three = sum([p in key for p in parameters_yes_grad]) > 0
# batch norm option
four = (
"fc_layers" in key
and ".1." in key
and "encoder" in key
and (not freeze_batchnorm_encoder)
)
five = (
"fc_layers" in key
and ".1." in key
and "decoder" in key
and (not freeze_batchnorm_decoder)
)
if one or two or three or four or five:
return True
else:
return False
for key, mod in module.named_modules():
# skip over protected modules
if key.split(".")[0] in mod_no_hooks_yes_grad:
continue
if isinstance(mod, FCLayers):
hook_first_layer = False if no_hook_cond(key) else True
mod.set_online_update_hooks(hook_first_layer)
if isinstance(mod, torch.nn.Dropout):
if freeze_dropout:
mod.p = 0
# momentum freezes the running stats of batchnorm
freeze_batchnorm = ("decoder" in key and freeze_batchnorm_decoder) or (
"encoder" in key and freeze_batchnorm_encoder
)
if isinstance(mod, torch.nn.BatchNorm1d) and freeze_batchnorm:
mod.momentum = 0
for key, par in module.named_parameters():
if requires_grad(key):
par.requires_grad = True
else:
par.requires_grad = False
def _get_loaded_data(reference_model, device=None):
if isinstance(reference_model, str):
attr_dict, var_names, load_state_dict, _ = _load_saved_files(
reference_model, load_adata=False, map_location=device
)
else:
attr_dict = reference_model._get_user_attributes()
attr_dict = {a[0]: a[1] for a in attr_dict if a[0][-1] == "_"}
var_names = reference_model.adata.var_names
load_state_dict = deepcopy(reference_model.module.state_dict())
return attr_dict, var_names, load_state_dict