@sqwishy
Contributor
@sqwishy sqwishy commented Sep 20, 2022

There were a couple of issues with the current implementation:

  1. When reading stdout and stderr from the child, as soon as we hit EOF on one, we would stop reading from both (line 1420). This could lead to the return value never being read from the job program.

  2. Lines read from stdout and stderr are put into a channel and read elsewhere with `rx.recv()` (line 1497), but that channel isn't drained until empty. It is only read in the `while !done.load(...)` loop (line 1449), and that loop can stop after any `.store(true, ...)`, which happens when the child exits, when the job is cancelled, and when either stdout or stderr reaches EOF.

     This can be verified by putting `dbg!(rx.recv().await)` or a similar assertion after the while loop, before returning from that function. It shows the channel still containing log lines on rare occasions.

I was pretty careful in this change to maintain the current behaviour, adding comments to express intent.

One difference with this change is that some recurring intervals (the cancel check and the ping update) should now be more regular:

Before...

> at 00ms wait for 10ms
> at 10ms do things for 3ms
> at 13ms wait again for *10ms*
> at 23ms do things again ...

With change...

> at 00ms wait for 10ms
> at 10ms do things for 3ms
> at 13ms wait again but for *7ms*
> at 20ms do things again ...

Which I'm guessing is preferable but I could be wrong.

@sqwishy sqwishy marked this pull request as ready for review September 20, 2022 04:23
Comment on lines 1488 to 1490
* We want append_logs() to run concurrently while reading output from the
* job process. take_until() should only stop reading logs once either:
* - `write` has resolved
Contributor
I'm confused. Is append_logs really running concurrently, in the sense that no new parallel thread is spawned to run it?

Contributor Author

Good catch. My understanding is that the future should run as long as it's being polled, and it is being polled in the `take_until()`. However, it's entirely possible that it doesn't even start polling the scheduled write future until after `sleep(write_logs_delay)` resolves.

Generally speaking, something like:

`future::join(sleep(Duration::from_secs(1)), sleep(Duration::from_secs(1))).await;`

will run both futures concurrently in the same task until they both finish, and will resolve after one second. But I think that's only because `join` polls both of them, whereas `sleep(...).then(bar)` will only start polling `bar` after the sleep resolves. I guess I could just `join(sleep(...), db_write_future)`.

Contributor Author

Although, it won't even run while we're inside `while let Some(line) = output.by_ref().next().await { ... }`. So it might be best as a task. If a task panics, the panic is caught in a `JoinHandle`, so that adds another thing to deal with, but it may make this scheduling much easier to think about.

Contributor Author

I updated this to use a task. It's still a little ugly.

@rubenfiszel
Contributor

> at 00ms wait for 10ms
> at 10ms do things for 3ms
> at 13ms wait again but for 7ms
> at 20ms do things again ...

This seems preferable indeed, but I need to look back at the code to see why it is the case.

@sqwishy
Contributor Author

sqwishy commented Sep 20, 2022

This fucking piece of shit CI dude.

Also @rubenfiszel this is an example of some nsjail jank that I've seen locally sometimes.

2022-09-20T23:08:36.8978994Z   left: `Object {"error": String("Error during execution of the script\nlast 5 logs lines:\n[I][2022-09-20T23:08:36+0000] Executing '/usr/local/bin/python3' for '[STANDALONE MODE]'\n[E][2022-09-20T23:08:36+0000][1] void subproc::subprocNewProc(nsjconf_t*, int, int, int, int, int)():211 execve('/usr/local/bin/python3') failed: No such file or directory\n[F][2022-09-20T23:08:36+0000][1] pid_t subproc::runChild(nsjconf_t*, int, int, int, int)():466 Launching child process failed\n[W][2022-09-20T23:08:36+0000][11929] pid_t subproc::runChild(nsjconf_t*, int, int, int, int)():486 Received error message from the child process before it has been executed\n[E][2022-09-20T23:08:36+0000][11929] int nsjail::standaloneMode(nsjconf_t*)():272 Couldn't launch the child process")}`,

aka

[I][2022-09-20T23:08:36+0000] Executing '/usr/local/bin/python3' for '[STANDALONE MODE]'
[E][2022-09-20T23:08:36+0000][1] void subproc::subprocNewProc(nsjconf_t*, int, int, int, int, int)():211 execve('/usr/local/bin/python3') failed: No such file or directory
[F][2022-09-20T23:08:36+0000][1] pid_t subproc::runChild(nsjconf_t*, int, int, int, int)():466 Launching child process failed
[W][2022-09-20T23:08:36+0000][11929] pid_t subproc::runChild(nsjconf_t*, int, int, int, int)():486 Received error message from the child process before it has been executed
[E][2022-09-20T23:08:36+0000][11929] int nsjail::standaloneMode(nsjconf_t*)():272 Couldn't launch the child process

@rubenfiszel
Contributor

@sqwishy I think for this error we might want to reproduce it under nsjail's more verbose/debug mode. It's pretty annoying; I hope we will not have to patch nsjail...

@sqwishy
Contributor Author

sqwishy commented Sep 21, 2022

@rubenfiszel Any changes regarding nsjail debugging will go in a separate branch. CI appears to be failing because it's using an old Go compiler.

If you're fine with everything else in the changes here, you can merge it. Or, if you want the CI to pass, I can look at writing our Go bootstrapping code to support older compilers and put that in another pull request; let me know.

@rubenfiszel
Contributor

rubenfiszel commented Sep 21, 2022 via email

};

/* a future that reads output from the child and appends to the database */
/* this whole section is kind of a mess and could use some love */
Contributor

Remove this kind of language, but list what could be improved if you want.

Contributor Author

When you gave me feedback earlier, you said you were surprised that I wrote this the way I did because it wasn't very clear, or something like that. I shared the general sentiment. This comment was just to be up front about that, so that it would be less surprising to readers.

*
* (This looks a bit nicer using try_for_each but the side-effects/capturing
* in FnMut closure seems impractical, _maybe_ a futures::Sink would work but
* I know next to nothing about that.) */
Contributor

Stop the comment at "would work".

}
let wait_result = tokio::select! {
(w, _) = future::join(wait_on_child, lines) => w,
_ = ping.collect::<()>() => unreachable!("job ping stopped"),
Contributor

The unreachable part is weird, could you explain in more detail?
If it's really unreachable, why even `select!`?

Contributor Author

> The unreachable part is weird, could you explain in more detail?

The stream should repeat forever.

> If it's really unreachable, why even `select!`?

To poll on the future.

let mut log_remaining = (MAX_LOG_SIZE as usize).saturating_sub(logs.chars().count());
let mut result = io::Result::Ok(());
let mut output = output;
/* write_is_done resolves when the task is done, same as write, but does not contain the Result
Contributor

I could not find what `write_is_done` was referring to.

@rubenfiszel rubenfiszel merged commit e7a6c1b into windmill-labs:main Sep 24, 2022
@github-actions github-actions bot locked and limited conversation to collaborators Sep 24, 2022