-
Notifications
You must be signed in to change notification settings - Fork 82
/
Copy pathCluster.h
314 lines (245 loc) · 11.3 KB
/
Cluster.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
#pragma once
#include <Client/ConnectionPool.h>
#include <Client/ConnectionPoolWithFailover.h>
#include <Poco/Net/SocketAddress.h>
#include <map>
#include <string>
#include <unordered_set>
/// Forward declaration only: this header refers to the configuration type
/// exclusively through references and pointers.
namespace Poco::Util
{
class AbstractConfiguration;
}
namespace DB
{
/// Error code referenced by the inline code of this header; it is only declared here.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}

/// Forward declaration: settings are only passed by reference in this header.
struct Settings;
/// A Cluster holds the connection pools to each of its nodes.
/// No connection is established to local nodes: the request is executed directly,
/// which is why only the number of local nodes is stored.
/// In the configuration a cluster is described with <node> or <shard> elements.
class Cluster
{
public:
    /// Constructs a cluster from a configuration.
    /// NOTE(review): `config_prefix_` and `cluster_name` presumably locate the cluster section
    /// inside `config`; the definition is not part of this header - confirm against the implementation.
    Cluster(
        const Poco::Util::AbstractConfiguration & config,
        const Settings & settings,
        const String & config_prefix_,
        const String & cluster_name);

    /// Constructs a cluster from the names of its shards and replicas. Used for the remote() function.
    ///
    /// If `treat_local_as_remote` is true, local addresses are handled the same way as remote ones.
    /// They are also handled as remote when `treat_local_port_as_remote` is set
    /// and the local address includes a port.
    ///
    /// `clickhouse_port` is the port on which this server instance listens for queries.
    /// It is needed only to check whether some address is local (points to ourself).
    Cluster(
        const Settings & settings,
        const std::vector<std::vector<String>> & names,
        const String & username,
        const String & password,
        UInt16 clickhouse_port,
        bool treat_local_as_remote,
        bool treat_local_port_as_remote,
        bool secure = false,
        Int64 priority = 1);

    /// Not copyable.
    Cluster(const Cluster &) = delete;
    Cluster & operator=(const Cluster &) = delete;

    /// Used to put a limit on the size of a timeout.
    static Poco::Timespan saturate(Poco::Timespan v, Poco::Timespan limit);

    using SlotToShard = std::vector<UInt64>;

    struct Address
    {
        /** Where addresses live in the configuration file.
          *
          * Either in <node> elements:
          * <node>
          *     <host>example01-01-1</host>
          *     <port>9000</port>
          *     <!-- <user>, <password>, <default_database>, <compression>, <priority>, <secure> if needed -->
          * </node>
          * ...
          * or in <replica> elements nested inside <shard>:
          * <shard>
          *     <replica>
          *         <host>example01-01-1</host>
          *         <port>9000</port>
          *         <!-- <user>, <password>, <default_database>, <compression>, <priority>, <secure> if needed -->
          *     </replica>
          * </shard>
          */

        String host_name;
        UInt16 port{0};
        String user;
        String password;
        String quota_key;

        /// For inter-server authorization.
        String cluster;
        String cluster_secret;

        /// Serial number of the shard in the configuration file, starting from 1.
        UInt32 shard_index{};
        /// Serial number of the replica inside its shard, starting from 1; zero means no replicas.
        UInt32 replica_index{};

        /// Database selected when a Distributed table does not specify one.
        String default_database;

        /// Locality is determined at initialization and is not changed even if DNS changes.
        bool is_local = false;
        bool user_specified = false;

        Protocol::Compression compression = Protocol::Compression::Enable;
        Protocol::Secure secure = Protocol::Secure::Disable;

        Int64 priority = 1;

        Address() = default;

        Address(
            const Poco::Util::AbstractConfiguration & config,
            const String & config_prefix,
            const String & cluster_,
            const String & cluster_secret_,
            UInt32 shard_index_ = 0,
            UInt32 replica_index_ = 0);

