Skip to content
Permalink
Browse files

Skip phantom (already closed, but not finally deleted from the poller…

…) connections
  • Loading branch information
klirichek committed Aug 15, 2018
1 parent e61ec00 commit 5fba49d5e28c17de4c0acbd984466127f42de6e8
Showing with 41 additions and 10 deletions.
  1. +30 −10 src/searchdha.cpp
  2. +11 −0 src/sphinxstd.h
@@ -1783,6 +1783,7 @@ void AgentConn_t::AbortCallback()

void AgentConn_t::ErrorCallback ( int64_t iWaited )
{
SetNetLoop ();
if ( !m_pPollerTask )
return;
m_iWaited += iWaited;
@@ -1794,6 +1795,7 @@ void AgentConn_t::ErrorCallback ( int64_t iWaited )

void AgentConn_t::SendCallback ( int64_t iWaited, DWORD uSent )
{
SetNetLoop ();
if ( !m_pPollerTask )
return;

@@ -1808,6 +1810,7 @@ void AgentConn_t::SendCallback ( int64_t iWaited, DWORD uSent )

void AgentConn_t::RecvCallback ( int64_t iWaited, DWORD uReceived )
{
SetNetLoop ();
if ( !m_pPollerTask )
return;

@@ -2607,6 +2610,7 @@ struct Task_t
int64_t m_iPlannedTimeout = 0; // asked timeout (-1 - delete task, 0 - no changes; >0 - set value)
int m_iTimeoutIdx = -1; // idx inside timeouts bin heap (or -1 if not there)
int m_ifd = -1;
int m_iStoredfd = -1; //
bool m_bHardTimeout = false;
BYTE m_uIOActive = NO; // active IO callbacks: 0-none, 1-r+w, 2-r
BYTE m_uIOChanged = NO; // need IO changes: dequeue (if !m_uIOActive), 1-set to rw, 2-set to ro
@@ -2621,6 +2625,7 @@ inline static bool operator< (const Task_t& dLeft, const Task_t& dRight )
class TimeoutQueue_c
{
CSphTightVector<Task_t*> m_dQueue;
CSphTightVector<uintptr_t> m_dCloud;

inline void ShiftUp ( int iHole )
{
@@ -2670,6 +2675,8 @@ class TimeoutQueue_c

m_dQueue.Add ( pTask );
ShiftUp ( m_dQueue.GetLength () - 1 );
m_dCloud.Add ( ( uintptr_t ) pTask );
m_dCloud.Uniq();
}

/// remove root (ie. top priority) entry
@@ -2679,6 +2686,7 @@ class TimeoutQueue_c
return;

m_dQueue[0]->m_iTimeoutIdx = -1;
Verify (m_dCloud.RemoveValueFromSorted ( ( uintptr_t ) m_dQueue[0] ) );
m_dQueue.RemoveFast (0);
ShiftDown(0);
}
@@ -2712,10 +2720,11 @@ class TimeoutQueue_c
if ( iHole<0 || iHole>=m_dQueue.GetLength () )
return;

Verify ( m_dCloud.RemoveValueFromSorted ( ( uintptr_t ) pTask ) );
m_dQueue.RemoveFast ( iHole );
if ( iHole<m_dQueue.GetLength () )
{
if ( iHole && iHole<m_dQueue.GetLength() && *m_dQueue[iHole]<*m_dQueue[( iHole - 1 ) / 2] )
if ( iHole && *m_dQueue[iHole]<*m_dQueue[( iHole - 1 ) / 2] )
ShiftUp ( iHole );
else
ShiftDown ( iHole );
@@ -2728,6 +2737,11 @@ class TimeoutQueue_c
return m_dQueue.IsEmpty ();
}

inline bool IsNotHere ( const Task_t * pTask ) const
{
return !m_dCloud.BinarySearch ( ( uintptr_t ) pTask );
}

/// get minimal (root) elem
inline Task_t * Root () const
{
@@ -3007,6 +3021,7 @@ class NetEventsFlavour_c
{
protected:
int m_iEvents = 0; ///< how many events are in queue.
static const int m_iCReserve = 256; /// will always provide extra that space of events to poller

// perform actual changing of pTask subscription state.
// NOTE! m_uIOChanged==0 field here means active 'unsubscribe' (in use for deletion)
@@ -3041,7 +3056,7 @@ void events_change_io ( Task_t * pTask )
// fixme! m.b. more workers, or just one enough?
m_IOCP = CreateIoCompletionPort ( INVALID_HANDLE_VALUE, NULL, 0, 1 );
sphLogDebugL ( "L IOCP %d created", m_IOCP );

m_dReady.Reserve (m_iCReserve + iSizeHint);
}

inline void events_destroy ()
@@ -3131,7 +3146,7 @@ void events_change_io ( Task_t * pTask )
{
ULONG uReady = 0;
DWORD uTimeout = ( iTimeoutUS>=0 ) ? ( iTimeoutUS/1000 ) : INFINITE;
m_dReady.Resize ( m_iEvents+1 ); // +1 since our signaler is not added as resident of the queue
m_dReady.Resize ( m_iEvents+m_iCReserve ); // +1 since our signaler is not added as resident of the queue
if ( !GetQueuedCompletionStatusEx ( m_IOCP, m_dReady.Begin (), m_dReady.GetLength (), &uReady, uTimeout, FALSE ) )
{
auto iErr = GetLastError ();
@@ -3187,7 +3202,7 @@ void events_change_io ( Task_t * pTask )
m_iEFD = epoll_create ( iSizeHint ); // 1000 is dummy, see man
if ( m_iEFD==-1 )
sphDie ( "failed to create epoll main FD, errno=%d, %s", errno, strerrorm ( errno ) );

m_dReady.Reserve ( m_iCReserve + iSizeHint );
sphLogDebugv ( "epoll %d created", m_iEFD );
}

@@ -3250,7 +3265,7 @@ void events_change_io ( Task_t * pTask )

inline int events_wait ( int64_t iTimeoutUS )
{
m_dReady.Resize ( m_iEvents );
m_dReady.Resize ( m_iEvents + m_iCReserve );
int iTimeoutMS = iTimeoutUS<0 ? -1 : ( ( iTimeoutUS + 500 ) / 1000 );
return epoll_wait ( m_iEFD, m_dReady.Begin (), m_dReady.GetLength (), iTimeoutMS );
};
@@ -3278,6 +3293,7 @@ void events_change_io ( Task_t * pTask )

sphLogDebugv ( "kqueue %d created", m_iEFD );
m_dScheduled.Reserve ( iSizeHint * 2 );
m_dReady.Reserve ( iSizeHint * 2 + m_iCReserve );
}

int events_apply_task_changes ( Task_t * pTask )
@@ -3339,7 +3355,7 @@ void events_change_io ( Task_t * pTask )

inline int events_wait ( int64_t iTimeoutUS )
{
m_dReady.Resize ( m_iEvents + m_dScheduled.GetLength () );
m_dReady.Resize ( m_iEvents + m_dScheduled.GetLength () + m_iCReserve );
timespec ts;
timespec * pts = nullptr;
if ( iTimeoutUS>=0 )
@@ -3388,7 +3404,7 @@ class LazyNetEvents_c : ISphNoncopyable, protected NetEventsFlavour_c
inline Task_t * CreateNewTask ( AgentConn_t * pConnection )
{
auto pTask = new Task_t;
pTask->m_ifd = pConnection->m_iSock;
pTask->m_ifd = pTask->m_iStoredfd = pConnection->m_iSock;
pTask->m_pPayload = pConnection;
pConnection->m_pPollerTask = pTask;
pConnection->AddRef ();
@@ -3609,6 +3625,7 @@ class LazyNetEvents_c : ISphNoncopyable, protected NetEventsFlavour_c
return true;
}
sphLogDebugL ( "L poller wait returned %d events from %d", iEvents, m_iEvents );
m_dReady.Resize ( iEvents );

/// we have some events to speak about...
for ( int i = 0; i<iEvents; ++i )
@@ -3632,6 +3649,12 @@ class LazyNetEvents_c : ISphNoncopyable, protected NetEventsFlavour_c
else
sphLogDebugL ( "L event action for task %p(%d), %d", pTask, pTask->m_ifd, tEvent.GetEvents () );

if ( m_dTimeouts.IsNotHere ( pTask ) )
{
sphLogDebugL ( "L phantom event detected! %p(%d), %d", pTask, pTask->m_ifd, tEvent.GetEvents () );
continue;
}

bool bError = tEvent.IsError ();
bool bEof = tEvent.IsEof ();
if ( bError )
@@ -3650,7 +3673,6 @@ class LazyNetEvents_c : ISphNoncopyable, protected NetEventsFlavour_c
if ( bError )
{
sphLogDebugL ( "L error action %p, waited " INT64_FMT, pTask, iWaited );
pConn->SetNetLoop ();
pConn->ErrorCallback ( iWaited );
sphLogDebugL ( "L error action %p completed", pTask );
} else
@@ -3660,7 +3682,6 @@ class LazyNetEvents_c : ISphNoncopyable, protected NetEventsFlavour_c
if ( !bEof )
{
sphLogDebugL ( "L write action %p, waited " INT64_FMT ", transferred %d", pTask, iWaited, tEvent.BytesTransferred () );
pConn->SetNetLoop ();
pConn->SendCallback ( iWaited, tEvent.BytesTransferred () );
sphLogDebugL ( "L write action %p completed", pTask );
} else
@@ -3670,7 +3691,6 @@ class LazyNetEvents_c : ISphNoncopyable, protected NetEventsFlavour_c
if ( tEvent.IsRead () )
{
sphLogDebugL ( "L read action %p, waited " INT64_FMT ", transferred %d", pTask, iWaited, tEvent.BytesTransferred () );
pConn->SetNetLoop ();
pConn->RecvCallback ( iWaited, tEvent.BytesTransferred () );
sphLogDebugL ( "L read action %p completed", pTask );
}
@@ -1119,6 +1119,17 @@ template < typename T, typename POLICY=CSphVectorPolicy<T> > class CSphVector
return false;
}

/// remove element by value, asuming vec is sorted/uniq
bool RemoveValueFromSorted ( T tValue )
{
T* pValue = BinarySearch (tValue);
if ( !pValue )
return false;

RemoveFast ( pValue - Begin() );
return true;
}

/// pop last value
const T & Pop ()
{

0 comments on commit 5fba49d

Please sign in to comment.
You can’t perform that action at this time.