Skip to content

Commit

Permalink
Compression for network communication
Browse files Browse the repository at this point in the history
Compressed data is communicated through port 12347.  Communication with localhost is always uncompressed.
  • Loading branch information
Matthew Lam committed May 26, 2022
1 parent f5da65c commit 10c215a
Show file tree
Hide file tree
Showing 10 changed files with 176 additions and 39 deletions.
1 change: 1 addition & 0 deletions UI/OptionsWnd.cpp
Expand Up @@ -794,6 +794,7 @@ void OptionsWnd::CompleteConstruction() {
//BoolOption(current_page, 0, "resource.shipdesign.default.enabled", UserString("OPTIONS_ADD_DEFAULT_DESIGNS")); // hidden due to issues with implementation when not enabled preventing designs from being added or recreated
BoolOption(current_page, 0, "save.format.binary.enabled", UserString("OPTIONS_USE_BINARY_SERIALIZATION"));
BoolOption(current_page, 0, "save.format.xml.zlib.enabled", UserString("OPTIONS_USE_XML_ZLIB_SERIALIZATION"));
BoolOption(current_page, 0, "network.message.zlib.enabled", UserString("OPTIONS_USE_MSG_ZLIB_CPR"));
BoolOption(current_page, 0, "ui.map.sitrep.invalid.shown", UserString("OPTIONS_VERBOSE_SITREP_DESC"));
BoolOption(current_page, 0, "effects.accounting.enabled", UserString("OPTIONS_EFFECT_ACCOUNTING"));

Expand Down
35 changes: 30 additions & 5 deletions client/ClientNetworking.cpp
Expand Up @@ -191,6 +191,7 @@ class ClientNetworking::Impl {
expect_timeout is true, timeout is not reported as an error. */
bool ConnectToServer(const ClientNetworking* const self,
const std::string& ip_address,
Networking::MsgCprMode cpr_mode,
const std::chrono::milliseconds& timeout = std::chrono::seconds(10),
bool expect_timeout = false);

Expand Down Expand Up @@ -266,6 +267,8 @@ class ClientNetworking::Impl {
bool m_rx_connected = false; // accessed from multiple threads
bool m_tx_connected = false; // accessed from multiple threads

Networking::MsgCprMode m_cpr_mode;

MessageQueue m_incoming_messages; // accessed from multiple threads, but its interface is threadsafe
std::list<Message> m_outgoing_messages;

Expand Down Expand Up @@ -374,17 +377,29 @@ void ClientNetworking::Impl::LaunchNetworkThread(const ClientNetworking* const s
bool ClientNetworking::Impl::ConnectToServer(
const ClientNetworking* const self,
const std::string& ip_address,
Networking::MsgCprMode cpr_mode,
const std::chrono::milliseconds& timeout/* = std::chrono::seconds(10)*/,
bool expect_timeout /*=false*/)
{
TraceLogger(network) << "ClientNetworking::Impl::ConnectToServer(" << self << ", " << ip_address << ", " << timeout.count() << ", " << expect_timeout << ")";
using Clock = std::chrono::high_resolution_clock;
Clock::time_point start_time = Clock::now();

if (ip_address == "127.0.0.1" || ip_address == "localhost") {
m_cpr_mode = Networking::MsgCprMode::NONE;
} else {
m_cpr_mode = cpr_mode;
}

std::string port = std::to_string(Networking::MessagePort());

if (m_cpr_mode == Networking::MsgCprMode::COMPRESSED)
port = std::to_string(Networking::CprMessagePort());

using namespace boost::asio::ip;
tcp::resolver resolver(m_io_context);
tcp::resolver::query query(ip_address,
std::to_string(Networking::MessagePort()),
port,
resolver_query_base::numeric_service);

// Resolve the query - will try to connect on success.
Expand Down Expand Up @@ -437,7 +452,7 @@ bool ClientNetworking::Impl::ConnectToLocalHostServer(
#if FREEORION_WIN32
try {
#endif
retval = ConnectToServer(self, "127.0.0.1", timeout, expect_timeout);
retval = ConnectToServer(self, "127.0.0.1", Networking::MsgCprMode::NONE, timeout, expect_timeout);
#if FREEORION_WIN32
} catch (const boost::system::system_error& e) {
if (e.code().value() != WSAEADDRNOTAVAIL)
Expand Down Expand Up @@ -634,6 +649,8 @@ void ClientNetworking::Impl::HandleMessageBodyRead(const std::shared_ptr<const C

assert(static_cast<int>(bytes_transferred) <= m_incoming_header[Message::Parts::SIZE]);
if (static_cast<int>(bytes_transferred) == m_incoming_header[Message::Parts::SIZE]) {
if (m_cpr_mode == Networking::MsgCprMode::COMPRESSED)
m_incoming_message.Decompress();
m_incoming_messages.PushBack(m_incoming_message);
AsyncReadMessage(keep_alive);
}
Expand Down Expand Up @@ -719,6 +736,8 @@ void ClientNetworking::Impl::AsyncWriteMessage() {

void ClientNetworking::Impl::SendMessageImpl(Message message) {
bool start_write = m_outgoing_messages.empty();
if (m_cpr_mode == Networking::MsgCprMode::COMPRESSED)
message.Compress();
m_outgoing_messages.push_back(std::move(message));
if (start_write)
AsyncWriteMessage();
Expand Down Expand Up @@ -794,8 +813,14 @@ ClientNetworking::ServerNames ClientNetworking::DiscoverLANServerNames()

bool ClientNetworking::ConnectToServer(
const std::string& ip_address,
const std::chrono::milliseconds& timeout/* = std::chrono::seconds(10)*/)
{ return m_impl->ConnectToServer(this, ip_address, timeout); }
const std::chrono::milliseconds& timeout/* = std::chrono::seconds(10)*/) {
Networking::MsgCprMode cpr_mode = Networking::MsgCprMode::NONE;

if (GetOptionsDB().Get<bool>("network.message.zlib.enabled"))
cpr_mode = Networking::MsgCprMode::COMPRESSED;

return m_impl->ConnectToServer(this, ip_address, cpr_mode, timeout);
}

bool ClientNetworking::ConnectToLocalHostServer(
const std::chrono::milliseconds& timeout/* = std::chrono::seconds(10)*/)
Expand All @@ -804,7 +829,7 @@ bool ClientNetworking::ConnectToLocalHostServer(
bool ClientNetworking::PingServer(
const std::string& ip_address,
const std::chrono::milliseconds& timeout/* = std::chrono::seconds(10)*/)
{ return m_impl->ConnectToServer(this, ip_address, timeout, true /*expect_timeout*/); }
{ return m_impl->ConnectToServer(this, ip_address, Networking::MsgCprMode::NONE, timeout, true /*expect_timeout*/); }

bool ClientNetworking::PingLocalHostServer(
const std::chrono::milliseconds& timeout/* = std::chrono::seconds(10)*/)
Expand Down
6 changes: 6 additions & 0 deletions default/stringtables/en.txt
Expand Up @@ -2174,6 +2174,9 @@ The server will use Binary serialization for client-server interaction if the cl
OPTIONS_DB_XML_ZLIB_SERIALIZATION
When saving games with XML serialization, compress most of the XML before writing the file. Compression substantially reduces save file sizes, but may make saves unloadable due to memory requirements to decompress the save data.

OPTIONS_DB_USE_MSG_ZLIB_CPR
Compress message data for non-local games.

OPTIONS_DB_UI_WINDOWS_EXISTS
True if the window with a given config name currently exists, false if one was created and deleted, doesn't exist if no window has yet been created with that name.

Expand Down Expand Up @@ -3408,6 +3411,9 @@ Create Binary Save Files
OPTIONS_USE_XML_ZLIB_SERIALIZATION
Use Compression for XML Save Files

OPTIONS_USE_MSG_ZLIB_CPR
Compress transmitted network game data

OPTIONS_CREATE_PERSISTENT_CONFIG
Write Persistent Config File

Expand Down
57 changes: 57 additions & 0 deletions network/Message.cpp
Expand Up @@ -9,6 +9,7 @@
#include "../util/SaveGamePreviewUtils.h"
#include "../universe/Species.h"
#include "../universe/Universe.h"
#include "../util/base64_filter.h"
#include "../util/OptionsDB.h"
#include "../util/OrderSet.h"
#include "../util/Serialize.h"
Expand All @@ -17,6 +18,11 @@
#include "../util/Version.h"
#include <boost/algorithm/string/erase.hpp>
#include <boost/algorithm/string/predicate.hpp>
#include <boost/iostreams/copy.hpp>
#include <boost/iostreams/device/back_inserter.hpp>
#include <boost/iostreams/filter/zlib.hpp>
#include <boost/iostreams/filtering_streambuf.hpp>
#include <boost/iostreams/stream.hpp>
#include <boost/serialization/deque.hpp>
#include <boost/serialization/list.hpp>
#include <boost/serialization/map.hpp>
Expand Down Expand Up @@ -94,6 +100,57 @@ void Message::Reset() noexcept {
m_message_text.clear();
}

void Message::Compress() noexcept {
std::string_view::size_type temp = m_message_size;
m_message_text = CompressImpl(m_message_text);
m_message_size = m_message_text.size();
}

std::string Message::CompressImpl(const std::string& data) {
boost::iostreams::basic_array_source<char> src(data.c_str(), data.size());
boost::iostreams::stream<boost::iostreams::basic_array_source<char>> s_src(src);

std::string compressed;
compressed.reserve(std::pow(2.0, 26.0));

boost::iostreams::back_insert_device<std::string> cmp(compressed);
boost::iostreams::stream<boost::iostreams::back_insert_device<std::string>> s_cmp(cmp);

boost::iostreams::filtering_ostreambuf out;
out.push(boost::iostreams::zlib_compressor());
out.push(boost::iostreams::base64_encoder());
out.push(s_cmp);
boost::iostreams::copy(s_src, out);
s_cmp.flush();

return compressed;
}

void Message::Decompress() noexcept {
std::string_view::size_type temp = m_message_size;
m_message_text = DecompressImpl(m_message_text);
m_message_size = m_message_text.size();
}

std::string Message::DecompressImpl(const std::string& data) {
boost::iostreams::basic_array_source<char> src(data.c_str(), data.size());
boost::iostreams::stream<boost::iostreams::basic_array_source<char>> s_src(src);

std::string decompressed;
decompressed.reserve(std::pow(2.0, 26.0));

boost::iostreams::back_insert_device<std::string> dcmp(decompressed);
boost::iostreams::stream<boost::iostreams::back_insert_device<std::string>> s_dcmp(dcmp);

boost::iostreams::filtering_istreambuf in;
in.push(boost::iostreams::zlib_decompressor());
in.push(boost::iostreams::base64_decoder());
in.push(s_src);
boost::iostreams::copy(in, s_dcmp);

return decompressed;
}

bool operator==(const Message& lhs, const Message& rhs) {
return
lhs.Type() == rhs.Type() &&
Expand Down
6 changes: 6 additions & 0 deletions network/Message.h
Expand Up @@ -149,12 +149,18 @@ class FO_COMMON_API Message {
void Swap(Message& rhs) noexcept;///< Swaps the contents of \a *this with \a rhs. Does not throw.
void Reset() noexcept; ///< Reverts message to same state as after default constructor

void Compress() noexcept;
void Decompress() noexcept;

private:
MessageType m_type = MessageType::UNDEFINED;
std::string_view::size_type m_message_size = 0;
std::string m_message_text;

friend FO_COMMON_API void BufferToHeader(const HeaderBuffer&, Message&);

std::string CompressImpl(const std::string& in);
std::string DecompressImpl(const std::string& in);
};

/** Fills in the relevant portions of \a message with the values in the buffer \a buffer. */
Expand Down
3 changes: 3 additions & 0 deletions network/Networking.cpp
Expand Up @@ -6,6 +6,7 @@ namespace {
void AddOptions(OptionsDB& db) {
db.Add("network.discovery.port", UserStringNop("OPTIONS_DB_NETWORK_DISCOVERY_PORT"), 12345, RangedValidator<int>(1025, 65535));
db.Add("network.message.port", UserStringNop("OPTIONS_DB_NETWORK_MESSAGE_PORT"), 12346, RangedValidator<int>(1025, 65535));
db.Add("network.cpr_message.port", UserStringNop("OPTIONS_DB_NETWORK_CPR_MESSAGE_PORT"), 12347, RangedValidator<int>(1025, 65535));
}
bool temp_bool = RegisterOptions(&AddOptions);
}
Expand All @@ -24,6 +25,8 @@ namespace Networking {
{ return GetOptionsDB().Get<int>("network.discovery.port"); }
int MessagePort()
{ return GetOptionsDB().Get<int>("network.message.port"); }
int CprMessagePort()
{ return GetOptionsDB().Get<int>("network.cpr_message.port"); }

AuthRoles::AuthRoles(const std::initializer_list<RoleType>& roles) {
for (RoleType r : roles)
Expand Down
6 changes: 6 additions & 0 deletions network/Networking.h
Expand Up @@ -15,6 +15,12 @@ namespace Networking {

FO_COMMON_API int DiscoveryPort();
FO_COMMON_API int MessagePort();
FO_COMMON_API int CprMessagePort();

enum class MsgCprMode : unsigned char {
NONE = 0,
COMPRESSED
};

enum class ClientType : signed char {
INVALID_CLIENT_TYPE = -1,
Expand Down

0 comments on commit 10c215a

Please sign in to comment.