Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: jblunck/a
base: e7ad51e8c2
...
head fork: jblunck/a
compare: c6d68b7f6b
  • 7 commits
  • 6 files changed
  • 0 commit comments
  • 1 contributor
Commits on Nov 06, 2011
@jblunck Add command-line options "h", "i" and "p" to upd_multicast_receiver_e…
…xample

The application now listens to the following options:

udp_multicast_receiver -i ipaddr -h ipaddr -p port

 -i specify interface on which to listen for multicast datagrams
 -h specify multicast group to join
 -p specify port on which to listen for multicast datagrams

Signed-off-by: Jan Blunck <jblunck@infradead.org>
bd2b7c8
Commits on Nov 08, 2011
@jblunck Turn local_datagram_receiver_example into performance test
Signed-off-by: Jan Blunck <jblunck@infradead.org>
c1fca07
@jblunck Add unittest to check local_datagram_receiver functionality
Signed-off-by: Jan Blunck <jblunck@infradead.org>
c75edec
@jblunck Build with debuginfo by default
Signed-off-by: Jan Blunck <jblunck@infradead.org>
f451a9f
@jblunck Make datagram_receiver noncopyable
Signed-off-by: Jan Blunck <jblunck@infradead.org>
56878cf
@jblunck Properly unbind() to cancel handler and close the socket
This is necessary to get rid of the allocated memory to make valgrind no
longer complain.

