-
Notifications
You must be signed in to change notification settings - Fork 82
/
Copy pathKeeperTCPHandler.h
108 lines (80 loc) · 3.09 KB
/
KeeperTCPHandler.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#pragma once
#include "config.h"
#if USE_NURAFT
#include <Poco/Net/TCPServerConnection.h>
#include <Common/MultiVersion.h>
#include "IServer.h"
#include <Common/Stopwatch.h>
#include <Interpreters/Context.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/ZooKeeper/ZooKeeperConstants.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Coordination/KeeperDispatcher.h>
#include <IO/WriteBufferFromPocoSocket.h>
#include <IO/ReadBufferFromPocoSocket.h>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <unordered_set>
#include <Coordination/KeeperConnectionStats.h>
#include <Poco/Timestamp.h>
namespace DB
{
/// Forward declaration only; the definition is not part of this header.
struct SocketInterruptablePollWrapper;
/// Uniquely-owning pointer to the (forward-declared) poll wrapper.
using SocketInterruptablePollWrapperPtr = std::unique_ptr<SocketInterruptablePollWrapper>;
/// ConcurrentBoundedQueue whose elements are Coordination::ZooKeeperResponsePtr.
using ThreadSafeResponseQueue = ConcurrentBoundedQueue<Coordination::ZooKeeperResponsePtr>;
/// Uniquely-owning pointer to a response queue.
using ThreadSafeResponseQueuePtr = std::unique_ptr<ThreadSafeResponseQueue>;
/// Forward declaration only; the definition is not part of this header.
struct LastOp;
/// MultiVersion container specialised for LastOp.
using LastOpMultiVersion = MultiVersion<LastOp>;
/// The `Version` handle type exposed by MultiVersion<LastOp>.
using LastOpPtr = LastOpMultiVersion::Version;
/// Handler for a client TCP connection: derives from Poco::Net::TCPServerConnection
/// and overrides run(). Requests and responses are expressed with the ZooKeeper
/// protocol types from the Coordination namespace (OpNum, XID, ZooKeeperResponsePtr).
/// Only declarations are visible in this header; statements about behavior that
/// would need the out-of-line definitions to verify are marked NOTE(review).
class KeeperTCPHandler : public Poco::Net::TCPServerConnection
{
public:
/// Static bookkeeping shared by all handler instances.
/// NOTE(review): presumably these insert `conn` into / erase it from `connections` — confirm in the definition.
static void registerConnection(KeeperTCPHandler * conn);
static void unregisterConnection(KeeperTCPHandler * conn);
/// Dump statistics of all connections into `buf`.
/// NOTE(review): `brief` presumably selects a shorter output form — confirm in the definition.
static void dumpConnections(WriteBufferFromOwnString & buf, bool brief);
/// NOTE(review): presumably resets the statistics of every registered connection — confirm in the definition.
static void resetConnsStats();
private:
/// NOTE(review): presumably guards access to `connections` — confirm in the definition.
static std::mutex conns_mutex;
/// All connections, stored as raw pointers to the handler objects.
static std::unordered_set<KeeperTCPHandler *> connections;
public:
/// Constructs a handler for `socket_`, keeping a reference to `server_`.
KeeperTCPHandler(IServer & server_, const Poco::Net::StreamSocket & socket_);
/// Overrides the run() entry point of Poco::Net::TCPServerConnection.
void run() override;
/// Returns the connection statistics by value.
/// NOTE(review): `conn_stats_mutex` is declared `mutable`, so this const method can lock it — confirm it does.
KeeperConnectionStats getConnectionStats() const;
/// Per-connection counterparts of dumpConnections() / resetConnsStats().
/// NOTE(review): `brief` presumably selects a shorter output form — confirm in the definition.
void dumpStats(WriteBufferFromOwnString & buf, bool brief);
void resetStats();
~KeeperTCPHandler() override;
private:
/// Server object passed to the constructor (held by reference).
IServer & server;
Poco::Logger * log;
ContextPtr global_context;
/// Shared handle to the Keeper dispatcher.
std::shared_ptr<KeeperDispatcher> keeper_dispatcher;
/// Timeout settings.
/// NOTE(review): presumably `session_timeout` is negotiated within [min_session_timeout, max_session_timeout] — confirm in the definition.
Poco::Timespan operation_timeout;
Poco::Timespan min_session_timeout;
Poco::Timespan max_session_timeout;
Poco::Timespan session_timeout;
/// Initialised to -1 until a real session id is assigned.
int64_t session_id{-1};
Stopwatch session_stopwatch;
SocketInterruptablePollWrapperPtr poll_wrapper;
/// Queue of ZooKeeper responses (see ThreadSafeResponseQueue).
ThreadSafeResponseQueuePtr responses;
/// XID used for close requests; defaults to Coordination::CLOSE_XID.
Coordination::XID close_xid = Coordination::CLOSE_XID;
/// Streams for reading from / writing to the client connection socket.
std::shared_ptr<ReadBufferFromPocoSocket> in;
std::shared_ptr<WriteBufferFromPocoSocket> out;
/// NOTE(review): presumably the body invoked by run() — confirm in the definition.
void runImpl();
/// Handshake helpers.
/// NOTE(review): receiveHandshake() returns a Poco::Timespan, presumably the session timeout requested by the client — confirm.
void sendHandshake(bool has_leader);
Poco::Timespan receiveHandshake(int32_t handshake_length);
static bool isHandShake(int32_t handshake_length);
/// NOTE(review): presumably returns true when `command` was recognised and executed as a four-letter-word command — confirm.
bool tryExecuteFourLetterWordCmd(int32_t command);
/// Receives a request and returns its operation number and XID.
std::pair<Coordination::OpNum, Coordination::XID> receiveRequest();
/// Statistics hooks.
/// NOTE(review): presumably these update `conn_stats` under `conn_stats_mutex` — confirm in the definition.
void packageSent();
void packageReceived();
void updateStats(Coordination::ZooKeeperResponsePtr & response);
/// NOTE(review): presumably the time at which the connection was established — confirm where it is set.
Poco::Timestamp established;
/// Map from request XID to a timestamp.
/// NOTE(review): presumably the time the request was received, used for latency accounting — confirm.
using Operations = std::unordered_map<Coordination::XID, Poco::Timestamp>;
Operations operations;
/// Multi-versioned LastOp record (see LastOpMultiVersion).
LastOpMultiVersion last_op;
/// Declared `mutable` so that const member functions can lock it.
/// NOTE(review): presumably guards `conn_stats` — confirm in the definition.
mutable std::mutex conn_stats_mutex;
KeeperConnectionStats conn_stats;
};
}
#endif