Permalink
Browse files

Fix crash on read action after write.

Also many previous fixups commented as suspicious
  • Loading branch information...
klirichek authored and adriannuta committed Aug 20, 2018
1 parent ba0da7b commit 46890e70eb8dcd00c1bf1b030538b8f131c601c2
Showing with 49 additions and 19 deletions.
  1. +49 −15 src/searchdha.cpp
  2. +0 −4 src/searchdha.h
@@ -1703,7 +1703,7 @@ void AgentConn_t::ReportFinish ( bool bSuccess )
{
if ( m_pReporter )
m_pReporter->Report ( bSuccess );
m_iRetries = -1; // avoid any accidental retry in future
m_iRetries = -1; // avoid any accidental retry in future. fixme! better investigate why such accident may happen
}
/// switch from 'connecting' to 'healthy' state.
@@ -1810,6 +1810,9 @@ void AgentConn_t::TimeoutCallback ()
}
}
// fixme! Actually this must never happen.
// So, an explicit sphWarning is used to detect whether it is actually called.
// The suspected reason for orphans is a combined write, then read in netloop with epoll.
bool AgentConn_t::CheckOrphaned()
{
// check if we accidentally orphaned (that is bug!)
@@ -1978,7 +1981,7 @@ inline SSIZE_T AgentConn_t::SendChunk ()
struct msghdr dHdr = { 0 };
dHdr.msg_iov = m_dIOVec.IOPtr ();
dHdr.msg_iovlen = m_dIOVec.IOSize ();
return ::sendmsg ( m_iSock, &dHdr, MSG_NOSIGNAL );
return ::sendmsg ( m_iSock, &dHdr, MSG_NOSIGNAL | MSG_DONTWAIT );
}
#endif
@@ -2662,7 +2665,8 @@ struct Task_t
int64_t m_iPlannedTimeout = 0; // asked timeout (-1 - delete task, 0 - no changes; >0 - set value)
int m_iTimeoutIdx = -1; // idx inside timeouts bin heap (or -1 if not there)
int m_ifd = -1;
int m_iStoredfd = -1; //
int m_iStoredfd = -1; // helper to find original fd if socket was closed
int m_iTickProcessed=0; // tick # to detect and avoid reading callback in same loop with write
BYTE m_uIOActive = NO; // active IO callbacks: 0-none, 1-r+w, 2-r
BYTE m_uIOChanged = NO; // need IO changes: dequeue (if !m_uIOActive), 1-set to rw, 2-set to ro
};
@@ -3448,6 +3452,7 @@ class LazyNetEvents_c : ISphNoncopyable, protected NetEventsFlavour_c
TimeoutQueue_c m_dTimeouts;
SphThread_t m_dWorkingThread;
int m_iLastReportedErrno = -1;
volatile int m_iTickNo = 1;
int64_t m_iNextTimeoutUS = 0;
private:
@@ -3512,8 +3517,8 @@ class LazyNetEvents_c : ISphNoncopyable, protected NetEventsFlavour_c
void ProcessChanges ( Task_t * pTask )
{
sphLogDebugL ( "L ProcessChanges for %p, (conn %p) (%d->%d), tm(%d)=" INT64_FMT, pTask, pTask->m_pPayload,
pTask->m_uIOActive, pTask->m_uIOChanged, pTask->m_bHardTimeout, pTask->m_iTimeoutTime);
sphLogDebugL ( "L ProcessChanges for %p, (conn %p) (%d->%d), tm=" INT64_FMT, pTask, pTask->m_pPayload,
pTask->m_uIOActive, pTask->m_uIOChanged, pTask->m_iTimeoutTime);
assert ( pTask->m_iTimeoutTime!=0);
@@ -3614,7 +3619,7 @@ class LazyNetEvents_c : ISphNoncopyable, protected NetEventsFlavour_c
* However if right now something came to the socket, next call to poller might
* signal it, and we catch the events on the next round.
*/
sphLogDebugL ( "L timeout action started %d", bHardTimeout );
sphLogDebugL ( "L timeout action started" );
pKeepConn->TimeoutCallback ();
sphLogDebugL ( "L timeout action finished" );
}
@@ -3636,11 +3641,18 @@ class LazyNetEvents_c : ISphNoncopyable, protected NetEventsFlavour_c
}
}
/// Tells whether the task was already handled during the current tick.
/// \param pTask task to check; may be nullptr.
/// \return true only if pTask is non-null and its m_iTickProcessed stamp equals m_iTickNo.
inline bool IsTickProcessed ( Task_t * pTask )
{
// a null task is never considered processed; short-circuit keeps the dereference safe
return pTask && pTask->m_iTickProcessed==m_iTickNo;
}
/// one event cycle.
/// \return false to stop event loop and exit.
bool EventTick () REQUIRES ( LazyThread )
{
sphLogDebugL ( "L ---------------------------- EventTick()" );
sphLogDebugL ( "L ---------------------------- EventTick(%d)", m_iTickNo );
do
ProcessEnqueuedTasks ();
while ( HasTimeoutActions () );
@@ -3652,6 +3664,20 @@ class LazyNetEvents_c : ISphNoncopyable, protected NetEventsFlavour_c
auto iEvents = events_wait ( m_iNextTimeoutUS );
auto iWaited = sphMicroTimer() - iStarted;
// tick # allows to trace different events over one and the same task.
// Say, processing a write action may initiate reading, or even
// invalidate the connection, closing and releasing it.
// If later in the same loop we get the same task for another action, such changed state
// may cause a crash (say, if the underlying connection is released and deleted).
// With epoll we have only one task which may be in both 'write' and 'read' state,
// so it seems that just doing one ELSE another should always work.
// But on BSD we have a separate event for read and another one for write.
// If one is processed, there is no guarantee that the other is not in the same resultset.
// For this case we update the tick # on processing and then compare it with the current one.
++m_iTickNo;
if ( !m_iTickNo ) ++m_iTickNo; // skip 0
if ( g_bShutdown )
{
AbortScheduled();
@@ -3695,9 +3721,11 @@ class LazyNetEvents_c : ISphNoncopyable, protected NetEventsFlavour_c
else
sphLogDebugL ( "L event action for task %p(%d), %d", pTask, pTask->m_ifd, tEvent.GetEvents () );
// part of catching the consequent crash; maybe not relevant anymore if no warnings are fired
// (in that case the stuff supporting IsNotHere is not necessary either).
if ( m_dTimeouts.IsNotHere ( pTask ) )
{
sphLogDebugL ( "L phantom event detected! %p(%d), %d", pTask, pTask->m_ifd, tEvent.GetEvents () );
sphWarning ( "phantom event detected! %p(%d), %d", pTask, pTask->m_ifd, tEvent.GetEvents () );
continue;
}
@@ -3714,11 +3742,12 @@ class LazyNetEvents_c : ISphNoncopyable, protected NetEventsFlavour_c
}
auto pConn = pTask->m_pPayload;
if ( pConn && pTask->m_uIOActive )
if ( pConn && pTask->m_uIOActive && !IsTickProcessed ( pTask ) )
{
if ( bError )
{
sphLogDebugL ( "L error action %p, waited " INT64_FMT, pTask, iWaited );
pTask->m_iTickProcessed = m_iTickNo;
pConn->ErrorCallback ( iWaited );
sphLogDebugL ( "L error action %p completed", pTask );
} else
@@ -3728,21 +3757,23 @@ class LazyNetEvents_c : ISphNoncopyable, protected NetEventsFlavour_c
if ( !bEof )
{
sphLogDebugL ( "L write action %p, waited " INT64_FMT ", transferred %d", pTask, iWaited, tEvent.BytesTransferred () );
pTask->m_iTickProcessed = m_iTickNo;
pConn->SendCallback ( iWaited, tEvent.BytesTransferred () );
sphLogDebugL ( "L write action %p completed", pTask );
} else
sphLogDebugL ( "L write action avoid because of eof", pTask );
sphLogDebugL ( "L write action avoid because of eof or same-generation tick", pTask );
}
if ( tEvent.IsRead () )
if ( tEvent.IsRead () && !IsTickProcessed ( pTask ) )
{
sphLogDebugL ( "L read action %p, waited " INT64_FMT ", transferred %d", pTask, iWaited, tEvent.BytesTransferred () );
pTask->m_iTickProcessed = m_iTickNo;
pConn->RecvCallback ( iWaited, tEvent.BytesTransferred () );
sphLogDebugL ( "L read action %p completed", pTask );
}
}
}
}
} // 'for' loop over ready events
return true;
}
@@ -3931,7 +3962,11 @@ class CRemoteAgentsObserver : public IRemoteAgentsObserver
bool IsDone () const final
{
if ( m_iTasks>=0 )
return m_iFinished==m_iTasks;
{
if ( m_iFinished > m_iTasks )
sphWarning ("Orphaned chain detected (expected %d, got %d)", m_iTasks, (int) m_iFinished );
return m_iFinished>=m_iTasks;
}
return false;
}
@@ -3957,8 +3992,7 @@ class CRemoteAgentsObserver : public IRemoteAgentsObserver
{
return m_iFinished;
}
protected:
~CRemoteAgentsObserver() final {}
private:
CSphAutoEvent m_tChanged; ///< the signaller
CSphAtomic m_iSucceeded { 0 }; //< num of tasks finished successfully
@@ -595,7 +595,6 @@ class IRemoteAgentsObserver : public IReporter_t
{
public:
// get num of succeeded agents
virtual long GetSucceeded () const = 0;
@@ -607,9 +606,6 @@ class IRemoteAgentsObserver : public IReporter_t
// block execution while some works finished
virtual void WaitChanges () = 0;
protected:
~IRemoteAgentsObserver() override {}
};
IRemoteAgentsObserver * GetObserver ();

0 comments on commit 46890e7

Please sign in to comment.