In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
from pathlib import Path
import sys
import logging
import inspect
logging.basicConfig(level=logging.INFO)
print(sys.executable)

/home/peromoseq/anaconda3/envs/airflow/bin/python3


### Get recording info (google sheets)

In [3]:
import requests
import pandas as pd
from io import BytesIO

In [4]:
# spreadsheet_url = 'https://docs.google.com/spreadsheet/ccc?key=14HIqUaSl_n-91hpAvmACY_iVY9nLKdlA6qklhxfZon0&output=csv&gid=0'
spreadsheet_url = "https://docs.google.com/spreadsheet/ccc?key=1jACsUmxuJ9Une59qmvzZGc1qXezKhKzD1zho2sEfcrU&output=csv&gid=0"
response = requests.get(spreadsheet_url)
recording_df = pd.read_csv(BytesIO(response.content))

In [5]:
recording_df[:3]

Unnamed: 0,Subject,duration_m,video_recording_id,ephys_id,calibration_id,calibration_board_shape,calibration_square_size,video_location_on_o2,ephys_location_on_o2,calibration_location_on_o2,samplerate
0,M04002,10,24-05-01-13-26-43-110846,2024-05-01_13-26-37,24-05-01-13-45-07-825493,,,/n/groups/datta/tim_sainburg/datasets/chronic2...,/n/groups/datta/tim_sainburg/datasets/chronic2...,/n/groups/datta/tim_sainburg/datasets/chronic2...,150


### Submit job

In [6]:
output_directory = Path("/n/groups/datta/tim_sainburg/datasets/scratch/") / "240808-3d-pipeline"

In [7]:
from multicamera_airflow_pipeline.tim_240731.interface.o2 import O2Runner
from pathlib import Path
from datetime import datetime

INFO:multicamera_airflow_pipeline.tim_240731.interface.o2:Python interpreter binary location: /home/peromoseq/anaconda3/envs/airflow/bin/python3
  from .autonotebook import tqdm as notebook_tqdm


In [8]:
for idx, recording_row in recording_df.iterrows():
    break

In [9]:
recording_row

Subject                                                                  M04002
duration_m                                                                   10
video_recording_id                                     24-05-01-13-26-43-110846
ephys_id                                                    2024-05-01_13-26-37
calibration_id                                         24-05-01-13-45-07-825493
calibration_board_shape                                                     NaN
calibration_square_size                                                     NaN
video_location_on_o2          /n/groups/datta/tim_sainburg/datasets/chronic2...
ephys_location_on_o2          /n/groups/datta/tim_sainburg/datasets/chronic2...
calibration_location_on_o2    /n/groups/datta/tim_sainburg/datasets/chronic2...
samplerate                                                                  150
Name: 0, dtype: object

In [10]:
def convert_minutes_to_hms(minutes_float):
    # Convert minutes to total seconds
    total_seconds = int(minutes_float * 60)
    
    # Extract hours, minutes, and seconds using divmod
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    
    # Format as HH:MM:SS
    return f"{hours:02}:{minutes:02}:{seconds:02}"

In [11]:
duration_requested = convert_minutes_to_hms(recording_row.duration_m)

In [12]:
samplerate = recording_row.samplerate
#trigger_pin = recording_row.trigger_pin

In [13]:
job_directory = Path('/n/groups/datta/tim_sainburg/datasets/scratch/jobs')
job_directory.mkdir(exist_ok=True, parents=True)

In [14]:
current_datetime_str = datetime.now().strftime('%Y%m%d_%H%M%S_%f')
current_datetime_str

'20240808_151100_078322'

In [15]:
remote_job_directory = job_directory / current_datetime_str

In [16]:
# where the data is located
recording_directory = Path(recording_row.video_location_on_o2) / recording_row.video_recording_id
# where to save data
output_directory_camera_sync = output_directory / 'camera_sync2' / recording_row.video_recording_id
output_directory_camera_sync.mkdir(parents=True, exist_ok=True)

