Skip to content

Commit

Permalink
Merge pull request #8 from sogou/master
Browse files Browse the repository at this point in the history
merge from master
  • Loading branch information
holmes1412 committed May 1, 2021
2 parents 5c2f6e7 + eb0dd3a commit 8a1afd4
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 55 deletions.
6 changes: 2 additions & 4 deletions src/generator/printer.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include <stdio.h>
#include <stdlib.h>
#include <unordered_map>

#include "descriptor.h"
#include "thrift/rpc_thrift_enum.h"

Expand Down Expand Up @@ -944,9 +943,8 @@ class Printer
std::string srpc_include_format = R"(#pragma once
#include <stdio.h>
#include <string>
#include "%s.%s.h"
#include "srpc/rpc_define.h"
#include "%s.%s.h"
)";

std::string thrift_include_package_format = R"(
Expand Down Expand Up @@ -1168,7 +1166,7 @@ inline %sClient::%sClient(const struct srpc::RPCClientParams *params):
)";

std::string client_constructor_methods_params_srpc_thrift_format = R"(
if (params->task_params.data_type == INT_MAX)
if (params->task_params.data_type == srpc::RPCDataUndefined)
{
temp_params = *temp;
temp_params.task_params.data_type = srpc::RPCDataThrift;
Expand Down
3 changes: 2 additions & 1 deletion src/rpc_basic.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ using ProtobufIDLMessage = google::protobuf::Message;

enum RPCDataType
{
RPCDataUndefined = -1,
RPCDataProtobuf = 0,
RPCDataThrift = 1,
RPCDataJson = 2
RPCDataJson = 2,
};

enum RPCStatusCode
Expand Down
4 changes: 2 additions & 2 deletions src/rpc_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ inline void RPCClient<RPCTYPE>::init(const RPCClientParams *params)
{
this->params = *params;

if (this->params.task_params.data_type == INT_MAX)
if (this->params.task_params.data_type == RPCDataUndefined)
this->params.task_params.data_type = RPCTYPE::default_data_type;

this->has_addr_info = SRPCGlobal::get_instance()->task_init(this->params,
Expand All @@ -146,7 +146,7 @@ inline void RPCClient<RPCTYPE>::__task_init(COMPLEXTASK *task) const
else
{
task->init(this->uri);
task->set_type(this->params.is_ssl ? TT_TCP_SSL : TT_TCP);
task->set_transport_type(this->params.is_ssl ? TT_TCP_SSL : TT_TCP);
}
}

Expand Down
20 changes: 11 additions & 9 deletions src/rpc_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@
#include <string>
#include <workflow/WFServer.h>
#include <workflow/URIParser.h>
//#include "rpc_span_policies.h"
#include "rpc_basic.h"

namespace srpc {

struct RPCTaskParams
{
int send_timeout;
int receive_timeout;
int watch_timeout;
int keep_alive_timeout;
int retry_max;
Expand Down Expand Up @@ -60,17 +61,18 @@ struct RPCServerParams : public WFServerParams
{}
};

static constexpr RPCTaskParams RPC_TASK_PARAMS_DEFAULT =
static constexpr struct RPCTaskParams RPC_TASK_PARAMS_DEFAULT =
{
/* .send_timeout = */ INT_MAX,
/* .watch_timeout = */ INT_MAX,
/* .keep_alive_timeout = */ INT_MAX,
/* .retry_max = */ INT_MAX,
/* .compress_type = */ INT_MAX,
/* .data_type = */ INT_MAX
/* .send_timeout = */ -1,
/* .receive_timeout = */ -1,
/* .watch_timeout = */ 0,
/* .keep_alive_timeout = */ 30 * 1000,
/* .retry_max = */ 0,
/* .compress_type = */ RPCCompressNone,
/* .data_type = */ RPCDataUndefined
};

static const RPCClientParams RPC_CLIENT_PARAMS_DEFAULT =
static const struct RPCClientParams RPC_CLIENT_PARAMS_DEFAULT =
{
/* .task_params = */ RPC_TASK_PARAMS_DEFAULT,
/* .host = */ "",
Expand Down
22 changes: 7 additions & 15 deletions src/rpc_task.inl
Original file line number Diff line number Diff line change
Expand Up @@ -401,24 +401,16 @@ inline RPCClientTask<RPCREQ, RPCRESP>::RPCClientTask(
this->set_callback(std::bind(&RPCClientTask::rpc_callback,
this, std::placeholders::_1));

if (params->send_timeout != INT_MAX)
this->set_send_timeout(params->send_timeout);
this->set_send_timeout(params->send_timeout);
this->set_receive_timeout(params->receive_timeout);
watch_timeout_ = params->watch_timeout;
this->set_keep_alive(params->keep_alive_timeout);
this->set_retry_max(params->retry_max);

if (params->watch_timeout != INT_MAX)
watch_timeout_ = params->watch_timeout;
else
watch_timeout_ = 0;

if (params->keep_alive_timeout != INT_MAX)
this->set_keep_alive(params->keep_alive_timeout);

if (params->retry_max != INT_MAX)
this->set_retry_max(params->retry_max);

if (params->compress_type != INT_MAX)
if (params->compress_type != RPCCompressNone)
this->req.set_compress_type(params->compress_type);

if (params->data_type != INT_MAX)
if (params->data_type != RPCDataUndefined)
this->req.set_data_type(params->data_type);

this->req.set_service_name(service_name);
Expand Down
66 changes: 43 additions & 23 deletions test/unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,20 +94,30 @@ void test_pb(SERVER& server)
req1.set_b(456);
client.Add(&req1, [&](AddResponse *response, RPCContext *ctx) {
EXPECT_EQ(ctx->success(), true);
EXPECT_EQ(response->c(), 123 + 456);

SubstrRequest req2;

req2.set_str("hello world!");
req2.set_idx(6);
client.Substr(&req2, [&](SubstrResponse *response, RPCContext *ctx) {
EXPECT_EQ(ctx->success(), true);
EXPECT_TRUE(response->str() == "world!");
if (ctx->success())
{
EXPECT_EQ(response->c(), 123 + 456);

SubstrRequest req2;

req2.set_str("hello world!");
req2.set_idx(6);
client.Substr(&req2, [&](SubstrResponse *response, RPCContext *ctx) {
EXPECT_EQ(ctx->success(), true);
EXPECT_TRUE(response->str() == "world!");
mutex.lock();
done = true;
mutex.unlock();
cond.notify_one();
});
}
else
{
mutex.lock();
done = true;
mutex.unlock();
cond.notify_one();
});
}
});

std::unique_lock<std::mutex> lock(mutex);
Expand Down Expand Up @@ -141,10 +151,10 @@ void test_thrift(SERVER& server)
TestThriftServiceImpl impl;

server.add_service(&impl);
EXPECT_TRUE(server.start("127.0.0.1", 9964) == 0) << "server start failed";
EXPECT_TRUE(server.start("127.0.0.1", 9965) == 0) << "server start failed";

client_params.host = "127.0.0.1";
client_params.port = 9964;
client_params.port = 9965;
CLIENT client(&client_params);

TestThrift::addRequest req1;
Expand All @@ -153,21 +163,31 @@ void test_thrift(SERVER& server)
req1.b = 456;
client.add(&req1, [&](TestThrift::addResponse *response, RPCContext *ctx) {
EXPECT_EQ(ctx->success(), true);
EXPECT_EQ(response->result, 123 + 456);

TestThrift::substrRequest req2;

req2.str = "hello world!";
req2.idx = 6;
req2.length = -1;
client.substr(&req2, [&](TestThrift::substrResponse *response, RPCContext *ctx) {
EXPECT_EQ(ctx->success(), true);
EXPECT_TRUE(response->result == "world!");
if (ctx->success())
{
EXPECT_EQ(response->result, 123 + 456);

TestThrift::substrRequest req2;

req2.str = "hello world!";
req2.idx = 6;
req2.length = -1;
client.substr(&req2, [&](TestThrift::substrResponse *response, RPCContext *ctx) {
EXPECT_EQ(ctx->success(), true);
EXPECT_TRUE(response->result == "world!");
mutex.lock();
done = true;
mutex.unlock();
cond.notify_one();
});
}
else
{
mutex.lock();
done = true;
mutex.unlock();
cond.notify_one();
});
}
});

std::unique_lock<std::mutex> lock(mutex);
Expand Down

0 comments on commit 8a1afd4

Please sign in to comment.