Skip to content

Commit

Permalink
mavconn: Implement TCP server mode.
Browse files Browse the repository at this point in the history
Fix #57.
  • Loading branch information
vooon committed Jul 28, 2014
1 parent a060d44 commit 0d73430
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 39 deletions.
52 changes: 14 additions & 38 deletions include/mavros/mavconn_tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,17 @@ namespace mavconn {
class MAVConnTCPClient : public MAVConnInterface {
public:
/**
* Create generic TCP client (connect to the server)
* @param[id] server_addr remote host
* @param[id] server_port remote port
*/
MAVConnTCPClient(uint8_t system_id = 1, uint8_t component_id = MAV_COMP_ID_UDP_BRIDGE,
std::string server_host = "localhost", unsigned short server_port = 5760);
/**
* Special client variation for use in MAVConnTCPServer
* @param[in] client_sd socket descriptor
*/
explicit MAVConnTCPClient(uint8_t system_id, uint8_t component_id, int client_sd, sockaddr_in &client_addr);
~MAVConnTCPClient();

void close();
Expand All @@ -62,6 +68,7 @@ class MAVConnTCPClient : public MAVConnInterface {
inline bool is_open() { return sockfd != -1; };

private:
friend class MAVConnTCPServer;
ev::io io;
int sockfd;

Expand All @@ -75,39 +82,6 @@ class MAVConnTCPClient : public MAVConnInterface {
void write_cb(ev::io &watcher);
};

#if 0

/**
* @brief TCP server internal client class
*
* @note Because mavlink_message_parse() require chanel allocation,
* this class also use MAVConnInterface.
*/
class MAVConnTCPServerClient : public MAVConnInterface {
public:
/**
* @param[in] clientfd socket from MAVConnTCPServer
*/
MAVConnTCPServerClient(uint8_t system_id, uint8_t component_id, int clientfd);
~MAVConnTCPServerClient();

void close();

void send_message(const mavlink_message_t *message, uint8_t sysid, uint8_t compid);
void send_bytes(const uint8_t *bytes, size_t length);

private:
ev::io io;
int sockfd;

std::list<MsgBuffer*> tx_q;
boost::recursive_mutex mutex;

void event_cb(ev::io &watcher, int revents);
void read_cb(ev::io &watcher);
void write_cb(ev::io &watcher);
};


/**
* @brief TCP server interface
Expand All @@ -121,7 +95,7 @@ class MAVConnTCPServer : public MAVConnInterface {
* @param[id] server_port bind port
*/
MAVConnTCPServer(uint8_t system_id = 1, uint8_t component_id = MAV_COMP_ID_UDP_BRIDGE,
std::string server_addr = "localhost", unsigned short server_port = 5760);
std::string bind_host = "localhost", unsigned short bind_port = 5760);
~MAVConnTCPServer();

void close();
Expand All @@ -139,13 +113,15 @@ class MAVConnTCPServer : public MAVConnInterface {

sockaddr_in bind_addr;

std::list<MAVConnTCPServerClient *> client_list;
std::list<MAVConnTCPClient *> client_list;
boost::recursive_mutex mutex;

void event_cb(ev::io &watcher, int revents);
};
void accept_cb(ev::io &watcher, int revents);

#endif
// client slots
void client_closed(MAVConnTCPClient *instp);
void recv_message(const mavlink_message_t *message, uint8_t sysid, uint8_t compid);
};

}; // namespace mavconn

141 changes: 140 additions & 1 deletion src/mavconn/mavconn_tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,26 @@ MAVConnTCPClient::MAVConnTCPClient(uint8_t system_id, uint8_t component_id,
start_default_loop();
}

MAVConnTCPClient::MAVConnTCPClient(uint8_t system_id, uint8_t component_id,
int client_sd, sockaddr_in &client_addr) :
MAVConnInterface(system_id, component_id),
sockfd(client_sd),
server_addr(client_addr)
{
ROS_INFO_NAMED("mavconn", "tcp-l: Got client, channel %d, address: %s:%d",
channel,
inet_ntoa(server_addr.sin_addr),
ntohs(server_addr.sin_port));

if (fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFL, 0) | O_NONBLOCK) == -1)
throw DeviceError("tcp-l: fcntl", errno);

// run io for async io
io.set<MAVConnTCPClient, &MAVConnTCPClient::event_cb>(this);
io.start(sockfd, ev::READ);
start_default_loop(); // XXX: alredy started by server
}

