Skip to content

Commit

Permalink
Implement send_doc_queued in terms of coroutine
Browse files Browse the repository at this point in the history
  • Loading branch information
loonycyborg committed Dec 6, 2020
1 parent 9b410e6 commit 1b0a89e
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 85 deletions.
10 changes: 7 additions & 3 deletions src/server/campaignd/server.cpp
Expand Up @@ -425,9 +425,13 @@ std::ostream& operator<<(std::ostream& o, const server::request& r)

void server::handle_new_client(socket_ptr socket)
{
async_receive_doc(socket,
std::bind(&server::handle_request, this, std::placeholders::_1, std::placeholders::_2)
);
boost::asio::spawn(io_service_, [this, socket](boost::asio::yield_context yield) {
boost::system::error_code ec;
auto doc { coro_receive_doc(socket, yield[ec]) };
if(check_error(ec, socket) || !doc) return;

handle_request(socket, doc);
});
}

void server::handle_request(socket_ptr socket, std::shared_ptr<simple_wml::document> doc)
Expand Down
13 changes: 9 additions & 4 deletions src/server/common/send_receive_wml_helpers.ipp
Expand Up @@ -35,11 +35,11 @@
#include <memory>
#include <stdexcept>

inline void coro_send_doc(socket_ptr socket, simple_wml::document& doc, boost::asio::yield_context yield)
inline void coro_send_doc(socket_ptr socket, std::shared_ptr<simple_wml::document> doc, boost::asio::yield_context yield)
{
try {
std::unique_ptr<simple_wml::document> doc_copy{ doc.clone() };
simple_wml::string_span s = doc_copy->output_compressed();
//std::unique_ptr<simple_wml::document> doc_copy{ doc.clone() };
simple_wml::string_span s = doc->output_compressed();

union DataSize
{
Expand All @@ -60,6 +60,11 @@ inline void coro_send_doc(socket_ptr socket, simple_wml::document& doc, boost::a
}
}

inline void coro_send_doc(socket_ptr socket, simple_wml::document& doc, boost::asio::yield_context yield)
{
coro_send_doc(socket, std::shared_ptr<simple_wml::document>{ doc.clone() }, yield);
}

template<typename Handler, typename ErrorHandler>
struct handle_doc
{
Expand Down Expand Up @@ -345,7 +350,7 @@ inline std::shared_ptr<simple_wml::document> coro_receive_doc(socket_ptr socket,
ERR_SERVER <<
client_address(socket) <<
"\tsimple_wml error in received data: " << e.message << std::endl;
async_send_error(socket, "Invalid WML received: " + e.message);
//async_send_error(socket, "Invalid WML received: " + e.message);
return {};
}
}
Expand Down
44 changes: 21 additions & 23 deletions src/server/common/server_base.cpp
Expand Up @@ -193,7 +193,7 @@ bool check_error(const boost::system::error_code& error, socket_ptr socket)

namespace {

void info_table_into_simple_wml(simple_wml::document& doc, const std::string& parent_name, const info_table& info)
void info_table_into_simple_wml(simple_wml::document& doc, const std::string& parent_name, const server_base::info_table& info)
{
if(info.empty()) {
return;
Expand All @@ -207,31 +207,29 @@ void info_table_into_simple_wml(simple_wml::document& doc, const std::string& pa

}

using SendQueue = std::map<socket_ptr, std::queue<std::shared_ptr<simple_wml::document>>>;
SendQueue send_queue;

static void handle_async_send_doc_queued(socket_ptr socket)
void server_base::async_send_doc_queued(socket_ptr socket, simple_wml::document& doc)
{
if(send_queue[socket].empty()) {
send_queue.erase(socket);
} else {
async_send_doc(socket, *(send_queue[socket].front()), handle_async_send_doc_queued, handle_async_send_doc_queued);
send_queue[socket].pop();
}
}
static boost::asio::strand<boost::asio::io_context::executor_type> strand { io_service_.get_executor() };
std::shared_ptr<simple_wml::document> doc_ptr { doc.clone() };

void async_send_doc_queued(socket_ptr socket, simple_wml::document& doc)
{
auto iter = send_queue.find(socket);
if(iter == send_queue.end()) {
send_queue[socket];
async_send_doc(socket, doc, handle_async_send_doc_queued, handle_async_send_doc_queued);
} else {
send_queue[socket].emplace(doc.clone());
}
boost::asio::spawn(strand, [doc_ptr, socket](boost::asio::yield_context yield)
{
static std::map<socket_ptr, std::queue<std::shared_ptr<simple_wml::document>>> queues;

queues[socket].emplace(doc_ptr);
if(queues[socket].size() > 1) {
return;
}

while(queues[socket].size() > 0) {
coro_send_doc(socket, queues[socket].front(), yield);
queues[socket].pop();
}
queues.erase(socket);
});
}

void async_send_error(socket_ptr socket, const std::string& msg, const char* error_code, const info_table& info)
void server_base::async_send_error(socket_ptr socket, const std::string& msg, const char* error_code, const info_table& info)
{
simple_wml::document doc;
doc.root().add_child("error").set_attr_dup("message", msg.c_str());
Expand All @@ -243,7 +241,7 @@ void async_send_error(socket_ptr socket, const std::string& msg, const char* err
async_send_doc_queued(socket, doc);
}

void async_send_warning(socket_ptr socket, const std::string& msg, const char* warning_code, const info_table& info)
void server_base::async_send_warning(socket_ptr socket, const std::string& msg, const char* warning_code, const info_table& info)
{
simple_wml::document doc;
doc.root().add_child("warning").set_attr_dup("message", msg.c_str());
Expand Down
12 changes: 6 additions & 6 deletions src/server/common/server_base.hpp
Expand Up @@ -42,6 +42,12 @@ class server_base
virtual ~server_base() {}
void run();

void async_send_doc_queued(socket_ptr socket, simple_wml::document& doc);

typedef std::map<std::string, std::string> info_table;
void async_send_error(socket_ptr socket, const std::string& msg, const char* error_code = "", const info_table& info = {});
void async_send_warning(socket_ptr socket, const std::string& msg, const char* warning_code = "", const info_table& info = {});

protected:
unsigned short port_;
bool keep_alive_;
Expand Down Expand Up @@ -78,9 +84,3 @@ class server_base

std::string client_address(socket_ptr socket);
bool check_error(const boost::system::error_code& error, socket_ptr socket);

void async_send_doc_queued(socket_ptr socket, simple_wml::document& doc);

typedef std::map<std::string, std::string> info_table;
void async_send_error(socket_ptr socket, const std::string& msg, const char* error_code = "", const info_table& info = {});
void async_send_warning(socket_ptr socket, const std::string& msg, const char* warning_code = "", const info_table& info = {});
62 changes: 32 additions & 30 deletions src/server/wesnothd/game.cpp
Expand Up @@ -73,15 +73,6 @@ std::vector<TResult> split(const simple_wml::string_span& val, TConvert conv, co

namespace wesnothd
{
template<typename Container>
void send_to_players(simple_wml::document& data, const Container& players, socket_ptr exclude = socket_ptr())
{
for(const auto& player : players) {
if(player != exclude) {
async_send_doc_queued(player, data);
}
}
}

int game::id_num = 1;
int game::db_id_num = 1;
Expand All @@ -92,12 +83,13 @@ void game::missing_user(socket_ptr /*socket*/, const std::string& func) const
<< ") in player_info_ in game:\t\"" << name_ << "\" (" << id_ << ", " << db_id_ << ")\n";
}

game::game(player_connections& player_connections,
game::game(wesnothd::server& server, player_connections& player_connections,
const socket_ptr& host,
const std::string& name,
bool save_replays,
const std::string& replay_save_path)
: player_connections_(player_connections)
: server(server)
, player_connections_(player_connections)
, id_(id_num++)
, db_id_(db_id_num++)
, name_(name)
Expand Down Expand Up @@ -391,7 +383,7 @@ bool game::send_taken_side(simple_wml::document& cfg, const simple_wml::node* si
cfg.root().set_attr_dup("side", (*side)["side"]);

// Tell the host which side the new player should take.
async_send_doc_queued(owner_, cfg);
server.async_send_doc_queued(owner_, cfg);
return true;
}

Expand Down Expand Up @@ -653,7 +645,7 @@ void game::change_controller(
// side_drop already.)
if(!player_left) {
change.set_attr("is_local", "yes");
async_send_doc_queued(sock, response);
server.async_send_doc_queued(sock, response);
}
}

Expand All @@ -664,7 +656,7 @@ void game::notify_new_host()
cfg.root().add_child("host_transfer");

std::string message = owner_name + " has been chosen as the new host.";
async_send_doc_queued(owner_, cfg);
server.async_send_doc_queued(owner_, cfg);
send_and_record_server_message(message);
}

Expand Down Expand Up @@ -806,7 +798,7 @@ void game::unmute_observer(const simple_wml::node& unmute, const socket_ptr& unm
void game::send_leave_game(const socket_ptr& user) const
{
static simple_wml::document leave_game("[leave_game]\n[/leave_game]\n", simple_wml::INIT_COMPRESSED);
async_send_doc_queued(user, leave_game);
server.async_send_doc_queued(user, leave_game);
}

socket_ptr game::kick_member(const simple_wml::node& kick, const socket_ptr& kicker)
Expand Down Expand Up @@ -1226,7 +1218,7 @@ void game::handle_controller_choice(const simple_wml::node& req)

// Calling send_to_one to 0 connect causes the package to be sent to all clients.
if(sides_[side_index] != 0) {
async_send_doc_queued(sides_[side_index], *mdata);
server.async_send_doc_queued(sides_[side_index], *mdata);
}

change_controller_wml.set_attr("is_local", "no");
Expand Down Expand Up @@ -1438,12 +1430,12 @@ bool game::add_player(const socket_ptr& player, bool observer)
DBG_GAME << debug_player_info();

// Send the user the game data.
async_send_doc_queued(player, level_);
server.async_send_doc_queued(player, level_);

if(started_) {
// Tell this player that the game has started
static simple_wml::document start_game_doc("[start_game]\n[/start_game]\n", simple_wml::INIT_COMPRESSED);
async_send_doc_queued(player, start_game_doc);
server.async_send_doc_queued(player, start_game_doc);

// Send observer join of all the observers in the game to the new player
// only once the game started. The client forgets about it anyway otherwise.
Expand Down Expand Up @@ -1571,7 +1563,7 @@ bool game::remove_player(const socket_ptr& player, const bool disconnect, const

DBG_GAME << "*** sending side drop: \n" << drop.output() << std::endl;

async_send_doc_queued(owner_, drop);
server.async_send_doc_queued(owner_, drop);
}

if(ai_transfer) {
Expand All @@ -1584,7 +1576,7 @@ bool game::remove_player(const socket_ptr& player, const bool disconnect, const
return false;
}

void game::send_user_list(const socket_ptr& exclude) const
void game::send_user_list(const socket_ptr& exclude)
{
// If the game hasn't started yet, then send all players a list of the users in the game.
if(started_ /*|| description_ == nullptr*/) {
Expand Down Expand Up @@ -1663,8 +1655,8 @@ void game::load_next_scenario(const socket_ptr& user)
cfg_controller.set_attr("is_local", side_user == user ? "yes" : "no");
}

async_send_doc_queued(user, cfg_scenario);
async_send_doc_queued(user, doc_controllers);
server.async_send_doc_queued(user, cfg_scenario);
server.async_send_doc_queued(user, doc_controllers);

players_not_advanced_.erase(user);

Expand All @@ -1675,7 +1667,17 @@ void game::load_next_scenario(const socket_ptr& user)
send_observerjoins(user);
}

void game::send_data(simple_wml::document& data, const socket_ptr& exclude, std::string /*packet_type*/) const
template<typename Container>
void game::send_to_players(simple_wml::document& data, const Container& players, socket_ptr exclude)
{
for(const auto& player : players) {
if(player != exclude) {
server.async_send_doc_queued(player, data);
}
}
}

void game::send_data(simple_wml::document& data, const socket_ptr& exclude, std::string /*packet_type*/)
{
send_to_players(data, all_game_users(), exclude);
}
Expand Down Expand Up @@ -1703,7 +1705,7 @@ struct controls_side_helper
void game::send_data_sides(simple_wml::document& data,
const simple_wml::string_span& sides,
const socket_ptr& exclude,
std::string /*packet_type*/) const
std::string /*packet_type*/)
{
std::vector<int> sides_vec = ::split<int>(sides, ::split_conv_impl());

Expand Down Expand Up @@ -1749,7 +1751,7 @@ std::string game::has_same_ip(const socket_ptr& user) const
return clones;
}

void game::send_observerjoins(const socket_ptr& sock) const
void game::send_observerjoins(const socket_ptr& sock)
{
for(const socket_ptr& ob : observers_) {
if(ob == sock) {
Expand All @@ -1764,12 +1766,12 @@ void game::send_observerjoins(const socket_ptr& sock) const
send_data(cfg, ob);
} else {
// Send to the (new) user.
async_send_doc_queued(sock, cfg);
server.async_send_doc_queued(sock, cfg);
}
}
}

void game::send_observerquit(const socket_ptr& observer) const
void game::send_observerquit(const socket_ptr& observer)
{
simple_wml::document observer_quit;

Expand Down Expand Up @@ -1799,7 +1801,7 @@ void game::send_history(const socket_ptr& socket) const
simple_wml::document* doc = new simple_wml::document(buf.c_str(), simple_wml::INIT_STATIC);
doc->compress();

async_send_doc_queued(socket, *doc);
server.async_send_doc_queued(socket, *doc);

history_.clear();
history_.push_back(doc);
Expand Down Expand Up @@ -2014,7 +2016,7 @@ void game::send_and_record_server_message(const char* message, const socket_ptr&
}
}

void game::send_server_message_to_all(const char* message, const socket_ptr& exclude) const
void game::send_server_message_to_all(const char* message, const socket_ptr& exclude)
{
simple_wml::document doc;
send_server_message(message, socket_ptr(), &doc);
Expand Down Expand Up @@ -2046,7 +2048,7 @@ void game::send_server_message(const char* message, const socket_ptr& sock, simp
}

if(sock) {
async_send_doc_queued(sock, doc);
server.async_send_doc_queued(sock, doc);
}
}

Expand Down

0 comments on commit 1b0a89e

Please sign in to comment.