Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Major rewrite or TCPReporter #71

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions reporting/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ IF ( BUILD_REPORTING )
SET( HPPS ConsoleReporting.hpp FileReporting.hpp NiceHeaderMarshaller.hpp ReportingComponent.hpp TableMarshaller.hpp)

# Reporting to a socket
SET( SOCKET_SRCS command.cpp datasender.cpp socket.cpp socketmarshaller.cpp TcpReporting.cpp)
SET( SOCKET_HPPS command.hpp datasender.hpp socket.hpp socketmarshaller.hpp TcpReporting.hpp)
SET( SOCKET_SRCS tcpreportingsession.cpp command.cpp TcpReporting.cpp)
SET( SOCKET_HPPS tcpreportingsession.hpp command.hpp TcpReporting.hpp)

if(NOT OROCOS_TARGET STREQUAL "win32")
set(SRCS ${SRCS} ${SOCKET_SRCS})
Expand Down
4 changes: 4 additions & 0 deletions reporting/ReportingComponent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,10 @@ namespace OCL

bool ReportingComponent::addMarshaller( marsh::MarshallInterface* headerM, marsh::MarshallInterface* bodyM)
{
// If we were to allow this, we would provoce double deallocation upon destruction of the pair
if(headerM == bodyM)
return false;

boost::shared_ptr<marsh::MarshallInterface> header(headerM);
boost::shared_ptr<marsh::MarshallInterface> body(bodyM);
if ( !header && !body)
Expand Down
308 changes: 90 additions & 218 deletions reporting/TcpReporting.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,230 +25,102 @@
* *
***************************************************************************/

#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <errno.h>

#include "TcpReporting.hpp"
#include <rtt/Activity.hpp>
#include <rtt/Logger.hpp>
#include <rtt/os/Mutex.hpp>
#include "socket.hpp"
#include "socketmarshaller.hpp"

using RTT::Logger;
using RTT::os::Mutex;
#include <rtt/Component.hpp>
#include <boost/bind.hpp>
#include <iostream>
#include "EmptyMarshaller.hpp"

#include "ocl/Component.hpp"
ORO_LIST_COMPONENT_TYPE(OCL::TcpReporting);

namespace OCL
{
/**
* ListenThread is a thread which waits for new incoming connections
* from clients.
*/
class ListenThread
: public RTT::Activity
{
private:
bool inBreak;
static ListenThread* _instance;
RTT::SocketMarshaller* _marshaller;
unsigned short _port;
bool _accepting;
int _sock;

bool listen()
{
_sock = ::socket(PF_INET, SOCK_STREAM, 0);
if( _sock < 0 )
{
Logger::log() << Logger::Error << "Socket creation failed." << Logger::endl;
return false;
}

struct sockaddr_in localsocket;
struct sockaddr remote;
int adrlen = sizeof(remote);

localsocket.sin_family = AF_INET;
localsocket.sin_port = htons(_port);
localsocket.sin_addr.s_addr = INADDR_ANY;
if( ::bind(_sock, (struct sockaddr*)&localsocket, sizeof(localsocket) ) < 0 )
{
/* bind can fail when there is a legitimate server when a
previous run of orocos has crashed and the kernel does
not have freed the port yet. TRY_OTHER_PORTS can
select another port if the bind fails. */
#define TRY_OTHER_PORTS
// TODO: remove #define
#ifdef TRY_OTHER_PORTS
int i = 1;
int r = -1;
while( errno == EADDRINUSE && i < 5 && r < 0 )
{
localsocket.sin_port = htons(_port + i);
r = ::bind(_sock, (struct sockaddr*)&localsocket, sizeof(localsocket) );
i++;
}
if( r >= 0 )
{
Logger::log() << Logger::Info << "Port occupied, use port " << (_port+i-1) << " instead." << Logger::endl;
} else {
#endif
if( errno == EADDRINUSE )
{
Logger::log() << Logger::Error << "Binding of port failed: address already in use." << Logger::endl;
} else {
Logger::log() << Logger::Error << "Binding of port failed with errno " << errno << Logger::endl;
}
::close(_sock);
return false;
#ifdef TRY_OTHER_PORTS
}
#endif
}

if( ::listen(_sock, 2) < 0 )
{
Logger::log() << Logger::Info << "Cannot listen on socket" << Logger::endl;
::close(_sock);
return true;
}
while(_accepting)
{
int socket = ::accept( _sock, &remote,
reinterpret_cast<socklen_t*>(&adrlen) );
if( socket == -1 )
{
return false;
}
if( _accepting )
{
Logger::log() << Logger::Info << "Incoming connection" << Logger::endl;
_marshaller->addConnection( new Orocos::TCP::Socket(socket) );
}
}
return true;
}

ListenThread( RTT::SocketMarshaller* marshaller, unsigned short port )
: Activity(10), _marshaller(marshaller)
{
inBreak = false;
removeInstance();
_accepting = true;
_port = port;
Logger::log() << Logger::Info << "Starting server on port " << port << Logger::endl;
this->Activity::start();
}

// This method should only be called when theadCreationLock is locked.
void removeInstance()
{
if( _instance )
{
delete _instance;
}
}

public:
~ListenThread()
{
_accepting = false;
}

virtual void loop()
{
if( !inBreak )
{
if( !listen() )
{
Logger::log() << Logger::Error << "Could not listen on port " << _port << Logger::endl;
} else {
Logger::log() << Logger::Info << "Shutting down server" << Logger::endl;
}
}
}

virtual bool breakLoop()
{
inBreak = true;
_accepting = false;
::close( _sock );
// accept still hangs until a new connection has been established
int sock = ::socket(PF_INET, SOCK_STREAM, 0);
if( sock > 0 )
{
struct sockaddr_in socket;
socket.sin_family = AF_INET;
socket.sin_port = htons(_port);
socket.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
::connect( sock, (struct sockaddr*)&socket, sizeof(socket) );
::close( sock );
}
return true;
}

static void createInstance( RTT::SocketMarshaller* marshaller, unsigned short port = 3142 )
{
// The lock is needed to avoid problems when createInstance is called by two
// different threads (which in reality should not occur very often).
//ListenThread* _oinst = ListenThread::_instance;
ListenThread::_instance = new ListenThread( marshaller, port );
//delete _oinst;
}

static void destroyInstance()
{
ListenThread::_instance->breakLoop();
}
};
ListenThread* ListenThread::_instance = 0;
namespace OCL {

void TcpReporting::registerAccept() {
pending_socket = new tcp::socket(io_service);
acceptor.async_accept(*pending_socket,
boost::bind(&TcpReporting::handleAccept, this,
asio::placeholders::error));
}

void TcpReporting::handleAccept(const asio::error_code& error) {
if (error) {
delete pending_socket;
stop();
} else {
TcpReportingSession* session = new TcpReportingSession(pending_socket);
pending_socket = NULL;
addMarshaller(0, session);
sessions.push_back(session);
registerAccept();
}
}

TcpReporting::TcpReporting(std::string fr_name /*= "Reporting"*/)
: ReportingComponent(fr_name), acceptor(io_service), port(3142) {
addProperty("port", port);
}

namespace OCL
{
TcpReporting::TcpReporting(std::string fr_name /*= "Reporting"*/)
: ReportingComponent( fr_name ),
port_prop("port","port to listen/send to",3142)
{
_finishing = false;
this->properties()->addProperty( port_prop);
}

TcpReporting::~TcpReporting()
{
}

const RTT::PropertyBag* TcpReporting::getReport()
{
makeReport2();
return &report;
}

bool TcpReporting::configureHook(){
port=port_prop.value();
return true;
}

bool TcpReporting::startHook()
{
RTT::Logger::In in("TcpReporting::startup");
fbody = new RTT::SocketMarshaller(this);
this->addMarshaller( 0, fbody );
ListenThread::createInstance( fbody, port );
return ReportingComponent::startHook();
}

void TcpReporting::stopHook()
{
_finishing = true;
ListenThread::destroyInstance();
fbody->shutdown();
ReportingComponent::stopHook();
this->removeMarshallers();
}
TcpReporting::~TcpReporting() {}

bool TcpReporting::startHook() {
addMarshaller(0, new RTT::EmptyMarshaller());
tcp::endpoint endpoint(tcp::v4(), port);
asio::error_code error;

acceptor.open(endpoint.protocol(), error);
if (error) {
RTT::log(RTT::Error) << "failed to open acceptor: " << error.message()
<< RTT::endlog();
return false;
}

acceptor.set_option(tcp::acceptor::reuse_address(true));

acceptor.bind(endpoint, error);
if (error) {
RTT::log(RTT::Error) << "failed to bind acceptor: " << error.message()
<< RTT::endlog();
return false;
}

acceptor.listen(asio::socket_base::max_connections, error);
if (error) {
RTT::log(RTT::Error) << "failed to listen acceptor: " << error.message()
<< RTT::endlog();
return false;
}
registerAccept();

return ReportingComponent::startHook();
}

void TcpReporting::stopHook() {
ReportingComponent::stopHook(); // This flushes all connections
io_service.poll();

// Properly terminate the sessions
for (size_t i = 0; i < sessions.size(); i++) {
sessions[i]->terminate();
}
io_service.poll(); // we need this to invoke the session terminations

removeMarshallers();

acceptor.close();
io_service.poll();
io_service.reset();
}

void TcpReporting::updateHook() {
asio::error_code error;
io_service.poll(error);
if (error) {
RTT::log(RTT::Error) << "error in update hook: " << error.message()
<< RTT::endlog();
stop();
}
ReportingComponent::updateHook();
}
}