        Address(
            const String & host_port_,
            const String & user_,
            const String & password_,
            UInt16 clickhouse_port,
            bool treat_local_port_as_remote,
            bool secure_ = false,
            Int64 priority_ = 1,
            UInt32 shard_index_ = 0,
            UInt32 replica_index_ = 0);

        /// Returns 'escaped_host_name:port'.
        String toString() const;

        /// Returns 'host_name:port'.
        String readableString() const;

        static String toString(const String & host_name, UInt16 port);

        static std::pair<String, UInt16> fromString(const String & host_port_string);

        /// Depending on the `use_compact_format` flag, returns either the escaped
        /// shard{shard_index}_replica{replica_index} or the escaped
        /// user:password@resolved_host_address:resolved_host_port#default_database
        String toFullString(bool use_compact_format) const;

        /// Returns either an address with only the shard and replica indexes,
        /// or a full address without the shard and replica indexes.
        static Address fromFullString(const String & address_full_string);

        /// Returns the resolved address if it does resolve.
        std::optional<Poco::Net::SocketAddress> getResolvedAddress() const;

        /// The fields that take part in equality comparison, tied by reference.
        auto tuple() const
        {
            return std::tie(host_name, port, secure, user, password, default_database);
        }

        /// Two addresses are equal when their tuple() fields are equal.
        bool operator==(const Address & other) const
        {
            return tuple() == other.tuple();
        }

    private:
        bool isLocal(UInt16 clickhouse_port) const;
    };

    using Addresses = std::vector<Address>;
    using AddressesWithFailover = std::vector<Addresses>;

    /// Directory names for asynchronous writes to StorageDistributed when has_internal_replication is set.
    ///
    /// A separate path is kept for each combination of:
    /// - prefer_localhost_replica
    ///   Note: with prefer_localhost_replica==0 the path will contain local nodes.
    /// - use_compact_format_in_distributed_parts_names
    ///   See toFullString()
    ///
    /// The paths are cached to avoid looping over replicas in insertPathForInternalReplication().
    struct ShardInfoInsertPathForInternalReplication
    {
        /// prefer_localhost_replica == 1 && use_compact_format_in_distributed_parts_names=0
        std::string prefer_localhost_replica;
        /// prefer_localhost_replica == 0 && use_compact_format_in_distributed_parts_names=0
        std::string no_prefer_localhost_replica;
        /// use_compact_format_in_distributed_parts_names=1
        std::string compact;
    };

    struct ShardInfo
    {
        /// True if the shard has at least one local address.
        bool isLocal() const { return !local_addresses.empty(); }

        /// True if the number of local addresses differs from the number of per-replica pools.
        bool hasRemoteConnections() const { return local_addresses.size() != per_replica_pools.size(); }

        size_t getLocalNodeCount() const { return local_addresses.size(); }
        size_t getRemoteNodeCount() const { return per_replica_pools.size() - local_addresses.size(); }
        size_t getAllNodeCount() const { return per_replica_pools.size(); }

        bool hasInternalReplication() const { return has_internal_replication; }

        /// Name of the directory for asynchronous writes to StorageDistributed if has_internal_replication.
        const std::string & insertPathForInternalReplication(bool prefer_localhost_replica, bool use_compact_format) const;

        ShardInfoInsertPathForInternalReplication insert_path_for_internal_replication;

        /// Number of the shard; numbering begins with 1.
        UInt32 shard_num = 0;
        UInt32 weight = 1;
        Addresses local_addresses;

        /// nullptr if there are no remote addresses.
        ConnectionPoolWithFailoverPtr pool;

        /// One connection pool per replica; holds nullptr for local replicas.
        ConnectionPoolPtrs per_replica_pools;

        bool has_internal_replication = false;
    };

    using ShardsInfo = std::vector<ShardInfo>;

    String getHashOfAddresses() const { return hash_of_addresses; }
    const ShardsInfo & getShardsInfo() const { return shards_info; }
    const AddressesWithFailover & getShardsAddresses() const { return addresses_with_failover; }

