Skip to content

Commit

Permalink
fix: schedule + db conflict (#437)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zeeshan Lakhani committed Nov 15, 2023
1 parent bfaaff6 commit cca7581
Show file tree
Hide file tree
Showing 13 changed files with 324 additions and 109 deletions.
1 change: 1 addition & 0 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
cargo-udeps
cargo-watch
rustup
tokio-console
twiggy
wasm-tools
];
Expand Down
2 changes: 1 addition & 1 deletion homestar-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ wait-timeout = "0.2"

[features]
default = ["wasmtime-default", "ipfs", "monitoring", "websocket-notify"]
dev = ["ansi-logs", "ipfs", "monitoring", "websocket-notify"]
dev = ["ansi-logs", "console", "ipfs", "monitoring", "websocket-notify"]
ansi-logs = ["tracing-logfmt/ansi_logs"]
console = ["dep:console-subscriber"]
ipfs = ["dep:ipfs-api", "dep:ipfs-api-backend-hyper"]
Expand Down
49 changes: 33 additions & 16 deletions homestar-runtime/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,15 @@ pub trait Database: Send + Sync + Clone {
receipt: Receipt,
conn: &mut Connection,
) -> Result<Receipt, diesel::result::Error> {
let receipt = conn.transaction::<_, diesel::result::Error, _>(|conn| {
let returned = Self::store_receipt(receipt, conn)?;
Self::store_workflow_receipt(workflow_cid, returned.cid(), conn)?;
Ok(returned)
})?;

Ok(receipt)
conn.transaction::<_, diesel::result::Error, _>(|conn| {
if let Some(returned) = Self::store_receipt(receipt.clone(), conn)? {
Self::store_workflow_receipt(workflow_cid, returned.cid(), conn)?;
Ok(returned)
} else {
Self::store_workflow_receipt(workflow_cid, receipt.cid(), conn)?;
Ok(receipt)
}
})
}

/// Store receipt given a connection to the database pool.
Expand All @@ -134,12 +136,13 @@ pub trait Database: Send + Sync + Clone {
fn store_receipt(
receipt: Receipt,
conn: &mut Connection,
) -> Result<Receipt, diesel::result::Error> {
) -> Result<Option<Receipt>, diesel::result::Error> {
diesel::insert_into(schema::receipts::table)
.values(&receipt)
.on_conflict(schema::receipts::cid)
.do_nothing()
.get_result(conn)
.optional()
}

/// Store receipts given a connection to the Database pool.
Expand All @@ -148,13 +151,17 @@ pub trait Database: Send + Sync + Clone {
conn: &mut Connection,
) -> Result<usize, diesel::result::Error> {
receipts.iter().try_fold(0, |acc, receipt| {
let res = diesel::insert_into(schema::receipts::table)
if let Some(res) = diesel::insert_into(schema::receipts::table)
.values(receipt)
.on_conflict(schema::receipts::cid)
.do_nothing()
.execute(conn)?;

Ok::<_, diesel::result::Error>(acc + res)
.execute(conn)
.optional()?
{
Ok::<_, diesel::result::Error>(acc + res)
} else {
Ok(acc)
}
})
}

Expand Down Expand Up @@ -208,19 +215,25 @@ pub trait Database: Send + Sync + Clone {
workflow: workflow::Stored,
conn: &mut Connection,
) -> Result<workflow::Stored, diesel::result::Error> {
diesel::insert_into(schema::workflows::table)
if let Some(stored) = diesel::insert_into(schema::workflows::table)
.values(&workflow)
.on_conflict(schema::workflows::cid)
.do_nothing()
.get_result(conn)
.optional()?
{
Ok(stored)
} else {
Ok(workflow)
}
}

/// Store workflow [Cid] and [Receipt] [Cid] in the database for inner join.
fn store_workflow_receipt(
workflow_cid: Cid,
receipt_cid: Cid,
conn: &mut Connection,
) -> Result<usize, diesel::result::Error> {
) -> Result<Option<usize>, diesel::result::Error> {
let value = StoredReceipt::new(Pointer::new(workflow_cid), Pointer::new(receipt_cid));
diesel::insert_into(schema::workflows_receipts::table)
.values(&value)
Expand All @@ -230,6 +243,7 @@ pub trait Database: Send + Sync + Clone {
))
.do_nothing()
.execute(conn)
.optional()
}

/// Store series of receipts for a workflow [Cid] in the
Expand All @@ -244,8 +258,11 @@ pub trait Database: Send + Sync + Clone {
conn: &mut Connection,
) -> Result<usize, diesel::result::Error> {
receipts.iter().try_fold(0, |acc, receipt| {
let res = Self::store_workflow_receipt(workflow_cid, *receipt, conn)?;
Ok::<_, diesel::result::Error>(acc + res)
if let Some(res) = Self::store_workflow_receipt(workflow_cid, *receipt, conn)? {
Ok::<_, diesel::result::Error>(acc + res)
} else {
Ok(acc)
}
})
}

