Skip to content

Commit

Permalink
Slowly reworking opflex connections to share more code with ovsdb
Browse files Browse the repository at this point in the history
Signed-off-by: Tom Flynn <tom.flynn@gmail.com>
  • Loading branch information
tomflynn committed May 29, 2020
1 parent 4b85a4e commit d2642a1
Show file tree
Hide file tree
Showing 9 changed files with 157 additions and 124 deletions.
4 changes: 4 additions & 0 deletions agent-ovs/ovs/OvsdbConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,4 +180,8 @@ void OvsdbConnection::handleMonitorError(uint64_t reqId, const rapidjson::Docume
LOG(WARNING) << "Received error response with no error element";
}
}

void OvsdbConnection::messagesReady() {
// TODO
}
}
6 changes: 6 additions & 0 deletions agent-ovs/ovs/include/OvsdbConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,12 @@ class OvsdbConnection : public opflex::jsonrpc::RpcConnection {
*/
uint64_t getNextId() { return ++id; }

/**
* New messages are ready to be written to the socket.
* processWriteQueue() must be called.
*/
virtual void messagesReady();

private:

yajr::Peer* peer;
Expand Down
1 change: 1 addition & 0 deletions libopflex/comms/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ libcomms_la_SOURCES =
libcomms_la_SOURCES += active_connection.cpp
libcomms_la_SOURCES += passive_listener.cpp
libcomms_la_SOURCES += common.cpp
libcomms_la_SOURCES += rpc/JsonRpcConnection.cpp
libcomms_la_SOURCES += rpc/JsonRpcHandler.cpp
libcomms_la_SOURCES += transport/PlainText.cpp
libcomms_la_SOURCES += transport/ZeroCopyOpenSSL.cpp
Expand Down
99 changes: 99 additions & 0 deletions libopflex/comms/rpc/JsonRpcConnection.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/* -*- C++ -*-; c-basic-offset: 4; indent-tabs-mode: nil */
/*
* Implementation of JSON-RPC handler
*
* Copyright (c) 2020 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/

/* This must be included before anything else */
#if HAVE_CONFIG_H
# include <config.h>
#endif

#include <boost/scoped_ptr.hpp>

#include "opflex/logging/internal/logging.hpp"
#include "opflex/rpc/JsonRpcConnection.h"

namespace opflex {
namespace jsonrpc {

RpcConnection::RpcConnection() : requestId(1), connGeneration(0) {
uv_mutex_init(&queue_mutex);
}

RpcConnection::~RpcConnection() {
uv_mutex_destroy(&queue_mutex);
}

void RpcConnection::cleanup() {
util::LockGuard guard(&queue_mutex);
connGeneration += 1;
while (!write_queue.empty()) {
delete write_queue.front().first;
write_queue.pop_front();
}
}

void RpcConnection::sendMessage(JsonRpcMessage* message, bool sync) {
if (sync) {
boost::scoped_ptr<JsonRpcMessage> messagep(message);
doWrite(message);
} else {
util::LockGuard guard(&queue_mutex);
write_queue.push_back(std::make_pair(message, connGeneration));
}
messagesReady();
}


void RpcConnection::processWriteQueue() {
util::LockGuard guard(&queue_mutex);
while (!write_queue.empty()) {
const write_queue_item_t& qi = write_queue.front();
// Avoid writing messages from a previous reconnect attempt
if (qi.second < connGeneration) {
LOG(DEBUG) << "Ignoring " << qi.first->getMethod()
<< " of type " << qi.first->getType();
continue;
}
boost::scoped_ptr<JsonRpcMessage> message(qi.first);
write_queue.pop_front();
doWrite(message.get());
}
}

void RpcConnection::doWrite(JsonRpcMessage* message) {
if (getPeer() == NULL) return;

jsonrpc::PayloadWrapper wrapper(message);
switch (message->getType()) {
case jsonrpc::JsonRpcMessage::REQUEST:
{
yajr::rpc::MethodName method(message->getMethod().c_str());
uint64_t xid = message->getReqXid();
if (xid == 0) xid = requestId++;
yajr::rpc::OutboundRequest outm(wrapper, &method, xid, getPeer());
outm.send();
}
break;
case jsonrpc::JsonRpcMessage::RESPONSE:
{
yajr::rpc::OutboundResult outm(*getPeer(), wrapper, message->getId());
outm.send();
}
break;
case jsonrpc::JsonRpcMessage::ERROR_RESPONSE:
{
yajr::rpc::OutboundError outm(*getPeer(), wrapper, message->getId());
outm.send();
}
break;
}
}
}
}
78 changes: 1 addition & 77 deletions libopflex/engine/OpflexConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,10 @@
# include <config.h>
#endif


#include <boost/scoped_ptr.hpp>

#include "opflex/engine/internal/OpflexConnection.h"
#include "opflex/engine/internal/OpflexHandler.h"
#include "opflex/engine/internal/OpflexMessage.h"
#include "opflex/logging/internal/logging.hpp"
#include "opflex/util/LockGuard.h"

#include "yajr/transport/ZeroCopyOpenSSL.hpp"
#include "opflex/yajr/rpc/message_factory.hpp"

static uv_once_t ssl_once = UV_ONCE_INIT;

Expand All @@ -34,25 +27,21 @@ namespace internal {

using rapidjson::Value;
using std::string;
using boost::scoped_ptr;
using yajr::rpc::OutboundRequest;
using yajr::rpc::OutboundResult;
using yajr::rpc::OutboundError;
using yajr::transport::ZeroCopyOpenSSL;

OpflexConnection::OpflexConnection(HandlerFactory& handlerFactory)
: handler(handlerFactory.newHandler(this))
,requestId(1) ,connGeneration(0)
: RpcConnection(), handler(handlerFactory.newHandler(this))
{
uv_mutex_init(&queue_mutex);
connect();
}

OpflexConnection::~OpflexConnection() {
cleanup();
if (handler)
delete handler;
uv_mutex_destroy(&queue_mutex);
}

static void init_ssl() {
Expand All @@ -65,15 +54,6 @@ void OpflexConnection::initSSL() {

void OpflexConnection::connect() {}

void OpflexConnection::cleanup() {
util::LockGuard guard(&queue_mutex);
connGeneration += 1;
while (!write_queue.empty()) {
delete write_queue.front().first;
write_queue.pop_front();
}
}

void OpflexConnection::disconnect() {
cleanup();
}
Expand All @@ -90,62 +70,6 @@ void OpflexConnection::notifyReady() {

}

void OpflexConnection::doWrite(OpflexMessage* message) {
if (getPeer() == NULL) return;

jsonrpc::PayloadWrapper wrapper(message);
switch (message->getType()) {
case OpflexMessage::REQUEST:
{
yajr::rpc::MethodName method(message->getMethod().c_str());
uint64_t xid = message->getReqXid();
if (xid == 0) xid = requestId++;
OutboundRequest outm(wrapper, &method, xid, getPeer());
outm.send();
}
break;
case OpflexMessage::RESPONSE:
{
OutboundResult outm(*getPeer(), wrapper, message->getId());
outm.send();
}
break;
case OpflexMessage::ERROR_RESPONSE:
{
OutboundError outm(*getPeer(), wrapper, message->getId());
outm.send();
}
break;
}
}

void OpflexConnection::processWriteQueue() {
util::LockGuard guard(&queue_mutex);
while (!write_queue.empty()) {
const write_queue_item_t& qi = write_queue.front();
// Avoid writing messages from a previous reconnect attempt
if (qi.second < connGeneration) {
LOG(DEBUG) << "Ignoring " << qi.first->getMethod()
<< " of type " << qi.first->getType();
continue;
}
scoped_ptr<OpflexMessage> message(qi.first);
write_queue.pop_front();
doWrite(message.get());
}
}

void OpflexConnection::sendMessage(OpflexMessage* message, bool sync) {
if (sync) {
scoped_ptr<OpflexMessage> messagep(message);
doWrite(message);
} else {
util::LockGuard guard(&queue_mutex);
write_queue.push_back(std::make_pair(message, connGeneration));
}
messagesReady();
}

} /* namespace internal */
} /* namespace engine */
} /* namespace opflex */
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class OpflexMessage;
* Maintain the connection state information for a connection to an
* opflex peer
*/
class OpflexConnection : opflex::jsonrpc::RpcConnection {
class OpflexConnection : public opflex::jsonrpc::RpcConnection {
public:

/**
Expand Down Expand Up @@ -109,30 +109,13 @@ class OpflexConnection : opflex::jsonrpc::RpcConnection {
*/
virtual const std::string& getRemotePeer() = 0;

/**
* Send the opflex message to the remote peer. This can be called
* from any thread.
*
* @param message the message to send. Ownership of the object
* passes to the connection.
* @param sync if true, send the message synchronously. This can
* only be called if it's called from the uv loop thread.
*/
virtual void sendMessage(OpflexMessage* message, bool sync = false);

/**
* Get the handler associated with this connection
*
* @return the OpflexHandler for the connection.
*/
virtual OpflexHandler* getHandler() { return handler; }

/**
* Process the write queue for the connection from within the
* libuv loop thread
*/
void processWriteQueue();

/**
* Get the peer handshake timeout (in ms)
* @return timeout
Expand All @@ -155,29 +138,9 @@ class OpflexConnection : opflex::jsonrpc::RpcConnection {
*/
OpflexHandler* handler;

/**
* New messages are ready to be written to the socket.
* processWriteQueue() must be called.
*/
virtual void messagesReady() = 0;

/**
* Clean up write queue
*/
virtual void cleanup();

private:
uint64_t requestId;

uint64_t connGeneration;
typedef std::pair<OpflexMessage*, uint64_t> write_queue_item_t;
typedef std::list<write_queue_item_t> write_queue_t;
write_queue_t write_queue;
uv_mutex_t queue_mutex;
uint32_t handshakeTimeout;

void doWrite(OpflexMessage* message);

virtual void notifyReady();
virtual void notifyFailed() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,6 @@ class OpflexMessage : public jsonrpc::JsonRpcMessage {

virtual void serializePayload(yajr::rpc::SendHandler& writer) const = 0;

/**
* Get a transaction ID for a request. If nonzero, allocate a
* transaction ID using a counter
*
* @return the transaction ID for the request
*/
virtual uint64_t getReqXid() { return 0; }
};

/**
Expand Down
Loading

0 comments on commit d2642a1

Please sign in to comment.