From 0d734306e21c5fd249e1c1e44bd07b45fc3e530e Mon Sep 17 00:00:00 2001 From: Vladimir Ermakov Date: Tue, 29 Jul 2014 01:17:38 +0400 Subject: [PATCH] mavconn: Implement TCP server mode. Fix #57. --- include/mavros/mavconn_tcp.h | 52 ++++--------- src/mavconn/mavconn_tcp.cpp | 141 ++++++++++++++++++++++++++++++++++- 2 files changed, 154 insertions(+), 39 deletions(-) diff --git a/include/mavros/mavconn_tcp.h b/include/mavros/mavconn_tcp.h index b69c5fe53..3b5a2c42f 100644 --- a/include/mavros/mavconn_tcp.h +++ b/include/mavros/mavconn_tcp.h @@ -45,11 +45,17 @@ namespace mavconn { class MAVConnTCPClient : public MAVConnInterface { public: /** + * Create generic TCP client (connect to the server) * @param[id] server_addr remote host * @param[id] server_port remote port */ MAVConnTCPClient(uint8_t system_id = 1, uint8_t component_id = MAV_COMP_ID_UDP_BRIDGE, std::string server_host = "localhost", unsigned short server_port = 5760); + /** + * Special client variation for use in MAVConnTCPServer + * @param[in] client_sd socket descriptor + */ + explicit MAVConnTCPClient(uint8_t system_id, uint8_t component_id, int client_sd, sockaddr_in &client_addr); ~MAVConnTCPClient(); void close(); @@ -62,6 +68,7 @@ class MAVConnTCPClient : public MAVConnInterface { inline bool is_open() { return sockfd != -1; }; private: + friend class MAVConnTCPServer; ev::io io; int sockfd; @@ -75,39 +82,6 @@ class MAVConnTCPClient : public MAVConnInterface { void write_cb(ev::io &watcher); }; -#if 0 - -/** - * @brief TCP server internal client class - * - * @note Because mavlink_message_parse() require chanel allocation, - * this class also use MAVConnInterface. - */ -class MAVConnTCPServerClient : public MAVConnInterface { -public: - /** - * @param[in] clientfd socket from MAVConnTCPServer - */ - MAVConnTCPServerClient(uint8_t system_id, uint8_t component_id, int clientfd); - ~MAVConnTCPServerClient(); - - void close(); - - void send_message(const mavlink_message_t *message, uint8_t sysid, uint8_t compid); - void send_bytes(const uint8_t *bytes, size_t length); - -private: - ev::io io; - int sockfd; - - std::list tx_q; - boost::recursive_mutex mutex; - - void event_cb(ev::io &watcher, int revents); - void read_cb(ev::io &watcher); - void write_cb(ev::io &watcher); -}; - /** * @brief TCP server interface @@ -121,7 +95,7 @@ class MAVConnTCPServer : public MAVConnInterface { * @param[id] server_port bind port */ MAVConnTCPServer(uint8_t system_id = 1, uint8_t component_id = MAV_COMP_ID_UDP_BRIDGE, - std::string server_addr = "localhost", unsigned short server_port = 5760); + std::string bind_host = "localhost", unsigned short bind_port = 5760); ~MAVConnTCPServer(); void close(); @@ -139,13 +113,15 @@ class MAVConnTCPServer : public MAVConnInterface { sockaddr_in bind_addr; - std::list client_list; + std::list client_list; boost::recursive_mutex mutex; - void event_cb(ev::io &watcher, int revents); -}; + void accept_cb(ev::io &watcher, int revents); -#endif + // client slots + void client_closed(MAVConnTCPClient *instp); + void recv_message(const mavlink_message_t *message, uint8_t sysid, uint8_t compid); +}; }; // namespace mavconn diff --git a/src/mavconn/mavconn_tcp.cpp b/src/mavconn/mavconn_tcp.cpp index adee5c652..4561e2ba2 100644 --- a/src/mavconn/mavconn_tcp.cpp +++ b/src/mavconn/mavconn_tcp.cpp @@ -111,6 +111,26 @@ MAVConnTCPClient::MAVConnTCPClient(uint8_t system_id, uint8_t component_id, start_default_loop(); } +MAVConnTCPClient::MAVConnTCPClient(uint8_t system_id, uint8_t component_id, + int client_sd, sockaddr_in &client_addr) : + MAVConnInterface(system_id, component_id), + sockfd(client_sd), + server_addr(client_addr) +{ + ROS_INFO_NAMED("mavconn", "tcp-l: Got client, channel %d, address: %s:%d", + channel, + inet_ntoa(server_addr.sin_addr), + ntohs(server_addr.sin_port)); + + if (fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFL, 0) | O_NONBLOCK) == -1) + throw DeviceError("tcp-l: fcntl", errno); + + // run io for async io + io.set(this); + io.start(sockfd, ev::READ); + start_default_loop(); // XXX: alredy started by server +} + MAVConnTCPClient::~MAVConnTCPClient() { close(); } @@ -234,7 +254,126 @@ void MAVConnTCPClient::write_cb(ev::io &watcher) } -/***** XXX Server variant ******/ +/* -*- TCP server variant -*- */ + +MAVConnTCPServer::MAVConnTCPServer(uint8_t system_id, uint8_t component_id, + std::string server_host, unsigned short server_port) : + MAVConnInterface(system_id, component_id), + sockfd(-1) +{ + if (!resolve_address_tcp(server_host, server_port, bind_addr)) + throw DeviceError("tcp-l: resolve", "Bind address resolve failed"); + + ROS_INFO_NAMED("mavconn", "tcp-l: Bind address: %s:%d", + inet_ntoa(bind_addr.sin_addr), ntohs(bind_addr.sin_port)); + + sockfd = ::socket(AF_INET, SOCK_STREAM, 0); + if (sockfd < 0) + throw DeviceError("tcp-l: socket", errno); + + int one = 1; + if (::setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) + throw DeviceError("tcp-l: setsockopt", errno); + + if (::bind(sockfd, (sockaddr *)&bind_addr, sizeof(bind_addr)) < 0) + throw DeviceError("tcp-l: bind", errno); + + if (::listen(sockfd, channes_available())) + throw DeviceError("tcp-l: listen", errno); + + if (fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFL, 0) | O_NONBLOCK) == -1) + throw DeviceError("tcp-l: fcntl", errno); + + // run io for async io + io.set(this); + io.start(sockfd, ev::READ); + start_default_loop(); +} + +MAVConnTCPServer::~MAVConnTCPServer() { + close(); +} + +void MAVConnTCPServer::close() { + if (sockfd < 0) + return; + std::for_each(client_list.cbegin(), client_list.cend(), + [](MAVConnTCPClient *instp) { + instp->close(); + }); + + io.stop(); + ::close(sockfd); sockfd = -1; + + /* emit */ port_closed(); +} + +void MAVConnTCPServer::send_bytes(const uint8_t *bytes, size_t length) +{ + boost::recursive_mutex::scoped_lock lock(mutex); + std::for_each(client_list.begin(), client_list.end(), + [bytes, length](MAVConnTCPClient *instp) { + instp->send_bytes(bytes, length); + }); +} + +void MAVConnTCPServer::send_message(const mavlink_message_t *message, uint8_t sysid, uint8_t compid) +{ + boost::recursive_mutex::scoped_lock lock(mutex); + std::for_each(client_list.begin(), client_list.end(), + [message, sysid, compid](MAVConnTCPClient *instp) { + instp->send_message(message, sysid, compid); + }); +} + +void MAVConnTCPServer::accept_cb(ev::io &watcher, int revents) +{ + if (ev::ERROR & revents) { + ROS_ERROR_NAMED("mavconn", "accept_cb::revents: 0x%08x", revents); + close(); + return; + } + + if (!(ev::READ & revents)) + return; + + struct sockaddr_in client_addr; + socklen_t client_len = sizeof(client_addr); + + int client_sd = accept(watcher.fd, (struct sockaddr *)&client_addr, &client_len); + if (client_sd < 0) { + ROS_ERROR_NAMED("mavconn", "tcp-l: accept: %s", strerror(errno)); + return; + } + + if (channes_available() <= 0) { + ROS_ERROR_NAMED("mavconn", "tcp-l:accept_cb: all channels in use, drop connection"); + ::close(client_sd); + return; + } + + MAVConnTCPClient *instp = new MAVConnTCPClient(sys_id, comp_id, client_sd, client_addr); + instp->message_received.connect(boost::bind(&MAVConnTCPServer::recv_message, this, _1, _2, _3)); + instp->port_closed.connect(boost::bind(&MAVConnTCPServer::client_closed, this, instp)); + + client_list.push_back(instp); +} + +void MAVConnTCPServer::client_closed(MAVConnTCPClient *instp) +{ + ROS_INFO_NAMED("mavconn", "tcp-l: Client connection closed, channel %d, address: %s:%d", + instp->channel, + inet_ntoa(instp->server_addr.sin_addr), + ntohs(instp->server_addr.sin_port)); + + client_list.remove(instp); + delete instp; +} + +void MAVConnTCPServer::recv_message(const mavlink_message_t *message, uint8_t sysid, uint8_t compid) +{ + message_received(message, sysid, compid); +} }; // namespace mavconn