Skip to content

Commit

Permalink
Hide subscr key in protocol base, use notity_one for starve.
Browse files Browse the repository at this point in the history
  • Loading branch information
evoskuil committed May 11, 2024
1 parent b2480eb commit ac32fee
Show file tree
Hide file tree
Showing 11 changed files with 71 additions and 42 deletions.
4 changes: 2 additions & 2 deletions include/bitcoin/node/chase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ enum class chase
/// Issued by 'full_node' and handled by 'observer'.
suspend,

/// Channel starved for block download identifiers (channel_t).
/// Channel starved for work (object_t).
/// Issued by 'block_in_31800' and handled by 'session_outbound'.
starved,

Expand All @@ -61,7 +61,7 @@ enum class chase
/// Issued by 'check' and handled by 'block_in_31800'.
purge,

/// Channels (all) directed to write hash count to the log (count_t).
/// Channels (all) directed to write work count to the log (count_t).
/// Issued by 'executore' and handled by 'block_in_31800'.
report,

Expand Down
1 change: 1 addition & 0 deletions include/bitcoin/node/define.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ typedef event_subscriber::completer event_completer;
using count_t = size_t;
using height_t = size_t;
using channel_t = uint64_t;
using object_t = object_key;
using header_t = database::header_link::integer;
using transaction_t = database::tx_link::integer;

Expand Down
13 changes: 11 additions & 2 deletions include/bitcoin/node/protocols/protocol.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class BCN_API protocol
/// Events.
/// -----------------------------------------------------------------------

/// Subscribe to chaser events.
/// Subscribe to chaser events (only once).
virtual void subscribe_events(event_notifier&& handler,
event_completer&& complete) NOEXCEPT;

Expand All @@ -87,7 +87,10 @@ class BCN_API protocol
event_value value) NOEXCEPT;

/// Unsubscribe from chaser events.
virtual void unsubscribe_events(object_key key) NOEXCEPT;
virtual void unsubscribe_events() NOEXCEPT;

/// Get the subscription key (for notify_one).
virtual object_key events_key() const NOEXCEPT;

/// Properties.
/// -----------------------------------------------------------------------
Expand All @@ -102,8 +105,14 @@ class BCN_API protocol
virtual bool is_current() const NOEXCEPT;

private:
void handle_subscribe(const code& ec, object_key key,
const event_completer& complete) NOEXCEPT;

// This is thread safe.
const session::ptr session_;

// This is protected by singular subscription.
object_key key_{};
};

} // namespace node
Expand Down
2 changes: 1 addition & 1 deletion include/bitcoin/node/protocols/protocol_block_in_31800.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class BCN_API protocol_block_in_31800
void restore(const map_ptr& map) NOEXCEPT;
void handle_put_hashes(const code& ec, size_t count) NOEXCEPT;
void handle_get_hashes(const code& ec, const map_ptr& map) NOEXCEPT;
void do_complete_event(const code& ec, object_key key) NOEXCEPT;
void do_complete_event(const code& ec) NOEXCEPT;

// This is thread safe.
const network::messages::inventory::type_id block_type_;
Expand Down
2 changes: 1 addition & 1 deletion include/bitcoin/node/protocols/protocol_observer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class BCN_API protocol_observer
event_value value) NOEXCEPT;

private:
void do_complete_event(const code& ec, object_key key) NOEXCEPT;
void do_complete_event(const code& e) NOEXCEPT;

// This is protected by strand.
object_key key_{};
Expand Down
8 changes: 4 additions & 4 deletions include/bitcoin/node/sessions/session_outbound.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ class BCN_API session_outbound
session_outbound(full_node& node, uint64_t identifier) NOEXCEPT;

void start(network::result_handler&& handler) NOEXCEPT override;
void performance(uint64_t channel, uint64_t speed,
void performance(object_key channel, uint64_t speed,
network::result_handler&& handler) NOEXCEPT override;

protected:
virtual bool handle_event(const code& ec, chase event_,
event_value value) NOEXCEPT;
virtual void split(channel_t self) NOEXCEPT;
virtual void do_performance(uint64_t channel, uint64_t speed,
virtual void split(object_t self) NOEXCEPT;
virtual void do_performance(object_key channel, uint64_t speed,
const network::result_handler& handler) NOEXCEPT;

private:
Expand All @@ -54,7 +54,7 @@ class BCN_API session_outbound
const float allowed_deviation_;

// This is protected by strand.
std::unordered_map<uint64_t, double> speeds_{};
std::unordered_map<object_key, double> speeds_{};
};

} // namespace node
Expand Down
4 changes: 2 additions & 2 deletions src/chasers/chaser_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ void chaser_snapshot::do_snapshot(size_t height) NOEXCEPT

if (const auto ec = snapshot([&](auto, auto) NOEXCEPT
{
LOGN("SNAPSHOT PROGRESS: " << height);
LOGA("SNAPSHOT PROGRESS: " << height);
}))
{
LOGN("SNAPSHOT ERROR: " << ec.message());
LOGA("SNAPSHOT ERROR: " << ec.message());
}
}

