Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: schedule + db conflict #437

Merged
merged 4 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
cargo-udeps
cargo-watch
rustup
tokio-console
twiggy
wasm-tools
];
Expand Down
2 changes: 1 addition & 1 deletion homestar-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ wait-timeout = "0.2"

[features]
default = ["wasmtime-default", "ipfs", "monitoring", "websocket-notify"]
dev = ["ansi-logs", "ipfs", "monitoring", "websocket-notify"]
dev = ["ansi-logs", "console", "ipfs", "monitoring", "websocket-notify"]
ansi-logs = ["tracing-logfmt/ansi_logs"]
console = ["dep:console-subscriber"]
ipfs = ["dep:ipfs-api", "dep:ipfs-api-backend-hyper"]
Expand Down
49 changes: 33 additions & 16 deletions homestar-runtime/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,15 @@ pub trait Database: Send + Sync + Clone {
receipt: Receipt,
conn: &mut Connection,
) -> Result<Receipt, diesel::result::Error> {
let receipt = conn.transaction::<_, diesel::result::Error, _>(|conn| {
let returned = Self::store_receipt(receipt, conn)?;
Self::store_workflow_receipt(workflow_cid, returned.cid(), conn)?;
Ok(returned)
})?;

Ok(receipt)
conn.transaction::<_, diesel::result::Error, _>(|conn| {
if let Some(returned) = Self::store_receipt(receipt.clone(), conn)? {
Self::store_workflow_receipt(workflow_cid, returned.cid(), conn)?;
Ok(returned)
} else {
Self::store_workflow_receipt(workflow_cid, receipt.cid(), conn)?;
Ok(receipt)
}
})
}

/// Store receipt given a connection to the database pool.
Expand All @@ -134,12 +136,13 @@ pub trait Database: Send + Sync + Clone {
fn store_receipt(
receipt: Receipt,
conn: &mut Connection,
) -> Result<Receipt, diesel::result::Error> {
) -> Result<Option<Receipt>, diesel::result::Error> {
diesel::insert_into(schema::receipts::table)
.values(&receipt)
.on_conflict(schema::receipts::cid)
.do_nothing()
.get_result(conn)
.optional()
}

/// Store receipts given a connection to the Database pool.
Expand All @@ -148,13 +151,17 @@ pub trait Database: Send + Sync + Clone {
conn: &mut Connection,
) -> Result<usize, diesel::result::Error> {
receipts.iter().try_fold(0, |acc, receipt| {
let res = diesel::insert_into(schema::receipts::table)
if let Some(res) = diesel::insert_into(schema::receipts::table)
.values(receipt)
.on_conflict(schema::receipts::cid)
.do_nothing()
.execute(conn)?;

Ok::<_, diesel::result::Error>(acc + res)
.execute(conn)
.optional()?
{
Ok::<_, diesel::result::Error>(acc + res)
} else {
Ok(acc)
}
})
}

Expand Down Expand Up @@ -208,19 +215,25 @@ pub trait Database: Send + Sync + Clone {
workflow: workflow::Stored,
conn: &mut Connection,
) -> Result<workflow::Stored, diesel::result::Error> {
diesel::insert_into(schema::workflows::table)
if let Some(stored) = diesel::insert_into(schema::workflows::table)
.values(&workflow)
.on_conflict(schema::workflows::cid)
.do_nothing()
.get_result(conn)
.optional()?
{
Ok(stored)
} else {
Ok(workflow)
}
}

/// Store workflow [Cid] and [Receipt] [Cid] in the database for inner join.
fn store_workflow_receipt(
workflow_cid: Cid,
receipt_cid: Cid,
conn: &mut Connection,
) -> Result<usize, diesel::result::Error> {
) -> Result<Option<usize>, diesel::result::Error> {
let value = StoredReceipt::new(Pointer::new(workflow_cid), Pointer::new(receipt_cid));
diesel::insert_into(schema::workflows_receipts::table)
.values(&value)
Expand All @@ -230,6 +243,7 @@ pub trait Database: Send + Sync + Clone {
))
.do_nothing()
.execute(conn)
.optional()
}

