diff --git a/engine/controllers/models.cc b/engine/controllers/models.cc
index 7439a5df5..86b749ce6 100644
--- a/engine/controllers/models.cc
+++ b/engine/controllers/models.cc
@@ -218,11 +218,10 @@ void Models::ListModel(
         obj["id"] = model_entry.model;
         obj["model"] = model_entry.model;
         obj["status"] = "downloaded";
-        // TODO(sang) Temporarily remove this estimation
-        // auto es = model_service_->GetEstimation(model_entry.model);
-        // if (es.has_value() && !!es.value()) {
-        //   obj["recommendation"] = hardware::ToJson(*(es.value()));
-        // }
+        auto es = model_service_->GetEstimation(model_entry.model);
+        if (es.has_value()) {
+          obj["recommendation"] = hardware::ToJson(*es);
+        }
         data.append(std::move(obj));
         yaml_handler.Reset();
       } else if (model_config.engine == kPythonEngine) {
diff --git a/engine/main.cc b/engine/main.cc
index 122ea094a..2f60916a6 100644
--- a/engine/main.cc
+++ b/engine/main.cc
@@ -37,6 +37,7 @@
 #include "utils/file_manager_utils.h"
 #include "utils/logging_utils.h"
 #include "utils/system_info_utils.h"
+#include "utils/task_queue.h"
 
 #if defined(__APPLE__) && defined(__MACH__)
 #include <libgen.h>  // for dirname()
@@ -177,8 +178,11 @@ void RunServer(std::optional<std::string> host, std::optional<int> port,
       download_service, dylib_path_manager, db_service);
   auto inference_svc = std::make_shared<InferenceService>(engine_service);
   auto model_src_svc = std::make_shared<ModelSourceService>(db_service);
-  auto model_service = std::make_shared<ModelService>(
-      db_service, hw_service, download_service, inference_svc, engine_service);
+  cortex::TaskQueue task_queue(
+      std::min(2u, std::thread::hardware_concurrency()), "background_task");
+  auto model_service =
+      std::make_shared<ModelService>(db_service, hw_service, download_service,
+                                     inference_svc, engine_service, task_queue);
   inference_svc->SetModelService(model_service);
 
   auto file_watcher_srv = std::make_shared<FileWatcherService>(
diff --git a/engine/services/hardware_service.cc b/engine/services/hardware_service.cc
index 88a5df6b0..817ab320b 100644
--- a/engine/services/hardware_service.cc
+++ b/engine/services/hardware_service.cc
@@ -207,9 +207,6 @@ bool HardwareService::Restart(const std::string& host, int port) {
     if (!TryConnectToServer(host, port)) {
      return false;
     }
-    std::cout << "Server started" << std::endl;
-    std::cout << "API Documentation available at: http://" << host << ":"
-              << port << std::endl;
   }
 #endif
 
@@ -348,6 +345,7 @@ void HardwareService::UpdateHardwareInfos() {
          return false;
      return true;
     };
+
     auto res = db_service_->AddHardwareEntry(
         HwEntry{.uuid = gpu.uuid,
                 .type = "gpu",
diff --git a/engine/services/model_service.cc b/engine/services/model_service.cc
index aeef54605..c13f7cf19 100644
--- a/engine/services/model_service.cc
+++ b/engine/services/model_service.cc
@@ -143,6 +143,21 @@ cpp::result<DownloadTask, std::string> GetDownloadTask(
   }
 }
 }  // namespace
 
+ModelService::ModelService(std::shared_ptr<DatabaseService> db_service,
+                           std::shared_ptr<HardwareService> hw_service,
+                           std::shared_ptr<DownloadService> download_service,
+                           std::shared_ptr<InferenceService> inference_service,
+                           std::shared_ptr<EngineService> engine_svc,
+                           cortex::TaskQueue& task_queue)
+    : db_service_(db_service),
+      hw_service_(hw_service),
+      download_service_{download_service},
+      inference_svc_(inference_service),
+      engine_svc_(engine_svc),
+      task_queue_(task_queue) {
+  ProcessBgrTasks();
+};
+
 void ModelService::ForceIndexingModelList() {
   CTL_INF("Force indexing model list");
@@ -331,8 +346,17 @@ cpp::result<std::string, std::string> ModelService::HandleDownloadUrlAsync(
   return download_service_->AddTask(downloadTask, on_finished);
 }
 
+std::optional<hardware::Estimation> ModelService::GetEstimation(
+    const std::string& model_handle) {
+  std::lock_guard l(es_mtx_);
+  if (auto it = es_.find(model_handle); it != es_.end()) {
+    return it->second;
+  }
+  return std::nullopt;
+}
+
 cpp::result<std::optional<hardware::Estimation>, std::string>
-ModelService::GetEstimation(const std::string& model_handle,
+ModelService::EstimateModel(const std::string& model_handle,
                             const std::string& kv_cache, int n_batch,
                             int n_ubatch) {
   namespace fs = std::filesystem;
@@ -548,7 +572,7 @@ ModelService::DownloadModelFromCortexsoAsync(
           // Close the file
           pyvenv_cfg.close();
           // Add executable permission to python
-          set_permission_utils::SetExecutePermissionsRecursive(venv_path);
+          (void)set_permission_utils::SetExecutePermissionsRecursive(venv_path);
         } else {
           CTL_ERR("Failed to extract venv.zip");
         };
@@ -828,7 +852,7 @@ cpp::result<StartModelResult, std::string> ModelService::StartModel(
       CTL_WRN("Error: " + res.error());
       for (auto& depend : depends) {
         if (depend != model_handle) {
-          StopModel(depend);
+          auto sr = StopModel(depend);
         }
       }
       return cpp::fail("Model failed to start dependency '" + depend +
@@ -1390,4 +1414,28 @@ std::string ModelService::GetEngineByModelId(
   auto mc = yaml_handler.GetModelConfig();
   CTL_DBG(mc.engine);
   return mc.engine;
 }
+
+void ModelService::ProcessBgrTasks() {
+  CTL_INF("Start processing background tasks")
+  auto cb = [this] {
+    CTL_DBG("Estimate model resource usage");
+    auto list_entry = db_service_->LoadModelList();
+    if (list_entry) {
+      for (const auto& model_entry : list_entry.value()) {
+        // Only process local models
+        if (model_entry.status == cortex::db::ModelStatus::Downloaded) {
+          auto es = EstimateModel(model_entry.model);
+          if (es.has_value()) {
+            std::lock_guard l(es_mtx_);
+            es_[model_entry.model] = es.value();
+          }
+        }
+      }
+    }
+  };
+
+  auto clone = cb;
+  task_queue_.RunInQueue(std::move(cb));
+  task_queue_.RunEvery(std::chrono::seconds(10), std::move(clone));
+}
\ No newline at end of file
diff --git a/engine/services/model_service.h b/engine/services/model_service.h
index dcf99430f..04c7f240a 100644
--- a/engine/services/model_service.h
+++ b/engine/services/model_service.h
@@ -10,6 +10,7 @@
 #include "services/download_service.h"
 #include "services/hardware_service.h"
 #include "utils/hardware/gguf/gguf_file_estimate.h"
+#include "utils/task_queue.h"
 
 class InferenceService;
 
@@ -35,12 +36,8 @@ class ModelService {
                std::shared_ptr<HardwareService> hw_service,
                std::shared_ptr<DownloadService> download_service,
                std::shared_ptr<InferenceService> inference_service,
-               std::shared_ptr<EngineService> engine_svc)
-      : db_service_(db_service),
-        hw_service_(hw_service),
-        download_service_{download_service},
-        inference_svc_(inference_service),
-        engine_svc_(engine_svc) {};
+               std::shared_ptr<EngineService> engine_svc,
+               cortex::TaskQueue& task_queue);
 
   cpp::result<bool, std::string> AbortDownloadModel(
       const std::string& task_id);
@@ -81,7 +78,10 @@ class ModelService {
 
   bool HasModel(const std::string& id) const;
 
-  cpp::result<std::optional<hardware::Estimation>, std::string> GetEstimation(
+  std::optional<hardware::Estimation> GetEstimation(
+      const std::string& model_handle);
+
+  cpp::result<std::optional<hardware::Estimation>, std::string> EstimateModel(
       const std::string& model_handle, const std::string& kv_cache = "f16",
       int n_batch = 2048, int n_ubatch = 2048);
 
@@ -112,6 +112,8 @@ class ModelService {
       const std::string& model_path, int ngl, int ctx_len, int n_batch = 2048,
       int n_ubatch = 2048, const std::string& kv_cache_type = "f16");
 
+  void ProcessBgrTasks();
+
   int GetCpuThreads() const;
 
   std::shared_ptr<DatabaseService> db_service_;
@@ -126,4 +128,8 @@ class ModelService {
    */
   std::unordered_map<std::string, std::shared_ptr<ModelMetadata>>
      loaded_model_metadata_map_;
+
+  std::mutex es_mtx_;
+  std::unordered_map<std::string, std::optional<hardware::Estimation>> es_;
+  cortex::TaskQueue& task_queue_;
 };
diff --git a/engine/utils/task_queue.h b/engine/utils/task_queue.h
new file mode 100644
index 000000000..911a7b307
--- /dev/null
+++ b/engine/utils/task_queue.h
@@ -0,0 +1,45 @@
+#pragma once
+#include <memory>
+#include <string>
+#include "trantor/net/EventLoopThreadPool.h"
+
+namespace cortex {
+class TaskQueue {
+ public:
+  TaskQueue(size_t num_threads, const std::string& name)
+      : ev_loop_pool_(
+            std::make_unique<trantor::EventLoopThreadPool>(num_threads, name)) {
+    ev_loop_pool_->start();
+  }
+  ~TaskQueue() {}
+
+  template <typename Functor>
+  void RunInQueue(Functor&& f) {
+    if (ev_loop_pool_) {
+      ev_loop_pool_->getNextLoop()->runInLoop(std::forward<Functor>(f));
+    }
+  }
+
+  template <typename Functor>
+  uint64_t RunEvery(const std::chrono::duration<double>& interval,
+                    Functor&& cb) {
+    if (ev_loop_pool_) {
+      return ev_loop_pool_->getNextLoop()->runEvery(interval,
+                                                    std::forward<Functor>(cb));
+    }
+    return 0;
+  }
+
+  template <typename Functor>
+  uint64_t RunAfter(const std::chrono::duration<double>& delay, Functor&& cb) {
+    if (ev_loop_pool_) {
+      return ev_loop_pool_->getNextLoop()->runAfter(delay,
+                                                    std::forward<Functor>(cb));
+    }
+    return 0;
+  }
+
+ private:
+  std::unique_ptr<trantor::EventLoopThreadPool> ev_loop_pool_ = nullptr;
+};
+}  // namespace cortex
\ No newline at end of file
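
Usage sketch (reviewer note, not part of the patch): a minimal, self-contained illustration of how the cortex::TaskQueue introduced above is driven — one immediate run plus a periodic refresh of a mutex-guarded cache, mirroring the ProcessBgrTasks() pattern. Only cortex::TaskQueue and its RunInQueue/RunEvery API come from the diff; main(), the cache map, the refresh lambda, and the model name are hypothetical stand-ins, and building this requires the same trantor dependency the new header includes.

#include <chrono>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include "utils/task_queue.h"  // header added by this patch

int main() {
  // Hypothetical stand-ins for ModelService::es_mtx_ / es_.
  std::mutex mtx;
  std::unordered_map<std::string, int> cache;

  // Two background loops, mirroring the pool created in RunServer().
  cortex::TaskQueue tq(2, "background_task");

  auto refresh = [&] {
    // Stand-in for EstimateModel(): recompute and cache a value.
    std::lock_guard<std::mutex> l(mtx);
    ++cache["some-model"];
  };

  tq.RunInQueue(refresh);                         // run once, off-thread
  tq.RunEvery(std::chrono::seconds(1), refresh);  // then refresh periodically

  std::this_thread::sleep_for(std::chrono::seconds(3));
  std::lock_guard<std::mutex> l(mtx);
  std::cout << "refresh count: " << cache["some-model"] << "\n";
  return 0;
}

Declaring the queue after the data it touches means the queue (and its worker loops) is torn down first, so the periodic callback cannot outlive the cache it captures — the same lifetime relationship the patch relies on, where task_queue is created in RunServer() before ModelService and referenced by it.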