Skip to content

Commit

Permalink
refactor: add virtual stream base class
Browse files Browse the repository at this point in the history
Add virtual stream base class.

Signed-off-by: Melg Eight <public.melg8@gmail.com>
  • Loading branch information
melg8 committed May 14, 2024
1 parent 8c32b4f commit f3d9d35
Showing 1 changed file with 58 additions and 24 deletions.
82 changes: 58 additions & 24 deletions sources/coal/application/sources/websocket_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

#include <websocket_client.h>

#include <universal_declarations.h>

#include <spdlog/spdlog.h>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/co_spawn.hpp>
Expand All @@ -29,7 +31,61 @@ using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
static constexpr auto nothrow_use_op =
boost::asio::as_tuple(boost::cobalt::use_op);

//------------------------------------------------------------------------------
class Stream {
public:
virtual ~Stream() = default;

virtual cobalt::task<bool> AsyncRead(beast::flat_buffer&) = 0;
virtual cobalt::task<bool> AsyncWrite(const net::const_buffer&) = 0;
virtual cobalt::task<bool> AsyncClose() = 0;
};

class WebsocketOverTcpStream : public Stream {
public:
WebsocketOverTcpStream(websocket::stream<beast::tcp_stream>&& stream)
: stream_(std::move(stream)) {}

virtual cobalt::task<bool> AsyncRead(beast::flat_buffer& buffer) {
const auto [err, _1] = co_await stream_.async_read(buffer, nothrow_use_op);
co_return !err;
}

virtual cobalt::task<bool> AsyncWrite(const net::const_buffer& buffer) {
const auto [err, _1] = co_await stream_.async_write(buffer, nothrow_use_op);
co_return !err;
}

virtual cobalt::task<bool> AsyncClose() {
const auto [err] = co_await stream_.async_close(
websocket::close_code::normal, nothrow_use_op);
co_return !err;
}

websocket::stream<beast::tcp_stream> stream_;
};

[[nodiscard]] static cobalt::task<void> UseStream(Stream&& stream,
std::string host,
std::string text) {
spdlog::info("Sending message to websocket: {}", text);
if (!co_await stream.AsyncWrite(net::buffer(text))) {
spdlog::error("Error writing into web socket {}", host);
co_return;
}

beast::flat_buffer buffer;
if (!co_await stream.AsyncRead(buffer)) {
spdlog::error("Error reading from web socket {}", host);
co_return;
}
spdlog::info("Got response from websocket size: {}", buffer.data().size());

if (!co_await stream.AsyncClose()) {
spdlog::error("Can't close gracefully connection to {}", host);
co_return;
}
spdlog::info("Websocket connection closed gracefuly with {}", host);
}

// Sends a WebSocket message and prints the response
cobalt::task<void> DoSession(std::string host,
Expand Down Expand Up @@ -82,29 +138,7 @@ cobalt::task<void> DoSession(std::string host,
co_return;
}

spdlog::info("Sending message to websocket: {}", text);
const auto [write_err, _1] =
co_await ws.async_write(net::buffer(std::string(text)), nothrow_use_op);
if (write_err) {
spdlog::error("Error writing into web socket {}", host);
co_return;
}

beast::flat_buffer buffer;
const auto [read_err, _2] = co_await ws.async_read(buffer, nothrow_use_op);
if (read_err) {
spdlog::error("Error reading from web socket {}", host);
co_return;
}
spdlog::info("Got response from websocket size: {}", buffer.data().size());

const auto [close_err] =
co_await ws.async_close(websocket::close_code::normal, nothrow_use_op);
if (close_err) {
spdlog::error("Can't close gracefully connection to {}", host);
co_return;
}
spdlog::info("Websocket connection closed gracefuly with {}", host);
co_await UseStream(WebsocketOverTcpStream(std::move(ws)), host, text);
}

} // namespace coal

0 comments on commit f3d9d35

Please sign in to comment.