Expand Down
29 changes: 26 additions & 3 deletions src/protocols/protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,40 @@ void protocol::put_hashes(const map_ptr& map,
void protocol::subscribe_events(event_notifier&& handler,
event_completer&& complete) NOEXCEPT
{
session_->subscribe_events(std::move(handler), std::move(complete));
session_->subscribe_events(std::move(handler),
BIND(handle_subscribe, _1, _2, std::move(complete)));
}

void protocol::notify(const code& ec, chase event_, event_value value) NOEXCEPT
{
session_->notify(ec, event_, value);
}

void protocol::unsubscribe_events(object_key key) NOEXCEPT
// As this has no completion handler resubscription is not allowed.
void protocol::unsubscribe_events() NOEXCEPT
{
session_->unsubscribe_events(key);
session_->unsubscribe_events(key_);
key_ = {};
}

// private
void protocol::handle_subscribe(const code& ec, object_key key,
const event_completer& complete) NOEXCEPT
{
// Protocol stop is thread safe.
if (ec)
{
stop(ec);
return;
}

key_ = key;
complete(ec, key_);
}

object_key protocol::events_key() const NOEXCEPT
{
return key_;
}

// Methods.
Expand Down
29 changes: 13 additions & 16 deletions src/protocols/protocol_block_in_31800.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,22 +55,23 @@ void protocol_block_in_31800::start() NOEXCEPT

// protected
void protocol_block_in_31800::complete_event(const code& ec,
object_key key) NOEXCEPT
object_key) NOEXCEPT
{
POST(do_complete_event, ec, key);
if (stopped(ec))
return;

POST(do_complete_event, ec);
}

// private
void protocol_block_in_31800::do_complete_event(const code&,
object_key key) NOEXCEPT
void protocol_block_in_31800::do_complete_event(const code& ec) NOEXCEPT
{
BC_ASSERT(stranded());
key_ = key;

// stopped() is true before stopping() is called (by base).
if (stopped())
if (stopped(ec))
{
unsubscribe_events(key_);
unsubscribe_events();
return;
}

Expand All @@ -90,7 +91,7 @@ void protocol_block_in_31800::stopping(const code& ec) NOEXCEPT
restore(map_);
map_ = chaser_check::empty_map();
stop_performance();
unsubscribe_events(key_);
unsubscribe_events();
protocol::stopping(ec);
}

Expand All @@ -112,13 +113,9 @@ bool protocol_block_in_31800::handle_event(const code&,
{
case chase::split:
{
// TODO: remove condition once notify_one is used for chase::split.
// If value identifies this channel (slowest), split work and stop.
if (possible_narrow_cast<channel_t>(value) == identifier())
{
POST(do_split, channel_t{});
}

// chase::split is posted by notify_one() using subscription key.
// That key is not the channel identifier, but it's this channel.
POST(do_split, identifier());
break;
}
case chase::stall:
Expand Down Expand Up @@ -407,7 +404,7 @@ void protocol_block_in_31800::handle_get_hashes(const code& ec,

if (map->empty())
{
notify(error::success, chase::starved, identifier());
notify(error::success, chase::starved, events_key());
return;
}

Expand Down
12 changes: 5 additions & 7 deletions src/protocols/protocol_observer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,18 @@ void protocol_observer::start() NOEXCEPT
// protected
void protocol_observer::complete_event(const code& ec, object_key key) NOEXCEPT
{
POST(do_complete_event, ec, key);
POST(do_complete_event, ec);
}

// private
void protocol_observer::do_complete_event(const code&,
object_key key) NOEXCEPT
void protocol_observer::do_complete_event(const code& ec) NOEXCEPT
{
BC_ASSERT(stranded());
key_ = key;

// stopped() is true before stopping() is called (by base).
if (stopped())
if (stopped(ec))
{
unsubscribe_events(key_);
unsubscribe_events();
return;
}
}
Expand All @@ -69,7 +67,7 @@ void protocol_observer::do_complete_event(const code&,
void protocol_observer::stopping(const code& ec) NOEXCEPT
{
BC_ASSERT(stranded());
unsubscribe_events(key_);
unsubscribe_events();
protocol::stopping(ec);
}

Expand Down
9 changes: 5 additions & 4 deletions src/sessions/session_outbound.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ bool session_outbound::handle_event(const code&,
case chase::starved:
{
// When a channel becomes starved notify other(s) to split work.
split(possible_narrow_cast<channel_t>(value));
split(possible_narrow_cast<object_t>(value));
break;
}
case chase::stop:
Expand All @@ -98,7 +98,9 @@ bool session_outbound::handle_event(const code&,
return true;
}

void session_outbound::split(channel_t self) NOEXCEPT
// object_key is used instead of channel identifier because the event
// subscriber supports objects other than channels.
void session_outbound::split(object_t self) NOEXCEPT
{
BC_ASSERT(stranded());

Expand All @@ -120,8 +122,7 @@ void session_outbound::split(channel_t self) NOEXCEPT
const auto channel = slowest->first;
speeds_.erase(slowest);

// TODO: use notify_one and subscription key to identify channel.
node::session::notify(error::success, chase::split, channel);
node::session::notify_one(self, error::success, chase::split, channel);
return;
}

Expand Down

0 comments on commit ac32fee

Please sign in to comment.