From 3061fcf489cddbaf2cea150d64ac8ab8fd623785 Mon Sep 17 00:00:00 2001 From: Steven Siloti Date: Sun, 1 Dec 2013 15:27:32 -0800 Subject: [PATCH 1/8] create get_peers from find_data get_peers has all of the code specific to queries for peer info leaving find_data with the common base functionality suitable for use with other types of queries (e.g. get). --- Jamfile | 1 + include/libtorrent/kademlia/find_data.hpp | 47 +--- include/libtorrent/kademlia/get_peers.hpp | 101 +++++++ include/libtorrent/kademlia/refresh.hpp | 6 +- .../kademlia/traversal_algorithm.hpp | 2 +- src/kademlia/find_data.cpp | 199 +------------- src/kademlia/get_peers.cpp | 255 ++++++++++++++++++ src/kademlia/node.cpp | 6 +- src/kademlia/refresh.cpp | 4 +- 9 files changed, 380 insertions(+), 241 deletions(-) create mode 100644 include/libtorrent/kademlia/get_peers.hpp create mode 100644 src/kademlia/get_peers.cpp diff --git a/Jamfile b/Jamfile index 97fee1e660..e414c0f9f4 100755 --- a/Jamfile +++ b/Jamfile @@ -576,6 +576,7 @@ KADEMLIA_SOURCES = routing_table traversal_algorithm logging + get_peers ; ED25519_SOURCES = diff --git a/include/libtorrent/kademlia/find_data.hpp b/include/libtorrent/kademlia/find_data.hpp index 5c6134914b..bc8047b802 100644 --- a/include/libtorrent/kademlia/find_data.hpp +++ b/include/libtorrent/kademlia/find_data.hpp @@ -57,22 +57,16 @@ class node_impl; // -------- find data ----------- -//TODO: 3 rename this class to get_peers, since that's what it does -// find_data is an unnecessarily generic name class find_data : public traversal_algorithm { public: - typedef boost::function const&)> data_callback; - typedef boost::function > const&, bool)> nodes_callback; + typedef boost::function > const&)> nodes_callback; - void got_peers(std::vector const& peers); void got_write_token(node_id const& n, std::string const& write_token) { m_write_tokens[n] = write_token; } find_data(node_impl& node, node_id target - , data_callback const& dcallback - , nodes_callback const& ncallback - , bool noseeds); + , nodes_callback const& ncallback); virtual void start(); @@ -83,39 +77,11 @@ class find_data : public traversal_algorithm protected: virtual void done(); - observer_ptr new_observer(void* ptr, udp::endpoint const& ep, node_id const& id); - virtual bool invoke(observer_ptr o); + virtual observer_ptr new_observer(void* ptr, udp::endpoint const& ep, node_id const& id); - data_callback m_data_callback; nodes_callback m_nodes_callback; std::map m_write_tokens; - node_id const m_target; - bool m_done:1; - bool m_got_peers:1; - bool m_noseeds:1; -}; - -class obfuscated_get_peers : public find_data -{ -public: - typedef find_data::nodes_callback done_callback; - - obfuscated_get_peers(node_impl& node, node_id target - , data_callback const& dcallback - , nodes_callback const& ncallback - , bool noseeds); - - virtual char const* name() const; - -protected: - - observer_ptr new_observer(void* ptr, udp::endpoint const& ep, node_id const& id); - virtual bool invoke(observer_ptr o); - virtual void done(); -private: - // when set to false, we no longer obfuscate - // the target hash, and send regular get_peers - bool m_obfuscated; + bool m_done; }; class find_data_observer : public observer @@ -126,7 +92,10 @@ class find_data_observer : public observer , udp::endpoint const& ep, node_id const& id) : observer(algorithm, ep, id) {} - void reply(msg const&); + virtual void reply(msg const&); + +protected: + virtual void extract_data(lazy_entry const*) {} }; } } // namespace libtorrent::dht diff --git a/include/libtorrent/kademlia/get_peers.hpp b/include/libtorrent/kademlia/get_peers.hpp new file mode 100644 index 0000000000..a8d60d11ca --- /dev/null +++ b/include/libtorrent/kademlia/get_peers.hpp @@ -0,0 +1,101 @@ +/* + +Copyright (c) 2006-2013, Arvid Norberg & Daniel Wallin +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#ifndef LIBTORRENT_GET_PEERS_HPP +#define LIBTORRENT_GET_PEERS_HPP + +#include + +namespace libtorrent { namespace dht +{ + +class get_peers : public find_data +{ +public: + typedef boost::function const&)> data_callback; + + void got_peers(std::vector const& peers); + + get_peers(node_impl& node, node_id target + , data_callback const& dcallback + , nodes_callback const& ncallback + , bool noseeds); + + virtual char const* name() const; + +protected: + virtual bool invoke(observer_ptr o); + virtual observer_ptr new_observer(void* ptr, udp::endpoint const& ep, node_id const& id); + + data_callback m_data_callback; + bool m_noseeds; +}; + +class obfuscated_get_peers : public get_peers +{ +public: + typedef find_data::nodes_callback done_callback; + + obfuscated_get_peers(node_impl& node, node_id target + , data_callback const& dcallback + , nodes_callback const& ncallback + , bool noseeds); + + virtual char const* name() const; + +protected: + + observer_ptr new_observer(void* ptr, udp::endpoint const& ep, node_id const& id); + virtual bool invoke(observer_ptr o); + virtual void done(); +private: + // when set to false, we no longer obfuscate + // the target hash, and send regular get_peers + bool m_obfuscated; +}; + +class get_peers_observer : public find_data_observer +{ +public: + get_peers_observer( + boost::intrusive_ptr const& algorithm + , udp::endpoint const& ep, node_id const& id) + : find_data_observer(algorithm, ep, id) + {} + +protected: + virtual void extract_data(lazy_entry const* r); +}; + +} } // namespace libtorrent::dht + +#endif // LIBTORRENT_GET_PEERS_HPP diff --git a/include/libtorrent/kademlia/refresh.hpp b/include/libtorrent/kademlia/refresh.hpp index 0079d39e9e..a2a99c0da4 100644 --- a/include/libtorrent/kademlia/refresh.hpp +++ b/include/libtorrent/kademlia/refresh.hpp @@ -35,7 +35,7 @@ POSSIBILITY OF SUCH DAMAGE. #include #include -#include +#include namespace libtorrent { namespace dht { @@ -43,10 +43,10 @@ namespace libtorrent { namespace dht class routing_table; class rpc_manager; -class refresh : public find_data +class refresh : public get_peers { public: - typedef find_data::nodes_callback done_callback; + typedef get_peers::nodes_callback done_callback; refresh(node_impl& node, node_id target , done_callback const& callback); diff --git a/include/libtorrent/kademlia/traversal_algorithm.hpp b/include/libtorrent/kademlia/traversal_algorithm.hpp index 2125709632..5a46d66204 100644 --- a/include/libtorrent/kademlia/traversal_algorithm.hpp +++ b/include/libtorrent/kademlia/traversal_algorithm.hpp @@ -112,7 +112,7 @@ struct traversal_algorithm : boost::noncopyable int m_ref_count; node_impl& m_node; - node_id m_target; + node_id const m_target; std::vector m_results; int m_invoke_count; int m_branch_factor; diff --git a/src/kademlia/find_data.cpp b/src/kademlia/find_data.cpp index 7860386f9d..4c2038726d 100644 --- a/src/kademlia/find_data.cpp +++ b/src/kademlia/find_data.cpp @@ -80,50 +80,10 @@ void find_data_observer::reply(msg const& m) node_id(id->string_ptr()), token->string_value()); } - // look for peers - lazy_entry const* n = r->dict_find_list("values"); - if (n) - { - std::vector peer_list; - if (n->list_size() == 1 && n->list_at(0)->type() == lazy_entry::string_t) - { - // assume it's mainline format - char const* peers = n->list_at(0)->string_ptr(); - char const* end = peers + n->list_at(0)->string_length(); - -#ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(traversal) - << "[" << m_algorithm.get() << "] PEERS" - << " invoke-count: " << m_algorithm->invoke_count() - << " branch-factor: " << m_algorithm->branch_factor() - << " addr: " << m.addr - << " id: " << node_id(id->string_ptr()) - << " distance: " << distance_exp(m_algorithm->target(), node_id(id->string_ptr())) - << " p: " << ((end - peers) / 6); -#endif - while (end - peers >= 6) - peer_list.push_back(read_v4_endpoint(peers)); - } - else - { - // assume it's uTorrent/libtorrent format - read_endpoint_list(n, peer_list); -#ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(traversal) - << "[" << m_algorithm.get() << "] PEERS" - << " invoke-count: " << m_algorithm->invoke_count() - << " branch-factor: " << m_algorithm->branch_factor() - << " addr: " << m.addr - << " id: " << node_id(id->string_ptr()) - << " distance: " << distance_exp(m_algorithm->target(), node_id(id->string_ptr())) - << " p: " << n->list_size(); -#endif - } - static_cast(m_algorithm.get())->got_peers(peer_list); - } + extract_data(r); // look for nodes - n = r->dict_find_string("nodes"); + lazy_entry const* n = r->dict_find_string("nodes"); if (n) { std::vector node_list; @@ -172,16 +132,10 @@ void add_entry_fun(void* userdata, node_entry const& e) find_data::find_data( node_impl& node , node_id target - , data_callback const& dcallback - , nodes_callback const& ncallback - , bool noseeds) + , nodes_callback const& ncallback) : traversal_algorithm(node, target) - , m_data_callback(dcallback) , m_nodes_callback(ncallback) - , m_target(target) , m_done(false) - , m_got_peers(false) - , m_noseeds(noseeds) { } @@ -205,32 +159,7 @@ observer_ptr find_data::new_observer(void* ptr return o; } -char const* find_data::name() const { return "get_peers"; } - -bool find_data::invoke(observer_ptr o) -{ - if (m_done) - { - m_invoke_count = -1; - return false; - } - - entry e; - e["y"] = "q"; - entry& a = e["a"]; - - e["q"] = "get_peers"; - a["info_hash"] = m_target.to_string(); - if (m_noseeds) a["noseed"] = 1; - - return m_node.m_rpc.invoke(e, o->target_ep(), o); -} - -void find_data::got_peers(std::vector const& peers) -{ - if (!peers.empty()) m_got_peers = true; - if (m_data_callback) m_data_callback(peers); -} +char const* find_data::name() const { return "find_data"; } void find_data::done() { @@ -239,7 +168,7 @@ void find_data::done() m_done = true; #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(traversal) << "[" << this << "] get_peers DONE"; + TORRENT_LOG(traversal) << "[" << this << "] find_data DONE"; #endif std::vector > results; @@ -255,126 +184,10 @@ void find_data::done() results.push_back(std::make_pair(node_entry(o->id(), o->target_ep()), j->second)); --num_results; } - if (m_nodes_callback) m_nodes_callback(results, m_got_peers); + if (m_nodes_callback) m_nodes_callback(results); traversal_algorithm::done(); } -obfuscated_get_peers::obfuscated_get_peers( - node_impl& node - , node_id info_hash - , data_callback const& dcallback - , nodes_callback const& ncallback - , bool noseeds) - : find_data(node, info_hash, dcallback, ncallback, noseeds) - , m_obfuscated(true) -{ -} - -char const* obfuscated_get_peers::name() const { return !m_obfuscated ? find_data::name() : "get_peers [obfuscated]"; } - -observer_ptr obfuscated_get_peers::new_observer(void* ptr - , udp::endpoint const& ep, node_id const& id) -{ - observer_ptr o(new (ptr) find_data_observer(this, ep, id)); -#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS - o->m_in_constructor = false; -#endif - return o; -} - -bool obfuscated_get_peers::invoke(observer_ptr o) -{ - if (!m_obfuscated) return find_data::invoke(o); - - node_id id = o->id(); - int shared_prefix = 160 - distance_exp(id, m_target); - - // when we get close to the target zone in the DHT - // start using the correct info-hash, in order to - // start receiving peers - if (shared_prefix > m_node.m_table.depth() - 10) - { - m_obfuscated = false; - // clear the queried bits on all successful nodes in - // our node-list for this traversal algorithm, to - // allow the get_peers traversal to regress in case - // nodes further down end up being dead - for (std::vector::iterator i = m_results.begin() - , end(m_results.end()); i != end; ++i) - { - observer* o = i->get(); - // don't re-request from nodes that didn't respond - if (o->flags & observer::flag_failed) continue; - // don't interrupt with queries that are already in-flight - if ((o->flags & observer::flag_alive) == 0) continue; - o->flags &= ~(observer::flag_queried | observer::flag_alive); - } - return find_data::invoke(o); - } - - entry e; - e["y"] = "q"; - e["q"] = "find_node"; - entry& a = e["a"]; - - // This logic will obfuscate the target info-hash - // we're looking up, in order to preserve more privacy - // on the DHT. This is done by only including enough - // bits in the info-hash for the node we're querying to - // give a good answer, but not more. - - // now, obfuscate the bits past shared_prefix + 5 - node_id obfuscated_target = generate_random_id(); - obfuscated_target >>= shared_prefix + 3; - obfuscated_target^= m_target; - a["target"] = obfuscated_target.to_string(); - - return m_node.m_rpc.invoke(e, o->target_ep(), o); -} - -void obfuscated_get_peers::done() -{ - if (!m_obfuscated) return find_data::done(); - - // oops, we failed to switch over to the non-obfuscated - // mode early enough. do it now - - boost::intrusive_ptr ta(new find_data(m_node, m_target - , m_data_callback - , m_nodes_callback - , m_noseeds)); - - // don't call these when the obfuscated_get_peers - // is done, we're passing them on to be called when - // ta completes. - m_data_callback.clear(); - m_nodes_callback.clear(); - -#ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(traversal) << " [" << this << "]" - << " obfuscated get_peers phase 1 done, spawning get_peers [" << ta.get() << "]"; -#endif - - int num_added = 0; - for (std::vector::iterator i = m_results.begin() - , end(m_results.end()); i != end && num_added < 16; ++i) - { - observer_ptr o = *i; - - // only add nodes whose node ID we know and that - // we know are alive - if (o->flags & observer::flag_no_id) continue; - if ((o->flags & observer::flag_alive) == 0) continue; - - ta->add_entry(o->id(), o->target_ep(), observer::flag_initial); - ++num_added; - } - - ta->start(); - - find_data::done(); -} - } } // namespace libtorrent::dht diff --git a/src/kademlia/get_peers.cpp b/src/kademlia/get_peers.cpp new file mode 100644 index 0000000000..68865978fd --- /dev/null +++ b/src/kademlia/get_peers.cpp @@ -0,0 +1,255 @@ +/* + +Copyright (c) 2006-2013, Arvid Norberg & Daniel Wallin +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#include +#include +#include + +namespace libtorrent { namespace dht +{ + +using detail::read_endpoint_list; +using detail::read_v4_endpoint; +#if TORRENT_USE_IPV6 +using detail::read_v6_endpoint; +#endif + +void get_peers_observer::extract_data(lazy_entry const* r) +{ + // look for peers + lazy_entry const* n = r->dict_find_list("values"); + if (n) + { + std::vector peer_list; + if (n->list_size() == 1 && n->list_at(0)->type() == lazy_entry::string_t) + { + // assume it's mainline format + char const* peers = n->list_at(0)->string_ptr(); + char const* end = peers + n->list_at(0)->string_length(); + +#ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_LOG(traversal) + << "[" << m_algorithm.get() << "] PEERS" + << " invoke-count: " << m_algorithm->invoke_count() + << " branch-factor: " << m_algorithm->branch_factor() + << " addr: " << m.addr + << " id: " << node_id(id->string_ptr()) + << " distance: " << distance_exp(m_algorithm->target(), node_id(id->string_ptr())) + << " p: " << ((end - peers) / 6); +#endif + while (end - peers >= 6) + peer_list.push_back(read_v4_endpoint(peers)); + } + else + { + // assume it's uTorrent/libtorrent format + read_endpoint_list(n, peer_list); +#ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_LOG(traversal) + << "[" << m_algorithm.get() << "] PEERS" + << " invoke-count: " << m_algorithm->invoke_count() + << " branch-factor: " << m_algorithm->branch_factor() + << " addr: " << m.addr + << " id: " << node_id(id->string_ptr()) + << " distance: " << distance_exp(m_algorithm->target(), node_id(id->string_ptr())) + << " p: " << n->list_size(); +#endif + } + static_cast(m_algorithm.get())->got_peers(peer_list); + } +} + +void get_peers::got_peers(std::vector const& peers) +{ + if (m_data_callback) m_data_callback(peers); +} + +get_peers::get_peers( + node_impl& node + , node_id target + , data_callback const& dcallback + , nodes_callback const& ncallback + , bool noseeds) + : find_data(node, target, ncallback) + , m_data_callback(dcallback) + , m_noseeds(noseeds) +{ +} + +char const* get_peers::name() const { return "get_peers"; } + +bool get_peers::invoke(observer_ptr o) +{ + if (m_done) + { + m_invoke_count = -1; + return false; + } + + entry e; + e["y"] = "q"; + entry& a = e["a"]; + + e["q"] = "get_peers"; + a["info_hash"] = m_target.to_string(); + if (m_noseeds) a["noseed"] = 1; + + return m_node.m_rpc.invoke(e, o->target_ep(), o); +} + +observer_ptr get_peers::new_observer(void* ptr + , udp::endpoint const& ep, node_id const& id) +{ + observer_ptr o(new (ptr) get_peers_observer(this, ep, id)); +#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS + o->m_in_constructor = false; +#endif + return o; +} + +obfuscated_get_peers::obfuscated_get_peers( + node_impl& node + , node_id info_hash + , data_callback const& dcallback + , nodes_callback const& ncallback + , bool noseeds) + : get_peers(node, info_hash, dcallback, ncallback, noseeds) + , m_obfuscated(true) +{ +} + +char const* obfuscated_get_peers::name() const { return !m_obfuscated ? get_peers::name() : "get_peers [obfuscated]"; } + +observer_ptr obfuscated_get_peers::new_observer(void* ptr + , udp::endpoint const& ep, node_id const& id) +{ + observer_ptr o(new (ptr) find_data_observer(this, ep, id)); +#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS + o->m_in_constructor = false; +#endif + return o; +} + +bool obfuscated_get_peers::invoke(observer_ptr o) +{ + if (!m_obfuscated) return get_peers::invoke(o); + + node_id id = o->id(); + int shared_prefix = 160 - distance_exp(id, m_target); + + // when we get close to the target zone in the DHT + // start using the correct info-hash, in order to + // start receiving peers + if (shared_prefix > m_node.m_table.depth() - 10) + { + m_obfuscated = false; + // clear the queried bits on all successful nodes in + // our node-list for this traversal algorithm, to + // allow the get_peers traversal to regress in case + // nodes further down end up being dead + for (std::vector::iterator i = m_results.begin() + , end(m_results.end()); i != end; ++i) + { + observer* o = i->get(); + // don't re-request from nodes that didn't respond + if (o->flags & observer::flag_failed) continue; + // don't interrupt with queries that are already in-flight + if ((o->flags & observer::flag_alive) == 0) continue; + o->flags &= ~(observer::flag_queried | observer::flag_alive); + } + return get_peers::invoke(o); + } + + entry e; + e["y"] = "q"; + e["q"] = "find_node"; + entry& a = e["a"]; + + // This logic will obfuscate the target info-hash + // we're looking up, in order to preserve more privacy + // on the DHT. This is done by only including enough + // bits in the info-hash for the node we're querying to + // give a good answer, but not more. + + // now, obfuscate the bits past shared_prefix + 5 + node_id obfuscated_target = generate_random_id(); + obfuscated_target >>= shared_prefix + 3; + obfuscated_target^= m_target; + a["target"] = obfuscated_target.to_string(); + + return m_node.m_rpc.invoke(e, o->target_ep(), o); +} + +void obfuscated_get_peers::done() +{ + if (!m_obfuscated) return get_peers::done(); + + // oops, we failed to switch over to the non-obfuscated + // mode early enough. do it now + + boost::intrusive_ptr ta(new get_peers(m_node, m_target + , m_data_callback + , m_nodes_callback + , m_noseeds)); + + // don't call these when the obfuscated_get_peers + // is done, we're passing them on to be called when + // ta completes. + m_data_callback.clear(); + m_nodes_callback.clear(); + +#ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_LOG(traversal) << " [" << this << "]" + << " obfuscated get_peers phase 1 done, spawning get_peers [" << ta.get() << "]"; +#endif + + int num_added = 0; + for (std::vector::iterator i = m_results.begin() + , end(m_results.end()); i != end && num_added < 16; ++i) + { + observer_ptr o = *i; + + // only add nodes whose node ID we know and that + // we know are alive + if (o->flags & observer::flag_no_id) continue; + if ((o->flags & observer::flag_alive) == 0) continue; + + ta->add_entry(o->id(), o->target_ep(), observer::flag_initial); + ++num_added; + } + + ta->start(); + + get_peers::done(); +} + +} } // namespace libtorrent::dht diff --git a/src/kademlia/node.cpp b/src/kademlia/node.cpp index 0d06d9b1d4..988e1cec98 100644 --- a/src/kademlia/node.cpp +++ b/src/kademlia/node.cpp @@ -51,7 +51,7 @@ POSSIBILITY OF SUCH DAMAGE. #include #include "libtorrent/kademlia/refresh.hpp" -#include "libtorrent/kademlia/find_data.hpp" +#include "libtorrent/kademlia/get_peers.hpp" #include "ed25519.h" @@ -360,7 +360,7 @@ void node_impl::announce(sha1_hash const& info_hash, int listen_port, bool seed // search for nodes with ids close to id or with peers // for info-hash id. then send announce_peer to them. - boost::intrusive_ptr ta; + boost::intrusive_ptr ta; if (m_settings.privacy_lookups) { ta.reset(new obfuscated_get_peers(*this, info_hash, f @@ -369,7 +369,7 @@ void node_impl::announce(sha1_hash const& info_hash, int listen_port, bool seed } else { - ta.reset(new find_data(*this, info_hash, f + ta.reset(new get_peers(*this, info_hash, f , boost::bind(&announce_fun, _1, boost::ref(*this) , listen_port, info_hash, seed), seed)); } diff --git a/src/kademlia/refresh.cpp b/src/kademlia/refresh.cpp index 294c6b6f4c..26c56cc95b 100644 --- a/src/kademlia/refresh.cpp +++ b/src/kademlia/refresh.cpp @@ -49,7 +49,7 @@ refresh::refresh( node_impl& node , node_id target , done_callback const& callback) - : find_data(node, target, find_data::data_callback(), callback, false) + : get_peers(node, target, get_peers::data_callback(), callback, false) { } @@ -61,7 +61,7 @@ char const* refresh::name() const observer_ptr refresh::new_observer(void* ptr , udp::endpoint const& ep, node_id const& id) { - observer_ptr o(new (ptr) find_data_observer(this, ep, id)); + observer_ptr o(new (ptr) get_peers_observer(this, ep, id)); #if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS o->m_in_constructor = false; #endif From 06d9a0c3ace51b4a109bc6b4e6f9cf56ddf8f61f Mon Sep 17 00:00:00 2001 From: Steven Siloti Date: Fri, 13 Dec 2013 10:00:50 -0800 Subject: [PATCH 2/8] fix parameter name --- test/test.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test.hpp b/test/test.hpp index e4423e1f6b..9b874a8e7f 100644 --- a/test/test.hpp +++ b/test/test.hpp @@ -55,7 +55,7 @@ extern "C" int EXPORT ed25519_create_seed(unsigned char *seed); void EXPORT ed25519_create_keypair(unsigned char *public_key, unsigned char *private_key, const unsigned char *seed); void EXPORT ed25519_sign(unsigned char *signature, const unsigned char *message, size_t message_len, const unsigned char *public_key, const unsigned char *private_key); - int EXPORT ed25519_verify(const unsigned char *signature, const unsigned char *message, size_t message_len, const unsigned char *private_key); + int EXPORT ed25519_verify(const unsigned char *signature, const unsigned char *message, size_t message_len, const unsigned char *public_key); void EXPORT ed25519_add_scalar(unsigned char *public_key, unsigned char *private_key, const unsigned char *scalar); void EXPORT ed25519_key_exchange(unsigned char *shared_secret, const unsigned char *public_key, const unsigned char *private_key); } From 72e425e1470e09bbdf2811f7b9ce7d4c696bcc91 Mon Sep 17 00:00:00 2001 From: Steven Siloti Date: Thu, 5 Dec 2013 19:56:46 -0800 Subject: [PATCH 3/8] test traversal algorithms --- src/kademlia/node.cpp | 5 +- test/test_dht.cpp | 311 ++++++++++++++++++++++++++++++++++++++---- 2 files changed, 284 insertions(+), 32 deletions(-) diff --git a/src/kademlia/node.cpp b/src/kademlia/node.cpp index 988e1cec98..4440f39bda 100644 --- a/src/kademlia/node.cpp +++ b/src/kademlia/node.cpp @@ -502,9 +502,9 @@ void node_impl::lookup_peers(sha1_hash const& info_hash, int prefix, entry& repl return; } -namespace +namespace detail { - void write_nodes_entry(entry& r, nodes_t const& nodes) + void TORRENT_EXTRA_EXPORT write_nodes_entry(entry& r, nodes_t const& nodes) { bool ipv6_nodes = false; entry& n = r["nodes"]; @@ -540,6 +540,7 @@ namespace } } } +using detail::write_nodes_entry; // verifies that a message has all the required // entries and returns them in ret diff --git a/test/test_dht.cpp b/test/test_dht.cpp index 759f7b7245..1d15d09a12 100644 --- a/test/test_dht.cpp +++ b/test/test_dht.cpp @@ -57,6 +57,8 @@ POSSIBILITY OF SUCH DAMAGE. using namespace libtorrent; using namespace libtorrent::dht; +void nop() {} + sha1_hash to_hash(char const* s) { sha1_hash ret; @@ -84,13 +86,13 @@ void node_push_back(void* userdata, libtorrent::dht::node_entry const& n) void nop(void* userdata, libtorrent::dht::node_entry const& n) {} -std::list > g_responses; +std::list > g_sent_packets; struct mock_socket : udp_socket_interface { bool send_packet(entry& msg, udp::endpoint const& ep, int flags) { - g_responses.push_back(std::make_pair(ep, msg)); + g_sent_packets.push_back(std::make_pair(ep, msg)); return true; } }; @@ -121,7 +123,16 @@ boost::array generate_key() static const std::string no; -void send_dht_msg(node_impl& node, char const* msg, udp::endpoint const& ep +void lazy_from_entry(entry const& e, lazy_entry& l) +{ + error_code ec; + static char inbuf[1500]; + int len = bencode(inbuf, e); + int ret = lazy_bdecode(inbuf, inbuf + len, l, ec); + TEST_CHECK(ret == 0); +} + +void send_dht_request(node_impl& node, char const* msg, udp::endpoint const& ep , lazy_entry* reply, char const* t = "10", char const* info_hash = 0 , char const* name = 0, std::string const token = std::string(), int port = 0 , char const* target = 0, entry const* value = 0 @@ -171,22 +182,82 @@ void send_dht_msg(node_impl& node, char const* msg, udp::endpoint const& ep node.incoming(m); // by now the node should have invoked the send function and put the - // response in g_responses + // response in g_sent_packets std::list >::iterator i - = std::find_if(g_responses.begin(), g_responses.end() + = std::find_if(g_sent_packets.begin(), g_sent_packets.end() , boost::bind(&std::pair::first, _1) == ep); - if (i == g_responses.end()) + if (i == g_sent_packets.end()) { TEST_ERROR("not response from DHT node"); return; } - static char inbuf[1500]; - int len = bencode(inbuf, i->second); - g_responses.erase(i); - int ret = lazy_bdecode(inbuf, inbuf + len, *reply, ec); - TEST_CHECK(ret == 0); + lazy_from_entry(i->second, *reply); + g_sent_packets.erase(i); +} + +namespace libtorrent { namespace dht { namespace detail +{ + // defined in node.cpp + void write_nodes_entry(entry& r, nodes_t const& nodes); +} } } + +void write_peers(entry::dictionary_type& r, std::set const& peers) +{ + entry::list_type& pe = r["values"].list(); + for (std::set::const_iterator it = peers.begin() + ; it != peers.end(); ++it) + { + std::string endpoint(18, '\0'); + std::string::iterator out = endpoint.begin(); + libtorrent::detail::write_endpoint(*it, out); + endpoint.resize(out - endpoint.begin()); + pe.push_back(entry(endpoint)); + } +} + +void send_dht_response(node_impl& node, lazy_entry const& request, udp::endpoint const& ep + , nodes_t const& nodes = nodes_t() + , std::string const token = std::string(), int port = 0 + , std::set const& peers = std::set() + , char const* target = 0, entry const* value = 0 + , std::string const key = std::string(), std::string const sig = std::string() + , int seq = -1, sha1_hash const* nid = NULL) +{ + entry e; + e["y"] = "r"; + e["t"] = request.dict_find_string_value("t"); +// e["ip"] = endpoint_to_bytes(ep); + entry::dictionary_type& r = e["r"].dict(); + if (nid == NULL) r["id"] = generate_next().to_string(); + else r["id"] = nid->to_string(); + if (!token.empty()) r["token"] = token; + if (port) r["p"] = port; + if (!nodes.empty()) dht::detail::write_nodes_entry(e["r"], nodes); + if (!peers.empty()) write_peers(r, peers); + if (value) r["v"] = *value; + if (!sig.empty()) r["sig"] = sig; + if (!key.empty()) r["k"] = key; + if (seq >= 0) r["seq"] = seq; + char msg_buf[1500]; + int size = bencode(msg_buf, e); +#if defined TORRENT_DEBUG && TORRENT_USE_IOSTREAM +// this yields a lot of output. too much +// std::cerr << "sending: " << e << "\n"; +#endif + +#ifdef TORRENT_USE_VALGRIND + VALGRIND_CHECK_MEM_IS_DEFINED(msg_buf, size); +#endif + + lazy_entry decoded; + error_code ec; + lazy_bdecode(msg_buf, msg_buf + size, decoded, ec); + if (ec) fprintf(stderr, "lazy_bdecode failed: %s\n", ec.message().c_str()); + + dht::msg m(decoded, ep); + node.incoming(m); } struct announce_item @@ -220,7 +291,7 @@ void announce_immutable_items(node_impl& node, udp::endpoint const* eps { if ((i % items[j].num_peers) == 0) continue; lazy_entry response; - send_dht_msg(node, "get", eps[i], &response, "10", 0 + send_dht_request(node, "get", eps[i], &response, "10", 0 , 0, no, 0, (char const*)&items[j].target[0]); key_desc_t desc[] = @@ -258,7 +329,7 @@ void announce_immutable_items(node_impl& node, udp::endpoint const* eps TEST_EQUAL(addr, eps[i].address()); } - send_dht_msg(node, "put", eps[i], &response, "10", 0 + send_dht_request(node, "put", eps[i], &response, "10", 0 , 0, token, 0, (char const*)&items[j].target[0], &items[j].ent); key_desc_t desc2[] = @@ -287,7 +358,7 @@ void announce_immutable_items(node_impl& node, udp::endpoint const* eps for (int j = 0; j < num_items; ++j) { lazy_entry response; - send_dht_msg(node, "get", eps[j], &response, "10", 0 + send_dht_request(node, "get", eps[j], &response, "10", 0 , 0, no, 0, (char const*)&items[j].target[0]); key_desc_t desc[] = @@ -327,12 +398,20 @@ struct print_alert : alert_dispatcher } }; + int sum_distance_exp(int s, node_entry const& e, node_id const& ref) { return s + distance_exp(e.id, ref); } -// TODO: 3 test find_data, obfuscated_get_peers and bootstrap +std::vector g_got_peers; + +void get_peers_cb(std::vector const& peers) +{ + g_got_peers.insert(g_got_peers.end(), peers.begin(), peers.end()); +} + +// TODO: 3 test obfuscated_get_peers int test_main() { dht_settings sett; @@ -352,7 +431,7 @@ int test_main() // ====== ping ====== udp::endpoint source(address::from_string("10.0.0.1"), 20); - send_dht_msg(node, "ping", source, &response, "10"); + send_dht_request(node, "ping", source, &response, "10"); dht::key_desc_t pong_desc[] = { {"y", lazy_entry::string_t, 1, 0}, @@ -376,7 +455,7 @@ int test_main() // ====== invalid message ====== - send_dht_msg(node, "find_node", source, &response, "10"); + send_dht_request(node, "find_node", source, &response, "10"); dht::key_desc_t err_desc[] = { {"y", lazy_entry::string_t, 1, 0}, @@ -406,7 +485,7 @@ int test_main() // ====== get_peers ====== - send_dht_msg(node, "get_peers", source, &response, "10", "01010101010101010101"); + send_dht_request(node, "get_peers", source, &response, "10", "01010101010101010101"); dht::key_desc_t peer1_desc[] = { {"y", lazy_entry::string_t, 1, 0}, @@ -433,7 +512,7 @@ int test_main() // ====== announce ====== - send_dht_msg(node, "announce_peer", source, &response, "10", "01010101010101010101", "test", token, 8080); + send_dht_request(node, "announce_peer", source, &response, "10", "01010101010101010101", "test", token, 8080); dht::key_desc_t ann_desc[] = { {"y", lazy_entry::string_t, 1, 0}, @@ -458,7 +537,7 @@ int test_main() for (int i = 0; i < 100; ++i) { source = udp::endpoint(rand_v4(), 6000); - send_dht_msg(node, "get_peers", source, &response, "10", "01010101010101010101"); + send_dht_request(node, "get_peers", source, &response, "10", "01010101010101010101"); ret = dht::verify_message(&response, peer1_desc, parsed, 4, error_string, sizeof(error_string)); if (ret) @@ -473,14 +552,14 @@ int test_main() fprintf(stderr, " invalid get_peers response: %s\n", error_string); } response.clear(); - send_dht_msg(node, "announce_peer", source, &response, "10", "01010101010101010101" + send_dht_request(node, "announce_peer", source, &response, "10", "01010101010101010101" , "test", token, 8080, 0, 0, false, i >= 50); response.clear(); } // ====== get_peers ====== - send_dht_msg(node, "get_peers", source, &response, "10", "01010101010101010101" + send_dht_request(node, "get_peers", source, &response, "10", "01010101010101010101" , 0, no, 0, 0, 0, true); dht::key_desc_t peer2_desc[] = { @@ -524,7 +603,7 @@ int test_main() // http://libtorrent.org/dht_sec.html source = udp::endpoint(address::from_string("124.31.75.21"), 20); node_id nid = to_hash("1712f6c70c5d6a4ec8a88e4c6ab4c28b95eee401"); - send_dht_msg(node, "find_node", source, &response, "10", 0, 0, std::string() + send_dht_request(node, "find_node", source, &response, "10", 0, 0, std::string() , 0, "0101010101010101010101010101010101010101", 0, false, false, std::string(), std::string(), -1, 0, &nid); dht::key_desc_t nodes_desc[] = { @@ -549,7 +628,7 @@ int test_main() // verify that we reject invalid node IDs // this is now an invalid node-id for 'source' nid[0] = 0x18; - send_dht_msg(node, "find_node", source, &response, "10", 0, 0, std::string() + send_dht_request(node, "find_node", source, &response, "10", 0, 0, std::string() , 0, "0101010101010101010101010101010101010101", 0, false, false, std::string(), std::string(), -1, 0, &nid); ret = dht::verify_message(&response, err_desc, parsed, 2, error_string, sizeof(error_string)); @@ -658,7 +737,7 @@ int test_main() TEST_CHECK(ret); - send_dht_msg(node, "get", source, &response, "10", 0 + send_dht_request(node, "get", source, &response, "10", 0 , 0, no, 0, (char*)&hasher((char*)public_key, 32).final()[0] , 0, false, false, std::string(), std::string(), 64); @@ -698,7 +777,7 @@ int test_main() VALGRIND_CHECK_MEM_IS_DEFINED(signature, 64); #endif - send_dht_msg(node, "put", source, &response, "10", 0 + send_dht_request(node, "put", source, &response, "10", 0 , 0, token, 0, 0, &items[0].ent, false, false , std::string((char*)public_key, 32) , std::string((char*)signature, 64), seq); @@ -722,7 +801,7 @@ int test_main() TEST_ERROR(error_string); } - send_dht_msg(node, "get", source, &response, "10", 0 + send_dht_request(node, "get", source, &response, "10", 0 , 0, no, 0, (char*)&hasher((char*)public_key, 32).final()[0] , 0, false, false, std::string(), std::string(), 64); @@ -772,7 +851,7 @@ int test_main() TEST_CHECK(ed25519_verify(signature, (unsigned char*)buffer, pos, public_key) != 1); - send_dht_msg(node, "put", source, &response, "10", 0 + send_dht_request(node, "put", source, &response, "10", 0 , 0, token, 0, 0, &items[0].ent, false, false , std::string((char*)public_key, 32) , std::string((char*)signature, 64), seq); @@ -814,7 +893,7 @@ int test_main() VALGRIND_CHECK_MEM_IS_DEFINED(signature, 64); #endif - send_dht_msg(node, "put", source, &response, "10", 0 + send_dht_request(node, "put", source, &response, "10", 0 , 0, token, 0, 0, &items[1].ent, false, false , std::string((char*)public_key, 32) , std::string((char*)signature, 64), seq @@ -837,7 +916,7 @@ int test_main() // put the same message again. This should fail because the // CAS hash is outdated, it's not the hash of the value that's // stored anymore - send_dht_msg(node, "put", source, &response, "10", 0 + send_dht_request(node, "put", source, &response, "10", 0 , 0, token, 0, 0, &items[1].ent, false, false , std::string((char*)public_key, 32) , std::string((char*)signature, 64), seq @@ -1231,6 +1310,178 @@ int test_main() , rs[i], to_hex(id.to_string()).c_str()); } } + + // test traversal algorithms + + dht::key_desc_t ping_desc[] = { + {"y", lazy_entry::string_t, 1, 0}, + {"t", lazy_entry::string_t, 2, 0}, + {"q", lazy_entry::string_t, 4, 0}, + {"a", lazy_entry::dict_t, 0, key_desc_t::parse_children}, + {"id", lazy_entry::string_t, 20, key_desc_t::last_child}, + }; + + dht::key_desc_t find_node_desc[] = { + {"y", lazy_entry::string_t, 1, 0}, + {"t", lazy_entry::string_t, 2, 0}, + {"q", lazy_entry::string_t, 9, 0}, + {"a", lazy_entry::dict_t, 0, key_desc_t::parse_children}, + {"id", lazy_entry::string_t, 20, 0}, + {"target", lazy_entry::string_t, 20, key_desc_t::last_child}, + }; + + dht::key_desc_t get_peers_desc[] = { + {"y", lazy_entry::string_t, 1, 0}, + {"t", lazy_entry::string_t, 2, 0}, + {"q", lazy_entry::string_t, 9, 0}, + {"a", lazy_entry::dict_t, 0, key_desc_t::parse_children}, + {"id", lazy_entry::string_t, 20, 0}, + {"info_hash", lazy_entry::string_t, 20, key_desc_t::last_child}, + }; + + // bootstrap + + do + { + dht::node_impl node(&ad, &s, sett, node_id::min(), ext, 0); + + udp::endpoint initial_node(address_v4::from_string("4.4.4.4"), 1234); + std::vector nodesv; + nodesv.push_back(initial_node); + node.bootstrap(nodesv, boost::bind(&nop)); + + TEST_EQUAL(g_sent_packets.size(), 1); + if (g_sent_packets.empty()) break; + TEST_EQUAL(g_sent_packets.front().first, initial_node); + + lazy_from_entry(g_sent_packets.front().second, response); + ret = verify_message(&response, find_node_desc, parsed, 6, error_string, sizeof(error_string)); + if (ret) + { + TEST_EQUAL(parsed[0]->string_value(), "q"); + TEST_EQUAL(parsed[2]->string_value(), "find_node"); + if (parsed[0]->string_value() != "q" || parsed[2]->string_value() != "find_node") break; + } + else + { + fprintf(stderr, " invalid find_node request: %s\n", print_entry(response).c_str()); + TEST_ERROR(error_string); + break; + } + + udp::endpoint found_node(address_v4::from_string("5.5.5.5"), 2235); + nodes_t nodes; + nodes.push_back(found_node); + g_sent_packets.clear(); + send_dht_response(node, response, initial_node, nodes); + + TEST_EQUAL(g_sent_packets.size(), 1); + if (g_sent_packets.empty()) break; + TEST_EQUAL(g_sent_packets.front().first, found_node); + + lazy_from_entry(g_sent_packets.front().second, response); + ret = verify_message(&response, find_node_desc, parsed, 6, error_string, sizeof(error_string)); + if (ret) + { + TEST_EQUAL(parsed[0]->string_value(), "q"); + TEST_EQUAL(parsed[2]->string_value(), "find_node"); + if (parsed[0]->string_value() != "q" || parsed[2]->string_value() != "find_node") break; + } + else + { + fprintf(stderr, " invalid find_node request: %s\n", print_entry(response).c_str()); + TEST_ERROR(error_string); + break; + } + + g_sent_packets.clear(); + send_dht_response(node, response, found_node); + + TEST_CHECK(g_sent_packets.empty()); + TEST_EQUAL(node.num_global_nodes(), 3); + } while (false); + + // get_peers + + do + { + dht::node_id target = to_hash("1234876923549721020394873245098347598635"); + dht::node_impl node(&ad, &s, sett, node_id::min(), ext, 0); + + udp::endpoint initial_node(address_v4::from_string("4.4.4.4"), 1234); + node.m_table.add_node(initial_node); + + node.announce(target, 1234, false, get_peers_cb); + + TEST_EQUAL(g_sent_packets.size(), 1); + if (g_sent_packets.empty()) break; + TEST_EQUAL(g_sent_packets.front().first, initial_node); + + lazy_from_entry(g_sent_packets.front().second, response); + ret = verify_message(&response, get_peers_desc, parsed, 6, error_string, sizeof(error_string)); + if (ret) + { + TEST_EQUAL(parsed[0]->string_value(), "q"); + TEST_EQUAL(parsed[2]->string_value(), "get_peers"); + TEST_EQUAL(parsed[5]->string_value(), target.to_string()); + if (parsed[0]->string_value() != "q" || parsed[2]->string_value() != "get_peers") break; + } + else + { + fprintf(stderr, " invalid get_peers request: %s\n", print_entry(response).c_str()); + TEST_ERROR(error_string); + break; + } + + std::set peers[2]; + peers[0].insert(tcp::endpoint(address_v4::from_string("4.1.1.1"), 4111)); + peers[0].insert(tcp::endpoint(address_v4::from_string("4.1.1.2"), 4112)); + peers[0].insert(tcp::endpoint(address_v4::from_string("4.1.1.3"), 4113)); + + udp::endpoint next_node(address_v4::from_string("5.5.5.5"), 2235); + nodes_t nodes; + nodes.push_back(next_node); + + g_sent_packets.clear(); + send_dht_response(node, response, initial_node, nodes, "10", 1234, peers[0]); + + TEST_EQUAL(g_sent_packets.size(), 1); + if (g_sent_packets.empty()) break; + TEST_EQUAL(g_sent_packets.front().first, next_node); + + lazy_from_entry(g_sent_packets.front().second, response); + ret = verify_message(&response, get_peers_desc, parsed, 6, error_string, sizeof(error_string)); + if (ret) + { + TEST_EQUAL(parsed[0]->string_value(), "q"); + TEST_EQUAL(parsed[2]->string_value(), "get_peers"); + TEST_EQUAL(parsed[5]->string_value(), target.to_string()); + if (parsed[0]->string_value() != "q" || parsed[2]->string_value() != "get_peers") break; + } + else + { + fprintf(stderr, " invalid get_peers request: %s\n", print_entry(response).c_str()); + TEST_ERROR(error_string); + break; + } + + peers[1].insert(tcp::endpoint(address_v4::from_string("4.1.1.4"), 4114)); + peers[1].insert(tcp::endpoint(address_v4::from_string("4.1.1.5"), 4115)); + peers[1].insert(tcp::endpoint(address_v4::from_string("4.1.1.6"), 4116)); + + g_sent_packets.clear(); + send_dht_response(node, response, next_node, nodes_t(), "11", 1234, peers[1]); + + TEST_CHECK(g_sent_packets.empty()); + + for (int i = 0; i < 2; ++i) + for (auto peer = peers[i].begin(); peer != peers[i].end(); ++peer) + { + TEST_CHECK(std::find(g_got_peers.begin(), g_got_peers.end(), *peer) != g_got_peers.end()); + } + g_got_peers.clear(); + } while (false); + return 0; } From a7f178ab163c00cff1a0bad71f2f132fb1aae0be Mon Sep 17 00:00:00 2001 From: Steven Siloti Date: Tue, 3 Dec 2013 20:47:56 -0800 Subject: [PATCH 4/8] DHT get traversal algorithm --- Jamfile | 2 + include/libtorrent/kademlia/get_item.hpp | 84 ++++++++ include/libtorrent/kademlia/item.hpp | 115 +++++++++++ include/libtorrent/kademlia/node.hpp | 7 +- .../kademlia/traversal_algorithm.hpp | 1 + src/kademlia/get_item.cpp | 145 ++++++++++++++ src/kademlia/item.cpp | 172 +++++++++++++++++ src/kademlia/node.cpp | 48 +++-- src/kademlia/traversal_algorithm.cpp | 13 ++ test/test_dht.cpp | 179 ++++++++++++++---- 10 files changed, 703 insertions(+), 63 deletions(-) mode change 100755 => 100644 Jamfile create mode 100644 include/libtorrent/kademlia/get_item.hpp create mode 100644 include/libtorrent/kademlia/item.hpp create mode 100644 src/kademlia/get_item.cpp create mode 100644 src/kademlia/item.cpp diff --git a/Jamfile b/Jamfile old mode 100755 new mode 100644 index e414c0f9f4..9041d5c2e4 --- a/Jamfile +++ b/Jamfile @@ -577,6 +577,8 @@ KADEMLIA_SOURCES = traversal_algorithm logging get_peers + item + get_item ; ED25519_SOURCES = diff --git a/include/libtorrent/kademlia/get_item.hpp b/include/libtorrent/kademlia/get_item.hpp new file mode 100644 index 0000000000..3708d39d4d --- /dev/null +++ b/include/libtorrent/kademlia/get_item.hpp @@ -0,0 +1,84 @@ +/* + +Copyright (c) 2013, Steven Siloti +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#ifndef LIBTORRENT_GET_ITEM_HPP +#define LIBTORRENT_GET_ITEM_HPP + +#include +#include +#include + +namespace libtorrent { namespace dht +{ + +class get_item : public find_data +{ +public: + typedef boost::function data_callback; + + void got_data(lazy_entry const* v, + char const* pk, + boost::int64_t seq, + char const* sig); + + get_item(node_impl& node, node_id target + , data_callback const& dcallback + , nodes_callback const& ncallback); + + virtual char const* name() const; + +protected: + virtual observer_ptr new_observer(void* ptr, udp::endpoint const& ep, node_id const& id); + virtual bool invoke(observer_ptr o); + virtual void done(); + + data_callback m_data_callback; + item m_data; + bool m_got_data; +}; + +class get_item_observer : public find_data_observer +{ +public: + get_item_observer( + boost::intrusive_ptr const& algorithm + , udp::endpoint const& ep, node_id const& id) + : find_data_observer(algorithm, ep, id) + {} + +protected: + virtual void extract_data(lazy_entry const* r); +}; + +} } // namespace libtorrent::dht + +#endif // LIBTORRENT_GET_ITEM_HPP diff --git a/include/libtorrent/kademlia/item.hpp b/include/libtorrent/kademlia/item.hpp new file mode 100644 index 0000000000..468cae9b58 --- /dev/null +++ b/include/libtorrent/kademlia/item.hpp @@ -0,0 +1,115 @@ +/* + +Copyright (c) 2013, Steven Siloti +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#ifndef LIBTORRENT_ITEM_HPP +#define LIBTORRENT_ITEM_HPP + +#include +#include +#include +#include +#include + +namespace libtorrent { namespace dht +{ + +bool TORRENT_EXTRA_EXPORT verify_mutable_item(std::pair v, + boost::uint64_t seq, + char const* pk, + char const* sig); + +void TORRENT_EXTRA_EXPORT sign_mutable_item(std::pair v, + boost::uint64_t seq, + char const* pk, + char const* sk, + char* sig); + +sha1_hash TORRENT_EXTRA_EXPORT mutable_item_cas(std::pair v, boost::uint64_t seq); + +struct TORRENT_EXPORT invalid_item : std::exception +{ + virtual const char* what() const throw() { return "invalid DHT item"; } +}; + +enum +{ + item_pk_len = 32, + item_sk_len = 64, + item_sig_len = 64 +}; + +struct lazy_item; + +class TORRENT_EXPORT item +{ +public: + item() : m_mutable(false) {} + item(entry const& v, boost::uint64_t seq = 0, char const* pk = 0, char const* sk = 0); + item(lazy_entry const* v, boost::uint64_t seq = 0, char const* pk = 0, char const* sig = 0); + item(lazy_item const&); + + void assign(entry const& v, boost::uint64_t seq = 0, char const* pk = 0, char const* sk = 0); + bool assign(lazy_entry const* v, boost::uint64_t seq = 0, char const* pk = 0, char const* sig = 0); + + bool is_mutable() { return m_mutable; } + + sha1_hash cas(); + + std::vector const& value() { return m_value; } + char const* pk() { TORRENT_ASSERT(m_mutable); return m_pk; } + char const* sig() { TORRENT_ASSERT(m_mutable); return m_sig; } + boost::uint64_t seq() { TORRENT_ASSERT(m_mutable); return m_seq; } + +private: + std::vector m_value; + char m_pk[item_pk_len]; + char m_sig[item_sig_len]; + boost::uint64_t m_seq; + bool m_mutable; +}; + +struct lazy_item +{ + lazy_item(lazy_entry const* v) : value(v), pk(NULL), sig(NULL), seq(0) {} + lazy_item(lazy_entry const* v, char const* pk, char const* sig, boost::uint64_t seq); + + bool is_mutable() const { return pk && sig; } + + lazy_entry const* value; + char const* pk; + char const* sig; + boost::uint64_t const seq; +}; + +} } // namespace libtorrent::dht + +#endif // LIBTORRENT_ITEM_HPP diff --git a/include/libtorrent/kademlia/node.hpp b/include/libtorrent/kademlia/node.hpp index 079c101fdf..509a34d605 100644 --- a/include/libtorrent/kademlia/node.hpp +++ b/include/libtorrent/kademlia/node.hpp @@ -43,6 +43,7 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include +#include #include #include @@ -132,11 +133,11 @@ struct dht_immutable_item int size; }; -struct ed25519_public_key { char bytes[32]; }; +struct ed25519_public_key { char bytes[item_pk_len]; }; struct dht_mutable_item : dht_immutable_item { - char sig[64]; + char sig[item_sig_len]; boost::uint64_t seq; ed25519_public_key key; }; @@ -232,6 +233,8 @@ typedef std::map dht_mutable_table_t; void announce(sha1_hash const& info_hash, int listen_port, bool seed , boost::function const&)> f); + void get_item(sha1_hash const& target, boost::function f); + bool verify_token(std::string const& token, char const* info_hash , udp::endpoint const& addr); diff --git a/include/libtorrent/kademlia/traversal_algorithm.hpp b/include/libtorrent/kademlia/traversal_algorithm.hpp index 5a46d66204..76f909e323 100644 --- a/include/libtorrent/kademlia/traversal_algorithm.hpp +++ b/include/libtorrent/kademlia/traversal_algorithm.hpp @@ -67,6 +67,7 @@ struct traversal_algorithm : boost::noncopyable void failed(observer_ptr o, int flags = 0); virtual ~traversal_algorithm(); void status(dht_lookup& l); + void abort(); void* allocate_observer(); void free_observer(void* ptr); diff --git a/src/kademlia/get_item.cpp b/src/kademlia/get_item.cpp new file mode 100644 index 0000000000..43f42d59d7 --- /dev/null +++ b/src/kademlia/get_item.cpp @@ -0,0 +1,145 @@ +/* + +Copyright (c) 2013, Steven Siloti +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#include +#include +#include + +namespace libtorrent { namespace dht +{ + +void get_item::got_data(lazy_entry const* v, + char const* pk, + boost::int64_t seq, + char const* sig) +{ + if (pk && sig) + { + if (hasher(pk, item_pk_len).final() != m_target) + return; + + if (!m_got_data || m_data.seq() < seq) + { + if (!m_data.assign(v, seq, pk, sig)) + return; + } + } + else + { + std::pair buf = v->data_section(); + if (hasher(buf.first, buf.second).final() != m_target) + return; + + m_data_callback(v); + // There can only be one true immutable item with a given id + // Now that we've got it there's no point in continuing to query other nodes + abort(); + } + m_got_data = true; +} + +get_item::get_item( + node_impl& node + , node_id target + , data_callback const& dcallback + , nodes_callback const& ncallback) + : find_data(node, target, ncallback) + , m_data_callback(dcallback) + , m_got_data(false) +{ +} + +char const* get_item::name() const { return "get"; } + +observer_ptr get_item::new_observer(void* ptr + , udp::endpoint const& ep, node_id const& id) +{ + observer_ptr o(new (ptr) get_item_observer(this, ep, id)); +#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS + o->m_in_constructor = false; +#endif + return o; +} + +bool get_item::invoke(observer_ptr o) +{ + if (m_done) + { + m_invoke_count = -1; + return false; + } + + entry e; + e["y"] = "q"; + entry& a = e["a"]; + + e["q"] = "get"; + a["target"] = m_target.to_string(); + + return m_node.m_rpc.invoke(e, o->target_ep(), o); +} + +void get_item::done() +{ + if (m_data.is_mutable()) + m_data_callback(m_data); + find_data::done(); +} + +void get_item_observer::extract_data(lazy_entry const* r) +{ + char const* pk = NULL; + char const* sig = NULL; + boost::int64_t seq = 0; + + lazy_entry const* k = r->dict_find_string("k"); + if (k && k->string_length() == item_pk_len) + pk = k->string_ptr(); + + lazy_entry const* s = r->dict_find_string("sig"); + if (s && s->string_length() == item_sig_len) + sig = s->string_ptr(); + + lazy_entry const* q = r->dict_find_int("seq"); + if (q) + seq = q->int_value(); + else if (pk && sig) + return; + + lazy_entry const* v = r->dict_find("v"); + if (v) + { + static_cast(m_algorithm.get())->got_data(v, pk, seq, sig); + } +} + +} } // namespace libtorrent::dht diff --git a/src/kademlia/item.cpp b/src/kademlia/item.cpp new file mode 100644 index 0000000000..4969a4a3f5 --- /dev/null +++ b/src/kademlia/item.cpp @@ -0,0 +1,172 @@ +/* + +Copyright (c) 2013, Steven Siloti +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#include +#include +#include + +#include "ed25519.h" + +namespace libtorrent { namespace dht +{ + +namespace +{ + enum { canonical_length = 1100 }; + int canonical_string(std::pair v, boost::uint64_t seq, char out[canonical_length]) + { + int len = snprintf(out, canonical_length, "3:seqi%" PRId64 "e1:v", seq); + memcpy(out + len, v.first, v.second); + len += v.second; + TORRENT_ASSERT(len <= canonical_length); + return len; + } +} + +bool verify_mutable_item(std::pair v, + boost::uint64_t seq, + char const* pk, + char const* sig) +{ +#ifdef TORRENT_USE_VALGRIND + VALGRIND_CHECK_MEM_IS_DEFINED(v.first, v.second); + VALGRIND_CHECK_MEM_IS_DEFINED(pk, item_pk_len); + VALGRIND_CHECK_MEM_IS_DEFINED(sig, item_sig_len); +#endif + + char str[canonical_length]; + int len = canonical_string(v, seq, str); + + return ed25519_verify((unsigned char const*)sig, + (unsigned char const*)str, + len, + (unsigned char const*)pk) == 1; +} + +void sign_mutable_item(std::pair v, + boost::uint64_t seq, + char const* pk, + char const* sk, + char* sig) +{ +#ifdef TORRENT_USE_VALGRIND + VALGRIND_CHECK_MEM_IS_DEFINED(v.first, v.second); + VALGRIND_CHECK_MEM_IS_DEFINED(sk, item_sk_len); + VALGRIND_CHECK_MEM_IS_DEFINED(pk, item_pk_len); +#endif + + char str[canonical_length]; + int len = canonical_string(v, seq, str); + + ed25519_sign((unsigned char*)sig, + (unsigned char const*)str, + len, + (unsigned char const*)pk, + (unsigned char const*)sk + ); +} + +sha1_hash mutable_item_cas(std::pair v, boost::uint64_t seq) +{ + char str[canonical_length]; + int len = canonical_string(v, seq, str); + return hasher(str, len).final(); +} + +item::item(entry const& v, boost::uint64_t seq, char const* pk, char const* sk) +{ + assign(v, seq, pk, sk); +} + +item::item(lazy_entry const* v, boost::uint64_t seq, char const* pk, char const* sig) +{ + if (!assign(v, seq, pk, sig)) + throw invalid_item(); +} + +item::item(lazy_item const& i) + : m_value(i.value->data_section().first, i.value->data_section().first + i.value->data_section().second) + , m_seq(i.seq) + , m_mutable(i.is_mutable()) +{ + // if this is a mutable item lazy_item will have already verified it + memcpy(m_pk, i.pk, item_pk_len); + memcpy(m_sig, i.sig, item_sig_len); +} + +void item::assign(entry const& v, boost::uint64_t seq, char const* pk, char const* sk) +{ + m_value.clear(); + bencode(std::back_inserter(m_value), v); + if (pk && sk) + { + sign_mutable_item(std::make_pair(&m_value[0], m_value.size()), seq, pk, sk, m_sig); + memcpy(m_pk, pk, item_pk_len); + m_seq = seq; + m_mutable = true; + } + else + m_mutable = false; +} + +bool item::assign(lazy_entry const* v, boost::uint64_t seq, char const* pk, char const* sig) +{ + if (pk && sig) + { + if (!verify_mutable_item(v->data_section(), seq, pk, sig)) + return false; + memcpy(m_pk, pk, item_pk_len); + memcpy(m_sig, sig, item_sig_len); + m_seq = seq; + m_mutable = true; + } + else + m_mutable = false; + + m_value.assign(v->data_section().first, v->data_section().first + v->data_section().second); + return true; +} + +sha1_hash item::cas() +{ + TORRENT_ASSERT(m_mutable); + return mutable_item_cas(std::make_pair(&m_value[0], m_value.size()), m_seq); +} + +lazy_item::lazy_item(lazy_entry const* v, char const* pk, char const* sig, boost::uint64_t seq) + : value(v), pk(pk), sig(sig), seq(seq) +{ + if (is_mutable() && !verify_mutable_item(v->data_section(), seq, pk, sig)) + throw invalid_item(); +} + +} } // namespace libtorrent::dht diff --git a/src/kademlia/node.cpp b/src/kademlia/node.cpp index 4440f39bda..9ed320643a 100644 --- a/src/kademlia/node.cpp +++ b/src/kademlia/node.cpp @@ -48,12 +48,11 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/kademlia/rpc_manager.hpp" #include "libtorrent/kademlia/routing_table.hpp" #include "libtorrent/kademlia/node.hpp" -#include +#include "libtorrent/kademlia/dht_observer.hpp" #include "libtorrent/kademlia/refresh.hpp" #include "libtorrent/kademlia/get_peers.hpp" - -#include "ed25519.h" +#include "libtorrent/kademlia/get_item.hpp" #ifdef TORRENT_USE_VALGRIND #include @@ -377,6 +376,17 @@ void node_impl::announce(sha1_hash const& info_hash, int listen_port, bool seed ta->start(); } +void node_impl::get_item(sha1_hash const& target, boost::function f) +{ +#ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_LOG(node) << "starting get for [ " << target << " ]" ; +#endif + + boost::intrusive_ptr ta; + ta.reset(new dht::get_item(*this, target, f, get_item::nodes_callback())); + ta->start(); +} + void node_impl::tick() { node_id target; @@ -881,8 +891,8 @@ void node_impl::incoming_request(msg const& m, entry& e) {"v", lazy_entry::none_t, 0, 0}, {"seq", lazy_entry::int_t, 0, key_desc_t::optional}, // public key - {"k", lazy_entry::string_t, 32, key_desc_t::optional}, - {"sig", lazy_entry::string_t, 64, key_desc_t::optional}, + {"k", lazy_entry::string_t, item_pk_len, key_desc_t::optional}, + {"sig", lazy_entry::string_t, item_sig_len, key_desc_t::optional}, {"cas", lazy_entry::string_t, 20, key_desc_t::optional}, }; @@ -909,7 +919,7 @@ void node_impl::incoming_request(msg const& m, entry& e) if (!mutable_put) target = hasher(buf.first, buf.second).final(); else - target = hasher(msg_keys[3]->string_ptr(), 32).final(); + target = hasher(msg_keys[3]->string_ptr(), item_pk_len).final(); // fprintf(stderr, "%s PUT target: %s\n" // , mutable_put ? "mutable":"immutable" @@ -960,25 +970,16 @@ void node_impl::incoming_request(msg const& m, entry& e) else { // mutable put, we must verify the signature - // generate the message digest by merging the sequence number and the - - char seq[1100]; - int len = snprintf(seq, sizeof(seq), "3:seqi%" PRId64 "e1:v", msg_keys[2]->int_value()); - std::pair buf = msg_keys[1]->data_section(); - memcpy(seq + len, buf.first, buf.second); - len += buf.second; - TORRENT_ASSERT(len <= 1100); #ifdef TORRENT_USE_VALGRIND - VALGRIND_CHECK_MEM_IS_DEFINED(buf.first, buf.second); - VALGRIND_CHECK_MEM_IS_DEFINED(msg_keys[4]->string_ptr(), 64); - VALGRIND_CHECK_MEM_IS_DEFINED(msg_keys[3]->string_ptr(), 32); - VALGRIND_CHECK_MEM_IS_DEFINED(seq, len); + VALGRIND_CHECK_MEM_IS_DEFINED(msg_keys[4]->string_ptr(), item_sig_len); + VALGRIND_CHECK_MEM_IS_DEFINED(msg_keys[3]->string_ptr(), item_pk_len); #endif // msg_keys[4] is the signature, msg_keys[3] is the public key - if (ed25519_verify((unsigned char const*)msg_keys[4]->string_ptr() - , (unsigned char const*)seq, len - , (unsigned char const*)msg_keys[3]->string_ptr()) != 1) + if (!verify_mutable_item(msg_keys[1]->data_section() + , msg_keys[2]->int_value() + , msg_keys[3]->string_ptr() + , msg_keys[4]->string_ptr())) { incoming_error(e, "invalid signature", 206); return; @@ -1026,10 +1027,7 @@ void node_impl::incoming_request(msg const& m, entry& e) // matches the expected value before replacing it if (msg_keys[5]) { - int len = snprintf(seq, sizeof(seq), "3:seqi%" PRId64 "e1:v", item->seq); - memcpy(seq + len, item->value, item->size); - len += item->size; - sha1_hash h = hasher(seq, len).final(); + sha1_hash h = mutable_item_cas(std::make_pair(item->value, item->size), item->seq); if (h != sha1_hash(msg_keys[5]->string_ptr())) { diff --git a/src/kademlia/traversal_algorithm.cpp b/src/kademlia/traversal_algorithm.cpp index ee6a86291f..2057cdcfd1 100644 --- a/src/kademlia/traversal_algorithm.cpp +++ b/src/kademlia/traversal_algorithm.cpp @@ -464,5 +464,18 @@ void traversal_algorithm::status(dht_lookup& l) l.last_sent = last_sent; } +void traversal_algorithm::abort() +{ + m_num_target_nodes = 0; + for (std::vector::iterator i = m_results.begin() + , end(m_results.end()); i != end; ++i) + { + observer& o = **i; + if (o.flags & observer::flag_queried) + o.flags |= observer::flag_done; + } + done(); +} + } } // namespace libtorrent::dht diff --git a/test/test_dht.cpp b/test/test_dht.cpp index 1d15d09a12..0eb948dbd7 100644 --- a/test/test_dht.cpp +++ b/test/test_dht.cpp @@ -41,6 +41,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/kademlia/node_id.hpp" #include "libtorrent/kademlia/routing_table.hpp" +#include "libtorrent/kademlia/item.hpp" #include #include "test.hpp" @@ -411,6 +412,13 @@ void get_peers_cb(std::vector const& peers) g_got_peers.insert(g_got_peers.end(), peers.begin(), peers.end()); } +std::vector g_got_items; + +void get_item_cb(dht::item const& i) +{ + g_got_items.push_back(i); +} + // TODO: 3 test obfuscated_get_peers int test_main() { @@ -727,18 +735,18 @@ int test_main() fprintf(stderr, "generating ed25519 keys\n"); unsigned char seed[32]; ed25519_create_seed(seed); - unsigned char private_key[64]; - unsigned char public_key[32]; + char private_key[item_sk_len]; + char public_key[item_pk_len]; - ed25519_create_keypair(public_key, private_key, seed); + ed25519_create_keypair((unsigned char*)public_key, (unsigned char*)private_key, seed); fprintf(stderr, "pub: %s priv: %s\n" - , to_hex(std::string((char*)public_key, 32)).c_str() - , to_hex(std::string((char*)private_key, 64)).c_str()); + , to_hex(std::string(public_key, item_pk_len)).c_str() + , to_hex(std::string(private_key, item_sk_len)).c_str()); TEST_CHECK(ret); send_dht_request(node, "get", source, &response, "10", 0 - , 0, no, 0, (char*)&hasher((char*)public_key, 32).final()[0] + , 0, no, 0, (char*)&hasher(public_key, item_pk_len).final()[0] , 0, false, false, std::string(), std::string(), 64); key_desc_t desc[] = @@ -765,22 +773,20 @@ int test_main() TEST_ERROR(error_string); } - unsigned char signature[64]; + char signature[item_sig_len]; char buffer[1200]; int seq = 4; - int pos = snprintf(buffer, sizeof(buffer), "3:seqi%de1:v", seq); - char* ptr = buffer + pos; - pos += bencode(ptr, items[0].ent); - ed25519_sign(signature, (unsigned char*)buffer, pos, public_key, private_key); - TEST_EQUAL(ed25519_verify(signature, (unsigned char*)buffer, pos, public_key), 1); + std::pair itemv(buffer, bencode(buffer, items[0].ent)); + sign_mutable_item(itemv, seq, public_key, private_key, signature); + TEST_EQUAL(verify_mutable_item(itemv, seq, public_key, signature), true); #ifdef TORRENT_USE_VALGRIND - VALGRIND_CHECK_MEM_IS_DEFINED(signature, 64); + VALGRIND_CHECK_MEM_IS_DEFINED(signature, item_sig_len); #endif send_dht_request(node, "put", source, &response, "10", 0 , 0, token, 0, 0, &items[0].ent, false, false - , std::string((char*)public_key, 32) - , std::string((char*)signature, 64), seq); + , std::string(public_key, item_pk_len) + , std::string(signature, item_sig_len), seq); key_desc_t desc2[] = { @@ -802,7 +808,7 @@ int test_main() } send_dht_request(node, "get", source, &response, "10", 0 - , 0, no, 0, (char*)&hasher((char*)public_key, 32).final()[0] + , 0, no, 0, (char*)&hasher(public_key, item_pk_len).final()[0] , 0, false, false, std::string(), std::string(), 64); key_desc_t desc3[] = @@ -838,23 +844,21 @@ int test_main() // also test that invalid signatures fail! - pos = snprintf(buffer, sizeof(buffer), "3:seqi%de1:v", seq); - ptr = buffer + pos; - pos += bencode(ptr, items[0].ent); - ed25519_sign(signature, (unsigned char*)buffer, pos, public_key, private_key); - TEST_EQUAL(ed25519_verify(signature, (unsigned char*)buffer, pos, public_key), 1); + itemv.second = bencode(buffer, items[0].ent); + sign_mutable_item(itemv, seq, public_key, private_key, signature); + TEST_EQUAL(verify_mutable_item(itemv, seq, public_key, signature), 1); #ifdef TORRENT_USE_VALGRIND - VALGRIND_CHECK_MEM_IS_DEFINED(signature, 64); + VALGRIND_CHECK_MEM_IS_DEFINED(signature, item_sig_len); #endif // break the signature signature[2] ^= 0xaa; - TEST_CHECK(ed25519_verify(signature, (unsigned char*)buffer, pos, public_key) != 1); + TEST_CHECK(verify_mutable_item(itemv, seq, public_key, signature) != 1); send_dht_request(node, "put", source, &response, "10", 0 , 0, token, 0, 0, &items[0].ent, false, false - , std::string((char*)public_key, 32) - , std::string((char*)signature, 64), seq); + , std::string(public_key, item_pk_len) + , std::string(signature, item_sig_len), seq); key_desc_t desc_error[] = { @@ -880,23 +884,21 @@ int test_main() // === test CAS put === // this is the hash that we expect to be there - sha1_hash cas = hasher(buffer, pos).final(); + sha1_hash cas = mutable_item_cas(itemv, seq); // increment sequence number ++seq; - pos = snprintf(buffer, sizeof(buffer), "3:seqi%de1:v", seq); - ptr = buffer + pos; // put item 1 - pos += bencode(ptr, items[1].ent); - ed25519_sign(signature, (unsigned char*)buffer, pos, public_key, private_key); - TEST_EQUAL(ed25519_verify(signature, (unsigned char*)buffer, pos, public_key), 1); + itemv.second = bencode(buffer, items[1].ent); + sign_mutable_item(itemv, seq, public_key, private_key, signature); + TEST_EQUAL(verify_mutable_item(itemv, seq, public_key, signature), 1); #ifdef TORRENT_USE_VALGRIND - VALGRIND_CHECK_MEM_IS_DEFINED(signature, 64); + VALGRIND_CHECK_MEM_IS_DEFINED(signature, item_sig_len); #endif send_dht_request(node, "put", source, &response, "10", 0 , 0, token, 0, 0, &items[1].ent, false, false - , std::string((char*)public_key, 32) - , std::string((char*)signature, 64), seq + , std::string(public_key, item_pk_len) + , std::string(signature, item_sig_len), seq , (char const*)&cas[0]); ret = verify_message(&response, desc2, parsed, 1, error_string, sizeof(error_string)); @@ -918,8 +920,8 @@ int test_main() // stored anymore send_dht_request(node, "put", source, &response, "10", 0 , 0, token, 0, 0, &items[1].ent, false, false - , std::string((char*)public_key, 32) - , std::string((char*)signature, 64), seq + , std::string(public_key, item_pk_len) + , std::string(signature, item_sig_len), seq , (char const*)&cas[0]); ret = verify_message(&response, desc_error, parsed, 2, error_string, sizeof(error_string)); @@ -1339,6 +1341,15 @@ int test_main() {"info_hash", lazy_entry::string_t, 20, key_desc_t::last_child}, }; + dht::key_desc_t get_item_desc[] = { + {"y", lazy_entry::string_t, 1, 0}, + {"t", lazy_entry::string_t, 2, 0}, + {"q", lazy_entry::string_t, 3, 0}, + {"a", lazy_entry::dict_t, 0, key_desc_t::parse_children}, + {"id", lazy_entry::string_t, 20, 0}, + {"target", lazy_entry::string_t, 20, key_desc_t::last_child}, + }; + // bootstrap do @@ -1482,6 +1493,102 @@ int test_main() g_got_peers.clear(); } while (false); + // immutable get + + do + { + dht::node_impl node(&ad, &s, sett, node_id::min(), ext, 0); + + udp::endpoint initial_node(address_v4::from_string("4.4.4.4"), 1234); + node.m_table.add_node(initial_node); + + node.get_item(items[0].target, get_item_cb); + + TEST_EQUAL(g_sent_packets.size(), 1); + if (g_sent_packets.empty()) break; + TEST_EQUAL(g_sent_packets.front().first, initial_node); + + lazy_from_entry(g_sent_packets.front().second, response); + ret = verify_message(&response, get_item_desc, parsed, 6, error_string, sizeof(error_string)); + if (ret) + { + TEST_EQUAL(parsed[0]->string_value(), "q"); + TEST_EQUAL(parsed[2]->string_value(), "get"); + TEST_EQUAL(parsed[5]->string_value(), items[0].target.to_string()); + if (parsed[0]->string_value() != "q" || parsed[2]->string_value() != "get") break; + } + else + { + fprintf(stderr, " invalid get request: %s\n", print_entry(response).c_str()); + TEST_ERROR(error_string); + break; + } + + g_sent_packets.clear(); + send_dht_response(node, response, initial_node, nodes_t(), "10", 1234, std::set() + , NULL, &items[0].ent); + + TEST_CHECK(g_sent_packets.empty()); + TEST_EQUAL(g_got_items.size(), 1); + if (g_got_items.empty()) break; + + itemv.second = bencode(buffer, items[0].ent); + TEST_CHECK(memcmp(&g_got_items.front().value()[0], itemv.first, itemv.second) == 0); + g_got_items.clear(); + + } while (false); + + // mutable get + + do + { + dht::node_impl node(&ad, &s, sett, node_id::min(), ext, 0); + + udp::endpoint initial_node(address_v4::from_string("4.4.4.4"), 1234); + node.m_table.add_node(initial_node); + + sha1_hash target = hasher(public_key, item_pk_len).final(); + node.get_item(target, get_item_cb); + + TEST_EQUAL(g_sent_packets.size(), 1); + if (g_sent_packets.empty()) break; + TEST_EQUAL(g_sent_packets.front().first, initial_node); + + lazy_from_entry(g_sent_packets.front().second, response); + ret = verify_message(&response, get_item_desc, parsed, 6, error_string, sizeof(error_string)); + if (ret) + { + TEST_EQUAL(parsed[0]->string_value(), "q"); + TEST_EQUAL(parsed[2]->string_value(), "get"); + TEST_EQUAL(parsed[5]->string_value(), target.to_string()); + if (parsed[0]->string_value() != "q" || parsed[2]->string_value() != "get") break; + } + else + { + fprintf(stderr, " invalid get request: %s\n", print_entry(response).c_str()); + TEST_ERROR(error_string); + break; + } + + g_sent_packets.clear(); + + itemv.second = bencode(buffer, items[0].ent); + sign_mutable_item(itemv, seq, public_key, private_key, signature); + send_dht_response(node, response, initial_node, nodes_t(), "10", 1234, std::set() + , NULL, &items[0].ent, std::string(public_key, item_pk_len), std::string(signature, item_sig_len), seq); + + TEST_CHECK(g_sent_packets.empty()); + TEST_EQUAL(g_got_items.size(), 1); + if (g_got_items.empty()) break; + + TEST_CHECK(memcmp(&g_got_items.front().value()[0], itemv.first, itemv.second) == 0); + TEST_CHECK(memcmp(g_got_items.front().pk(), public_key, item_pk_len) == 0); + TEST_CHECK(memcmp(g_got_items.front().sig(), signature, item_sig_len) == 0); + TEST_EQUAL(g_got_items.front().seq(), seq); + g_got_items.clear(); + + } while (false); + return 0; } From c778d5a72f49d6e228c1d2a47db2203b837d9042 Mon Sep 17 00:00:00 2001 From: Steven Siloti Date: Sat, 14 Dec 2013 21:09:12 -0800 Subject: [PATCH 5/8] seq should be unsigned --- include/libtorrent/kademlia/get_item.hpp | 2 +- src/kademlia/get_item.cpp | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/include/libtorrent/kademlia/get_item.hpp b/include/libtorrent/kademlia/get_item.hpp index 3708d39d4d..0f72b1c903 100644 --- a/include/libtorrent/kademlia/get_item.hpp +++ b/include/libtorrent/kademlia/get_item.hpp @@ -47,7 +47,7 @@ class get_item : public find_data void got_data(lazy_entry const* v, char const* pk, - boost::int64_t seq, + boost::uint64_t seq, char const* sig); get_item(node_impl& node, node_id target diff --git a/src/kademlia/get_item.cpp b/src/kademlia/get_item.cpp index 43f42d59d7..8f51e88584 100644 --- a/src/kademlia/get_item.cpp +++ b/src/kademlia/get_item.cpp @@ -39,7 +39,7 @@ namespace libtorrent { namespace dht void get_item::got_data(lazy_entry const* v, char const* pk, - boost::int64_t seq, + boost::uint64_t seq, char const* sig) { if (pk && sig) @@ -119,7 +119,7 @@ void get_item_observer::extract_data(lazy_entry const* r) { char const* pk = NULL; char const* sig = NULL; - boost::int64_t seq = 0; + boost::uint64_t seq = 0; lazy_entry const* k = r->dict_find_string("k"); if (k && k->string_length() == item_pk_len) From 9552052dfbc4c0f4c20fe9409ece350ff2a626f8 Mon Sep 17 00:00:00 2001 From: Steven Siloti Date: Sun, 15 Dec 2013 09:00:39 -0800 Subject: [PATCH 6/8] use entry for item value --- include/libtorrent/entry.hpp | 1 + include/libtorrent/kademlia/item.hpp | 9 ++++++--- src/kademlia/item.cpp | 19 ++++++++++++------- test/test_dht.cpp | 5 ++--- 4 files changed, 21 insertions(+), 13 deletions(-) diff --git a/include/libtorrent/entry.hpp b/include/libtorrent/entry.hpp index f857180821..55c90512a7 100644 --- a/include/libtorrent/entry.hpp +++ b/include/libtorrent/entry.hpp @@ -137,6 +137,7 @@ namespace libtorrent // hidden bool operator==(entry const& e) const; + bool operator!=(entry const& e) const { return !(*this == e); } // copies the structure of the right hand side into this // entry. diff --git a/include/libtorrent/kademlia/item.hpp b/include/libtorrent/kademlia/item.hpp index 468cae9b58..530b67be7a 100644 --- a/include/libtorrent/kademlia/item.hpp +++ b/include/libtorrent/kademlia/item.hpp @@ -80,17 +80,20 @@ class TORRENT_EXPORT item void assign(entry const& v, boost::uint64_t seq = 0, char const* pk = 0, char const* sk = 0); bool assign(lazy_entry const* v, boost::uint64_t seq = 0, char const* pk = 0, char const* sig = 0); - bool is_mutable() { return m_mutable; } + void clear() { m_value = entry(); } + bool empty() const { return m_value.type() == entry::undefined_t; } + + bool is_mutable() const { return m_mutable; } sha1_hash cas(); - std::vector const& value() { return m_value; } + entry const& value() const { return m_value; } char const* pk() { TORRENT_ASSERT(m_mutable); return m_pk; } char const* sig() { TORRENT_ASSERT(m_mutable); return m_sig; } boost::uint64_t seq() { TORRENT_ASSERT(m_mutable); return m_seq; } private: - std::vector m_value; + entry m_value; char m_pk[item_pk_len]; char m_sig[item_sig_len]; boost::uint64_t m_seq; diff --git a/src/kademlia/item.cpp b/src/kademlia/item.cpp index 4969a4a3f5..c3f88bb335 100644 --- a/src/kademlia/item.cpp +++ b/src/kademlia/item.cpp @@ -114,10 +114,10 @@ item::item(lazy_entry const* v, boost::uint64_t seq, char const* pk, char const* } item::item(lazy_item const& i) - : m_value(i.value->data_section().first, i.value->data_section().first + i.value->data_section().second) - , m_seq(i.seq) + : m_seq(i.seq) , m_mutable(i.is_mutable()) { + m_value = *i.value; // if this is a mutable item lazy_item will have already verified it memcpy(m_pk, i.pk, item_pk_len); memcpy(m_sig, i.sig, item_sig_len); @@ -125,11 +125,13 @@ item::item(lazy_item const& i) void item::assign(entry const& v, boost::uint64_t seq, char const* pk, char const* sk) { - m_value.clear(); - bencode(std::back_inserter(m_value), v); + m_value = v; if (pk && sk) { - sign_mutable_item(std::make_pair(&m_value[0], m_value.size()), seq, pk, sk, m_sig); + char buffer[1000]; + int bsize = bencode(buffer, v); + TORRENT_ASSERT(bsize <= 1000); + sign_mutable_item(std::make_pair(buffer, bsize), seq, pk, sk, m_sig); memcpy(m_pk, pk, item_pk_len); m_seq = seq; m_mutable = true; @@ -140,6 +142,7 @@ void item::assign(entry const& v, boost::uint64_t seq, char const* pk, char cons bool item::assign(lazy_entry const* v, boost::uint64_t seq, char const* pk, char const* sig) { + TORRENT_ASSERT(v->data_section().second <= 1000); if (pk && sig) { if (!verify_mutable_item(v->data_section(), seq, pk, sig)) @@ -152,14 +155,16 @@ bool item::assign(lazy_entry const* v, boost::uint64_t seq, char const* pk, char else m_mutable = false; - m_value.assign(v->data_section().first, v->data_section().first + v->data_section().second); + m_value = *v; return true; } sha1_hash item::cas() { TORRENT_ASSERT(m_mutable); - return mutable_item_cas(std::make_pair(&m_value[0], m_value.size()), m_seq); + char buffer[1000]; + int bsize = bencode(buffer, m_value); + return mutable_item_cas(std::make_pair(buffer, bsize), m_seq); } lazy_item::lazy_item(lazy_entry const* v, char const* pk, char const* sig, boost::uint64_t seq) diff --git a/test/test_dht.cpp b/test/test_dht.cpp index 0eb948dbd7..db81c33dac 100644 --- a/test/test_dht.cpp +++ b/test/test_dht.cpp @@ -1532,8 +1532,7 @@ int test_main() TEST_EQUAL(g_got_items.size(), 1); if (g_got_items.empty()) break; - itemv.second = bencode(buffer, items[0].ent); - TEST_CHECK(memcmp(&g_got_items.front().value()[0], itemv.first, itemv.second) == 0); + TEST_EQUAL(g_got_items.front().value(), items[0].ent); g_got_items.clear(); } while (false); @@ -1581,7 +1580,7 @@ int test_main() TEST_EQUAL(g_got_items.size(), 1); if (g_got_items.empty()) break; - TEST_CHECK(memcmp(&g_got_items.front().value()[0], itemv.first, itemv.second) == 0); + TEST_EQUAL(g_got_items.front().value(), items[0].ent); TEST_CHECK(memcmp(g_got_items.front().pk(), public_key, item_pk_len) == 0); TEST_CHECK(memcmp(g_got_items.front().sig(), signature, item_sig_len) == 0); TEST_EQUAL(g_got_items.front().seq(), seq); From abc0877178a88303a07428347b772568b75d8279 Mon Sep 17 00:00:00 2001 From: Steven Siloti Date: Sun, 15 Dec 2013 09:51:09 -0800 Subject: [PATCH 7/8] remove m_got_data --- include/libtorrent/kademlia/get_item.hpp | 1 - src/kademlia/get_item.cpp | 4 +--- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/include/libtorrent/kademlia/get_item.hpp b/include/libtorrent/kademlia/get_item.hpp index 0f72b1c903..b4eaa44baa 100644 --- a/include/libtorrent/kademlia/get_item.hpp +++ b/include/libtorrent/kademlia/get_item.hpp @@ -63,7 +63,6 @@ class get_item : public find_data data_callback m_data_callback; item m_data; - bool m_got_data; }; class get_item_observer : public find_data_observer diff --git a/src/kademlia/get_item.cpp b/src/kademlia/get_item.cpp index 8f51e88584..1c00a95afb 100644 --- a/src/kademlia/get_item.cpp +++ b/src/kademlia/get_item.cpp @@ -47,7 +47,7 @@ void get_item::got_data(lazy_entry const* v, if (hasher(pk, item_pk_len).final() != m_target) return; - if (!m_got_data || m_data.seq() < seq) + if (m_data.empty() || m_data.seq() < seq) { if (!m_data.assign(v, seq, pk, sig)) return; @@ -64,7 +64,6 @@ void get_item::got_data(lazy_entry const* v, // Now that we've got it there's no point in continuing to query other nodes abort(); } - m_got_data = true; } get_item::get_item( @@ -74,7 +73,6 @@ get_item::get_item( , nodes_callback const& ncallback) : find_data(node, target, ncallback) , m_data_callback(dcallback) - , m_got_data(false) { } From 4ac02d6d960953c86bfe52f3c4b0c2034c95925e Mon Sep 17 00:00:00 2001 From: Steven Siloti Date: Sat, 14 Dec 2013 21:02:32 -0800 Subject: [PATCH 8/8] DHT put item --- include/libtorrent/kademlia/get_item.hpp | 8 +- include/libtorrent/kademlia/node.hpp | 2 +- src/kademlia/get_item.cpp | 97 +++++++++-- src/kademlia/node.cpp | 4 +- test/test_dht.cpp | 195 ++++++++++++++++++++++- 5 files changed, 284 insertions(+), 22 deletions(-) diff --git a/include/libtorrent/kademlia/get_item.hpp b/include/libtorrent/kademlia/get_item.hpp index b4eaa44baa..819a97b5f7 100644 --- a/include/libtorrent/kademlia/get_item.hpp +++ b/include/libtorrent/kademlia/get_item.hpp @@ -43,16 +43,14 @@ namespace libtorrent { namespace dht class get_item : public find_data { public: - typedef boost::function data_callback; + typedef boost::function data_callback; void got_data(lazy_entry const* v, char const* pk, boost::uint64_t seq, char const* sig); - get_item(node_impl& node, node_id target - , data_callback const& dcallback - , nodes_callback const& ncallback); + get_item(node_impl& node, node_id target, data_callback const& dcallback); virtual char const* name() const; @@ -61,6 +59,8 @@ class get_item : public find_data virtual bool invoke(observer_ptr o); virtual void done(); + void put(std::vector > const& v); + data_callback m_data_callback; item m_data; }; diff --git a/include/libtorrent/kademlia/node.hpp b/include/libtorrent/kademlia/node.hpp index 509a34d605..5b5bafb178 100644 --- a/include/libtorrent/kademlia/node.hpp +++ b/include/libtorrent/kademlia/node.hpp @@ -233,7 +233,7 @@ typedef std::map dht_mutable_table_t; void announce(sha1_hash const& info_hash, int listen_port, bool seed , boost::function const&)> f); - void get_item(sha1_hash const& target, boost::function f); + void get_item(sha1_hash const& target, boost::function f); bool verify_token(std::string const& token, char const* info_hash , udp::endpoint const& addr); diff --git a/src/kademlia/get_item.cpp b/src/kademlia/get_item.cpp index 1c00a95afb..59fcc71ab5 100644 --- a/src/kademlia/get_item.cpp +++ b/src/kademlia/get_item.cpp @@ -34,6 +34,10 @@ POSSIBILITY OF SUCH DAMAGE. #include #include +#if (defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS) && !TORRENT_NO_ASSERTS +#include +#endif + namespace libtorrent { namespace dht { @@ -53,25 +57,38 @@ void get_item::got_data(lazy_entry const* v, return; } } - else + else if (m_data.empty()) { std::pair buf = v->data_section(); if (hasher(buf.first, buf.second).final() != m_target) return; - m_data_callback(v); - // There can only be one true immutable item with a given id - // Now that we've got it there's no point in continuing to query other nodes - abort(); + m_data.assign(v); + bool put_requested = m_data_callback(m_data); + if (put_requested) + { +#if (defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS) && !TORRENT_NO_ASSERTS + std::vector buffer; + bencode(std::back_inserter(buffer), m_data.value()); + TORRENT_ASSERT(m_target == hasher(&buffer[0], buffer.size()).final()); +#endif + m_nodes_callback = boost::bind(&get_item::put, this, _1); + } + else + { + // There can only be one true immutable item with a given id + // Now that we've got it and the user doesn't want to do a put + // there's no point in continuing to query other nodes + abort(); + } } } get_item::get_item( node_impl& node , node_id target - , data_callback const& dcallback - , nodes_callback const& ncallback) - : find_data(node, target, ncallback) + , data_callback const& dcallback) + : find_data(node, target, nodes_callback()) , m_data_callback(dcallback) { } @@ -108,11 +125,71 @@ bool get_item::invoke(observer_ptr o) void get_item::done() { - if (m_data.is_mutable()) - m_data_callback(m_data); + if (m_data.is_mutable() || m_data.empty()) + { + bool put_requested = m_data_callback(m_data); + if (put_requested) + { +#if (defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS) && !TORRENT_NO_ASSERTS + if (m_data.is_mutable()) + { + TORRENT_ASSERT(m_target == hasher(m_data.pk(), item_pk_len).final()); + } + else + { + std::vector buffer; + bencode(std::back_inserter(buffer), m_data.value()); + TORRENT_ASSERT(m_target == hasher(&buffer[0], buffer.size()).final()); + } +#endif + m_nodes_callback = boost::bind(&get_item::put, this, _1); + } + } find_data::done(); } +void get_item::put(std::vector > const& v) +{ +#ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_LOG(node) << "sending put [ v: " << m_data.value() + << " seq: " << (m_data.is_mutable() ? m_data.seq() : -1) + << " nodes: " << v.size() << " ]" ; +#endif + + // create a dummy traversal_algorithm + boost::intrusive_ptr algo( + new traversal_algorithm(m_node, (node_id::min)())); + + // store on the first k nodes + for (std::vector >::const_iterator i = v.begin() + , end(v.end()); i != end; ++i) + { +#ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_LOG(node) << " put-distance: " << (160 - distance_exp(m_target, i->first.id)); +#endif + + void* ptr = m_node.m_rpc.allocate_observer(); + if (ptr == 0) return; + observer_ptr o(new (ptr) announce_observer(algo, i->first.ep(), i->first.id)); +#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS + o->m_in_constructor = false; +#endif + entry e; + e["y"] = "q"; + e["q"] = "put"; + entry& a = e["a"]; + a["v"] = m_data.value(); + a["token"] = i->second; + if (m_data.is_mutable()) + { + a["k"] = std::string(m_data.pk(), item_pk_len); + a["seq"] = m_data.seq(); + a["sig"] = std::string(m_data.sig(), item_sig_len); + } + m_node.m_rpc.invoke(e, i->first.ep(), o); + } +} + void get_item_observer::extract_data(lazy_entry const* r) { char const* pk = NULL; diff --git a/src/kademlia/node.cpp b/src/kademlia/node.cpp index 9ed320643a..64514581d7 100644 --- a/src/kademlia/node.cpp +++ b/src/kademlia/node.cpp @@ -376,14 +376,14 @@ void node_impl::announce(sha1_hash const& info_hash, int listen_port, bool seed ta->start(); } -void node_impl::get_item(sha1_hash const& target, boost::function f) +void node_impl::get_item(sha1_hash const& target, boost::function f) { #ifdef TORRENT_DHT_VERBOSE_LOGGING TORRENT_LOG(node) << "starting get for [ " << target << " ]" ; #endif boost::intrusive_ptr ta; - ta.reset(new dht::get_item(*this, target, f, get_item::nodes_callback())); + ta.reset(new dht::get_item(*this, target, f)); ta->start(); } diff --git a/test/test_dht.cpp b/test/test_dht.cpp index db81c33dac..a31e332039 100644 --- a/test/test_dht.cpp +++ b/test/test_dht.cpp @@ -124,6 +124,13 @@ boost::array generate_key() static const std::string no; +std::list >::iterator +find_packet(udp::endpoint ep) +{ + return std::find_if(g_sent_packets.begin(), g_sent_packets.end() + , boost::bind(&std::pair::first, _1) == ep); +} + void lazy_from_entry(entry const& e, lazy_entry& l) { error_code ec; @@ -186,8 +193,7 @@ void send_dht_request(node_impl& node, char const* msg, udp::endpoint const& ep // response in g_sent_packets std::list >::iterator i - = std::find_if(g_sent_packets.begin(), g_sent_packets.end() - , boost::bind(&std::pair::first, _1) == ep); + = find_packet(ep); if (i == g_sent_packets.end()) { TEST_ERROR("not response from DHT node"); @@ -413,10 +419,20 @@ void get_peers_cb(std::vector const& peers) } std::vector g_got_items; +dht::item g_put_item; +int g_put_count; -void get_item_cb(dht::item const& i) +bool get_item_cb(dht::item& i) { - g_got_items.push_back(i); + if (!i.empty()) + g_got_items.push_back(i); + if (!g_put_item.empty()) + { + i = g_put_item; + g_put_count++; + return true; + } + return false; } // TODO: 3 test obfuscated_get_peers @@ -433,7 +449,7 @@ int test_main() // DHT should be running on port 48199 now lazy_entry response; - lazy_entry const* parsed[10]; + lazy_entry const* parsed[11]; char error_string[200]; bool ret; @@ -1588,6 +1604,175 @@ int test_main() } while (false); + dht::key_desc_t put_immutable_item_desc[] = { + {"y", lazy_entry::string_t, 1, 0}, + {"t", lazy_entry::string_t, 2, 0}, + {"q", lazy_entry::string_t, 3, 0}, + {"a", lazy_entry::dict_t, 0, key_desc_t::parse_children}, + {"id", lazy_entry::string_t, 20, 0}, + {"token", lazy_entry::string_t, 2, 0}, + {"v", lazy_entry::none_t, 0, key_desc_t::last_child}, + }; + + dht::key_desc_t put_mutable_item_desc[] = { + {"y", lazy_entry::string_t, 1, 0}, + {"t", lazy_entry::string_t, 2, 0}, + {"q", lazy_entry::string_t, 3, 0}, + {"a", lazy_entry::dict_t, 0, key_desc_t::parse_children}, + {"id", lazy_entry::string_t, 20, 0}, + {"cas", lazy_entry::string_t, 20, key_desc_t::optional}, + {"k", lazy_entry::string_t, item_pk_len, 0}, + {"seq", lazy_entry::int_t, 0, 0}, + {"sig", lazy_entry::string_t, item_sig_len, 0}, + {"token", lazy_entry::string_t, 2, 0}, + {"v", lazy_entry::none_t, 0, key_desc_t::last_child}, + }; + + // immutable put + + do + { + dht::node_impl node(&ad, &s, sett, node_id::min(), ext, 0); + enum { num_test_nodes = 2 }; + node_entry nodes[num_test_nodes] = + { node_entry(generate_next(), udp::endpoint(address_v4::from_string("4.4.4.4"), 1234)) + , node_entry(generate_next(), udp::endpoint(address_v4::from_string("5.5.5.5"), 1235)) }; + + for (int i = 0; i < num_test_nodes; ++i) + node.m_table.add_node(nodes[i]); + + g_put_item.assign(items[0].ent); + node.get_item(items[0].target, get_item_cb); + + TEST_EQUAL(g_sent_packets.size(), num_test_nodes); + if (g_sent_packets.size() != num_test_nodes) break; + + for (int i = 0; i < num_test_nodes; ++i) + { + std::list >::iterator packet = find_packet(nodes[i].ep()); + TEST_CHECK(packet != g_sent_packets.end()); + lazy_from_entry(packet->second, response); + verify_message(&response, get_item_desc, parsed, 6, error_string, sizeof(error_string)); + g_sent_packets.erase(packet); + std::stringstream t; t << std::setw(2) << std::setfill('0') << i; + send_dht_response(node, response, nodes[i].ep(), nodes_t(), t.str(), 1234, + std::set(), 0, 0, std::string(), std::string(), -1, &nodes[i].id); + } + + TEST_EQUAL(g_put_count, 1); + TEST_EQUAL(g_sent_packets.size(), num_test_nodes); + if (g_sent_packets.size() != num_test_nodes) break; + + itemv.second = bencode(buffer, items[0].ent); + + for (int i = 0; i < num_test_nodes; ++i) + { + std::list >::iterator packet = find_packet(nodes[i].ep()); + TEST_CHECK(packet != g_sent_packets.end()); + if (packet == g_sent_packets.end()) continue; + + lazy_from_entry(packet->second, response); + ret = verify_message(&response, put_immutable_item_desc, parsed, 7, error_string, sizeof(error_string)); + if (ret) + { + TEST_EQUAL(parsed[0]->string_value(), "q"); + TEST_EQUAL(parsed[2]->string_value(), "put"); + std::pair v = parsed[6]->data_section(); + TEST_EQUAL(v.second, itemv.second); + TEST_CHECK(memcmp(v.first, itemv.first, itemv.second) == 0); + std::stringstream t; t << std::setw(2) << std::setfill('0') << i; + TEST_EQUAL(parsed[5]->string_value(), t.str()); + if (parsed[0]->string_value() != "q" || parsed[2]->string_value() != "put") continue; + } + else + { + fprintf(stderr, " invalid put request: %s\n", print_entry(response).c_str()); + TEST_ERROR(error_string); + continue; + } + } + + g_sent_packets.clear(); + g_put_item.clear(); + g_put_count = 0; + + } while (false); + + // mutable put + + do + { + dht::node_impl node(&ad, &s, sett, node_id::min(), ext, 0); + enum { num_test_nodes = 2 }; + node_entry nodes[num_test_nodes] = + { node_entry(generate_next(), udp::endpoint(address_v4::from_string("4.4.4.4"), 1234)) + , node_entry(generate_next(), udp::endpoint(address_v4::from_string("5.5.5.5"), 1235)) }; + + for (int i = 0; i < num_test_nodes; ++i) + node.m_table.add_node(nodes[i]); + + sha1_hash target = hasher(public_key, item_pk_len).final(); + g_put_item.assign(items[0].ent, seq, public_key, private_key); + std::string sig(g_put_item.sig(), item_sig_len); + node.get_item(target, get_item_cb); + + TEST_EQUAL(g_sent_packets.size(), num_test_nodes); + if (g_sent_packets.size() != num_test_nodes) break; + + for (int i = 0; i < num_test_nodes; ++i) + { + std::list >::iterator packet = find_packet(nodes[i].ep()); + TEST_CHECK(packet != g_sent_packets.end()); + lazy_from_entry(packet->second, response); + verify_message(&response, get_item_desc, parsed, 6, error_string, sizeof(error_string)); + g_sent_packets.erase(packet); + std::stringstream t; t << std::setw(2) << std::setfill('0') << i; + send_dht_response(node, response, nodes[i].ep(), nodes_t(), t.str(), 1234, + std::set(), 0, 0, std::string(), std::string(), -1, &nodes[i].id); + } + + TEST_EQUAL(g_put_count, 1); + TEST_EQUAL(g_sent_packets.size(), num_test_nodes); + if (g_sent_packets.size() != num_test_nodes) break; + + itemv.second = bencode(buffer, items[0].ent); + + for (int i = 0; i < num_test_nodes; ++i) + { + std::list >::iterator packet = find_packet(nodes[i].ep()); + TEST_CHECK(packet != g_sent_packets.end()); + if (packet == g_sent_packets.end()) continue; + + lazy_from_entry(packet->second, response); + ret = verify_message(&response, put_mutable_item_desc, parsed, 11, error_string, sizeof(error_string)); + if (ret) + { + TEST_EQUAL(parsed[0]->string_value(), "q"); + TEST_EQUAL(parsed[2]->string_value(), "put"); + TEST_EQUAL(parsed[6]->string_value(), std::string(public_key, item_pk_len)); + TEST_EQUAL(parsed[7]->int_value(), seq); + TEST_EQUAL(parsed[8]->string_value(), sig); + std::pair v = parsed[10]->data_section(); + TEST_EQUAL(v.second, itemv.second); + TEST_CHECK(memcmp(v.first, itemv.first, itemv.second) == 0); + std::stringstream t; t << std::setw(2) << std::setfill('0') << i; + TEST_EQUAL(parsed[9]->string_value(), t.str()); + if (parsed[0]->string_value() != "q" || parsed[2]->string_value() != "put") continue; + } + else + { + fprintf(stderr, " invalid put request: %s\n", print_entry(response).c_str()); + TEST_ERROR(error_string); + continue; + } + } + + g_sent_packets.clear(); + g_put_item.clear(); + g_put_count = 0; + + } while (false); + return 0; }