Skip to content

Commit

Permalink
Merge pull request #12 from lambdaliu/refactor
Browse files Browse the repository at this point in the history
merge minor refator and fix
  • Loading branch information
lambdaliu committed Oct 4, 2021
2 parents 887c064 + 76f2cf9 commit 4d31bae
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 16 deletions.
4 changes: 2 additions & 2 deletions include/polaris/model.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ class Instance {

std::string& GetId(); ///< 服务实例ID

std::string& GetHost(); ///< 服务的节点IP或者域名
std::string& GetHost() const; ///< 服务的节点IP或者域名

int GetPort(); ///< 节点端口号
int GetPort() const; ///< 节点端口号

uint64_t GetLocalId(); /// 本地生成的唯一ID

Expand Down
2 changes: 1 addition & 1 deletion polaris/api/limit_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ LimitApi* LimitApi::Create(Context* context, std::string& err_msg) {
POLARIS_LOG(LOG_ERROR, "%s", err_msg.c_str());
return NULL;
}
if (context->GetContextMode() != kLimitContext) {
if (context->GetContextMode() != kLimitContext && context->GetContextMode() != kShareContext) {
err_msg = "create limit api failed because context isn't init with limit mode";
POLARIS_LOG(LOG_ERROR, "%s", err_msg.c_str());
return NULL;
Expand Down
7 changes: 6 additions & 1 deletion polaris/cache/cache_persist.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "cache/persist_task.h"
#include "logger.h"
#include "model/constants.h"
#include "model/model_impl.h"
#include "polaris/config.h"
#include "reactor/reactor.h"
#include "utils/file_utils.h"
Expand Down Expand Up @@ -147,7 +148,8 @@ ServiceData* CachePersist::LoadServiceData(const ServiceKey& service_key,
}
std::string data((std::istreambuf_iterator<char>(input_file)), std::istreambuf_iterator<char>());
input_file.close();
uint64_t available_time = Time::GetCurrentTimeMs();
uint64_t current_time = Time::GetCurrentTimeMs();
uint64_t available_time = current_time;
// 如果磁盘缓存已经不在可用时间范围内,则需等待一段时间后从服务器同步失败则升级立即使用
if (sync_time + persist_config_.GetAvailableTime() < available_time) {
available_time += persist_config_.GetUpgradeWaitTime();
Expand All @@ -166,6 +168,9 @@ ServiceData* CachePersist::LoadServiceData(const ServiceKey& service_key,
service_data->DecrementRef();
return NULL;
}
POLARIS_LOG(LOG_INFO, "load %s from disk for service[%s/%s] succ, available after %" PRIu64 "s",
DataTypeToStr(data_type), service_key.namespace_.c_str(), service_key.name_.c_str(),
available_time - current_time);
return service_data;
}

Expand Down
9 changes: 7 additions & 2 deletions polaris/grpc/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ void GrpcStream::OnData(Buffer& data, bool end_stream) {
}
if (end_stream) {
callback_.OnRemoteClose(kGrpcStatusOk, "end stream with data frame");
remote_end_ = true;
}
}

Expand All @@ -140,13 +141,17 @@ void GrpcStream::OnTrailers(HeaderMap* trailers) {
}
std::string grpc_message = trailers->GetGrpcMessage();
delete trailers;
if (!remote_end_) {
callback_.OnRemoteClose(grpc_status, grpc_message);
}
remote_end_ = true;
callback_.OnRemoteClose(grpc_status, grpc_message);
}

void GrpcStream::OnReset(GrpcStatusCode status, const std::string& message) {
if (!remote_end_) {
callback_.OnRemoteClose(status, message);
}
remote_end_ = true;
callback_.OnRemoteClose(status, message);
}

///////////////////////////////////////////////////////////////////////////////
Expand Down
4 changes: 2 additions & 2 deletions polaris/model/model.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ Instance::~Instance() {
if (impl != NULL) delete impl;
}

std::string& Instance::GetHost() { return impl->host_; }
std::string& Instance::GetHost() const { return impl->host_; }

int Instance::GetPort() { return impl->port_; }
int Instance::GetPort() const { return impl->port_; }

std::string& Instance::GetVpcId() { return impl->vpc_id_; }

Expand Down
14 changes: 9 additions & 5 deletions polaris/reactor/reactor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ Reactor::Reactor() {
epoll_fd_ = epoll_create(kEpollEventSize);
epoll_events_ = new epoll_event[kEpollEventSize];
POLARIS_ASSERT(epoll_fd_ >= 0 && "reactor create epoll failed!");
stopped_ = true;
status_ = kReactorInit;
executor_tid_ = 0;
AddEventHandler(&notifier_);
}

Reactor::~Reactor() {
POLARIS_ASSERT(stopped_ == true);
POLARIS_ASSERT(status_ != kReactorRun);
RemoveEventHandler(notifier_.GetFd());

// 这里必须先删除timeout,因为有些定时任务会用于检查请求超时
Expand Down Expand Up @@ -102,7 +102,7 @@ void Reactor::SubmitTask(Task* task) {
}

void Reactor::Stop() {
stopped_ = true;
status_ = kReactorStop;
notifier_.Notify();
}

Expand Down Expand Up @@ -175,7 +175,11 @@ void Reactor::RunEpollTask(uint64_t timeout) {
}

void Reactor::Run(bool once) {
stopped_ = false;
if (once) { // 测试代码会重复调用本函数,直接进入Run状态
status_ = kReactorRun;
} else if (!status_.Cas(kReactorInit, kReactorRun)) {
return;
}
executor_tid_ = pthread_self();

// 屏蔽线程的pipe broken singal
Expand All @@ -185,7 +189,7 @@ void Reactor::Run(bool once) {
int rc = pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
POLARIS_ASSERT(rc == 0)

while (!stopped_) {
while (status_ == kReactorRun) {
RunPendingTask();

RunEpollTask(CalculateEpollWaitTime());
Expand Down
5 changes: 4 additions & 1 deletion polaris/reactor/reactor.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@

#include "reactor/notify.h"
#include "reactor/task.h"
#include "sync/atomic.h"
#include "sync/mutex.h"

struct epoll_event;

namespace polaris {

enum ReactorStatus { kReactorInit, kReactorRun, kReactorStop };

class Reactor {
public:
Reactor();
Expand Down Expand Up @@ -66,7 +69,7 @@ class Reactor {
int epoll_fd_;
epoll_event* epoll_events_;
pthread_t executor_tid_; // 记录运行Reactor循环的线程,用于检查线程不安全方法的调用
volatile bool stopped_;
sync::Atomic<ReactorStatus> status_;
Notifier notifier_;
std::map<int, EventBase*> fd_holder_; // 记录fd对应的event handler

Expand Down
6 changes: 4 additions & 2 deletions test/api/limit_api_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ TEST_F(LimitApiTest, TestCreateFromContext) {
this->DeleteConfig();
ASSERT_TRUE(context != NULL);
limit_api = LimitApi::Create(context);
ASSERT_FALSE(limit_api != NULL);
ASSERT_TRUE(limit_api != NULL);
delete limit_api;
delete context;

this->CreateConfig();
Expand Down Expand Up @@ -217,7 +218,8 @@ TEST_F(LimitApiTest, TestCreateFromShareContext) {
this->DeleteConfig();
ASSERT_TRUE(context != NULL);
limit_api = LimitApi::Create(context);
ASSERT_FALSE(limit_api != NULL);
ASSERT_TRUE(limit_api != NULL);
delete limit_api;
delete context;
}

Expand Down

0 comments on commit 4d31bae

Please sign in to comment.