In [1]:
import np_tools
import pathlib
import npc_lims
import npc_session

In [2]:
# Root folder whose immediate children are scanned as candidate session folders.
SRC = pathlib.Path(
    '//allen/programs/mindscope/workgroups/dynamicrouting/PilotEphys/Task 2 pilot'
)

# Sessions are added to the move list until their combined size reaches this many TB.
MIN_TB_TO_MOVE = 5

# Ids of sessions whose recorded issues include npc_lims issue #5; these are
# left out when session folders are collected.
sessions_to_skip = [
    info.id
    for info in npc_lims.get_session_info()
    if "https://github.com/AllenInstitute/npc_lims/issues/5" in info.issues
]
sessions_to_skip.sort()

In [None]:
# Display the session ids that will be excluded when session folders are collected.
sessions_to_skip

In [None]:
import concurrent.futures    
import dataclasses

@dataclasses.dataclass
class Info:
    """Size and location of one folder found directly under ``SRC``.

    NOTE(review): a later cell defines another ``Info`` dataclass with extra
    fields, which replaces this one once that cell runs.
    """
    # value returned by np_tools.dir_size_gb (presumably gigabytes - confirm)
    size: float
    # the folder that was measured
    path: pathlib.Path
    
def get_folder_info(folder):
    """Measure one entry of ``SRC``.

    Args:
        folder: path object for the entry to inspect.

    Returns:
        An ``Info`` holding the folder's size and path when ``folder`` is a
        directory, otherwise ``None``.
    """
    if folder.is_dir():
        folder_size = np_tools.dir_size_gb(folder)
        return Info(size=folder_size, path=folder)
    return None
    
# Measure every entry of SRC in worker threads and keep only the ones that
# produced an Info (i.e. real directories).
# NOTE(review): a later cell rebinds both `Info` and `dirs`, so this scan
# appears to be superseded.
with concurrent.futures.ThreadPoolExecutor() as executor:
    dirs = [
        found
        for found in executor.map(get_folder_info, SRC.iterdir())
        if found
    ]


In [4]:
import concurrent.futures    
import dataclasses

@dataclasses.dataclass
class Info:
    """Size, location and identity of one session folder found under ``SRC``.

    NOTE(review): an earlier cell also defines an ``Info`` dataclass (without
    ``date`` and ``session``); this definition replaces it.
    """
    # value returned by np_tools.dir_size_gb (presumably gigabytes - confirm)
    size: float
    # the session folder that was measured
    path: pathlib.Path
    # copied from ``session.date``; NOTE(review): annotated as str, the real
    # type depends on npc_session - confirm
    date: str
    # record built from the folder path by npc_session.SessionRecord
    session: npc_session.SessionRecord
    
def get_session_dir_info(session_dir):
    """Describe one candidate session folder, or return ``None`` to ignore it.

    An entry is ignored when it is not a directory, when
    ``npc_session.SessionRecord`` cannot be built from its path, or when the
    resulting session appears in ``sessions_to_skip``.

    Args:
        session_dir: path object for an entry directly under ``SRC``.

    Returns:
        An ``Info`` with the folder's size, path, session date and session
        record, or ``None`` for ignored entries.
    """
    if not session_dir.is_dir():
        return None
    try:
        session = npc_session.SessionRecord(session_dir)
    except Exception:
        # The folder could not be parsed as a session, so skip it. Catching
        # Exception (not a bare except) lets KeyboardInterrupt and SystemExit
        # propagate, so a long scan can still be interrupted.
        return None
    if session in sessions_to_skip:
        return None
    return Info(
        size=np_tools.dir_size_gb(session_dir),
        path=session_dir,
        date=session.date,
        session=session,
    )
    
# Inspect every entry of SRC in worker threads; entries that are not usable
# session folders come back as None and are dropped.
with concurrent.futures.ThreadPoolExecutor() as executor:
    dirs = list(
        filter(None, executor.map(get_session_dir_info, SRC.iterdir()))
    )

In [None]:
# Candidate sessions, ordered by date.
# NOTE(review): this filter was written as `size >= 10 and size >= 1000`, which
# reduces to `size >= 1000`. If an upper bound (e.g. `size <= 1000`) was
# intended, correct it here.
ephys_sessions = sorted(
    (d for d in dirs if d.size >= 1000),
    key=lambda d: d.date,
)

# Take sessions in date order until the running total reaches MIN_TB_TO_MOVE
# (converted with * 1024, i.e. sizes are treated as GB).
move_sessions, move_size = [], 0
for candidate in ephys_sessions:
    move_sessions.append(candidate)
    move_size += candidate.size
    if move_size >= MIN_TB_TO_MOVE * 1024:
        break

# Total size of the selection, shown as the cell output.
sum(selected.size for selected in move_sessions)

In [None]:
import datetime

# Root folder that rsync copies into.
DEST = pathlib.Path('//allen/aind/scratch/dynamic-routing/Task 2 pilot')

# Folder that holds the per-session manifest, log and shell-script files;
# created here if it does not exist yet.
scratch = pathlib.Path('//allen/programs/mindscope/workgroups/dynamicrouting/ben/vast_transfers')
scratch.mkdir(parents=True, exist_ok=True)

# Captured once so that every file name generated in this run shares one timestamp.
dt = datetime.datetime.now()

def get_file_manifest(info: Info) -> list[pathlib.Path]:
    """Return every path found recursively beneath ``info.path``.

    The ``'*'`` pattern matches directories as well as files, so both kinds of
    entry appear in the result.
    """
    return [*info.path.rglob('*')]

