Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make plotting/replotting thread pools distinct for farms to avoid stack overflow, still protected by semaphores #2167

Merged
1 commit merged on Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
42 changes: 16 additions & 26 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use lru::LruCache;
use parking_lot::Mutex;
use rayon::ThreadPoolBuilder;
use std::fs;
use std::net::SocketAddr;
use std::num::{NonZeroU8, NonZeroUsize};
Expand All @@ -29,9 +28,7 @@ use subspace_farmer::utils::farmer_piece_getter::FarmerPieceGetter;
use subspace_farmer::utils::piece_validator::SegmentCommitmentPieceValidator;
use subspace_farmer::utils::readers_and_pieces::ReadersAndPieces;
use subspace_farmer::utils::ss58::parse_ss58_reward_address;
use subspace_farmer::utils::{
run_future_in_dedicated_thread, tokio_rayon_spawn_handler, AsyncJoinOnDrop,
};
use subspace_farmer::utils::{run_future_in_dedicated_thread, AsyncJoinOnDrop};
use subspace_farmer::{Identity, NodeClient, NodeRpcClient};
use subspace_farmer_components::plotting::PlottedSector;
use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter};
Expand Down Expand Up @@ -134,13 +131,21 @@ pub(crate) struct FarmingArgs {
/// the system
#[arg(long, default_value_t = available_parallelism())]
farming_thread_pool_size: usize,
/// Size of thread pool used for plotting, defaults to number of CPU cores available in the
/// system. This thread pool is global for all farms and generally doesn't need to be changed.
/// Size of PER FARM thread pool used for plotting, defaults to number of CPU cores available
/// in the system.
///
/// NOTE: The fact that this parameter is per farm doesn't mean farmer will plot multiple
/// sectors concurrently, see `--sector-downloading-concurrency` and
/// `--sector-encoding-concurrency` options.
#[arg(long, default_value_t = available_parallelism())]
plotting_thread_pool_size: usize,
/// Size of thread pool used for replotting, typically smaller pool than for plotting to not
/// affect farming as much, defaults to half of the number of CPU cores available in the system.
/// This thread pool is global for all farms and generally doesn't need to be changed.
/// Size of PER FARM thread pool used for replotting, typically smaller pool than for plotting
/// to not affect farming as much, defaults to half of the number of CPU cores available in the
/// system.
///
/// NOTE: The fact that this parameter is per farm doesn't mean farmer will replot multiple
/// sectors concurrently, see `--sector-downloading-concurrency` and
/// `--sector-encoding-concurrency` options.
#[arg(long, default_value_t = available_parallelism() / 2)]
replotting_thread_pool_size: usize,
}
Expand Down Expand Up @@ -423,21 +428,6 @@ where
None => farmer_app_info.protocol_info.max_pieces_in_sector,
};

let plotting_thread_pool = Arc::new(
ThreadPoolBuilder::new()
.thread_name(move |thread_index| format!("plotting#{thread_index}"))
.num_threads(plotting_thread_pool_size)
.spawn_handler(tokio_rayon_spawn_handler())
.build()?,
);
let replotting_thread_pool = Arc::new(
ThreadPoolBuilder::new()
.thread_name(move |thread_index| format!("replotting#{thread_index}"))
.num_threads(replotting_thread_pool_size)
.spawn_handler(tokio_rayon_spawn_handler())
.build()?,
);

let downloading_semaphore = Arc::new(Semaphore::new(sector_downloading_concurrency.get()));
let encoding_semaphore = Arc::new(Semaphore::new(sector_encoding_concurrency.get()));

