From 794efd338a06f4ca552b5c8a73088c06a92d7df3 Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Mon, 25 Jul 2016 19:13:09 -0400 Subject: [PATCH 1/3] Relax timing restrictions when syncing blockchain. When we request several blocks from a peer at once, base our timeout on the last block we've received from the peer instead of on the time of our request. --- .../include/graphene/net/peer_connection.hpp | 3 +- libraries/net/node.cpp | 36 +++++++++---------- 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/libraries/net/include/graphene/net/peer_connection.hpp b/libraries/net/include/graphene/net/peer_connection.hpp index 7cfa316..5635940 100644 --- a/libraries/net/include/graphene/net/peer_connection.hpp +++ b/libraries/net/include/graphene/net/peer_connection.hpp @@ -226,7 +226,8 @@ namespace graphene { namespace net bool peer_needs_sync_items_from_us; bool we_need_sync_items_from_peer; fc::optional, fc::time_point> > item_ids_requested_from_peer; /// we check this to detect a timed-out request and in busy() - item_to_time_map_type sync_items_requested_from_peer; /// ids of blocks we've requested from this peer during sync. fetch from another peer if this peer disconnects + fc::time_point last_sync_item_received_time; /// the time we received the last sync item or the time we sent the last batch of sync item requests to this peer + std::set sync_items_requested_from_peer; /// ids of blocks we've requested from this peer during sync. 
fetch from another peer if this peer disconnects item_hash_t last_block_delegate_has_seen; /// the hash of the last block this peer has told us about that the peer knows fc::time_point_sec last_block_time_delegate_has_seen; bool inhibit_fetching_sync_blocks; diff --git a/libraries/net/node.cpp b/libraries/net/node.cpp index b439faf..820d392 100644 --- a/libraries/net/node.cpp +++ b/libraries/net/node.cpp @@ -1000,8 +1000,8 @@ namespace graphene { namespace net { namespace detail { dlog( "requesting item ${item_hash} from peer ${endpoint}", ("item_hash", item_to_request )("endpoint", peer->get_remote_endpoint() ) ); item_id item_id_to_request( graphene::net::block_message_type, item_to_request ); _active_sync_requests.insert( active_sync_requests_map::value_type(item_to_request, fc::time_point::now() ) ); - peer->sync_items_requested_from_peer.insert( peer_connection::item_to_time_map_type::value_type(item_id_to_request, fc::time_point::now() ) ); - std::vector items_to_fetch; + peer->last_sync_item_received_time = fc::time_point::now(); + peer->sync_items_requested_from_peer.insert(item_to_request); peer->send_message( fetch_items_message(item_id_to_request.item_type, std::vector{item_id_to_request.item_hash} ) ); } @@ -1013,8 +1013,8 @@ namespace graphene { namespace net { namespace detail { for (const item_hash_t& item_to_request : items_to_request) { _active_sync_requests.insert( active_sync_requests_map::value_type(item_to_request, fc::time_point::now() ) ); - item_id item_id_to_request( graphene::net::block_message_type, item_to_request ); - peer->sync_items_requested_from_peer.insert( peer_connection::item_to_time_map_type::value_type(item_id_to_request, fc::time_point::now() ) ); + peer->last_sync_item_received_time = fc::time_point::now(); + peer->sync_items_requested_from_peer.insert(item_to_request); } peer->send_message(fetch_items_message(graphene::net::block_message_type, items_to_request)); } @@ -1378,14 +1378,14 @@ namespace graphene { namespace net 
{ namespace detail { else { bool disconnect_due_to_request_timeout = false; - for (const peer_connection::item_to_time_map_type::value_type& item_and_time : active_peer->sync_items_requested_from_peer) - if (item_and_time.second < active_ignored_request_threshold) - { - wlog("Disconnecting peer ${peer} because they didn't respond to my request for sync item ${id}", - ("peer", active_peer->get_remote_endpoint())("id", item_and_time.first.item_hash)); - disconnect_due_to_request_timeout = true; - break; - } + if (!active_peer->sync_items_requested_from_peer.empty() && + active_peer->last_sync_item_received_time < active_ignored_request_threshold) + { + wlog("Disconnecting peer ${peer} because they haven't made any progress on my remaining ${count} sync item requests", + ("peer", active_peer->get_remote_endpoint())("count", active_peer->sync_items_requested_from_peer.size())); + disconnect_due_to_request_timeout = true; + break; + } if (!disconnect_due_to_request_timeout && active_peer->item_ids_requested_from_peer && active_peer->item_ids_requested_from_peer->get<1>() < active_ignored_request_threshold) @@ -2796,7 +2796,7 @@ namespace graphene { namespace net { namespace detail { return; } - auto sync_item_iter = originating_peer->sync_items_requested_from_peer.find(requested_item); + auto sync_item_iter = originating_peer->sync_items_requested_from_peer.find(requested_item.item_hash); if (sync_item_iter != originating_peer->sync_items_requested_from_peer.end()) { originating_peer->sync_items_requested_from_peer.erase(sync_item_iter); @@ -2806,7 +2806,7 @@ namespace graphene { namespace net { namespace detail { else disconnect_from_peer(originating_peer, "You are missing a sync item you claim to have, your database is probably corrupted. Try --rebuild-index.",true, fc::exception(FC_LOG_MESSAGE(error,"You are missing a sync item you claim to have, your database is probably corrupted. 
Try --rebuild-index.", - ("item_id",requested_item)))); + ("item_id", requested_item)))); wlog("Peer doesn't have the requested sync item. This really shouldn't happen"); trigger_fetch_sync_items_loop(); return; @@ -2984,8 +2984,8 @@ namespace graphene { namespace net { namespace detail { // received yet, reschedule them to be fetched from another peer if (!originating_peer->sync_items_requested_from_peer.empty()) { - for (auto sync_item_and_time : originating_peer->sync_items_requested_from_peer) - _active_sync_requests.erase(sync_item_and_time.first.item_hash); + for (auto sync_item : originating_peer->sync_items_requested_from_peer) + _active_sync_requests.erase(sync_item); trigger_fetch_sync_items_loop(); } @@ -3475,11 +3475,11 @@ namespace graphene { namespace net { namespace detail { else { // not during normal operation. see if we requested it during sync - auto sync_item_iter = originating_peer->sync_items_requested_from_peer.find(item_id(graphene::net::block_message_type, - block_message_to_process.block_id)); + auto sync_item_iter = originating_peer->sync_items_requested_from_peer.find( block_message_to_process.block_id); if (sync_item_iter != originating_peer->sync_items_requested_from_peer.end()) { originating_peer->sync_items_requested_from_peer.erase(sync_item_iter); + originating_peer->last_sync_item_received_time = fc::time_point::now(); _active_sync_requests.erase(block_message_to_process.block_id); process_block_during_sync(originating_peer, block_message_to_process, message_hash); if (originating_peer->idle()) From b69880981c7be9d224ffa04abbccae21f9eba0a3 Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Tue, 8 Aug 2017 12:05:57 -0400 Subject: [PATCH 2/3] Fix bug where peers could get stuck in sync mode. There was a case where we had requested a block through the sync mechanism and also received it through the normal inventory mechanism where we would leave the peer in a sync state, but never ask them for more sync blocks. 
This commit fixes the bug that put us into that stuck state, and also adds code to disconnect peers if we ever manage to get into that stalled state. Conflicts: libraries/net/node.cpp --- .../include/graphene/net/peer_connection.hpp | 6 +- libraries/net/node.cpp | 438 +++++++++++------- libraries/net/peer_connection.cpp | 23 +- 3 files changed, 283 insertions(+), 184 deletions(-) diff --git a/libraries/net/include/graphene/net/peer_connection.hpp b/libraries/net/include/graphene/net/peer_connection.hpp index 5635940..b6c24ef 100644 --- a/libraries/net/include/graphene/net/peer_connection.hpp +++ b/libraries/net/include/graphene/net/peer_connection.hpp @@ -270,6 +270,7 @@ namespace graphene { namespace net fc::thread* _thread; unsigned _send_message_queue_tasks_running; // temporary debugging #endif + bool _currently_handling_message; // true while we're in the middle of handling a message from the remote system private: peer_connection(peer_connection_delegate* delegate); void destroy(); @@ -300,8 +301,9 @@ namespace graphene { namespace net fc::ip::endpoint get_local_endpoint(); void set_remote_endpoint(fc::optional new_remote_endpoint); - bool busy(); - bool idle(); + bool busy() const; + bool idle() const; + bool is_currently_handling_message() const; bool is_transaction_fetching_inhibited() const; fc::sha512 get_shared_secret() const; diff --git a/libraries/net/node.cpp b/libraries/net/node.cpp index 820d392..299dd86 100644 --- a/libraries/net/node.cpp +++ b/libraries/net/node.cpp @@ -1418,6 +1418,19 @@ namespace graphene { namespace net { namespace detail { ( "peer", active_peer->get_remote_endpoint() )("timeout", active_send_keepalive_timeout ) ); peers_to_send_keep_alive.push_back(active_peer); } + else if (active_peer->we_need_sync_items_from_peer && + !active_peer->is_currently_handling_message() && + !active_peer->item_ids_requested_from_peer && + active_peer->ids_of_items_to_get.empty()) + { + // This is a state we should never get into in the first 
place, but if we do, we should disconnect the peer + // to re-establish the connection. + fc_wlog(fc::logger::get("sync"), "Disconnecting peer ${peer} because we think we need blocks from them but sync has stalled.", + ("peer", active_peer->get_remote_endpoint())); + wlog("Disconnecting peer ${peer} because we think we need blocks from them but sync has stalled.", + ("peer", active_peer->get_remote_endpoint())); + peers_to_disconnect_forcibly.push_back(active_peer); + } } } @@ -2450,24 +2463,24 @@ namespace graphene { namespace net { namespace detail { uint32_t expected_num = first_block_number_in_reponse + i; if (actual_num != expected_num) { - wlog("Invalid response from peer ${peer_endpoint}. The list of blocks they provided is not sequential, " - "the ${position}th block in their reply was block number ${actual_num}, " - "but it should have been number ${expected_num}", - ("peer_endpoint", originating_peer->get_remote_endpoint()) - ("position", i) - ("actual_num", actual_num) - ("expected_num", expected_num)); - fc::exception error_for_peer(FC_LOG_MESSAGE(error, - "You gave an invalid response to my request for sync blocks. The list of blocks you provided is not sequential, " - "the ${position}th block in their reply was block number ${actual_num}, " - "but it should have been number ${expected_num}", - ("position", i) - ("actual_num", actual_num) - ("expected_num", expected_num))); - disconnect_from_peer(originating_peer, - "You gave an invalid response to my request for sync blocks", - true, error_for_peer); - return; + wlog("Invalid response from peer ${peer_endpoint}. 
The list of blocks they provided is not sequential, " + "the ${position}th block in their reply was block number ${actual_num}, " + "but it should have been number ${expected_num}", + ("peer_endpoint", originating_peer->get_remote_endpoint()) + ("position", i) + ("actual_num", actual_num) + ("expected_num", expected_num)); + fc::exception error_for_peer(FC_LOG_MESSAGE(error, + "You gave an invalid response to my request for sync blocks. The list of blocks you provided is not sequential, " + "the ${position}th block in their reply was block number ${actual_num}, " + "but it should have been number ${expected_num}", + ("position", i) + ("actual_num", actual_num) + ("expected_num", expected_num))); + disconnect_from_peer(originating_peer, + "You gave an invalid response to my request for sync blocks", + true, error_for_peer); + return; } } @@ -2514,180 +2527,204 @@ namespace graphene { namespace net { namespace detail { } originating_peer->item_ids_requested_from_peer.reset(); - dlog( "sync: received a list of ${count} available items from ${peer_endpoint}", - ( "count", blockchain_item_ids_inventory_message_received.item_hashes_available.size() ) - ( "peer_endpoint", originating_peer->get_remote_endpoint() ) ); - //for( const item_hash_t& item_hash : blockchain_item_ids_inventory_message_received.item_hashes_available ) - //{ - // dlog( "sync: ${hash}", ("hash", item_hash ) ); - //} - - // if the peer doesn't have any items after the one we asked for - if( blockchain_item_ids_inventory_message_received.total_remaining_item_count == 0 && - ( blockchain_item_ids_inventory_message_received.item_hashes_available.empty() || // there are no items in the peer's blockchain. this should only happen if our blockchain was empty when we requested, might want to verify that. 
- ( blockchain_item_ids_inventory_message_received.item_hashes_available.size() == 1 && - _delegate->has_item( item_id(blockchain_item_ids_inventory_message_received.item_type, - blockchain_item_ids_inventory_message_received.item_hashes_available.front() ) ) ) ) && // we've already seen the last item in the peer's blockchain - originating_peer->ids_of_items_to_get.empty() && - originating_peer->number_of_unfetched_item_ids == 0 ) // <-- is the last check necessary? + // if exceptions are throw after clearing the item_ids_requested_from_peer (above), + // it could leave our sync in a stalled state. Wrap a try/catch around the rest + // of the function so we can log if this ever happens. + try { - dlog( "sync: peer said we're up-to-date, entering normal operation with this peer" ); - originating_peer->we_need_sync_items_from_peer = false; + dlog( "sync: received a list of ${count} available items from ${peer_endpoint}", + ( "count", blockchain_item_ids_inventory_message_received.item_hashes_available.size() ) + ( "peer_endpoint", originating_peer->get_remote_endpoint() ) ); + //for( const item_hash_t& item_hash : blockchain_item_ids_inventory_message_received.item_hashes_available ) + //{ + // dlog( "sync: ${hash}", ("hash", item_hash ) ); + //} + + // if the peer doesn't have any items after the one we asked for + if( blockchain_item_ids_inventory_message_received.total_remaining_item_count == 0 && + ( blockchain_item_ids_inventory_message_received.item_hashes_available.empty() || // there are no items in the peer's blockchain. this should only happen if our blockchain was empty when we requested, might want to verify that. 
+ ( blockchain_item_ids_inventory_message_received.item_hashes_available.size() == 1 && + _delegate->has_item( item_id(blockchain_item_ids_inventory_message_received.item_type, + blockchain_item_ids_inventory_message_received.item_hashes_available.front() ) ) ) ) && // we've already seen the last item in the peer's blockchain + originating_peer->ids_of_items_to_get.empty() && + originating_peer->number_of_unfetched_item_ids == 0 ) // <-- is the last check necessary? + { + dlog( "sync: peer said we're up-to-date, entering normal operation with this peer" ); + originating_peer->we_need_sync_items_from_peer = false; - uint32_t new_number_of_unfetched_items = calculate_unsynced_block_count_from_all_peers(); - _total_number_of_unfetched_items = new_number_of_unfetched_items; - if( new_number_of_unfetched_items == 0 ) - _delegate->sync_status( blockchain_item_ids_inventory_message_received.item_type, 0 ); + uint32_t new_number_of_unfetched_items = calculate_unsynced_block_count_from_all_peers(); + _total_number_of_unfetched_items = new_number_of_unfetched_items; + if( new_number_of_unfetched_items == 0 ) + _delegate->sync_status( blockchain_item_ids_inventory_message_received.item_type, 0 ); - return; - } + return; + } - std::deque item_hashes_received( blockchain_item_ids_inventory_message_received.item_hashes_available.begin(), - blockchain_item_ids_inventory_message_received.item_hashes_available.end() ); - originating_peer->number_of_unfetched_item_ids = blockchain_item_ids_inventory_message_received.total_remaining_item_count; - // flush any items this peer sent us that we've already received and processed from another peer - if (!item_hashes_received.empty() && - originating_peer->ids_of_items_to_get.empty()) - { - bool is_first_item_for_other_peer = false; - for (const peer_connection_ptr& peer : _active_connections) - if (peer != originating_peer->shared_from_this() && - !peer->ids_of_items_to_get.empty() && - peer->ids_of_items_to_get.front() == 
blockchain_item_ids_inventory_message_received.item_hashes_available.front()) + std::deque item_hashes_received( blockchain_item_ids_inventory_message_received.item_hashes_available.begin(), + blockchain_item_ids_inventory_message_received.item_hashes_available.end() ); + originating_peer->number_of_unfetched_item_ids = blockchain_item_ids_inventory_message_received.total_remaining_item_count; + // flush any items this peer sent us that we've already received and processed from another peer + if (!item_hashes_received.empty() && + originating_peer->ids_of_items_to_get.empty()) + { + bool is_first_item_for_other_peer = false; + for (const peer_connection_ptr& peer : _active_connections) + if (peer != originating_peer->shared_from_this() && + !peer->ids_of_items_to_get.empty() && + peer->ids_of_items_to_get.front() == blockchain_item_ids_inventory_message_received.item_hashes_available.front()) + { + dlog("The item ${newitem} is the first item for peer ${peer}", + ("newitem", blockchain_item_ids_inventory_message_received.item_hashes_available.front()) + ("peer", peer->get_remote_endpoint())); + is_first_item_for_other_peer = true; + break; + } + dlog("is_first_item_for_other_peer: ${is_first}. 
item_hashes_received.size() = ${size}", + ("is_first", is_first_item_for_other_peer)("size", item_hashes_received.size())); + if (!is_first_item_for_other_peer) { - dlog("The item ${newitem} is the first item for peer ${peer}", - ("newitem", blockchain_item_ids_inventory_message_received.item_hashes_available.front()) - ("peer", peer->get_remote_endpoint())); - is_first_item_for_other_peer = true; - break; + while (!item_hashes_received.empty() && + _delegate->has_item(item_id(blockchain_item_ids_inventory_message_received.item_type, + item_hashes_received.front()))) + { + assert(item_hashes_received.front() != item_hash_t()); + originating_peer->last_block_delegate_has_seen = item_hashes_received.front(); + originating_peer->last_block_time_delegate_has_seen = _delegate->get_block_time(item_hashes_received.front()); + dlog("popping item because delegate has already seen it. peer ${peer}'s last block the delegate has seen is now ${block_id} (actual block #${actual_block_num})", + ("peer", originating_peer->get_remote_endpoint()) + ("block_id", originating_peer->last_block_delegate_has_seen) + ("actual_block_num", _delegate->get_block_number(item_hashes_received.front()))); + + item_hashes_received.pop_front(); + } + dlog("after removing all items we have already seen, item_hashes_received.size() = ${size}", ("size", item_hashes_received.size())); } - dlog("is_first_item_for_other_peer: ${is_first}. item_hashes_received.size() = ${size}", - ("is_first", is_first_item_for_other_peer)("size", item_hashes_received.size())); - if (!is_first_item_for_other_peer) + } + else if (!item_hashes_received.empty()) { - while (!item_hashes_received.empty() && - _delegate->has_item(item_id(blockchain_item_ids_inventory_message_received.item_type, - item_hashes_received.front()))) + // we received a list of items and we already have a list of items to fetch from this peer. 
+ // In the normal case, this list will immediately follow the existing list, meaning the + // last hash of our existing list will match the first hash of the new list. + + // In the much less likely case, we've received a partial list of items from the peer, then + // the peer switched forks before sending us the remaining list. In this case, the first + // hash in the new list may not be the last hash in the existing list (it may be earlier, or + // it may not exist at all. + + // In either case, pop items off the back of our existing list until we find our first + // item, then append our list. + while (!originating_peer->ids_of_items_to_get.empty()) { - assert(item_hashes_received.front() != item_hash_t()); + if (item_hashes_received.front() != originating_peer->ids_of_items_to_get.back()) + originating_peer->ids_of_items_to_get.pop_back(); + else + break; + } + if (originating_peer->ids_of_items_to_get.empty()) + { + // this happens when the peer has switched forks between the last inventory message and + // this one, and there weren't any unfetched items in common + // We don't know where in the blockchain the new front() actually falls, all we can + // expect is that it is a block that we knew about because it should be one of the + // blocks we sent in the initial synopsis. + assert(_delegate->has_item(item_id(_sync_item_type, item_hashes_received.front()))); originating_peer->last_block_delegate_has_seen = item_hashes_received.front(); originating_peer->last_block_time_delegate_has_seen = _delegate->get_block_time(item_hashes_received.front()); - dlog("popping item because delegate has already seen it. 
peer ${peer}'s last block the delegate has seen is now ${block_id} (actual block #${actual_block_num})", - ("peer", originating_peer->get_remote_endpoint()) - ("block_id", originating_peer->last_block_delegate_has_seen) - ("actual_block_num", _delegate->get_block_number(item_hashes_received.front()))); - item_hashes_received.pop_front(); } - dlog("after removing all items we have already seen, item_hashes_received.size() = ${size}", ("size", item_hashes_received.size())); - } - } - else if (!item_hashes_received.empty()) - { - // we received a list of items and we already have a list of items to fetch from this peer. - // In the normal case, this list will immediately follow the existing list, meaning the - // last hash of our existing list will match the first hash of the new list. - - // In the much less likely case, we've received a partial list of items from the peer, then - // the peer switched forks before sending us the remaining list. In this case, the first - // hash in the new list may not be the last hash in the existing list (it may be earlier, or - // it may not exist at all. - - // In either case, pop items off the back of our existing list until we find our first - // item, then append our list. - while (!originating_peer->ids_of_items_to_get.empty()) - { - if (item_hashes_received.front() != originating_peer->ids_of_items_to_get.back()) - originating_peer->ids_of_items_to_get.pop_back(); else - break; - } - if (originating_peer->ids_of_items_to_get.empty()) - { - // this happens when the peer has switched forks between the last inventory message and - // this one, and there weren't any unfetched items in common - // We don't know where in the blockchain the new front() actually falls, all we can - // expect is that it is a block that we knew about because it should be one of the - // blocks we sent in the initial synopsis. 
- assert(_delegate->has_item(item_id(_sync_item_type, item_hashes_received.front()))); - originating_peer->last_block_delegate_has_seen = item_hashes_received.front(); - originating_peer->last_block_time_delegate_has_seen = _delegate->get_block_time(item_hashes_received.front()); - item_hashes_received.pop_front(); - } - else - { - // the common simple case: the new list extends the old. pop off the duplicate element - originating_peer->ids_of_items_to_get.pop_back(); + { + // the common simple case: the new list extends the old. pop off the duplicate element + originating_peer->ids_of_items_to_get.pop_back(); + } } - } - if (!item_hashes_received.empty() && !originating_peer->ids_of_items_to_get.empty()) - assert(item_hashes_received.front() != originating_peer->ids_of_items_to_get.back()); + if (!item_hashes_received.empty() && !originating_peer->ids_of_items_to_get.empty()) + assert(item_hashes_received.front() != originating_peer->ids_of_items_to_get.back()); - // append the remaining items to the peer's list - boost::push_back(originating_peer->ids_of_items_to_get, item_hashes_received); + // append the remaining items to the peer's list + boost::push_back(originating_peer->ids_of_items_to_get, item_hashes_received); - originating_peer->number_of_unfetched_item_ids = blockchain_item_ids_inventory_message_received.total_remaining_item_count; + originating_peer->number_of_unfetched_item_ids = blockchain_item_ids_inventory_message_received.total_remaining_item_count; - // at any given time, there's a maximum number of blocks that can possibly be out there - // [(now - genesis time) / block interval]. If they offer us more blocks than that, - // they must be an attacker or have a buggy client. 
- fc::time_point_sec minimum_time_of_last_offered_block = - originating_peer->last_block_time_delegate_has_seen + // timestamp of the block immediately before the first unfetched block - originating_peer->number_of_unfetched_item_ids * MUSE_BLOCK_INTERVAL; - if (minimum_time_of_last_offered_block > _delegate->get_blockchain_now() + GRAPHENE_NET_FUTURE_SYNC_BLOCKS_GRACE_PERIOD_SEC) - { - wlog("Disconnecting from peer ${peer} who offered us an implausible number of blocks, their last block would be in the future (${timestamp})", - ("peer", originating_peer->get_remote_endpoint()) - ("timestamp", minimum_time_of_last_offered_block)); - fc::exception error_for_peer(FC_LOG_MESSAGE(error, "You offered me a list of more sync blocks than could possibly exist. Total blocks offered: ${blocks}, Minimum time of the last block you offered: ${minimum_time_of_last_offered_block}, Now: ${now}", - ("blocks", originating_peer->number_of_unfetched_item_ids) - ("minimum_time_of_last_offered_block", minimum_time_of_last_offered_block) - ("now", _delegate->get_blockchain_now()))); - disconnect_from_peer(originating_peer, - "You offered me a list of more sync blocks than could possibly exist", - true, error_for_peer); - return; - } + // at any given time, there's a maximum number of blocks that can possibly be out there + // [(now - genesis time) / block interval]. If they offer us more blocks than that, + // they must be an attacker or have a buggy client. 
+ fc::time_point_sec minimum_time_of_last_offered_block = + originating_peer->last_block_time_delegate_has_seen + // timestamp of the block immediately before the first unfetched block + originating_peer->number_of_unfetched_item_ids * MUSE_BLOCK_INTERVAL; + if (minimum_time_of_last_offered_block > _delegate->get_blockchain_now() + GRAPHENE_NET_FUTURE_SYNC_BLOCKS_GRACE_PERIOD_SEC) + { + wlog("Disconnecting from peer ${peer} who offered us an implausible number of blocks, their last block would be in the future (${timestamp})", + ("peer", originating_peer->get_remote_endpoint()) + ("timestamp", minimum_time_of_last_offered_block)); + fc::exception error_for_peer(FC_LOG_MESSAGE(error, "You offered me a list of more sync blocks than could possibly exist. Total blocks offered: ${blocks}, Minimum time of the last block you offered: ${minimum_time_of_last_offered_block}, Now: ${now}", + ("blocks", originating_peer->number_of_unfetched_item_ids) + ("minimum_time_of_last_offered_block", minimum_time_of_last_offered_block) + ("now", _delegate->get_blockchain_now()))); + disconnect_from_peer(originating_peer, + "You offered me a list of more sync blocks than could possibly exist", + true, error_for_peer); + return; + } - uint32_t new_number_of_unfetched_items = calculate_unsynced_block_count_from_all_peers(); - if (new_number_of_unfetched_items != _total_number_of_unfetched_items) - _delegate->sync_status(blockchain_item_ids_inventory_message_received.item_type, - new_number_of_unfetched_items); - _total_number_of_unfetched_items = new_number_of_unfetched_items; + uint32_t new_number_of_unfetched_items = calculate_unsynced_block_count_from_all_peers(); + if (new_number_of_unfetched_items != _total_number_of_unfetched_items) + _delegate->sync_status(blockchain_item_ids_inventory_message_received.item_type, + new_number_of_unfetched_items); + _total_number_of_unfetched_items = new_number_of_unfetched_items; - if 
(blockchain_item_ids_inventory_message_received.total_remaining_item_count != 0) - { - // the peer hasn't sent us all the items it knows about. - if (originating_peer->ids_of_items_to_get.size() > GRAPHENE_NET_MIN_BLOCK_IDS_TO_PREFETCH) + if (blockchain_item_ids_inventory_message_received.total_remaining_item_count != 0) { - // we have a good number of item ids from this peer, start fetching blocks from it; - // we'll switch back later to finish the job. - trigger_fetch_sync_items_loop(); + // the peer hasn't sent us all the items it knows about. + if (originating_peer->ids_of_items_to_get.size() > GRAPHENE_NET_MIN_BLOCK_IDS_TO_PREFETCH) + { + // we have a good number of item ids from this peer, start fetching blocks from it; + // we'll switch back later to finish the job. + trigger_fetch_sync_items_loop(); + } + else + { + // keep fetching the peer's list of sync items until we get enough to switch into block- + // fetchimg mode + fetch_next_batch_of_item_ids_from_peer(originating_peer); + } } else { - // keep fetching the peer's list of sync items until we get enough to switch into block- - // fetchimg mode - fetch_next_batch_of_item_ids_from_peer(originating_peer); + // the peer has told us about all of the items it knows + if (!originating_peer->ids_of_items_to_get.empty()) + { + // we now know about all of the items the peer knows about, and there are some items on the list + // that we should try to fetch. Kick off the fetch loop. + trigger_fetch_sync_items_loop(); + } + else + { + // If we get here, the peer has sent us a non-empty list of items, but we have already + // received all of the items from other peers. 
Send a new request to the peer to + // see if we're really in sync + fetch_next_batch_of_item_ids_from_peer(originating_peer); + } } } - else + catch (const fc::canceled_exception&) { - // the peer has told us about all of the items it knows - if (!originating_peer->ids_of_items_to_get.empty()) - { - // we now know about all of the items the peer knows about, and there are some items on the list - // that we should try to fetch. Kick off the fetch loop. - trigger_fetch_sync_items_loop(); - } - else - { - // If we get here, the peer has sent us a non-empty list of items, but we have already - // received all of the items from other peers. Send a new request to the peer to - // see if we're really in sync - fetch_next_batch_of_item_ids_from_peer(originating_peer); - } + throw; + } + catch (const fc::exception& e) + { + elog("Caught unexpected exception: ${e}", ("e", e)); + assert(false && "exceptions not expected here"); + } + catch (const std::exception& e) + { + elog("Caught unexpected exception: ${e}", ("e", e.what())); + assert(false && "exceptions not expected here"); + } + catch (...) 
+ { + elog("Caught unexpected exception, could break sync operation"); } } else @@ -3264,7 +3301,30 @@ namespace graphene { namespace net { namespace detail { block_processed_this_iteration = true; } else + { dlog("Already received and accepted this block (presumably through normal inventory mechanism), treating it as accepted"); + for (const peer_connection_ptr& peer : _active_connections) + { + auto items_being_processed_iter = peer->ids_of_items_being_processed.find(received_block_iter->block_id); + if (items_being_processed_iter != peer->ids_of_items_being_processed.end()) + { + peer->ids_of_items_being_processed.erase(items_being_processed_iter); + dlog("Removed item from ${endpoint}'s list of items being processed, still processing ${len} blocks", + ("endpoint", peer->get_remote_endpoint())("len", peer->ids_of_items_being_processed.size())); + + // if we just processed the last item in our list from this peer, we will want to + // send another request to find out if we are now in sync (this is normally handled in + // send_sync_block_to_node_delegate) + if (peer->ids_of_items_to_get.empty() && + peer->number_of_unfetched_item_ids == 0 && + peer->ids_of_items_being_processed.empty()) + { + dlog("We received last item in our list for peer ${endpoint}, setup to do a sync check", ("endpoint", peer->get_remote_endpoint())); + fetch_next_batch_of_item_ids_from_peer(peer.get()); + } + } + } + } break; // start iterating _received_sync_items from the beginning } // end if potential_first_block @@ -3479,20 +3539,44 @@ namespace graphene { namespace net { namespace detail { if (sync_item_iter != originating_peer->sync_items_requested_from_peer.end()) { originating_peer->sync_items_requested_from_peer.erase(sync_item_iter); - originating_peer->last_sync_item_received_time = fc::time_point::now(); - _active_sync_requests.erase(block_message_to_process.block_id); - process_block_during_sync(originating_peer, block_message_to_process, message_hash); - if 
(originating_peer->idle())
+        // if exceptions are thrown here after removing the sync item from the list (above),
+        // it could leave our sync in a stalled state.  Wrap a try/catch around the rest
+        // of the function so we can log if this ever happens.
+        try
         {
-          // we have finished fetching a batch of items, so we either need to grab another batch of items
-          // or we need to get another list of item ids.
-          if (originating_peer->number_of_unfetched_item_ids > 0 &&
-              originating_peer->ids_of_items_to_get.size() < GRAPHENE_NET_MIN_BLOCK_IDS_TO_PREFETCH)
-            fetch_next_batch_of_item_ids_from_peer(originating_peer);
-          else
-            trigger_fetch_sync_items_loop();
+          originating_peer->last_sync_item_received_time = fc::time_point::now();
+          _active_sync_requests.erase(block_message_to_process.block_id);
+          process_block_during_sync(originating_peer, block_message_to_process, message_hash);
+          if (originating_peer->idle())
+          {
+            // we have finished fetching a batch of items, so we either need to grab another batch of items
+            // or we need to get another list of item ids.
+            if (originating_peer->number_of_unfetched_item_ids > 0 &&
+                originating_peer->ids_of_items_to_get.size() < GRAPHENE_NET_MIN_BLOCK_IDS_TO_PREFETCH)
+              fetch_next_batch_of_item_ids_from_peer(originating_peer);
+            else
+              trigger_fetch_sync_items_loop();
+          }
+          return;
+        }
+        catch (const fc::canceled_exception& e)
+        {
+          throw;
+        }
+        catch (const fc::exception& e)
+        {
+          elog("Caught unexpected exception: ${e}", ("e", e));
+          assert(false && "exceptions not expected here");
+        }
+        catch (const std::exception& e)
+        {
+          elog("Caught unexpected exception: ${e}", ("e", e.what()));
+          assert(false && "exceptions not expected here");
+        }
+        catch (...)
+ { + elog("Caught unexpected exception, could break sync operation"); } - return; } } diff --git a/libraries/net/peer_connection.cpp b/libraries/net/peer_connection.cpp index 4c375d5..68b2631 100644 --- a/libraries/net/peer_connection.cpp +++ b/libraries/net/peer_connection.cpp @@ -28,6 +28,8 @@ #include +#include + #ifdef DEFAULT_LOGGER # undef DEFAULT_LOGGER #endif @@ -85,11 +87,12 @@ namespace graphene { namespace net inhibit_fetching_sync_blocks(false), transaction_fetching_inhibited_until(fc::time_point::min()), last_known_fork_block_number(0), - firewall_check_state(nullptr) + firewall_check_state(nullptr), #ifndef NDEBUG - ,_thread(&fc::thread::current()), - _send_message_queue_tasks_running(0) + _thread(&fc::thread::current()), + _send_message_queue_tasks_running(0), #endif + _currently_handling_message(false) { } @@ -264,6 +267,10 @@ namespace graphene { namespace net void peer_connection::on_message( message_oriented_connection* originating_connection, const message& received_message ) { VERIFY_CORRECT_THREAD(); + _currently_handling_message = true; + BOOST_SCOPE_EXIT(this_) { + this_->_currently_handling_message = false; + } BOOST_SCOPE_EXIT_END _node->on_message( this, received_message ); } @@ -437,18 +444,24 @@ namespace graphene { namespace net _remote_endpoint = new_remote_endpoint; } - bool peer_connection::busy() + bool peer_connection::busy() const { VERIFY_CORRECT_THREAD(); return !items_requested_from_peer.empty() || !sync_items_requested_from_peer.empty() || item_ids_requested_from_peer; } - bool peer_connection::idle() + bool peer_connection::idle() const { VERIFY_CORRECT_THREAD(); return !busy(); } + bool peer_connection::is_currently_handling_message() const + { + VERIFY_CORRECT_THREAD(); + return _currently_handling_message; + } + bool peer_connection::is_transaction_fetching_inhibited() const { VERIFY_CORRECT_THREAD(); From faf1c685181d8dbe35473a95ca679c680409805a Mon Sep 17 00:00:00 2001 From: theoreticalbts Date: Thu, 24 Aug 2017 
15:20:06 -0400
Subject: [PATCH 3/3] node.cpp: Fix possible race condition in
 process_backlog_of_sync_blocks() #1434

---
 libraries/net/node.cpp | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/libraries/net/node.cpp b/libraries/net/node.cpp
index 299dd86..ddd0e64 100644
--- a/libraries/net/node.cpp
+++ b/libraries/net/node.cpp
@@ -3303,6 +3303,7 @@ namespace graphene { namespace net { namespace detail {
       else
       {
         dlog("Already received and accepted this block (presumably through normal inventory mechanism), treating it as accepted");
+        std::vector< peer_connection_ptr > peers_needing_next_batch;
         for (const peer_connection_ptr& peer : _active_connections)
         {
           auto items_being_processed_iter = peer->ids_of_items_being_processed.find(received_block_iter->block_id);
@@ -3320,10 +3321,12 @@ namespace graphene { namespace net { namespace detail {
               peer->ids_of_items_being_processed.empty())
           {
             dlog("We received last item in our list for peer ${endpoint}, setup to do a sync check", ("endpoint", peer->get_remote_endpoint()));
-            fetch_next_batch_of_item_ids_from_peer(peer.get());
+            peers_needing_next_batch.push_back( peer );
           }
         }
       }
+      for( const peer_connection_ptr& peer : peers_needing_next_batch )
+        fetch_next_batch_of_item_ids_from_peer(peer.get());
     }
     break; // start iterating _received_sync_items from the beginning