Skip to content

Commit

Permalink
[FOLD]
Browse files Browse the repository at this point in the history
  • Loading branch information
vinniefalco committed Oct 24, 2016
1 parent 9460b22 commit 14152b9
Show file tree
Hide file tree
Showing 4 changed files with 369 additions and 362 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,24 @@
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BEAST_WEBSOCKET_IMPL_READ_FRAME_OP_HPP
#define BEAST_WEBSOCKET_IMPL_READ_FRAME_OP_HPP
#ifndef BEAST_WEBSOCKET_IMPL_READ_IPP
#define BEAST_WEBSOCKET_IMPL_READ_IPP

#include <beast/websocket/teardown.hpp>
#include <beast/core/buffer_concepts.hpp>
#include <beast/core/handler_alloc.hpp>
#include <beast/core/prepare_buffers.hpp>
#include <beast/core/static_streambuf.hpp>
#include <beast/core/stream_concepts.hpp>
#include <boost/assert.hpp>
#include <boost/optional.hpp>
#include <memory>

namespace beast {
namespace websocket {

//------------------------------------------------------------------------------

// Reads a single message frame,
// processes any received control frames.
//
Expand Down Expand Up @@ -541,6 +545,368 @@ upcall:
d.h(ec);
}

template<class NextLayer>
template<class DynamicBuffer, class ReadHandler>
typename async_completion<
ReadHandler, void(error_code)>::result_type
stream<NextLayer>::
async_read_frame(frame_info& fi,
DynamicBuffer& dynabuf, ReadHandler&& handler)
{
static_assert(is_AsyncStream<next_layer_type>::value,
"AsyncStream requirements requirements not met");
static_assert(beast::is_DynamicBuffer<DynamicBuffer>::value,
"DynamicBuffer requirements not met");
beast::async_completion<
ReadHandler, void(error_code)> completion(handler);
read_frame_op<DynamicBuffer, decltype(completion.handler)>{
completion.handler, *this, fi, dynabuf};
return completion.result.get();
}

template<class NextLayer>
template<class DynamicBuffer>
void
stream<NextLayer>::
read_frame(frame_info& fi, DynamicBuffer& dynabuf)
{
static_assert(is_SyncStream<next_layer_type>::value,
"SyncStream requirements not met");
static_assert(beast::is_DynamicBuffer<DynamicBuffer>::value,
"DynamicBuffer requirements not met");
error_code ec;
read_frame(fi, dynabuf, ec);
if(ec)
throw system_error{ec};
}

template<class NextLayer>
template<class DynamicBuffer>
void
stream<NextLayer>::
read_frame(frame_info& fi, DynamicBuffer& dynabuf, error_code& ec)
{
static_assert(is_SyncStream<next_layer_type>::value,
"SyncStream requirements not met");
static_assert(beast::is_DynamicBuffer<DynamicBuffer>::value,
"DynamicBuffer requirements not met");
close_code::value code{};
for(;;)
{
if(rd_need_ == 0)
{
// read header
detail::frame_streambuf fb;
do_read_fh(fb, code, ec);
failed_ = ec != 0;
if(failed_)
return;
if(code != close_code::none)
break;
if(detail::is_control(rd_fh_.op))
{
// read control payload
if(rd_fh_.len > 0)
{
auto const mb = fb.prepare(
static_cast<std::size_t>(rd_fh_.len));
fb.commit(boost::asio::read(stream_, mb, ec));
failed_ = ec != 0;
if(failed_)
return;
if(rd_fh_.mask)
detail::mask_inplace(mb, rd_key_);
fb.commit(static_cast<std::size_t>(rd_fh_.len));
}
if(rd_fh_.op == opcode::ping)
{
ping_data data;
detail::read(data, fb.data());
fb.reset();
write_ping<static_streambuf>(
fb, opcode::pong, data);
boost::asio::write(stream_, fb.data(), ec);
failed_ = ec != 0;
if(failed_)
return;
continue;
}
else if(rd_fh_.op == opcode::pong)
{
ping_data payload;
detail::read(payload, fb.data());
if(pong_cb_)
pong_cb_(payload);
continue;
}
BOOST_ASSERT(rd_fh_.op == opcode::close);
{
detail::read(cr_, fb.data(), code);
if(code != close_code::none)
break;
if(! wr_close_)
{
auto cr = cr_;
if(cr.code == close_code::none)
cr.code = close_code::normal;
cr.reason = "";
fb.reset();
wr_close_ = true;
write_close<static_streambuf>(fb, cr);
boost::asio::write(stream_, fb.data(), ec);
failed_ = ec != 0;
if(failed_)
return;
}
break;
}
}
if(rd_need_ == 0 && ! rd_fh_.fin)
{
// empty frame
continue;
}
}
// read payload
auto smb = dynabuf.prepare(
detail::clamp(rd_need_));
auto const bytes_transferred =
stream_.read_some(smb, ec);
failed_ = ec != 0;
if(failed_)
return;
rd_need_ -= bytes_transferred;
auto const pb = prepare_buffers(
bytes_transferred, smb);
if(rd_fh_.mask)
detail::mask_inplace(pb, rd_key_);
if(rd_opcode_ == opcode::text)
{
if(! rd_utf8_check_.write(pb) ||
(rd_need_ == 0 && rd_fh_.fin &&
! rd_utf8_check_.finish()))
{
code = close_code::bad_payload;
break;
}
}
dynabuf.commit(bytes_transferred);
fi.op = rd_opcode_;
fi.fin = rd_fh_.fin && rd_need_ == 0;
return;
}
if(code != close_code::none)
{
// Fail the connection (per rfc6455)
if(! wr_close_)
{
wr_close_ = true;
detail::frame_streambuf fb;
write_close<static_streambuf>(fb, code);
boost::asio::write(stream_, fb.data(), ec);
failed_ = ec != 0;
if(failed_)
return;
}
websocket_helpers::call_teardown(next_layer(), ec);
failed_ = ec != 0;
if(failed_)
return;
ec = error::failed;
failed_ = true;
return;
}
if(! ec)
websocket_helpers::call_teardown(next_layer(), ec);
if(! ec)
ec = error::closed;
failed_ = ec != 0;
}

//------------------------------------------------------------------------------

// read an entire message
//
template<class NextLayer>
template<class DynamicBuffer, class Handler>
class stream<NextLayer>::read_op
{
using alloc_type =
handler_alloc<char, Handler>;

struct data
{
stream<NextLayer>& ws;
opcode& op;
DynamicBuffer& db;
Handler h;
frame_info fi;
bool cont;
int state = 0;

template<class DeducedHandler>
data(DeducedHandler&& h_,
stream<NextLayer>& ws_, opcode& op_,
DynamicBuffer& sb_)
: ws(ws_)
, op(op_)
, db(sb_)
, h(std::forward<DeducedHandler>(h_))
, cont(boost_asio_handler_cont_helpers::
is_continuation(h))
{
}
};

std::shared_ptr<data> d_;

public:
read_op(read_op&&) = default;
read_op(read_op const&) = default;

template<class DeducedHandler, class... Args>
read_op(DeducedHandler&& h,
stream<NextLayer>& ws, Args&&... args)
: d_(std::allocate_shared<data>(alloc_type{h},
std::forward<DeducedHandler>(h), ws,
std::forward<Args>(args)...))
{
(*this)(error_code{}, false);
}

void operator()(
error_code const& ec, bool again = true);

friend
void* asio_handler_allocate(
std::size_t size, read_op* op)
{
return boost_asio_handler_alloc_helpers::
allocate(size, op->d_->h);
}

friend
void asio_handler_deallocate(
void* p, std::size_t size, read_op* op)
{
return boost_asio_handler_alloc_helpers::
deallocate(p, size, op->d_->h);
}

friend
bool asio_handler_is_continuation(read_op* op)
{
return op->d_->cont;
}

template<class Function>
friend
void asio_handler_invoke(Function&& f, read_op* op)
{
return boost_asio_handler_invoke_helpers::
invoke(f, op->d_->h);
}
};

template<class NextLayer>
template<class DynamicBuffer, class Handler>
void
stream<NextLayer>::read_op<DynamicBuffer, Handler>::
operator()(error_code const& ec, bool again)
{
auto& d = *d_;
d.cont = d.cont || again;
while(! ec)
{
switch(d.state)
{
case 0:
// read payload
d.state = 1;
#if 0
// VFALCO This causes dereference of null, because
// the handler is moved from the data block
// before asio_handler_deallocate is called.
d.ws.async_read_frame(
d.fi, d.db, std::move(*this));
#else
d.ws.async_read_frame(d.fi, d.db, *this);
#endif
return;

// got payload
case 1:
d.op = d.fi.op;
if(d.fi.fin)
goto upcall;
d.state = 0;
break;
}
}
upcall:
d.h(ec);
}

template<class NextLayer>
template<class DynamicBuffer, class ReadHandler>
typename async_completion<
ReadHandler, void(error_code)>::result_type
stream<NextLayer>::
async_read(opcode& op,
DynamicBuffer& dynabuf, ReadHandler&& handler)
{
static_assert(is_AsyncStream<next_layer_type>::value,
"AsyncStream requirements requirements not met");
static_assert(beast::is_DynamicBuffer<DynamicBuffer>::value,
"DynamicBuffer requirements not met");
beast::async_completion<
ReadHandler, void(error_code)
> completion(handler);
read_op<DynamicBuffer, decltype(completion.handler)>{
completion.handler, *this, op, dynabuf};
return completion.result.get();
}

template<class NextLayer>
template<class DynamicBuffer>
void
stream<NextLayer>::
read(opcode& op, DynamicBuffer& dynabuf)
{
static_assert(is_SyncStream<next_layer_type>::value,
"SyncStream requirements not met");
static_assert(beast::is_DynamicBuffer<DynamicBuffer>::value,
"DynamicBuffer requirements not met");
error_code ec;
read(op, dynabuf, ec);
if(ec)
throw system_error{ec};
}

template<class NextLayer>
template<class DynamicBuffer>
void
stream<NextLayer>::
read(opcode& op, DynamicBuffer& dynabuf, error_code& ec)
{
static_assert(is_SyncStream<next_layer_type>::value,
"SyncStream requirements not met");
static_assert(beast::is_DynamicBuffer<DynamicBuffer>::value,
"DynamicBuffer requirements not met");
frame_info fi;
for(;;)
{
read_frame(fi, dynabuf, ec);
if(ec)
break;
op = fi.op;
if(fi.fin)
break;
}
}

//------------------------------------------------------------------------------

} // websocket
} // beast

Expand Down
Loading

0 comments on commit 14152b9

Please sign in to comment.