    /// Returns the first shard; throws LOGICAL_ERROR if the cluster has no shards.
    const ShardInfo & getAnyShardInfo() const
    {
        if (!shards_info.empty())
            return shards_info.front();

        throw Exception("Cluster is empty", ErrorCodes::LOGICAL_ERROR);
    }

    /// The number of remote shards.
    size_t getRemoteShardCount() const { return remote_shard_count; }

    /// The number of clickhouse nodes located locally;
    /// the local nodes are accessed directly.
    size_t getLocalShardCount() const { return local_shard_count; }

    /// The number of all shards.
    size_t getShardCount() const { return shards_info.size(); }

    const String & getSecret() const { return secret; }

    const SlotToShard & getSlotToShard() const { return slot_to_shard; }

    /// Get a subcluster made of a single shard, selected by its position (counting from 0) in this cluster.
    std::unique_ptr<Cluster> getClusterWithSingleShard(size_t index) const;

    /// Get a subcluster made of one or several shards, selected by their positions (counting from 0) in this cluster.
    std::unique_ptr<Cluster> getClusterWithMultipleShards(const std::vector<size_t> & indices) const;

    /// Get a new Cluster in which every server (all shards with all replicas) of this cluster is an independent shard.
    std::unique_ptr<Cluster> getClusterWithReplicasAsShards(const Settings & settings) const;

    /// Returns false if the cluster configuration doesn't allow it to be used for cross-replication.
    /// NOTE: true does not mean that it actually is a cross-replication cluster.
    bool maybeCrossReplication() const;

private:
    void initMisc();

    /// For the implementation of getClusterWithMultipleShards.
    struct SubclusterTag {};
    Cluster(SubclusterTag, const Cluster & from, const std::vector<size_t> & indices);

    /// For the implementation of getClusterWithReplicasAsShards.
    struct ReplicasAsShardsTag {};
    Cluster(ReplicasAsShardsTag, const Cluster & from, const Settings & settings);

    SlotToShard slot_to_shard;

    /// Inter-server secret.
    String secret;

    String hash_of_addresses;

    /// Description of the cluster shards.
    ShardsInfo shards_info;

    /// Any remote shard.
    ShardInfo * any_remote_shard_info = nullptr;

    /// An array of shards; for each shard, an array of replica addresses (servers that are considered identical).
    /// The size and the order of the elements correspond to shards_info.
    AddressesWithFailover addresses_with_failover;

    size_t remote_shard_count = 0;
    size_t local_shard_count = 0;

    String name;
};
/// Shared-ownership pointer to a Cluster.
using ClusterPtr = std::shared_ptr<Cluster>;
/// A collection of clusters addressed by name (see Impl).
class Clusters
{
public:
    /// `config_prefix` defaults to "remote_servers".
    Clusters(
        const Poco::Util::AbstractConfiguration & config,
        const Settings & settings,
        const String & config_prefix = "remote_servers");

    /// Not copyable.
    Clusters(const Clusters &) = delete;
    Clusters & operator=(const Clusters &) = delete;

    ClusterPtr getCluster(const std::string & cluster_name) const;
    void setCluster(const String & cluster_name, const ClusterPtr & cluster);

    /// `old_config` is optional and may be left as nullptr.
    /// NOTE(review): how the old configuration is used is not visible in this header - see the implementation.
    void updateClusters(
        const Poco::Util::AbstractConfiguration & new_config,
        const Settings & settings,
        const String & config_prefix,
        Poco::Util::AbstractConfiguration * old_config = nullptr);

    /// Ordered mapping from a cluster name to the cluster.
    using Impl = std::map<String, ClusterPtr>;

    /// Returns the container by value.
    Impl getContainer() const;

protected:
    /// Set up outside of this class; stored to prevent deleting from impl on config update.
    std::unordered_set<std::string> automatic_clusters;

    Impl impl;

    /// Declared mutable, so it can be locked from const member functions.
    /// NOTE(review): presumably guards `impl` - confirm against the implementation.
    mutable std::mutex mutex;
};
}