Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge minor refator and fix #12

Merged
merged 6 commits into from
Oct 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions include/polaris/model.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ class Instance {

std::string& GetId(); ///< 服务实例ID

std::string& GetHost(); ///< 服务的节点IP或者域名
std::string& GetHost() const; ///< 服务的节点IP或者域名

int GetPort(); ///< 节点端口号
int GetPort() const; ///< 节点端口号

uint64_t GetLocalId(); /// 本地生成的唯一ID

Expand Down
2 changes: 1 addition & 1 deletion polaris/api/limit_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ LimitApi* LimitApi::Create(Context* context, std::string& err_msg) {
POLARIS_LOG(LOG_ERROR, "%s", err_msg.c_str());
return NULL;
}
if (context->GetContextMode() != kLimitContext) {
if (context->GetContextMode() != kLimitContext && context->GetContextMode() != kShareContext) {
err_msg = "create limit api failed because context isn't init with limit mode";
POLARIS_LOG(LOG_ERROR, "%s", err_msg.c_str());
return NULL;
Expand Down
7 changes: 6 additions & 1 deletion polaris/cache/cache_persist.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "cache/persist_task.h"
#include "logger.h"
#include "model/constants.h"
#include "model/model_impl.h"
#include "polaris/config.h"
#include "reactor/reactor.h"
#include "utils/file_utils.h"
Expand Down Expand Up @@ -147,7 +148,8 @@ ServiceData* CachePersist::LoadServiceData(const ServiceKey& service_key,
}
std::string data((std::istreambuf_iterator<char>(input_file)), std::istreambuf_iterator<char>());
input_file.close();
uint64_t available_time = Time::GetCurrentTimeMs();
uint64_t current_time = Time::GetCurrentTimeMs();
uint64_t available_time = current_time;
// 如果磁盘缓存已经不在可用时间范围内,则需等待一段时间后从服务器同步失败则升级立即使用
if (sync_time + persist_config_.GetAvailableTime() < available_time) {
available_time += persist_config_.GetUpgradeWaitTime();
Expand All @@ -166,6 +168,9 @@ ServiceData* CachePersist::LoadServiceData(const ServiceKey& service_key,
service_data->DecrementRef();
return NULL;
}
POLARIS_LOG(LOG_INFO, "load %s from disk for service[%s/%s] succ, available after %" PRIu64 "s",
DataTypeToStr(data_type), service_key.namespace_.c_str(), service_key.name_.c_str(),
available_time - current_time);
return service_data;
}

Expand Down
9 changes: 7 additions & 2 deletions polaris/grpc/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ void GrpcStream::OnData(Buffer& data, bool end_stream) {
}
if (end_stream) {
callback_.OnRemoteClose(kGrpcStatusOk, "end stream with data frame");
remote_end_ = true;
}
}

Expand All @@ -140,13 +141,17 @@ void GrpcStream::OnTrailers(HeaderMap* trailers) {
}
std::string grpc_message = trailers->GetGrpcMessage();
delete trailers;
if (!remote_end_) {
callback_.OnRemoteClose(grpc_status, grpc_message);
}
remote_end_ = true;
callback_.OnRemoteClose(grpc_status, grpc_message);
}

void GrpcStream::OnReset(GrpcStatusCode status, const std::string& message) {
if (!remote_end_) {
callback_.OnRemoteClose(status, message);
}
remote_end_ = true;
callback_.OnRemoteClose(status, message);
}

///////////////////////////////////////////////////////////////////////////////
Expand Down
4 changes: 2 additions & 2 deletions polaris/model/model.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ Instance::~Instance() {
if (impl != NULL) delete impl;
}

std::string& Instance::GetHost() { return impl->host_; }
std::string& Instance::GetHost() const { return impl->host_; }

int Instance::GetPort() { return impl->port_; }
int Instance::GetPort() const { return impl->port_; }

std::string& Instance::GetVpcId() { return impl->vpc_id_; }

Expand Down
14 changes: 9 additions & 5 deletions polaris/reactor/reactor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ Reactor::Reactor() {
epoll_fd_ = epoll_create(kEpollEventSize);
epoll_events_ = new epoll_event[kEpollEventSize];
POLARIS_ASSERT(epoll_fd_ >= 0 && "reactor create epoll failed!");
stopped_ = true;
status_ = kReactorInit;
executor_tid_ = 0;
AddEventHandler(&notifier_);
}

Reactor::~Reactor() {
POLARIS_ASSERT(stopped_ == true);
POLARIS_ASSERT(status_ != kReactorRun);
RemoveEventHandler(notifier_.GetFd());

// 这里必须先删除timeout,因为有些定时任务会用于检查请求超时
Expand Down Expand Up @@ -102,7 +102,7 @@ void Reactor::SubmitTask(Task* task) {
}

void Reactor::Stop() {
stopped_ = true;
status_ = kReactorStop;
notifier_.Notify();
}

Expand Down Expand Up @@ -175,7 +175,11 @@ void Reactor::RunEpollTask(uint64_t timeout) {
}

void Reactor::Run(bool once) {
stopped_ = false;
if (once) { // 测试代码会重复调用本函数,直接进入Run状态
status_ = kReactorRun;
} else if (!status_.Cas(kReactorInit, kReactorRun)) {
return;
}
executor_tid_ = pthread_self();

// 屏蔽线程的pipe broken singal
Expand All @@ -185,7 +189,7 @@ void Reactor::Run(bool once) {
int rc = pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
POLARIS_ASSERT(rc == 0)

while (!stopped_) {
while (status_ == kReactorRun) {
RunPendingTask();

RunEpollTask(CalculateEpollWaitTime());
Expand Down
5 changes: 4 additions & 1 deletion polaris/reactor/reactor.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@

#include "reactor/notify.h"
#include "reactor/task.h"
#include "sync/atomic.h"
#include "sync/mutex.h"

struct epoll_event;

namespace polaris {

enum ReactorStatus { kReactorInit, kReactorRun, kReactorStop };

class Reactor {
public:
Reactor();
Expand Down Expand Up @@ -66,7 +69,7 @@ class Reactor {
int epoll_fd_;
epoll_event* epoll_events_;
pthread_t executor_tid_; // 记录运行Reactor循环的线程,用于检查线程不安全方法的调用
volatile bool stopped_;
sync::Atomic<ReactorStatus> status_;
Notifier notifier_;
std::map<int, EventBase*> fd_holder_; // 记录fd对应的event handler

Expand Down
6 changes: 4 additions & 2 deletions test/api/limit_api_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ TEST_F(LimitApiTest, TestCreateFromContext) {
this->DeleteConfig();
ASSERT_TRUE(context != NULL);
limit_api = LimitApi::Create(context);
ASSERT_FALSE(limit_api != NULL);
ASSERT_TRUE(limit_api != NULL);
delete limit_api;
delete context;

this->CreateConfig();
Expand Down Expand Up @@ -217,7 +218,8 @@ TEST_F(LimitApiTest, TestCreateFromShareContext) {
this->DeleteConfig();
ASSERT_TRUE(context != NULL);
limit_api = LimitApi::Create(context);
ASSERT_FALSE(limit_api != NULL);
ASSERT_TRUE(limit_api != NULL);
delete limit_api;
delete context;
}

Expand Down