Skip to content

Commit

Permalink
perf: use a std::map to store peers in tr_swarm (#5645)
Browse files Browse the repository at this point in the history
  • Loading branch information
tearfur committed Jun 22, 2023
1 parent b562b2d commit 699b3d8
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 64 deletions.
6 changes: 2 additions & 4 deletions libtransmission/handshake.cc
Expand Up @@ -521,9 +521,8 @@ ReadState tr_handshake::read_crypto_provide(tr_peerIo* peer_io)

if (auto const info = mediator_->torrent_from_obfuscated(obfuscated_hash); info)
{
auto const& [addr, port] = peer_io->socket_address();
bool const client_is_seed = info->is_done;
bool const peer_is_seed = mediator_->is_peer_known_seed(info->id, addr, port);
bool const peer_is_seed = mediator_->is_peer_known_seed(info->id, peer_io->socket_address());
tr_logAddTraceHand(this, fmt::format("got INCOMING connection's encrypted handshake for torrent [{}]", info->id));
peer_io->set_torrent_hash(info->info_hash);

Expand Down Expand Up @@ -789,8 +788,7 @@ void tr_handshake::on_error(tr_peerIo* io, tr_error const& error, void* vhandsha
/* Don't mark a peer as non-µTP unless it's really a connect failure. */
if ((error.code == ETIMEDOUT || error.code == ECONNREFUSED) && info)
{
auto const& [addr, port] = io->socket_address();
handshake->mediator_->set_utp_failed(info_hash, addr, port);
handshake->mediator_->set_utp_failed(info_hash, io->socket_address());
}

if (handshake->mediator_->allows_tcp() && io->reconnect())
Expand Down
9 changes: 6 additions & 3 deletions libtransmission/handshake.h
Expand Up @@ -61,15 +61,18 @@ class tr_handshake
[[nodiscard]] virtual libtransmission::TimerMaker& timer_maker() = 0;
[[nodiscard]] virtual bool allows_dht() const = 0;
[[nodiscard]] virtual bool allows_tcp() const = 0;
[[nodiscard]] virtual bool is_peer_known_seed(tr_torrent_id_t tor_id, tr_address const& addr, tr_port const& port)
const = 0;
[[nodiscard]] virtual bool is_peer_known_seed(
tr_torrent_id_t tor_id,
std::pair<tr_address, tr_port> const& socket_address) const = 0;
[[nodiscard]] virtual size_t pad(void* setme, size_t max_bytes) const = 0;
[[nodiscard]] virtual DH::private_key_bigend_t private_key() const
{
return DH::randomPrivateKey();
}

virtual void set_utp_failed(tr_sha1_digest_t const& info_hash, tr_address const&, tr_port const&) = 0;
virtual void set_utp_failed(
tr_sha1_digest_t const& info_hash,
std::pair<tr_address, tr_port> const& socket_address) = 0;
};

tr_handshake(Mediator* mediator, std::shared_ptr<tr_peerIo> peer_io, tr_encryption_mode mode_in, DoneFunc on_done);
Expand Down
89 changes: 42 additions & 47 deletions libtransmission/peer-mgr.cc
Expand Up @@ -107,15 +107,16 @@ class HandshakeMediator final : public tr_handshake::Mediator
return session_.allowsTCP();
}

void set_utp_failed(tr_sha1_digest_t const& info_hash, tr_address const& addr, tr_port const& port) override
void set_utp_failed(tr_sha1_digest_t const& info_hash, std::pair<tr_address, tr_port> const& socket_address) override
{
if (auto* const tor = session_.torrents().get(info_hash); tor != nullptr)
{
tr_peerMgrSetUtpFailed(tor, addr, port, true);
tr_peerMgrSetUtpFailed(tor, socket_address, true);
}
}

[[nodiscard]] bool is_peer_known_seed(tr_torrent_id_t tor_id, tr_address const& addr, tr_port const& port) const override;
[[nodiscard]] bool is_peer_known_seed(tr_torrent_id_t tor_id, std::pair<tr_address, tr_port> const& socket_address)
const override;

[[nodiscard]] libtransmission::TimerMaker& timer_maker() override
{
Expand Down Expand Up @@ -462,7 +463,10 @@ class tr_swarm
{
if (!pool_is_all_seeds_)
{
pool_is_all_seeds_ = std::all_of(std::begin(pool), std::end(pool), [](auto const& atom) { return atom.isSeed(); });
pool_is_all_seeds_ = std::all_of(
std::begin(pool),
std::end(pool),
[](auto const& key_val) { return key_val.second.isSeed(); });
}

return *pool_is_all_seeds_;
Expand All @@ -473,42 +477,34 @@ class tr_swarm
pool_is_all_seeds_.reset();
}

[[nodiscard]] peer_atom* get_existing_atom(tr_address const& addr, tr_port const& port) noexcept
[[nodiscard]] peer_atom* get_existing_atom(std::pair<tr_address, tr_port> const& socket_address) noexcept
{
auto const iter = std::find_if(
std::begin(pool),
std::end(pool),
[&addr, &port](auto const& atom) { return atom.addr == addr && atom.port == port; });
return iter != std::end(pool) ? &*iter : nullptr;
auto&& it = pool.find(socket_address);
return it != pool.end() ? &it->second : nullptr;
}

[[nodiscard]] peer_atom const* get_existing_atom(tr_address const& addr, tr_port const& port) const noexcept
[[nodiscard]] peer_atom const* get_existing_atom(std::pair<tr_address, tr_port> const& socket_address) const noexcept
{
auto const iter = std::find_if(
std::begin(pool),
std::end(pool),
[&addr, &port](auto const& atom) { return atom.addr == addr && atom.port == port; });
return iter != std::end(pool) ? &*iter : nullptr;
auto const& it = pool.find(socket_address);
return it != pool.cend() ? &it->second : nullptr;
}

[[nodiscard]] bool peer_is_a_seed(tr_address const& addr, tr_port const& port) const noexcept
[[nodiscard]] bool peer_is_a_seed(std::pair<tr_address, tr_port> const& socket_address) const noexcept
{
auto const* const atom = get_existing_atom(addr, port);
auto const* const atom = get_existing_atom(socket_address);
return atom != nullptr && atom->isSeed();
}

peer_atom* ensure_atom_exists(tr_address const& addr, tr_port const& port, uint8_t const flags, uint8_t const from)
peer_atom* ensure_atom_exists(std::pair<tr_address, tr_port> const& socket_address, uint8_t const flags, uint8_t const from)
{
auto const& [addr, port] = socket_address;

TR_ASSERT(addr.is_valid());
TR_ASSERT(from < TR_PEER_FROM__MAX);

peer_atom* atom = get_existing_atom(addr, port);

if (atom == nullptr)
{
atom = &pool.emplace_back(addr, port, flags, from);
}
else
auto&& [atom_it, is_new] = pool.try_emplace(socket_address, addr, port, flags, from);
peer_atom* atom = &atom_it->second;
if (!is_new)
{
atom->fromBest = std::min(atom->fromBest, from);
atom->flags |= flags;
Expand Down Expand Up @@ -651,9 +647,8 @@ class tr_swarm
std::vector<tr_peerMsgs*> peers;

// tr_peers hold pointers to the items in this container,
// so use a deque instead of vector to prevent insertion from
// invalidating those pointers
std::deque<peer_atom> pool;
// therefore references to elements within cannot invalidate
std::map<std::pair<tr_address, tr_port>, peer_atom> pool;

tr_peerMsgs* optimistic = nullptr; /* the optimistic peer, or nullptr if none */

Expand Down Expand Up @@ -799,7 +794,7 @@ void tr_peerMgrOnBlocklistChanged(tr_peerMgr* mgr)
since the blocklist has changed, erase that cached value */
for (auto* const tor : mgr->session->torrents())
{
for (auto& atom : tor->swarm->pool)
for (auto& [socket_address, atom] : tor->swarm->pool)
{
atom.setBlocklistedDirty();
}
Expand All @@ -808,17 +803,17 @@ void tr_peerMgrOnBlocklistChanged(tr_peerMgr* mgr)

// ---

void tr_peerMgrSetUtpSupported(tr_torrent* tor, tr_address const& addr, tr_port const& port)
void tr_peerMgrSetUtpSupported(tr_torrent* tor, std::pair<tr_address, tr_port> const& socket_address)
{
if (auto* const atom = tor->swarm->get_existing_atom(addr, port); atom != nullptr)
if (auto* const atom = tor->swarm->get_existing_atom(socket_address); atom != nullptr)
{
atom->flags |= ADDED_F_UTP_FLAGS;
}
}

void tr_peerMgrSetUtpFailed(tr_torrent* tor, tr_address const& addr, tr_port const& port, bool failed)
void tr_peerMgrSetUtpFailed(tr_torrent* tor, std::pair<tr_address, tr_port> const& socket_address, bool failed)
{
if (auto* const atom = tor->swarm->get_existing_atom(addr, port); atom != nullptr)
if (auto* const atom = tor->swarm->get_existing_atom(socket_address); atom != nullptr)
{
atom->utp_failed = failed;
}
Expand Down Expand Up @@ -1007,15 +1002,15 @@ void create_bit_torrent_peer(tr_torrent* tor, std::shared_ptr<tr_peerIo> io, str

auto* const s = manager->get_existing_swarm(result.io->torrent_hash());

auto const [addr, port] = result.io->socket_address();
auto const& socket_address = result.io->socket_address();

if (result.io->is_incoming())
{
manager->incoming_handshakes.erase(std::make_pair(addr, port));
manager->incoming_handshakes.erase(socket_address);
}
else if (s != nullptr)
{
s->outgoing_handshakes.erase(std::make_pair(addr, port));
s->outgoing_handshakes.erase(socket_address);
}

auto const lock = manager->unique_lock();
Expand All @@ -1024,7 +1019,7 @@ void create_bit_torrent_peer(tr_torrent* tor, std::shared_ptr<tr_peerIo> io, str
{
if (s != nullptr)
{
struct peer_atom* atom = s->get_existing_atom(addr, port);
struct peer_atom* atom = s->get_existing_atom(socket_address);

if (atom != nullptr)
{
Expand All @@ -1045,7 +1040,7 @@ void create_bit_torrent_peer(tr_torrent* tor, std::shared_ptr<tr_peerIo> io, str
}
else /* looking good */
{
struct peer_atom* atom = s->ensure_atom_exists(addr, port, 0, TR_PEER_FROM_INCOMING);
struct peer_atom* atom = s->ensure_atom_exists(socket_address, 0, TR_PEER_FROM_INCOMING);

atom->time = tr_time();
atom->piece_data_time = 0;
Expand Down Expand Up @@ -1118,9 +1113,9 @@ void tr_peerMgrAddIncoming(tr_peerMgr* manager, tr_peer_socket&& socket)
}
else /* we don't have a connection to them yet... */
{
auto const& socket_address = socket.socketAddress();
auto&& socket_address = socket.socketAddress();
manager->incoming_handshakes.try_emplace(
socket_address,
std::move(socket_address),
&manager->handshake_mediator_,
tr_peerIo::new_incoming(session, &session->top_bandwidth_, std::move(socket)),
session->encryptionMode(),
Expand All @@ -1134,7 +1129,7 @@ void tr_peerMgrSetSwarmIsAllSeeds(tr_torrent* tor)

auto* const swarm = tor->swarm;

for (auto& atom : swarm->pool)
for (auto& [socket_address, atom] : swarm->pool)
{
swarm->mark_atom_as_seed(atom);
}
Expand All @@ -1153,7 +1148,7 @@ size_t tr_peerMgrAddPex(tr_torrent* tor, uint8_t from, tr_pex const* pex, size_t
if (tr_isPex(pex) && /* safeguard against corrupt data */
!s->manager->session->addressIsBlocked(pex->addr) && pex->is_valid_for_peers())
{
s->ensure_atom_exists(pex->addr, pex->port, pex->flags, from);
s->ensure_atom_exists(std::make_pair(pex->addr, pex->port), pex->flags, from);
++n_used;
}
}
Expand Down Expand Up @@ -1328,7 +1323,7 @@ std::vector<tr_pex> tr_peerMgrGetPeers(tr_torrent const* tor, uint8_t address_ty
}
else /* TR_PEERS_INTERESTING */
{
for (auto const& atom : s->pool)
for (auto const& [socket_address, atom] : s->pool)
{
if (isAtomInteresting(tor, atom))
{
Expand Down Expand Up @@ -2399,7 +2394,7 @@ struct peer_candidate
continue;
}

for (auto& atom : swarm->pool)
for (auto& [socket_address, atom] : swarm->pool)
{
if (isPeerCandidate(tor, atom, now))
{
Expand Down Expand Up @@ -2493,8 +2488,8 @@ void tr_peerMgr::makeNewPeerConnections(size_t max)

// ---

bool HandshakeMediator::is_peer_known_seed(tr_torrent_id_t tor_id, tr_address const& addr, tr_port const& port) const
bool HandshakeMediator::is_peer_known_seed(tr_torrent_id_t tor_id, std::pair<tr_address, tr_port> const& socket_address) const
{
auto const* const tor = session_.torrents().get(tor_id);
return tor != nullptr && tor->swarm != nullptr && tor->swarm->peer_is_a_seed(addr, port);
return tor != nullptr && tor->swarm != nullptr && tor->swarm->peer_is_a_seed(socket_address);
}
4 changes: 2 additions & 2 deletions libtransmission/peer-mgr.h
Expand Up @@ -163,9 +163,9 @@ constexpr bool tr_isPex(tr_pex const* pex)

void tr_peerMgrFree(tr_peerMgr* manager);

void tr_peerMgrSetUtpSupported(tr_torrent* tor, tr_address const& addr, tr_port const& port);
void tr_peerMgrSetUtpSupported(tr_torrent* tor, std::pair<tr_address, tr_port> const& socket_address);

void tr_peerMgrSetUtpFailed(tr_torrent* tor, tr_address const& addr, tr_port const& port, bool failed);
void tr_peerMgrSetUtpFailed(tr_torrent* tor, std::pair<tr_address, tr_port> const& socket_address, bool failed);

[[nodiscard]] std::vector<tr_block_span_t> tr_peerMgrGetNextRequests(tr_torrent* torrent, tr_peer const* peer, size_t numwant);

Expand Down
9 changes: 4 additions & 5 deletions libtransmission/peer-msgs.cc
Expand Up @@ -339,9 +339,9 @@ class tr_peerMsgsImpl final : public tr_peerMsgs

if (io->supports_utp())
{
auto const& [addr, port] = socketAddress();
tr_peerMgrSetUtpSupported(torrent, addr, port);
tr_peerMgrSetUtpFailed(torrent, addr, port, false);
auto const& socket_address = socketAddress();
tr_peerMgrSetUtpSupported(torrent, socket_address);
tr_peerMgrSetUtpFailed(torrent, socket_address, false);
}

if (io->supports_ltep())
Expand Down Expand Up @@ -1101,8 +1101,7 @@ void parseLtepHandshake(tr_peerMsgsImpl* msgs, MessageReader& payload)
{
/* Mysterious µTorrent extension that we don't grok. However,
it implies support for µTP, so use it to indicate that. */
auto const& [addr, port] = msgs->socketAddress();
tr_peerMgrSetUtpFailed(msgs->torrent, addr, port, false);
tr_peerMgrSetUtpFailed(msgs->torrent, msgs->socketAddress(), false);
}
}

Expand Down
8 changes: 5 additions & 3 deletions tests/libtransmission/handshake-test.cc
Expand Up @@ -81,8 +81,9 @@ class HandshakeTest : public SessionTest
return true;
}

[[nodiscard]] bool is_peer_known_seed(tr_torrent_id_t /*tor_id*/, tr_address const& /*addr*/, tr_port const& /*port*/)
const override
[[nodiscard]] bool is_peer_known_seed(
tr_torrent_id_t /*tor_id*/,
std::pair<tr_address, tr_port> const& /*socket_address*/) const override
{
return false;
}
Expand All @@ -100,7 +101,8 @@ class HandshakeTest : public SessionTest
return private_key_;
}

void set_utp_failed(tr_sha1_digest_t const& /*info_hash*/, tr_address const& /*addr*/, tr_port const& /*port*/) override
void set_utp_failed(tr_sha1_digest_t const& /*info_hash*/, std::pair<tr_address, tr_port> const& /*socket_address*/)
override
{
}

Expand Down

0 comments on commit 699b3d8

Please sign in to comment.