Avoid purge of model drivers on force drop space
The purge is unnecessary as the space itself is removed.

Also, in the fractal mgr, simply sleep for a fixed duration if the initial threshold is breached.
ohsayan committed Mar 2, 2024
1 parent 1ed4f41 commit 41e091c
Showing 3 changed files with 73 additions and 43 deletions.
11 changes: 3 additions & 8 deletions server/src/engine/core/space.rs
```diff
@@ -266,19 +266,14 @@ impl Space {
         global.taskmgr_post_standard_priority(Task::new(
             GenericTask::delete_space_dir(&space_name, space.get_uuid()),
         ));
-        let space_uuid = space.get_uuid();
         for model in space.models.into_iter() {
             let e: EntityIDRef<'static> = unsafe {
                 // UNSAFE(@ohsayan): I want to try what the borrow checker has been trying
                 core::mem::transmute(EntityIDRef::new(space_name.as_str(), &model))
             };
-            let mdl = models.st_delete_return(&e).unwrap();
-            global.purge_model_driver(
-                &space_name,
-                space_uuid,
-                &model,
-                mdl.data().get_uuid(),
-            );
+            models.st_delete(&e);
+            // no need to purge the model driver since the dir itself is deleted. our work here is
+            // just to remove this model from the linked models in the model ns
         }
         let _ = spaces.st_delete(space_name.as_str());
         if if_exists {
```
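Why the purge became unnecessary: the space drop already queues a `delete_space_dir` task, and removing the space directory deletes every model driver file beneath it, so the per-model purges were redundant work. A minimal filesystem sketch of that reasoning (the function name and path handling are illustrative, not Skytable's actual on-disk layout):

```rust
use std::{fs, io, path::Path};

// Once the whole space directory is scheduled for deletion, purging each
// model driver file inside it first is wasted effort: remove_dir_all
// deletes the model files together with the directory itself.
fn drop_space_dir(space_dir: &Path) -> io::Result<()> {
    fs::remove_dir_all(space_dir)
}
```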
82 changes: 56 additions & 26 deletions server/src/engine/fractal/mgr.rs
```diff
@@ -42,7 +42,7 @@ use {
         },
         util::os,
     },
-    std::path::PathBuf,
+    std::{path::PathBuf, time::Duration},
     tokio::{
         fs,
         sync::{
@@ -54,6 +54,8 @@ use {
 };
 
 pub const GENERAL_EXECUTOR_WINDOW: u64 = 5 * 60;
+const TASK_THRESHOLD: usize = 10;
+const TASK_FAILURE_SLEEP_DURATION: u64 = 30;
 
 /// A task for the [`FractalMgr`] to perform
 #[derive(Debug)]
@@ -63,10 +65,9 @@ pub struct Task<T> {
 }
 
 impl<T> Task<T> {
-    const THRESHOLD: usize = 10;
     /// Create a new task with the default threshold
     pub fn new(task: T) -> Self {
-        Self::with_threshold(task, Self::THRESHOLD)
+        Self::with_threshold(task, TASK_THRESHOLD)
     }
     /// Create a task with the given threshold
     fn with_threshold(task: T, threshold: usize) -> Self {
@@ -76,6 +77,11 @@ impl<T> Task<T> {
     pub fn into_task(self) -> T {
         self.task
     }
+    async fn sleep(&self) {
+        if self.threshold != TASK_THRESHOLD {
+            tokio::time::sleep(Duration::from_secs(TASK_FAILURE_SLEEP_DURATION)).await
+        }
+    }
 }
 
 /// A general task
@@ -248,6 +254,11 @@ impl FractalMgr {
 
 // services
 impl FractalMgr {
+    #[inline(always)]
+    fn adjust_threshold(th: usize) -> usize {
+        // FIXME(@ohsayan): adjust to a correct threshold. right now we don't do anything here (and for good reason)
+        th.saturating_sub(1)
+    }
     /// The high priority executor service runs in the background to take care of high priority tasks and take any
     /// appropriate action. It will exclusively own the high priority queue since it is the only broker that is
     /// allowed to perform HP tasks
@@ -261,7 +272,10 @@ impl FractalMgr {
             let task = tokio::select! {
                 task = receiver.recv() => {
                     match task {
-                        Some(t) => t,
+                        Some(t) => {
+                            t.sleep().await;
+                            t
+                        },
                         None => {
                             info!("fhp: exiting executor service because all tasks closed");
                             break;
@@ -284,6 +298,22 @@ impl FractalMgr {
             .unwrap()
         }
     }
+    #[cold]
+    #[inline(never)]
+    fn re_enqueue_model_sync(
+        &self,
+        model_id: ModelUniqueID,
+        observed_size: usize,
+        stats: BatchStats,
+        threshold: usize,
+    ) {
+        self.hp_dispatcher
+            .send(Task::with_threshold(
+                CriticalTask::WriteBatch(model_id, observed_size - stats.get_actual()),
+                threshold,
+            ))
+            .unwrap()
+    }
     fn hp_executor(
         &'static self,
         global: super::Global,
@@ -370,15 +400,12 @@ impl FractalMgr {
                                 model_id.uuid()
                             );
                             // enqueue again for retrying
-                            self.hp_dispatcher
-                                .send(Task::with_threshold(
-                                    CriticalTask::WriteBatch(
-                                        model_id,
-                                        observed_size - stats.get_actual(),
-                                    ),
-                                    threshold - 1,
-                                ))
-                                .unwrap();
+                            self.re_enqueue_model_sync(
+                                model_id,
+                                observed_size,
+                                stats,
+                                Self::adjust_threshold(threshold),
+                            )
                         }
                     }
                 }
@@ -411,7 +438,10 @@ impl FractalMgr {
                 }
                 task = lpq.recv() => {
                     let Task { threshold, task } = match task {
-                        Some(t) => t,
+                        Some(t) => {
+                            t.sleep().await;
+                            t
+                        },
                         None => {
                             info!("flp: exiting executor service because all tasks closed");
                             break;
@@ -422,14 +452,14 @@ impl FractalMgr {
                 GenericTask::DeleteFile(f) => {
                     if let Err(_) = fs::remove_file(&f).await {
                         self.general_dispatcher.send(
-                            Task::with_threshold(GenericTask::DeleteFile(f), threshold - 1)
+                            Task::with_threshold(GenericTask::DeleteFile(f), Self::adjust_threshold(threshold))
                         ).unwrap();
                     }
                 }
                 GenericTask::DeleteDirAll(dir) => {
                     if let Err(_) = fs::remove_dir_all(&dir).await {
                         self.general_dispatcher.send(
-                            Task::with_threshold(GenericTask::DeleteDirAll(dir), threshold - 1)
+                            Task::with_threshold(GenericTask::DeleteDirAll(dir), Self::adjust_threshold(threshold))
                         ).unwrap();
                     }
                 }
@@ -465,16 +495,16 @@ impl FractalMgr {
                             model_id.space(), model_id.entity(),
                         );
                         // this failure is *not* good, so we want to promote this to a critical task
-                        self.hp_dispatcher
-                            .send(Task::new(CriticalTask::WriteBatch(
-                                ModelUniqueID::new(
-                                    model_id.space(),
-                                    model_id.entity(),
-                                    model.data().get_uuid(),
-                                ),
-                                observed_len - stats.get_actual(),
-                            )))
-                            .unwrap()
+                        self.re_enqueue_model_sync(
+                            ModelUniqueID::new(
+                                model_id.space(),
+                                model_id.entity(),
+                                model.data().get_uuid(),
+                            ),
+                            observed_len,
+                            stats,
+                            TASK_THRESHOLD,
+                        )
                     }
                 }
             }
```
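Taken together, `Task::sleep` and `adjust_threshold` give retried tasks a fixed-duration backoff: a task carrying a non-default threshold has already failed at least once, so the executor pauses for `TASK_FAILURE_SLEEP_DURATION` seconds before reprocessing it rather than spinning on a task that keeps failing. A condensed, synchronous sketch of that rule (the constants mirror the diff; the task type and `main` driver are stand-ins for the real async executor):

```rust
use std::{thread, time::Duration};

const TASK_THRESHOLD: usize = 10; // default retry budget, as in the diff
const TASK_FAILURE_SLEEP_DURATION: u64 = 30; // seconds, as in the diff

struct Task<T> {
    threshold: usize,
    task: T,
}

impl<T> Task<T> {
    fn new(task: T) -> Self {
        Self { threshold: TASK_THRESHOLD, task }
    }
    // A threshold below the default means this task was re-enqueued after
    // a failure, so back off for a fixed duration before retrying.
    fn sleep(&self) {
        if self.threshold != TASK_THRESHOLD {
            thread::sleep(Duration::from_secs(TASK_FAILURE_SLEEP_DURATION));
        }
    }
}

fn main() {
    let fresh = Task::new("write-batch");
    fresh.sleep(); // no-op: first attempt runs immediately
    let retried = Task { threshold: TASK_THRESHOLD - 1, task: "write-batch" };
    retried.sleep(); // pauses 30s before the retry is processed
}
```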
23 changes: 14 additions & 9 deletions server/src/engine/fractal/mod.rs
```diff
@@ -33,7 +33,7 @@ use {
             GNSDriver, ModelDriver,
         },
     },
-    crate::engine::error::RuntimeResult,
+    crate::{engine::error::RuntimeResult, util::compiler},
     std::{fmt, mem::MaybeUninit},
     tokio::sync::mpsc::unbounded_channel,
 };
@@ -117,6 +117,7 @@ pub trait GlobalInstanceLike {
     fn taskmgr_post_high_priority(&self, task: Task<CriticalTask>);
     fn taskmgr_post_standard_priority(&self, task: Task<GenericTask>);
     // default impls
+    #[inline(always)]
     fn request_batch_resolve_if_cache_full(
         &self,
         space_name: &str,
@@ -128,14 +129,18 @@ pub trait GlobalInstanceLike {
         let r_tolerated_change = hint.delta_hint() >= self.get_max_delta_size();
         let r_percent_change = (hint.delta_hint() >= ((model.primary_index().count() / 100) * 5))
             & (r_tolerated_change);
-        if r_tolerated_change | r_percent_change {
-            let obtained_delta_size = model
-                .delta_state()
-                .__fractal_take_full_from_data_delta(FractalToken::new());
-            self.taskmgr_post_high_priority(Task::new(CriticalTask::WriteBatch(
-                ModelUniqueID::new(space_name, model_name, model.get_uuid()),
-                obtained_delta_size,
-            )));
+        if compiler::unlikely(r_tolerated_change | r_percent_change) {
+            // do not inline this path as we expect sufficient memory to be present and/or the background service
+            // to pick this up
+            compiler::cold_call(|| {
+                let obtained_delta_size = model
+                    .delta_state()
+                    .__fractal_take_full_from_data_delta(FractalToken::new());
+                self.taskmgr_post_high_priority(Task::new(CriticalTask::WriteBatch(
+                    ModelUniqueID::new(space_name, model_name, model.get_uuid()),
+                    obtained_delta_size,
+                )));
+            })
         }
     }
 }
```
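`compiler::unlikely` and `compiler::cold_call` come from Skytable's internal `util::compiler` module. On stable Rust, such branch hints are commonly built from `#[cold]` functions; the sketch below shows that general idiom under that assumption, and is not necessarily Skytable's exact implementation:

```rust
#[cold]
#[inline(never)]
fn cold_path() {}

/// Hint that `b` is almost always false by placing a call to a
/// `#[cold]` function on the true branch.
#[inline(always)]
fn unlikely(b: bool) -> bool {
    if b {
        cold_path();
    }
    b
}

/// Run a rarely-taken closure behind a `#[cold]`, never-inlined call so
/// its code stays out of the caller's hot instruction stream.
#[cold]
#[inline(never)]
fn cold_call<T>(f: impl FnOnce() -> T) -> T {
    f()
}

fn main() {
    let cache_full = false; // the common case
    if unlikely(cache_full) {
        cold_call(|| println!("requesting batch resolve")); // slow path
    }
}
```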
