Skip to content
This repository was archived by the owner on Jul 4, 2025. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 42 additions & 31 deletions engine/repositories/message_fs_repository.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,14 @@ MessageFsRepository::ListMessages(const std::string& thread_id, uint8_t limit,
const std::string& before,
const std::string& run_id) const {
CTL_INF("Listing messages for thread " + thread_id);
auto path = GetMessagePath(thread_id);

// Early validation
if (limit == 0) {
return std::vector<OpenAi::Message>();
}
if (!after.empty() && !before.empty() && after >= before) {
return cpp::fail("Invalid range: 'after' must be less than 'before'");
}

auto mutex = GrabMutex(thread_id);
std::shared_lock<std::shared_mutex> lock(*mutex);
Expand All @@ -60,6 +67,11 @@ MessageFsRepository::ListMessages(const std::string& thread_id, uint8_t limit,

std::vector<OpenAi::Message> messages = std::move(read_result.value());

if (messages.empty()) {
return messages;
}

// Filter by run_id
if (!run_id.empty()) {
messages.erase(std::remove_if(messages.begin(), messages.end(),
[&run_id](const OpenAi::Message& msg) {
Expand All @@ -68,52 +80,52 @@ MessageFsRepository::ListMessages(const std::string& thread_id, uint8_t limit,
messages.end());
}

std::sort(messages.begin(), messages.end(),
[&order](const OpenAi::Message& a, const OpenAi::Message& b) {
if (order == "desc") {
return a.created_at > b.created_at;
}
return a.created_at < b.created_at;
});
const bool is_descending = (order == "desc");
std::sort(
messages.begin(), messages.end(),
[is_descending](const OpenAi::Message& a, const OpenAi::Message& b) {
return is_descending ? (a.id > b.id) : (a.id < b.id);
});

auto start_it = messages.begin();
auto end_it = messages.end();

if (!after.empty()) {
start_it = std::find_if(
messages.begin(), messages.end(),
[&after](const OpenAi::Message& msg) { return msg.id == after; });
if (start_it != messages.end()) {
++start_it; // Start from the message after the 'after' message
} else {
start_it = messages.begin();
start_it = std::lower_bound(
messages.begin(), messages.end(), after,
[is_descending](const OpenAi::Message& msg, const std::string& value) {
return is_descending ? (msg.id > value) : (msg.id < value);
});

if (start_it != messages.end() && start_it->id == after) {
++start_it;
}
}

if (!before.empty()) {
end_it = std::find_if(
messages.begin(), messages.end(),
[&before](const OpenAi::Message& msg) { return msg.id == before; });
end_it = std::upper_bound(
start_it, messages.end(), before,
[is_descending](const std::string& value, const OpenAi::Message& msg) {
return is_descending ? (value > msg.id) : (value < msg.id);
});
}

std::vector<OpenAi::Message> result;
size_t distance = std::distance(start_it, end_it);
size_t limit_size = static_cast<size_t>(limit);
CTL_INF("Distance: " + std::to_string(distance) +
", limit_size: " + std::to_string(limit_size));
result.reserve(distance < limit_size ? distance : limit_size);
const size_t available_messages = std::distance(start_it, end_it);
const size_t result_size =
std::min(static_cast<size_t>(limit), available_messages);

for (auto it = start_it; it != end_it && result.size() < limit_size; ++it) {
result.push_back(std::move(*it));
}
CTL_INF("Available messages: " + std::to_string(available_messages) +
", result size: " + std::to_string(result_size));

std::vector<OpenAi::Message> result;
result.reserve(result_size);
std::move(start_it, start_it + result_size, std::back_inserter(result));

return result;
}

cpp::result<OpenAi::Message, std::string> MessageFsRepository::RetrieveMessage(
const std::string& thread_id, const std::string& message_id) const {
auto path = GetMessagePath(thread_id);

auto mutex = GrabMutex(thread_id);
std::unique_lock<std::shared_mutex> lock(*mutex);

Expand All @@ -133,8 +145,6 @@ cpp::result<OpenAi::Message, std::string> MessageFsRepository::RetrieveMessage(

cpp::result<void, std::string> MessageFsRepository::ModifyMessage(
OpenAi::Message& message) {
auto path = GetMessagePath(message.thread_id);

auto mutex = GrabMutex(message.thread_id);
std::unique_lock<std::shared_mutex> lock(*mutex);

Expand All @@ -143,6 +153,7 @@ cpp::result<void, std::string> MessageFsRepository::ModifyMessage(
return cpp::fail(messages.error());
}

auto path = GetMessagePath(message.thread_id);
std::ofstream file(path, std::ios::trunc);
if (!file) {
return cpp::fail("Failed to open file for writing: " + path.string());
Expand Down