/// Store series of receipts for a workflow [Cid] in the
Expand All @@ -244,8 +258,11 @@ pub trait Database: Send + Sync + Clone {
conn: &mut Connection,
) -> Result<usize, diesel::result::Error> {
receipts.iter().try_fold(0, |acc, receipt| {
let res = Self::store_workflow_receipt(workflow_cid, *receipt, conn)?;
Ok::<_, diesel::result::Error>(acc + res)
if let Some(res) = Self::store_workflow_receipt(workflow_cid, *receipt, conn)? {
Ok::<_, diesel::result::Error>(acc + res)
} else {
Ok(acc)
}
})
}

Expand Down
1 change: 1 addition & 0 deletions homestar-runtime/src/runner/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ impl show::ConsoleTable for AckWorkflow {

fn echo_table(&self) -> Result<(), std::io::Error> {
let table = self.table();

let mut resource_table = Table::new(
self.resources
.iter()
Expand Down
53 changes: 31 additions & 22 deletions homestar-runtime/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ impl<'a> TaskScheduler<'a> {
let schedule: &mut Schedule<'a> = mut_graph.schedule.as_mut();
let schedule_length = schedule.len();
let mut resources_to_fetch = vec![];
let linkmap = LinkMap::<InstructionResult<Arg>>::new();

let resume = 'resume: {
for (idx, vec) in schedule.iter().enumerate().rev() {
Expand All @@ -128,29 +129,22 @@ impl<'a> TaskScheduler<'a> {
if let Ok(pointers) = folded_pointers {
match Db::find_instruction_pointers(&pointers, conn) {
Ok(found) => {
let linkmap = found.iter().fold(
LinkMap::<InstructionResult<Arg>>::new(),
|mut map, receipt| {
if let Some(idx) = resources_to_fetch
.iter()
.position(|(cid, _rsc)| cid == &receipt.instruction().cid())
{
resources_to_fetch.swap_remove(idx);
}

let _ = map.insert(
receipt.instruction().cid(),
receipt.output_as_arg(),
);

map
},
);
let linkmap = found.iter().fold(linkmap.clone(), |mut map, receipt| {
if let Some(idx) = resources_to_fetch
.iter()
.position(|(cid, _rsc)| cid == &receipt.instruction().cid())
{
resources_to_fetch.swap_remove(idx);
}

let _ = map
.insert(receipt.instruction().cid(), receipt.output_as_arg());

map
});

if found.len() == vec.len() {
break 'resume ControlFlow::Break((idx + 1, linkmap));
} else if !found.is_empty() && found.len() < vec.len() {
break 'resume ControlFlow::Break((idx, linkmap));
} else {
continue;
}
Expand Down Expand Up @@ -195,7 +189,7 @@ impl<'a> TaskScheduler<'a> {
}
_ => Ok(SchedulerContext {
scheduler: Self {
linkmap: Arc::new(LinkMap::<InstructionResult<Arg>>::new().into()),
linkmap: Arc::new(linkmap.into()),
ran: None,
run: schedule.to_vec(),
resume_step: None,
Expand All @@ -204,6 +198,21 @@ impl<'a> TaskScheduler<'a> {
}),
}
}

/// Returns the total number of entries already recorded in `self.ran`.
///
/// `self.ran` is optional and nested; every inner collection is flattened
/// one level and its items are counted. Yields `0` when `self.ran` is
/// `None`.
// NOTE(review): the visible caller sits behind the `websocket-notify`
// feature gate, which is presumably why `dead_code` is allowed — confirm.
#[allow(dead_code)]
pub(crate) fn ran_length(&self) -> usize {
    // Count lazily rather than collecting into a temporary `Vec` just to
    // read its length.
    self.ran
        .as_ref()
        .map_or(0, |ran| ran.iter().flatten().count())
}

/// Returns the total number of entries still held in `self.run`.
///
/// `self.run` is nested; every inner collection is flattened one level and
/// its items are counted, so an empty schedule yields `0`.
// NOTE(review): the visible caller sits behind the `websocket-notify`
// feature gate, which is presumably why `dead_code` is allowed — confirm.
#[allow(dead_code)]
pub(crate) fn run_length(&self) -> usize {
    // Count lazily rather than collecting into a temporary `Vec` just to
    // read its length.
    self.run.iter().flatten().count()
}
}

#[cfg(test)]
Expand Down Expand Up @@ -304,7 +313,7 @@ mod test {
let mut conn = db.conn().unwrap();
let stored_receipt = MemoryDb::store_receipt(receipt.clone(), &mut conn).unwrap();

assert_eq!(receipt, stored_receipt);
assert_eq!(receipt, stored_receipt.unwrap());

let workflow = Workflow::new(vec![task1.clone(), task2.clone()]);
let fetch_fn = |_rscs: FnvHashSet<Resource>| {
Expand Down
4 changes: 2 additions & 2 deletions homestar-runtime/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ impl Default for Monitoring {
fn default() -> Self {
Self {
process_collector_interval: Duration::from_millis(5000),
console_subscriber_port: 5555,
console_subscriber_port: 6669,
}
}
}
Expand All @@ -229,7 +229,7 @@ impl Default for Monitoring {
impl Default for Monitoring {
fn default() -> Self {
Self {
console_subscriber_port: 5555,
console_subscriber_port: 6669,
}
}
}
Expand Down
12 changes: 6 additions & 6 deletions homestar-runtime/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ where
debug!(cid = cid.to_string(), "found in in-memory linkmap");
Ok(result.to_owned())
} else if let Some(bytes) = resources.read().await.get(&Resource::Cid(cid)) {
debug!(cid = cid.to_string(), "found in resources");
Ok(InstructionResult::Ok(Arg::Ipld(Ipld::Bytes(
bytes.to_vec(),
))))
Expand Down Expand Up @@ -248,11 +249,11 @@ where
// Replay previous receipts if subscriptions are on.
#[cfg(feature = "websocket-notify")]
{
if scheduler.ran.as_ref().is_some_and(|ran| !ran.is_empty()) {
if scheduler.ran_length() > 0 {
info!(
workflow_cid = self.workflow_info.cid.to_string(),
"{} tasks left to run, sending last batch for workflow",
scheduler.ran.as_ref().unwrap().len()
scheduler.run_length()
);
let mut pointers = Vec::new();
for batch in scheduler
Expand Down Expand Up @@ -362,8 +363,7 @@ where
instruction_ptr,
invocation_ptr,
receipt_meta,
additional_meta,
)),
additional_meta)),
Err(err) => Err(
anyhow!("cannot execute wasm module: {err}"))
.with_context(|| {
Expand All @@ -382,7 +382,6 @@ where

// Concurrently add handles to Runner's running set.
running_tasks.append_or_insert(self.workflow_info.cid(), handles);

while let Some(res) = task_set.join_next().await {
let (executed, instruction_ptr, invocation_ptr, receipt_meta, add_meta) = match res
{
Expand All @@ -396,8 +395,8 @@ where
break;
}
};
let output_to_store = Ipld::try_from(executed)?;

let output_to_store = Ipld::try_from(executed)?;
let invocation_receipt = InvocationReceipt::new(
invocation_ptr,
InstructionResult::Ok(output_to_store),
Expand Down Expand Up @@ -425,6 +424,7 @@ where

let stored_receipt =
Db::commit_receipt(self.workflow_info.cid, receipt, &mut self.db.conn()?)?;

debug!(
cid = self.workflow_info.cid.to_string(),
"commited to database"
Expand Down
Loading
Loading