Permalink
Browse files

Add support for timeouts and externally polled fds

  • Loading branch information...
rescrv committed May 14, 2012
1 parent 72f1372 commit cbee47012f0bdf1de6fcab09308173504b3235d8
Showing with 119 additions and 2 deletions.
  1. +75 −1 busybee.cc
  2. +6 −0 busybee_constants.h
  3. +11 −0 busybee_mta.h
  4. +5 −1 busybee_returncode.h
  5. +11 −0 busybee_st.h
  6. +11 −0 busybee_sta.h
View
@@ -252,6 +252,9 @@ busybee_mta :: busybee_mta(const po6::net::ipaddr& ip,
, m_channels(sysconf(_SC_OPEN_MAX))
, m_postponed(e::next_pow2(m_channels.size() * 2))
, m_pause_barrier(num_threads)
+ , m_timeout(-1)
+ , m_external_fd(0)
+ , m_external_events(0)
, m_shutdown(false)
{
@@ -284,6 +287,8 @@ busybee_mta :: busybee_mta(const po6::net::ipaddr& ip,
case BUSYBEE_CONNECTFAIL:
case BUSYBEE_ADDFDFAIL:
case BUSYBEE_BUFFERFULL:
+ case BUSYBEE_TIMEOUT:
+ case BUSYBEE_EXTERNAL:
default:
throw std::runtime_error("Could not create connection to self.");
}
@@ -312,6 +317,9 @@ busybee_sta :: busybee_sta(const po6::net::ipaddr& ip,
, m_incoming(e::next_pow2(NUM_MSGS_PER_RECV))
, m_channels(sysconf(_SC_OPEN_MAX))
, m_postponed(e::next_pow2(m_channels.size() * 2))
+ , m_timeout(-1)
+ , m_external_fd(0)
+ , m_external_events(0)
{
if (m_epoll.get() < 0)
@@ -343,6 +351,8 @@ busybee_sta :: busybee_sta(const po6::net::ipaddr& ip,
case BUSYBEE_CONNECTFAIL:
case BUSYBEE_ADDFDFAIL:
case BUSYBEE_BUFFERFULL:
+ case BUSYBEE_TIMEOUT:
+ case BUSYBEE_EXTERNAL:
default:
throw std::runtime_error("Could not create connection to self.");
}
@@ -367,6 +377,9 @@ busybee_st :: busybee_st()
, m_incoming(e::next_pow2(NUM_MSGS_PER_RECV))
, m_channels(sysconf(_SC_OPEN_MAX))
, m_postponed(e::next_pow2(m_channels.size() * 2))
+ , m_timeout(-1)
+ , m_external_fd(0)
+ , m_external_events(0)
{
if (m_epoll.get() < 0)
@@ -412,6 +425,60 @@ CLASSNAME :: unpause()
}
#endif // BUSYBEE_MULTITHREADED
+void
+CLASSNAME :: set_timeout(int timeout)
+{
+ m_timeout = timeout;
+}
+
+int
+CLASSNAME :: add_external_fd(int fd, uint32_t events)
+{
+ assert(fd > 0);
+#ifdef BUSYBEE_MULTITHREADED
+ po6::threads::mutex::hold hold(&m_channels[fd]->mtx);
+#endif // BUSYBEE_MULTITHREADED
+ work_close(m_channels[fd].get());
+ m_channels[fd]->tag = -1;
+ epoll_event ee;
+ ee.data.fd = fd;
+ ee.events = events;
+ return epoll_ctl(m_epoll.get(), EPOLL_CTL_ADD, fd, &ee);
+}
+
+void
+CLASSNAME :: get_last_external(int* fd, uint32_t* events)
+{
+ *fd = m_external_fd;
+ *events = m_external_events;
+}
+
+busybee_returncode
+CLASSNAME :: drop(const po6::net::location& to)
+{
+ channel* chan;
+ uint32_t chantag;
+ busybee_returncode res = get_channel(to, &chan, &chantag);
+
+ // Getting a channel fails
+ if (res != BUSYBEE_SUCCESS)
+ {
+ return res;
+ }
+
+#ifdef BUSYBEE_MULTITHREADED
+ po6::threads::mutex::hold hold(&chan->mtx);
+#endif // BUSYBEE_MULTITHREADED
+
+ if (chantag != chan->tag)
+ {
+ return BUSYBEE_SUCCESS;
+ }
+
+ work_close(chan);
+ return BUSYBEE_SUCCESS;
+}
+
busybee_returncode
CLASSNAME :: send(const po6::net::location& to,
std::auto_ptr<e::buffer> msg)
@@ -455,8 +522,10 @@ CLASSNAME :: send(const po6::net::location& to,
// then postpone events that were generated while we held the lock. The
// destructors run in reverse order, so the channel will be unlocked,
// and then postponed.
+#endif // BUSYBEE_MULTITHREADED
e::guard g1 = e::makeobjguard(*this, & CLASSNAME ::postpone_event, chan);
g1.use_variable();
+#ifdef BUSYBEE_MULTITHREADED
e::guard g2 = e::makeobjguard(chan->mtx, &po6::threads::mutex::unlock);
g2.use_variable();
#endif // BUSYBEE_MULTITHREADED
@@ -512,6 +581,11 @@ CLASSNAME :: recv(po6::net::location* from,
return BUSYBEE_POLLFAILED;
}
+ if (status >= 0 && errno == EINTR)
+ {
+ return BUSYBEE_TIMEOUT;
+ }
+
continue;
}
@@ -734,7 +808,7 @@ CLASSNAME :: receive_event(int*fd, uint32_t* events)
}
epoll_event ee;
- int ret = epoll_wait(m_epoll.get(), &ee, 1, 50);
+ int ret = epoll_wait(m_epoll.get(), &ee, 1, m_timeout < 0 ? 50 : m_timeout);
*fd = ee.data.fd;
*events = ee.events;
return ret;
View
@@ -36,4 +36,10 @@
#define BUSYBEE_HEADER_SIZE sizeof(uint32_t)
#define BUSYBEE_MAX_MSG_SIZE UINT32_MAX
+enum busybe_events
+{
+ BUSYBEE_E_IN = 1,
+ BUSYBEE_E_OUT = 2
+};
+
#endif // busybee_constants_h_
View
@@ -72,6 +72,14 @@ class busybee_mta
void unpause();
public:
+ void set_timeout(int timeout);
+
+ public:
+ int add_external_fd(int fd, uint32_t events);
+ void get_last_external(int* fd, uint32_t* events);
+
+ public:
+ busybee_returncode drop(const po6::net::location& to);
busybee_returncode send(const po6::net::location& to,
std::auto_ptr<e::buffer> msg);
busybee_returncode recv(po6::net::location* from,
@@ -146,6 +154,9 @@ class busybee_mta
std::vector<std::tr1::shared_ptr<channel> > m_channels;
e::nonblocking_bounded_fifo<pending> m_postponed;
e::worker_barrier m_pause_barrier;
+ int m_timeout;
+ int m_external_fd;
+ int m_external_events;
bool m_shutdown;
};
View
@@ -41,7 +41,9 @@ enum busybee_returncode
BUSYBEE_DISCONNECT = 4612,
BUSYBEE_CONNECTFAIL = 4613,
BUSYBEE_ADDFDFAIL = 4614,
- BUSYBEE_BUFFERFULL = 4615
+ BUSYBEE_BUFFERFULL = 4615,
+ BUSYBEE_TIMEOUT = 4616,
+ BUSYBEE_EXTERNAL = 4617
};
#define str(x) #x
@@ -61,6 +63,8 @@ operator << (std::ostream& lhs, busybee_returncode rhs)
stringify(BUSYBEE_CONNECTFAIL);
stringify(BUSYBEE_ADDFDFAIL);
stringify(BUSYBEE_BUFFERFULL);
+ stringify(BUSYBEE_TIMEOUT);
+ stringify(BUSYBEE_EXTERNAL);
default:
lhs << "unknown returncode";
break;
View
@@ -51,6 +51,14 @@ class busybee_st
~busybee_st() throw ();
public:
+ void set_timeout(int timeout);
+
+ public:
+ int add_external_fd(int fd, uint32_t events);
+ void get_last_external(int* fd, uint32_t* events);
+
+ public:
+ busybee_returncode drop(const po6::net::location& to);
busybee_returncode send(const po6::net::location& to,
std::auto_ptr<e::buffer> msg);
busybee_returncode recv(po6::net::location* from,
@@ -100,6 +108,9 @@ class busybee_st
e::nonblocking_bounded_fifo<message> m_incoming;
std::vector<std::tr1::shared_ptr<channel> > m_channels;
e::nonblocking_bounded_fifo<pending> m_postponed;
+ int m_timeout;
+ int m_external_fd;
+ int m_external_events;
};
#endif // busybee_st_h_
View
@@ -58,6 +58,14 @@ class busybee_sta
~busybee_sta() throw ();
public:
+ void set_timeout(int timeout);
+
+ public:
+ int add_external_fd(int fd, uint32_t events);
+ void get_last_external(int* fd, uint32_t* events);
+
+ public:
+ busybee_returncode drop(const po6::net::location& to);
busybee_returncode send(const po6::net::location& to,
std::auto_ptr<e::buffer> msg);
busybee_returncode recv(po6::net::location* from,
@@ -119,6 +127,9 @@ class busybee_sta
e::nonblocking_bounded_fifo<message> m_incoming;
std::vector<std::tr1::shared_ptr<channel> > m_channels;
e::nonblocking_bounded_fifo<pending> m_postponed;
+ int m_timeout;
+ int m_external_fd;
+ int m_external_events;
};
#endif // busybee_sta_h_

0 comments on commit cbee470

Please sign in to comment.