Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option for specifying environment variable with concurrency slot #10297

Merged
merged 4 commits into from
Jul 9, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/python/pants/backend/python/rules/pytest_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class TestTargetSetup:
timeout_seconds: Optional[int]
xml_dir: Optional[str]
junit_family: str
execution_slot_variable: str

# Prevent this class from being detected by pytest as a test class.
__test__ = False
Expand Down Expand Up @@ -201,6 +202,7 @@ async def setup_pytest_for_target(
timeout_seconds=field_set.timeout.calculate_from_global_options(pytest),
xml_dir=pytest.options.junit_xml_dir,
junit_family=pytest.options.junit_family,
execution_slot_variable=pytest.options.execution_slot_var,
)


Expand Down Expand Up @@ -247,6 +249,7 @@ async def run_python_test(
description=f"Run Pytest for {field_set.address.reference()}",
timeout_seconds=test_setup.timeout_seconds,
env=env,
execution_slot_variable=test_setup.execution_slot_variable,
)
result = await Get(FallibleProcessResult, Process, process)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Licensed under the Apache License, Version 2.0 (see LICENSE).

import os
import re
from pathlib import Path, PurePath
from textwrap import dedent
from typing import List, Optional
Expand Down Expand Up @@ -140,6 +141,7 @@ def run_pytest(
origin: Optional[OriginSpec] = None,
junit_xml_dir: Optional[str] = None,
use_coverage: bool = False,
execution_slot_var: Optional[str] = None,
) -> TestResult:
args = [
"--backend-packages=pants.backend.python",
Expand All @@ -154,6 +156,9 @@ def run_pytest(
args.append(f"--pytest-junit-xml-dir={junit_xml_dir}")
if use_coverage:
args.append("--test-use-coverage")
if execution_slot_var:
args.append(f"--pytest-execution-slot-var={execution_slot_var}")

options_bootstrapper = create_options_bootstrapper(args=args)
address = Address(self.package, "target")
if origin is None:
Expand Down Expand Up @@ -371,3 +376,22 @@ def test_coverage(self) -> None:
assert result.status == Status.SUCCESS
assert f"{self.package}/test_good.py ." in result.stdout
assert result.coverage_data is not None

def test_execution_slot_variable(self) -> None:
source = FileContent(
path="test_concurrency_slot.py",
content=dedent(
"""\
import os

def test_fail_printing_slot_env_var():
slot = os.getenv("SLOT")
print(f"Value of slot is {slot}")
# Deliberately fail the test so the SLOT output gets printed to stdout
assert 1 == 2
"""
).encode(),
)
self.create_python_test_target([source])
result = self.run_pytest(execution_slot_var="SLOT")
assert re.search(r"Value of slot is \d+", result.stdout)
7 changes: 7 additions & 0 deletions src/python/pants/backend/python/subsystems/pytest.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ def register_options(cls, register):
advanced=True,
help="The format of the generated XML file. See https://docs.pytest.org/en/latest/reference.html#confval-junit_family.",
)
register(
"--execution-slot-var",
type=str,
default="",
advanced=True,
help="If a non-empty string, the process execution slot id (an integer) will be exposed to tests under this environment variable name.",
Copy link
Sponsor Member

@stuhood stuhood Jul 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be a bit cleaner for this to default to None (Optional[str]) here and in the Process constructor, even if inside of Process it gets converted into the empty string.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the options type kwarg know how to interpret modern Python typing type annotations?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. You keep type=str and set default=None. The type will end up being Optional[str].

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I think this might be longer than 100 characters. Appreciated if you can split this up into multiple lines. I like the style of wrapping in parentheses:

help=(
    "blah blah"
    "blah"
)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this be caught by the linter if it was?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope. We turned off the check for Flake8 because it doesn't work well with Black. Black is careful when formatting strings and doesn't touch them generally.

)

def get_requirement_strings(self) -> Tuple[str, ...]:
"""Returns a tuple of requirements-style strings for Pytest and Pytest plugins."""
Expand Down
3 changes: 3 additions & 0 deletions src/python/pants/engine/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class Process:
timeout_seconds: Union[int, float]
jdk_home: Optional[str]
is_nailgunnable: bool
execution_slot_variable: str

def __init__(
self,
Expand All @@ -48,6 +49,7 @@ def __init__(
timeout_seconds: Optional[Union[int, float]] = None,
jdk_home: Optional[str] = None,
is_nailgunnable: bool = False,
execution_slot_variable: str = "",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to make this Optional[str]. It expresses intent better. See jdk_home as an example.

) -> None:
"""Request to run a subprocess, similar to subprocess.Popen.

Expand Down Expand Up @@ -86,6 +88,7 @@ def __init__(
self.timeout_seconds = timeout_seconds if timeout_seconds and timeout_seconds > 0 else -1
self.jdk_home = jdk_home
self.is_nailgunnable = is_nailgunnable
self.execution_slot_variable = execution_slot_variable


@frozen_after_init
Expand Down
19 changes: 10 additions & 9 deletions src/rust/engine/async_semaphore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ impl AsyncSemaphore {
///
pub async fn with_acquired<F, B, O>(self, f: F) -> O
where
F: FnOnce() -> B + Send + 'static,
F: FnOnce(usize) -> B + Send + 'static,
B: Future<Output = O> + Send + 'static,
{
let permit = self.acquire().await;
let res = f().await;
let res = f(permit.id).await;
drop(permit);
res
}
Expand All @@ -93,6 +93,7 @@ impl AsyncSemaphore {

pub struct Permit {
inner: Arc<Mutex<Inner>>,
id: usize,
}

impl Drop for Permit {
Expand Down Expand Up @@ -145,7 +146,7 @@ impl Future for PermitFuture {
inner.waiters.push_back(this_waiter);
}
if inner.available_permits == 0 {
false
None
} else {
let will_issue_permit = {
if let Some(front_waiter) = inner.waiters.front() {
Expand All @@ -157,24 +158,24 @@ impl Future for PermitFuture {
// queue, so we don't have to waste time searching for it in the Drop
// handler.
self.waiter_id = None;
true
Some(inner.available_permits)
} else {
// Don't issue a permit to this task if it isn't at the head of the line,
// we added it as a waiter above.
false
None
}
} else {
false
None
}
};
if will_issue_permit {
if will_issue_permit.is_some() {
inner.available_permits -= 1;
}
will_issue_permit
}
};
if acquired {
Poll::Ready(Permit { inner })
if let Some(id) = acquired {
Poll::Ready(Permit { inner, id })
} else {
Poll::Pending
}
Expand Down
64 changes: 57 additions & 7 deletions src/rust/engine/async_semaphore/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,57 @@ use tokio::time::{delay_for, timeout};
async fn acquire_and_release() {
let sema = AsyncSemaphore::new(1);

sema.with_acquired(|| future::ready(())).await;
sema.with_acquired(|_id| future::ready(())).await;
}

#[tokio::test]
async fn correct_semaphor_slot_ids() {
let sema = AsyncSemaphore::new(2);
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let (tx3, rx3) = oneshot::channel();
let (tx4, rx4) = oneshot::channel();

//Process 1
tokio::spawn(sema.clone().with_acquired(move |id| async move {
tx1.send(id).unwrap();
delay_for(Duration::from_millis(20)).await;
future::ready(())
}));
//Process 2
tokio::spawn(sema.clone().with_acquired(move |id| async move {
delay_for(Duration::from_millis(10)).await;
tx2.send(id).unwrap();
future::ready(())
}));
//Process 3
tokio::spawn(sema.clone().with_acquired(move |id| async move {
delay_for(Duration::from_millis(10)).await;
tx3.send(id).unwrap();
future::ready(())
}));

delay_for(Duration::from_millis(50)).await;

//Process 4
tokio::spawn(sema.clone().with_acquired(move |id| async move {
delay_for(Duration::from_millis(10)).await;
tx4.send(id).unwrap();
future::ready(())
}));

let id1 = rx1.await.unwrap();
let id2 = rx2.await.unwrap();
let id3 = rx3.await.unwrap();
let id4 = rx4.await.unwrap();

// Process 1 should get ID 2, then process 2 should run with id 1 and complete, then process 3
// should run in the same "slot" as process 2 and get the same id. Process 4 is scheduled
// later and gets put into "slot" 2.
assert_eq!(id1, 2);
assert_eq!(id2, 1);
assert_eq!(id3, 1);
assert_eq!(id4, 2);
}

#[tokio::test]
Expand All @@ -25,7 +75,7 @@ async fn at_most_n_acquisitions() {
let (unblock_thread1, rx_thread1) = oneshot::channel();
let (tx_thread2, acquired_thread2) = oneshot::channel();

tokio::spawn(handle1.with_acquired(move || {
tokio::spawn(handle1.with_acquired(move |_id| {
async {
// Indicate that we've acquired, and then wait to be signaled to exit.
tx_thread1.send(()).unwrap();
Expand All @@ -39,7 +89,7 @@ async fn at_most_n_acquisitions() {
panic!("thread1 didn't acquire.");
}

tokio::spawn(handle2.with_acquired(move || {
tokio::spawn(handle2.with_acquired(move |_id| {
tx_thread2.send(()).unwrap();
future::ready(())
}));
Expand Down Expand Up @@ -91,7 +141,7 @@ async fn drop_while_waiting() {
let (unblock_thread3, rx_thread3) = oneshot::channel();
let (tx_thread2_attempt_1, did_not_acquire_thread2_attempt_1) = oneshot::channel();

tokio::spawn(handle1.with_acquired(move || {
tokio::spawn(handle1.with_acquired(move |_id| {
async {
// Indicate that we've acquired, and then wait to be signaled to exit.
tx_thread1.send(()).unwrap();
Expand Down Expand Up @@ -119,7 +169,7 @@ async fn drop_while_waiting() {
})
}));

tokio::spawn(handle3.with_acquired(move || {
tokio::spawn(handle3.with_acquired(move |_id| {
async {
// Indicate that we've acquired, and then wait to be signaled to exit.
tx_thread3.send(()).unwrap();
Expand Down Expand Up @@ -152,7 +202,7 @@ async fn dropped_future_is_removed_from_queue() {
let (tx_thread2, gave_up_thread2) = oneshot::channel();
let (unblock_thread2, rx_thread2) = oneshot::channel();

let join_handle1 = tokio::spawn(handle1.with_acquired(move || {
let join_handle1 = tokio::spawn(handle1.with_acquired(move |_id| {
async {
// Indicate that we've acquired, and then wait to be signaled to exit.
tx_thread1.send(()).unwrap();
Expand All @@ -165,7 +215,7 @@ async fn dropped_future_is_removed_from_queue() {
if let Err(_) = timeout(Duration::from_secs(5), acquired_thread1).await {
panic!("thread1 didn't acquire.");
}
let waiter = handle2.with_acquired(|| future::ready(()));
let waiter = handle2.with_acquired(|_id| future::ready(()));
let join_handle2 = tokio::spawn(async move {
match future::select(delay_for(Duration::from_millis(100)), waiter.boxed()).await {
future::Either::Left(((), waiter_future)) => {
Expand Down
17 changes: 15 additions & 2 deletions src/rust/engine/process_execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,9 @@ pub struct Process {

pub timeout: Option<std::time::Duration>,

/// If not None, then if a BoundedCommandRunner executes this Process
pub execution_slot_variable: Option<String>,

#[derivative(PartialEq = "ignore", Hash = "ignore")]
pub description: String,

Expand Down Expand Up @@ -284,6 +287,7 @@ impl Process {
jdk_home: None,
target_platform: PlatformConstraint::None,
is_nailgunnable: false,
execution_slot_variable: None,
}
}

Expand Down Expand Up @@ -511,7 +515,7 @@ impl CommandRunner for BoundedCommandRunner {

async fn run(
&self,
req: MultiPlatformProcess,
mut req: MultiPlatformProcess,
context: Context,
) -> Result<FallibleProcessResultWithPlatform, String> {
let name = format!("{}-waiting", req.workunit_name());
Expand All @@ -532,7 +536,7 @@ impl CommandRunner for BoundedCommandRunner {
let context = context.clone();
let name = format!("{}-running", req.workunit_name());

semaphore.with_acquired(move || {
semaphore.with_acquired(move |concurrency_id| {
let metadata = WorkunitMetadata {
desc: Some(desc),
message: None,
Expand All @@ -556,6 +560,15 @@ impl CommandRunner for BoundedCommandRunner {
},
};

for (_, process) in req.0.iter_mut() {
if let Some(ref execution_slot_env_var) = process.execution_slot_variable {
let execution_slot = format!("{}", concurrency_id);
process
.env
.insert(execution_slot_env_var.clone(), execution_slot);
}
}

with_workunit(
context.workunit_store.clone(),
name,
Expand Down
3 changes: 3 additions & 0 deletions src/rust/engine/process_execution/src/local_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ async fn jdk_symlink() {
jdk_home: Some(preserved_work_tmpdir.path().to_path_buf()),
target_platform: PlatformConstraint::None,
is_nailgunnable: false,
execution_slot_variable: None,
})
.await
.unwrap();
Expand Down Expand Up @@ -500,6 +501,7 @@ async fn timeout() {
jdk_home: None,
target_platform: PlatformConstraint::None,
is_nailgunnable: false,
execution_slot_variable: None,
})
.await
.unwrap();
Expand Down Expand Up @@ -546,6 +548,7 @@ async fn working_directory() {
jdk_home: None,
target_platform: PlatformConstraint::None,
is_nailgunnable: false,
execution_slot_variable: None,
},
work_dir.path().to_owned(),
true,
Expand Down