Expand Down Expand Up @@ -467,8 +457,8 @@ where
encoding_semaphore: Arc::clone(&encoding_semaphore),
farm_during_initial_plotting,
farming_thread_pool_size,
plotting_thread_pool: Arc::clone(&plotting_thread_pool),
replotting_thread_pool: Arc::clone(&replotting_thread_pool),
plotting_thread_pool_size,
replotting_thread_pool_size,
plotting_delay: Some(plotting_delay_receiver),
},
disk_farm_index,
Expand Down
60 changes: 53 additions & 7 deletions crates/subspace-farmer/src/single_disk_farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use futures::{select, FutureExt, StreamExt};
use parity_scale_codec::{Decode, Encode};
use parking_lot::Mutex;
use rayon::prelude::*;
use rayon::{ThreadPool, ThreadPoolBuilder};
use rayon::ThreadPoolBuilder;
use serde::{Deserialize, Serialize};
use static_assertions::const_assert;
use std::fs::{File, OpenOptions};
Expand Down Expand Up @@ -268,11 +268,11 @@ pub struct SingleDiskFarmOptions<NC, PG> {
/// Thread pool size used for farming (mostly for blocking I/O, but also for some
/// compute-intensive operations during proving)
pub farming_thread_pool_size: usize,
/// Thread pool used for plotting
pub plotting_thread_pool: Arc<ThreadPool>,
/// Thread pool used for replotting, typically smaller pool than for plotting to not affect
/// Thread pool size used for plotting
pub plotting_thread_pool_size: usize,
/// Thread pool size used for replotting, typically smaller pool than for plotting to not affect
/// farming as much
pub replotting_thread_pool: Arc<ThreadPool>,
pub replotting_thread_pool_size: usize,
/// Notification for plotter to start, can be used to delay plotting until some initialization
/// has happened externally
pub plotting_delay: Option<oneshot::Receiver<()>>,
Expand Down Expand Up @@ -592,8 +592,8 @@ impl SingleDiskFarm {
downloading_semaphore,
encoding_semaphore,
farming_thread_pool_size,
plotting_thread_pool,
replotting_thread_pool,
plotting_thread_pool_size,
replotting_thread_pool_size,
plotting_delay,
farm_during_initial_plotting,
} = options;
Expand Down Expand Up @@ -877,6 +877,50 @@ impl SingleDiskFarm {

move || {
let _span_guard = span.enter();
let plotting_thread_pool = match ThreadPoolBuilder::new()
.thread_name(move |thread_index| {
format!("plotting-{disk_farm_index}.{thread_index}")
})
.num_threads(plotting_thread_pool_size)
.spawn_handler(tokio_rayon_spawn_handler())
.build()
.map_err(PlottingError::FailedToCreateThreadPool)
{
Ok(thread_pool) => thread_pool,
Err(error) => {
if let Some(error_sender) = error_sender.lock().take() {
if let Err(error) = error_sender.send(error.into()) {
error!(
%error,
"Plotting failed to send error to background task",
);
}
}
return;
}
};
let replotting_thread_pool = match ThreadPoolBuilder::new()
.thread_name(move |thread_index| {
format!("replotting-{disk_farm_index}.{thread_index}")
})
.num_threads(replotting_thread_pool_size)
.spawn_handler(tokio_rayon_spawn_handler())
.build()
.map_err(PlottingError::FailedToCreateThreadPool)
{
Ok(thread_pool) => thread_pool,
Err(error) => {
if let Some(error_sender) = error_sender.lock().take() {
if let Err(error) = error_sender.send(error.into()) {
error!(
%error,
"Plotting failed to send error to background task",
);
}
}
return;
}
};

let plotting_fut = async move {
if start_receiver.recv().await.is_err() {
Expand Down Expand Up @@ -984,6 +1028,7 @@ impl SingleDiskFarm {
let span = span.clone();

move || {
let _span_guard = span.enter();
let thread_pool = match ThreadPoolBuilder::new()
.thread_name(move |thread_index| {
format!("farming-{disk_farm_index}.{thread_index}")
Expand All @@ -1008,6 +1053,7 @@ impl SingleDiskFarm {
};

let handle = Handle::current();
let span = span.clone();
thread_pool.install(move || {
let _span_guard = span.enter();

Expand Down
15 changes: 9 additions & 6 deletions crates/subspace-farmer/src/single_disk_farm/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use futures::channel::{mpsc, oneshot};
use futures::{select, FutureExt, SinkExt, StreamExt};
use lru::LruCache;
use parity_scale_codec::Encode;
use rayon::ThreadPool;
use rayon::{ThreadPool, ThreadPoolBuildError};
use std::collections::HashMap;
use std::fs::File;
use std::io;
Expand Down Expand Up @@ -81,12 +81,15 @@ pub enum PlottingError {
/// Farm is shutting down
#[error("Farm is shutting down")]
FarmIsShuttingDown,
/// I/O error occurred
#[error("I/O error: {0}")]
Io(#[from] io::Error),
/// Low-level plotting error
#[error("Low-level plotting error: {0}")]
LowLevel(#[from] plotting::PlottingError),
/// I/O error occurred
#[error("I/O error: {0}")]
Io(#[from] io::Error),
/// Failed to create thread pool
#[error("Failed to create thread pool: {0}")]
FailedToCreateThreadPool(#[from] ThreadPoolBuildError),
}

pub(super) struct PlottingOptions<NC, PG> {
Expand All @@ -111,8 +114,8 @@ pub(super) struct PlottingOptions<NC, PG> {
/// Semaphore for part of the plotting when farmer encodes downloaded sector, should typically
/// allow one permit at a time for efficient CPU utilization
pub(crate) encoding_semaphore: Arc<Semaphore>,
pub(super) plotting_thread_pool: Arc<ThreadPool>,
pub(super) replotting_thread_pool: Arc<ThreadPool>,
pub(super) plotting_thread_pool: ThreadPool,
pub(super) replotting_thread_pool: ThreadPool,
pub(super) stop_receiver: broadcast::Receiver<()>,
}

Expand Down