# pysmFISH pipeline running template

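This JupyterLab notebook is used to run automated data analysis via papermill. The template can run the data through the entire pipeline (full run, `run_full`). In the current version that call is commented out. Instead, the reprocessing cell reruns the pipeline from the registration step for the FOVs that do not have results yet. A copy of the executed notebook will be stored in the `notebooks` subfolder of the processed experiment folder.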

In [1]:
from pathlib import Path
import time

from pysmFISH.pipeline import Pipeline

In [None]:
# THIS CELL IS TAGGED PARAMETERS
# papermill injects the run-specific values in a new cell placed right after
# this one, so keep this cell as plain `name = value` assignments.

# REQUIRED ARGUMENTS
# -------------------

# Path to the experiment folder
experiment_fpath = '' 

# Run type: 'new' or 're-run' (default: new)
run_type = 're-run'

# Parsing type. Can be:
# original/no_parsing/reparsing_from_processing_folder/reparsing_from_storage
# (default: original)
parsing_type = 'no_parsing'

# OPTIONAL KWARGS
# ----------------

# Path to the cold storage hard drive (default: /fish/rawdata)
raw_data_folder_storage_path = '/fish/rawdata'

# Tag identifying the zarr file with the parsed images (default: img_data)
parsed_image_tag = 'img_data'

# Tag identifying the zarr file with the preprocessed images
# (default: preprocessed_img_data)
preprocessed_image_tag = 'preprocessed_img_data'

# Path to the location where the datasets are stored (default: /fish/fish_datasets)
dataset_folder_storage_path = '/fish/fish_datasets'

# Path to the location where the results are stored (default: /fish/fish_results)
results_folder_storage_path = '/fish/fish_results'

# Save the processed (intermediate) images (default: True)
save_intermediate_steps = True

# Path to an existing dataset to use in the processing
dataset_path = ''

# Number of FOVs to process in parallel (20 when running in an unmanaged cluster)
chunk_size = 100 #20

# Search distance within which two dots are considered identical (default: 10)
same_dot_radius_duplicate_dots = 10

# Stitched counts on which the overlapping dots will be removed
# (default: microscope_stitched)
stitching_selected = 'microscope_stitched'

# Barcodes pass the screening when their Hamming distance is
# < hamming_distance (default: 3)
hamming_distance = 3

# Name of the system that runs the processing. Can be
# local/htcondor/unmanaged_cluster (default htcondor). If the engine is local
# the parameters that define the cluster are ignored
processing_engine = 'unmanaged_cluster'

# Let the cluster scale depending on the processing load
adaptive = True

# Number of cores to use in htcondor (default 20)
cores = 20

# Total memory for all the cores in condor (default 200GB), per core in the
# local setup, or per process (nprocs) in the unmanaged cluster
# (6GB for 40 nprocs)
memory = '6GB'

# Size of the dask spill-over disk in htcondor (default 0.1GB)
disk = '0.1GB'

# Maximum number of jobs that the cluster can run
maximum_jobs = 15

# Dask scheduler port. Used for the unmanaged cluster (default 23875)
scheduler_port = 23877

# Dask dashboard port. Used for the unmanaged cluster (default 25399)
dashboard_port = 25399

# Address of the dask scheduler. Used for the unmanaged cluster.
# 'localhost' if running on the main node (default 'localhost')
scheduler_address = 'localhost'

# Addresses of the worker nodes (default [monod10,monod11,monod12,monod33])
workers_addresses_list = ['monod09','monod10','monod11','monod31']

# Number of processes per worker in the unmanaged cluster
# (default 40 for a single monod node)
nprocs = 40

# Number of threads per process (default 1)
nthreads = 1

# Directory used for dask spill-over on the node in htcondor (default /tmp)
local_directory = '/tmp'

# Directory where the dask and htcondor logs are stored
logs_directory = ''

# Save the intensity of the bits and the flipping direction.
# NOTE: the reprocessing cell sets running_pipeline.save_bits_int = False
# before rerunning from registration, overriding this value for that run.
save_bits_int = True

# Start the analysis from the preprocessed images
start_from_preprocessed_imgs = False

# Resume (check the *_decoded_fov_* files already present in the results folder)
resume = False

# Connect the pipeline to a previously created cluster (default False).
# Can be: 'connect_to_client', 'connect_to_scheduler'
reuse_cluster = False

# Already active cluster to reconnect to when reusing a cluster (default None)
active_cluster = None

# Already active client to reconnect to when reusing a cluster (default None)
active_client = None

# Address of the running scheduler to connect to when reusing a cluster
active_scheduler_address = None

# Segmentation engine forwarded to Pipeline (valid options are defined there)
fresh_tissue_segmentation_engine = 'stardist'

# Diameter forwarded to Pipeline for segmentation
# (units not visible here — presumably pixels, TODO confirm)
diameter_size=25

# Minimum number of overlapping pixels forwarded to Pipeline for segmentation
min_overlapping_pixels_segmentation= 20

# Maximum expansion radius forwarded to Pipeline (units TODO confirm)
max_expansion_radius = 25

# FOV alignment mode. The spelling matches the Pipeline keyword
# `fov_alignement_mode`
fov_alignement_mode = 'clip'

# Forwarded to Pipeline as `remove_distinct_genes` (semantics defined there)
remove_distinct_genes=True

# Centering mode of the bead alignment. NOTE: the reprocessing cell overrides
# it with running_pipeline.bead_alignment_centering_mode = 'middle'
bead_alignment_centering_mode='scan'

# Clip size passed to the Pipeline constructor. NOTE: the reprocessing cell
# overrides it with running_pipeline.clip_size = 60 before stitching
clip_size=0

# Add a note if needed
notes = 'no notes'

In [None]:
# Add a running time tag to the pipeline run name.
# experiment_fpath is a required parameter. Fail fast when it was not injected:
# Path('') is Path('.'), so the run would otherwise silently target the current
# working directory and get a run name ending in '_'.
if not experiment_fpath:
    raise ValueError(
        "experiment_fpath is required: pass the path to the experiment folder"
    )
experiment_fpath = Path(experiment_fpath)
date_tag = time.strftime("%y%m%d_%H_%M_%S")
pipeline_run_name = date_tag + '_' + experiment_fpath.stem

In [None]:
# Echo the run notes so they appear in the executed notebook's output
print(notes)

In [None]:
# Set up the pipeline run.
# The run name and the values from the parameters cell are gathered into one
# keyword dictionary (same keys, same order) and unpacked into Pipeline.
pipeline_kwargs = dict(
    pipeline_run_name=pipeline_run_name,
    experiment_fpath=experiment_fpath,
    run_type=run_type,
    parsing_type=parsing_type,
    processing_engine=processing_engine,
    cores=cores,
    memory=memory,
    disk=disk,
    local_directory=local_directory,
    chunk_size=chunk_size,
    raw_data_folder_storage_path=raw_data_folder_storage_path,
    parsed_image_tag=parsed_image_tag,
    preprocessed_image_tag=preprocessed_image_tag,
    dataset_folder_storage_path=dataset_folder_storage_path,
    results_folder_storage_path=results_folder_storage_path,
    save_intermediate_steps=save_intermediate_steps,
    dataset_path=dataset_path,
    same_dot_radius_duplicate_dots=same_dot_radius_duplicate_dots,
    stitching_selected=stitching_selected,
    hamming_distance=hamming_distance,
    logs_directory=logs_directory,
    save_bits_int=save_bits_int,
    start_from_preprocessed_imgs=start_from_preprocessed_imgs,
    scheduler_port=scheduler_port,
    dashboard_port=dashboard_port,
    scheduler_address=scheduler_address,
    workers_addresses_list=workers_addresses_list,
    nprocs=nprocs,
    nthreads=nthreads,
    reuse_cluster=reuse_cluster,
    active_cluster=active_cluster,
    active_client=active_client,
    active_scheduler_address=active_scheduler_address,
    adaptive=adaptive,
    maximum_jobs=maximum_jobs,
    resume=resume,
    fresh_tissue_segmentation_engine=fresh_tissue_segmentation_engine,
    diameter_size=diameter_size,
    min_overlapping_pixels_segmentation=min_overlapping_pixels_segmentation,
    max_expansion_radius=max_expansion_radius,
    fov_alignement_mode=fov_alignement_mode,
    remove_distinct_genes=remove_distinct_genes,
    bead_alignment_centering_mode=bead_alignment_centering_mode,
    clip_size=clip_size,
)
running_pipeline = Pipeline(**pipeline_kwargs)

In [None]:
%%time
# Full pipeline run.
# Currently disabled: uncomment the call below to run every step in one go.
# As committed, the reprocessing cell that follows is what actually runs.
#running_pipeline.run_full()



In [None]:
# Reprocessing, in this order:
#   1. Run the set-up steps.
#   2. Keep only the FOVs that have no *barcodes_max_array*.parquet file in
#      <experiment>/results yet.
#   3. Rerun the pipeline from the registration step.
#   4. Stitch and assign the dots.
# Some Pipeline attributes are overridden below; the overrides replace the
# values that were passed to the constructor from the parameters cell.
running_pipeline.run_setup()
running_pipeline.run_cluster_activation()
running_pipeline.run_parsing()
running_pipeline.run_required_steps()

# Optional: restrict the run to a few FOVs for testing
#fovs_to_process = [244,189]
#testing_dataset = running_pipeline.data.dataset.loc[running_pipeline.data.dataset.fov_num.isin(fovs_to_process),:]
#running_pipeline.grpd_fovs = testing_dataset.groupby('fov_num')

# The FOV number is the last '_'-separated token of the result file stem.
# Files that do not follow this pattern are reported and skipped, so a stray
# file in the results folder no longer aborts the whole rerun with a
# ValueError. A skipped file only means that FOV gets processed again.
already_processed = (Path(running_pipeline.experiment_fpath) / "results").glob(
    "*barcodes_max_array*.parquet"
)
already_done_fovs = []
for fname in already_processed:
    fov_token = fname.stem.split("_")[-1]
    try:
        already_done_fovs.append(int(fov_token))
    except ValueError:
        print(f"Skipping {fname.name}: no FOV number in '{fov_token}'")

not_processed_fovs = set(running_pipeline.grpd_fovs.groups.keys()).difference(
    already_done_fovs
)
print(
    f"{len(already_done_fovs)} result files found, "
    f"{len(not_processed_fovs)} FOVs left to process"
)
running_pipeline.data.dataset = running_pipeline.data.dataset.loc[
    running_pipeline.data.dataset.fov_num.isin(not_processed_fovs), :
]
running_pipeline.grpd_fovs = running_pipeline.data.dataset.groupby("fov_num")
#running_pipeline.processing_barcoded_eel_step()

# Override: do not save the bit intensities in this rerun, whatever the
# save_bits_int parameter says
running_pipeline.save_bits_int = False
running_pipeline.rerun_from_registration_step()
#running_pipeline.rerun_decoding_step()

# Overrides applied before stitching
running_pipeline.clip_size = 60
# NOTE(review): the constructor keyword is spelled `fov_alignement_mode`.
# Confirm that Pipeline reads `fov_alignment_mode`; otherwise this assignment
# only creates an unused attribute.
running_pipeline.fov_alignment_mode = 'clip'

#running_pipeline.microscope_stitched_remove_dots_eel_graph_step()
running_pipeline.stitch_and_remove_dots_eel_graph_step()

# Overrides applied before assigning the dots
running_pipeline.bead_alignment_centering_mode = 'middle'
running_pipeline.bead_alignment_radius = 1500
running_pipeline.processing_assign_dots()
