Skip to content

Commit

Permalink
Revert "Deprecate unused --process-execution-local-enable-nailgun (#…
Browse files Browse the repository at this point in the history
…11600)" (#12257)

Restore native support for launching JVM processes with `nailgun`. @patricklaw will follow up to actually experiment with using this support (and will likely expose plenty of bugs in it! sorry).

This reverts commit e19336d.
  • Loading branch information
stuhood committed Jul 6, 2021
1 parent 751fe01 commit aa8a2cd
Show file tree
Hide file tree
Showing 12 changed files with 1,230 additions and 26 deletions.
10 changes: 0 additions & 10 deletions src/python/pants/bin/loader_integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,6 @@ def test_alternate_entrypoint_bad() -> None:
assert "must be of the form" in pants_run.stderr


def test_alternate_entrypoint_not_callable() -> None:
pants_run = run_pants(
command=["help"],
extra_env={DAEMON_ENTRYPOINT: "pants.bin.loader_integration_test:TEST_STR"},
)
pants_run.assert_failure()
assert "TEST_STR" in pants_run.stderr
assert "not callable" in pants_run.stderr


def exercise_alternate_entrypoint_scrubbing():
"""An alternate test entrypoint for exercising scrubbing."""
print(f"{DAEMON_ENTRYPOINT}={os.environ.get(DAEMON_ENTRYPOINT)}")
Expand Down
7 changes: 1 addition & 6 deletions src/python/pants/bin/pants_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,7 @@ def run_alternate_entrypoint(entrypoint: str) -> None:

module = importlib.import_module(module_path)
entrypoint_fn = getattr(module, func_name)

try:
entrypoint_fn()
except TypeError:
print(f"{DAEMON_ENTRYPOINT} {func_name} is not callable", file=sys.stderr)
sys.exit(PANTS_FAILED_EXIT_CODE)
entrypoint_fn()

@staticmethod
def run_default_entrypoint() -> None:
Expand Down
1 change: 1 addition & 0 deletions src/python/pants/engine/internals/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ def __init__(
remote_cache_write=execution_options.remote_cache_write,
local_cleanup=execution_options.process_execution_local_cleanup,
local_parallelism=execution_options.process_execution_local_parallelism,
local_enable_nailgun=execution_options.process_execution_local_enable_nailgun,
remote_parallelism=execution_options.process_execution_remote_parallelism,
)

Expand Down
10 changes: 10 additions & 0 deletions src/python/pants/option/global_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ class ExecutionOptions:
process_execution_local_cache: bool
process_execution_local_cleanup: bool
process_execution_local_parallelism: int
process_execution_local_enable_nailgun: bool
process_execution_remote_parallelism: int
process_execution_cache_namespace: str | None

Expand Down Expand Up @@ -367,6 +368,7 @@ def from_options(
process_execution_remote_parallelism=dynamic_remote_options.parallelism,
process_execution_local_cleanup=bootstrap_options.process_execution_local_cleanup,
process_execution_cache_namespace=bootstrap_options.process_execution_cache_namespace,
process_execution_local_enable_nailgun=bootstrap_options.process_execution_local_enable_nailgun,
# Remote store setup.
remote_store_address=dynamic_remote_options.store_address,
remote_store_headers=dynamic_remote_options.store_headers,
Expand Down Expand Up @@ -442,6 +444,7 @@ def from_options(cls, options: OptionValueContainer) -> LocalStoreOptions:
process_execution_cache_namespace=None,
process_execution_local_cleanup=True,
process_execution_local_cache=True,
process_execution_local_enable_nailgun=False,
# Remote store setup.
remote_store_address=None,
remote_store_headers={
Expand Down Expand Up @@ -1047,6 +1050,13 @@ def register_bootstrap_options(cls, register):
"process cache entries from being (re)used for different usecases or users."
),
)
register(
"--process-execution-local-enable-nailgun",
type=bool,
default=DEFAULT_EXECUTION_OPTIONS.process_execution_local_enable_nailgun,
help="Whether or not to use nailgun to run the requests that are marked as nailgunnable.",
advanced=True,
)

