Skip to content

Commit

Permalink
rmq_rework: Add retry on socket_open
Browse files Browse the repository at this point in the history
  • Loading branch information
olivier-detour authored and matt-42 committed Mar 11, 2018
1 parent 1f3631b commit cbea497
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 3 deletions.
30 changes: 27 additions & 3 deletions silicon/backends/rabbitmq.hh
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

# include <stdexcept>
# include <iostream>
# include <chrono>
# include <thread>

# include <amqp_tcp_socket.h>
# include <amqp.h>
Expand All @@ -12,10 +14,26 @@
# include <silicon/symbols.hh>
# include <silicon/service.hh>

using namespace std::chrono_literals;

namespace sl
{
namespace rmq
{
template <typename T>
bool retry(std::string const & n, std::function<bool()> f, size_t m, T t)
{
for (size_t i {0}; i < m; ++i)
{
if(f())
return true;

std::cerr << n << " failed: retry in " << std::chrono::milliseconds(t).count() << "ms\n";

std::this_thread::sleep_for(t);
}
return false;
}
auto get_string = [] (auto const & b) { return std::string(static_cast<char const *>(b.bytes), b.len); };

void
Expand Down Expand Up @@ -129,15 +147,21 @@ namespace rmq
public socket
{
template <typename... O>
tcp_socket(std::string const & host, unsigned short port, O &&... )
tcp_socket(std::string const & host, unsigned short port, O &&... opts)
{
auto options = D(opts...);

unsigned int r = options.get(s::_retry, 5);

conn = amqp_new_connection();
socket = amqp_tcp_socket_new(conn);
if (!socket)
throw std::runtime_error("amqp.tcp.socket.new");

auto status = amqp_socket_open(socket, host.c_str(), port);
if (status)
if (!retry("amqp.socket.open", [&]()
{
return amqp_socket_open(socket, host.c_str(), port) == 0;
}, r, 1s))
throw std::runtime_error("amqp.socket.open");
}

Expand Down
5 changes: 5 additions & 0 deletions silicon/symbols.hh
Original file line number Diff line number Diff line change
Expand Up @@ -369,3 +369,8 @@
#define IOD_SYMBOL_direct
iod_define_symbol(direct)
#endif

#ifndef IOD_SYMBOL_retry
#define IOD_SYMBOL_retry
iod_define_symbol(retry)
#endif

0 comments on commit cbea497

Please sign in to comment.