def write_manifest(info: Info) -> None:
    """Write the session's manifest file: one path per line, relative to ``SRC``.

    Lines are joined and written with LF endings regardless of platform.
    """
    manifest_path = get_manifest_path(info)
    relative_entries = [
        entry.relative_to(SRC).as_posix() for entry in get_file_manifest(info)
    ]
    manifest_path.write_text('\n'.join(relative_entries), newline='\n')

def get_hpc_output_path(info: Info) -> pathlib.Path:
    """Return the ``.log`` path in ``scratch`` for this session and run.

    The file name combines the session with the run timestamp ``dt``; the
    manifest, log and shell-script paths are all derived from this path.
    """
    timestamp = f'{dt:%Y-%m-%d_%H%M}'
    return scratch / f'{info.session}_{timestamp}.log'

def get_manifest_path(info: Info) -> pathlib.Path:
    """Return the session's ``.txt`` manifest path, creating an empty file if absent."""
    manifest_path = get_hpc_output_path(info).with_suffix('.txt')
    manifest_path.touch(exist_ok=True)
    return manifest_path

def get_log_path(info: Info) -> pathlib.Path:
    """Return the session's ``.log`` path, creating an empty file if absent."""
    log_path = get_hpc_output_path(info)
    log_path.touch(exist_ok=True)
    return log_path

def get_shell_script_path(info: Info) -> pathlib.Path:
    """Return the session's ``.sh`` script path, creating an empty file if absent."""
    script_path = get_hpc_output_path(info).with_suffix('.sh')
    script_path.touch(exist_ok=True)
    return script_path

def get_rsync_cmd(info: Info) -> str:
    """Build the rsync command line that moves one session from ``SRC`` to ``DEST``.

    The manifest for ``info`` lists paths relative to ``SRC``, and rsync
    resolves ``--files-from`` entries relative to its source argument. The
    source argument is therefore ``SRC`` itself, not the session folder;
    passing the session folder would make every manifest entry resolve to
    ``<session>/<session>/...`` and fail to match.

    Flags used:
        -a  archive mode
        -r  recursive (for dirs)
        -v  verbose
        -L  copy the data that symlinks point to
        --remove-source-files  deletes source files after copying, but not dirs

    Args:
        info: the session to transfer; selects the log and manifest files.

    Returns:
        The complete rsync command as a single shell string.
    """
    src = SRC.as_posix()
    dest = DEST.as_posix()
    log_path = get_log_path(info).as_posix()
    manifest_path = get_manifest_path(info).as_posix()
    # Every path is double-quoted because the source and destination roots contain spaces.
    return (
        'rsync -Larv --remove-source-files '
        f'--log-file="{log_path}" '
        f'--files-from="{manifest_path}" '
        f'"{src}" "{dest}"'
    )

def get_shell_script_cmd(info: Info) -> str:
    """Return the text of the sbatch script that runs the session's rsync command.

    The script consists of the ``#SBATCH`` header lines, a few diagnostic
    commands, and the command produced by ``get_rsync_cmd(info)``.
    """
    rsync_cmd = get_rsync_cmd(info)
    return f"""#!/bin/bash
#SBATCH --job-name=npexp_to_incoming                        # Job name
#SBATCH --mail-type=FAIL                                    # Mail events (NONE, BEGIN, END, FAIL, ALL)
#SBATCH --mail-user=ben.hardcastle@alleninstitute.org       # Where to send mail  
#SBATCH --ntasks=1                                          # Run on a single CPU
#SBATCH --mem=4gb                                           # Job memory request (per node)
#SBATCH --time=20:00:00                                     # Time limit hrs:min:sec
#SBATCH --output=dynamicrouting_to_vast%j.log               # Standard output and error log
#SBATCH --partition braintv                                 # Partition used for processing
#SBATCH --tmp=100M                                          # Request the amount of space your jobs needs on /scratch/fast

pwd; hostname; date

echo 'Running rsync job on a single thread'

{rsync_cmd}

date
"""

def write_shell_script(info: Info) -> None:
    """Write the session's sbatch script to its ``.sh`` path."""
    script_path = get_shell_script_path(info)
    script_text = get_shell_script_cmd(info)
    # Force LF line endings: when writing on Windows the default newline is
    # CRLF, which isn't compatible with bash on linux.
    script_path.write_text(script_text, newline='\n')

def submit_job(info: Info) -> None:
    """Submit the session's shell script with ``sbatch`` through ``np_tools.hpc``."""
    with np_tools.hpc as ssh:
        script = get_shell_script_path(info).as_posix()
        ssh.run(f'sbatch {script}')

def process(info: Info) -> None:
    """Prepare one session for transfer: write its manifest, then its shell script."""
    for prepare in (write_manifest, write_shell_script):
        prepare(info)

# Transfer only the sessions selected into `move_sessions`, i.e. the subset
# capped via MIN_TB_TO_MOVE. Iterating over all of `ephys_sessions` would
# ignore that cap, which matters because the rsync job deletes source files.
# NOTE(review): switch back to `ephys_sessions` if moving every candidate is
# really intended.

# Write manifests and shell scripts in parallel worker threads.
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(process, info) for info in move_sessions]
    for future in concurrent.futures.as_completed(futures):
        future.result()  # wait for completion / re-raise exceptions from the worker

for info in move_sessions:
    submit_job(info)  # submit jobs to HPC in series to avoid overloading the scheduler

In [None]:
# Preview the generated sbatch script. `info` is whatever the final loop of the
# preceding cell left bound, so this only works after that cell has run.
get_shell_script_cmd(info)