Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make futures_helper_thread not Resettable #7811

Merged
Changes from all commits
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

@@ -45,7 +45,7 @@ pub struct CommandRunner {
execution_client: Arc<bazel_protos::remote_execution_grpc::ExecutionClient>,
operations_client: Arc<bazel_protos::operations_grpc::OperationsClient>,
store: Store,
futures_timer_thread: resettable::Resettable<futures_timer::HelperThread>,
futures_timer_thread: Arc<futures_timer::HelperThread>,
}

#[derive(Debug, PartialEq)]
@@ -248,7 +248,7 @@ impl super::CommandRunner for CommandRunner {
// maybe the delay here should be the min of remaining time and the backoff period
Delay::new_handle(
Instant::now() + Duration::from_millis(backoff_period),
futures_timer_thread.with(futures_timer::HelperThread::handle),
futures_timer_thread.handle(),
)
.map_err(move |e| {
format!(
@@ -312,7 +312,7 @@ impl CommandRunner {
platform_properties: BTreeMap<String, String>,
thread_count: usize,
store: Store,
futures_timer_thread: resettable::Resettable<futures_timer::HelperThread>,
futures_timer_thread: Arc<futures_timer::HelperThread>,

This comment has been minimized.

Copy link
@stuhood

stuhood May 29, 2019

Member

Unrelated, but: now that we guarantee the tokio runtime everywhere, we should definitely switch to using tokio's timers.

This comment has been minimized.

Copy link
@illicitonion
) -> CommandRunner {
let env = Arc::new(grpcio::Environment::new(thread_count));
let channel = {
@@ -926,6 +926,7 @@ mod tests {
use std::iter::{self, FromIterator};
use std::ops::Sub;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

#[derive(Debug, PartialEq)]
@@ -1469,7 +1470,7 @@ mod tests {
Duration::from_secs(1),
fs::BackoffConfig::new(Duration::from_millis(10), 1.0, Duration::from_millis(10)).unwrap(),
1,
timer_thread.with(|t| t.handle()),
timer_thread.handle(),
)
.expect("Failed to make store");

@@ -1835,7 +1836,7 @@ mod tests {
Duration::from_secs(1),
fs::BackoffConfig::new(Duration::from_millis(10), 1.0, Duration::from_millis(10)).unwrap(),
1,
timer_thread.with(|t| t.handle()),
timer_thread.handle(),
)
.expect("Failed to make store");
runtime
@@ -1933,7 +1934,7 @@ mod tests {
Duration::from_secs(1),
fs::BackoffConfig::new(Duration::from_millis(10), 1.0, Duration::from_millis(10)).unwrap(),
1,
timer_thread.with(|t| t.handle()),
timer_thread.handle(),
)
.expect("Failed to make store");
store
@@ -2005,7 +2006,7 @@ mod tests {
Duration::from_secs(1),
fs::BackoffConfig::new(Duration::from_millis(10), 1.0, Duration::from_millis(10)).unwrap(),
1,
timer_thread.with(|t| t.handle()),
timer_thread.handle(),
)
.expect("Failed to make store");

@@ -2615,7 +2616,7 @@ mod tests {
Duration::from_secs(1),
fs::BackoffConfig::new(Duration::from_millis(10), 1.0, Duration::from_millis(10)).unwrap(),
1,
timer_thread.with(|t| t.handle()),
timer_thread.handle(),
)
.expect("Failed to make store");

@@ -2632,8 +2633,8 @@ mod tests {
)
}

fn timer_thread() -> resettable::Resettable<futures_timer::HelperThread> {
resettable::Resettable::new(|| futures_timer::HelperThread::new().unwrap())
fn timer_thread() -> Arc<futures_timer::HelperThread> {
Arc::new(futures_timer::HelperThread::new().unwrap())
}

fn extract_execute_response(
@@ -39,6 +39,7 @@ use std::collections::{BTreeMap, BTreeSet};
use std::iter::Iterator;
use std::path::PathBuf;
use std::process::exit;
use std::sync::Arc;
use std::time::Duration;

/// A binary which takes args of format:
@@ -211,7 +212,7 @@ fn main() {
.value_of("local-store-path")
.map(PathBuf::from)
.unwrap_or_else(fs::Store::default_path);
let timer_thread = resettable::Resettable::new(|| futures_timer::HelperThread::new().unwrap());
let timer_thread = Arc::new(futures_timer::HelperThread::new().unwrap());
let server_arg = args.value_of("server");
let remote_instance_arg = args.value_of("remote-instance-name").map(str::to_owned);
let output_files = if let Some(values) = args.values_of("output-file-path") {
@@ -254,7 +255,7 @@ fn main() {
// TODO: Take a command line arg.
fs::BackoffConfig::new(Duration::from_secs(1), 1.2, Duration::from_secs(20)).unwrap(),
3,
timer_thread.with(futures_timer::HelperThread::handle),
timer_thread.handle(),
)
}
(None, None) => fs::Store::local_only(local_store_path),
@@ -308,7 +309,7 @@ fn main() {
platform_properties,
1,
store.clone(),
timer_thread,
timer_thread.clone(),
)) as Box<dyn process_execution::CommandRunner>
}
None => Box::new(process_execution::local::CommandRunner::new(
@@ -44,7 +44,7 @@ pub struct Core {
pub rule_graph: RuleGraph<Rule>,
pub types: Types,
runtime: Resettable<Arc<RwLock<Runtime>>>,
pub futures_timer_thread: Resettable<futures_timer::HelperThread>,
pub futures_timer_thread: Arc<futures_timer::HelperThread>,
store_and_command_runner_and_http_client:
Resettable<(Store, BoundedCommandRunner, reqwest::r#async::Client)>,
pub vfs: PosixFS,
@@ -103,7 +103,7 @@ impl Core {
None
};

let futures_timer_thread = Resettable::new(|| futures_timer::HelperThread::new().unwrap());
let futures_timer_thread = Arc::new(futures_timer::HelperThread::new().unwrap());
let futures_timer_thread2 = futures_timer_thread.clone();
let store_and_command_runner_and_http_client = Resettable::new(move || {
let local_store_dir = local_store_dir.clone();
@@ -126,7 +126,7 @@ impl Core {
fs::BackoffConfig::new(Duration::from_millis(10), 1.0, Duration::from_millis(10))
.unwrap(),
remote_store_rpc_retries,
futures_timer_thread2.with(futures_timer::HelperThread::handle),
futures_timer_thread2.handle(),
)
}
})
@@ -143,7 +143,7 @@ impl Core {
// Allow for some overhead for bookkeeping threads (if any).
process_execution_parallelism + 2,
store.clone(),
futures_timer_thread2.clone(),
futures_timer_thread2.clone(), // TODO remove clone once the store and http are not resettables
)),
None => Box::new(process_execution::local::CommandRunner::new(
store.clone(),
@@ -196,12 +196,10 @@ impl Core {
debug!("Waiting to enter fork_context...");
thread::sleep(Duration::from_millis(10));
}
let t = self.futures_timer_thread.with_reset(|| {
self.runtime.with_reset(|| {
self
.graph
.with_exclusive(|| self.store_and_command_runner_and_http_client.with_reset(f))
})
let t = self.runtime.with_reset(|| {
self
.graph
.with_exclusive(|| self.store_and_command_runner_and_http_client.with_reset(f))
});
self
.graph
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.