In [17]:
config_file = Path("/n/groups/datta/tim_sainburg/projects/multicamera_airflow_pipeline/multicamera_airflow_pipeline/tim_240731/default_config.yaml")

In [18]:
import yaml; 

In [19]:
config = yaml.safe_load(open(config_file, 'r'))

In [20]:
config

{'sync_cameras': {'trigger_pin': 2}}

In [21]:
params = {
    "recording_directory": recording_directory.as_posix(),
    "output_directory_camera_sync": output_directory_camera_sync.as_posix(),
    "samplerate": samplerate,
    #"trigger_pin": trigger_pin,
}

In [22]:
runner = O2Runner(
    job_name_prefix = 'test_submit_camera_sync',
    remote_job_directory = remote_job_directory,
    conda_env = "/n/groups/datta/tim_sainburg/conda_envs/peromoseq",
    o2_username = "tis697",
    o2_server="login.o2.rc.hms.harvard.edu",
    job_params = params, 
    o2_n_cpus = 1,
    o2_memory="2G",
    o2_time_limit=duration_requested,
    o2_queue="priority",
)

INFO:paramiko.transport:Connected (version 2.0, client OpenSSH_7.4)
INFO:paramiko.transport:Auth banner: b'Problems logging in?\nUse your lower case HMS ID, like abc123, not ABC123.\nIf locked out, see:\nhttps://it.hms.harvard.edu/i-want/reset-password-or-unlock-your-hms-account\n'
INFO:paramiko.transport:Authentication (publickey) successful!


In [23]:
def sync_cameras(params, config):
    from multicamera_airflow_pipeline.tim_240731.sync.sync_cameras import CameraSynchronizer
    synchronizer = CameraSynchronizer(
        recording_directory = params["recording_directory"],
        output_directory = params["output_directory_camera_sync"],
        samplerate = params["samplerate"], # camera sample rate
        #trigger_pin = params["trigger_pin"], # Which pin camera trigger was on
        **config
    )
    synchronizer.run()

In [24]:
runner.python_script = f"""
# load params
import yaml
params_file = "{runner.remote_job_directory / f"{runner.job_name}.params.yaml"}"
config_file = "{config_file.as_posix()}"

params = yaml.safe_load(open(params_file, 'r'))
config = yaml.safe_load(open(config_file, 'r'))
    
# grab sync cameras function
{inspect.getsource(sync_cameras)}

# run 
sync_cameras(params, config["sync_cameras"])
"""

In [25]:
print(runner.python_script)


# load params
import yaml
params_file = "/n/groups/datta/tim_sainburg/datasets/scratch/jobs/20240808_151100_078322/test_submit_camera_sync_24-08-08-2024-11-00-205605.params.yaml"
config_file = "/n/groups/datta/tim_sainburg/projects/multicamera_airflow_pipeline/multicamera_airflow_pipeline/tim_240731/default_config.yaml"

params = yaml.safe_load(open(params_file, 'r'))
config = yaml.safe_load(open(config_file, 'r'))
    
# grab sync cameras function
def sync_cameras(params, config):
    from multicamera_airflow_pipeline.tim_240731.sync.sync_cameras import CameraSynchronizer
    synchronizer = CameraSynchronizer(
        recording_directory = params["recording_directory"],
        output_directory = params["output_directory_camera_sync"],
        samplerate = params["samplerate"], # camera sample rate
        #trigger_pin = params["trigger_pin"], # Which pin camera trigger was on
        **config
    )
    synchronizer.run()


# run 
sync_cameras(params, config["sync_cameras"])



In [26]:
runner.run()

