Skip to content

Commit

Permalink
Raise exceptions from within futures in plot_pool
Browse files Browse the repository at this point in the history
  • Loading branch information
kclem committed Apr 23, 2023
1 parent 7e807a6 commit a439f09
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 18 deletions.
11 changes: 8 additions & 3 deletions CRISPResso2/CRISPRessoAggregateCORE.py
Expand Up @@ -105,13 +105,13 @@ def main():
n_processes = int(args.n_processes)

process_pool = ProcessPoolExecutor(n_processes)
process_results = []
process_futures = []

plot = partial(
run_plot,
num_processes=n_processes,
process_pool=process_pool,
process_results=process_results,
process_futures=process_futures,
)

#glob returns paths including the original prefix
Expand Down Expand Up @@ -845,7 +845,12 @@ def main():
crispresso2Aggregate_info_file, crispresso2_info,
)

wait(process_results)
wait(process_futures)
if args.debug:
debug('Plot pool results:')
for future in process_futures:
debug('future: ' + str(future))
future_results = [f.result() for f in process_futures] #required to raise exceptions thrown from within future
process_pool.shutdown()

info('Analysis Complete!', {'percent_complete': 100})
Expand Down
11 changes: 8 additions & 3 deletions CRISPResso2/CRISPRessoBatchCORE.py
Expand Up @@ -332,13 +332,13 @@ def main():
if args.suppress_report:
save_png = False

process_results = []
process_futures = []
process_pool = ProcessPoolExecutor(n_processes_for_batch)

plot = partial(
CRISPRessoMultiProcessing.run_plot,
num_processes=n_processes_for_batch,
process_results=process_results,
process_futures=process_futures,
process_pool=process_pool,
)

Expand Down Expand Up @@ -855,7 +855,12 @@ def main():
outfile.write(batch_name + "\t" + line)

if not args.suppress_batch_summary_plots:
wait(process_results)
wait(process_futures)
if args.debug:
debug('CRISPResso batch results:')
for future in process_futures:
debug('future: ' + str(future))
future_results = [f.result() for f in process_futures] #required to raise exceptions thrown from within future
process_pool.shutdown()

if not args.suppress_report:
Expand Down
20 changes: 11 additions & 9 deletions CRISPResso2/CRISPRessoCORE.py
Expand Up @@ -3380,13 +3380,13 @@ def count_alternate_alleles(sub_base_vectors, ref_name, ref_sequence, ref_total_

############

plot_pool = ProcessPoolExecutor(n_processes)
plot_results = []
process_pool = ProcessPoolExecutor(n_processes)
process_futures = []
plot = partial(
CRISPRessoMultiProcessing.run_plot,
num_processes=n_processes,
process_pool=plot_pool,
process_results=plot_results,
process_pool=process_pool,
process_futures=process_futures,
)
###############################################################################################################################################
### FIGURE 1: Alignment
Expand Down Expand Up @@ -3455,9 +3455,6 @@ def count_alternate_alleles(sub_base_vectors, ref_name, ref_sequence, ref_total_
crispresso2_info['results']['general_plots']['plot_1d_data'] = [('Allele table', os.path.basename(allele_frequency_table_filename))]
###############################################################################################################################################

#hold mod pct dfs for each amplicon/guide
mod_pct_dfs = {}

ref_percent_complete_start, ref_percent_complete_end = 48, 88
ref_percent_complete_step = (ref_percent_complete_end - ref_percent_complete_start) / float(len(ref_names))
for ref_index, ref_name in enumerate(ref_names):
Expand Down Expand Up @@ -4636,8 +4633,13 @@ def get_scaffold_len(row, scaffold_start_loc, scaffold_seq):
crispresso2_info['results']['general_plots']['plot_11c_data'] = [('Scaffold insertion alleles with insertion sizes', os.path.basename(scaffold_insertion_sizes_filename))]

# join plotting pool
wait(plot_results)
plot_pool.shutdown()
wait(process_futures)
if args.debug:
debug('Plot pool results:')
for future in process_futures:
debug('future: ' + str(future))
future_results = [f.result() for f in process_futures] #required to raise exceptions thrown from within future
process_pool.shutdown()

info('Done!')

Expand Down
7 changes: 4 additions & 3 deletions CRISPResso2/CRISPRessoMultiProcessing.py
Expand Up @@ -251,7 +251,7 @@ def run_parallel_commands(commands_arr,n_processes=1,descriptor='CRISPResso2',co
pool.join()


def run_plot(plot_func, plot_args, num_processes, process_results, process_pool):
def run_plot(plot_func, plot_args, num_processes, process_futures, process_pool):
"""Run a plot in parallel if num_processes > 1, otherwise in serial.
Parameters
Expand All @@ -262,7 +262,7 @@ def run_plot(plot_func, plot_args, num_processes, process_results, process_pool)
The arguments to pass to the plotting function.
num_processes: int
The number of processes to use in parallel.
process_results: List
process_futures: List
The list of futures that submitting the parallel job will return.
process_pool: ProcessPoolExecutor or ThreadPoolExecutor
The pool to submit the job to.
Expand All @@ -272,6 +272,7 @@ def run_plot(plot_func, plot_args, num_processes, process_results, process_pool)
None
"""
if num_processes > 1:
process_results.append(process_pool.submit(plot_func, **plot_args))
process_futures.append(process_pool.submit(plot_func, **plot_args))

else:
plot_func(**plot_args)

0 comments on commit a439f09

Please sign in to comment.