Skip to content

Commit

Permalink
Merge 9fc5168 into d717ecd
Browse files Browse the repository at this point in the history
  • Loading branch information
mdavidsaver committed May 18, 2022
2 parents d717ecd + 9fc5168 commit fe6933e
Show file tree
Hide file tree
Showing 12 changed files with 48 additions and 25 deletions.
2 changes: 1 addition & 1 deletion src/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ void Channel::disconnect(const std::shared_ptr<Channel>& self)
{
(void)evbuffer_drain(current->txBody.get(), evbuffer_get_length(current->txBody.get()));

EvOutBuf R(hostBE, current->txBody.get());
EvOutBuf R(current->sendBE, current->txBody.get());

to_wire(R, sid);
to_wire(R, cid);
Expand Down
10 changes: 5 additions & 5 deletions src/clientconn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ void Connection::createChannels()
{
(void)evbuffer_drain(txBody.get(), evbuffer_get_length(txBody.get()));

EvOutBuf R(hostBE, txBody.get());
EvOutBuf R(sendBE, txBody.get());

to_wire(R, uint16_t(1u));
to_wire(R, chan->cid);
Expand All @@ -90,7 +90,7 @@ void Connection::sendDestroyRequest(uint32_t sid, uint32_t ioid)
{
(void)evbuffer_drain(txBody.get(), evbuffer_get_length(txBody.get()));

EvOutBuf R(hostBE, txBody.get());
EvOutBuf R(sendBE, txBody.get());

to_wire(R, sid);
to_wire(R, ioid);
Expand Down Expand Up @@ -232,7 +232,7 @@ void Connection::handle_CONNECTION_VALIDATION()
{
(void)evbuffer_drain(txBody.get(), evbuffer_get_length(txBody.get()));

EvOutBuf R(hostBE, txBody.get());
EvOutBuf R(sendBE, txBody.get());

// serverReceiveBufferSize, not used
to_wire(R, uint32_t(0x10000));
Expand Down Expand Up @@ -316,7 +316,7 @@ void Connection::handle_CREATE_CHANNEL()
{
(void)evbuffer_drain(txBody.get(), evbuffer_get_length(txBody.get()));

EvOutBuf R(hostBE, txBody.get());
EvOutBuf R(sendBE, txBody.get());
to_wire(R, sid);
to_wire(R, cid);
}
Expand Down Expand Up @@ -400,7 +400,7 @@ void Connection::tickEcho()

auto tx = bufferevent_get_output(bev.get());

to_evbuf(tx, Header{CMD_ECHO, 0u, 0u}, hostBE);
to_evbuf(tx, Header{CMD_ECHO, 0u, 0u}, sendBE);

// maybe help reduce latency
bufferevent_flush(bev.get(), EV_WRITE, BEV_FLUSH);
Expand Down
8 changes: 5 additions & 3 deletions src/clientget.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,9 +286,11 @@ struct GPROp : public OperationBase
// act on new operation state

{
(void)evbuffer_drain(chan->conn->txBody.get(), evbuffer_get_length(chan->conn->txBody.get()));
auto& conn = chan->conn;

EvOutBuf R(hostBE, chan->conn->txBody.get());
(void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get()));

EvOutBuf R(conn->sendBE, conn->txBody.get());

to_wire(R, chan->sid);
to_wire(R, ioid);
Expand Down Expand Up @@ -339,7 +341,7 @@ struct GPROp : public OperationBase
{
(void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get()));

EvOutBuf R(hostBE, conn->txBody.get());
EvOutBuf R(conn->sendBE, conn->txBody.get());

to_wire(R, chan->sid);
to_wire(R, ioid);
Expand Down
2 changes: 1 addition & 1 deletion src/clientintrospect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ struct InfoOp : public OperationBase
{
(void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get()));

EvOutBuf R(hostBE, conn->txBody.get());
EvOutBuf R(conn->sendBE, conn->txBody.get());

to_wire(R, chan->sid);
to_wire(R, ioid);
Expand Down
6 changes: 3 additions & 3 deletions src/clientmon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ struct SubscriptionImpl : public OperationBase, public Subscription

(void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get()));

EvOutBuf R(hostBE, conn->txBody.get());
EvOutBuf R(conn->sendBE, conn->txBody.get());

to_wire(R, chan->sid);
to_wire(R, ioid);
Expand Down Expand Up @@ -272,7 +272,7 @@ struct SubscriptionImpl : public OperationBase, public Subscription

(void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get()));

EvOutBuf R(hostBE, conn->txBody.get());
EvOutBuf R(conn->sendBE, conn->txBody.get());

to_wire(R, chan->sid);
to_wire(R, ioid);
Expand Down Expand Up @@ -360,7 +360,7 @@ struct SubscriptionImpl : public OperationBase, public Subscription
{
(void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get()));

EvOutBuf R(hostBE, conn->txBody.get());
EvOutBuf R(conn->sendBE, conn->txBody.get());

to_wire(R, chan->sid);
to_wire(R, ioid);
Expand Down
22 changes: 21 additions & 1 deletion src/conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ ConnBase::ConnBase(bool isClient, bufferevent* bev, const SockAddr& peerAddr)
,peerName(peerAddr.tostring())
,bev(bev)
,isClient(isClient)
,sendBE(EPICS_BYTE_ORDER==EPICS_ENDIAN_BIG)
,peerBE(true) // arbitrary choice, default should be overwritten before use
,expectSeg(false)
,segCmd(0xff)
Expand All @@ -46,7 +47,7 @@ size_t ConnBase::enqueueTxBody(pva_app_msg_t cmd)
to_evbuf(tx, Header{cmd,
uint8_t(isClient ? 0u : pva_flags::Server),
uint32_t(blen)},
hostBE);
sendBE);
auto err = evbuffer_add_buffer(tx, txBody.get());
assert(!err); // could only fail if frozen/pinned, which is not the case
statTx += 8u + blen;
Expand Down Expand Up @@ -121,6 +122,25 @@ void ConnBase::bevRead()
"%s %s Receive header\n", peerLabel(), peerName.c_str());