INFO:multicamera_airflow_pipeline.tim_240731.interface.o2:Creating remote job directory: /n/groups/datta/tim_sainburg/datasets/scratch/jobs/20240808_151100_078322
INFO:multicamera_airflow_pipeline.tim_240731.interface.o2:Creating remote directory: /n/groups/datta/tim_sainburg/datasets/scratch/jobs/20240808_151100_078322
INFO:multicamera_airflow_pipeline.tim_240731.interface.o2:Successfully created remote directory: /n/groups/datta/tim_sainburg/datasets/scratch/jobs/20240808_151100_078322
INFO:multicamera_airflow_pipeline.tim_240731.interface.o2:Writing job files to remote directory: /n/groups/datta/tim_sainburg/datasets/scratch/jobs/20240808_151100_078322
INFO:paramiko.transport.sftp:[chan 1] Opened sftp connection (server version 3)
INFO:multicamera_airflow_pipeline.tim_240731.interface.o2:Transferring /tmp/tmpiko3669r to login.o2.rc.hms.harvard.edu:/n/groups/datta/tim_sainburg/datasets/scratch/jobs/20240808_151100_078322/test_submit_camera_sync_24-08-08-2024-11-00-205605.py
INFO:mult

In [27]:
import time

In [28]:
# 10000/60/24 = roughly 1 week
for i in range(10000):
    # check job status every n seconds
    status = runner.check_job_status()
    if status:
        break
    time.sleep(60)

INFO:multicamera_airflow_pipeline.tim_240731.interface.o2:Checking job status: 43641811
INFO:multicamera_airflow_pipeline.tim_240731.interface.o2:The job is currently running.
INFO:multicamera_airflow_pipeline.tim_240731.interface.o2:Checking job status: 43641811
INFO:multicamera_airflow_pipeline.tim_240731.interface.o2:The job has finished successfully.


In [29]:
def check_camera_sync_completion(output_directory):
    output_directory = Path(output_directory)
    if (output_directory / "camera_sync.csv").exists():
        return True
    else:
        return False

In [30]:
check_camera_sync_completion(params['output_directory_camera_sync'])

True

In [31]:
f"sacct -j {runner.slurm_job_id}"

'sacct -j 43641811'

In [32]:
runner.output_log

PosixPath('/n/groups/datta/tim_sainburg/datasets/scratch/jobs/20240808_151100_078322/test_submit_camera_sync_24-08-08-2024-11-00-205605.log')

In [33]:
!tail {runner.output_log}

In [34]:
import textwrap

In [38]:
test = textwrap.dedent(
f"""
    # load params
    import yaml
    params_file = "{runner.remote_job_directory / f"{runner.job_name}.params.yaml"}"
    config_file = "{config_file.as_posix()}"

    params = yaml.safe_load(open(params_file, 'r'))
    config = yaml.safe_load(open(config_file, 'r'))

    # grab sync cameras function
    from multicamera_airflow_pipeline.tim_240731.sync.sync_cameras import CameraSynchronizer
    synchronizer = CameraSynchronizer(
        recording_directory=params["recording_directory"],
        output_directory=params["output_directory_camera_sync"],
        samplerate=params["samplerate"],  # camera sample rate
        # trigger_pin=params["trigger_pin"],  # Which pin camera trigger was on
        **config,
    )
    synchronizer.run()

    # run 
    sync_cameras(params, config["sync_cameras"])
    """
)

In [39]:
print(test)


# load params
import yaml
params_file = "/n/groups/datta/tim_sainburg/datasets/scratch/jobs/20240808_151100_078322/test_submit_camera_sync_24-08-08-2024-11-00-205605.params.yaml"
config_file = "/n/groups/datta/tim_sainburg/projects/multicamera_airflow_pipeline/multicamera_airflow_pipeline/tim_240731/default_config.yaml"

params = yaml.safe_load(open(params_file, 'r'))
config = yaml.safe_load(open(config_file, 'r'))

# grab sync cameras function
from multicamera_airflow_pipeline.tim_240731.sync.sync_cameras import CameraSynchronizer
synchronizer = CameraSynchronizer(
    recording_directory=params["recording_directory"],
    output_directory=params["output_directory_camera_sync"],
    samplerate=params["samplerate"],  # camera sample rate
    # trigger_pin=params["trigger_pin"],  # Which pin camera trigger was on
    **config,
)
synchronizer.run()

# run 
sync_cameras(params, config["sync_cameras"])

