Queue kinds #17

Merged
merged 3 commits into from Feb 3, 2020

@@ -54,6 +54,17 @@ A `match` block supports the following options:
errors running per-repo programs will be sent (warning: full stderr/stdout
will be sent, so consider carefully whether these have sensitive information
or not).
* `queue = <evict|parallel|sequential>` optionally specifies what to do when
multiple requests for the same repository are queued at once:
  * `evict`: only run one job for this repository at a time. Additional jobs
    will stay on the queue: if a new job comes in for that repository, it
    evicts any previously queued jobs for that repository. In other words,
    for this repository there can be at most one running job and one queued
    job at any point.
  * `parallel`: run as many jobs for this repository in parallel as
    possible.
  * `sequential`: only run one job for this repository at a time. Additional
    jobs will stay on the queue and be executed in FIFO order.
* `secret = "<secret>"` is an optional GitHub secret which guarantees that
hooks are coming from your GitHub repository and not a malfeasant. Although
this is optional, we *highly* recommend setting it in all cases.
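
As a rough illustration of the three `queue` settings described above, the following is a minimal sketch for a single repository. It is not snare's implementation: `RepoQueue`, its methods, and the use of integer job IDs are all hypothetical names chosen for this example.

```rust
use std::collections::VecDeque;

/// Mirrors the three documented `queue` settings.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum QueueKind {
    Evict,
    Parallel,
    Sequential,
}

/// Hypothetical single-repository queue; integer IDs stand in for real requests.
pub struct RepoQueue {
    kind: QueueKind,
    queued: VecDeque<u32>,
    running: Vec<u32>,
}

impl RepoQueue {
    pub fn new(kind: QueueKind) -> Self {
        RepoQueue { kind, queued: VecDeque::new(), running: Vec::new() }
    }

    /// Queue a new job. With `Evict`, anything still waiting is discarded first,
    /// so at most one job is ever queued.
    pub fn push(&mut self, id: u32) {
        if self.kind == QueueKind::Evict {
            self.queued.clear();
        }
        self.queued.push_back(id);
    }

    /// Start the oldest waiting job, if the queue kind allows it right now.
    pub fn start_next(&mut self) -> Option<u32> {
        let blocked = match self.kind {
            // Parallel jobs never wait for each other.
            QueueKind::Parallel => false,
            // Evict and Sequential both allow only one running job at a time.
            QueueKind::Evict | QueueKind::Sequential => !self.running.is_empty(),
        };
        if blocked {
            return None;
        }
        let id = self.queued.pop_front()?;
        self.running.push(id);
        Some(id)
    }

    /// Mark a running job as finished.
    pub fn finish(&mut self, id: u32) {
        self.running.retain(|&r| r != id);
    }

    /// The IDs currently waiting, oldest first.
    pub fn queued(&self) -> Vec<u32> {
        self.queued.iter().copied().collect()
    }
}
```

With `Evict`, pushing jobs 2 and 3 while job 1 is running leaves only job 3 waiting; with `Sequential`, both would wait and run in FIFO order; with `Parallel`, each would start immediately.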
@@ -66,17 +77,21 @@ before any user `match` blocks:

```
match ".*" {
queue = sequential
timeout = 3600
}
```

The minimal configuration file is thus:
The minimal recommended configuration file is thus:

```
port = <port>
github {
reposdir = "<path>"
match ".*" {
email = "<email>"
}
}
```

@@ -102,10 +117,12 @@ github {
then the repositories will have the following settings:

* `a/b`:
  * `queue = sequential`
  * `timeout = 3600`
  * `email = "ghi@jkl.com"`
  * `secret = "sec"`
* `c/d`:
  * `queue = sequential`
  * `timeout = 3600`
  * `email = "abc@def.com"`
  * `secret = "sec"`
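
The way later `match` blocks override earlier ones, as `repoconfig` does in this PR, can be sketched roughly as follows. This is a simplified illustration under stated assumptions: `MatchBlock`, `Resolved`, and `resolve` are hypothetical names, a plain string prefix stands in for the regular expression, and the defaults (`sequential`, `3600`) are written as initial values rather than as a default `match` block.

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum QueueKind {
    Evict,
    Parallel,
    Sequential,
}

/// Hypothetical simplified match block. A prefix replaces the real regex;
/// the empty prefix matches every repository, much like `".*"`.
pub struct MatchBlock {
    pub prefix: &'static str,
    pub queue: Option<QueueKind>,
    pub timeout: Option<u64>,
    pub email: Option<&'static str>,
}

/// The settings that apply to one repository after all blocks are considered.
#[derive(Debug, PartialEq)]
pub struct Resolved {
    pub queue: QueueKind,
    pub timeout: u64,
    pub email: Option<&'static str>,
}

/// Walk the blocks in order; every matching block overrides only the
/// options it actually sets, so the last matching block wins per option.
pub fn resolve(blocks: &[MatchBlock], repo: &str) -> Resolved {
    // Assumed defaults, taken from the default block shown in the README.
    let mut out = Resolved { queue: QueueKind::Sequential, timeout: 3600, email: None };
    for b in blocks {
        if !repo.starts_with(b.prefix) {
            continue;
        }
        if let Some(q) = b.queue {
            out.queue = q;
        }
        if let Some(t) = b.timeout {
            out.timeout = t;
        }
        if let Some(e) = b.email {
            out.email = Some(e);
        }
    }
    out
}
```

Given a catch-all block that sets an email and a later block for `a/` that sets a different one, `a/b` resolves to the later email while `c/d` keeps the catch-all one, and both inherit the default queue kind and timeout.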
@@ -5,6 +5,7 @@
\{ "{"
\} "}"
email "EMAIL"
evict "EVICT"
github "GITHUB"
match "MATCH"
maxjobs "MAXJOBS"
@@ -13,6 +14,7 @@ port "PORT"
queue "QUEUE"
reposdir "REPOSDIR"
secret "SECRET"
sequential "SEQUENTIAL"
timeout "TIMEOUT"
//.*?$ ;
[ \t\n\r]* ;
@@ -201,6 +201,7 @@ impl GitHub {
}
};
let mut email = None;
let mut queuekind = None;
let mut secret = None;
let mut timeout = None;
for opt in m.options {
@@ -217,6 +218,20 @@ impl GitHub {
let email_str = &email_str[1..email_str.len() - 1];
email = Some(email_str.to_owned());
}
config_ast::PerRepoOption::Queue(lexeme, qkind) => {
if queuekind.is_some() {
return Err(error_at_lexeme(
lexer,
lexeme,
"Mustn't specify 'queue' more than once",
));
}
queuekind = Some(match qkind {
config_ast::QueueKind::Evict => QueueKind::Evict,
config_ast::QueueKind::Parallel => QueueKind::Parallel,
config_ast::QueueKind::Sequential => QueueKind::Sequential,
});
}
config_ast::PerRepoOption::Secret(lexeme) => {
if secret.is_some() {
return Err(error_at_lexeme(
@@ -269,6 +284,7 @@ impl GitHub {
matches.push(Match {
re,
email,
queuekind,
secret,
timeout,
});
@@ -288,13 +304,17 @@ impl GitHub {
pub fn repoconfig<'a>(&'a self, owner: &str, repo: &str) -> (RepoConfig, Option<&'a SecStr>) {
let s = format!("{}/{}", owner, repo);
let mut email = None;
let mut queuekind = None;
let mut secret = None;
let mut timeout = None;
for m in &self.matches {
if m.re.is_match(&s) {
if let Some(ref e) = m.email {
email = Some(e.clone());
}
if let Some(q) = m.queuekind {
queuekind = Some(q);
}
if let Some(ref s) = m.secret {
secret = Some(s);
}
@@ -307,6 +327,7 @@ impl GitHub {
(
RepoConfig {
email,
queuekind: queuekind.unwrap(),
timeout: timeout.unwrap(),
},
secret,
@@ -319,6 +340,8 @@ pub struct Match {
re: Regex,
/// An optional email address to send errors to.
email: Option<String>,
/// The queue kind.
queuekind: Option<QueueKind>,
/// The GitHub secret used to validate requests.
secret: Option<SecStr>,
/// The maximum time to allow a command to run for before it is terminated (in seconds).
@@ -331,6 +354,7 @@ impl Default for Match {
Match {
re,
email: None,
queuekind: Some(QueueKind::Sequential),
secret: None,
timeout: Some(DEFAULT_TIMEOUT),
}
@@ -353,5 +377,13 @@ fn error_at_lexeme(lexer: &dyn Lexer<StorageT>, lexeme: Lexeme<StorageT>, msg: &
/// The configuration for a given repository.
pub struct RepoConfig {
pub email: Option<String>,
pub queuekind: QueueKind,
pub timeout: u64,
}

#[derive(Clone, Copy)]
pub enum QueueKind {
Evict,
Parallel,
Sequential,
}
@@ -51,14 +51,24 @@ PerRepoOptions -> Result<Vec<PerRepoOption<StorageT>>, ()>:

PerRepoOption -> Result<PerRepoOption<StorageT>, ()>:
"EMAIL" "=" "STRING" { Ok(PerRepoOption::Email(map_err($3)?)) }
| "QUEUE" "=" QueueKind {
let (lexeme, qkind) = $3?;
Ok(PerRepoOption::Queue(lexeme, qkind))
}
| "SECRET" "=" "STRING" { Ok(PerRepoOption::Secret(map_err($3)?)) }
| "TIMEOUT" "=" "INT" { Ok(PerRepoOption::Timeout(map_err($3)?)) }
;

QueueKind -> Result<(Lexeme<StorageT>, QueueKind), ()>:
"EVICT" { Ok((map_err($1)?, QueueKind::Evict)) }
| "PARALLEL" { Ok((map_err($1)?, QueueKind::Parallel)) }
| "SEQUENTIAL" { Ok((map_err($1)?, QueueKind::Sequential)) }
;

%%
use lrpar::Lexeme;

use crate::config_ast::{TopLevelOption, Match, PerRepoOption, ProviderOption};
use crate::config_ast::{TopLevelOption, Match, PerRepoOption, ProviderOption, QueueKind};

type StorageT = u8;

@@ -21,6 +21,13 @@ pub struct Match<StorageT> {

pub enum PerRepoOption<StorageT> {
Email(Lexeme<StorageT>),
Queue(Lexeme<StorageT>, QueueKind),
Secret(Lexeme<StorageT>),
Timeout(Lexeme<StorageT>),
}

pub enum QueueKind {
Evict,
Parallel,
Sequential,
}
@@ -245,18 +245,24 @@ impl JobRunner {
/// Try to pop all jobs on the queue: returns `true` if it was able to do so successfully or
/// `false` otherwise.
fn try_pop_queue(&mut self) -> bool {
// Note that the various `unwrap` calls to `queue.lock()` are acceptable because if it
// fails it means that something has gone so seriously wrong in the other thread that
// there's no likelihood that we can recover.

let snare = Arc::clone(&self.snare);
let mut queue = snare.queue.lock().unwrap();
loop {
let pjob = self.snare.queue.lock().unwrap().pop();
if self.num_running == self.maxjobs && !queue.is_empty() {
return false;
}
let pjob = queue.pop(|path| {
self.running.iter().any(|jobslot| {
if let Some(job) = jobslot {
path == job.path
} else {
false
}
})
});
match pjob {
Some(qj) => {
if self.num_running == self.maxjobs {
self.snare.queue.lock().unwrap().push_front(qj);
return false;
}
debug_assert!(self.num_running < self.maxjobs);
match self.try_job(qj) {
Ok(j) => {
// The unwrap is safe since we've already checked that there's room to
@@ -268,7 +274,7 @@ impl JobRunner {
}
Err(Some(qj)) => {
// The job couldn't be run for temporary reasons: we'll retry later.
self.snare.queue.lock().unwrap().push_front(qj);
queue.push_front(qj);
return false;
}
Err(None) => {
@@ -280,7 +286,12 @@ impl JobRunner {
}
}
}
None => return true,
None => {
// We weren't able to pop any jobs from the queue, but that doesn't mean that
// the queue is necessarily empty: there may be `QueueKind::Sequential` jobs in
// it which can't be popped until others with the same path have completed.
return queue.is_empty();
}
}
}
}
@@ -311,7 +322,7 @@ impl JobRunner {
if let Ok(stderrout_file) = tempfile() {
if set_nonblock(stderrout_file.as_raw_fd()).is_ok() {
if let Some(json_path_str) = json_path.to_str() {
let child = match Command::new(qj.path)
let child = match Command::new(qj.path.clone())
.arg(qj.event_type)
.arg(json_path_str)
.current_dir(tempdir.path())
@@ -354,6 +365,7 @@ impl JobRunner {
.unwrap();

return Ok(Job {
path: qj.path,
finish_by,
child,
_tempdir: tempdir,
@@ -463,6 +475,8 @@ impl JobRunner {
}

struct Job {
/// The path of the script we are running.
path: String,
/// What time must this Job have completed by? If it exceeds this time, it will be terminated.
finish_by: Instant,
/// The child process itself.
@@ -3,7 +3,7 @@ use std::{
time::Instant,
};

use crate::config::RepoConfig;
use crate::config::{QueueKind, RepoConfig};

pub(crate) struct QueueJob {
pub path: String,
@@ -40,25 +40,48 @@ impl Queue {
Queue { q: HashMap::new() }
}

/// Are there any jobs in the queue?
pub fn is_empty(&self) -> bool {
for v in self.q.values() {
if !v.is_empty() {
return false;
}
}
true
}

/// For the per-repo program at `path`, push a new request.
pub fn push_back(&mut self, qj: QueueJob) {
self.q
.entry(qj.path.clone())
.or_insert_with(VecDeque::new)
.push_back(qj);
let mut entry = self.q.entry(qj.path.clone());

match qj.rconf.queuekind {
QueueKind::Evict => {
entry = entry.and_modify(|v| v.clear());
}
QueueKind::Parallel | QueueKind::Sequential => (),
}
entry.or_insert_with(VecDeque::new).push_back(qj);
}

/// For the per-repo program at `path`, push an old request that has had to be requeued due to
/// a (hopefully) temporary error.
/// a (hopefully) temporary error. In order that jobs are not unnecessarily pushed on the queue
/// (which could happen with the `Evict` queue kind), the lock on `self` should be held between
/// calls to `pop` and `push_front`.
pub fn push_front(&mut self, qj: QueueJob) {
self.q
.entry(qj.path.clone())
.or_insert_with(VecDeque::new)
.push_front(qj);
}

/// If the queue has one or more entries, pop one and return it, or `None` otherwise.
pub fn pop(&mut self) -> Option<QueueJob> {
/// If the queue has a runnable entry, pop and return it, or `None` otherwise. Note that `None`
/// does not guarantee that the queue is empty: it may mean that there are queued jobs that
/// can't be run until existing jobs finish. `running(path)` is a function which must return
/// `true` if a job at `path` is currently running and `false` otherwise.
pub fn pop<F>(&mut self, running: F) -> Option<QueueJob>
where
F: Fn(&str) -> bool,
{
// We find the oldest element in the queue and pop that.
let mut earliest_time = None;
let mut earliest_key = None;
@@ -69,6 +92,14 @@ impl Queue {
continue;
}
}
match qj.rconf.queuekind {
QueueKind::Parallel => (),
QueueKind::Evict | QueueKind::Sequential => {
if running(&qj.path) {
continue;
}
}
}
earliest_time = Some(qj.req_time);
earliest_key = Some(k.clone());
}
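
The contract of the new `pop` (return the oldest runnable job, and note that `None` does not imply an empty queue) can be sketched in a self-contained form as below. This is an approximation, not the code in this PR: `req_time` is a plain `u64` tick rather than an `Instant`, and the queue kind is stored directly on the hypothetical `QueueJob` instead of inside a `RepoConfig`.

```rust
use std::collections::{HashMap, VecDeque};

#[derive(Clone, Copy)]
pub enum QueueKind {
    Evict,
    Parallel,
    Sequential,
}

/// Simplified job: `req_time` is an assumed integer tick, not an `Instant`.
pub struct QueueJob {
    pub path: String,
    pub req_time: u64,
    pub kind: QueueKind,
}

pub struct Queue {
    q: HashMap<String, VecDeque<QueueJob>>,
}

impl Queue {
    pub fn new() -> Self {
        Queue { q: HashMap::new() }
    }

    /// Are there any jobs in the queue?
    pub fn is_empty(&self) -> bool {
        self.q.values().all(|v| v.is_empty())
    }

    /// Push a new request; `Evict` clears anything already waiting for that path.
    pub fn push_back(&mut self, qj: QueueJob) {
        let v = self.q.entry(qj.path.clone()).or_insert_with(VecDeque::new);
        if let QueueKind::Evict = qj.kind {
            v.clear();
        }
        v.push_back(qj);
    }

    /// Pop the oldest job that is allowed to run. `running(path)` must return
    /// `true` if a job for `path` is currently executing.
    pub fn pop<F: Fn(&str) -> bool>(&mut self, running: F) -> Option<QueueJob> {
        let mut earliest: Option<(u64, String)> = None;
        for (k, v) in &self.q {
            let qj = match v.front() {
                Some(qj) => qj,
                None => continue,
            };
            let blocked = match qj.kind {
                QueueKind::Parallel => false,
                QueueKind::Evict | QueueKind::Sequential => running(&qj.path),
            };
            if blocked {
                continue;
            }
            if earliest.as_ref().map_or(true, |(t, _)| qj.req_time < *t) {
                earliest = Some((qj.req_time, k.clone()));
            }
        }
        let (_, key) = earliest?;
        self.q.get_mut(&key)?.pop_front()
    }
}
```

A caller that receives `None` should therefore check `is_empty()` before concluding that there is no work left, which is what the change to `try_pop_queue` above does.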