MAVConnTCPClient::~MAVConnTCPClient() {
close();
}
Expand Down Expand Up @@ -234,7 +254,126 @@ void MAVConnTCPClient::write_cb(ev::io &watcher)
}


/***** XXX Server variant ******/
/* -*- TCP server variant -*- */

MAVConnTCPServer::MAVConnTCPServer(uint8_t system_id, uint8_t component_id,
std::string server_host, unsigned short server_port) :
MAVConnInterface(system_id, component_id),
sockfd(-1)
{
if (!resolve_address_tcp(server_host, server_port, bind_addr))
throw DeviceError("tcp-l: resolve", "Bind address resolve failed");

ROS_INFO_NAMED("mavconn", "tcp-l: Bind address: %s:%d",
inet_ntoa(bind_addr.sin_addr), ntohs(bind_addr.sin_port));

sockfd = ::socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0)
throw DeviceError("tcp-l: socket", errno);

int one = 1;
if (::setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0)
throw DeviceError("tcp-l: setsockopt", errno);

if (::bind(sockfd, (sockaddr *)&bind_addr, sizeof(bind_addr)) < 0)
throw DeviceError("tcp-l: bind", errno);

if (::listen(sockfd, channes_available()))
throw DeviceError("tcp-l: listen", errno);

if (fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFL, 0) | O_NONBLOCK) == -1)
throw DeviceError("tcp-l: fcntl", errno);

// run io for async io
io.set<MAVConnTCPServer, &MAVConnTCPServer::accept_cb>(this);
io.start(sockfd, ev::READ);
start_default_loop();
}

MAVConnTCPServer::~MAVConnTCPServer() {
close();
}

void MAVConnTCPServer::close() {
if (sockfd < 0)
return;

std::for_each(client_list.cbegin(), client_list.cend(),
[](MAVConnTCPClient *instp) {
instp->close();
});

io.stop();
::close(sockfd); sockfd = -1;

/* emit */ port_closed();
}

void MAVConnTCPServer::send_bytes(const uint8_t *bytes, size_t length)
{
boost::recursive_mutex::scoped_lock lock(mutex);
std::for_each(client_list.begin(), client_list.end(),
[bytes, length](MAVConnTCPClient *instp) {
instp->send_bytes(bytes, length);
});
}

void MAVConnTCPServer::send_message(const mavlink_message_t *message, uint8_t sysid, uint8_t compid)
{
boost::recursive_mutex::scoped_lock lock(mutex);
std::for_each(client_list.begin(), client_list.end(),
[message, sysid, compid](MAVConnTCPClient *instp) {
instp->send_message(message, sysid, compid);
});
}

void MAVConnTCPServer::accept_cb(ev::io &watcher, int revents)
{
if (ev::ERROR & revents) {
ROS_ERROR_NAMED("mavconn", "accept_cb::revents: 0x%08x", revents);
close();
return;
}

if (!(ev::READ & revents))
return;

struct sockaddr_in client_addr;
socklen_t client_len = sizeof(client_addr);

int client_sd = accept(watcher.fd, (struct sockaddr *)&client_addr, &client_len);
if (client_sd < 0) {
ROS_ERROR_NAMED("mavconn", "tcp-l: accept: %s", strerror(errno));
return;
}

if (channes_available() <= 0) {
ROS_ERROR_NAMED("mavconn", "tcp-l:accept_cb: all channels in use, drop connection");
::close(client_sd);
return;
}

MAVConnTCPClient *instp = new MAVConnTCPClient(sys_id, comp_id, client_sd, client_addr);
instp->message_received.connect(boost::bind(&MAVConnTCPServer::recv_message, this, _1, _2, _3));
instp->port_closed.connect(boost::bind(&MAVConnTCPServer::client_closed, this, instp));

client_list.push_back(instp);
}

void MAVConnTCPServer::client_closed(MAVConnTCPClient *instp)
{
ROS_INFO_NAMED("mavconn", "tcp-l: Client connection closed, channel %d, address: %s:%d",
instp->channel,
inet_ntoa(instp->server_addr.sin_addr),
ntohs(instp->server_addr.sin_port));

client_list.remove(instp);
delete instp;
}

void MAVConnTCPServer::recv_message(const mavlink_message_t *message, uint8_t sysid, uint8_t compid)
{
message_received(message, sysid, compid);
}

}; // namespace mavconn

0 comments on commit 0d73430

Please sign in to comment.