-
Notifications
You must be signed in to change notification settings - Fork 81
/
Copy pathConnectionCollector.cpp
124 lines (107 loc) · 3.78 KB
/
ConnectionCollector.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#include <QueryPipeline/ConnectionCollector.h>
#include <Core/BackgroundSchedulePool.h>
#include <Interpreters/Context.h>
#include <Common/Exception.h>
#include <Common/NetException.h>
#include "Core/Protocol.h"
#include <Common/logger_useful.h>
namespace CurrentMetrics
{
extern const Metric AsyncDrainedConnections;
extern const Metric ActiveAsyncDrainedConnections;
}
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int UNEXPECTED_PACKET_FROM_SERVER;
}
std::unique_ptr<ConnectionCollector> ConnectionCollector::connection_collector;
static constexpr UInt64 max_connection_draining_tasks_per_thread = 20;
ConnectionCollector::ConnectionCollector(ContextMutablePtr global_context_, size_t max_threads)
: WithMutableContext(global_context_), pool(max_threads, max_threads, max_threads * max_connection_draining_tasks_per_thread)
{
}
ConnectionCollector & ConnectionCollector::init(ContextMutablePtr global_context_, size_t max_threads)
{
if (connection_collector)
{
throw Exception("Connection collector is initialized twice. This is a bug", ErrorCodes::LOGICAL_ERROR);
}
connection_collector.reset(new ConnectionCollector(global_context_, max_threads));
return *connection_collector;
}
struct AsyncDrainTask
{
const ConnectionPoolWithFailoverPtr pool;
std::shared_ptr<IConnections> shared_connections;
void operator()() const
{
ConnectionCollector::drainConnections(*shared_connections, /* throw_error= */ false);
}
// We don't have std::unique_function yet. Wrap it in shared_ptr to make the functor copyable.
std::shared_ptr<CurrentMetrics::Increment> metric_increment
= std::make_shared<CurrentMetrics::Increment>(CurrentMetrics::ActiveAsyncDrainedConnections);
};
std::shared_ptr<IConnections> ConnectionCollector::enqueueConnectionCleanup(
const ConnectionPoolWithFailoverPtr & pool, std::shared_ptr<IConnections> connections) noexcept
{
if (!connections)
return nullptr;
if (connection_collector)
{
if (connection_collector->pool.trySchedule(AsyncDrainTask{pool, connections}))
{
CurrentMetrics::add(CurrentMetrics::AsyncDrainedConnections, 1);
return nullptr;
}
}
return connections;
}
void ConnectionCollector::drainConnections(IConnections & connections, bool throw_error)
{
bool is_drained = false;
try
{
Packet packet = connections.drain();
is_drained = true;
switch (packet.type)
{
case Protocol::Server::EndOfStream:
case Protocol::Server::Log:
case Protocol::Server::ProfileEvents:
break;
case Protocol::Server::Exception:
packet.exception->rethrow();
break;
default:
/// Connection should be closed in case of unexpected packet,
/// since this means that the connection in some bad state.
is_drained = false;
throw NetException(
ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER,
"Unexpected packet {} from one of the following replicas: {}. (expected EndOfStream, Log, ProfileEvents or Exception)",
Protocol::Server::toString(packet.type),
connections.dumpAddresses());
}
}
catch (...)
{
tryLogCurrentException(&Poco::Logger::get("ConnectionCollector"), __PRETTY_FUNCTION__);
if (!is_drained)
{
try
{
connections.disconnect();
}
catch (...)
{
tryLogCurrentException(&Poco::Logger::get("ConnectionCollector"), __PRETTY_FUNCTION__);
}
}
if (throw_error)
throw;
}
}
}