-
Notifications
You must be signed in to change notification settings - Fork 4
/
api.py
2828 lines (2474 loc) · 122 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""Classes for searching, listing and (down)loading ALyx Files."""
import collections.abc
import urllib.parse
import warnings
import logging
from datetime import datetime, timedelta
from functools import lru_cache, partial
from inspect import unwrap
from pathlib import Path, PurePosixPath
from typing import Any, Union, Optional, List
from uuid import UUID
from urllib.error import URLError
import time
import threading
import os
import pandas as pd
import numpy as np
import requests.exceptions
import packaging.version
from iblutil.io import parquet, hashfile
from iblutil.util import Bunch, flatten
import one.params
import one.webclient as wc
import one.alf.io as alfio
import one.alf.files as alfiles
import one.alf.exceptions as alferr
from .alf.cache import make_parquet_db, DATASETS_COLUMNS, SESSIONS_COLUMNS
from .alf.spec import is_uuid_string, QC
from . import __version__
from one.converters import ConversionMixin, session_record2path
import one.util as util
# Module-level logger, named after this module
_logger = logging.getLogger(__name__)
# Names exported by a star-import of this module
__all__ = ['ONE', 'One', 'OneAlyx']
# Environment variable values are always strings, so cast to honour the documented int type
N_THREADS = int(os.environ.get('ONE_HTTP_DL_THREADS', 4))
"""int: The number of download threads."""
class One(ConversionMixin):
"""An API for searching and loading data on a local filesystem."""
_search_terms = (
'dataset', 'date_range', 'laboratory', 'number',
'projects', 'subject', 'task_protocol', 'dataset_qc_lte'
)
uuid_filenames = None
"""bool: whether datasets on disk have a UUID in their filename."""
def __init__(self, cache_dir=None, mode='auto', wildcards=True, tables_dir=None):
    """Set up a ONE instance that searches and loads data from a local filesystem.

    Parameters
    ----------
    cache_dir : str, Path
        Root directory of the data files. When not given (and not already set on the
        instance), the location returned by `one.params.get_cache_dir()` is used.
    mode : str
        Query mode, options include 'auto' (reload cache daily), 'local' (offline) and
        'refresh' (always reload cache tables). Most methods have a `query_type` parameter
        that can override the class mode.
    wildcards : bool
        If true, use unix shell style matching instead of regular expressions.
    tables_dir : str, pathlib.Path
        An optional location of the cache tables. If None, the tables are assumed to be in
        the cache_dir.
    """
    super().__init__()
    # A subclass may have assigned cache_dir before delegating here; only fill it in if unset
    if not getattr(self, 'cache_dir', None):
        self.cache_dir = cache_dir or one.params.get_cache_dir()
    self._tables_dir = tables_dir or self.cache_dir
    # Cache tables older than this are flagged as expired
    self.cache_expiry = timedelta(hours=24)
    self.mode = mode
    # Flag indicating whether to use regex or wildcards
    self.wildcards = wildcards
    self.record_loaded = False
    # Set per instance because different instances may work on separate filesystems
    self.uuid_filenames = False
    # Start from an empty cache object, then populate it from the tables on disk
    self._reset_cache()
    self.load_cache()
def __repr__(self):
return f'One ({"off" if self.offline else "on"}line, {self.cache_dir})'
@property
def offline(self):
"""bool: True if mode is local or no Web client set."""
return self.mode == 'local' or not getattr(self, '_web_client', False)
@util.refresh
def search_terms(self, query_type=None) -> tuple:
    """Return the keyword arguments accepted by the search method.

    Parameters
    ----------
    query_type : str, None
        Not read by this implementation.

    Returns
    -------
    tuple
        The search term names stored in the `_search_terms` attribute.
    """
    terms = self._search_terms
    return terms
def _reset_cache(self):
    """Replace the cache with a fresh Bunch holding only a blank '_meta' record."""
    meta = {'expired': False}
    # All timestamps start out unset
    meta.update(dict.fromkeys(('created_time', 'loaded_time', 'modified_time', 'saved_time')))
    meta['raw'] = {}  # map of original table metadata
    self._cache = Bunch({'_meta': meta})
def load_cache(self, tables_dir=None, **kwargs):
    """
    Load parquet cache files from a local directory.

    Every '*.pqt' file found in the tables directory is read, indexed, patched and sorted,
    then stored in the cache under the file's stem.

    Parameters
    ----------
    tables_dir : str, pathlib.Path
        An optional directory location of the parquet files, defaults to One._tables_dir.

    Returns
    -------
    datetime.datetime, None
        The time the most recent table was loaded (None when no table was loaded).

    Raises
    ------
    KeyError
        A table has a plain range index and no column matching the index pattern.
    """
    self._reset_cache()
    meta = self._cache['_meta']
    index_pattern = '.?id'
    self._tables_dir = Path(tables_dir or self._tables_dir or self.cache_dir)

    for pqt_file in self._tables_dir.glob('*.pqt'):
        table = pqt_file.stem
        # we need to keep this part fast enough for transient objects
        frame, raw_meta = parquet.load(pqt_file)
        meta['raw'][table] = raw_meta
        if 'date_created' not in raw_meta:
            _logger.warning(f"{pqt_file} does not appear to be a valid table. Skipping")
            continue
        meta['loaded_time'] = datetime.now()

        # Tables saved without an index get one built from their id column(s)
        if isinstance(frame.index, pd.RangeIndex):
            id_columns = frame.filter(regex=index_pattern).columns.tolist()
            if not id_columns:
                raise KeyError('Failed to set index')
            frame = frame.set_index(id_columns)

        # Patch older tables
        frame = util.patch_cache(frame, raw_meta.get('min_api_version'), table)

        # Sorting makes MultiIndex indexing O(N) -> O(1)
        if not frame.index.is_monotonic_increasing:
            frame = frame.sort_index()

        self._cache[table] = frame

    # Only the '_meta' entry is present, i.e. no tables were loaded
    if len(self._cache) == 1:
        meta['expired'] = True
        meta['raw'] = {}
        empty_datasets = pd.DataFrame(columns=DATASETS_COLUMNS).set_index(['eid', 'id'])
        empty_sessions = pd.DataFrame(columns=SESSIONS_COLUMNS).set_index('id')
        self._cache.update({'datasets': empty_datasets, 'sessions': empty_sessions})
        if self.offline:  # In online mode, the cache tables should be downloaded later
            warnings.warn(f'No cache tables found in {self._tables_dir}')

    # The cache is as old as its oldest table
    creation_times = [
        datetime.fromisoformat(raw['date_created'])
        for raw in meta['raw'].values()
        if 'date_created' in raw
    ]
    if creation_times:
        meta['created_time'] = min(creation_times)
        too_old = datetime.now() - meta['created_time'] > self.cache_expiry
        meta['expired'] = meta['expired'] or too_old
    self._cache['_meta'] = meta
    return self._cache['_meta']['loaded_time']
def save_cache(self, save_dir=None, force=False):
"""Save One._cache attribute into parquet tables if recently modified.
Parameters
----------
save_dir : str, pathlib.Path
The directory path into which the tables are saved. Defaults to cache directory.
force : bool
If True, the cache is saved regardless of modification time.
"""
threading.Thread(target=lambda: self._save_cache(save_dir=save_dir, force=force)).start()
def _save_cache(self, save_dir=None, force=False):
"""
Checks if another process is writing to file, if so waits before saving.
Parameters
----------
save_dir : str, pathlib.Path
The directory path into which the tables are saved. Defaults to cache directory.
force : bool
If True, the cache is saved regardless of modification time.
"""
TIMEOUT = 5 # Delete lock file this many seconds after creation/modification or waiting
lock_file = Path(self.cache_dir).joinpath('.cache.lock')
save_dir = Path(save_dir or self.cache_dir)
meta = self._cache['_meta']
modified = meta.get('modified_time') or datetime.min
update_time = max(meta.get(x) or datetime.min for x in ('loaded_time', 'saved_time'))
if modified < update_time and not force:
return # Not recently modified; return
# Check if in use by another process
while lock_file.exists():
if time.time() - lock_file.stat().st_ctime > TIMEOUT:
lock_file.unlink(missing_ok=True)
else:
time.sleep(.1)
_logger.info('Saving cache tables...')
lock_file.touch()
try:
for table in filter(lambda x: not x[0] == '_', self._cache.keys()):
metadata = meta['raw'][table]
metadata['date_modified'] = modified.isoformat(sep=' ', timespec='minutes')
filename = save_dir.joinpath(f'{table}.pqt')
parquet.save(filename, self._cache[table], metadata)
_logger.debug(f'Saved {filename}')
meta['saved_time'] = datetime.now()
finally:
lock_file.unlink()
def refresh_cache(self, mode='auto'):
"""Check and reload cache tables.
Parameters
----------
mode : {'local', 'refresh', 'auto', 'remote'}
Options are 'local' (don't reload); 'refresh' (reload); 'auto' (reload if expired);
'remote' (don't reload).
Returns
-------
datetime.datetime
Loaded timestamp.
"""
# NB: Currently modified table will be lost if called with 'refresh';
# May be instances where modified cache is saved then immediately replaced with a new
# remote cache. Also it's too slow :(
# self.save_cache() # Save cache if modified
if mode in {'local', 'remote'}:
pass
elif mode == 'auto':
if datetime.now() - self._cache['_meta']['loaded_time'] >= self.cache_expiry:
_logger.info('Cache expired, refreshing')
self.load_cache()
elif mode == 'refresh':
_logger.debug('Forcing reload of cache')
self.load_cache(clobber=True)
else:
raise ValueError(f'Unknown refresh type "{mode}"')
return self._cache['_meta']['loaded_time']
def _update_cache_from_records(self, strict=False, **kwargs):
"""
Update the cache tables with new records.
Parameters
----------
strict : bool
If not True, the columns don't need to match. Extra columns in input tables are
dropped and missing columns are added and filled with np.nan.
kwargs
pandas.DataFrame or pandas.Series to insert/update for each table.
Returns
-------
datetime.datetime:
A timestamp of when the cache was updated.
Example
-------
>>> session, datasets = util.ses2records(self.get_details(eid, full=True))
... self._update_cache_from_records(sessions=session, datasets=datasets)
Raises
------
AssertionError
When strict is True the input columns must exactly match those oo the cache table,
including the order.
KeyError
One or more of the keyword arguments does not match a table in One._cache.
"""
updated = None
for table, records in kwargs.items():
if records is None or records.empty:
continue
if table not in self._cache:
raise KeyError(f'Table "{table}" not in cache')
if isinstance(records, pd.Series):
records = pd.DataFrame([records])
if not strict:
# Deal with case where there are extra columns in the cache
extra_columns = set(self._cache[table].columns) - set(records.columns)
for col in extra_columns:
n = list(self._cache[table].columns).index(col)
records.insert(n, col, np.nan)
# Drop any extra columns in the records that aren't in cache table
to_drop = set(records.columns) - set(self._cache[table].columns)
records.drop(to_drop, axis=1, inplace=True)
records = records.reindex(columns=self._cache[table].columns)
assert set(self._cache[table].columns) == set(records.columns)
# Update existing rows
to_update = records.index.isin(self._cache[table].index)
self._cache[table].loc[records.index[to_update], :] = records[to_update]
# Assign new rows
to_assign = records[~to_update]
if isinstance(self._cache[table].index, pd.MultiIndex) and not to_assign.empty:
# Concatenate and sort (no other way for non-unique index within MultiIndex)
self._cache[table] = pd.concat([self._cache[table], to_assign]).sort_index()
else:
for index, record in to_assign.iterrows():
self._cache[table].loc[index, :] = record[self._cache[table].columns].values
updated = datetime.now()
self._cache['_meta']['modified_time'] = updated
return updated
def save_loaded_ids(self, sessions_only=False, clear_list=True):
"""
Save list of UUIDs corresponding to datasets or sessions where datasets were loaded.
Parameters
----------
sessions_only : bool
If true, save list of experiment IDs, otherwise the full list of dataset IDs.
clear_list : bool
If true, clear the current list of loaded dataset IDs after saving.
Returns
-------
list of str
List of UUIDs.
pathlib.Path
The file path of the saved list.
"""
if '_loaded_datasets' not in self._cache or self._cache['_loaded_datasets'].size == 0:
warnings.warn('No datasets loaded; check "record_datasets" attribute is True')
return [], None
if sessions_only:
name = 'session_uuid'
idx = self._cache['datasets'].index.isin(self._cache['_loaded_datasets'], 'id')
ids = self._cache['datasets'][idx].index.unique('eid').values
else:
name = 'dataset_uuid'
ids = self._cache['_loaded_datasets']
timestamp = datetime.now().strftime("%Y-%m-%dT%H-%M-%S%z")
filename = Path(self._tables_dir or self.cache_dir) / f'{timestamp}_loaded_{name}s.csv'
pd.DataFrame(ids, columns=[name]).to_csv(filename, index=False)
if clear_list:
self._cache['_loaded_datasets'] = np.array([])
return ids, filename
def _download_datasets(self, dsets, **kwargs) -> List[Path]:
"""
Download several datasets given a set of datasets.
NB: This will not skip files that are already present. Use check_filesystem instead.
Parameters
----------
dsets : list
List of dataset dictionaries from an Alyx REST query OR URL strings
Returns
-------
list of pathlib.Path
A local file path list
"""
# Looking to entirely remove method
pass
def _download_dataset(self, dset, cache_dir=None, **kwargs) -> Path:
"""
Download a dataset from an Alyx REST dictionary
Parameters
----------
dset : pandas.Series, dict, str
A single dataset dictionary from an Alyx REST query OR URL string
cache_dir : str, pathlib.Path
The root directory to save the data in (home/downloads by default)
Returns
-------
pathlib.Path
The local file path
"""
pass # pragma: no cover
def search(self, details=False, query_type=None, **kwargs):
    """
    Searches sessions matching the given criteria and returns a list of matching eids

    For a list of search terms, use the method

        one.search_terms()

    For all search parameters, a single value or list may be provided.  For `dataset`, the
    sessions returned will contain all listed datasets.  For the other parameters, the session
    must contain at least one of the entries.

    For all but `date_range` and `number`, any field that contains the search string is
    returned.  Wildcards are not permitted, however if wildcards property is True, regular
    expressions may be used (see notes and examples).

    Parameters
    ----------
    dataset : str, list
        One or more dataset names. Returns sessions containing all these datasets.
        A dataset matches if it contains the search string e.g. 'wheel.position' matches
        '_ibl_wheel.position.npy'.
    dataset_qc_lte : str, int, one.alf.spec.QC
        A dataset QC value, returns sessions with datasets at or below this QC value, including
        those with no QC set. If `dataset` not passed, sessions with any passing QC datasets
        are returned, otherwise all matching datasets must have the QC value or below.
    date_range : str, list, datetime.datetime, datetime.date, pandas.timestamp
        A single date to search or a list of 2 dates that define the range (inclusive).  To
        define only the upper or lower date bound, set the other element to None.
    laboratory : str
        A str or list of lab names, returns sessions from any of these labs.
    number : str, int
        Number of session to be returned, i.e. number in sequence for a given date.
    subject : str, list
        A list of subject nicknames, returns sessions for any of these subjects.
    task_protocol : str
        The task protocol name (can be partial, i.e. any task protocol containing that str
        will be found).
    projects : str, list
        The project name(s) (can be partial, i.e. any project containing that str
        will be found).
    details : bool
        If true also returns a dict of dataset details.
    query_type : str, None
        Query cache ('local') or Alyx database ('remote').  This argument is not read by
        this local implementation.

    Returns
    -------
    list
        A list of eids.
    (list)
        (If details is True) a list of dictionaries, each entry corresponding to a matching
        session.  None when there are no matching sessions.

    Examples
    --------
    Search for sessions with 'training' in the task protocol.

    >>> eids = one.search(task='training')

    Search for sessions by subject 'MFD_04'.

    >>> eids = one.search(subject='MFD_04')

    Do an exact search for sessions by subject 'FD_04'.

    >>> assert one.wildcards is True, 'the wildcards flag must be True for regex expressions'
    >>> eids = one.search(subject='^FD_04$')

    Search for sessions on a given date, in a given lab, containing trials and spike data.

    >>> eids = one.search(date='2023-01-01', lab='churchlandlab', dataset=['trials', 'spikes'])

    Search for sessions containing trials and spike data where QC for both are WARNING or less.

    >>> eids = one.search(dataset_qc_lte='WARNING', dataset=['trials', 'spikes'])

    Search for sessions with any datasets that have a QC of PASS or NOT_SET.

    >>> eids = one.search(dataset_qc_lte='PASS')

    Notes
    -----
    - In default and local mode, most queries are case-sensitive partial matches. When lists
      are provided, the search is a logical OR, except for `dataset`, which is a logical AND.
    - If `dataset_qc_lte` and `dataset` are defined, the QC criterion only applies to the
      provided datasets and all must pass for a session to be returned.
    - All search terms are true for a session to be returned, i.e. subject matches AND project
      matches, etc.
    - In remote mode most queries are case-insensitive partial matches.
    - In default and local mode, when the one.wildcards flag is True (default), queries are
      interpreted as regular expressions. To turn this off set one.wildcards to False.
    - In remote mode regular expressions are only supported using the `django` argument.
    """

    def all_present(x, dsets, exists=True):
        """Returns true if all datasets present in Series"""
        return all(any(x.str.contains(y, regex=self.wildcards) & exists) for y in dsets)

    # Iterate over search filters, reducing the sessions table
    sessions = self._cache['sessions']

    # Ensure sessions filtered in a particular order, with the dataset and dataset QC
    # filters last as these are the most expensive.  NB: 'dataset_qc_lte' must be listed
    # here, otherwise a QC-only query is given the lowest sort key and runs first.
    search_order = ('date_range', 'number', 'dataset', 'dataset_qc_lte')

    def sort_fcn(itm):
        """Sort key: position in search_order, or -1 (i.e. first) for all other terms"""
        return -1 if itm[0] not in search_order else search_order.index(itm[0])

    # Validate and get full name for queries
    search_terms = self.search_terms(query_type='local')
    queries = {util.autocomplete(k, search_terms): v for k, v in kwargs.items()}
    for key, value in sorted(queries.items(), key=sort_fcn):
        # No matches; short circuit
        if sessions.size == 0:
            return ([], None) if details else []
        # String fields
        elif key in ('subject', 'task_protocol', 'laboratory', 'projects'):
            query = '|'.join(util.ensure_list(value))
            # The sessions table column is named 'lab'
            key = 'lab' if key == 'laboratory' else key
            mask = sessions[key].str.contains(query, regex=self.wildcards)
            sessions = sessions[mask.astype(bool, copy=False)]
        elif key == 'date_range':
            start, end = util.validate_date_range(value)
            session_date = pd.to_datetime(sessions['date'])
            sessions = sessions[(session_date >= start) & (session_date <= end)]
        elif key == 'number':
            query = util.ensure_list(value)
            sessions = sessions[sessions[key].isin(map(int, query))]
        # Dataset/QC check is biggest so this should be done last.
        # When both terms are given, the QC criterion is applied within the 'dataset' pass
        elif key == 'dataset' or (key == 'dataset_qc_lte' and 'dataset' not in queries):
            datasets = self._cache['datasets']
            qc = QC.validate(queries.get('dataset_qc_lte', 'FAIL')).name  # validate value
            # Restrict the datasets table to the remaining sessions that have datasets
            has_dset = sessions.index.isin(datasets.index.get_level_values('eid'))
            datasets = datasets.loc[(sessions.index.values[has_dset], ), :]
            # An empty pattern is used for a QC-only search
            query = util.ensure_list(value if key == 'dataset' else '')
            # For each session check any dataset both contains query and exists
            mask = (
                (datasets
                    .groupby('eid', sort=False)
                    .apply(lambda x: all_present(
                        x['rel_path'], query, x['exists'] & x['qc'].le(qc))
                    ))
            )
            # eids of matching dataset records
            idx = mask[mask].index

            # Reduce sessions table by datasets mask
            sessions = sessions.loc[idx]

    # Return results
    if sessions.size == 0:
        return ([], None) if details else []
    sessions = sessions.sort_values(['date', 'subject', 'number'], ascending=False)
    eids = sessions.index.to_list()

    if details:
        return eids, sessions.reset_index(drop=True).to_dict('records', into=Bunch)
    else:
        return eids
def _check_filesystem(self, datasets, offline=None, update_exists=True, check_hash=True):
    """Update the local filesystem for the given datasets.

    Given a set of datasets, check whether records correctly reflect the filesystem.
    Called by load methods, this returns a list of file paths to load and return.
    TODO This needs changing; overload for downloading?
    This changes datasets frame, calls _update_cache(sessions=None, datasets=None) to
    update and save tables.  Download_datasets can also call this function.

    Parameters
    ----------
    datasets : pandas.Series, pandas.DataFrame, list of dicts
        A list or DataFrame of dataset records
    offline : bool, None
        If false and Web client present, downloads the missing datasets from a remote
        repository
    update_exists : bool
        If true, the cache is updated to reflect the filesystem
    check_hash : bool
        Consider dataset missing if local file hash does not match. In online mode, the dataset
        will be re-downloaded.

    Returns
    -------
    A list of file paths for the datasets (None elements for non-existent datasets)
    """
    # Normalize the input to a DataFrame of dataset records
    if isinstance(datasets, pd.Series):
        datasets = pd.DataFrame([datasets])
    elif not isinstance(datasets, pd.DataFrame):
        # Cast set of dicts (i.e. from REST datasets endpoint)
        datasets = util.datasets2records(list(datasets))
    indices_to_download = []  # indices of datasets that need (re)downloading
    files = []  # file path list to return
    # If the session_path field is missing from the datasets table, fetch from sessions table
    if 'session_path' not in datasets.columns:
        if 'eid' not in datasets.index.names:
            # Get slice of full frame with eid in index
            _dsets = self._cache['datasets'][
                self._cache['datasets'].index.get_level_values(1).isin(datasets.index)
            ]
            idx = _dsets.index.get_level_values(1)
        else:
            _dsets = datasets
            idx = pd.IndexSlice[:, _dsets.index.get_level_values(1)]
        # Ugly but works over unique sessions, which should be quicker
        session_path = (self._cache['sessions']
                        .loc[_dsets.index.get_level_values(0).unique()]
                        .apply(session_record2path, axis=1))
        # NB: this adds a column to the datasets frame in place
        datasets.loc[idx, 'session_path'] = \
            pd.Series(_dsets.index.get_level_values(0)).map(session_path).values

    # First go through datasets and check if file exists and hash matches
    for i, rec in datasets.iterrows():
        file = Path(self.cache_dir, *rec[['session_path', 'rel_path']])
        if self.uuid_filenames:
            # The dataset ID is the last index level when the index is a (eid, id) tuple
            file = alfiles.add_uuid_string(file, i[1] if isinstance(i, tuple) else i)
        if file.exists():
            # Check if there's a hash mismatch
            # If so, add this index to list of datasets that need downloading
            if rec['file_size'] and file.stat().st_size != rec['file_size']:
                _logger.warning('local file size mismatch on dataset: %s',
                                PurePosixPath(rec.session_path, rec.rel_path))
                indices_to_download.append(i)
            elif check_hash and rec['hash'] is not None:
                # The hash is only computed when the file size check did not already fail
                if hashfile.md5(file) != rec['hash']:
                    _logger.warning('local md5 mismatch on dataset: %s',
                                    PurePosixPath(rec.session_path, rec.rel_path))
                    indices_to_download.append(i)
            files.append(file)  # File exists so add to file list
        else:
            # File doesn't exist so add None to output file list
            files.append(None)
            # Add this index to list of datasets that need downloading
            indices_to_download.append(i)
        # The record disagrees with the filesystem: flip the 'exists' flag
        if rec['exists'] != file.exists():
            with warnings.catch_warnings():
                # Suppress future warning: exist column should always be present
                msg = '.*indexing on a MultiIndex with a nested sequence of labels.*'
                warnings.filterwarnings('ignore', message=msg)
                datasets.at[i, 'exists'] = not rec['exists']
                if update_exists:
                    _logger.debug('Updating exists field')
                    if isinstance(i, tuple):
                        self._cache['datasets'].loc[i, 'exists'] = not rec['exists']
                    else:  # eid index level missing in datasets input
                        # NB: i was already appended to indices_to_download (if required)
                        # before being rebound to an index slice here
                        i = pd.IndexSlice[:, i]
                        self._cache['datasets'].loc[i, 'exists'] = not rec['exists']
                    # Mark the cache as modified so that it may later be saved
                    self._cache['_meta']['modified_time'] = datetime.now()

    # If online and we have datasets to download, call download_datasets with these datasets
    if not (offline or self.offline) and indices_to_download:
        dsets_to_download = datasets.loc[indices_to_download]
        # Returns list of local file paths and set to variable
        new_files = self._download_datasets(dsets_to_download, update_cache=update_exists)
        # Add each downloaded file to the output list of files
        for i, file in zip(indices_to_download, new_files):
            files[datasets.index.get_loc(i)] = file

    # Keep a record of the dataset indices for which a file path is being returned
    if self.record_loaded:
        loaded = np.fromiter(map(bool, files), bool)
        loaded_ids = np.array(datasets.index.to_list())[loaded]
        if '_loaded_datasets' not in self._cache:
            self._cache['_loaded_datasets'] = np.unique(loaded_ids)
        else:
            loaded_set = np.hstack([self._cache['_loaded_datasets'], loaded_ids])
            self._cache['_loaded_datasets'] = np.unique(loaded_set, axis=0)

    # Return full list of file paths
    return files
@util.refresh
@util.parse_id
def get_details(self, eid: Union[str, Path, UUID], full: bool = False):
    """Return session details for a given session ID

    Parameters
    ----------
    eid : str, UUID, pathlib.Path, dict
        Experiment session identifier; may be a UUID, URL, experiment reference string
        details dict or Path.
    full : bool
        If True, returns a DataFrame of session and dataset info

    Returns
    -------
    pd.Series, pd.DataFrame
        A session record or full DataFrame with dataset information if full is True

    Raises
    ------
    one.alf.exceptions.ALFObjectNotFound
        The eid is not in the sessions cache table.
    one.alf.exceptions.ALFMultipleObjectsFound
        More than one session in the cache table matches the eid.
    """
    # Int ids return DataFrame, making str eid a list ensures Series not returned
    try:
        det = self._cache['sessions'].loc[[eid]]
    except KeyError as ex:
        raise alferr.ALFObjectNotFound(eid) from ex
    # Explicit check rather than an assert, which would be stripped when running with -O
    if len(det) != 1:
        raise alferr.ALFMultipleObjectsFound(f'Multiple sessions in cache for eid {eid}')
    if not full:
        return det.iloc[0]
    # Right join keeps only the dataset rows belonging to this session
    return self._cache['datasets'].join(det, on='eid', how='right')
@util.refresh
def list_subjects(self) -> List[str]:
    """
    List every subject found in the sessions cache table.

    Returns
    -------
    list
        Sorted list of unique subject names
    """
    subjects = self._cache['sessions']['subject']
    ordered = subjects.sort_values()
    return ordered.unique().tolist()
@util.refresh
def list_datasets(
        self, eid=None, filename=None, collection=None, revision=None, qc=QC.FAIL,
        ignore_qc_not_set=False, details=False, query_type=None
) -> Union[np.ndarray, pd.DataFrame]:
    """
    Given an eid, return the datasets for those sessions.

    If no eid is provided, a list of all datasets is returned.  When details is false, a
    list of dataset relative paths is returned: unique paths when no eid is given, sorted
    paths for a given eid.

    Parameters
    ----------
    eid : str, UUID, pathlib.Path, dict
        Experiment session identifier; may be a UUID, URL, experiment reference string
        details dict or Path.
    filename : str, dict, list
        Filters datasets and returns only the ones matching the filename.
        Supports asterisks as wildcards.  May be a dict of ALF parts.
    collection : str, list
        The collection to which the object belongs, e.g. 'alf/probe01'.
        This is the relative path of the file from the session root.
        Supports asterisks as wildcards.
    revision : str
        Filters datasets and returns only the ones matching the revision.
        Supports asterisks as wildcards.
    qc : str, int, one.alf.spec.QC
        Returns datasets at or below this QC level.  Integer values should correspond to the QC
        enumeration NOT the qc category column codes in the pandas table.
    ignore_qc_not_set : bool
        When true, do not return datasets for which QC is NOT_SET.
    details : bool
        When true, a pandas DataFrame is returned, otherwise a list of
        relative paths (collection/revision/filename) - see one.alf.spec.describe for details.
    query_type : str
        Query cache ('local') or Alyx database ('remote').  This argument is not read by
        this local implementation.

    Returns
    -------
    list, pd.DataFrame
        Slice of datasets table, or list of relative paths if details is False.  When the
        session cannot be found the result is empty (an empty table or an empty list).

    Examples
    --------
    List all unique datasets in ONE cache

    >>> datasets = one.list_datasets()

    List all datasets for a given experiment

    >>> datasets = one.list_datasets(eid)

    List all datasets for an experiment that match a collection name

    >>> probe_datasets = one.list_datasets(eid, collection='*probe*')

    List datasets for an experiment that have 'wheel' in the filename

    >>> datasets = one.list_datasets(eid, filename='*wheel*')

    List datasets for an experiment that are part of a 'wheel' or 'trial(s)' object

    >>> datasets = one.list_datasets(eid, {'object': ['wheel', 'trial?']})
    """
    datasets = self._cache['datasets']
    filter_args = dict(
        collection=collection, filename=filename, wildcards=self.wildcards, revision=revision,
        revision_last_before=False, assert_unique=False, qc=qc,
        ignore_qc_not_set=ignore_qc_not_set)
    if not eid:
        datasets = util.filter_datasets(datasets, **filter_args)
        return datasets.copy() if details else datasets['rel_path'].unique().tolist()
    eid = self.to_eid(eid)  # Ensure we have a UUID str list
    # NB: When details is False an empty list is returned for unknown sessions, so that the
    # return type matches the non-empty case (iterating an empty table yields column names)
    if not eid:
        return datasets.iloc[0:0] if details else []
    try:
        datasets = datasets.loc[(eid,), :]
    except KeyError:
        return datasets.iloc[0:0] if details else []

    datasets = util.filter_datasets(datasets, **filter_args)
    # Return only the relative path
    return datasets if details else datasets['rel_path'].sort_values().values.tolist()
@util.refresh
def list_collections(self, eid=None, filename=None, collection=None, revision=None,
                     details=False, query_type=None) -> Union[np.ndarray, dict]:
    """
    List the collections for a given experiment.

    When no experiment ID is provided, the collections of all datasets are returned.

    Parameters
    ----------
    eid : str, UUID, Path, dict
        Experiment session identifier; may be a UUID, URL, experiment reference string
        details dict or Path.
    filename : str, dict, list
        Only collections containing datasets that match this filter are returned.
        Supports lists and asterisks as wildcards. May be a dict of ALF parts.
    collection : str, list
        Only collections matching this pattern are returned. Supports asterisks as wildcards.
    revision : str
        Only collections containing datasets with a matching revision are returned.
        Supports asterisks as wildcards.
    details : bool
        If true, a dict of pandas datasets tables is returned with the collections as keys;
        otherwise a list of the unique collections.
    query_type : str
        Query cache ('local') or Alyx database ('remote').

    Returns
    -------
    list, dict
        A list of unique collections, or a dict mapping each collection to its datasets table.

    Examples
    --------
    List all unique collections in ONE cache

    >>> collections = one.list_collections()

    List all collections for a given experiment

    >>> collections = one.list_collections(eid)

    List all collections for a given experiment and revision

    >>> revised = one.list_collections(eid, revision='2020-01-01')

    List all collections that have 'probe' in the name.

    >>> collections = one.list_collections(eid, collection='*probe*')

    List collections for an experiment that have datasets with 'wheel' in the name

    >>> collections = one.list_collections(eid, filename='*wheel*')

    List collections for an experiment that contain numpy datasets

    >>> collections = one.list_collections(eid, {'extension': 'npy'})

    """
    def _collection_of(rel_path):
        """Return the collection part of a relative path ('' when there is none)."""
        return alfiles.rel_path_parts(rel_path, assert_valid=False)[0] or ''

    # Work on a copy so that adding the temporary column never touches the cache table
    table = self.list_datasets(
        details=True, eid=eid, collection=collection, filename=filename,
        revision=revision, query_type=query_type).copy()
    table['collection'] = table.rel_path.apply(_collection_of)

    if not details:
        return table['collection'].unique().tolist()
    # One datasets table per collection, without the temporary grouping column
    return {name: group.drop(columns='collection')
            for name, group in table.groupby('collection')}
@util.refresh
def list_revisions(self, eid=None, filename=None, collection=None, revision=None,
                   details=False, query_type=None):
    """
    List the revisions for a given experiment.

    When no experiment ID is provided, the revisions of all datasets are returned.

    Parameters
    ----------
    eid : str, UUID, Path, dict
        Experiment session identifier; may be a UUID, URL, experiment reference string
        details dict or Path.
    filename : str, dict, list
        Only revisions containing datasets that match this filter are returned.
        Supports lists and asterisks as wildcards. May be a dict of ALF parts.
    collection : str, list
        Filter by a given collection. Supports asterisks as wildcards.
    revision : str, list
        Only revisions matching this pattern are returned. Supports asterisks as wildcards.
    details : bool
        If true, a dict of pandas datasets tables is returned with the revisions as keys;
        otherwise a list of the unique revisions.
    query_type : str
        Query cache ('local') or Alyx database ('remote').

    Returns
    -------
    list, dict
        A list of unique revisions, or a dict mapping each revision to its datasets table.
        Datasets without a revision are listed under the empty string.

    Examples
    --------
    List all revisions in ONE cache

    >>> revisions = one.list_revisions()

    List all revisions for a given experiment

    >>> revisions = one.list_revisions(eid)

    List all revisions for a given experiment that contain the trials object

    >>> revisions = one.list_revisions(eid, filename={'object': 'trials'})

    List all revisions for a given experiment that start with 2020 or 2021

    >>> revisions = one.list_revisions(eid, revision=['202[01]*'])

    """
    def _revision_of(rel_path):
        """Return the revision part of a relative path with the '#' delimiters removed."""
        part = alfiles.rel_path_parts(rel_path, assert_valid=False)[1] or ''
        return part.strip('#')

    table = self.list_datasets(eid=eid, details=True, query_type=query_type).copy()
    # The filter is applied here rather than through list_datasets so that
    # revision_last_before is explicitly set to False
    table = util.filter_datasets(
        table, collection=collection, filename=filename, revision=revision,
        revision_last_before=False, wildcards=self.wildcards, assert_unique=False)
    table['revision'] = table.rel_path.apply(_revision_of)

    if not details:
        return table['revision'].unique().tolist()
    # One datasets table per revision, without the temporary grouping column
    return {name: group.drop(columns='revision')
            for name, group in table.groupby('revision')}
@util.refresh
@util.parse_id
def load_object(self,
eid: Union[str, Path, UUID],
obj: str,
collection: Optional[str] = None,
revision: Optional[str] = None,
query_type: Optional[str] = None,
download_only: bool = False,
check_hash: bool = True,
**kwargs) -> Union[alfio.AlfBunch, List[Path]]:
"""
Load all attributes of an ALF object from a Session ID and an object name.
Any datasets with matching object name will be loaded.
Parameters
----------
eid : str, UUID, pathlib.Path, dict
Experiment session identifier; may be a UUID, URL, experiment reference string
details dict or Path.
obj : str
The ALF object to load. Supports asterisks as wildcards.
collection : str
The collection to which the object belongs, e.g. 'alf/probe01'.
This is the relative path of the file from the session root.
Supports asterisks as wildcards.
revision : str
The dataset revision (typically an ISO date). If no exact match, the previous
revision (ordered lexicographically) is returned. If None, the default revision is
returned (usually the most recent revision). Regular expressions/wildcards not
permitted.
query_type : str
Query cache ('local') or Alyx database ('remote').
download_only : bool
When true the data are downloaded and the file path is returned. NB: The order of the
file path list is undefined.
check_hash : bool
Consider dataset missing if local file hash does not match. In online mode, the dataset
will be re-downloaded.
kwargs
Additional filters for datasets, including namespace and timescale. For full list
see the :func:`one.alf.spec.describe` function.
Returns
-------
one.alf.io.AlfBunch, list
An ALF bunch or if download_only is True, a list of Paths objects.
Examples
--------
>>> load_object(eid, 'moves')
>>> load_object(eid, 'trials')
>>> load_object(eid, 'spikes', collection='*probe01') # wildcards is True
>>> load_object(eid, 'spikes', collection='.*probe01') # wildcards is False
>>> load_object(eid, 'spikes', namespace='ibl')
>>> load_object(eid, 'spikes', timescale='ephysClock')
Load specific attributes:
>>> load_object(eid, 'spikes', attribute=['times*', 'clusters'])
"""
query_type = query_type or self.mode
datasets = self.list_datasets(eid, details=True, query_type=query_type)
if len(datasets) == 0:
raise alferr.ALFObjectNotFound(obj)
dataset = {'object': obj, **kwargs}
datasets = util.filter_datasets(datasets, dataset, collection, revision,
assert_unique=False, wildcards=self.wildcards)
# Validate result before loading
if len(datasets) == 0:
raise alferr.ALFObjectNotFound(obj)
parts = [alfiles.rel_path_parts(x) for x in datasets.rel_path]
unique_objects = set(x[3] or '' for x in parts)
unique_collections = set(x[0] or '' for x in parts)
if len(unique_objects) > 1:
raise alferr.ALFMultipleObjectsFound(*unique_objects)