diff --git a/ibllib/io/extractors/fibrephotometry.py b/ibllib/io/extractors/fibrephotometry.py index ef17c46b1..e9cb60321 100644 --- a/ibllib/io/extractors/fibrephotometry.py +++ b/ibllib/io/extractors/fibrephotometry.py @@ -46,15 +46,22 @@ NEUROPHOTOMETRICS_LED_STATES = { 'Condition': { 0: 'No additional signal', - 1: 'Output 0 signal HIGH + Stimulation', - 2: 'Output 0 signal HIGH + Input 0 signal HIGH', - 3: 'Input 0 signal HIGH + Stimulation', - 4: 'Output 0 HIGH + Input 0 HIGH + Stimulation' + 1: 'Output 1 signal HIGH', + 2: 'Output 0 signal HIGH', + 3: 'Stimulation ON', + 4: 'GPIO Line 2 HIGH', + 5: 'GPIO Line 3 HIGH', + 6: 'Input 1 HIGH', + 7: 'Input 0 HIGH', + 8: 'Output 0 signal HIGH + Stimulation', + 9: 'Output 0 signal HIGH + Input 0 signal HIGH', + 10: 'Input 0 signal HIGH + Stimulation', + 11: 'Output 0 HIGH + Input 0 HIGH + Stimulation', }, - 'No LED ON': {0: 0, 1: 48, 2: 528, 3: 544, 4: 560}, - 'L415': {0: 1, 1: 49, 2: 529, 3: 545, 4: 561}, - 'L470': {0: 2, 1: 50, 2: 530, 3: 546, 4: 562}, - 'L560': {0: 4, 1: 52, 2: 532, 3: 548, 4: 564} + 'No LED ON': {0: 0, 1: 8, 2: 16, 3: 32, 4: 64, 5: 128, 6: 256, 7: 512, 8: 48, 9: 528, 10: 544, 11: 560}, + 'L415': {0: 1, 1: 9, 2: 17, 3: 33, 4: 65, 5: 129, 6: 257, 7: 513, 8: 49, 9: 529, 10: 545, 11: 561}, + 'L470': {0: 2, 1: 10, 2: 18, 3: 34, 4: 66, 5: 130, 6: 258, 7: 514, 8: 50, 9: 530, 10: 546, 11: 562}, + 'L560': {0: 4, 1: 12, 2: 20, 3: 36, 4: 68, 5: 132, 6: 260, 7: 516, 8: 52, 9: 532, 10: 548, 11: 564} } diff --git a/ibllib/pipes/behavior_tasks.py b/ibllib/pipes/behavior_tasks.py index 0f9badf85..435a5289a 100644 --- a/ibllib/pipes/behavior_tasks.py +++ b/ibllib/pipes/behavior_tasks.py @@ -5,7 +5,9 @@ from pkg_resources import parse_version import one.alf.io as alfio from one.alf.files import session_path_parts +from one.api import ONE +from ibllib.oneibl.registration import get_lab from ibllib.pipes import base_tasks from ibllib.io.raw_data_loaders import load_settings from ibllib.qc.task_extractors import 
TaskQCExtractor @@ -455,7 +457,14 @@ def _run(self, upload=True): """ Extracts training status for subject """ - df = training_status.get_latest_training_information(self.session_path, self.one) + + lab = get_lab(self.session_path, self.one.alyx) + if lab == 'cortexlab': + one = ONE(base_url='https://alyx.internationalbrainlab.org') + else: + one = self.one + + df = training_status.get_latest_training_information(self.session_path, one) if df is not None: training_status.make_plots(self.session_path, self.one, df=df, save=True, upload=upload) # Update status map in JSON field of subjects endpoint diff --git a/ibllib/pipes/dynamic_pipeline.py b/ibllib/pipes/dynamic_pipeline.py index c822e91bf..feb6689d1 100644 --- a/ibllib/pipes/dynamic_pipeline.py +++ b/ibllib/pipes/dynamic_pipeline.py @@ -19,7 +19,8 @@ import ibllib.pipes.video_tasks as vtasks import ibllib.pipes.ephys_tasks as etasks import ibllib.pipes.audio_tasks as atasks -from ibllib.pipes.photometry_tasks import TaskFibrePhotometryPreprocess, TaskFibrePhotometryRegisterRaw +import ibllib.pipes.photometry_tasks as ptasks +# from ibllib.pipes.photometry_tasks import FibrePhotometryPreprocess, FibrePhotometryRegisterRaw _logger = logging.getLogger(__name__) @@ -307,9 +308,9 @@ def make_pipeline(session_path, **pkwargs): tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcNidq,), {})( **kwargs, **video_kwargs, **sync_kwargs, parents=[tasks['VideoCompress']] + sync_tasks) - if len(video_kwargs['cameras']) == 3: - tasks[tn] = type((tn := 'DLC'), (epp.EphysDLC,), {})( - **kwargs, parents=[dlc_parent_task]) + if sync_kwargs['sync'] != 'bpod': + tasks[tn] = type((tn := 'DLC'), (vtasks.DLC,), {})( + **kwargs, **video_kwargs, parents=[dlc_parent_task]) tasks['PostDLC'] = type('PostDLC', (epp.EphysPostDLC,), {})( **kwargs, parents=[tasks['DLC'], tasks[f'VideoSyncQC_{sync}']]) @@ -357,11 +358,11 @@ def make_pipeline(session_path, **pkwargs): if 'photometry' in devices: # {'collection': 
'raw_photometry_data', 'sync_label': 'frame_trigger', 'regions': ['Region1G', 'Region3G']} photometry_kwargs = devices['photometry'] - tasks['TaskFibrePhotometryRegisterRaw'] = type('TaskFibrePhotometryRegisterRaw', ( - TaskFibrePhotometryRegisterRaw,), {})(**kwargs, **photometry_kwargs) - tasks['TaskFibrePhotometryPreprocess'] = type('TaskFibrePhotometryPreprocess', ( - TaskFibrePhotometryPreprocess,), {})(**kwargs, **photometry_kwargs, **sync_kwargs, - parents=[tasks['TaskFibrePhotometryRegisterRaw']] + sync_tasks) + tasks['FibrePhotometryRegisterRaw'] = type('FibrePhotometryRegisterRaw', ( + ptasks.FibrePhotometryRegisterRaw,), {})(**kwargs, **photometry_kwargs) + tasks['FibrePhotometryPreprocess'] = type('FibrePhotometryPreprocess', ( + ptasks.FibrePhotometryPreprocess,), {})(**kwargs, **photometry_kwargs, **sync_kwargs, + parents=[tasks['FibrePhotometryRegisterRaw']] + sync_tasks) p = mtasks.Pipeline(session_path=session_path, **pkwargs) p.tasks = tasks diff --git a/ibllib/pipes/ephys_preprocessing.py b/ibllib/pipes/ephys_preprocessing.py index 750b840b7..5c439987c 100644 --- a/ibllib/pipes/ephys_preprocessing.py +++ b/ibllib/pipes/ephys_preprocessing.py @@ -21,8 +21,7 @@ from ibllib.io.video import label_from_path, assert_valid_label from ibllib.io.extractors import ephys_fpga, ephys_passive, camera from ibllib.pipes import tasks, base_tasks -from ibllib.pipes.training_preprocessing import TrainingRegisterRaw as EphysRegisterRaw -from ibllib.pipes.training_preprocessing import TrainingStatus as EphysTrainingStatus +import ibllib.pipes.training_preprocessing as tpp from ibllib.pipes.misc import create_alyx_probe_insertions from ibllib.qc.alignment_qc import get_aligned_channels from ibllib.qc.task_extractors import TaskQCExtractor @@ -1324,7 +1323,7 @@ def __init__(self, session_path=None, **kwargs): self.session_path = session_path # level 0 tasks['ExperimentDescriptionRegisterRaw'] = base_tasks.ExperimentDescriptionRegisterRaw(self.session_path) - 
tasks["EphysRegisterRaw"] = EphysRegisterRaw(self.session_path) + tasks["EphysRegisterRaw"] = tpp.TrainingRegisterRaw(self.session_path) tasks["EphysPulses"] = EphysPulses(self.session_path) tasks["EphysRawQC"] = RawEphysQC(self.session_path) tasks["EphysAudio"] = EphysAudio(self.session_path) @@ -1341,7 +1340,7 @@ def __init__(self, session_path=None, **kwargs): self.session_path, parents=[tasks["EphysVideoCompress"], tasks["EphysPulses"], tasks["EphysTrials"]]) tasks["EphysCellsQc"] = EphysCellsQc(self.session_path, parents=[tasks["SpikeSorting"]]) tasks["EphysDLC"] = EphysDLC(self.session_path, parents=[tasks["EphysVideoCompress"]]) - tasks['EphysTrainingStatus'] = EphysTrainingStatus(self.session_path, parents=[tasks["EphysTrials"]]) + tasks['EphysTrainingStatus'] = tpp.TrainingStatus(self.session_path, parents=[tasks["EphysTrials"]]) # level 3 tasks["EphysPostDLC"] = EphysPostDLC(self.session_path, parents=[tasks["EphysDLC"], tasks["EphysTrials"], tasks["EphysVideoSyncQc"]]) diff --git a/ibllib/pipes/photometry_tasks.py b/ibllib/pipes/photometry_tasks.py index 425d5d7fd..a5de12717 100644 --- a/ibllib/pipes/photometry_tasks.py +++ b/ibllib/pipes/photometry_tasks.py @@ -4,14 +4,13 @@ from collections import OrderedDict from ibllib.pipes import tasks, base_tasks -from ibllib.pipes.training_preprocessing import ( - TrainingRegisterRaw, TrainingAudio, TrainingTrials, TrainingDLC, TrainingStatus, TrainingVideoCompress) +import ibllib.pipes.training_preprocessing as tpp from ibllib.io.extractors.fibrephotometry import FibrePhotometry _logger = logging.getLogger('ibllib') -class TaskFibrePhotometryRegisterRaw(base_tasks.RegisterRawDataTask): +class FibrePhotometryRegisterRaw(base_tasks.RegisterRawDataTask): priority = 100 job_size = 'small' @@ -19,22 +18,28 @@ class TaskFibrePhotometryRegisterRaw(base_tasks.RegisterRawDataTask): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.collection = self.get_task_collection(kwargs.get('collection', 
None)) + self.device_collection = self.get_device_collection('photometry', device_collection='raw_photometry_data') @property def signature(self): signature = { 'input_files': [], - 'output_files': [('_mcc_DAQdata.raw.tdms', self.collection, True), - ('_neurophotometrics_fpData.raw.pqt', self.collection, True)] + 'output_files': [('_mcc_DAQdata.raw.tdms', self.device_collection, True), + ('_neurophotometrics_fpData.raw.pqt', self.device_collection, True)] } return signature -class TaskFibrePhotometryPreprocess(base_tasks.DynamicTask): - signature = { - 'input_files': [('*fpData.raw*', 'raw_photometry_data', True), ], - 'output_files': [('photometry.signal.pqt', 'alf', True), ] - } +class FibrePhotometryPreprocess(base_tasks.DynamicTask): + @property + def signature(self): + signature = { + 'input_files': [('_mcc_DAQdata.raw.tdms', self.device_collection, True), + ('_neurophotometrics_fpData.raw.pqt', self.device_collection, True)], + 'output_files': [('photometry.signal.pqt', 'alf/photometry', True)] + } + return signature + priority = 90 level = 1 @@ -42,11 +47,12 @@ def __init__(self, session_path, regions=None, **kwargs): super().__init__(session_path, **kwargs) # Task collection (this needs to be specified in the task kwargs) self.collection = self.get_task_collection(kwargs.get('collection', None)) + self.device_collection = self.get_device_collection('photometry', device_collection='raw_photometry_data') self.regions = regions def _run(self, **kwargs): - _, out_files = FibrePhotometry(self.session_path, collection=self.collection).extract( - regions=self.regions, save=True) + _, out_files = FibrePhotometry(self.session_path, collection=self.device_collection).extract( + regions=self.regions, path_out=self.session_path.joinpath('alf', 'photometry'), save=True) return out_files @@ -63,13 +69,13 @@ def __init__(self, session_path=None, **kwargs): tasks = OrderedDict() self.session_path = session_path # level 0 - tasks['TrainingRegisterRaw'] = 
TrainingRegisterRaw(self.session_path) - tasks['TrainingTrials'] = TrainingTrials(self.session_path) - tasks['TrainingVideoCompress'] = TrainingVideoCompress(self.session_path) - tasks['TrainingAudio'] = TrainingAudio(self.session_path) + tasks['TrainingRegisterRaw'] = tpp.TrainingRegisterRaw(self.session_path) + tasks['TrainingTrials'] = tpp.TrainingTrials(self.session_path) + tasks['TrainingVideoCompress'] = tpp.TrainingVideoCompress(self.session_path) + tasks['TrainingAudio'] = tpp.TrainingAudio(self.session_path) # level 1 - tasks['BiasedFibrePhotometry'] = TaskFibrePhotometryPreprocess(self.session_path, parents=[tasks['TrainingTrials']]) - tasks['TrainingStatus'] = TrainingStatus(self.session_path, parents=[tasks['TrainingTrials']]) - tasks['TrainingDLC'] = TrainingDLC( + tasks['BiasedFibrePhotometry'] = FibrePhotometryPreprocess(self.session_path, parents=[tasks['TrainingTrials']]) + tasks['TrainingStatus'] = tpp.TrainingStatus(self.session_path, parents=[tasks['TrainingTrials']]) + tasks['TrainingDLC'] = tpp.TrainingDLC( self.session_path, parents=[tasks['TrainingVideoCompress']]) self.tasks = tasks diff --git a/ibllib/pipes/training_status.py b/ibllib/pipes/training_status.py index 6651fe58a..24fde8602 100644 --- a/ibllib/pipes/training_status.py +++ b/ibllib/pipes/training_status.py @@ -4,6 +4,8 @@ from ibllib.io.raw_data_loaders import load_bpod from ibllib.oneibl.registration import _get_session_times from ibllib.io.extractors.base import get_pipeline, get_session_extractor_type +from ibllib.io.session_params import read_params +import ibllib.pipes.dynamic_pipeline as dyn from ibllib.plots.snapshot import ReportSnapshot from iblutil.numerical import ismember @@ -17,6 +19,8 @@ from matplotlib.lines import Line2D from datetime import datetime import seaborn as sns +import boto3 +from botocore.exceptions import ProfileNotFound, ClientError TRAINING_STATUS = {'untrainable': (-4, (0, 0, 0, 0)), @@ -31,28 +35,92 @@ 'ready4recording': (5, (20, 255, 91, 255))} 
+def get_training_table_from_aws(lab, subject): + """ + If aws credentials exist on the local server download the latest training table from aws s3 private bucket + :param lab: + :param subject: + :return: + """ + try: + session = boto3.Session(profile_name='ibl_training') + except ProfileNotFound: + return + + local_file_path = f'/mnt/s0/Data/Subjects/{subject}/training.csv' + dst_bucket_name = 'ibl-brain-wide-map-private' + try: + s3 = session.resource('s3') + bucket = s3.Bucket(name=dst_bucket_name) + bucket.download_file(f'resources/training/{lab}/{subject}/training.csv', + local_file_path) + df = pd.read_csv(local_file_path) + except ClientError: + return + + return df + + +def upload_training_table_to_aws(lab, subject): + """ + If aws credentials exist on the local server upload the training table to aws s3 private bucket + :param lab: + :param subject: + :return: + """ + try: + session = boto3.Session(profile_name='ibl_training') + except ProfileNotFound: + return + + local_file_path = f'/mnt/s0/Data/Subjects/{subject}/training.csv' + dst_bucket_name = 'ibl-brain-wide-map-private' + try: + s3 = session.resource('s3') + bucket = s3.Bucket(name=dst_bucket_name) + bucket.upload_file(local_file_path, + f'resources/training/{lab}/{subject}/training.csv') + except (ClientError, FileNotFoundError): + return + + def get_trials_task(session_path, one): + # TODO this eventually needs to be updated for dynamic pipeline tasks - pipeline = get_pipeline(session_path) - if pipeline == 'training': - from ibllib.pipes.training_preprocessing import TrainingTrials - task = TrainingTrials(session_path, one=one) - elif pipeline == 'ephys': - from ibllib.pipes.ephys_preprocessing import EphysTrials - task = EphysTrials(session_path, one=one) + # If experiment description file then process this + experiment_description_file = read_params(session_path) + if experiment_description_file is not None: + tasks = [] + pipeline = dyn.make_pipeline(session_path) + trials_tasks = [t for t 
in pipeline.tasks if 'Trials' in t] + for task in trials_tasks: + t = pipeline.tasks.get(task) + t.__init__(session_path, **t.kwargs) + tasks.append(t) else: - try: - # try and look if there is a custom extractor in the personal projects extraction class - import projects.base - task_type = get_session_extractor_type(session_path) - PipelineClass = projects.base.get_pipeline(task_type) - pipeline = PipelineClass(session_path, one) - trials_task_name = next(task for task in pipeline.tasks if 'Trials' in task) - task = pipeline.tasks.get(trials_task_name) - except Exception: - task = None + # Otherwise default to old way of doing things + pipeline = get_pipeline(session_path) + if pipeline == 'training': + from ibllib.pipes.training_preprocessing import TrainingTrials + tasks = [TrainingTrials(session_path)] + elif pipeline == 'ephys': + from ibllib.pipes.ephys_preprocessing import EphysTrials + tasks = [EphysTrials(session_path)] + else: + try: + # try and look if there is a custom extractor in the personal projects extraction class + import projects.base + task_type = get_session_extractor_type(session_path) + PipelineClass = projects.base.get_pipeline(task_type) + pipeline = PipelineClass(session_path, one) + trials_task_name = next(task for task in pipeline.tasks if 'Trials' in task) + task = pipeline.tasks.get(trials_task_name) + task.__init__(session_path) + tasks = [task] + except Exception: + tasks = [] - return task + return tasks def save_path(subj_path): @@ -83,7 +151,7 @@ def load_existing_dataframe(subj_path): return None -def load_trials(sess_path, one, force=True): +def load_trials(sess_path, one, collections=None, force=True): """ Load trials data for session. 
First attempts to load from local session path, if this fails will attempt to download via ONE, if this also fails, will then attempt to re-extraxt locally @@ -91,27 +159,59 @@ def load_trials(sess_path, one, force=True): :param one: ONE instance :return: """ - # try and load trials locally try: - trials = alfio.load_object(sess_path.joinpath('alf'), 'trials', short_keys=True) + # try and load all trials that are found locally in the session path locally + if collections is None: + trial_locations = list(sess_path.rglob('_ibl_trials.goCueTrigger_times.npy')) + else: + trial_locations = [Path(sess_path).joinpath(c, '_ibl_trials.goCueTrigger_times.npy') for c in collections] + + if len(trial_locations) > 1: + trial_dict = {} + for i, loc in enumerate(trial_locations): + trial_dict[i] = alfio.load_object(loc.parent, 'trials', short_keys=True) + trials = training.concatenate_trials(trial_dict) + elif len(trial_locations) == 1: + trials = alfio.load_object(trial_locations[0].parent, 'trials', short_keys=True) + else: + raise ALFObjectNotFound + if 'probabilityLeft' not in trials.keys(): raise ALFObjectNotFound except ALFObjectNotFound: + # Next try and load all trials data through ONE try: if not force: return None - # attempt to download trials using ONE - trials = one.load_object(one.path2eid(sess_path), 'trials') + eid = one.path2eid(sess_path) + if collections is None: + trial_collections = one.list_datasets(eid, '_ibl_trials.goCueTrigger_times.npy') + if len(trial_collections) > 0: + trial_collections = ['/'.join(c.split('/')[:-1]) for c in trial_collections] + else: + trial_collections = collections + + if len(trial_collections) > 1: + trial_dict = {} + for i, collection in enumerate(trial_collections): + trial_dict[i] = one.load_object(eid, 'trials', collection=collection) + trials = training.concatenate_trials(trial_dict) + elif len(trial_collections) == 1: + trials = one.load_object(eid, 'trials', collection=trial_collections[0]) + else: + raise 
ALFObjectNotFound + if 'probabilityLeft' not in trials.keys(): raise ALFObjectNotFound except Exception: + # Finally try to rextract the trials data locally try: - task = get_trials_task(sess_path, one=one) - if task is not None: - task.run() - trials = alfio.load_object(sess_path.joinpath('alf'), 'trials') - if 'probabilityLeft' not in trials.keys(): - raise ALFObjectNotFound + # Get the tasks that need to be run + tasks = get_trials_task(sess_path, one) + if len(tasks) > 0: + for task in tasks: + task.run() + return load_trials(sess_path, collections=collections, one=one, force=False) else: trials = None except Exception: # TODO how can i make this more specific @@ -154,7 +254,15 @@ def get_latest_training_information(sess_path, one): """ subj_path = sess_path.parent.parent - df = load_existing_dataframe(subj_path) + sub = subj_path.parts[-1] + if one.mode != 'local': + lab = one.alyx.rest('subjects', 'list', nickname=sub)[0]['lab'] + df = get_training_table_from_aws(lab, sub) + else: + df = None + + if df is None: + df = load_existing_dataframe(subj_path) # Find the dates and associated session paths where we don't have data stored in our dataframe missing_dates = check_up_to_date(subj_path, df) @@ -185,22 +293,28 @@ def get_latest_training_information(sess_path, one): df = compute_training_status(df, date, one) df_lim = df.drop_duplicates(subset='session_path', keep='first') + # Detect untrainable - un_df = df_lim[df_lim['training_status'] == 'in training'].sort_values('date') - if len(un_df) >= 40: - sess = un_df.iloc[39].session_path - df.loc[df['session_path'] == sess, 'training_status'] = 'untrainable' + if 'untrainable' not in df_lim.training_status.values: + un_df = df_lim[df_lim['training_status'] == 'in training'].sort_values('date') + if len(un_df) >= 40: + sess = un_df.iloc[39].session_path + df.loc[df['session_path'] == sess, 'training_status'] = 'untrainable' # Detect unbiasable - un_df = df_lim[df_lim['task_protocol'] == 
'biased'].sort_values('date') - if len(un_df) >= 40: - tr_st = un_df[0:40].training_status.unique() - if 'ready4ephysrig' not in tr_st: - sess = un_df.iloc[39].session_path - df.loc[df['session_path'] == sess, 'training_status'] = 'unbiasable' + if 'unbiasable' not in df_lim.training_status.values: + un_df = df_lim[df_lim['task_protocol'] == 'biased'].sort_values('date') + if len(un_df) >= 40: + tr_st = un_df[0:40].training_status.unique() + if 'ready4ephysrig' not in tr_st: + sess = un_df.iloc[39].session_path + df.loc[df['session_path'] == sess, 'training_status'] = 'unbiasable' save_dataframe(df, subj_path) + if one.mode != 'local': + upload_training_table_to_aws(lab, sub) + return df @@ -231,7 +345,7 @@ def compute_training_status(df, compute_date, one, force=True): # compute_date = str(one.path2ref(session_path)['date']) df_temp = df[df['date'] <= compute_date] - df_temp = df_temp.drop_duplicates('session_path') + df_temp = df_temp.drop_duplicates(subset=['session_path', 'task_protocol']) df_temp.sort_values('date') dates = df_temp.date.values @@ -255,6 +369,8 @@ def compute_training_status(df, compute_date, one, force=True): # If habituation skip if df_date.iloc[-1]['task_protocol'] == 'habituation': continue + # Here we should split by protocol in an ideal world but that world isn't today. This is only really relevant for + # chained protocols trials[df_date.iloc[-1]['date']] = load_combined_trials(df_date.session_path.values, one, force=force) protocol.append(df_date.iloc[-1]['task_protocol']) status.append(df_date.iloc[-1]['training_status']) @@ -286,7 +402,7 @@ def pass_through_training_hierachy(status_new, status_old): return status_new -def compute_session_duration_delay_location(sess_path, **kwargs): +def compute_session_duration_delay_location(sess_path, collections=None, **kwargs): """ Get meta information about task. 
Extracts session duration, delay before session start and location of session @@ -294,7 +410,7 @@ def compute_session_duration_delay_location(sess_path, **kwargs): ---------- sess_path : pathlib.Path, str The session path with the pattern subject/yyyy-mm-dd/nnn. - task_collection : str + collections : list The location within the session path directory of task settings and data. Returns @@ -306,18 +422,111 @@ def compute_session_duration_delay_location(sess_path, **kwargs): str {'ephys_rig', 'training_rig'} The location of the session. """ - md, sess_data = load_bpod(sess_path, **kwargs) - start_time, end_time = _get_session_times(sess_path, md, sess_data) - session_duration = int((end_time - start_time).total_seconds() / 60) + if collections is None: + collections, _ = get_data_collection(sess_path) + + session_duration = 0 + session_delay = 0 + session_location = 'training_rig' + for collection in collections: + md, sess_data = load_bpod(sess_path, task_collection=collection) + if md is None: + continue + try: + start_time, end_time = _get_session_times(sess_path, md, sess_data) + session_duration = session_duration + int((end_time - start_time).total_seconds() / 60) + session_delay = session_delay + md.get('SESSION_START_DELAY_SEC', 0) + except Exception: + session_duration = session_duration + 0 + session_delay = session_delay + 0 + + if 'ephys' in md.get('PYBPOD_BOARD', None): + session_location = 'ephys_rig' + else: + session_location = 'training_rig' + + return session_duration, session_delay, session_location - session_delay = md.get('SESSION_START_DELAY_SEC', 0) - if 'ephys' in md.get('PYBPOD_BOARD', None): - session_location = 'ephys_rig' +def get_data_collection(session_path): + """ + Returns the location of the raw behavioral data and extracted trials data for the session path. 
If + multiple locations in one session (e.g for dynamic) returns all of these + :param session_path: path of session + :return: + """ + experiment_description_file = read_params(session_path) + if experiment_description_file is not None: + pipeline = dyn.make_pipeline(session_path) + trials_tasks = [t for t in pipeline.tasks if 'Trials' in t] + collections = [pipeline.tasks.get(task).kwargs['collection'] for task in trials_tasks] + if len(collections) == 1 and collections[0] == 'raw_behavior_data': + alf_collections = ['alf'] + elif all(['raw_task_data' in c for c in collections]): + alf_collections = [f'alf/task_{c[-2:]}' for c in collections] + else: + alf_collections = None else: - session_location = 'training_rig' + collections = ['raw_behavior_data'] + alf_collections = ['alf'] + + return collections, alf_collections + + +def get_sess_dict(session_path, one, protocol, alf_collections=None, raw_collections=None, force=True): + + sess_dict = {} + sess_dict['date'] = str(one.path2ref(session_path)['date']) + sess_dict['session_path'] = str(session_path) + sess_dict['task_protocol'] = protocol + + if sess_dict['task_protocol'] == 'habituation': + nan_array = np.array([np.nan]) + sess_dict['performance'], sess_dict['contrasts'], _ = (nan_array, nan_array, np.nan) + sess_dict['performance_easy'] = np.nan + sess_dict['reaction_time'] = np.nan + sess_dict['n_trials'] = np.nan + sess_dict['sess_duration'] = np.nan + sess_dict['n_delay'] = np.nan + sess_dict['location'] = np.nan + sess_dict['training_status'] = 'habituation' + sess_dict['bias_50'], sess_dict['thres_50'], sess_dict['lapsehigh_50'], sess_dict['lapselow_50'] = \ + (np.nan, np.nan, np.nan, np.nan) + sess_dict['bias_20'], sess_dict['thres_20'], sess_dict['lapsehigh_20'], sess_dict['lapselow_20'] = \ + (np.nan, np.nan, np.nan, np.nan) + sess_dict['bias_80'], sess_dict['thres_80'], sess_dict['lapsehigh_80'], sess_dict['lapselow_80'] = \ + (np.nan, np.nan, np.nan, np.nan) - return session_duration, 
session_delay, session_location + else: + # if we can't compute trials then we need to pass + trials = load_trials(session_path, one, collections=alf_collections, force=force) + if trials is None: + return + + sess_dict['performance'], sess_dict['contrasts'], _ = training.compute_performance(trials, prob_right=True) + if sess_dict['task_protocol'] == 'training': + sess_dict['bias_50'], sess_dict['thres_50'], sess_dict['lapsehigh_50'], sess_dict['lapselow_50'] = \ + training.compute_psychometric(trials) + sess_dict['bias_20'], sess_dict['thres_20'], sess_dict['lapsehigh_20'], sess_dict['lapselow_20'] = \ + (np.nan, np.nan, np.nan, np.nan) + sess_dict['bias_80'], sess_dict['thres_80'], sess_dict['lapsehigh_80'], sess_dict['lapselow_80'] = \ + (np.nan, np.nan, np.nan, np.nan) + else: + sess_dict['bias_50'], sess_dict['thres_50'], sess_dict['lapsehigh_50'], sess_dict['lapselow_50'] = \ + training.compute_psychometric(trials, block=0.5) + sess_dict['bias_20'], sess_dict['thres_20'], sess_dict['lapsehigh_20'], sess_dict['lapselow_20'] = \ + training.compute_psychometric(trials, block=0.2) + sess_dict['bias_80'], sess_dict['thres_80'], sess_dict['lapsehigh_80'], sess_dict['lapselow_80'] = \ + training.compute_psychometric(trials, block=0.8) + + sess_dict['performance_easy'] = training.compute_performance_easy(trials) + sess_dict['reaction_time'] = training.compute_median_reaction_time(trials) + sess_dict['n_trials'] = training.compute_n_trials(trials) + sess_dict['sess_duration'], sess_dict['n_delay'], sess_dict['location'] = \ + compute_session_duration_delay_location(session_path, collections=raw_collections) + sess_dict['training_status'] = 'not_computed' + + return sess_dict def get_training_info_for_session(session_paths, one, force=True): @@ -331,59 +540,33 @@ def get_training_info_for_session(session_paths, one, force=True): # return list of dicts to add sess_dicts = [] for session_path in session_paths: + collections, alf_collections = 
get_data_collection(session_path) session_path = Path(session_path) - sess_dict = {} - sess_dict['date'] = str(one.path2ref(session_path)['date']) - sess_dict['session_path'] = str(session_path) - sess_dict['task_protocol'] = get_session_extractor_type(session_path) - - if sess_dict['task_protocol'] == 'habituation': - nan_array = np.array([np.nan]) - sess_dict['performance'], sess_dict['contrasts'], _ = (nan_array, nan_array, np.nan) - sess_dict['performance_easy'] = np.nan - sess_dict['reaction_time'] = np.nan - sess_dict['n_trials'] = np.nan - sess_dict['sess_duration'] = np.nan - sess_dict['n_delay'] = np.nan - sess_dict['location'] = np.nan - sess_dict['training_status'] = 'habituation' - sess_dict['bias_50'], sess_dict['thres_50'], sess_dict['lapsehigh_50'], sess_dict['lapselow_50'] = \ - (np.nan, np.nan, np.nan, np.nan) - sess_dict['bias_20'], sess_dict['thres_20'], sess_dict['lapsehigh_20'], sess_dict['lapselow_20'] = \ - (np.nan, np.nan, np.nan, np.nan) - sess_dict['bias_80'], sess_dict['thres_80'], sess_dict['lapsehigh_80'], sess_dict['lapselow_80'] = \ - (np.nan, np.nan, np.nan, np.nan) - + protocols = [] + for c in collections: + protocols.append(get_session_extractor_type(session_path, task_collection=c)) + + un_protocols = np.unique(protocols) + # Example, training, training, biased - training would be combined, biased not + if len(un_protocols) != 1: + print(f'Different protocols in same session {session_path} : {protocols}') + for prot in un_protocols: + if prot is False: + continue + try: + alf = alf_collections[np.where(protocols == prot)[0]] + raw = collections[np.where(protocols == prot)[0]] + except TypeError: + alf = None + raw = None + sess_dict = get_sess_dict(session_path, one, prot, alf_collections=alf, raw_collections=raw, force=force) else: - # if we can't compute trials then we need to pass - trials = load_trials(session_path, one, force=force) - if trials is None: - continue - - sess_dict['performance'], sess_dict['contrasts'], _ = 
training.compute_performance(trials, prob_right=True) - if sess_dict['task_protocol'] == 'training': - sess_dict['bias_50'], sess_dict['thres_50'], sess_dict['lapsehigh_50'], sess_dict['lapselow_50'] = \ - training.compute_psychometric(trials) - sess_dict['bias_20'], sess_dict['thres_20'], sess_dict['lapsehigh_20'], sess_dict['lapselow_20'] = \ - (np.nan, np.nan, np.nan, np.nan) - sess_dict['bias_80'], sess_dict['thres_80'], sess_dict['lapsehigh_80'], sess_dict['lapselow_80'] = \ - (np.nan, np.nan, np.nan, np.nan) - else: - sess_dict['bias_50'], sess_dict['thres_50'], sess_dict['lapsehigh_50'], sess_dict['lapselow_50'] = \ - training.compute_psychometric(trials, block=0.5) - sess_dict['bias_20'], sess_dict['thres_20'], sess_dict['lapsehigh_20'], sess_dict['lapselow_20'] = \ - training.compute_psychometric(trials, block=0.2) - sess_dict['bias_80'], sess_dict['thres_80'], sess_dict['lapsehigh_80'], sess_dict['lapselow_80'] = \ - training.compute_psychometric(trials, block=0.8) - - sess_dict['performance_easy'] = training.compute_performance_easy(trials) - sess_dict['reaction_time'] = training.compute_median_reaction_time(trials) - sess_dict['n_trials'] = training.compute_n_trials(trials) - sess_dict['sess_duration'], sess_dict['n_delay'], sess_dict['location'] = \ - compute_session_duration_delay_location(session_path) - sess_dict['training_status'] = 'not_computed' - - sess_dicts.append(sess_dict) + prot = un_protocols[0] + sess_dict = get_sess_dict(session_path, one, prot, alf_collections=alf_collections, raw_collections=collections, + force=force) + + if sess_dict is not None: + sess_dicts.append(sess_dict) protocols = [s['task_protocol'] for s in sess_dicts] @@ -395,9 +578,9 @@ def get_training_info_for_session(session_paths, one, force=True): combined_trials = load_combined_trials(session_paths, one, force=force) performance, contrasts, _ = training.compute_performance(combined_trials, prob_right=True) psychs = {} - psychs['50'] = 
training.compute_psychometric(trials, block=0.5) - psychs['20'] = training.compute_psychometric(trials, block=0.2) - psychs['80'] = training.compute_psychometric(trials, block=0.8) + psychs['50'] = training.compute_psychometric(combined_trials, block=0.5) + psychs['20'] = training.compute_psychometric(combined_trials, block=0.2) + psychs['80'] = training.compute_psychometric(combined_trials, block=0.8) performance_easy = training.compute_performance_easy(combined_trials) reaction_time = training.compute_median_reaction_time(combined_trials) diff --git a/ibllib/pipes/video_tasks.py b/ibllib/pipes/video_tasks.py index cc5065851..4e01f98e4 100644 --- a/ibllib/pipes/video_tasks.py +++ b/ibllib/pipes/video_tasks.py @@ -1,11 +1,16 @@ import logging import subprocess +import cv2 +import traceback +from pathlib import Path from ibllib.io import ffmpeg, raw_daq_loaders from ibllib.pipes import base_tasks -from ibllib.io.video import label_from_path, get_video_meta +from ibllib.io.video import get_video_meta from ibllib.io.extractors import camera from ibllib.qc.camera import run_all_qc as run_camera_qc +from ibllib.misc import check_nvidia_driver +from ibllib.io.video import label_from_path, assert_valid_label _logger = logging.getLogger('ibllib') @@ -283,3 +288,159 @@ def _run(self, **kwargs): sync_collection=self.sync_collection, sync_type=self.sync) return output_files + + +class DLC(base_tasks.VideoTask): + """ + This task relies on a correctly installed dlc environment as per + https://docs.google.com/document/d/1g0scP6_3EmaXCU4SsDNZWwDTaD9MG0es_grLA-d0gh0/edit# + + If your environment is set up otherwise, make sure that you set the respective attributes: + t = EphysDLC(session_path) + t.dlcenv = Path('/path/to/your/dlcenv/bin/activate') + t.scripts = Path('/path/to/your/iblscripts/deploy/serverpc/dlc') + """ + gpu = 1 + cpu = 4 + io_charge = 100 + level = 2 + force = True + job_size = 'large' + + dlcenv = Path.home().joinpath('Documents', 'PYTHON', 'envs', 'dlcenv', 
'bin', 'activate') + scripts = Path.home().joinpath('Documents', 'PYTHON', 'iblscripts', 'deploy', 'serverpc', 'dlc') + + @property + def signature(self): + signature = { + 'input_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras], + 'output_files': [(f'_ibl_{cam}Camera.dlc.pqt', 'alf', True) for cam in self.cameras] + + [(f'{cam}Camera.ROIMotionEnergy.npy', 'alf', True) for cam in self.cameras] + + [(f'{cam}ROIMotionEnergy.position.npy', 'alf', True)for cam in self.cameras] + } + + return signature + + def _check_dlcenv(self): + """Check that scripts are present, dlcenv can be activated and get iblvideo version""" + assert len(list(self.scripts.rglob('run_dlc.*'))) == 2, \ + f'Scripts run_dlc.sh and run_dlc.py do not exist in {self.scripts}' + assert len(list(self.scripts.rglob('run_motion.*'))) == 2, \ + f'Scripts run_motion.sh and run_motion.py do not exist in {self.scripts}' + assert self.dlcenv.exists(), f"DLC environment does not exist in assumed location {self.dlcenv}" + command2run = f"source {self.dlcenv}; python -c 'import iblvideo; print(iblvideo.__version__)'" + process = subprocess.Popen( + command2run, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + executable="/bin/bash" + ) + info, error = process.communicate() + if process.returncode != 0: + raise AssertionError(f"DLC environment check failed\n{error.decode('utf-8')}") + version = info.decode("utf-8").strip().split('\n')[-1] + return version + + @staticmethod + def _video_intact(file_mp4): + """Checks that the downloaded video can be opened and is not empty""" + cap = cv2.VideoCapture(str(file_mp4)) + frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT) + intact = True if frame_count > 0 else False + cap.release() + return intact + + def _run(self, cams=None, overwrite=False): + # Default to all three cams + cams = cams or self.cameras + cams = assert_valid_label(cams) + # Set up + self.session_id = self.one.path2eid(self.session_path) + 
actual_outputs = [] + + # Loop through cams + for cam in cams: + # Catch exceptions so that following cameras can still run + try: + # If all results exist and overwrite is False, skip computation + expected_outputs_present, expected_outputs = self.assert_expected(self.output_files, silent=True) + if overwrite is False and expected_outputs_present is True: + actual_outputs.extend(expected_outputs) + return actual_outputs + else: + file_mp4 = next(self.session_path.joinpath('raw_video_data').glob(f'_iblrig_{cam}Camera.raw*.mp4')) + if not file_mp4.exists(): + # In this case we set the status to Incomplete. + _logger.error(f"No raw video file available for {cam}, skipping.") + self.status = -3 + continue + if not self._video_intact(file_mp4): + _logger.error(f"Corrupt raw video file {file_mp4}") + self.status = -1 + continue + # Check that dlc environment is ok, shell scripts exists, and get iblvideo version, GPU addressable + self.version = self._check_dlcenv() + _logger.info(f'iblvideo version {self.version}') + check_nvidia_driver() + + _logger.info(f'Running DLC on {cam}Camera.') + command2run = f"{self.scripts.joinpath('run_dlc.sh')} {str(self.dlcenv)} {file_mp4} {overwrite}" + _logger.info(command2run) + process = subprocess.Popen( + command2run, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + executable="/bin/bash", + ) + info, error = process.communicate() + # info_str = info.decode("utf-8").strip() + # _logger.info(info_str) + if process.returncode != 0: + error_str = error.decode("utf-8").strip() + _logger.error(f'DLC failed for {cam}Camera.\n\n' + f'++++++++ Output of subprocess for debugging ++++++++\n\n' + f'{error_str}\n' + f'++++++++++++++++++++++++++++++++++++++++++++\n') + self.status = -1 + # We dont' run motion energy, or add any files if dlc failed to run + continue + dlc_result = next(self.session_path.joinpath('alf').glob(f'_ibl_{cam}Camera.dlc*.pqt')) + actual_outputs.append(dlc_result) + + _logger.info(f'Computing motion 
energy for {cam}Camera') + command2run = f"{self.scripts.joinpath('run_motion.sh')} {str(self.dlcenv)} {file_mp4} {dlc_result}" + _logger.info(command2run) + process = subprocess.Popen( + command2run, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + executable="/bin/bash", + ) + info, error = process.communicate() + # info_str = info.decode("utf-8").strip() + # _logger.info(info_str) + if process.returncode != 0: + error_str = error.decode("utf-8").strip() + _logger.error(f'Motion energy failed for {cam}Camera.\n\n' + f'++++++++ Output of subprocess for debugging ++++++++\n\n' + f'{error_str}\n' + f'++++++++++++++++++++++++++++++++++++++++++++\n') + self.status = -1 + continue + actual_outputs.append(next(self.session_path.joinpath('alf').glob( + f'{cam}Camera.ROIMotionEnergy*.npy'))) + actual_outputs.append(next(self.session_path.joinpath('alf').glob( + f'{cam}ROIMotionEnergy.position*.npy'))) + except BaseException: + _logger.error(traceback.format_exc()) + self.status = -1 + continue + # If status is Incomplete, check that there is at least one output. 
+ # # Otherwise make sure it gets set to Empty (outputs = None), and set status to -1 to make sure it doesn't slip + if self.status == -3 and len(actual_outputs) == 0: + actual_outputs = None + self.status = -1 + return actual_outputs diff --git a/release_notes.md b/release_notes.md index 45eb76fc6..16451bf49 100644 --- a/release_notes.md +++ b/release_notes.md @@ -1,3 +1,12 @@ +## Develop +### features +- Training status pipeline now compatible with dynamic pipeline +- Dynamic DLC task using description file +- Full photometry lookup table + +### bugfixes +- fix for untrainable, unbiasable don't repopulate if already exists + ## Release Notes 2.23 ### Release Notes 2.23.1 2023-06-15 ### features diff --git a/requirements.txt b/requirements.txt index 963934e0d..e2589b722 100644 --- a/requirements.txt +++ b/requirements.txt @@ -27,5 +27,5 @@ iblutil>=1.5.0 labcams # widefield extractor ONE-api>=2.0 slidingRP>=1.0.0 # steinmetz lab refractory period metrics -wfield>=0.3.6 # widefield extractor +wfield==0.3.7 # widefield extractor frozen for now (2023/07/15) until Joao fixes latest version psychofit