Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@
# rust
Cargo.lock

/local
2 changes: 2 additions & 0 deletions xllm_service/proto/xllm/chat.proto
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ message ChatRequest {
repeated int32 token_ids = 26;

Routing routing = 27;

optional bool offline = 28;
}

message ChatLogProbData {
Expand Down
2 changes: 2 additions & 0 deletions xllm_service/proto/xllm/completion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ message CompletionRequest {
repeated int32 token_ids = 26;

Routing routing = 27;

optional bool offline = 28;
}

message LogProbs {
Expand Down
1 change: 1 addition & 0 deletions xllm_service/proto/xllm_rpc_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ service XllmRpcService {
rpc GetInstanceInfo(InstanceID) returns (InstanceMetaInfo) {}
rpc Heartbeat(HeartbeatRequest) returns (Status) {}
rpc GetStaticDecodeList(InstanceID) returns (InstanceIDs) {}
rpc GetStaticPrefillList(InstanceID) returns (InstanceIDs) {}
rpc GetConfig(Empty) returns (ServiceConfig) {}

// xllm service receive response from decode instance directly in disagg pd mode.
Expand Down
2 changes: 2 additions & 0 deletions xllm_service/request/request.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ struct Request {
// whether to return usage
bool include_usage = false;

bool offline = false;

// input prompt
std::string prompt;

Expand Down
18 changes: 18 additions & 0 deletions xllm_service/rpc_service/service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ std::vector<std::string> XllmRpcServiceImpl::get_static_decode_list(
return scheduler_->get_static_decode_list(instance_name);
}

// Returns the static prefill list for `instance_name`. This layer holds no
// logic of its own: the lookup is delegated to the scheduler.
std::vector<std::string> XllmRpcServiceImpl::get_static_prefill_list(
    const std::string& instance_name) {
  std::vector<std::string> prefill_names =
      scheduler_->get_static_prefill_list(instance_name);
  return prefill_names;
}

bool XllmRpcServiceImpl::handle_generation(
const llm::RequestOutput& request_output) {
return scheduler_->handle_generation(request_output);
Expand Down Expand Up @@ -126,6 +131,19 @@ void XllmRpcService::GetStaticDecodeList(
}
}

// RPC handler for GetStaticPrefillList.
// Looks up the static prefill list for the instance named by `req->name()`
// and appends every returned name to the `names` field of `resp`.
// `cntl_base` is not used by this handler.
void XllmRpcService::GetStaticPrefillList(
google::protobuf::RpcController* cntl_base,
const proto::InstanceID* req,
proto::InstanceIDs* resp,
google::protobuf::Closure* done) {
// Hands `done` to a brpc ClosureGuard (per brpc, the guard runs the closure
// when it is destroyed, i.e. when this handler returns).
brpc::ClosureGuard done_guard(done);
std::vector<std::string> prefill_list =
xllm_rpc_service_impl_->get_static_prefill_list(req->name());
// `p` is a reference to an element of the local `prefill_list`.
for (auto& p : prefill_list) {
Copy link

Copilot AI Oct 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using std::move on loop variable p in a range-based for loop can lead to undefined behavior on subsequent iterations. Consider using const auto& for the loop variable or avoid moving.

Suggested change
for (auto& p : prefill_list) {
for (auto p : prefill_list) {

Copilot uses AI. Check for mistakes.
// Each element is visited and moved from exactly once, and `prefill_list`
// is not read again afterwards, so the move avoids a string copy without
// affecting later iterations.
*(resp->mutable_names()->Add()) = std::move(p);
}
}

void XllmRpcService::Generations(google::protobuf::RpcController* cntl_base,
const proto::DisaggStreamGenerations* req,
proto::StatusSet* resp,
Expand Down
8 changes: 8 additions & 0 deletions xllm_service/rpc_service/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ class XllmRpcServiceImpl final {
std::vector<std::string> get_static_decode_list(
const std::string& prefill_name);

// Returns the static prefill list for the given instance. The definition
// forwards the name unchanged to the scheduler's get_static_prefill_list.
// NOTE(review): the parameter is called `decode_name` here but
// `instance_name` in the definition — presumably it is the name of a decode
// instance; confirm against callers.
std::vector<std::string> get_static_prefill_list(
const std::string& decode_name);

public:
// handle generations from prefill/decode instance
bool handle_generation(const llm::RequestOutput& request_output);
Expand Down Expand Up @@ -103,6 +106,11 @@ class XllmRpcService : public proto::XllmRpcService {
proto::InstanceIDs* resp,
google::protobuf::Closure* done) override;

// RPC entry point for GetStaticPrefillList: the definition fills the `names`
// field of `resp` with the prefill list obtained for `req->name()`.
// `cntl_base` is not used by the definition.
virtual void GetStaticPrefillList(google::protobuf::RpcController* cntl_base,
const proto::InstanceID* req,
proto::InstanceIDs* resp,
google::protobuf::Closure* done) override;

// xllm service receive response from decode instance directly in disagg pd
// mode. This can eliminate the cost brought by forwarding through prefill.
virtual void Generations(google::protobuf::RpcController* cntl_base,
Expand Down
15 changes: 15 additions & 0 deletions xllm_service/scheduler/managers/instance_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,21 @@ std::vector<std::string> InstanceMgr::get_static_decode_list(
return decode_list;
}

// Collects the names of all entries in `instances_` whose type is PREFILL or
// DEFAULT and returns them as a new vector.
// `instance_name` is currently ignored, so the result is the same no matter
// which instance is asked about.
// TODO: refactor later, currently return all prefill instances
std::vector<std::string> InstanceMgr::get_static_prefill_list(
const std::string& instance_name) {
Comment on lines +173 to +174
Copy link

Copilot AI Oct 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The parameter instance_name is not used in the implementation. Consider removing it if not needed, or implement the intended filtering logic if it should be used to filter results.

Copilot uses AI. Check for mistakes.
std::vector<std::string> prefill_list;
// Shared (reader) lock on inst_mutex_, held for the whole scan of instances_.
std::shared_lock<std::shared_mutex> lock(inst_mutex_);
for (auto& inst : instances_) {
// DEFAULT-typed instances are included alongside PREFILL ones.
if (inst.second.type == InstanceType::PREFILL ||
inst.second.type == InstanceType::DEFAULT) {
prefill_list.emplace_back(inst.second.name);
}
}

return prefill_list;
}

void InstanceMgr::get_load_metrics(LoadBalanceInfos* infos) {
std::shared_lock<std::shared_mutex> inst_lock(inst_mutex_);
std::shared_lock<std::shared_mutex> metric_lock(load_metric_mutex_);
Expand Down
3 changes: 3 additions & 0 deletions xllm_service/scheduler/managers/instance_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ class InstanceMgr final {
std::vector<std::string> get_static_decode_list(
const std::string& instance_name);

// Returns the names of all managed instances whose type is PREFILL or
// DEFAULT. The current definition does not use `instance_name`.
std::vector<std::string> get_static_prefill_list(
const std::string& instance_name);

void get_load_metrics(LoadBalanceInfos* infos);

std::shared_ptr<brpc::Channel> get_channel(const std::string& instance_name);
Expand Down
5 changes: 5 additions & 0 deletions xllm_service/scheduler/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ std::vector<std::string> Scheduler::get_static_decode_list(
return instance_mgr_->get_static_decode_list(instance_name);
}

// Returns the static prefill list for `instance_name`. The scheduler does no
// filtering itself: the request is passed straight to the instance manager.
std::vector<std::string> Scheduler::get_static_prefill_list(
    const std::string& instance_name) {
  std::vector<std::string> prefill_names =
      instance_mgr_->get_static_prefill_list(instance_name);
  return prefill_names;
}

Tokenizer* Scheduler::get_tls_tokenizer() {
thread_local std::unique_ptr<Tokenizer> tls_tokenizer(tokenizer_->clone());
return tls_tokenizer.get();
Expand Down
3 changes: 3 additions & 0 deletions xllm_service/scheduler/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ class Scheduler final {
std::vector<std::string> get_static_decode_list(
const std::string& instance_name);

// Returns the static prefill list for `instance_name`. The definition
// forwards the call unchanged to the instance manager.
std::vector<std::string> get_static_prefill_list(
const std::string& instance_name);

void handle_instance_heartbeat(const proto::HeartbeatRequest* req);

void exited() { exited_ = true; }
Expand Down