16 changes: 13 additions & 3 deletions context/llama_server_context.h
@@ -532,7 +532,7 @@ struct llama_server_context {

std::tie(model, ctx) = llama_init_from_gpt_params(params);
if (model == nullptr) {
LOG_ERROR_LLAMA("unable to load model", {{"model", params.model}});
LOG_ERROR_LLAMA("llama.cpp unable to load model", {{"model", params.model}});
return false;
}

@@ -551,6 +551,10 @@ struct llama_server_context {
}
}

if (ctx == nullptr) {
LOG_ERROR_LLAMA("Unable to get llama.cpp context", {});
return false;
}
n_ctx = llama_n_ctx(ctx);

add_bos_token = llama_should_add_bos_token(model);
@@ -578,7 +582,11 @@ struct llama_server_context {
slots.push_back(slot);
}

batch = llama_batch_init(n_ctx, 0, params.n_parallel);
try {
batch = llama_batch_init(n_ctx, 0, params.n_parallel);
} catch (const std::exception& e) {
LOG_ERROR_LLAMA("Failed to allocate llama.cpp batch metadata" , {{"exception", e.what()}, {"n_tokens_alloc", n_ctx}, {"embd", 0}, {"n_seq_max", params.n_parallel}});
}

// empty system prompt
system_prompt = "";
@@ -1296,7 +1304,9 @@ struct llama_server_context {
}

if (queue_results[i].id == task_id) {
assert(queue_results[i].multitask_id == -1);
if (queue_results[i].multitask_id != -1) {
LOG_ERROR_LLAMA("Incorrect multitask ID", {{"task_id", task_id}});
}
task_result res = queue_results[i];
queue_results.erase(queue_results.begin() + i);
return res;
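The changes above swap hard failure modes (a bare assert, an unchecked context pointer) for logged guards that return false to the caller. Below is a minimal standalone sketch of that guard pattern, under the assumption that it were factored into a helper; the function name is hypothetical, while LOG_ERROR_LLAMA, gpt_params, and the llama.cpp types are taken from the surrounding code.

static bool check_llama_init(llama_model* model, llama_context* ctx,
                             const gpt_params& params) {
  // Same guards as the hunks above: log the failure and bail out instead of asserting.
  if (model == nullptr) {
    LOG_ERROR_LLAMA("llama.cpp unable to load model", {{"model", params.model}});
    return false;
  }
  if (ctx == nullptr) {
    LOG_ERROR_LLAMA("Unable to get llama.cpp context", {});
    return false;
  }
  return true;
}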
116 changes: 71 additions & 45 deletions controllers/llamaCPP.cc
@@ -3,14 +3,13 @@
#include <fstream>
#include <iostream>
#include "log.h"
#include "utils/nitro_utils.h"
#include "utils/logging_utils.h"

// External
#include "common.h"
#include "llama.h"

#include "log.h"
#include "utils/nitro_utils.h"

using namespace inferences;
using json = nlohmann::json;

@@ -50,6 +49,7 @@ std::shared_ptr<inferenceState> create_inference_state(llamaCPP* instance) {
bool llamaCPP::CheckModelLoaded(
std::function<void(const HttpResponsePtr&)>& callback) {
if (!llama.model_loaded_external) {
LOG_ERROR << "Model has not been loaded";
Json::Value jsonResp;
jsonResp["message"] =
"Model has not been loaded, please load model into nitro";
@@ -159,6 +159,7 @@ llamaCPP::~llamaCPP() {
void llamaCPP::WarmupModel() {
json pseudo;

LOG_INFO << "Warm-up model";
pseudo["prompt"] = "Hello";
pseudo["n_predict"] = 2;
pseudo["stream"] = false;
@@ -187,6 +188,8 @@ void llamaCPP::InferenceImpl(
inferences::ChatCompletionRequest&& completion,
std::function<void(const HttpResponsePtr&)>& callback) {
std::string formatted_output = pre_prompt;
int request_id = ++no_of_requests;
LOG_INFO_REQUEST(request_id) << "Generating reponse for inference request";

json data;
json stopWords;
@@ -196,9 +199,9 @@ void llamaCPP::InferenceImpl(
// Increase number of chats received and clean the prompt
no_of_chats++;
if (no_of_chats % clean_cache_threshold == 0) {
LOG_INFO << "Clean cache threshold reached!";
LOG_INFO_REQUEST(request_id) << "Clean cache threshold reached!";
llama.kv_cache_clear();
LOG_INFO << "Cache cleaned";
LOG_INFO_REQUEST(request_id) << "Cache cleaned";
}

// Default values to enable auto caching
@@ -207,9 +210,7 @@ void llamaCPP::InferenceImpl(

// Passing load value
data["repeat_last_n"] = this->repeat_last_n;

LOG_INFO << "Messages:" << completion.messages.toStyledString();
LOG_INFO << "Stop:" << completion.stop.toStyledString();
LOG_INFO_REQUEST(request_id) << "Stop words:" << completion.stop.toStyledString();

data["stream"] = completion.stream;
data["n_predict"] = completion.max_tokens;
Expand Down Expand Up @@ -268,18 +269,18 @@ void llamaCPP::InferenceImpl(
auto image_url = content_piece["image_url"]["url"].asString();
std::string base64_image_data;
if (image_url.find("http") != std::string::npos) {
LOG_INFO << "Remote image detected but not supported yet";
LOG_INFO_REQUEST(request_id) << "Remote image detected but not supported yet";
} else if (image_url.find("data:image") != std::string::npos) {
LOG_INFO << "Base64 image detected";
LOG_INFO_REQUEST(request_id) << "Base64 image detected";
base64_image_data = nitro_utils::extractBase64(image_url);
LOG_INFO << base64_image_data;
LOG_INFO_REQUEST(request_id) << base64_image_data;
} else {
LOG_INFO << "Local image detected";
LOG_INFO_REQUEST(request_id) << "Local image detected";
nitro_utils::processLocalImage(
image_url, [&](const std::string& base64Image) {
base64_image_data = base64Image;
});
LOG_INFO << base64_image_data;
LOG_INFO_REQUEST(request_id) << base64_image_data;
}
content_piece_image_data["data"] = base64_image_data;

@@ -306,7 +307,7 @@ void llamaCPP::InferenceImpl(
}
}
formatted_output += ai_prompt;
LOG_INFO << formatted_output;
LOG_INFO_REQUEST(request_id) << formatted_output;
}

data["prompt"] = formatted_output;
Expand All @@ -322,35 +323,36 @@ void llamaCPP::InferenceImpl(
bool is_streamed = data["stream"];
// Enable full message debugging
#ifdef DEBUG
LOG_INFO << "Current completion text";
LOG_INFO << formatted_output;
LOG_INFO_REQUEST(request_id) << "Current completion text";
LOG_INFO_REQUEST(request_id) << formatted_output;
#endif

if (is_streamed) {
LOG_INFO_REQUEST(request_id) << "Streamed, waiting for respone";
auto state = create_inference_state(this);
auto chunked_content_provider =
[state, data](char* pBuffer, std::size_t nBuffSize) -> std::size_t {
[state, data, request_id](char* pBuffer, std::size_t nBuffSize) -> std::size_t {
if (state->inference_status == PENDING) {
state->inference_status = RUNNING;
} else if (state->inference_status == FINISHED) {
return 0;
}

if (!pBuffer) {
LOG_INFO << "Connection closed or buffer is null. Reset context";
LOG_WARN_REQUEST(request_id) "Connection closed or buffer is null. Reset context";
state->inference_status = FINISHED;
return 0;
}

if (state->inference_status == EOS) {
LOG_INFO << "End of result";
LOG_INFO_REQUEST(request_id) << "End of result";
const std::string str =
"data: " +
create_return_json(nitro_utils::generate_random_string(20), "_", "",
"stop") +
"\n\n" + "data: [DONE]" + "\n\n";

LOG_VERBOSE("data stream", {{"to_send", str}});
LOG_VERBOSE("data stream", {{"request_id": request_id}, {"to_send", str}});
std::size_t nRead = std::min(str.size(), nBuffSize);
memcpy(pBuffer, str.data(), nRead);
state->inference_status = FINISHED;
@@ -370,7 +372,7 @@ void llamaCPP::InferenceImpl(
memcpy(pBuffer, str.data(), nRead);

if (result.stop) {
LOG_INFO << "reached result stop";
LOG_INFO_REQUEST(request_id) << "Reached result stop";
state->inference_status = EOS;
return nRead;
}
@@ -383,14 +385,14 @@ void llamaCPP::InferenceImpl(

return nRead;
} else {
LOG_INFO << "Error during inference";
LOG_ERROR_REQUEST(request_id) << "Error during inference";
}
state->inference_status = FINISHED;
return 0;
};
// Queued task
state->instance->queue->runTaskInQueue(
[callback, state, data, chunked_content_provider]() {
[callback, state, data, chunked_content_provider, request_id]() {
state->task_id =
state->instance->llama.request_completion(data, false, false, -1);

@@ -410,22 +412,22 @@
retries += 1;
}
if (state->inference_status != RUNNING)
LOG_INFO << "Wait for task to be released:" << state->task_id;
LOG_INFO_REQUEST(request_id) << "Wait for task to be released:" << state->task_id;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
LOG_INFO << "Task completed, release it";
LOG_INFO_REQUEST(request_id) << "Task completed, release it";
// Request completed, release it
state->instance->llama.request_cancel(state->task_id);
LOG_INFO_REQUEST(request_id) << "Inference completed";
});
} else {
Json::Value respData;
auto resp = nitro_utils::nitroHttpResponse();
int task_id = llama.request_completion(data, false, false, -1);
LOG_INFO << "sent the non stream, waiting for respone";
LOG_INFO_REQUEST(request_id) << "Non stream, waiting for respone";
if (!json_value(data, "stream", false)) {
std::string completion_text;
task_result result = llama.next_result(task_id);
LOG_INFO << "Here is the result:" << result.error;
if (!result.error && result.stop) {
int prompt_tokens = result.result_json["tokens_evaluated"];
int predicted_tokens = result.result_json["tokens_predicted"];
Expand All @@ -435,9 +437,12 @@ void llamaCPP::InferenceImpl(
prompt_tokens, predicted_tokens);
resp->setBody(full_return);
} else {
resp->setBody("Internal error during inference");
respData["message"] = "Internal error during inference";
resp = nitro_utils::nitroHttpJsonResponse(respData);
LOG_ERROR_REQUEST(request_id) << "Error during inference";
}
callback(resp);
LOG_INFO_REQUEST(request_id) << "Inference completed";
}
}
}
@@ -458,10 +463,12 @@ void llamaCPP::Embedding(
void llamaCPP::EmbeddingImpl(
std::shared_ptr<Json::Value> jsonBody,
std::function<void(const HttpResponsePtr&)>& callback) {
int request_id = ++no_of_requests;
LOG_INFO_REQUEST(request_id) << "Generating reponse for embedding request";
// Queue embedding task
auto state = create_inference_state(this);

state->instance->queue->runTaskInQueue([this, state, jsonBody, callback]() {
state->instance->queue->runTaskInQueue([this, state, jsonBody, callback, request_id]() {
Json::Value responseData(Json::arrayValue);

if (jsonBody->isMember("input")) {
@@ -502,50 +509,58 @@ void llamaCPP::EmbeddingImpl(
resp->setBody(Json::writeString(Json::StreamWriterBuilder(), root));
resp->setContentTypeString("application/json");
callback(resp);
LOG_INFO_REQUEST(request_id) << "Embedding completed";
});
}

void llamaCPP::UnloadModel(
const HttpRequestPtr& req,
std::function<void(const HttpResponsePtr&)>&& callback) {
Json::Value jsonResp;
jsonResp["message"] = "No model loaded";
if (llama.model_loaded_external) {
if (CheckModelLoaded(callback)) {
StopBackgroundTask();

llama_free(llama.ctx);
llama_free_model(llama.model);
llama.ctx = nullptr;
llama.model = nullptr;
jsonResp["message"] = "Model unloaded successfully";
auto resp = nitro_utils::nitroHttpJsonResponse(jsonResp);
callback(resp);
LOG_INFO << "Model unloaded successfully";
}
auto resp = nitro_utils::nitroHttpJsonResponse(jsonResp);
callback(resp);
return;
}

void llamaCPP::ModelStatus(
const HttpRequestPtr& req,
std::function<void(const HttpResponsePtr&)>&& callback) {
Json::Value jsonResp;
bool is_model_loaded = llama.model_loaded_external;
if (is_model_loaded) {
if (CheckModelLoaded(callback)) {
jsonResp["model_loaded"] = is_model_loaded;
jsonResp["model_data"] = llama.get_model_props().dump();
} else {
jsonResp["model_loaded"] = is_model_loaded;
}

auto resp = nitro_utils::nitroHttpJsonResponse(jsonResp);
callback(resp);
return;
auto resp = nitro_utils::nitroHttpJsonResponse(jsonResp);
callback(resp);
LOG_INFO << "Model status responded";
}
}

void llamaCPP::LoadModel(
const HttpRequestPtr& req,
std::function<void(const HttpResponsePtr&)>&& callback) {

if (!nitro_utils::isAVX2Supported() && ggml_cpu_has_avx2()) {
LOG_ERROR << "AVX2 is not supported by your processor";
Json::Value jsonResp;
jsonResp["message"] = "AVX2 is not supported by your processor, please download and replace the correct Nitro asset version";
auto resp = nitro_utils::nitroHttpJsonResponse(jsonResp);
resp->setStatusCode(drogon::k500InternalServerError);
callback(resp);
return;
}

if (llama.model_loaded_external) {
LOG_INFO << "model loaded";
LOG_INFO << "Model already loaded";
Json::Value jsonResp;
jsonResp["message"] = "Model already loaded";
auto resp = nitro_utils::nitroHttpJsonResponse(jsonResp);
Expand All @@ -568,6 +583,7 @@ void llamaCPP::LoadModel(
jsonResp["message"] = "Model loaded successfully";
auto resp = nitro_utils::nitroHttpJsonResponse(jsonResp);
callback(resp);
LOG_INFO << "Model loaded successfully";
}
}

@@ -602,7 +618,17 @@ bool llamaCPP::LoadModelImpl(std::shared_ptr<Json::Value> jsonBody) {
}
};

params.model = jsonBody->operator[]("llama_model_path").asString();
Json::Value model_path = jsonBody->operator[]("llama_model_path");
if (model_path.isNull()) {
LOG_ERROR << "Missing model path in request";
} else {
if (std::filesystem::exists(std::filesystem::path(model_path.asString()))) {
params.model = model_path.asString();
} else {
LOG_ERROR << "Could not find model in path " << model_path.asString();
}
}

params.n_gpu_layers = jsonBody->get("ngl", 100).asInt();
params.n_ctx = jsonBody->get("ctx_len", 2048).asInt();
params.embedding = jsonBody->get("embedding", true).asBool();
@@ -681,7 +707,7 @@ void llamaCPP::StopBackgroundTask() {
if (llama.model_loaded_external) {
llama.model_loaded_external = false;
llama.condition_tasks.notify_one();
LOG_INFO << "changed to false";
LOG_INFO << "Background task stopped! ";
if (backgroundThread.joinable()) {
backgroundThread.join();
}
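One of the larger controller changes above is that LoadModelImpl now checks llama_model_path before handing it to llama.cpp. Here is a self-contained sketch of that validation, assuming trantor's stream loggers and jsoncpp as used elsewhere in the controller; the helper name and the std::optional return are illustrative, and note that the PR itself only logs the error and falls through rather than aborting the load.

#include <filesystem>
#include <optional>
#include <string>
#include <json/json.h>                 // jsoncpp, already a dependency of the controller
#include <trantor/utils/Logger.h>      // provides the LOG_ERROR stream used below

static std::optional<std::string> resolve_model_path(const Json::Value& jsonBody) {
  const Json::Value model_path = jsonBody["llama_model_path"];
  if (model_path.isNull()) {
    LOG_ERROR << "Missing model path in request";
    return std::nullopt;
  }
  if (!std::filesystem::exists(std::filesystem::path(model_path.asString()))) {
    LOG_ERROR << "Could not find model in path " << model_path.asString();
    return std::nullopt;
  }
  return model_path.asString();
}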
1 change: 1 addition & 0 deletions controllers/llamaCPP.h
@@ -85,6 +85,7 @@ class llamaCPP : public drogon::HttpController<llamaCPP>,
std::string pre_prompt;
int repeat_last_n;
bool caching_enabled;
std::atomic<int> no_of_requests = 0;
std::atomic<int> no_of_chats = 0;
int clean_cache_threshold;
std::string grammar_file_content;
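The header change adds a second atomic counter, no_of_requests, which InferenceImpl and EmbeddingImpl pre-increment to stamp every log line with a per-request id. A small standalone sketch of why std::atomic<int> is sufficient for this; all names here are illustrative, not part of the PR.

#include <atomic>
#include <iostream>
#include <thread>
#include <vector>

std::atomic<int> no_of_requests{0};

void handle_request() {
  // Pre-increment on std::atomic<int> is an atomic read-modify-write
  // (fetch_add(1) + 1), so concurrent handlers never observe the same id.
  int request_id = ++no_of_requests;
  std::cout << "Request " << request_id << ": handled\n";
}

int main() {
  std::vector<std::thread> workers;
  for (int i = 0; i < 4; ++i) workers.emplace_back(handle_request);
  for (auto& t : workers) t.join();
  return 0;
}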
5 changes: 5 additions & 0 deletions utils/logging_utils.h
@@ -0,0 +1,5 @@
#pragma once

#define LOG_INFO_REQUEST(RID) LOG_INFO << "Request " << RID << ": "
#define LOG_WARN_REQUEST(RID) LOG_WARN << "Request " << RID << ": "
#define LOG_ERROR_REQUEST(RID) LOG_ERROR << "Request " << RID << ": "
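The new macros simply prepend a request id to the stream-style loggers already used in the controllers, so call sites keep chaining <<. A short usage sketch, assuming trantor's LOG_INFO / LOG_WARN / LOG_ERROR are in scope as they are in llamaCPP.cc; the function name is illustrative.

#include <trantor/utils/Logger.h>   // defines LOG_INFO, LOG_WARN, LOG_ERROR
#include "utils/logging_utils.h"

void log_example(int request_id) {
  // Expands to: LOG_INFO << "Request " << request_id << ": " << "Inference completed";
  LOG_INFO_REQUEST(request_id) << "Inference completed";
  LOG_WARN_REQUEST(request_id) << "Connection closed or buffer is null. Reset context";
  LOG_ERROR_REQUEST(request_id) << "Error during inference";
}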