From d626c52e5eda95f94c3fb7a29540f47c81ab1194 Mon Sep 17 00:00:00 2001 From: vansangpfiev Date: Mon, 14 Oct 2024 13:43:43 +0700 Subject: [PATCH 1/2] chore: refactor server controller --- engine/controllers/server.cc | 434 ++++----------------------- engine/controllers/server.h | 65 +--- engine/main.cc | 10 +- engine/services/inference_service.cc | 380 +++++++++++++++++++++++ engine/services/inference_service.h | 76 +++++ engine/utils/cortex_utils.h | 5 - engine/utils/engine_constants.h | 7 +- 7 files changed, 526 insertions(+), 451 deletions(-) create mode 100644 engine/services/inference_service.cc create mode 100644 engine/services/inference_service.h diff --git a/engine/controllers/server.cc b/engine/controllers/server.cc index ffbf2cef3..0c41b0f74 100644 --- a/engine/controllers/server.cc +++ b/engine/controllers/server.cc @@ -9,19 +9,7 @@ using namespace inferences; using json = nlohmann::json; namespace inferences { -namespace { -// Need to change this after we rename repositories -std::string NormalizeEngine(const std::string& engine) { - if (engine == kLlamaEngine) { - return kLlamaRepo; - } else if (engine == kOnnxEngine) { - return kOnnxRepo; - } else if (engine == kTrtLlmEngine) { - return kTrtLlmRepo; - } - return engine; -}; -} // namespace + server::server() { #if defined(_WIN32) SetDefaultDllDirectories(LOAD_LIBRARY_SEARCH_DEFAULT_DIRS); @@ -33,404 +21,118 @@ server::~server() {} void server::ChatCompletion( const HttpRequestPtr& req, std::function&& callback) { - std::string engine_type; - if (!HasFieldInReq(req, "engine")) { - engine_type = kLlamaRepo; - } else { - engine_type = - (*(req->getJsonObject())).get("engine", kLlamaRepo).asString(); - } - - auto ne = NormalizeEngine(engine_type); - - if (!IsEngineLoaded(ne)) { - Json::Value res; - res["message"] = "Engine is not loaded yet"; - auto resp = cortex_utils::CreateCortexHttpJsonResponse(res); - resp->setStatusCode(k409Conflict); + LOG_DEBUG << "Start chat completion"; + auto json_body = req->getJsonObject(); + bool is_stream = (*json_body).get("stream", false).asBool(); + auto q = std::make_shared(); + auto ir = inference_svc_.HandleChatCompletion(q, json_body); + if (ir.has_error()) { + auto err = ir.error(); + auto resp = cortex_utils::CreateCortexHttpJsonResponse(std::get<1>(err)); + resp->setStatusCode( + static_cast(std::get<0>(err)["status_code"].asInt())); callback(resp); - LOG_WARN << "Engine is not loaded yet"; return; } - - LOG_TRACE << "Start chat completion"; - auto json_body = req->getJsonObject(); - bool is_stream = (*json_body).get("stream", false).asBool(); - auto q = std::make_shared(); - std::get(engines_[ne].engine) - ->HandleChatCompletion(json_body, - [q](Json::Value status, Json::Value res) { - q->push(std::make_pair(status, res)); - }); - LOG_TRACE << "Wait to chat completion responses"; + LOG_DEBUG << "Wait to chat completion responses"; if (is_stream) { ProcessStreamRes(std::move(callback), q); } else { ProcessNonStreamRes(std::move(callback), *q); } - LOG_TRACE << "Done chat completion"; + LOG_DEBUG << "Done chat completion"; } void server::Embedding(const HttpRequestPtr& req, std::function&& callback) { - auto engine_type = - (*(req->getJsonObject())).get("engine", kLlamaRepo).asString(); - auto ne = NormalizeEngine(engine_type); - if (!IsEngineLoaded(ne)) { - Json::Value res; - res["message"] = "Engine is not loaded yet"; - auto resp = cortex_utils::CreateCortexHttpJsonResponse(res); - resp->setStatusCode(k409Conflict); + LOG_TRACE << "Start embedding"; + auto q = std::make_shared(); + auto 
ir = inference_svc_.HandleEmbedding(q, req->getJsonObject()); + if (ir.has_error()) { + auto err = ir.error(); + auto resp = cortex_utils::CreateCortexHttpJsonResponse(std::get<1>(err)); + resp->setStatusCode( + static_cast(std::get<0>(err)["status_code"].asInt())); callback(resp); - LOG_WARN << "Engine is not loaded yet"; return; } - - LOG_TRACE << "Start embedding"; - SyncQueue q; - std::get(engines_[ne].engine) - ->HandleEmbedding(req->getJsonObject(), - [&q](Json::Value status, Json::Value res) { - q.push(std::make_pair(status, res)); - }); LOG_TRACE << "Wait to embedding"; - ProcessNonStreamRes(std::move(callback), q); + ProcessNonStreamRes(std::move(callback), *q); LOG_TRACE << "Done embedding"; } void server::UnloadModel( const HttpRequestPtr& req, std::function&& callback) { - std::string engine_type; - if (!HasFieldInReq(req, "engine")) { - engine_type = kLlamaRepo; - } else { - engine_type = - (*(req->getJsonObject())).get("engine", kLlamaRepo).asString(); - } - auto ne = NormalizeEngine(engine_type); - - if (!IsEngineLoaded(ne)) { - Json::Value res; - res["message"] = "Engine is not loaded yet"; - auto resp = cortex_utils::CreateCortexHttpJsonResponse(res); - resp->setStatusCode(k409Conflict); - callback(resp); - LOG_WARN << "Engine is not loaded yet"; - return; - } - LOG_TRACE << "Start unload model"; - std::get(engines_[ne].engine) - ->UnloadModel( - req->getJsonObject(), - [cb = std::move(callback)](Json::Value status, Json::Value res) { - auto resp = cortex_utils::CreateCortexHttpJsonResponse(res); - resp->setStatusCode(static_cast( - status["status_code"].asInt())); - cb(resp); - }); - LOG_TRACE << "Done unload model"; + auto ir = inference_svc_.UnloadModel(req->getJsonObject()); + auto resp = cortex_utils::CreateCortexHttpJsonResponse(std::get<1>(ir)); + resp->setStatusCode( + static_cast(std::get<0>(ir)["status_code"].asInt())); + callback(resp); } void server::ModelStatus( const HttpRequestPtr& req, std::function&& callback) { - std::string engine_type; - if (!HasFieldInReq(req, "engine")) { - engine_type = kLlamaRepo; - } else { - engine_type = - (*(req->getJsonObject())).get("engine", kLlamaRepo).asString(); - } - - auto ne = NormalizeEngine(engine_type); - - if (!IsEngineLoaded(ne)) { - Json::Value res; - res["message"] = "Engine is not loaded yet"; - auto resp = cortex_utils::CreateCortexHttpJsonResponse(res); - resp->setStatusCode(k409Conflict); - callback(resp); - LOG_WARN << "Engine is not loaded yet"; - return; - } - - LOG_TRACE << "Start to get model status"; - std::get(engines_[ne].engine) - ->GetModelStatus( - req->getJsonObject(), - [cb = std::move(callback)](Json::Value status, Json::Value res) { - auto resp = cortex_utils::CreateCortexHttpJsonResponse(res); - resp->setStatusCode(static_cast( - status["status_code"].asInt())); - cb(resp); - }); - LOG_TRACE << "Done get model status"; + auto ir = inference_svc_.GetModelStatus(req->getJsonObject()); + auto resp = cortex_utils::CreateCortexHttpJsonResponse(std::get<1>(ir)); + resp->setStatusCode( + static_cast(std::get<0>(ir)["status_code"].asInt())); + callback(resp); } void server::GetModels(const HttpRequestPtr& req, std::function&& callback) { - if (engines_.empty()) { - Json::Value res; - res["message"] = "Engine is not loaded yet"; - auto resp = cortex_utils::CreateCortexHttpJsonResponse(res); - resp->setStatusCode(k409Conflict); - callback(resp); - LOG_WARN << "Engine is not loaded yet"; - return; - } - LOG_TRACE << "Start to get models"; - Json::Value resp_data(Json::arrayValue); - for (auto const& [k, 
v] : engines_) { - auto e = std::get(v.engine); - if (e->IsSupported("GetModels")) { - e->GetModels(req->getJsonObject(), - [&resp_data](Json::Value status, Json::Value res) { - for (auto r : res["data"]) { - resp_data.append(r); - } - }); - } - } - Json::Value root; - root["data"] = resp_data; - root["object"] = "list"; - auto resp = cortex_utils::CreateCortexHttpJsonResponse(root); - resp->setStatusCode(drogon::HttpStatusCode::k200OK); + auto ir = inference_svc_.GetModels(req->getJsonObject()); + auto resp = cortex_utils::CreateCortexHttpJsonResponse(std::get<1>(ir)); + resp->setStatusCode( + static_cast(std::get<0>(ir)["status_code"].asInt())); callback(resp); - LOG_TRACE << "Done get models"; } void server::GetEngines( const HttpRequestPtr& req, std::function&& callback) { - Json::Value res; - Json::Value engine_array(Json::arrayValue); - for (const auto& [s, _] : engines_) { - Json::Value val; - val["id"] = s; - val["object"] = "engine"; - engine_array.append(val); - } - - res["object"] = "list"; - res["data"] = engine_array; - - auto resp = cortex_utils::CreateCortexHttpJsonResponse(res); + auto ir = inference_svc_.GetEngines(req->getJsonObject()); + auto resp = cortex_utils::CreateCortexHttpJsonResponse(ir); callback(resp); } void server::FineTuning( const HttpRequestPtr& req, std::function&& callback) { - auto engine_type = - (*(req->getJsonObject())).get("engine", kPythonRuntimeRepo).asString(); - - if (engines_.find(engine_type) == engines_.end()) { - try { - std::string abs_path = - (getenv("ENGINE_PATH") ? getenv("ENGINE_PATH") - : cortex_utils::GetCurrentPath()) + - cortex_utils::kPythonRuntimeLibPath; - engines_[engine_type].dl = - std::make_unique(abs_path, "engine"); - } catch (const cortex_cpp::dylib::load_error& e) { - - LOG_ERROR << "Could not load engine: " << e.what(); - engines_.erase(engine_type); - - Json::Value res; - res["message"] = "Could not load engine " + engine_type; - auto resp = cortex_utils::CreateCortexHttpJsonResponse(res); - resp->setStatusCode(k500InternalServerError); - callback(resp); - return; - } - - auto func = engines_[engine_type].dl->get_function( - "get_engine"); - engines_[engine_type].engine = func(); - LOG_INFO << "Loaded engine: " << engine_type; - } - - LOG_TRACE << "Start to fine-tuning"; - auto& en = std::get(engines_[engine_type].engine); - if (en->IsSupported("HandlePythonFileExecutionRequest")) { - en->HandlePythonFileExecutionRequest( - req->getJsonObject(), - [cb = std::move(callback)](Json::Value status, Json::Value res) { - auto resp = cortex_utils::CreateCortexHttpJsonResponse(res); - resp->setStatusCode(static_cast( - status["status_code"].asInt())); - cb(resp); - }); - } else { - Json::Value res; - res["message"] = "Method is not supported yet"; - auto resp = cortex_utils::CreateCortexHttpJsonResponse(res); - resp->setStatusCode(k500InternalServerError); - callback(resp); - LOG_WARN << "Method is not supported yet"; - } + auto ir = inference_svc_.FineTuning(req->getJsonObject()); + auto resp = cortex_utils::CreateCortexHttpJsonResponse(std::get<1>(ir)); + resp->setStatusCode( + static_cast(std::get<0>(ir)["status_code"].asInt())); + callback(resp); LOG_TRACE << "Done fine-tuning"; } void server::LoadModel(const HttpRequestPtr& req, std::function&& callback) { - auto engine_type = - (*(req->getJsonObject())).get("engine", kLlamaRepo).asString(); - - auto ne = NormalizeEngine(engine_type); - - // We have not loaded engine yet, should load it before using it - if (engines_.find(ne) == engines_.end()) { - auto get_engine_path = 
[](std::string_view e) { - if (e == kLlamaRepo) { - return cortex_utils::kLlamaLibPath; - } else if (e == kOnnxRepo) { - return cortex_utils::kOnnxLibPath; - } else if (e == kTrtLlmRepo) { - return cortex_utils::kTensorrtLlmPath; - } - return cortex_utils::kLlamaLibPath; - }; - - try { - if (ne == kLlamaRepo) { - cortex::cpuid::CpuInfo cpu_info; - LOG_INFO << "CPU instruction set: " << cpu_info.to_string(); - } - - std::string abs_path = - (getenv("ENGINE_PATH") - ? getenv("ENGINE_PATH") - : file_manager_utils::GetCortexDataPath().string()) + - get_engine_path(ne); -#if defined(_WIN32) - // TODO(?) If we only allow to load an engine at a time, the logic is simpler. - // We would like to support running multiple engines at the same time. Therefore, - // the adding/removing dll directory logic is quite complicated: - // 1. If llamacpp is loaded and new requested engine is tensorrt-llm: - // Unload the llamacpp dll directory then load the tensorrt-llm - // 2. If tensorrt-llm is loaded and new requested engine is llamacpp: - // Do nothing, llamacpp can re-use tensorrt-llm dependencies (need to be tested careful) - // 3. Add dll directory if met other conditions - - auto add_dll = [this](const std::string& e_type, const std::string& p) { - auto ws = std::wstring(p.begin(), p.end()); - if (auto cookie = AddDllDirectory(ws.c_str()); cookie != 0) { - LOG_INFO << "Added dll directory: " << p; - engines_[e_type].cookie = cookie; - } else { - LOG_WARN << "Could not add dll directory: " << p; - } - }; - - if (IsEngineLoaded(kLlamaRepo) && ne == kTrtLlmRepo) { - // Remove llamacpp dll directory - if (!RemoveDllDirectory(engines_[kLlamaRepo].cookie)) { - LOG_INFO << "Could not remove dll directory: " << kLlamaRepo; - } else { - LOG_WARN << "Removed dll directory: " << kLlamaRepo; - } - - add_dll(ne, abs_path); - } else if (IsEngineLoaded(kTrtLlmRepo) && ne == kLlamaRepo) { - // Do nothing - } else { - add_dll(ne, abs_path); - } -#endif - engines_[ne].dl = std::make_unique(abs_path, "engine"); - - } catch (const cortex_cpp::dylib::load_error& e) { - LOG_ERROR << "Could not load engine: " << e.what(); - engines_.erase(ne); - - Json::Value res; - res["message"] = "Could not load engine " + engine_type; - auto resp = cortex_utils::CreateCortexHttpJsonResponse(res); - resp->setStatusCode(k500InternalServerError); - callback(resp); - return; - } - cur_engine_type_ = ne; - - auto func = engines_[ne].dl->get_function("get_engine"); - engines_[ne].engine = func(); - - auto& en = std::get(engines_[ne].engine); - if (ne == kLlamaRepo) { //fix for llamacpp engine first - auto config = file_manager_utils::GetCortexConfig(); - if (en->IsSupported("SetFileLogger")) { - en->SetFileLogger(config.maxLogLines, - (std::filesystem::path(config.logFolderPath) / - std::filesystem::path(config.logLlamaCppPath)) - .string()); - } else { - LOG_WARN << "Method SetFileLogger is not supported yet"; - } - } - LOG_INFO << "Loaded engine: " << engine_type; - } - - LOG_TRACE << "Load model"; - auto& en = std::get(engines_[ne].engine); - en->LoadModel(req->getJsonObject(), [cb = std::move(callback)]( - Json::Value status, Json::Value res) { - auto resp = cortex_utils::CreateCortexHttpJsonResponse(res); - resp->setStatusCode( - static_cast(status["status_code"].asInt())); - cb(resp); - }); + auto ir = inference_svc_.LoadModel(req->getJsonObject()); + auto resp = cortex_utils::CreateCortexHttpJsonResponse(std::get<1>(ir)); + resp->setStatusCode( + static_cast(std::get<0>(ir)["status_code"].asInt())); + callback(resp); LOG_TRACE << "Done 
load model"; } void server::UnloadEngine( const HttpRequestPtr& req, std::function&& callback) { - std::string engine_type; - if (!HasFieldInReq(req, "engine")) { - engine_type = kLlamaRepo; - } else { - engine_type = - (*(req->getJsonObject())).get("engine", kLlamaRepo).asString(); - } - - auto ne = NormalizeEngine(engine_type); - - if (!IsEngineLoaded(ne)) { - Json::Value res; - res["message"] = "Engine is not loaded yet"; - auto resp = cortex_utils::CreateCortexHttpJsonResponse(res); - resp->setStatusCode(k409Conflict); - callback(resp); - LOG_WARN << "Engine is not loaded yet"; - return; - } - - EngineI* e = std::get(engines_[ne].engine); - delete e; -#if defined(_WIN32) - if (!RemoveDllDirectory(engines_[ne].cookie)) { - LOG_WARN << "Could not remove dll directory: " << engine_type; - } else { - LOG_INFO << "Removed dll directory: " << engine_type; - } -#endif - engines_.erase(ne); - LOG_INFO << "Unloaded engine " + engine_type; - Json::Value res; - res["message"] = "Unloaded engine " + engine_type; - auto resp = cortex_utils::CreateCortexHttpJsonResponse(res); - resp->setStatusCode(k200OK); + auto ir = inference_svc_.UnloadEngine(req->getJsonObject()); + auto resp = cortex_utils::CreateCortexHttpJsonResponse(std::get<1>(ir)); + resp->setStatusCode( + static_cast(std::get<0>(ir)["status_code"].asInt())); callback(resp); } void server::ProcessStreamRes(std::function cb, - std::shared_ptr q) { + std::shared_ptr q) { auto err_or_done = std::make_shared(false); auto chunked_content_provider = [q, err_or_done](char* buf, std::size_t buf_size) -> std::size_t { @@ -464,7 +166,7 @@ void server::ProcessStreamRes(std::function cb, } void server::ProcessNonStreamRes(std::function cb, - SyncQueue& q) { + services::SyncQueue& q) { auto [status, res] = q.wait_and_pop(); auto resp = cortex_utils::CreateCortexHttpJsonResponse(res); resp->setStatusCode( @@ -472,32 +174,4 @@ void server::ProcessNonStreamRes(std::function cb, cb(resp); } -bool server::IsEngineLoaded(const std::string& e) { - return engines_.find(e) != engines_.end(); -} - -bool server::HasFieldInReq( - const HttpRequestPtr& req, - std::function& callback, - const std::string& field) { - if (auto o = req->getJsonObject(); !o || (*o)[field].isNull()) { - Json::Value res; - res["message"] = "No " + field + " field in request body"; - auto resp = cortex_utils::CreateCortexHttpJsonResponse(res); - resp->setStatusCode(k409Conflict); - callback(resp); - LOG_WARN << "No " << field << " field in request body"; - return false; - } - return true; -} - -bool server::HasFieldInReq(const HttpRequestPtr& req, - const std::string& field) { - if (auto o = req->getJsonObject(); !o || (*o)[field].isNull()) { - return false; - } - return true; -} - } // namespace inferences diff --git a/engine/controllers/server.h b/engine/controllers/server.h index f1fe89bd5..1eb4203ca 100644 --- a/engine/controllers/server.h +++ b/engine/controllers/server.h @@ -17,9 +17,7 @@ #include #include "common/base.h" -#include "cortex-common/EngineI.h" -#include "cortex-common/cortexpythoni.h" -#include "utils/dylib.h" +#include "services/inference_service.h" #include "utils/json.hpp" #ifndef SERVER_VERBOSE @@ -98,66 +96,11 @@ class server : public drogon::HttpController, private: void ProcessStreamRes(std::function cb, - std::shared_ptr q); + std::shared_ptr q); void ProcessNonStreamRes(std::function cb, - SyncQueue& q); - bool IsEngineLoaded(const std::string& e); - - bool HasFieldInReq(const HttpRequestPtr& req, - std::function& callback, - const std::string& field); - - bool 
HasFieldInReq(const HttpRequestPtr& req, const std::string& field); - - private: - struct SyncQueue { - void push(std::pair&& p) { - std::unique_lock l(mtx); - q.push(p); - cond.notify_one(); - } - - std::pair wait_and_pop() { - std::unique_lock l(mtx); - cond.wait(l, [this] { return !q.empty(); }); - auto res = q.front(); - q.pop(); - return res; - } - - std::mutex mtx; - std::condition_variable cond; - // Status and result - std::queue> q; - }; - struct StreamStatus { - void Done() { - std::unique_lock l(m); - stream_done = true; - cv.notify_all(); - } - - void Wait() { - std::unique_lock l(m); - cv.wait(l, [this] { return stream_done; }); - } - - private: - std::mutex m; - std::condition_variable cv; - bool stream_done = false; - }; + services::SyncQueue& q); private: - using EngineV = std::variant; - struct EngineInfo { - std::unique_ptr dl; - EngineV engine; -#if defined(_WIN32) - DLL_DIRECTORY_COOKIE cookie; -#endif - }; - std::unordered_map engines_; - std::string cur_engine_type_; + services::InferenceService inference_svc_; }; }; // namespace inferences diff --git a/engine/main.cc b/engine/main.cc index 985042a5d..43eecd9ec 100644 --- a/engine/main.cc +++ b/engine/main.cc @@ -46,8 +46,10 @@ void RunServer() { std::filesystem::path(config.logFolderPath) / std::filesystem::path(cortex_utils::logs_folder)); trantor::FileLogger asyncFileLogger; - asyncFileLogger.setFileName((std::filesystem::path(config.logFolderPath) / - std::filesystem::path(cortex_utils::logs_base_name)).string()); + asyncFileLogger.setFileName( + (std::filesystem::path(config.logFolderPath) / + std::filesystem::path(cortex_utils::logs_base_name)) + .string()); asyncFileLogger.setMaxLines(config.maxLogLines); // Keep last 100000 lines asyncFileLogger.startLogging(); trantor::Logger::setOutputFunction( @@ -111,8 +113,8 @@ int main(int argc, char* argv[]) { std::string py_home_path = (argc > 3) ? 
argv[3] : ""; std::unique_ptr dl; try { - std::string abs_path = cortex_utils::GetCurrentPath() + - cortex_utils::kPythonRuntimeLibPath; + std::string abs_path = + cortex_utils::GetCurrentPath() + kPythonRuntimeLibPath; dl = std::make_unique(abs_path, "engine"); } catch (const cortex_cpp::dylib::load_error& e) { LOG_ERROR << "Could not load engine: " << e.what(); diff --git a/engine/services/inference_service.cc b/engine/services/inference_service.cc new file mode 100644 index 000000000..51457b931 --- /dev/null +++ b/engine/services/inference_service.cc @@ -0,0 +1,380 @@ +#include "inference_service.h" +#include "utils/cpuid/cpu_info.h" +#include "utils/engine_constants.h" +#include "utils/file_manager_utils.h" + +namespace services { + +namespace { +// Need to change this after we rename repositories +std::string NormalizeEngine(const std::string& engine) { + if (engine == kLlamaEngine) { + return kLlamaRepo; + } else if (engine == kOnnxEngine) { + return kOnnxRepo; + } else if (engine == kTrtLlmEngine) { + return kTrtLlmRepo; + } + return engine; +}; +} // namespace + +cpp::result InferenceService::HandleChatCompletion( + std::shared_ptr q, std::shared_ptr json_body) { + std::string engine_type; + if (!HasFieldInReq(json_body, "engine")) { + engine_type = kLlamaRepo; + } else { + engine_type = (*(json_body)).get("engine", kLlamaRepo).asString(); + } + auto ne = NormalizeEngine(engine_type); + if (!IsEngineLoaded(ne)) { + Json::Value res; + res["message"] = "Engine is not loaded yet"; + Json::Value stt; + stt["status_code"] = 409; + LOG_WARN << "Engine is not loaded yet"; + return cpp::fail(std::make_pair(stt, res)); + } + std::get(engines_[ne].engine) + ->HandleChatCompletion(json_body, + [q](Json::Value status, Json::Value res) { + q->push(std::make_pair(status, res)); + }); + return {}; +} + +cpp::result InferenceService::HandleEmbedding( + std::shared_ptr q, std::shared_ptr json_body) { + std::string engine_type; + if (!HasFieldInReq(json_body, "engine")) { + engine_type = kLlamaRepo; + } else { + engine_type = (*(json_body)).get("engine", kLlamaRepo).asString(); + } + + auto ne = NormalizeEngine(engine_type); + if (!IsEngineLoaded(ne)) { + Json::Value res; + res["message"] = "Engine is not loaded yet"; + Json::Value stt; + stt["status_code"] = 409; + LOG_WARN << "Engine is not loaded yet"; + return cpp::fail(std::make_pair(stt, res)); + } + std::get(engines_["llama-cpp"].engine) + ->HandleEmbedding(json_body, [q](Json::Value status, Json::Value res) { + q->push(std::make_pair(status, res)); + }); + return {}; +} + +InferResult InferenceService::LoadModel( + std::shared_ptr json_body) { + std::string engine_type; + if (!HasFieldInReq(json_body, "engine")) { + engine_type = kLlamaRepo; + } else { + engine_type = (*(json_body)).get("engine", kLlamaRepo).asString(); + } + + auto ne = NormalizeEngine(engine_type); + Json::Value r; + Json::Value stt; + // We have not loaded engine yet, should load it before using it + if (engines_.find(ne) == engines_.end()) { + auto get_engine_path = [](std::string_view e) { + if (e == kLlamaRepo) { + return kLlamaLibPath; + } else if (e == kOnnxRepo) { + return kOnnxLibPath; + } else if (e == kTrtLlmRepo) { + return kTensorrtLlmPath; + } + return kLlamaLibPath; + }; + try { + if (ne == kLlamaRepo) { + cortex::cpuid::CpuInfo cpu_info; + LOG_INFO << "CPU instruction set: " << cpu_info.to_string(); + } + + std::string abs_path = + (getenv("ENGINE_PATH") + ? 
getenv("ENGINE_PATH") + : file_manager_utils::GetCortexDataPath().string()) + + get_engine_path(ne); +#if defined(_WIN32) + // TODO(?) If we only allow to load an engine at a time, the logic is simpler. + // We would like to support running multiple engines at the same time. Therefore, + // the adding/removing dll directory logic is quite complicated: + // 1. If llamacpp is loaded and new requested engine is tensorrt-llm: + // Unload the llamacpp dll directory then load the tensorrt-llm + // 2. If tensorrt-llm is loaded and new requested engine is llamacpp: + // Do nothing, llamacpp can re-use tensorrt-llm dependencies (need to be tested careful) + // 3. Add dll directory if met other conditions + + auto add_dll = [this](const std::string& e_type, const std::string& p) { + auto ws = std::wstring(p.begin(), p.end()); + if (auto cookie = AddDllDirectory(ws.c_str()); cookie != 0) { + LOG_INFO << "Added dll directory: " << p; + engines_[e_type].cookie = cookie; + } else { + LOG_WARN << "Could not add dll directory: " << p; + } + }; + + if (IsEngineLoaded(kLlamaRepo) && ne == kTrtLlmRepo) { + // Remove llamacpp dll directory + if (!RemoveDllDirectory(engines_[kLlamaRepo].cookie)) { + LOG_INFO << "Could not remove dll directory: " << kLlamaRepo; + } else { + LOG_WARN << "Removed dll directory: " << kLlamaRepo; + } + + add_dll(ne, abs_path); + } else if (IsEngineLoaded(kTrtLlmRepo) && ne == kLlamaRepo) { + // Do nothing + } else { + add_dll(ne, abs_path); + } +#endif + engines_[ne].dl = std::make_unique(abs_path, "engine"); + + } catch (const cortex_cpp::dylib::load_error& e) { + LOG_ERROR << "Could not load engine: " << e.what(); + engines_.erase(ne); + + r["message"] = "Could not load engine " + ne; + stt["status_code"] = 500; + return std::make_pair(stt, r); + } + + auto func = engines_[ne].dl->get_function("get_engine"); + engines_[ne].engine = func(); + + auto& en = std::get(engines_[ne].engine); + if (ne == kLlamaRepo) { //fix for llamacpp engine first + auto config = file_manager_utils::GetCortexConfig(); + if (en->IsSupported("SetFileLogger")) { + en->SetFileLogger(config.maxLogLines, + (std::filesystem::path(config.logFolderPath) / + std::filesystem::path(config.logLlamaCppPath)) + .string()); + } else { + LOG_WARN << "Method SetFileLogger is not supported yet"; + } + } + LOG_INFO << "Loaded engine: " << ne; + } + + // LOG_TRACE << "Load model"; + auto& en = std::get(engines_[ne].engine); + en->LoadModel(json_body, [&stt, &r](Json::Value status, Json::Value res) { + stt = status; + r = res; + }); + return std::make_pair(stt, r); +} + +InferResult InferenceService::UnloadModel( + std::shared_ptr json_body) { + std::string engine_type; + if (!HasFieldInReq(json_body, "engine")) { + engine_type = kLlamaRepo; + } else { + engine_type = (*(json_body)).get("engine", kLlamaRepo).asString(); + } + + auto ne = NormalizeEngine(engine_type); + Json::Value r; + Json::Value stt; + if (!IsEngineLoaded(ne)) { + r["message"] = "Engine is not loaded yet"; + stt["status_code"] = 409; + LOG_WARN << "Engine is not loaded yet"; + return std::make_pair(stt, r); + } + LOG_TRACE << "Start unload model"; + std::get(engines_[ne].engine) + ->UnloadModel(json_body, [&r, &stt](Json::Value status, Json::Value res) { + stt = status; + r = res; + }); + return std::make_pair(stt, r); +} + +InferResult InferenceService::GetModelStatus( + std::shared_ptr json_body) { + std::string engine_type; + if (!HasFieldInReq(json_body, "engine")) { + engine_type = kLlamaRepo; + } else { + engine_type = (*(json_body)).get("engine", 
kLlamaRepo).asString(); + } + + auto ne = NormalizeEngine(engine_type); + Json::Value r; + Json::Value stt; + + if (!IsEngineLoaded(ne)) { + r["message"] = "Engine is not loaded yet"; + stt["status_code"] = 409; + LOG_WARN << "Engine is not loaded yet"; + return std::make_pair(stt, r); + } + + LOG_TRACE << "Start to get model status"; + std::get(engines_[ne].engine) + ->GetModelStatus(json_body, + [&stt, &r](Json::Value status, Json::Value res) { + stt = status; + r = res; + }); + return std::make_pair(stt, r); +} + +InferResult InferenceService::GetModels( + std::shared_ptr json_body) { + Json::Value r; + Json::Value stt; + if (engines_.empty()) { + r["message"] = "Engine is not loaded yet"; + stt["status_code"] = 409; + return std::make_pair(stt, r); + } + + LOG_TRACE << "Start to get models"; + Json::Value resp_data(Json::arrayValue); + for (auto const& [k, v] : engines_) { + auto e = std::get(v.engine); + if (e->IsSupported("GetModels")) { + e->GetModels(json_body, + [&resp_data](Json::Value status, Json::Value res) { + for (auto r : res["data"]) { + resp_data.append(r); + } + }); + } + } + Json::Value root; + root["data"] = resp_data; + root["object"] = "list"; + stt["status_code"] = 200; + return std::make_pair(stt, root); + // LOG_TRACE << "Done get models"; +} + +Json::Value InferenceService::GetEngines( + std::shared_ptr json_body) { + Json::Value res; + Json::Value engine_array(Json::arrayValue); + for (const auto& [s, _] : engines_) { + Json::Value val; + val["id"] = s; + val["object"] = "engine"; + engine_array.append(val); + } + + res["object"] = "list"; + res["data"] = engine_array; + return res; +} + +InferResult InferenceService::FineTuning( + std::shared_ptr json_body) { + std::string ne = kPythonRuntimeRepo; + Json::Value r; + Json::Value stt; + + if (engines_.find(ne) == engines_.end()) { + try { + std::string abs_path = + (getenv("ENGINE_PATH") + ? 
getenv("ENGINE_PATH") + : file_manager_utils::GetCortexDataPath().string()) + + kPythonRuntimeLibPath; + engines_[ne].dl = std::make_unique(abs_path, "engine"); + } catch (const cortex_cpp::dylib::load_error& e) { + + LOG_ERROR << "Could not load engine: " << e.what(); + engines_.erase(ne); + + Json::Value res; + r["message"] = "Could not load engine " + ne; + stt["status_code"] = 500; + return std::make_pair(stt, r); + } + + auto func = + engines_[ne].dl->get_function("get_engine"); + engines_[ne].engine = func(); + LOG_INFO << "Loaded engine: " << ne; + } + + LOG_TRACE << "Start to fine-tuning"; + auto& en = std::get(engines_[ne].engine); + if (en->IsSupported("HandlePythonFileExecutionRequest")) { + en->HandlePythonFileExecutionRequest( + json_body, [&r, &stt](Json::Value status, Json::Value res) { + r = res; + stt = status; + }); + } else { + LOG_WARN << "Method is not supported yet"; + r["message"] = "Method is not supported yet"; + stt["status_code"] = 500; + return std::make_pair(stt, r); + } + LOG_TRACE << "Done fine-tuning"; + return std::make_pair(stt, r); +} + +InferResult InferenceService::UnloadEngine( + std::shared_ptr json_body) { + std::string engine_type; + if (!HasFieldInReq(json_body, "engine")) { + engine_type = kLlamaRepo; + } else { + engine_type = (*(json_body)).get("engine", kLlamaRepo).asString(); + } + + auto ne = NormalizeEngine(engine_type); + Json::Value r; + Json::Value stt; + + if (!IsEngineLoaded(ne)) { + r["message"] = "Engine is not loaded yet"; + stt["status_code"] = 409; + LOG_WARN << "Engine is not loaded yet"; + return std::make_pair(stt, r); + } + + EngineI* e = std::get(engines_[ne].engine); + delete e; +#if defined(_WIN32) + if (!RemoveDllDirectory(engines_[ne].cookie)) { + LOG_WARN << "Could not remove dll directory: " << ne; + } else { + LOG_INFO << "Removed dll directory: " << ne; + } +#endif + engines_.erase(ne); + LOG_INFO << "Unloaded engine " + ne; + r["message"] = "Unloaded engine " + ne; + stt["status_code"] = 200; + return std::make_pair(stt, r); +} + +bool InferenceService::IsEngineLoaded(const std::string& e) { + return engines_.find(e) != engines_.end(); +} + +bool InferenceService::HasFieldInReq(std::shared_ptr json_body, + const std::string& field) { + if (!json_body || (*json_body)[field].isNull()) { + return false; + } + return true; +} +} // namespace services \ No newline at end of file diff --git a/engine/services/inference_service.h b/engine/services/inference_service.h new file mode 100644 index 000000000..16c8e3b99 --- /dev/null +++ b/engine/services/inference_service.h @@ -0,0 +1,76 @@ +#pragma once + +#include +#include +#include "common/base.h" +#include "cortex-common/EngineI.h" +#include "cortex-common/cortexpythoni.h" +#include "utils/dylib.h" +#include "utils/json.hpp" +#include "utils/result.hpp" + +namespace services { +// Status and result +using InferResult = std::pair; + +struct SyncQueue { + void push(InferResult&& p) { + std::unique_lock l(mtx); + q.push(p); + cond.notify_one(); + } + + InferResult wait_and_pop() { + std::unique_lock l(mtx); + cond.wait(l, [this] { return !q.empty(); }); + auto res = q.front(); + q.pop(); + return res; + } + + std::mutex mtx; + std::condition_variable cond; + std::queue q; +}; + +class InferenceService { + public: + cpp::result HandleChatCompletion( + std::shared_ptr q, std::shared_ptr json_body); + + cpp::result HandleEmbedding( + std::shared_ptr q, std::shared_ptr json_body); + + InferResult LoadModel(std::shared_ptr json_body); + + InferResult UnloadModel(std::shared_ptr 
json_body); + + InferResult GetModelStatus(std::shared_ptr json_body); + + InferResult GetModels(std::shared_ptr json_body); + + Json::Value GetEngines(std::shared_ptr json_body); + + InferResult FineTuning(std::shared_ptr json_body); + + InferResult UnloadEngine(std::shared_ptr json_body); + + private: + bool IsEngineLoaded(const std::string& e); + + bool HasFieldInReq(std::shared_ptr json_body, + const std::string& field); + + private: + using EngineV = std::variant; + struct EngineInfo { + std::unique_ptr dl; + EngineV engine; +#if defined(_WIN32) + DLL_DIRECTORY_COOKIE cookie; +#endif + }; + // TODO(sang) move engines_ into engine service? + std::unordered_map engines_; +}; +} // namespace services \ No newline at end of file diff --git a/engine/utils/cortex_utils.h b/engine/utils/cortex_utils.h index 9673f0c1a..f0c2a5c1b 100644 --- a/engine/utils/cortex_utils.h +++ b/engine/utils/cortex_utils.h @@ -27,11 +27,6 @@ #endif namespace cortex_utils { -constexpr static auto kLlamaLibPath = "/engines/cortex.llamacpp"; -constexpr static auto kPythonRuntimeLibPath = "/engines/cortex.python"; -constexpr static auto kOnnxLibPath = "/engines/cortex.onnx"; -constexpr static auto kTensorrtLlmPath = "/engines/cortex.tensorrt-llm"; - inline std::string models_folder = "./models"; inline std::string logs_folder = "./logs"; inline std::string logs_base_name = "./logs/cortex.log"; diff --git a/engine/utils/engine_constants.h b/engine/utils/engine_constants.h index 63334b860..dbc1e223b 100644 --- a/engine/utils/engine_constants.h +++ b/engine/utils/engine_constants.h @@ -7,4 +7,9 @@ constexpr const auto kTrtLlmEngine = "tensorrt-llm"; constexpr const auto kOnnxRepo = "cortex.onnx"; constexpr const auto kLlamaRepo = "cortex.llamacpp"; constexpr const auto kTrtLlmRepo = "cortex.tensorrt-llm"; -constexpr const auto kPythonRuntimeRepo = "cortex.python"; \ No newline at end of file +constexpr const auto kPythonRuntimeRepo = "cortex.python"; + +constexpr const auto kLlamaLibPath = "/engines/cortex.llamacpp"; +constexpr const auto kPythonRuntimeLibPath = "/engines/cortex.python"; +constexpr const auto kOnnxLibPath = "/engines/cortex.onnx"; +constexpr const auto kTensorrtLlmPath = "/engines/cortex.tensorrt-llm"; \ No newline at end of file From 472f7162b9687d4e056ddb5b6eb1dde2fe988e2f Mon Sep 17 00:00:00 2001 From: vansangpfiev Date: Mon, 14 Oct 2024 14:02:55 +0700 Subject: [PATCH 2/2] fix: update status --- engine/services/inference_service.cc | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/engine/services/inference_service.cc b/engine/services/inference_service.cc index 51457b931..08dbc33aa 100644 --- a/engine/services/inference_service.cc +++ b/engine/services/inference_service.cc @@ -17,6 +17,11 @@ std::string NormalizeEngine(const std::string& engine) { } return engine; }; + +constexpr const int k200OK = 200; +constexpr const int k400BadRequest = 400; +constexpr const int k409Conflict = 409; +constexpr const int k500InternalServerError = 500; } // namespace cpp::result InferenceService::HandleChatCompletion( @@ -32,7 +37,7 @@ cpp::result InferenceService::HandleChatCompletion( Json::Value res; res["message"] = "Engine is not loaded yet"; Json::Value stt; - stt["status_code"] = 409; + stt["status_code"] = k409Conflict; LOG_WARN << "Engine is not loaded yet"; return cpp::fail(std::make_pair(stt, res)); } @@ -58,7 +63,7 @@ cpp::result InferenceService::HandleEmbedding( Json::Value res; res["message"] = "Engine is not loaded yet"; Json::Value stt; - 
stt["status_code"] = 409; + stt["status_code"] = k409Conflict; LOG_WARN << "Engine is not loaded yet"; return cpp::fail(std::make_pair(stt, res)); } @@ -146,7 +151,7 @@ InferResult InferenceService::LoadModel( engines_.erase(ne); r["message"] = "Could not load engine " + ne; - stt["status_code"] = 500; + stt["status_code"] = k500InternalServerError; return std::make_pair(stt, r); } @@ -191,7 +196,7 @@ InferResult InferenceService::UnloadModel( Json::Value stt; if (!IsEngineLoaded(ne)) { r["message"] = "Engine is not loaded yet"; - stt["status_code"] = 409; + stt["status_code"] = k409Conflict; LOG_WARN << "Engine is not loaded yet"; return std::make_pair(stt, r); } @@ -219,7 +224,7 @@ InferResult InferenceService::GetModelStatus( if (!IsEngineLoaded(ne)) { r["message"] = "Engine is not loaded yet"; - stt["status_code"] = 409; + stt["status_code"] = k409Conflict; LOG_WARN << "Engine is not loaded yet"; return std::make_pair(stt, r); } @@ -240,7 +245,7 @@ InferResult InferenceService::GetModels( Json::Value stt; if (engines_.empty()) { r["message"] = "Engine is not loaded yet"; - stt["status_code"] = 409; + stt["status_code"] = k409Conflict; return std::make_pair(stt, r); } @@ -260,7 +265,7 @@ InferResult InferenceService::GetModels( Json::Value root; root["data"] = resp_data; root["object"] = "list"; - stt["status_code"] = 200; + stt["status_code"] = k200OK; return std::make_pair(stt, root); // LOG_TRACE << "Done get models"; } @@ -302,7 +307,7 @@ InferResult InferenceService::FineTuning( Json::Value res; r["message"] = "Could not load engine " + ne; - stt["status_code"] = 500; + stt["status_code"] = k500InternalServerError; return std::make_pair(stt, r); } @@ -323,7 +328,7 @@ InferResult InferenceService::FineTuning( } else { LOG_WARN << "Method is not supported yet"; r["message"] = "Method is not supported yet"; - stt["status_code"] = 500; + stt["status_code"] = k500InternalServerError; return std::make_pair(stt, r); } LOG_TRACE << "Done fine-tuning"; @@ -345,7 +350,7 @@ InferResult InferenceService::UnloadEngine( if (!IsEngineLoaded(ne)) { r["message"] = "Engine is not loaded yet"; - stt["status_code"] = 409; + stt["status_code"] = k409Conflict; LOG_WARN << "Engine is not loaded yet"; return std::make_pair(stt, r); } @@ -362,7 +367,7 @@ InferResult InferenceService::UnloadEngine( engines_.erase(ne); LOG_INFO << "Unloaded engine " + ne; r["message"] = "Unloaded engine " + ne; - stt["status_code"] = 200; + stt["status_code"] = k200OK; return std::make_pair(stt, r); }