# Split all `.nd2` files from JP's microfluidics project into individual positions (series in nd2) and convert to `.tif`

Use dask jobqueue to distribute this across a whole host of SLURM workers.

TODO: check whether walltime is long enough...bfconvert seems to be painfully slow

In [2]:
import os
import glob
import sys
from dask_jobqueue import SLURMCluster as Cluster
from dask import delayed
from dask.distributed import Client, as_completed
from distributed.scheduler import KilledWorker
import subprocess
import pathlib

# Find all nd2 files in basepath

In [3]:
# Root of the project tree; it is searched recursively for .nd2 files below.
basepath = pathlib.Path("/projects/dk49")

In [4]:
# Recursively collect every .nd2 file below basepath, dropping entries whose
# name starts with "._" (presumably macOS AppleDouble sidecar files, not real
# image data -- TODO confirm).
nd2files = [
    candidate
    for candidate in basepath.rglob("*.nd2")
    if not str(candidate.stem).startswith("._")
]

In [5]:
nd2files

[PosixPath('/projects/dk49/eggs/2018_Sept_Nov_Boston/2018_10_03_Eggslinkerlengths/20181003 linker lengths.nd2'),
 PosixPath('/projects/dk49/eggs/2018_Sept_Nov_Boston/2018_10_12_Dayafter11amGrowth/Candida.nd2'),
 PosixPath('/projects/dk49/eggs/2018_Sept_Nov_Boston/2018_10_12_Dayafter11amGrowth/Staph.nd2'),
 PosixPath('/projects/dk49/eggs/2018_Sept_Nov_Boston/2018_10_12_Dayafter11amGrowth/Candida_LL37_1000nM.nd2'),
 PosixPath('/projects/dk49/eggs/2018_Sept_Nov_Boston/2018_10_12_EggsCurve/2018_10_12_EggsCurve.nd2'),
 PosixPath('/projects/dk49/eggs/2018_Sept_Nov_Boston/2018_10_10_Eggs Linker lenghts/2018_10_10 Linker Lengths001.nd2'),
 PosixPath('/projects/dk49/eggs/2018_Sept_Nov_Boston/2018_10_15_EggHaD5LL37/2018_10_15_EggHaD5LL37.nd2'),
 PosixPath('/projects/dk49/eggs/2018_Sept_Nov_Boston/2018_10_11 Swarming tests/2018_11_10 swarming tests.nd2'),
 PosixPath('/projects/dk49/eggs/2018_Sept_Nov_Boston/2018_09_26 Eggs BioparticlesConjugated/20180926 BioparticlesConjugatedwithacid.nd2'),
 Pos

# define processing functions

In [6]:
def generate_output_pattern(nd2path, dest_root="/scratch/dk49/", src_root="/projects/dk49"):
    '''Map an nd2 file to its output folder and bfconvert output pattern.

    The location of ``nd2path`` relative to ``src_root`` is mirrored below
    ``dest_root``, so the directory layout of the input tree is preserved.

    Parameters
    ----------
    nd2path : str or pathlib.Path
        Path of the nd2 file. Must lie below ``src_root``.
    dest_root : str or pathlib.Path
        Root of the output tree.
    src_root : str or pathlib.Path
        Root of the input tree; this prefix is stripped from ``nd2path``
        before it is re-rooted under ``dest_root``.

    Returns
    -------
    tuple of pathlib.Path
        ``(folder, pattern)`` where ``folder`` is the output directory and
        ``pattern`` is ``folder / "<stem>_s%s.tif"``. The ``%s`` is left
        verbatim for bfconvert to expand (its series-index placeholder).

    Raises
    ------
    ValueError
        If ``nd2path`` is not located below ``src_root``.
    '''
    # relative_to replaces the former hard-coded ``parts[3:]`` slice, which
    # silently produced a wrong destination for any path not exactly below a
    # two-level-deep root.
    relative = pathlib.Path(nd2path).relative_to(src_root)
    mirrored = pathlib.Path(dest_root) / relative
    folder = mirrored.parent
    pattern = mirrored.stem + '_s%s.tif'
    return (folder, folder / pattern)

In [7]:
def process_file(nd2path, bfexe="/projects/dk49/bftools/bfconvert"):
    '''Run bfconvert on one nd2 file and return what it printed.

    The destination folder and output file pattern are obtained from
    generate_output_pattern(); the folder is created (including missing
    parents) before the converter is started.

    nd2path: pathlib path of the nd2 file to convert.
    bfexe:   path of the bfconvert executable.

    Returns the captured standard output of bfconvert as bytes.
    Raises subprocess.CalledProcessError if bfconvert exits with a
    non-zero status.
    '''
    out_dir, out_pattern = generate_output_pattern(nd2path)
    # bfconvert needs the target directory to exist already.
    out_dir.mkdir(parents=True, exist_ok=True)
    return subprocess.check_output([bfexe, str(nd2path), str(out_pattern)])

# Request Cluster workers using dask and distribute jobs to workers

In [8]:
# Configure SLURM-backed dask workers (1 core, 16 GB memory, 24 h walltime
# each) and scale the cluster to 20.
# NOTE(review): dask_jobqueue documents the accounting keyword as `project`
# (`account` in newer releases), not `projects` -- confirm this argument is
# actually honoured and not silently ignored.
cluster=Cluster(cores=1, memory='16GB', projects='dk49', walltime="24:00:00")
cluster.scale(20)

In [10]:
# Connect a distributed client to the cluster so tasks can be submitted to it.
client=Client(cluster)


In [12]:
cluster

VBox(children=(HTML(value='<h2>SLURMCluster</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n    …

In [13]:
# Submit one process_file task per nd2 file; the result is a list of futures.
futures = client.map(process_file, nd2files)


NameError: name 'progress' is not defined

In [14]:
futures

[<Future: status: pending, key: process_file-635a6cf67955139558e24802235bf99a>,
 <Future: status: pending, key: process_file-bfef5ad990f85efd8c3210790684ed43>,
 <Future: status: pending, key: process_file-0e0291ab3c666ff4c2da6c6090e64915>,
 <Future: status: finished, type: bytes, key: process_file-9fa48079f332d10fe74d4a90a66c450a>,
 <Future: status: pending, key: process_file-96a42523e6f35a501be9afcab25f7ebe>,
 <Future: status: pending, key: process_file-7b7bb31303e86cc15cbdcb074bcd12e5>,
 <Future: status: pending, key: process_file-1a21c229fea231162ffa261980c68f16>,
 <Future: status: pending, key: process_file-e3d3807d8f765f15e77b5f5705adaead>,
 <Future: status: pending, key: process_file-123e60fcc425a9356c0bed4bd0367bc1>,
 <Future: status: pending, key: process_file-8e75910fde7e3f368f958e868bae167d>,
 <Future: status: pending, key: process_file-fb2102c72e1cc11c7cdbb916e1d0d81f>,
 <Future: status: pending, key: process_file-5fb4cb67b6dfa4da0ef4903dea869918>,
 <Future: status: pending,