forked from anqin/trident
-
Notifications
You must be signed in to change notification settings - Fork 0
/
rpc_server_impl.h
171 lines (125 loc) · 4.59 KB
/
rpc_server_impl.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
// Copyright (c) 2014 The Trident Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//
//
#ifndef _TRIDENT_RPC_SERVER_IMPL_H_
#define _TRIDENT_RPC_SERVER_IMPL_H_
#include <deque>
#include <trident/common_internal.h>
#include <trident/rpc_controller.h>
#include <trident/rpc_server.h>
#include <trident/rpc_server_stream.h>
#include <trident/rpc_endpoint.h>
#include <trident/rpc_listener.h>
#include <trident/service_pool.h>
#include <trident/thread_group_impl.h>
#include <trident/timer_worker.h>
namespace trident {
// RpcServerImpl declares the server-side object that is configured with
// RpcServerOptions and an RpcServer::EventHandler, started on a server address,
// and on which protobuf services are registered. Only declarations live in this
// header; every member function is defined elsewhere.
// NOTE(review): presumably this is the implementation object behind RpcServer
// (see <trident/rpc_server.h>) -- confirm against that header.
// The class derives from enable_shared_from_this, so member functions can obtain
// a shared pointer to the object; that only works for instances that are owned
// by a shared pointer.
class RpcServerImpl : public trident::enable_shared_from_this<RpcServerImpl>
{
public:
// Compile-time constant, value 100. Going by the name it is a maintenance
// interval in milliseconds -- presumably the period of TimerMaintain(); confirm
// in the implementation file.
static const int MAINTAIN_INTERVAL_IN_MS = 100;
public:
// Construct with the initial options and an event handler pointer.
// NOTE(review): whether `handler` may be NULL and who owns it cannot be told
// from this header -- confirm in the implementation.
RpcServerImpl(const RpcServerOptions& options,
RpcServer::EventHandler* handler);
virtual ~RpcServerImpl();
// Start the server on `server_address`; the bool result reports the outcome
// (presumably true on success -- confirm). The accepted address format is not
// visible here.
bool Start(const std::string& server_address);
// Stop the server. Counterpart of Start().
void Stop();
// Return the options by value (a copy, not a reference to internal state).
RpcServerOptions GetOptions();
// Replace the options with `options`.
// NOTE(review): which option fields can actually be changed after construction
// is decided in the implementation -- confirm there.
void ResetOptions(const RpcServerOptions& options);
// Register a protobuf service. `take_ownership` presumably tells the server
// whether it becomes responsible for deleting `service` -- confirm. The bool
// result reports the outcome.
bool RegisterService(google::protobuf::Service* service, bool take_ownership);
// Number of registered services, going by the name.
int ServiceCount();
// Number of connections, going by the name (presumably related to
// _live_stream_count -- confirm).
int ConnectionCount();
// Report pending statistics through the three output pointers.
// NOTE(review): whether NULL output pointers are tolerated is not visible here.
void GetPendingStat(int64* pending_message_count,
int64* pending_buffer_size, int64* pending_data_size);
// Whether the server is currently listening, going by the name.
bool IsListening();
// Restart the listener unconditionally, even if it is already listening.
// Returns false if the server is not started or if restarting the listener fails.
bool RestartListen();
private:
// Stream lifecycle callbacks. The names suggest they fire when a server stream
// is created and when an incoming connection is accepted -- confirm where they
// are bound in the implementation.
void OnCreated(const RpcServerStreamPtr& stream);
void OnAccepted(const RpcServerStreamPtr& stream);
// Callback taking the error code and text of a failed accept.
void OnAcceptFailed(
RpcErrorCode error_code,
const std::string& error_text);
// Callback for a received message: both endpoints of the connection, the
// message meta, a weak reference to the stream it arrived on, the buffer and
// `data_size`.
// NOTE(review): the unit of `data_size` is presumably bytes -- confirm.
void OnReceived(
const RpcEndpoint& local_endpoint,
const RpcEndpoint& remote_endpoint,
const RpcMeta& meta,
const RpcServerStreamWPtr& stream,
const ReadBufferPtr& buffer,
int64 data_size);
// The helpers below are static, so they do not use `this`; everything they need
// is passed in as arguments.
// Completion callback for a service method call, going by the name.
// NOTE(review): ownership of `controller`, `request` and `response` (all raw
// pointers) is not visible in this header -- confirm in the implementation.
static void OnCallMethodDone(
RpcController* controller,
google::protobuf::Message* request,
google::protobuf::Message* response,
MethodBoard* method_board,
PTime start_time);
// Send a failure response for request `sequence_id`, carrying `error_code` and
// `reason`. The stream is passed as a weak reference.
static void SendFailedResponse(
const RpcServerStreamWPtr& stream,
uint64 sequence_id,
int32 error_code,
const std::string& reason);
// Send `response` as the success response for request `sequence_id`, with the
// given compress type. The stream is passed as a weak reference.
static void SendSucceedResponse(
const RpcServerStreamWPtr& stream,
uint64 sequence_id,
CompressType compress_type,
google::protobuf::Message* response);
// Callback taking the result (`error_code`) of sending the response for
// `sequence_id` to `remote_endpoint`.
static void OnSendResponseDone(
const RpcEndpoint& remote_endpoint,
uint64 sequence_id,
RpcErrorCode error_code);
// Stream bookkeeping helpers; presumably they operate on _stream_list --
// confirm in the implementation.
void StopStreams();
void ClearStreams();
// Periodic maintenance entry point, called with the current time.
// NOTE(review): presumably driven by _timer_worker -- confirm.
void TimerMaintain(const PTime& now);
// Split `method_full_name` into `*service_full_name` and `*method_name`; the
// bool result reports the outcome.
// NOTE(review): the expected format (presumably "<service>.<method>") is
// defined by the implementation -- confirm.
static bool ParseMethodFullName(const std::string& method_full_name,
std::string* service_full_name, std::string* method_name);
private:
// Pairs a stream with its flow-control token.
// NOTE(review): no use of this struct is visible in this header.
struct FlowControlItem
{
int token; // always <= 0
// Raw pointer; nothing in this struct deletes it.
RpcServerStream* stream;
FlowControlItem(int t, RpcServerStream* s) : token(t), stream(s) {}
// comparator: nearer to zero, higher priority
// The comparison is deliberately inverted: `a < b` holds when a.token is
// greater than b.token, so an ascending sort places the tokens nearest zero
// first.
bool operator< (const FlowControlItem& o) const
{
return token > o.token;
}
};
private:
RpcServerOptions _options;
RpcServer::EventHandler* _event_handler;
// NOTE(review): `volatile` provides neither atomicity nor ordering between
// threads; if this flag is read and written from several threads without
// holding a lock, that needs a real synchronization primitive -- confirm how
// it is accessed in the implementation.
volatile bool _is_running;
// Presumably serializes Start() and Stop() -- confirm.
MutexLock _start_stop_lock;
// Tick-based timing state. Presumably ticks are counted from _epoch_time at
// _ticks_per_second, and each _last_*_ticks field records when the
// corresponding periodic action last ran -- confirm in TimerMaintain().
PTime _epoch_time;
int64 _ticks_per_second;
int64 _last_maintain_ticks;
int64 _last_restart_listen_ticks;
int64 _last_switch_stat_slot_ticks;
int64 _last_print_connection_ticks;
// Flow-control and buffering limits; presumably derived from _options --
// confirm.
int64 _slice_count;
int64 _slice_quota_in;
int64 _slice_quota_out;
int64 _max_pending_buffer_size;
// Periods, in ticks going by the names, matching the _last_*_ticks fields
// above.
int64 _keep_alive_ticks;
int64 _restart_listen_interval_ticks;
int64 _switch_stat_slot_interval_ticks;
int64 _print_connection_interval_ticks;
ServicePoolPtr _service_pool;
FlowControllerPtr _flow_controller;
ThreadGroupImplPtr _maintain_thread_group;
ThreadGroupImplPtr _work_thread_group;
std::string _server_address;
RpcEndpoint _listen_endpoint;
RpcListenerPtr _listener;
TimerWorkerPtr _timer_worker;
// Collection of server streams held by this object, with its own lock declared
// right after it (presumably guarding the list -- confirm).
typedef std::deque<RpcServerStreamPtr> StreamList;
StreamList _stream_list;
FastLock _stream_list_lock;
// NOTE(review): same `volatile` caveat as _is_running above.
volatile int _live_stream_count;
// Presumably disables copy construction and assignment -- confirm against the
// macro definition.
TRIDENT_DISALLOW_EVIL_CONSTRUCTORS(RpcServerImpl);
}; // class RpcServerImpl
} // namespace trident
#endif // _TRIDENT_RPC_SERVER_IMPL_H_
/* vim: set ts=4 sw=4 sts=4 tw=100 */