Expand Down
1 change: 1 addition & 0 deletions homestar-runtime/src/runner/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ impl show::ConsoleTable for AckWorkflow {

fn echo_table(&self) -> Result<(), std::io::Error> {
let table = self.table();

let mut resource_table = Table::new(
self.resources
.iter()
Expand Down
53 changes: 31 additions & 22 deletions homestar-runtime/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ impl<'a> TaskScheduler<'a> {
let schedule: &mut Schedule<'a> = mut_graph.schedule.as_mut();
let schedule_length = schedule.len();
let mut resources_to_fetch = vec![];
let linkmap = LinkMap::<InstructionResult<Arg>>::new();

let resume = 'resume: {
for (idx, vec) in schedule.iter().enumerate().rev() {
Expand All @@ -128,29 +129,22 @@ impl<'a> TaskScheduler<'a> {
if let Ok(pointers) = folded_pointers {
match Db::find_instruction_pointers(&pointers, conn) {
Ok(found) => {
let linkmap = found.iter().fold(
LinkMap::<InstructionResult<Arg>>::new(),
|mut map, receipt| {
if let Some(idx) = resources_to_fetch
.iter()
.position(|(cid, _rsc)| cid == &receipt.instruction().cid())
{
resources_to_fetch.swap_remove(idx);
}

let _ = map.insert(
receipt.instruction().cid(),
receipt.output_as_arg(),
);

map
},
);
let linkmap = found.iter().fold(linkmap.clone(), |mut map, receipt| {
if let Some(idx) = resources_to_fetch
.iter()
.position(|(cid, _rsc)| cid == &receipt.instruction().cid())
{
resources_to_fetch.swap_remove(idx);
}

let _ = map
.insert(receipt.instruction().cid(), receipt.output_as_arg());

map
});

if found.len() == vec.len() {
break 'resume ControlFlow::Break((idx + 1, linkmap));
} else if !found.is_empty() && found.len() < vec.len() {
break 'resume ControlFlow::Break((idx, linkmap));
} else {
continue;
}
Expand Down Expand Up @@ -195,7 +189,7 @@ impl<'a> TaskScheduler<'a> {
}
_ => Ok(SchedulerContext {
scheduler: Self {
linkmap: Arc::new(LinkMap::<InstructionResult<Arg>>::new().into()),
linkmap: Arc::new(linkmap.into()),
ran: None,
run: schedule.to_vec(),
resume_step: None,
Expand All @@ -204,6 +198,21 @@ impl<'a> TaskScheduler<'a> {
}),
}
}

/// TODO
#[allow(dead_code)]
pub(crate) fn ran_length(&self) -> usize {
self.ran
.as_ref()
.map(|ran| ran.iter().flatten().collect::<Vec<_>>().len())
.unwrap_or_default()
}

/// TODO
#[allow(dead_code)]
pub(crate) fn run_length(&self) -> usize {
self.run.iter().flatten().collect::<Vec<_>>().len()
}
}

#[cfg(test)]
Expand Down Expand Up @@ -304,7 +313,7 @@ mod test {
let mut conn = db.conn().unwrap();
let stored_receipt = MemoryDb::store_receipt(receipt.clone(), &mut conn).unwrap();

assert_eq!(receipt, stored_receipt);
assert_eq!(receipt, stored_receipt.unwrap());

let workflow = Workflow::new(vec![task1.clone(), task2.clone()]);
let fetch_fn = |_rscs: FnvHashSet<Resource>| {
Expand Down
4 changes: 2 additions & 2 deletions homestar-runtime/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ impl Default for Monitoring {
fn default() -> Self {
Self {
process_collector_interval: Duration::from_millis(5000),
console_subscriber_port: 5555,
console_subscriber_port: 6669,
}
}
}
Expand All @@ -229,7 +229,7 @@ impl Default for Monitoring {
impl Default for Monitoring {
fn default() -> Self {
Self {
console_subscriber_port: 5555,
console_subscriber_port: 6669,
}
}
}
Expand Down
12 changes: 6 additions & 6 deletions homestar-runtime/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ where
debug!(cid = cid.to_string(), "found in in-memory linkmap");
Ok(result.to_owned())
} else if let Some(bytes) = resources.read().await.get(&Resource::Cid(cid)) {
debug!(cid = cid.to_string(), "found in resources");
Ok(InstructionResult::Ok(Arg::Ipld(Ipld::Bytes(
bytes.to_vec(),
))))
Expand Down Expand Up @@ -248,11 +249,11 @@ where
// Replay previous receipts if subscriptions are on.
#[cfg(feature = "websocket-notify")]
{
if scheduler.ran.as_ref().is_some_and(|ran| !ran.is_empty()) {
if scheduler.ran_length() > 0 {
info!(
workflow_cid = self.workflow_info.cid.to_string(),
"{} tasks left to run, sending last batch for workflow",
scheduler.ran.as_ref().unwrap().len()
scheduler.run_length()
);
let mut pointers = Vec::new();
for batch in scheduler
Expand Down Expand Up @@ -362,8 +363,7 @@ where
instruction_ptr,
invocation_ptr,
receipt_meta,
additional_meta,
)),
additional_meta)),
Err(err) => Err(
anyhow!("cannot execute wasm module: {err}"))
.with_context(|| {
Expand All @@ -382,7 +382,6 @@ where

// Concurrently add handles to Runner's running set.
running_tasks.append_or_insert(self.workflow_info.cid(), handles);

while let Some(res) = task_set.join_next().await {
let (executed, instruction_ptr, invocation_ptr, receipt_meta, add_meta) = match res
{
Expand All @@ -396,8 +395,8 @@ where
break;
}
};
let output_to_store = Ipld::try_from(executed)?;

let output_to_store = Ipld::try_from(executed)?;
let invocation_receipt = InvocationReceipt::new(
invocation_ptr,
InstructionResult::Ok(output_to_store),
Expand Down Expand Up @@ -425,6 +424,7 @@ where

let stored_receipt =
Db::commit_receipt(self.workflow_info.cid, receipt, &mut self.db.conn()?)?;

debug!(
cid = self.workflow_info.cid.to_string(),
"commited to database"
Expand Down
Loading

0 comments on commit cca7581

Please sign in to comment.