|
3 | 3 | //! This module handles the downloading of `Source`s and figuring out which |
4 | 4 | //! files to use for `Plugins`. |
5 | 5 |
|
| 6 | +use std::cmp; |
6 | 7 | use std::collections::HashSet; |
| 8 | +use std::convert::TryInto; |
7 | 9 | use std::fmt; |
8 | 10 | use std::fs; |
9 | 11 | use std::path::{Path, PathBuf}; |
10 | 12 | use std::result; |
| 13 | +use std::sync; |
11 | 14 |
|
12 | 15 | use anyhow::{anyhow, bail, Context as ResultExt, Error, Result}; |
13 | 16 | use indexmap::{indexmap, IndexMap}; |
14 | 17 | use itertools::{Either, Itertools}; |
15 | 18 | use maplit::hashmap; |
16 | 19 | use once_cell::sync::Lazy; |
17 | | -use rayon::prelude::*; |
18 | 20 | use serde::{Deserialize, Serialize}; |
19 | 21 | use url::Url; |
20 | 22 | use walkdir::WalkDir; |
@@ -547,26 +549,46 @@ impl Config { |
547 | 549 | .map(|(_, locked)| locked) |
548 | 550 | .collect::<Vec<_>>() |
549 | 551 | } else { |
550 | | - // Install the sources in parallel. |
551 | | - map.into_par_iter() |
552 | | - .map(|(source, plugins)| { |
553 | | - let source_name = source.to_string(); |
554 | | - let source = source |
555 | | - .lock(ctx) |
556 | | - .with_context(s!("failed to install source `{}`", source_name))?; |
557 | | - |
558 | | - let mut locked = Vec::with_capacity(plugins.len()); |
559 | | - for (index, plugin) in plugins { |
560 | | - let name = plugin.name.clone(); |
561 | | - locked.push(( |
562 | | - index, |
563 | | - plugin |
564 | | - .lock(ctx, &templates, source.clone(), matches, apply) |
565 | | - .with_context(s!("failed to install plugin `{}`", name)), |
566 | | - )); |
567 | | - } |
568 | | - Ok(locked) |
569 | | - }) |
| 552 | + /// The maximmum number of threads to use while downloading sources. |
| 553 | + const MAX_THREADS: u32 = 8; |
| 554 | + |
| 555 | + // Create a thread pool and install the sources in parallel. |
| 556 | + let thread_count = cmp::min(count.try_into().unwrap_or(MAX_THREADS), MAX_THREADS); |
| 557 | + let mut pool = scoped_threadpool::Pool::new(thread_count); |
| 558 | + let (tx, rx) = sync::mpsc::channel(); |
| 559 | + let templates_ref = &templates; |
| 560 | + |
| 561 | + pool.scoped(move |scoped| { |
| 562 | + for (source, plugins) in map { |
| 563 | + let tx = tx.clone(); |
| 564 | + scoped.execute(move || { |
| 565 | + tx.send((|| { |
| 566 | + let source_name = source.to_string(); |
| 567 | + let source = source |
| 568 | + .lock(ctx) |
| 569 | + .with_context(s!("failed to install source `{}`", source_name))?; |
| 570 | + |
| 571 | + let mut locked = Vec::with_capacity(plugins.len()); |
| 572 | + for (index, plugin) in plugins { |
| 573 | + let name = plugin.name.clone(); |
| 574 | + locked.push(( |
| 575 | + index, |
| 576 | + plugin |
| 577 | + .lock(ctx, templates_ref, source.clone(), matches, apply) |
| 578 | + .with_context(s!("failed to install plugin `{}`", name)), |
| 579 | + )); |
| 580 | + } |
| 581 | + Ok(locked) |
| 582 | + })()) |
| 583 | + .expect("oops! did main thread die?"); |
| 584 | + }) |
| 585 | + } |
| 586 | + scoped.join_all(); |
| 587 | + }); |
| 588 | + |
| 589 | + rx.iter() |
| 590 | + // all threads must send a response |
| 591 | + .take(count) |
570 | 592 | // The result of this is basically an `Iter<Result<Vec<(usize, Result)>, _>>` |
571 | 593 | // The first thing we need to do is to filter out the failures and record the |
572 | 594 | // errors that occurred while installing the source in our `errors` list. |
|
0 commit comments