Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store outputs in a content-addressed store #33

Merged
merged 24 commits into from
Aug 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
5d003fc
remove the intermediate not-glue rbt layer
BrianHicks Aug 17, 2022
60408cd
RunnableJob -> Job
BrianHicks Aug 17, 2022
8b4e5bc
keep a basic store in the coordinator
BrianHicks Aug 18, 2022
268cbbc
store a mapping from input hashes to content
BrianHicks Aug 18, 2022
d413dde
make IDs a little stabler for better cache hits
BrianHicks Aug 18, 2022
58cb549
note for later
BrianHicks Aug 18, 2022
4d3b98f
copy from workspace to store
BrianHicks Aug 18, 2022
aaf8e79
make sure the root we're passed exists
BrianHicks Aug 18, 2022
ae151e2
deal with failures while reading
BrianHicks Aug 18, 2022
8ce067c
use aliases
BrianHicks Aug 18, 2022
88780fa
get rid of a couple unnecessary println! calls
BrianHicks Aug 18, 2022
11f1aef
this is just a quick lookup away now
BrianHicks Aug 18, 2022
0b5adf1
only run the job if we don't have an output
BrianHicks Aug 18, 2022
7a41259
add a logging framework
BrianHicks Aug 18, 2022
5607a70
replace println! with the log framework
BrianHicks Aug 18, 2022
b56531f
sprinkle some log lines around for debugging
BrianHicks Aug 18, 2022
9399b0f
use the nicer workspace consumer
BrianHicks Aug 18, 2022
19d8b7f
this was already done
BrianHicks Aug 18, 2022
44f8466
correct typo
BrianHicks Aug 18, 2022
c55310e
don't re-run if we already have the output
BrianHicks Aug 18, 2022
0d551fb
input files aren't actually handled yet
BrianHicks Aug 18, 2022
fdd4674
output path needs to participate in content hash
BrianHicks Aug 18, 2022
0f13d25
📎
BrianHicks Aug 18, 2022
801289b
store consumes workspace again
BrianHicks Aug 24, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@ edition = "2018"

[dependencies]
anyhow = "1.0"
blake3 = "1.3.1"
byteorder = "1.4"
clap = { version = "3.2.16", features = ["color", "suggestions", "unstable-v4", "env", "cargo", "derive"] }
digest = "0.10"
itertools = "0.10.3"
libc = "0.2"
log = { version = "0.4.17", features = ["max_level_trace", "release_max_level_info"] }
notify = "4"
roc_std = { path = "vendor/roc_std" }
serde = { version = "1.0.143", features = ["derive"] }
serde_json = "1.0.83"
simple_logger = { version = "2.2.0", features = ["stderr"] }
sled = "0.34"
tempfile = "3.2"
walkdir = "2.3"
Expand Down
10 changes: 7 additions & 3 deletions src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::coordinator::Coordinator;
use crate::glue;
use crate::store::Store;
use anyhow::{Context, Result};
use clap::Parser;
use core::mem::MaybeUninit;
Expand All @@ -14,20 +15,23 @@ pub struct Cli {
use_fake_runner: bool,

#[clap(long, default_value = ".rbt")]
isolator_root: PathBuf,
root_dir: PathBuf,
}

