-
Notifications
You must be signed in to change notification settings - Fork 0
/
dht.h
218 lines (199 loc) · 6.22 KB
/
dht.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
#pragma once
#include "udp.h"
#include "time.h"
#include "bencode.h"
#include <set>
#include <boost/operators.hpp>
#include <queue>
class hash_id : boost::totally_ordered<hash_id> {
public:
// Return a random ID
static hash_id random();
// Construct a node-id based on the hash of some data
static hash_id hash_of(const std::string& data);
// Make a hash from raw data
explicit hash_id(const char* data);
// Make a hash from ip address
explicit hash_id(const ip_address& ip);
// Back to std::string for packing up
std::string pack() const;
// Human readable std::string for printing
std::string to_string() const;
// Compute # of shared bits
size_t shared_bits(const hash_id& rhs) const;
// Randomize lower bits to increase privacy + search breadth
hash_id randomize(size_t depth) const;
// Ordering
bool operator<(const hash_id& rhs) const;
bool operator==(const hash_id& rhs) const;
private:
hash_id() {}
unsigned char m_buf[20];
};
// Supports bencoding + basic query timeouts, etc
// Often passes be_map& to places that would normally be const
// to allow the use of default constructed map entries
class dht_rpc
{
public:
// Called when a query succeeds
typedef std::function<void (const std::string&, be_map&, be_map&)> success_handler_t;
// Called when a query fails
typedef std::function<void (const std::string&)> failure_handler_t;
// Called for inbound queries
typedef std::function<be_map (be_map&)> query_handler_t;
// Construct a new RPC handler
dht_rpc(timer_mgr& tm, udp_port& udp);
// Send an outbound request
std::string send_request(const udp_endpoint& who, const std::string& rtype, const be_map& args,
const success_handler_t& on_success, const failure_handler_t& on_failure);
// Cancel an outstanding request
void cancel_request(const std::string& tx_id);
// Add a query handler
void add_handler(const std::string& qtype, const query_handler_t& q);
private:
bool on_incoming(const udp_endpoint& who, const char* buf, size_t len);
void on_query(const udp_endpoint& who, be_map& query);
void on_response(const udp_endpoint& who, const std::string& type, be_map& resp);
void on_timeout(const std::string& tx_id);
timer_mgr& m_tm;
udp_port& m_udp;
struct pending_t {
udp_endpoint who;
std::string qtype;
success_handler_t on_success;
failure_handler_t on_failure;
timer_id timer;
};
std::map<std::string, pending_t> m_pending;
std::map<std::string, query_handler_t> m_handlers;
};
struct dht_node
{
dht_node(const udp_endpoint& addr, const hash_id& nid, int depth);
udp_endpoint addr;
hash_id nid;
int depth;
uint32_t rand_key;
uint32_t responses; // Total number of responses
uint32_t errors; // Number of errors since last valid response
timer_id stale_timer; // When do I go stale (if good)
};
typedef std::shared_ptr<dht_node> dht_node_ptr;
class ptr_less {
public:
bool operator()(const dht_node_ptr& a, const dht_node_ptr& b) const;
};
class dht_location;
class dht_bucket
{
friend class dht_location;
public:
// Construct a new bucket
dht_bucket(dht_location& location, size_t depth);
// Heard about a node
void on_node(const udp_endpoint& addr, const hash_id& nid);
// Try to get closer to my location, return if I sent a packet
bool try_send();
// Print
bool print() const;
// Cancel all outstanding goo
void cancel();
private:
// Handle various callbacks
void on_get_success(const std::string& tx_id, const dht_node_ptr& p, be_map& resp);
void on_failure(const std::string& tx_id, const dht_node_ptr& p);
void on_good_timeout(const dht_node_ptr& p);
// Which location am I part of
dht_location& m_location;
// My depth
size_t m_depth;
// All nodes by endpoint address
std::map<udp_endpoint, dht_node_ptr> m_all;
// Nodes which are currently good, ordered by total responses
std::set<dht_node_ptr, ptr_less> m_good;
// Nodes which are we have never heard from, or not in a while, ordered by total responses
std::set<dht_node_ptr, ptr_less> m_potential;
// tx_id of pending requests of some sort (in neither map)
std::set<std::string> m_pending;
// Nodes that recently failed
std::queue<udp_endpoint> m_failures;
};
class dht;
class dht_location
{
friend class dht_bucket;
public:
// Make a new DHT location
dht_location(dht& dht, const hash_id& tid, const duration& peer_delay);
~dht_location();
// Set handler
void set_ready_handler(const std::function<void ()>& on_ready);
// Set publish flag
void set_publish(bool publish);
// Handle a new node being found
void on_node(const udp_endpoint& addr, const hash_id& nid);
// Print the current state
void print() const;
// Get current node list
std::map<udp_endpoint, int> get_peers() const;
// Do deep cancelation
void cancel();
private:
struct peer_info
{
peer_info() : pending(false) {}
bool pending;
time_point when;
std::set<udp_endpoint> what;
};
void start_timer();
void on_timer();
void send_bootstrap(const udp_endpoint& ep);
void on_bootstrap(be_map& resp);
void on_ready();
void on_good_up(dht_node_ptr p);
void on_good_down(dht_node_ptr p);
void send_get_peers(dht_node_ptr p);
void on_get_peers(dht_node_ptr p, be_map& m);
void on_error(dht_node_ptr p);
void on_peer_timer();
dht& m_dht;
hash_id m_tid;
bool m_publish;
duration m_peer_delay;
std::function<void()> m_on_ready;
timer_id m_send_timer;
std::vector<dht_bucket> m_buckets;
size_t m_node_count;
time_point m_last_bootstrap;
bool m_is_ready;
std::map<dht_node_ptr, peer_info, ptr_less> m_good;
timer_id m_peer_timer;
std::set<std::string> m_pending;
};
class dht
{
friend class dht_bucket;
friend class dht_location;
friend class dht_bootstrap_node;
public:
dht(timer_mgr& tm, udp_port& udp);
void set_external(const udp_endpoint& ep);
void add_bootstrap(const udp_endpoint& ep);
size_t run_query(const hash_id& nid, const duration& refresh_rate);
void set_publish(size_t which, bool publish);
void set_ready_handler(size_t which, const std::function<void()>& on_done);
std::map<udp_endpoint, int> check_query(size_t which);
void cancel_query(size_t which);
void stop_all();
private:
void process_nodes(const std::string& nodes);
timer_mgr& m_tm;
dht_rpc m_rpc;
udp_endpoint m_external;
hash_id m_nid;
std::vector<udp_endpoint> m_bootstraps;
size_t m_next_query_id;
std::map<size_t, std::shared_ptr<dht_location>> m_locations;
};