Skip to content

Commit

Permalink
Fix kqueue/epoll for attached read/write pipes
Browse files Browse the repository at this point in the history
  • Loading branch information
tmm1 committed May 6, 2009
1 parent eae390a commit c4581c6
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 20 deletions.
84 changes: 66 additions & 18 deletions ext/ed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,8 @@ ConnectionDescriptor::ConnectionDescriptor (int sd, EventMachine_t *em):
LastIo (gCurrentLoopTime),
InactivityTimeout (0)
{
#ifdef HAVE_EPOLL
EpollEvent.events = EPOLLOUT;
#endif
// 22Jan09: Moved ArmKqueueWriter into SetConnectPending() to fix assertion failure in _WriteOutboundData()
// 5May09: Moved EPOLLOUT into SetConnectPending() so it doesn't happen for attached read pipes
}


Expand Down Expand Up @@ -275,9 +273,60 @@ ConnectionDescriptor::SetConnectPending
void ConnectionDescriptor::SetConnectPending(bool f)
{
bConnectPending = f;
#ifdef HAVE_KQUEUE
MyEventMachine->ArmKqueueWriter (this);
#endif

if (bConnectPending) { // not yet connected, select for writability
#ifdef HAVE_EPOLL
EpollEvent.events = EPOLLOUT;
#endif
#ifdef HAVE_KQUEUE
MyEventMachine->ArmKqueueWriter (this);
#endif
} else { // connected, wait for incoming data and write out any pending outgoing data
#ifdef HAVE_EPOLL
EpollEvent.events = EPOLLIN | (SelectForWrite() ? EPOLLOUT : 0);
#endif
#ifdef HAVE_KQUEUE
MyEventMachine->ArmKqueueReader (this);
if (SelectForWrite())
MyEventMachine->ArmKqueueWriter (this);
#endif
}
}


/***************************************
ConnectionDescriptor::SetNotifyReadable
****************************************/

void ConnectionDescriptor::SetNotifyReadable(bool readable)
{
bNotifyReadable = readable;
if (bNotifyReadable) {
#ifdef HAVE_EPOLL
EpollEvent.events |= EPOLLIN;
#endif
#ifdef HAVE_KQUEUE
MyEventMachine->ArmKqueueReader (this);
#endif
}
}


/***************************************
ConnectionDescriptor::SetNotifyWritable
****************************************/

void ConnectionDescriptor::SetNotifyWritable(bool writable)
{
bNotifyWritable = writable;
if (bNotifyWritable) {
#ifdef HAVE_EPOLL
EpollEvent.events |= EPOLLOUT;
#endif
#ifdef HAVE_KQUEUE
MyEventMachine->ArmKqueueWriter (this);
#endif
}
}


Expand Down Expand Up @@ -333,10 +382,12 @@ int ConnectionDescriptor::_SendRawOutboundData (const char *data, int length)
char *buffer = (char *) malloc (length + 1);
if (!buffer)
throw std::runtime_error ("no allocation for outbound data");

memcpy (buffer, data, length);
buffer [length] = 0;
OutboundPages.push_back (OutboundPage (buffer, length));
OutboundDataSize += length;

#ifdef HAVE_EPOLL
EpollEvent.events = (EPOLLIN | EPOLLOUT);
assert (MyEventMachine);
Expand All @@ -345,6 +396,7 @@ int ConnectionDescriptor::_SendRawOutboundData (const char *data, int length)
#ifdef HAVE_KQUEUE
MyEventMachine->ArmKqueueWriter (this);
#endif

return length;
}

Expand Down Expand Up @@ -579,17 +631,10 @@ void ConnectionDescriptor::Write()
if ((o == 0) && (error == 0)) {
if (EventCallback)
(*EventCallback)(GetBinding().c_str(), EM_CONNECTION_COMPLETED, "", 0);
bConnectPending = false;
#ifdef HAVE_EPOLL
// The callback may have scheduled outbound data.
EpollEvent.events = EPOLLIN | (SelectForWrite() ? EPOLLOUT : 0);
#endif
#ifdef HAVE_KQUEUE
MyEventMachine->ArmKqueueReader (this);
// The callback may have scheduled outbound data.
if (SelectForWrite())
MyEventMachine->ArmKqueueWriter (this);
#endif

// 5May09: Moved epoll/kqueue read/write arming into SetConnectPending, so it can be called
// from EventMachine_t::AttachFD as well.
SetConnectPending (false);
}
else
ScheduleClose (false);
Expand Down Expand Up @@ -1394,11 +1439,13 @@ int DatagramDescriptor::SendOutboundData (const char *data, int length)
buffer [length] = 0;
OutboundPages.push_back (OutboundPage (buffer, length, ReturnAddress));
OutboundDataSize += length;

#ifdef HAVE_EPOLL
EpollEvent.events = (EPOLLIN | EPOLLOUT);
assert (MyEventMachine);
MyEventMachine->Modify (this);
#endif

return length;
}

Expand Down Expand Up @@ -1438,7 +1485,6 @@ int DatagramDescriptor::SendOutboundDatagram (const char *data, int length, cons
pin.sin_port = htons (port);



if (!data && (length > 0))
throw std::runtime_error ("bad outbound data");
char *buffer = (char *) malloc (length + 1);
Expand All @@ -1448,11 +1494,13 @@ int DatagramDescriptor::SendOutboundDatagram (const char *data, int length, cons
buffer [length] = 0;
OutboundPages.push_back (OutboundPage (buffer, length, pin));
OutboundDataSize += length;

#ifdef HAVE_EPOLL
EpollEvent.events = (EPOLLIN | EPOLLOUT);
assert (MyEventMachine);
MyEventMachine->Modify (this);
#endif

return length;
}

Expand Down
4 changes: 2 additions & 2 deletions ext/ed.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ class ConnectionDescriptor: public EventableDescriptor

void SetConnectPending (bool f);

void SetNotifyReadable (bool readable) { bNotifyReadable = readable; }
void SetNotifyWritable (bool writable) { bNotifyWritable = writable; }
void SetNotifyReadable (bool);
void SetNotifyWritable (bool);

virtual void Read();
virtual void Write();
Expand Down

0 comments on commit c4581c6

Please sign in to comment.