impl Cli {
pub fn run(&self) -> Result<()> {
let rbt = Self::load();

let mut coordinator = Coordinator::default();
let store = Store::new(self.root_dir.join("store")).context("could not open store")?;
BrianHicks marked this conversation as resolved.
Show resolved Hide resolved

let mut coordinator = Coordinator::new(self.root_dir.join("workspaces"), store);
coordinator.add_target(rbt.f0.default);

let runner: Box<dyn crate::coordinator::Runner> = if self.use_fake_runner {
log::info!("using fake runner");
Box::new(crate::fake_runner::FakeRunner::default())
} else {
Box::new(crate::runner::Runner::new(self.isolator_root.to_owned()))
Box::new(crate::runner::Runner)
};

while coordinator.has_outstanding_work() {
Expand Down
56 changes: 43 additions & 13 deletions src/coordinator.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,33 @@
use crate::glue;
use crate::job::{self, Job};
use crate::store::Store;
use crate::workspace::Workspace;
use anyhow::{Context, Result};
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;

#[derive(Debug, Default)]
#[derive(Debug)]
pub struct Coordinator {
workspace_root: PathBuf,
store: Store,

jobs: HashMap<job::Id, Job>,
blocked: HashMap<job::Id, HashSet<job::Id>>,
ready: Vec<job::Id>,
}

impl Coordinator {
/// Builds a `Coordinator` that creates job workspaces under `workspace_root`
/// and consults `store` for job outputs.
///
/// The job table, the blocked map, and the ready list all start out empty.
pub fn new(workspace_root: PathBuf, store: Store) -> Self {
Coordinator {
workspace_root,
store,

jobs: HashMap::default(),
blocked: HashMap::default(),
ready: Vec::default(),
}
}

pub fn add_target(&mut self, top_job: glue::Job) {
// Note: this data structure is going to grow the ability to refer to other
// jobs as soon as it's possibly feasible. When that happens, a depth-first
Expand All @@ -25,18 +42,30 @@ impl Coordinator {
}

pub fn run_next<R: Runner>(&mut self, runner: &R) -> Result<()> {
let next = match self.ready.pop() {
let id = match self.ready.pop() {
Some(id) => id,
None => anyhow::bail!("no work ready to do"),
};

runner
.run(
self.jobs
.get(&next)
.context("had a bad job ID in Coordinator.ready")?,
)
.context("could not run job")?;
let job = self
.jobs
.get(&id)
.context("had a bad job ID in Coordinator.ready")?;

log::debug!("preparing to run job {}", job.id);

if self.store.for_job(job).is_none() {
let workspace = Workspace::create(&self.workspace_root, job)
.with_context(|| format!("could not create workspace for job {}", job.id))?;

runner.run(job, &workspace).context("could not run job")?;

self.store
.store_from_workspace(job, workspace)
.context("could not store job output")?;
} else {
log::debug!("already had output of this job; skipping");
bhansconnect marked this conversation as resolved.
Show resolved Hide resolved
}

// Now that the job is finished (either freshly run, or skipped because the
// store already had its output), we update our bookkeeping to figure out
// what that job just unblocked.
Expand All @@ -46,13 +75,14 @@ impl Coordinator {
let mut newly_unblocked = vec![]; // avoiding mutating both fields of self in the loop below

self.blocked.retain(|blocked, blockers| {
let removed = blockers.remove(&next);
let removed = blockers.remove(&id);
if !removed {
return false;
}

let no_blockers_remaining = blockers.is_empty();
if no_blockers_remaining {
log::debug!("unblocked {}", blocked);
newly_unblocked.push(*blocked)
}
!no_blockers_remaining
Expand All @@ -64,11 +94,11 @@ impl Coordinator {
}

pub trait Runner {
fn run(&self, job: &Job) -> Result<()>;
fn run(&self, job: &Job, workspace: &Workspace) -> Result<()>;
}

impl Runner for Box<dyn Runner> {
fn run(&self, job: &Job) -> Result<()> {
self.as_ref().run(job)
fn run(&self, job: &Job, workspace: &Workspace) -> Result<()> {
self.as_ref().run(job, workspace)
}
}
5 changes: 3 additions & 2 deletions src/fake_runner.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use crate::coordinator::Runner;
use crate::job::Job;
use crate::workspace::Workspace;
use anyhow::Result;

#[derive(Debug, Default)]
pub struct FakeRunner {}

impl Runner for FakeRunner {
fn run(&self, job: &Job) -> Result<()> {
eprintln!("running job: {:#?}", job);
fn run(&self, job: &Job, _workspace: &Workspace) -> Result<()> {
log::info!("fake-running job {:?}", job);

std::thread::sleep(std::time::Duration::from_millis(500));

Expand Down
Loading