From a7300e942f46cc5b47ce49401e12bbaa63c21bb6 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Thu, 13 Nov 2025 15:55:53 +0100 Subject: [PATCH 1/5] chore(deps): bump llama.cpp to '92bb442ad999a0d52df0af2730cd861012e8ac5c' Signed-off-by: Ettore Di Giacinto --- backend/cpp/llama-cpp/Makefile | 2 +- backend/cpp/llama-cpp/grpc-server.cpp | 242 +++++++++++++------------- 2 files changed, 120 insertions(+), 124 deletions(-) diff --git a/backend/cpp/llama-cpp/Makefile b/backend/cpp/llama-cpp/Makefile index 091c3386dd6a..02f12013f560 100644 --- a/backend/cpp/llama-cpp/Makefile +++ b/backend/cpp/llama-cpp/Makefile @@ -1,5 +1,5 @@ -LLAMA_VERSION?=7d019cff744b73084b15ca81ba9916f3efab1223 +LLAMA_VERSION?=92bb442ad999a0d52df0af2730cd861012e8ac5c LLAMA_REPO?=https://github.com/ggerganov/llama.cpp CMAKE_ARGS?= diff --git a/backend/cpp/llama-cpp/grpc-server.cpp b/backend/cpp/llama-cpp/grpc-server.cpp index a71f43aec6c8..72ed1b09b568 100644 --- a/backend/cpp/llama-cpp/grpc-server.cpp +++ b/backend/cpp/llama-cpp/grpc-server.cpp @@ -579,7 +579,8 @@ class BackendServiceImpl final : public backend::Backend::Service { auto completion_id = gen_chatcmplid(); - std::unordered_set task_ids; + // need to store the reader as a pointer, so that it won't be destroyed when the handle returns + const auto rd = std::make_shared(ctx_server); try { std::vector tasks; @@ -871,18 +872,77 @@ class BackendServiceImpl final : public backend::Backend::Service { tasks.push_back(std::move(task)); } - task_ids = server_task::get_list_id(tasks); - ctx_server.queue_results.add_waiting_tasks(tasks); - ctx_server.queue_tasks.post(std::move(tasks)); + rd->post_tasks(std::move(tasks)); } catch (const std::exception & e) { return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, e.what()); } - ctx_server.receive_cmpl_results_stream(task_ids, [&](server_task_result_ptr & result) -> bool { + // Get first result for error checking (following server.cpp pattern) + server_task_result_ptr first_result = rd->next([&context]() { return context->IsCancelled(); }); + if (first_result == nullptr) { + // connection is closed + return grpc::Status(grpc::StatusCode::CANCELLED, "Request cancelled by client"); + } else if (first_result->is_error()) { + json error_json = first_result->to_json(); + backend::Reply reply; + reply.set_message(error_json.value("message", "")); + writer->Write(reply); + return grpc::Status(grpc::StatusCode::INTERNAL, error_json.value("message", "Error occurred")); + } + + // Process first result + json first_res_json = first_result->to_json(); + if (first_res_json.is_array()) { + for (const auto & res : first_res_json) { + std::string completion_text = res.value("content", ""); + + backend::Reply reply; + reply.set_message(completion_text); + int32_t tokens_predicted = res.value("tokens_predicted", 0); + reply.set_tokens(tokens_predicted); + int32_t tokens_evaluated = res.value("tokens_evaluated", 0); + reply.set_prompt_tokens(tokens_evaluated); + + if (res.contains("timings")) { + double timing_prompt_processing = res.at("timings").value("prompt_ms", 0.0); + reply.set_timing_prompt_processing(timing_prompt_processing); + double timing_token_generation = res.at("timings").value("predicted_ms", 0.0); + reply.set_timing_token_generation(timing_token_generation); + } + + writer->Write(reply); + } + } else { + std::string completion_text = first_res_json.value("content", ""); + + backend::Reply reply; + reply.set_message(completion_text); + int32_t tokens_predicted = first_res_json.value("tokens_predicted", 0); + 
reply.set_tokens(tokens_predicted); + int32_t tokens_evaluated = first_res_json.value("tokens_evaluated", 0); + reply.set_prompt_tokens(tokens_evaluated); + + if (first_res_json.contains("timings")) { + double timing_prompt_processing = first_res_json.at("timings").value("prompt_ms", 0.0); + reply.set_timing_prompt_processing(timing_prompt_processing); + double timing_token_generation = first_res_json.at("timings").value("predicted_ms", 0.0); + reply.set_timing_token_generation(timing_token_generation); + } + + writer->Write(reply); + } + + // Process subsequent results + while (rd->has_next()) { // Check if context is cancelled before processing result if (context->IsCancelled()) { - ctx_server.cancel_tasks(task_ids); - return false; + break; + } + + auto result = rd->next([&context]() { return context->IsCancelled(); }); + if (result == nullptr) { + // connection is closed + break; } json res_json = result->to_json(); @@ -904,9 +964,6 @@ class BackendServiceImpl final : public backend::Backend::Service { reply.set_timing_token_generation(timing_token_generation); } - // Log Request Correlation Id - - // Send the reply writer->Write(reply); } } else { @@ -926,24 +983,9 @@ class BackendServiceImpl final : public backend::Backend::Service { reply.set_timing_token_generation(timing_token_generation); } - - - // Send the reply - writer->Write(reply); - + writer->Write(reply); } - return true; - }, [&](const json & error_data) { - backend::Reply reply; - reply.set_message(error_data.value("content", "")); - writer->Write(reply); - return true; - }, [&context]() { - // Check if the gRPC context is cancelled - return context->IsCancelled(); - }); - - ctx_server.queue_results.remove_waiting_task_ids(task_ids); + } // Check if context was cancelled during processing if (context->IsCancelled()) { @@ -963,7 +1005,7 @@ class BackendServiceImpl final : public backend::Backend::Service { } std::cout << "[PREDICT] Received result: " << data.dump(2) << std::endl; auto completion_id = gen_chatcmplid(); - std::unordered_set task_ids; + const auto rd = std::make_shared(ctx_server); try { std::vector tasks; @@ -1261,9 +1303,7 @@ class BackendServiceImpl final : public backend::Backend::Service { tasks.push_back(std::move(task)); } - task_ids = server_task::get_list_id(tasks); - ctx_server.queue_results.add_waiting_tasks(tasks); - ctx_server.queue_tasks.post(std::move(tasks)); + rd->post_tasks(std::move(tasks)); } catch (const std::exception & e) { return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, e.what()); } @@ -1271,51 +1311,45 @@ class BackendServiceImpl final : public backend::Backend::Service { std::cout << "[DEBUG] Waiting for results..." 
<< std::endl; - // Check cancellation before waiting for results - if (context->IsCancelled()) { - ctx_server.cancel_tasks(task_ids); - ctx_server.queue_results.remove_waiting_task_ids(task_ids); + // Wait for all results + auto all_results = rd->wait_for_all([&context]() { return context->IsCancelled(); }); + + if (all_results.is_terminated) { return grpc::Status(grpc::StatusCode::CANCELLED, "Request cancelled by client"); - } - - ctx_server.receive_multi_results(task_ids, [&](std::vector & results) { - std::cout << "[DEBUG] Received " << results.size() << " results" << std::endl; - if (results.size() == 1) { + } else if (all_results.error) { + std::cout << "[DEBUG] Error in results: " << all_results.error->to_json().value("message", "") << std::endl; + reply->set_message(all_results.error->to_json().value("message", "")); + return grpc::Status(grpc::StatusCode::INTERNAL, all_results.error->to_json().value("message", "Error occurred")); + } else { + std::cout << "[DEBUG] Received " << all_results.results.size() << " results" << std::endl; + if (all_results.results.size() == 1) { // single result - reply->set_message(results[0]->to_json().value("content", "")); + GGML_ASSERT(dynamic_cast(all_results.results[0].get()) != nullptr); + reply->set_message(all_results.results[0]->to_json().value("content", "")); - int32_t tokens_predicted = results[0]->to_json().value("tokens_predicted", 0); + int32_t tokens_predicted = all_results.results[0]->to_json().value("tokens_predicted", 0); reply->set_tokens(tokens_predicted); - int32_t tokens_evaluated = results[0]->to_json().value("tokens_evaluated", 0); + int32_t tokens_evaluated = all_results.results[0]->to_json().value("tokens_evaluated", 0); reply->set_prompt_tokens(tokens_evaluated); - if (results[0]->to_json().contains("timings")) { - double timing_prompt_processing = results[0]->to_json().at("timings").value("prompt_ms", 0.0); + if (all_results.results[0]->to_json().contains("timings")) { + double timing_prompt_processing = all_results.results[0]->to_json().at("timings").value("prompt_ms", 0.0); reply->set_timing_prompt_processing(timing_prompt_processing); - double timing_token_generation = results[0]->to_json().at("timings").value("predicted_ms", 0.0); + double timing_token_generation = all_results.results[0]->to_json().at("timings").value("predicted_ms", 0.0); reply->set_timing_token_generation(timing_token_generation); } } else { // multiple results (multitask) json arr = json::array(); - for (auto & res : results) { + for (auto & res : all_results.results) { + GGML_ASSERT(dynamic_cast(res.get()) != nullptr); arr.push_back(res->to_json().value("content", "")); } reply->set_message(arr); } - - - }, [&](const json & error_data) { - std::cout << "[DEBUG] Error in results: " << error_data.value("content", "") << std::endl; - reply->set_message(error_data.value("content", "")); - }, [&context]() { - // Check if the gRPC context is cancelled - // This is checked every HTTP_POLLING_SECONDS (1 second) during receive_multi_results - return context->IsCancelled(); - }); - - ctx_server.queue_results.remove_waiting_task_ids(task_ids); + } + std::cout << "[DEBUG] Predict request completed successfully" << std::endl; // Check if context was cancelled during processing @@ -1352,9 +1386,7 @@ class BackendServiceImpl final : public backend::Backend::Service { int embd_normalize = 2; // default to Euclidean/L2 norm // create and queue the task - json responses = json::array(); - bool error = false; - std::unordered_set task_ids; + const auto rd = 
std::make_shared(ctx_server); { std::vector tasks; for (size_t i = 0; i < tokenized_prompts.size(); i++) { @@ -1369,40 +1401,23 @@ class BackendServiceImpl final : public backend::Backend::Service { tasks.push_back(std::move(task)); } - task_ids = server_task::get_list_id(tasks); - ctx_server.queue_results.add_waiting_tasks(tasks); - ctx_server.queue_tasks.post(std::move(tasks)); + rd->post_tasks(std::move(tasks)); } - // Check cancellation before waiting for results - if (context->IsCancelled()) { - ctx_server.cancel_tasks(task_ids); - ctx_server.queue_results.remove_waiting_task_ids(task_ids); - return grpc::Status(grpc::StatusCode::CANCELLED, "Request cancelled by client"); - } - - // get the result - ctx_server.receive_multi_results(task_ids, [&](std::vector & results) { - for (auto & res : results) { - GGML_ASSERT(dynamic_cast(res.get()) != nullptr); - responses.push_back(res->to_json()); - } - }, [&](const json & error_data) { - error = true; - }, [&context]() { - // Check if the gRPC context is cancelled - return context->IsCancelled(); - }); - - ctx_server.queue_results.remove_waiting_task_ids(task_ids); - - // Check if context was cancelled during processing - if (context->IsCancelled()) { + // Wait for all results + auto all_results = rd->wait_for_all([&context]() { return context->IsCancelled(); }); + + if (all_results.is_terminated) { return grpc::Status(grpc::StatusCode::CANCELLED, "Request cancelled by client"); + } else if (all_results.error) { + return grpc::Status(grpc::StatusCode::INTERNAL, all_results.error->to_json().value("message", "Error in receiving results")); } - if (error) { - return grpc::Status(grpc::StatusCode::INTERNAL, "Error in receiving results"); + // Collect responses + json responses = json::array(); + for (auto & res : all_results.results) { + GGML_ASSERT(dynamic_cast(res.get()) != nullptr); + responses.push_back(res->to_json()); } std::cout << "[DEBUG] Responses size: " << responses.size() << std::endl; @@ -1453,9 +1468,7 @@ class BackendServiceImpl final : public backend::Backend::Service { } // Create and queue the task - json responses = json::array(); - bool error = false; - std::unordered_set task_ids; + const auto rd = std::make_shared(ctx_server); { std::vector tasks; std::vector documents; @@ -1473,40 +1486,23 @@ class BackendServiceImpl final : public backend::Backend::Service { tasks.push_back(std::move(task)); } - task_ids = server_task::get_list_id(tasks); - ctx_server.queue_results.add_waiting_tasks(tasks); - ctx_server.queue_tasks.post(std::move(tasks)); - } - - // Check cancellation before waiting for results - if (context->IsCancelled()) { - ctx_server.cancel_tasks(task_ids); - ctx_server.queue_results.remove_waiting_task_ids(task_ids); - return grpc::Status(grpc::StatusCode::CANCELLED, "Request cancelled by client"); + rd->post_tasks(std::move(tasks)); } - // Get the results - ctx_server.receive_multi_results(task_ids, [&](std::vector & results) { - for (auto & res : results) { - GGML_ASSERT(dynamic_cast(res.get()) != nullptr); - responses.push_back(res->to_json()); - } - }, [&](const json & error_data) { - error = true; - }, [&context]() { - // Check if the gRPC context is cancelled - return context->IsCancelled(); - }); - - ctx_server.queue_results.remove_waiting_task_ids(task_ids); - - // Check if context was cancelled during processing - if (context->IsCancelled()) { + // Wait for all results + auto all_results = rd->wait_for_all([&context]() { return context->IsCancelled(); }); + + if (all_results.is_terminated) { return 
grpc::Status(grpc::StatusCode::CANCELLED, "Request cancelled by client"); + } else if (all_results.error) { + return grpc::Status(grpc::StatusCode::INTERNAL, all_results.error->to_json().value("message", "Error in receiving results")); } - if (error) { - return grpc::Status(grpc::StatusCode::INTERNAL, "Error in receiving results"); + // Collect responses + json responses = json::array(); + for (auto & res : all_results.results) { + GGML_ASSERT(dynamic_cast(res.get()) != nullptr); + responses.push_back(res->to_json()); } // Sort responses by score in descending order std::sort(responses.begin(), responses.end(), [](const json& a, const json& b) { From 5041c48418d494eb35150f6d33f071791bcc910b Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Thu, 13 Nov 2025 17:17:07 +0100 Subject: [PATCH 2/5] DEBUG Signed-off-by: Ettore Di Giacinto --- backend/cpp/llama-cpp/grpc-server.cpp | 102 ++++++++++++++++++++++---- 1 file changed, 89 insertions(+), 13 deletions(-) diff --git a/backend/cpp/llama-cpp/grpc-server.cpp b/backend/cpp/llama-cpp/grpc-server.cpp index 72ed1b09b568..2b8c00a4500d 100644 --- a/backend/cpp/llama-cpp/grpc-server.cpp +++ b/backend/cpp/llama-cpp/grpc-server.cpp @@ -569,18 +569,21 @@ class BackendServiceImpl final : public backend::Backend::Service { } grpc::Status PredictStream(grpc::ServerContext* context, const backend::PredictOptions* request, grpc::ServerWriter* writer) override { + std::cout << "[PredictStream] Starting PredictStream request" << std::endl; json data = parse_options(true, request, ctx_server); - + std::cout << "[PredictStream] Parsed options, stream=true" << std::endl; //Raise error if embeddings is set to true if (ctx_server.params_base.embedding) { + std::cout << "[PredictStream] ERROR: Embedding is not supported in streaming mode" << std::endl; return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "Embedding is not supported in streaming mode"); } - auto completion_id = gen_chatcmplid(); + std::cout << "[PredictStream] Generated completion_id: " << completion_id << std::endl; // need to store the reader as a pointer, so that it won't be destroyed when the handle returns const auto rd = std::make_shared(ctx_server); + std::cout << "[PredictStream] Created server_response_reader" << std::endl; try { std::vector tasks; @@ -873,25 +876,44 @@ class BackendServiceImpl final : public backend::Backend::Service { } rd->post_tasks(std::move(tasks)); + std::cout << "[PredictStream] Posted " << tasks.size() << " tasks to queue" << std::endl; } catch (const std::exception & e) { + std::cout << "[PredictStream] EXCEPTION during task creation: " << e.what() << std::endl; return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, e.what()); } + std::cout << "[PredictStream] Waiting for first result..." 
<< std::endl; // Get first result for error checking (following server.cpp pattern) server_task_result_ptr first_result = rd->next([&context]() { return context->IsCancelled(); }); + std::cout << "[PredictStream] Received first result, is_null=" << (first_result == nullptr) << std::endl; if (first_result == nullptr) { // connection is closed + std::cout << "[PredictStream] First result is nullptr, connection closed" << std::endl; return grpc::Status(grpc::StatusCode::CANCELLED, "Request cancelled by client"); } else if (first_result->is_error()) { + std::cout << "[PredictStream] First result is an ERROR" << std::endl; json error_json = first_result->to_json(); + std::cout << "[PredictStream] Error JSON: " << error_json.dump() << std::endl; backend::Reply reply; reply.set_message(error_json.value("message", "")); + std::cout << "[PredictStream] Writing error reply to stream" << std::endl; writer->Write(reply); + std::cout << "[PredictStream] Returning INTERNAL error status" << std::endl; return grpc::Status(grpc::StatusCode::INTERNAL, error_json.value("message", "Error occurred")); + } else { + // Ensure first result is a completion result (partial or final) + std::cout << "[PredictStream] First result is valid, checking type..." << std::endl; + GGML_ASSERT( + dynamic_cast(first_result.get()) != nullptr + || dynamic_cast(first_result.get()) != nullptr + ); + std::cout << "[PredictStream] First result type check passed" << std::endl; } // Process first result + std::cout << "[PredictStream] Processing first result..." << std::endl; json first_res_json = first_result->to_json(); + std::cout << "[PredictStream] First result JSON: " << first_res_json.dump(2) << std::endl; if (first_res_json.is_array()) { for (const auto & res : first_res_json) { std::string completion_text = res.value("content", ""); @@ -910,7 +932,9 @@ class BackendServiceImpl final : public backend::Backend::Service { reply.set_timing_token_generation(timing_token_generation); } - writer->Write(reply); + std::cout << "[PredictStream] Writing first result array element, message length=" << completion_text.length() << std::endl; + bool write_ok = writer->Write(reply); + std::cout << "[PredictStream] Write result: " << (write_ok ? "OK" : "FAILED") << std::endl; } } else { std::string completion_text = first_res_json.value("content", ""); @@ -929,23 +953,55 @@ class BackendServiceImpl final : public backend::Backend::Service { reply.set_timing_token_generation(timing_token_generation); } - writer->Write(reply); + std::cout << "[PredictStream] Writing first result (non-array), message length=" << completion_text.length() << std::endl; + bool write_ok = writer->Write(reply); + std::cout << "[PredictStream] Write result: " << (write_ok ? "OK" : "FAILED") << std::endl; } // Process subsequent results + std::cout << "[PredictStream] Starting to process subsequent results, has_next=" << rd->has_next() << std::endl; + int result_count = 0; while (rd->has_next()) { + result_count++; + std::cout << "[PredictStream] Processing result #" << result_count << std::endl; // Check if context is cancelled before processing result if (context->IsCancelled()) { + std::cout << "[PredictStream] Context cancelled, breaking loop" << std::endl; break; } + std::cout << "[PredictStream] Calling rd->next()..." 
<< std::endl; auto result = rd->next([&context]() { return context->IsCancelled(); }); + std::cout << "[PredictStream] Received result, is_null=" << (result == nullptr) << std::endl; if (result == nullptr) { // connection is closed + std::cout << "[PredictStream] Result is nullptr, connection closed, breaking" << std::endl; break; } + // Check for errors in subsequent results + if (result->is_error()) { + std::cout << "[PredictStream] Result #" << result_count << " is an ERROR" << std::endl; + json error_json = result->to_json(); + std::cout << "[PredictStream] Error JSON: " << error_json.dump() << std::endl; + backend::Reply reply; + reply.set_message(error_json.value("message", "")); + std::cout << "[PredictStream] Writing error reply to stream" << std::endl; + writer->Write(reply); + std::cout << "[PredictStream] Returning INTERNAL error status" << std::endl; + return grpc::Status(grpc::StatusCode::INTERNAL, error_json.value("message", "Error occurred")); + } else { + // Ensure result is a completion result (partial or final) + std::cout << "[PredictStream] Result #" << result_count << " is valid, checking type..." << std::endl; + GGML_ASSERT( + dynamic_cast(result.get()) != nullptr + || dynamic_cast(result.get()) != nullptr + ); + std::cout << "[PredictStream] Result #" << result_count << " type check passed" << std::endl; + } + json res_json = result->to_json(); + std::cout << "[PredictStream] Result #" << result_count << " JSON: " << res_json.dump(2) << std::endl; if (res_json.is_array()) { for (const auto & res : res_json) { std::string completion_text = res.value("content", ""); @@ -964,7 +1020,9 @@ class BackendServiceImpl final : public backend::Backend::Service { reply.set_timing_token_generation(timing_token_generation); } - writer->Write(reply); + std::cout << "[PredictStream] Writing result #" << result_count << " array element, message length=" << completion_text.length() << std::endl; + bool write_ok = writer->Write(reply); + std::cout << "[PredictStream] Write result: " << (write_ok ? "OK" : "FAILED") << std::endl; } } else { std::string completion_text = res_json.value("content", ""); @@ -983,15 +1041,20 @@ class BackendServiceImpl final : public backend::Backend::Service { reply.set_timing_token_generation(timing_token_generation); } - writer->Write(reply); + std::cout << "[PredictStream] Writing result #" << result_count << " (non-array), message length=" << completion_text.length() << std::endl; + bool write_ok = writer->Write(reply); + std::cout << "[PredictStream] Write result: " << (write_ok ? 
"OK" : "FAILED") << std::endl; } } + std::cout << "[PredictStream] Finished processing all results, processed " << result_count << " subsequent results" << std::endl; // Check if context was cancelled during processing if (context->IsCancelled()) { + std::cout << "[PredictStream] Context was cancelled, returning CANCELLED status" << std::endl; return grpc::Status(grpc::StatusCode::CANCELLED, "Request cancelled by client"); } + std::cout << "[PredictStream] Returning OK status" << std::endl; return grpc::Status::OK; } @@ -1003,9 +1066,12 @@ class BackendServiceImpl final : public backend::Backend::Service { if (ctx_server.params_base.embedding) { return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "Embedding is not supported in Predict mode"); } + std::cout << "[PREDICT] Starting Predict request" << std::endl; std::cout << "[PREDICT] Received result: " << data.dump(2) << std::endl; auto completion_id = gen_chatcmplid(); + std::cout << "[PREDICT] Generated completion_id: " << completion_id << std::endl; const auto rd = std::make_shared(ctx_server); + std::cout << "[PREDICT] Created server_response_reader" << std::endl; try { std::vector tasks; @@ -1304,24 +1370,32 @@ class BackendServiceImpl final : public backend::Backend::Service { } rd->post_tasks(std::move(tasks)); + std::cout << "[PREDICT] Posted " << tasks.size() << " tasks to queue" << std::endl; } catch (const std::exception & e) { + std::cout << "[PREDICT] EXCEPTION during task creation: " << e.what() << std::endl; return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, e.what()); } - - std::cout << "[DEBUG] Waiting for results..." << std::endl; + std::cout << "[PREDICT] Waiting for all results..." << std::endl; // Wait for all results auto all_results = rd->wait_for_all([&context]() { return context->IsCancelled(); }); + std::cout << "[PREDICT] wait_for_all returned, is_terminated=" << all_results.is_terminated + << ", has_error=" << (all_results.error != nullptr) + << ", results_count=" << all_results.results.size() << std::endl; if (all_results.is_terminated) { + std::cout << "[PREDICT] Request was terminated, returning CANCELLED status" << std::endl; return grpc::Status(grpc::StatusCode::CANCELLED, "Request cancelled by client"); } else if (all_results.error) { - std::cout << "[DEBUG] Error in results: " << all_results.error->to_json().value("message", "") << std::endl; - reply->set_message(all_results.error->to_json().value("message", "")); - return grpc::Status(grpc::StatusCode::INTERNAL, all_results.error->to_json().value("message", "Error occurred")); + std::cout << "[PREDICT] Error in results: " << all_results.error->to_json().value("message", "") << std::endl; + json error_json = all_results.error->to_json(); + std::cout << "[PREDICT] Error JSON: " << error_json.dump() << std::endl; + reply->set_message(error_json.value("message", "")); + std::cout << "[PREDICT] Returning INTERNAL error status" << std::endl; + return grpc::Status(grpc::StatusCode::INTERNAL, error_json.value("message", "Error occurred")); } else { - std::cout << "[DEBUG] Received " << all_results.results.size() << " results" << std::endl; + std::cout << "[PREDICT] Received " << all_results.results.size() << " results" << std::endl; if (all_results.results.size() == 1) { // single result GGML_ASSERT(dynamic_cast(all_results.results[0].get()) != nullptr); @@ -1350,13 +1424,15 @@ class BackendServiceImpl final : public backend::Backend::Service { } } - std::cout << "[DEBUG] Predict request completed successfully" << std::endl; + std::cout << "[PREDICT] 
Predict request completed successfully" << std::endl; // Check if context was cancelled during processing if (context->IsCancelled()) { + std::cout << "[PREDICT] Context was cancelled, returning CANCELLED status" << std::endl; return grpc::Status(grpc::StatusCode::CANCELLED, "Request cancelled by client"); } + std::cout << "[PREDICT] Returning OK status" << std::endl; return grpc::Status::OK; } From fde6af74b8894d89b74f36760f6f7edc8424db5a Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Thu, 13 Nov 2025 21:01:33 +0100 Subject: [PATCH 3/5] Bump Signed-off-by: Ettore Di Giacinto --- backend/cpp/llama-cpp/Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/cpp/llama-cpp/Makefile b/backend/cpp/llama-cpp/Makefile index 02f12013f560..c93bd61f19cb 100644 --- a/backend/cpp/llama-cpp/Makefile +++ b/backend/cpp/llama-cpp/Makefile @@ -1,5 +1,5 @@ -LLAMA_VERSION?=92bb442ad999a0d52df0af2730cd861012e8ac5c +LLAMA_VERSION?=c4abcb2457217198efdd67d02675f5fddb7071c2 LLAMA_REPO?=https://github.com/ggerganov/llama.cpp CMAKE_ARGS?= From f23e9e025a164af41738e8197b9be8f9465d30a7 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Thu, 13 Nov 2025 23:09:31 +0100 Subject: [PATCH 4/5] test/debug Signed-off-by: Ettore Di Giacinto --- core/http/endpoints/openai/chat.go | 2 +- core/http/endpoints/openai/mcp.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/http/endpoints/openai/chat.go b/core/http/endpoints/openai/chat.go index 975aac3e872a..6385997bf4fb 100644 --- a/core/http/endpoints/openai/chat.go +++ b/core/http/endpoints/openai/chat.go @@ -591,7 +591,7 @@ func ChatEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, evaluator // NOTE: this is a workaround as fasthttp // context cancellation does not fire in non-streaming requests - handleConnectionCancellation(c, input.Cancel, input.Context) + // handleConnectionCancellation(c, input.Cancel, input.Context) result, tokenUsage, err := ComputeChoices( input, diff --git a/core/http/endpoints/openai/mcp.go b/core/http/endpoints/openai/mcp.go index fe018bbbd09c..efb3c6d29096 100644 --- a/core/http/endpoints/openai/mcp.go +++ b/core/http/endpoints/openai/mcp.go @@ -80,7 +80,7 @@ func MCPCompletionEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, ctxWithCancellation, cancel := context.WithCancel(ctx) defer cancel() - handleConnectionCancellation(c, cancel, ctxWithCancellation) + //handleConnectionCancellation(c, cancel, ctxWithCancellation) // TODO: instead of connecting to the API, we should just wire this internally // and act like completion.go. // We can do this as cogito expects an interface and we can create one that From a83d775d2a95aaef7a3510e9b8b4f67bc12afbc1 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Fri, 14 Nov 2025 09:21:40 +0100 Subject: [PATCH 5/5] Revert "DEBUG" This reverts commit 2501ca3ff242076d623c13c86b3d6afcec426281. 
Signed-off-by: Ettore Di Giacinto --- backend/cpp/llama-cpp/grpc-server.cpp | 102 ++++---------------------- 1 file changed, 13 insertions(+), 89 deletions(-) diff --git a/backend/cpp/llama-cpp/grpc-server.cpp b/backend/cpp/llama-cpp/grpc-server.cpp index 2b8c00a4500d..72ed1b09b568 100644 --- a/backend/cpp/llama-cpp/grpc-server.cpp +++ b/backend/cpp/llama-cpp/grpc-server.cpp @@ -569,21 +569,18 @@ class BackendServiceImpl final : public backend::Backend::Service { } grpc::Status PredictStream(grpc::ServerContext* context, const backend::PredictOptions* request, grpc::ServerWriter* writer) override { - std::cout << "[PredictStream] Starting PredictStream request" << std::endl; json data = parse_options(true, request, ctx_server); - std::cout << "[PredictStream] Parsed options, stream=true" << std::endl; + //Raise error if embeddings is set to true if (ctx_server.params_base.embedding) { - std::cout << "[PredictStream] ERROR: Embedding is not supported in streaming mode" << std::endl; return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "Embedding is not supported in streaming mode"); } + auto completion_id = gen_chatcmplid(); - std::cout << "[PredictStream] Generated completion_id: " << completion_id << std::endl; // need to store the reader as a pointer, so that it won't be destroyed when the handle returns const auto rd = std::make_shared(ctx_server); - std::cout << "[PredictStream] Created server_response_reader" << std::endl; try { std::vector tasks; @@ -876,44 +873,25 @@ class BackendServiceImpl final : public backend::Backend::Service { } rd->post_tasks(std::move(tasks)); - std::cout << "[PredictStream] Posted " << tasks.size() << " tasks to queue" << std::endl; } catch (const std::exception & e) { - std::cout << "[PredictStream] EXCEPTION during task creation: " << e.what() << std::endl; return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, e.what()); } - std::cout << "[PredictStream] Waiting for first result..." << std::endl; // Get first result for error checking (following server.cpp pattern) server_task_result_ptr first_result = rd->next([&context]() { return context->IsCancelled(); }); - std::cout << "[PredictStream] Received first result, is_null=" << (first_result == nullptr) << std::endl; if (first_result == nullptr) { // connection is closed - std::cout << "[PredictStream] First result is nullptr, connection closed" << std::endl; return grpc::Status(grpc::StatusCode::CANCELLED, "Request cancelled by client"); } else if (first_result->is_error()) { - std::cout << "[PredictStream] First result is an ERROR" << std::endl; json error_json = first_result->to_json(); - std::cout << "[PredictStream] Error JSON: " << error_json.dump() << std::endl; backend::Reply reply; reply.set_message(error_json.value("message", "")); - std::cout << "[PredictStream] Writing error reply to stream" << std::endl; writer->Write(reply); - std::cout << "[PredictStream] Returning INTERNAL error status" << std::endl; return grpc::Status(grpc::StatusCode::INTERNAL, error_json.value("message", "Error occurred")); - } else { - // Ensure first result is a completion result (partial or final) - std::cout << "[PredictStream] First result is valid, checking type..." << std::endl; - GGML_ASSERT( - dynamic_cast(first_result.get()) != nullptr - || dynamic_cast(first_result.get()) != nullptr - ); - std::cout << "[PredictStream] First result type check passed" << std::endl; } // Process first result - std::cout << "[PredictStream] Processing first result..." 
<< std::endl; json first_res_json = first_result->to_json(); - std::cout << "[PredictStream] First result JSON: " << first_res_json.dump(2) << std::endl; if (first_res_json.is_array()) { for (const auto & res : first_res_json) { std::string completion_text = res.value("content", ""); @@ -932,9 +910,7 @@ class BackendServiceImpl final : public backend::Backend::Service { reply.set_timing_token_generation(timing_token_generation); } - std::cout << "[PredictStream] Writing first result array element, message length=" << completion_text.length() << std::endl; - bool write_ok = writer->Write(reply); - std::cout << "[PredictStream] Write result: " << (write_ok ? "OK" : "FAILED") << std::endl; + writer->Write(reply); } } else { std::string completion_text = first_res_json.value("content", ""); @@ -953,55 +929,23 @@ class BackendServiceImpl final : public backend::Backend::Service { reply.set_timing_token_generation(timing_token_generation); } - std::cout << "[PredictStream] Writing first result (non-array), message length=" << completion_text.length() << std::endl; - bool write_ok = writer->Write(reply); - std::cout << "[PredictStream] Write result: " << (write_ok ? "OK" : "FAILED") << std::endl; + writer->Write(reply); } // Process subsequent results - std::cout << "[PredictStream] Starting to process subsequent results, has_next=" << rd->has_next() << std::endl; - int result_count = 0; while (rd->has_next()) { - result_count++; - std::cout << "[PredictStream] Processing result #" << result_count << std::endl; // Check if context is cancelled before processing result if (context->IsCancelled()) { - std::cout << "[PredictStream] Context cancelled, breaking loop" << std::endl; break; } - std::cout << "[PredictStream] Calling rd->next()..." << std::endl; auto result = rd->next([&context]() { return context->IsCancelled(); }); - std::cout << "[PredictStream] Received result, is_null=" << (result == nullptr) << std::endl; if (result == nullptr) { // connection is closed - std::cout << "[PredictStream] Result is nullptr, connection closed, breaking" << std::endl; break; } - // Check for errors in subsequent results - if (result->is_error()) { - std::cout << "[PredictStream] Result #" << result_count << " is an ERROR" << std::endl; - json error_json = result->to_json(); - std::cout << "[PredictStream] Error JSON: " << error_json.dump() << std::endl; - backend::Reply reply; - reply.set_message(error_json.value("message", "")); - std::cout << "[PredictStream] Writing error reply to stream" << std::endl; - writer->Write(reply); - std::cout << "[PredictStream] Returning INTERNAL error status" << std::endl; - return grpc::Status(grpc::StatusCode::INTERNAL, error_json.value("message", "Error occurred")); - } else { - // Ensure result is a completion result (partial or final) - std::cout << "[PredictStream] Result #" << result_count << " is valid, checking type..." 
<< std::endl; - GGML_ASSERT( - dynamic_cast(result.get()) != nullptr - || dynamic_cast(result.get()) != nullptr - ); - std::cout << "[PredictStream] Result #" << result_count << " type check passed" << std::endl; - } - json res_json = result->to_json(); - std::cout << "[PredictStream] Result #" << result_count << " JSON: " << res_json.dump(2) << std::endl; if (res_json.is_array()) { for (const auto & res : res_json) { std::string completion_text = res.value("content", ""); @@ -1020,9 +964,7 @@ class BackendServiceImpl final : public backend::Backend::Service { reply.set_timing_token_generation(timing_token_generation); } - std::cout << "[PredictStream] Writing result #" << result_count << " array element, message length=" << completion_text.length() << std::endl; - bool write_ok = writer->Write(reply); - std::cout << "[PredictStream] Write result: " << (write_ok ? "OK" : "FAILED") << std::endl; + writer->Write(reply); } } else { std::string completion_text = res_json.value("content", ""); @@ -1041,20 +983,15 @@ class BackendServiceImpl final : public backend::Backend::Service { reply.set_timing_token_generation(timing_token_generation); } - std::cout << "[PredictStream] Writing result #" << result_count << " (non-array), message length=" << completion_text.length() << std::endl; - bool write_ok = writer->Write(reply); - std::cout << "[PredictStream] Write result: " << (write_ok ? "OK" : "FAILED") << std::endl; + writer->Write(reply); } } - std::cout << "[PredictStream] Finished processing all results, processed " << result_count << " subsequent results" << std::endl; // Check if context was cancelled during processing if (context->IsCancelled()) { - std::cout << "[PredictStream] Context was cancelled, returning CANCELLED status" << std::endl; return grpc::Status(grpc::StatusCode::CANCELLED, "Request cancelled by client"); } - std::cout << "[PredictStream] Returning OK status" << std::endl; return grpc::Status::OK; } @@ -1066,12 +1003,9 @@ class BackendServiceImpl final : public backend::Backend::Service { if (ctx_server.params_base.embedding) { return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "Embedding is not supported in Predict mode"); } - std::cout << "[PREDICT] Starting Predict request" << std::endl; std::cout << "[PREDICT] Received result: " << data.dump(2) << std::endl; auto completion_id = gen_chatcmplid(); - std::cout << "[PREDICT] Generated completion_id: " << completion_id << std::endl; const auto rd = std::make_shared(ctx_server); - std::cout << "[PREDICT] Created server_response_reader" << std::endl; try { std::vector tasks; @@ -1370,32 +1304,24 @@ class BackendServiceImpl final : public backend::Backend::Service { } rd->post_tasks(std::move(tasks)); - std::cout << "[PREDICT] Posted " << tasks.size() << " tasks to queue" << std::endl; } catch (const std::exception & e) { - std::cout << "[PREDICT] EXCEPTION during task creation: " << e.what() << std::endl; return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, e.what()); } - std::cout << "[PREDICT] Waiting for all results..." << std::endl; + + std::cout << "[DEBUG] Waiting for results..." 
<< std::endl; // Wait for all results auto all_results = rd->wait_for_all([&context]() { return context->IsCancelled(); }); - std::cout << "[PREDICT] wait_for_all returned, is_terminated=" << all_results.is_terminated - << ", has_error=" << (all_results.error != nullptr) - << ", results_count=" << all_results.results.size() << std::endl; if (all_results.is_terminated) { - std::cout << "[PREDICT] Request was terminated, returning CANCELLED status" << std::endl; return grpc::Status(grpc::StatusCode::CANCELLED, "Request cancelled by client"); } else if (all_results.error) { - std::cout << "[PREDICT] Error in results: " << all_results.error->to_json().value("message", "") << std::endl; - json error_json = all_results.error->to_json(); - std::cout << "[PREDICT] Error JSON: " << error_json.dump() << std::endl; - reply->set_message(error_json.value("message", "")); - std::cout << "[PREDICT] Returning INTERNAL error status" << std::endl; - return grpc::Status(grpc::StatusCode::INTERNAL, error_json.value("message", "Error occurred")); + std::cout << "[DEBUG] Error in results: " << all_results.error->to_json().value("message", "") << std::endl; + reply->set_message(all_results.error->to_json().value("message", "")); + return grpc::Status(grpc::StatusCode::INTERNAL, all_results.error->to_json().value("message", "Error occurred")); } else { - std::cout << "[PREDICT] Received " << all_results.results.size() << " results" << std::endl; + std::cout << "[DEBUG] Received " << all_results.results.size() << " results" << std::endl; if (all_results.results.size() == 1) { // single result GGML_ASSERT(dynamic_cast(all_results.results[0].get()) != nullptr); @@ -1424,15 +1350,13 @@ class BackendServiceImpl final : public backend::Backend::Service { } } - std::cout << "[PREDICT] Predict request completed successfully" << std::endl; + std::cout << "[DEBUG] Predict request completed successfully" << std::endl; // Check if context was cancelled during processing if (context->IsCancelled()) { - std::cout << "[PREDICT] Context was cancelled, returning CANCELLED status" << std::endl; return grpc::Status(grpc::StatusCode::CANCELLED, "Request cancelled by client"); } - std::cout << "[PREDICT] Returning OK status" << std::endl; return grpc::Status::OK; }
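
A minimal sketch of the pattern this series converges on after the final revert: the old per-handler bookkeeping (task_ids, queue_results.add_waiting_tasks / remove_waiting_task_ids, and the three callbacks passed to receive_cmpl_results_stream / receive_multi_results) is replaced by llama.cpp's server_response_reader, which owns the waiting-task state itself. The diff text above drops the C++ template arguments, so the concrete types below (std::vector<server_task>, std::make_shared<server_response_reader>, grpc::ServerWriter<backend::Reply>) are reconstructed assumptions based on the llama.cpp server headers and LocalAI's backend.proto; treat this as an illustration of the control flow, not the exact committed code.

    // Sketch only, assuming the llama.cpp server types that grpc-server.cpp compiles
    // against (server_context, server_task, server_task_result_ptr, server_response_reader)
    // and the backend::Reply message generated from backend.proto. Template parameters
    // are reconstructed; the diff above elides them.

    // Streaming path (PredictStream): pull results one at a time and forward each chunk.
    static grpc::Status stream_with_reader(server_context & ctx_server,
                                           grpc::ServerContext * context,
                                           std::vector<server_task> && tasks,
                                           grpc::ServerWriter<backend::Reply> * writer) {
        // Held by shared_ptr so the reader is not destroyed while tasks are still queued
        // (see the comment introduced in patch 1).
        const auto rd = std::make_shared<server_response_reader>(ctx_server);
        rd->post_tasks(std::move(tasks));

        const auto is_cancelled = [&context]() { return context->IsCancelled(); };

        // The first result doubles as the error check, mirroring tools/server/server.cpp.
        server_task_result_ptr first = rd->next(is_cancelled);
        if (first == nullptr) {
            return grpc::Status(grpc::StatusCode::CANCELLED, "Request cancelled by client");
        }
        if (first->is_error()) {
            backend::Reply err;
            err.set_message(first->to_json().value("message", ""));
            writer->Write(err);
            return grpc::Status(grpc::StatusCode::INTERNAL,
                                first->to_json().value("message", "Error occurred"));
        }
        backend::Reply reply;
        reply.set_message(first->to_json().value("content", ""));
        writer->Write(reply);

        // Subsequent results until the reader runs dry or the client goes away.
        while (rd->has_next() && !context->IsCancelled()) {
            server_task_result_ptr res = rd->next(is_cancelled);
            if (res == nullptr) {
                break; // connection closed
            }
            backend::Reply chunk;
            chunk.set_message(res->to_json().value("content", ""));
            writer->Write(chunk);
        }

        return context->IsCancelled()
            ? grpc::Status(grpc::StatusCode::CANCELLED, "Request cancelled by client")
            : grpc::Status::OK;
    }

    // Blocking path (Predict / Embedding / Rerank): wait for everything, then inspect
    // the aggregate object returned by wait_for_all().
    static grpc::Status collect_with_reader(server_context & ctx_server,
                                            grpc::ServerContext * context,
                                            std::vector<server_task> && tasks,
                                            json & responses_out) {
        const auto rd = std::make_shared<server_response_reader>(ctx_server);
        rd->post_tasks(std::move(tasks));

        auto all = rd->wait_for_all([&context]() { return context->IsCancelled(); });
        if (all.is_terminated) {
            return grpc::Status(grpc::StatusCode::CANCELLED, "Request cancelled by client");
        }
        if (all.error) {
            return grpc::Status(grpc::StatusCode::INTERNAL,
                                all.error->to_json().value("message", "Error occurred"));
        }

        responses_out = json::array();
        for (auto & res : all.results) {
            responses_out.push_back(res->to_json()); // content, tokens_*, timings, embeddings, ...
        }
        return grpc::Status::OK;
    }

Compared with the pre-bump handlers, cancellation is expressed solely through the is_cancelled lambda handed to next() / wait_for_all(), and no explicit cancel_tasks / remove_waiting_task_ids cleanup is needed. Token counts and timing fields are copied from the per-result JSON into backend::Reply exactly as in the diffs above; they are omitted from the sketch only for brevity.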