# Process all files

In this notebook, we load the list of files and process each of them, either sequentially or in parallel.
The results are saved into an HDF5 file.


In [1]:
%load_ext autoreload
%autoreload 2

## Load the list of files

In [None]:
import pandas as pd
from pathlib import Path

# Source folder of the raw data (not referenced by the other cells shown here).
src = Path('/media/cephfs2/jparham/Joe for Jerome /')
# Destination folder: holds the input file list and receives the results.
dst = Path('/media/cephfs2/jeromeb/userdata/Baum_group/jparham/Analysis8')

# The list of files is read from a CSV stored in the destination folder;
# the first CSV column is used as the index.
filelist = pd.read_csv(dst.joinpath('filelist.csv'), index_col=0)
print(f"Number of files {len(filelist)}")
filelist.head()


## Processing

Files are processed sequentially by default; set `parallel_processing = True` to process them in parallel on a local Dask cluster.

In [None]:
import motionquant as mq
import dask
from dask.distributed import Lock
import traceback

def process(filename, results_path, lock=None):
    """Process one file and save its results in an HDF5 file.

    Args:
        filename: path of the file to process. Its stem is used as the
            name under which the results are saved.
        results_path: path of the results file passed to ``mq.save_result``.
        lock: optional object exposing ``acquire()`` and ``release()``
            (e.g. a ``dask.distributed.Lock``). When given, it is held
            only while the results are being saved. ``None`` disables locking.

    Returns:
        The value returned by ``mq.record`` for this file.

    Raises:
        Any exception raised by ``mq.process``, ``mq.save_result`` or
        ``mq.record``; the lock is released before it propagates.
    """
    name = Path(filename).stem
    img, cell_mask, cell_trj, diff, flow, rho, div, blob_labels, blobs_trj = mq.process(Path(filename))
    if lock is not None:
        lock.acquire()
    try:
        mq.save_result(results_path, name, img, cell_mask, cell_trj, diff, flow, rho, div, blob_labels, blobs_trj)
    finally:
        # Release even when saving fails; otherwise every other task waiting
        # on the same lock would block forever.
        if lock is not None:
            lock.release()
    df = mq.record(filename, img, cell_mask, cell_trj, diff, flow, rho, div, blob_labels, blobs_trj)
    return df

def process_safe(filename, results_path, lock=None):
    """Process one file, reporting exceptions instead of raising them.

    Args:
        filename: path of the file to process.
        results_path: path of the results file.
        lock: optional lock forwarded to ``process``; it is needed when
            several tasks write to the same results file.

    Returns:
        The value returned by ``process``, or ``None`` when processing
        raised an exception (the error and its traceback are printed).
    """
    try:
        return process(filename, results_path, lock)
    except Exception as e:
        print(f"file '{filename}' could not be processed")
        print(e)
        # format_exc() returns the traceback text; print_exc() returns None,
        # so printing its result would add a stray "None" line.
        print(traceback.format_exc())
        return None
   

# File receiving the per-file results; remove any file left by a previous run.
results_path = dst/Path('results.h5')
if results_path.exists():
    results_path.unlink()

print(f"Saving results in file '{results_path}'")
parallel_processing = False
if parallel_processing:
    from dask.distributed import LocalCluster
    # The context managers shut down the client and the worker processes once
    # the computation is done, instead of leaving them running in the kernel.
    with LocalCluster() as cluster, cluster.get_client() as client:
        cluster.scale(2)
        print(cluster)
        # Shared lock so that only one task at a time writes to the results file.
        lock = Lock('process-sufo')
        tsk = [dask.delayed(process_safe)(filename, results_path, lock) for filename in filelist['path']]
        res = dask.compute(tsk)
    results = res[0]
else:
    results = [process_safe(filename, results_path) for filename in filelist['path']]

# process_safe returns None for files that could not be processed.
succeeded = [r for r in results if r is not None]
n_failed = len(results) - len(succeeded)
if n_failed:
    print(f"{n_failed} of {len(results)} files could not be processed")
if not succeeded:
    raise ValueError("No file was processed successfully; there is nothing to concatenate")
df = pd.concat(succeeded)


In [7]:
# Write the concatenated per-file records as a CSV file in the destination folder.
df.to_csv(dst.joinpath('results.csv'))