Signed-off-by: Jan Blunck <jblunck@infradead.org>
751e0ee
@jblunck Don't allow local_datagram_receiver to bind multiple times
Signed-off-by: Jan Blunck <jblunck@infradead.org>
c6d68b7
View
4 boost_asio/CMakeLists.txt
@@ -1,5 +1,7 @@
cmake_minimum_required(VERSION 2.6)
+set(CMAKE_BUILD_TYPE RelWithDebInfo)
+
find_package(Boost COMPONENTS system unit_test_framework)
add_executable(udp_multicast_receiver_example udp_multicast_receiver_example.cpp)
@@ -15,4 +17,4 @@ add_executable(local_datagram_receiver_test local_datagram_receiver_test.cpp rea
target_link_libraries(local_datagram_receiver_test ${Boost_LIBRARIES} pthread)
add_executable(local_datagram_receiver_example local_datagram_receiver_example.cpp read_from.cpp)
-target_link_libraries(local_datagram_receiver_example ${Boost_LIBRARIES} pthread)
+target_link_libraries(local_datagram_receiver_example ${Boost_LIBRARIES} pthread rt)
View
6 boost_asio/datagram_receiver.hpp
@@ -6,6 +6,7 @@
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
+#include <boost/utility.hpp>
/*
* Object Lifetime of the datagram_receiver<>
@@ -35,7 +36,7 @@ template <class EventHandler,
class StatusListener,
typename Protocol,
typename BufferPolicy = static_buffer_policy<64 * 1024> >
-class datagram_receiver :
+class datagram_receiver : boost::noncopyable,
public boost::enable_shared_from_this<datagram_receiver<EventHandler,
StatusListener,
Protocol,
@@ -98,7 +99,8 @@ class datagram_receiver :
size_t length)
{
if (error) {
- _status.error(error.message());
+ if (error != boost::asio::error::operation_aborted)
+ _status.error(error.message());
return;
}
View
21 boost_asio/local_datagram_receiver.hpp
@@ -82,6 +82,8 @@ class buffered_datagram_writer
* we do not wait in run() infinitely. But poll() all ready work
* first.
*/
+ _socket.cancel();
+ _socket.close();
_socket.get_io_service().poll();
_socket.get_io_service().stop();
return;
@@ -107,12 +109,14 @@ class local_datagram_receiver :
{
typedef datagram_receiver<EventHandler,StatusListener,boost::asio::local::datagram_protocol> base_type;
boost::asio::local::datagram_protocol::socket _socket;
+ unsigned short _bind_port;
public:
local_datagram_receiver(EventHandler& handler,
StatusListener& listener) :
base_type(handler, listener),
- _socket(base_type::get_io_service())
+ _socket(base_type::get_io_service()),
+ _bind_port(0)
{
}
@@ -121,7 +125,8 @@ class local_datagram_receiver :
const char * /* interface_address */,
const bool /* is_loopback */) :
base_type(handler, listener),
- _socket(base_type::get_io_service())
+ _socket(base_type::get_io_service()),
+ _bind_port(0)
{
}
@@ -130,6 +135,10 @@ class local_datagram_receiver :
*/
void bind(const unsigned short port)
{
+ if (_bind_port)
+ throw std::runtime_error("Already bound to port " + _bind_port);
+ _bind_port = port;
+
boost::shared_ptr<buffered_datagram_writer<T> > reader;
reader.reset(new buffered_datagram_writer<T>(base_type::get_io_service(),
port));
@@ -145,6 +154,14 @@ class local_datagram_receiver :
base_type::start_receive(_socket);
}
+ void unbind(const unsigned short port)
+ {
+ if (port != _bind_port)
+ throw std::runtime_error("Not bound to port " + port);
+
+ _socket.cancel();
+ _socket.close();
+ }
};
#endif // __LOCAL_DATAGRAM_RECEIVER_HPP__
View
50 boost_asio/local_datagram_receiver_example.cpp
@@ -1,45 +1,39 @@
#include "read_from.hpp"
#include "local_datagram_receiver.hpp"
#include "datagram_receiver_handler.hpp"
+#include "../stop_watch.hpp"
#include <iostream>
-struct LastReceivedBufferHandler
-{
- typedef char * data_type;
- std::string _buffer;
- bool _has_received;
-
- LastReceivedBufferHandler()
- : _has_received(false)
- {
- }
-
- void operator()(data_type buffer, std::size_t length)
- {
- _buffer = buffer;
- _has_received = true;
- }
-
- const std::string& get_buffer()
- {
- if (!_has_received)
- throw std::runtime_error("Not received anything");
- return _buffer;
- }
-};
-
-typedef local_datagram_receiver<struct LastReceivedBufferHandler,
+typedef local_datagram_receiver<struct nop_handler,
struct cerr_status_listener,
struct read_from_string> receiver_t;
+#define MESSAGES_NR 1e6
+
int main()
{
- struct LastReceivedBufferHandler nh;
+ read_from_string::buf = "";
+ for (int i=0 ; i < MESSAGES_NR ; ++i) {
+ read_from_string::buf.append("00101234567890");
+ }
+ read_from_string::buf.append("0000");
+
+ struct nop_handler nh;
struct cerr_status_listener mc;
boost::shared_ptr<receiver_t> receiver(new receiver_t(nh, mc));
receiver->bind(12345);
+
+ stop_watch s;
receiver->run();
+ uint64_t t = s.elapsed_ns();
+ std::cout << ( MESSAGES_NR * 1e9 ) / t << " msg/s" << std::endl;
+
+ // The following is necessary to free all used memory:
+ // 1.) cancel the handler
+ receiver->unbind(12345);
- std::cout << "Received: " << nh.get_buffer() << std::endl;
+ // 2.) execute the handler to process cancelation
+ receiver->get_io_service().reset();
+ receiver->get_io_service().poll();
return 0;
}
View
54 boost_asio/local_datagram_receiver_test.cpp
@@ -1,5 +1,6 @@
#include "local_datagram_receiver.hpp"
#include "read_from.hpp"
+#include "datagram_receiver_handler.hpp"
#define BOOST_TEST_DYN_LINK
#define BOOST_TEST_MAIN
@@ -57,3 +58,56 @@ BOOST_AUTO_TEST_CASE(read_from_string__fill_buffer)
std::string result(data->begin(), data->end());
BOOST_CHECK_EQUAL(result, read_from_string::buf);
}
+
+struct LastReceivedBufferHandler
+{
+ typedef char * data_type;
+ std::string _buffer;
+ bool _has_received;
+
+ LastReceivedBufferHandler()
+ : _has_received(false)
+ {
+ }
+
+ void operator()(data_type buffer, std::size_t length)
+ {
+ _buffer = buffer;
+ _has_received = true;
+ }
+
+ const std::string& get_buffer()
+ {
+ if (!_has_received)
+ throw std::runtime_error("Not received anything");
+ return _buffer;
+ }
+};
+
+typedef local_datagram_receiver<struct LastReceivedBufferHandler,
+ struct cerr_status_listener,
+ struct read_from_string> receiver_t;
+
+BOOST_AUTO_TEST_CASE(local_datagram_receiver__test1)
+{
+ struct LastReceivedBufferHandler nh;
+ struct cerr_status_listener mc;
+ boost::shared_ptr<receiver_t> receiver(new receiver_t(nh, mc));
+ receiver->bind(12345);
+ receiver->run();
+ BOOST_CHECK_EQUAL(nh.get_buffer(), std::string(read_from_string::buf,4,10));
+}
+
+typedef local_datagram_receiver<struct nop_handler,
+ struct cerr_status_listener,
+ struct read_from_string> receiver2_t;
+
+BOOST_AUTO_TEST_CASE(local_datagram_receiver__bind_throws)
+{
+ struct nop_handler nh;
+ struct cerr_status_listener mc;
+ boost::shared_ptr<receiver2_t> receiver(new receiver2_t(nh, mc));
+ receiver->bind(12345);
+ BOOST_CHECK_THROW(receiver->bind(12345), std::runtime_error);
+ BOOST_CHECK_THROW(receiver->unbind(1), std::runtime_error);
+}
View
35 boost_asio/udp_multicast_receiver_example.cpp
@@ -1,18 +1,45 @@
#include "udp_multicast_receiver.hpp"
#include "datagram_receiver_handler.hpp"
+#include <boost/lexical_cast.hpp>
#include <iostream>
+#include <unistd.h> // for getopt
typedef udp_multicast_receiver<struct nop_handler,
struct cerr_status_listener> receiver_t;
-int main()
+using namespace boost::asio::ip;
+
+int main(int argc, char *argv[])
{
+ // parse options
+ address opts_interface_ipaddr(address_v4::any());
+ address opts_ipaddr;
+ unsigned short opts_port(0);
+
+ int opt;
+ while ((opt = getopt(argc, argv, "i:h:p:")) != -1)
+ {
+ switch (opt)
+ {
+ case 'i':
+ opts_interface_ipaddr = address::from_string(optarg);
+ break;
+ case 'h':
+ opts_ipaddr = address::from_string(optarg);
+ break;
+ case 'p':
+ opts_port = boost::lexical_cast<unsigned short>(optarg);
+ break;
+ }
+ }
+
struct nop_handler nh;
struct cerr_status_listener mc;
boost::shared_ptr<receiver_t> receiver(new receiver_t(nh, mc,
- "127.0.0.1", true));
- receiver->join("239.1.2.3", 12345);
- receiver->leave("239.1.2.3", 12345);
+ opts_interface_ipaddr.to_string().c_str(),
+ true));
+ receiver->join(opts_ipaddr.to_string().c_str(), opts_port);
receiver->run();
+ receiver->leave(opts_ipaddr.to_string().c_str(), opts_port);
return 0;
}

No commit comments for this range

Something went wrong with that request. Please try again.