register(
"--remote-execution",
Expand Down
2 changes: 2 additions & 0 deletions src/rust/engine/process_execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ pub mod remote_cache;
#[cfg(test)]
mod remote_cache_tests;

pub mod nailgun;

pub mod named_caches;

extern crate uname;
Expand Down
293 changes: 293 additions & 0 deletions src/rust/engine/process_execution/src/nailgun/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,293 @@
use std::collections::{BTreeMap, BTreeSet};
use std::path::{Path, PathBuf};
use std::time::Duration;

use async_trait::async_trait;
use futures::future::{FutureExt, TryFutureExt};
use futures::stream::{BoxStream, StreamExt};
use log::{debug, trace};
use nails::execution::{self, child_channel, ChildInput, Command};
use tokio::net::TcpStream;

use crate::local::{CapturedWorkdir, ChildOutput};
use crate::nailgun::nailgun_pool::NailgunProcessName;
use crate::{
Context, FallibleProcessResultWithPlatform, MultiPlatformProcess, NamedCaches, Platform, Process,
ProcessCacheScope, ProcessMetadata,
};

#[cfg(test)]
pub mod tests;

pub mod nailgun_pool;

mod parsed_jvm_command_lines;
#[cfg(test)]
mod parsed_jvm_command_lines_tests;

use async_semaphore::AsyncSemaphore;
pub use nailgun_pool::NailgunPool;
use parsed_jvm_command_lines::ParsedJVMCommandLines;
use std::net::SocketAddr;

// Hardcoded constants for connecting to nailgun

// Main class appended to the JVM arguments when building the server-start request.
static NAILGUN_MAIN_CLASS: &str = "com.martiansoftware.nailgun.NGServer";
// Arguments appended after the main class when building the server-start request.
// NOTE(review): `:0` presumably asks the server to bind to an OS-assigned port — confirm.
static ARGS_TO_START_NAILGUN: [&str; 1] = [":0"];

///
/// Builds the `Process` that launches a nailgun server.
///
/// The argv is the given JVM arguments, followed by the nailgun server main class and the
/// fixed server startup arguments. The resulting process has no input files, no declared
/// outputs, an empty environment, and is marked as nailgunnable.
///
// TODO(#8481) We should calculate the input_files by deeply fingerprinting the classpath.
fn construct_nailgun_server_request(
  nailgun_name: &str,
  args_for_the_jvm: Vec<String>,
  jdk: PathBuf,
  platform_constraint: Option<Platform>,
) -> Process {
  // JVM args first, then the server main class, then the server's own arguments.
  let argv: Vec<String> = args_for_the_jvm
    .into_iter()
    .chain(std::iter::once(NAILGUN_MAIN_CLASS.to_owned()))
    .chain(ARGS_TO_START_NAILGUN.iter().map(|arg| (*arg).to_owned()))
    .collect();
  let description = format!("Start a nailgun server for {}", nailgun_name);

  Process {
    argv,
    env: BTreeMap::new(),
    working_directory: None,
    input_files: hashing::EMPTY_DIGEST,
    output_files: BTreeSet::new(),
    output_directories: BTreeSet::new(),
    timeout: Some(Duration::from_secs(1000)),
    description,
    level: log::Level::Info,
    append_only_caches: BTreeMap::new(),
    jdk_home: Some(jdk),
    platform_constraint,
    is_nailgunnable: true,
    execution_slot_variable: None,
    cache_scope: ProcessCacheScope::PerSession,
  }
}

///
/// Builds the `Process` describing the client invocation that runs inside a nailgun server.
///
/// The argv becomes the client main class followed by the client arguments, and `jdk_home`
/// is cleared; every other field is taken unchanged from `original_req`.
///
fn construct_nailgun_client_request(
  original_req: Process,
  client_main_class: String,
  client_args: Vec<String>,
) -> Process {
  let mut argv = Vec::with_capacity(client_args.len() + 1);
  argv.push(client_main_class);
  argv.extend(client_args);

  Process {
    argv,
    jdk_home: None,
    ..original_req
  }
}

///
/// A command runner that can run local requests under nailgun.
///
/// It should only be invoked with local requests.
/// It will read a flag marking a `Process` as nailgunnable.
/// If that flag is set, it will connect to a running nailgun server and run the command there.
/// Otherwise, it will just delegate to the regular local runner.
///
pub struct CommandRunner {
  // The regular local runner: handles requests that are not nailgunnable, and provides the
  // store and named caches used by this runner.
  inner: super::local::CommandRunner,
  // Pool through which connections to nailgun servers are obtained.
  nailgun_pool: NailgunPool,
  // Acquired around each `nailgun_pool.connect` call; constructed with a single permit.
  async_semaphore: async_semaphore::AsyncSemaphore,
  // Used when computing the digest of the nailgun server request.
  metadata: ProcessMetadata,
  // Directory under which the per-nailgun workdirs are created.
  workdir_base: PathBuf,
  // Passed along to `run_and_capture_workdir` when running a nailgunnable request.
  executor: task_executor::Executor,
}

impl CommandRunner {
  ///
  /// Creates a nailgun-capable runner around the given local `runner`.
  ///
  /// Starts with a fresh `NailgunPool` and a semaphore with a single permit, which is acquired
  /// around each connection attempt to the pool.
  ///
  pub fn new(
    runner: crate::local::CommandRunner,
    metadata: ProcessMetadata,
    workdir_base: PathBuf,
    executor: task_executor::Executor,
  ) -> Self {
    CommandRunner {
      inner: runner,
      nailgun_pool: NailgunPool::new(),
      async_semaphore: AsyncSemaphore::new(1),
      metadata,
      workdir_base,
      executor,
    }
  }

  ///
  /// Ensure that the workdir for the given nailgun name exists, and return its path.
  ///
  /// The workdir is `workdir_base` joined with `nailgun_name`. Returns an error message if the
  /// directory does not exist and cannot be created.
  ///
  fn get_nailgun_workdir(&self, nailgun_name: &str) -> Result<PathBuf, String> {
    // `Path::join` already produces an owned `PathBuf`: no need to clone the base first.
    let workdir = self.workdir_base.join(nailgun_name);
    if workdir.exists() {
      debug!("Nailgun workdir {:?} exists. Reusing that...", &workdir);
      Ok(workdir)
    } else {
      debug!("Creating nailgun workdir at {:?}", &workdir);
      fs::safe_create_dir_all(&workdir)
        .map_err(|err| format!("Error creating the nailgun workdir! {}", err))
        .map(|_| workdir)
    }
  }

  ///
  /// Computes the name identifying the nailgun server used for the given client main class.
  ///
  // TODO(#8527) Make this a more intentional scope (v2). Using the main class here is fragile,
  // because two tasks might want to run the same main class with different input digests.
  fn calculate_nailgun_name(main_class: &str) -> NailgunProcessName {
    format!("nailgun_server_{}", main_class)
  }
}

#[async_trait]
impl super::CommandRunner for CommandRunner {
  ///
  /// Runs the given request, under nailgun if it is marked as nailgunnable.
  ///
  /// Requests that are not nailgunnable are delegated unchanged to the wrapped local runner.
  /// Nailgunnable requests are run via `run_and_capture_workdir`, using the workdir associated
  /// with the nailgun server for the request's client main class.
  ///
  /// Returns an error if no process in `req` is compatible with this runner, if the argv of
  /// the request cannot be parsed, or if the nailgun workdir cannot be created.
  ///
  async fn run(
    &self,
    req: MultiPlatformProcess,
    context: Context,
  ) -> Result<FallibleProcessResultWithPlatform, String> {
    // Report an incompatible request as an error rather than panicking.
    let original_request = self
      .extract_compatible_request(&req)
      .ok_or_else(|| "No compatible Process found for the nailgun command runner.".to_string())?;

    if !original_request.is_nailgunnable {
      trace!("The request is not nailgunnable! Short-circuiting to regular process execution");
      return self.inner.run(req, context).await;
    }
    debug!("Running request under nailgun:\n {:#?}", &original_request);

    let executor = self.executor.clone();
    let store = self.inner.store.clone();
    // Only the client main class is needed here, in order to locate the nailgun's workdir.
    let ParsedJVMCommandLines {
      client_main_class, ..
    } = ParsedJVMCommandLines::parse_command_lines(&original_request.argv)?;
    let nailgun_name = CommandRunner::calculate_nailgun_name(&client_main_class);
    let workdir_for_this_nailgun = self.get_nailgun_workdir(&nailgun_name)?;

    self
      .run_and_capture_workdir(
        original_request,
        context,
        store,
        executor,
        true,
        &workdir_for_this_nailgun,
        Platform::current().unwrap(),
      )
      .await
  }

  /// Returns the process from `req` that this runner can execute, if any.
  fn extract_compatible_request(&self, req: &MultiPlatformProcess) -> Option<Process> {
    // Request compatibility should be the same as for the local runner, so we just delegate this.
    self.inner.extract_compatible_request(req)
  }
}

#[async_trait]
impl CapturedWorkdir for CommandRunner {
  /// Exposes the named caches of the wrapped local runner.
  fn named_caches(&self) -> &NamedCaches {
    self.inner.named_caches()
  }

  ///
  /// Runs `req` as a client of a nailgun server, and streams back its output.
  ///
  /// The argv of the request is split into the arguments used to start the nailgun server, and
  /// the main class and arguments of the client invocation. A port for a server is obtained from
  /// the pool (while holding the semaphore), and the client command is then sent to that server
  /// over a TCP connection to 127.0.0.1.
  ///
  /// The client runs in `workdir_path`, joined with the request's `working_directory` if set.
  ///
  /// Returns a stream that yields the stdout/stderr chunks and the exit code of the client.
  /// Fails if the argv cannot be parsed, if `req.jdk_home` is unset, if the nailgun workdir
  /// cannot be created, or if connecting to the server fails.
  ///
  async fn run_in_workdir<'a, 'b, 'c>(
    &'a self,
    workdir_path: &'b Path,
    req: Process,
    context: Context,
    _exclusive_spawn: bool,
  ) -> Result<BoxStream<'c, Result<ChildOutput, String>>, String> {
    // Separate argument lists, to form distinct EPRs for (1) starting the nailgun server and (2) running the client in it.
    let ParsedJVMCommandLines {
      nailgun_args,
      client_args,
      client_main_class,
    } = ParsedJVMCommandLines::parse_command_lines(&req.argv)?;

    let nailgun_name = CommandRunner::calculate_nailgun_name(&client_main_class);
    // Separate copies of the name, to be moved into the closures below.
    let nailgun_name2 = nailgun_name.clone();
    let nailgun_name3 = nailgun_name.clone();
    let client_workdir = if let Some(working_directory) = &req.working_directory {
      workdir_path.join(working_directory)
    } else {
      workdir_path.to_path_buf()
    };

    let jdk_home = req
      .jdk_home
      .clone()
      .ok_or("JDK home must be specified for all nailgunnable requests.")?;
    let nailgun_req = construct_nailgun_server_request(
      &nailgun_name,
      nailgun_args,
      jdk_home,
      req.platform_constraint,
    );
    trace!("Extracted nailgun request:\n {:#?}", &nailgun_req);

    let nailgun_req_digest = crate::digest(
      MultiPlatformProcess::from(nailgun_req.clone()),
      &self.metadata,
    );

    let nailgun_pool = self.nailgun_pool.clone();
    let req2 = req.clone();
    let workdir_for_this_nailgun = self.get_nailgun_workdir(&nailgun_name)?;
    let build_id = context.build_id;
    let store = self.inner.store.clone();

    let mut child = self
      .async_semaphore
      .clone()
      .with_acquired(move |_id| {
        // Get the port of a running nailgun server (or a new nailgun server if it doesn't exist)
        nailgun_pool.connect(
          nailgun_name.clone(),
          nailgun_req,
          workdir_for_this_nailgun,
          nailgun_req_digest,
          build_id,
          store,
          req.input_files,
        )
      })
      .map_err(|e| format!("Failed to connect to nailgun! {}", e))
      // Only log a successful connection when the connection attempt actually succeeded.
      .inspect_ok(move |_| debug!("Connected to nailgun instance {}", &nailgun_name3))
      .and_then(move |nailgun_port| {
        // Run the client request in the nailgun we have active.
        debug!("Got nailgun port {} for {}", nailgun_port, nailgun_name2);
        let client_req = construct_nailgun_client_request(req2, client_main_class, client_args);
        // argv[0] is the client main class: see `construct_nailgun_client_request`.
        let cmd = Command {
          command: client_req.argv[0].clone(),
          args: client_req.argv[1..].to_vec(),
          env: client_req
            .env
            .iter()
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect(),
          working_dir: client_workdir,
        };
        trace!("Client request: {:#?}", client_req);
        // NOTE(review): assumes the port debug-formats as a bare port number — confirm.
        let addr: SocketAddr = format!("127.0.0.1:{:?}", nailgun_port).parse().unwrap();
        debug!("Connecting to server at {}...", addr);
        TcpStream::connect(addr)
          .and_then(move |stream| {
            nails::client::handle_connection(nails::Config::default(), stream, cmd, async {
              // The write half is dropped immediately: nothing is ever sent on the client's stdin.
              let (_stdin_write, stdin_read) = child_channel::<ChildInput>();
              stdin_read
            })
          })
          .map_err(|e| format!("Error communicating with server: {}", e))
      })
      .await?;

    // Convert the nails output chunks into this crate's `ChildOutput`.
    let output_stream = child
      .output_stream
      .take()
      .unwrap()
      .map(|output| match output {
        execution::ChildOutput::Stdout(bytes) => Ok(ChildOutput::Stdout(bytes)),
        execution::ChildOutput::Stderr(bytes) => Ok(ChildOutput::Stderr(bytes)),
      });
    let exit_code = child
      .wait()
      .map_ok(ChildOutput::Exit)
      .map_err(|e| format!("Error communicating with server: {}", e));

    // Merge the output chunks and the exit code into a single stream.
    Ok(futures::stream::select(output_stream, exit_code.into_stream()).boxed())
  }
}

0 comments on commit aa8a2cd

Please sign in to comment.