if(header[2]&pva_flags::Control) {
if(header[3]==pva_ctrl_msg::SetEndian) {
/* This should be the first message sent by a (supposedly) a server.
* However, old pvAccess* accepts it from either peer at any time.
*
* The protocol spec. claims that we should inspect the size field
* (bytes 4-7) and act as follows.
* 0x00000000 - Send future messages using endianness on this (received)
* message. Peer will ignore MSB flag in our headers!
* 0xffffffff - Send future messages as we like. Peer will test the
* MSB flag.
*
* However, neither pvAccessCPP nor pvAccessJava actually test this.
* Instead the 0x00000000 behavior is assumed.
*
* So we latch the byte order here, as the peer will ignore the MSB
* flag in our messages...
*/
sendBE = header[2]&pva_flags::MSB;
}
// Control messages are not actually useful
evbuffer_drain(rx, 8);
statRx += 8u;
Expand Down
1 change: 1 addition & 0 deletions src/conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ struct ConnBase
TypeStore rxRegistry;

const bool isClient;
bool sendBE;
bool peerBE;
bool expectSeg;

Expand Down
8 changes: 4 additions & 4 deletions src/serverchan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ void ServerChannelControl::close()
conn->peerName.c_str(), ch->name.c_str());

auto tx = bufferevent_get_output(conn->bev.get());
EvOutBuf R(hostBE, tx);
EvOutBuf R(conn->sendBE, tx);
to_wire(R, Header{CMD_DESTROY_CHANNEL, pva_flags::Server, 8});
to_wire(R, ch->sid);
to_wire(R, ch->cid);
Expand Down Expand Up @@ -251,7 +251,7 @@ void ServerConn::handle_SEARCH()
{
(void)evbuffer_drain(txBody.get(), evbuffer_get_length(txBody.get()));

EvOutBuf R(hostBE, txBody.get());
EvOutBuf R(sendBE, txBody.get());

_to_wire<12>(R, iface->server->effective.guid.data(), false, __FILE__, __LINE__);
to_wire(R, searchID);
Expand Down Expand Up @@ -363,7 +363,7 @@ void ServerConn::handle_CREATE_CHANNEL()
{
(void)evbuffer_drain(txBody.get(), evbuffer_get_length(txBody.get()));

EvOutBuf R(hostBE, txBody.get());
EvOutBuf R(sendBE, txBody.get());
to_wire(R, cid);
to_wire(R, sid);
to_wire(R, sts);
Expand Down Expand Up @@ -417,7 +417,7 @@ void ServerConn::handle_DESTROY_CHANNEL()

{
auto tx = bufferevent_get_output(bev.get());
EvOutBuf R(hostBE, tx);
EvOutBuf R(sendBE, tx);
to_wire(R, Header{CMD_DESTROY_CHANNEL, pva_flags::Server, 8});
to_wire(R, sid);
to_wire(R, cid);
Expand Down
8 changes: 4 additions & 4 deletions src/serverconn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ ServerConn::ServerConn(ServIface* iface, evutil_socket_t sock, struct sockaddr *

// queue connection validation message
{
VectorOutBuf M(hostBE, buf);
VectorOutBuf M(sendBE, buf);
to_wire(M, Header{pva_ctrl_msg::SetEndian, pva_flags::Control|pva_flags::Server, 0});

auto save = M.save();
Expand All @@ -87,7 +87,7 @@ ServerConn::ServerConn(ServIface* iface, evutil_socket_t sock, struct sockaddr *
to_wire(M, "ca");
auto bend = M.save();

FixedBuf H(hostBE, save, 8);
FixedBuf H(sendBE, save, 8);
to_wire(H, Header{CMD_CONNECTION_VALIDATION, pva_flags::Server, uint32_t(bend-bstart)});

assert(M.good() && H.good());
Expand Down Expand Up @@ -123,7 +123,7 @@ void ServerConn::handle_ECHO()
auto tx = bufferevent_get_output(bev.get());
uint32_t len = evbuffer_get_length(segBuf.get());

to_evbuf(tx, Header{CMD_ECHO, pva_flags::Server, len}, hostBE);
to_evbuf(tx, Header{CMD_ECHO, pva_flags::Server, len}, sendBE);

auto err = evbuffer_add_buffer(tx, segBuf.get());
assert(!err);
Expand All @@ -140,7 +140,7 @@ void auth_complete(ServerConn *self, const Status& sts)
(void)evbuffer_drain(self->txBody.get(), evbuffer_get_length(self->txBody.get()));

{
EvOutBuf M(hostBE, self->txBody.get());
EvOutBuf M(self->sendBE, self->txBody.get());
to_wire(M, sts);
}

Expand Down
2 changes: 1 addition & 1 deletion src/serverget.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ struct ServerGPR : public ServerOp
{
(void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get()));

EvOutBuf R(hostBE, conn->txBody.get());
EvOutBuf R(conn->sendBE, conn->txBody.get());
to_wire(R, uint32_t(ioid));
to_wire(R, subcmd);
to_wire(R, sts);
Expand Down
2 changes: 1 addition & 1 deletion src/serverintrospect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ struct ServerIntrospect : public ServerOp
{
(void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get()));

EvOutBuf R(hostBE, conn->txBody.get());
EvOutBuf R(conn->sendBE, conn->txBody.get());
to_wire(R, uint32_t(ioid));
to_wire(R, sts);
if(type)
Expand Down
2 changes: 1 addition & 1 deletion src/servermon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ struct MonitorOp : public ServerOp,
{
(void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get()));

EvOutBuf R(hostBE, conn->txBody.get());
EvOutBuf R(conn->sendBE, conn->txBody.get());
to_wire(R, uint32_t(ioid));
to_wire(R, subcmd);
if(subcmd&0x08) {
Expand Down

0 comments on commit fe6933e

Please sign in to comment.