Permalink
Browse files

Fix confusion of tasks due to removed one

Active heap of timeouts uses timestamp values.
When we remove a task we set timestamp to -1 which confuses heap work.
This fix added 'planned' timeout for that purpose, releasing main one
only for heap maanagement.
  • Loading branch information...
klirichek committed Aug 14, 2018
1 parent 72c2fd7 commit 53deecfecb374431f84f8592a1095a77407b8aea
Showing with 30 additions and 26 deletions.
  1. +30 −26 src/searchdha.cpp
@@ -2337,7 +2337,7 @@ bool AgentConn_t::ReceiveAnswer ( DWORD uRecv )
if ( !iRest ) // not only handshake, but whole header is here
{
auto uStat = dBuf.GetWord ();
auto uVer = dBuf.GetWord (); // there is version here. But it is not used.
auto VARIABLE_IS_NOT_USED uVer = dBuf.GetWord (); // there is version here. But it is not used.
auto iReplySize = dBuf.GetInt ();
sphLogDebugA ( "%d Header (Status=%d, Version=%d, answer need %d bytes)", m_iStoreTag, uStat, uVer, iReplySize );
@@ -2567,13 +2567,14 @@ struct Task_t
{
enum IO : BYTE { NO = 0, RW = 1, RO = 2 };
AgentConn_t * m_pPayload = nullptr; // ext conn we hold
int64_t m_iTimeoutTime = -1; // timeout (used for bin heap morph)
int m_iTimeoutIdx = -1; // idx inside timeouts bin heap (or -1 if not there)
int m_ifd = -1;
bool m_bHardTimeout = false;
BYTE m_uIOActive = NO; // active IO callbacks: 0-none, 1-r+w, 2-r
BYTE m_uIOChanged = NO; // need IO changes: dequeue (if !m_uIOActive), 1-set to rw, 2-set to ro
AgentConn_t * m_pPayload = nullptr; // ext conn we hold
int64_t m_iTimeoutTime = -1; // active timeout (used for bin heap morph)
int64_t m_iPlannedTimeout = 0; // asked timeout (-1 - delete task, 0 - no changes; >0 - set value)
int m_iTimeoutIdx = -1; // idx inside timeouts bin heap (or -1 if not there)
int m_ifd = -1;
bool m_bHardTimeout = false;
BYTE m_uIOActive = NO; // active IO callbacks: 0-none, 1-r+w, 2-r
BYTE m_uIOChanged = NO; // need IO changes: dequeue (if !m_uIOActive), 1-set to rw, 2-set to ro
};
inline static bool operator< (const Task_t& dLeft, const Task_t& dRight )
@@ -3371,8 +3372,8 @@ class LazyNetEvents_c : ISphNoncopyable, protected NetEventsFlavour_c
AgentConn_t * DeleteTask ( Task_t * pTask, bool bReleasePayload=true )
{
assert ( pTask );
sphLogDebugL ( "L DeleteTask for %p, (conn %p), release=%d", pTask, pTask->m_pPayload, bReleasePayload );
assert (pTask->m_uIOChanged == 0);
sphLogDebugL ( "L DeleteTask for %p, (conn %p, io %d), release=%d", pTask, pTask->m_pPayload, pTask->m_uIOActive, bReleasePayload );
pTask->m_uIOChanged = 0;
events_change_io ( pTask );
auto pConnection = pTask->m_pPayload;
pTask->m_pPayload = nullptr;
@@ -3414,7 +3415,7 @@ class LazyNetEvents_c : ISphNoncopyable, protected NetEventsFlavour_c
assert ( pTask->m_iTimeoutTime!=0);
if ( pTask->m_iTimeoutTime<0 ) // process 'finish' socket.
if ( pTask->m_iPlannedTimeout<0 ) // process delete.
{
sphLogDebugL ( "L finally remove task %p", pTask );
m_dTimeouts.Remove ( pTask );
@@ -3428,12 +3429,15 @@ class LazyNetEvents_c : ISphNoncopyable, protected NetEventsFlavour_c
if ( pTask->m_uIOChanged )
events_change_io ( pTask );
m_dTimeouts.Change ( pTask );
sphLogDebugL ( "L change/add timeout for %p, " INT64_FMT " (%d) is changed one", pTask,
pTask->m_iTimeoutTime, ( int ) ( pTask->m_iTimeoutTime - sphMicroTimer () ) );
sphLogDebugL ( "%s", m_dTimeouts.DebugDump ( "L " ).cstr () );
if ( pTask->m_iPlannedTimeout )
{
pTask->m_iTimeoutTime = pTask->m_iPlannedTimeout;
pTask->m_iPlannedTimeout = 0;
m_dTimeouts.Change ( pTask );
sphLogDebugL ( "L change/add timeout for %p, " INT64_FMT " (%d) is changed one", pTask, pTask->m_iTimeoutTime,
( int ) ( pTask->m_iTimeoutTime - sphMicroTimer () ) );
sphLogDebugL ( "%s", m_dTimeouts.DebugDump ( "L " ).cstr () );
}
}
/// take current internal and external queues, parse it and enqueue changes.
@@ -3442,14 +3446,14 @@ class LazyNetEvents_c : ISphNoncopyable, protected NetEventsFlavour_c
{
sphLogDebugL ( "L ProcessEnqueuedTasks" );
auto uStartLen = m_dInternalTasks.GetLength ();
auto VARIABLE_IS_NOT_USED uStartLen = m_dInternalTasks.GetLength ();
auto pExternalQueue = PopQueue ();
if ( pExternalQueue )
m_dInternalTasks.Append ( *pExternalQueue );
SafeDelete ( pExternalQueue );
auto uLastLen = m_dInternalTasks.GetLength ();
auto VARIABLE_IS_NOT_USED uLastLen = m_dInternalTasks.GetLength ();
m_dInternalTasks.Uniq ();
if ( m_dInternalTasks.IsEmpty () )
@@ -3672,19 +3676,19 @@ class LazyNetEvents_c : ISphNoncopyable, protected NetEventsFlavour_c
Task_t * pTask = CreateNewTask ( pConnection );
assert ( pTask );
assert ( iTimeoutMS!=-1 );
assert ( iTimeoutMS>0 );
// check for same timeout as we have. Avoid dupes, if so.
sphLogDebugv ( "- %d EnqueueNewTask invoked with pconn=%p, ts=" INT64_FMT ", ActivateIO=%d"
, pConnection->m_iStoreTag, pConnection, iTimeoutMS, uActivateIO );
pTask->m_iTimeoutTime = iTimeoutMS;
pTask->m_iPlannedTimeout = iTimeoutMS;
pTask->m_bHardTimeout = bTimeoutKind;
if ( uActivateIO )
pTask->m_uIOChanged = uActivateIO;
sphLogDebugv ( "- %d EnqueueNewTask enqueueing (task %p) " INT64_FMT " Us, IO(%d->%d)", pConnection->m_iStoreTag, pTask, pTask->m_iTimeoutTime, pTask->m_uIOActive, pTask->m_uIOChanged );
sphLogDebugv ( "- %d EnqueueNewTask enqueueing (task %p) " INT64_FMT " Us, IO(%d->%d)", pConnection->m_iStoreTag, pTask, iTimeoutMS, pTask->m_uIOActive, pTask->m_uIOChanged );
AddToQueue ( pTask, pConnection->InNetLoop () );
// for win it is vitable important to apply changes immediately,
@@ -3702,10 +3706,10 @@ class LazyNetEvents_c : ISphNoncopyable, protected NetEventsFlavour_c
assert ( pTask );
// check for same timeout as we have. Avoid dupes, if so.
if ( pTask->m_iTimeoutTime==iTimeoutMS )
if ( !iTimeoutMS || pTask->m_iTimeoutTime==iTimeoutMS )
return;
pTask->m_iTimeoutTime = iTimeoutMS;
pTask->m_iPlannedTimeout = iTimeoutMS;
// case of delete: pConn socket m.b. already closed and ==-1. Actualize it right now.
if ( iTimeoutMS<0 )
@@ -3714,8 +3718,8 @@ class LazyNetEvents_c : ISphNoncopyable, protected NetEventsFlavour_c
pConnection->m_pPollerTask = nullptr; // this will allow to create another task.
}
sphLogDebugv ( "- %d ChangeDeleteTask enqueueing (task %p), fd=%d " INT64_FMT " Us", pConnection->m_iStoreTag
, pTask, pTask->m_ifd, pTask->m_iTimeoutTime );
sphLogDebugv ( "- %d ChangeDeleteTask enqueueing (task %p), fd=%d " INT64_FMT " Us -> " INT64_FMT " Us", pConnection->m_iStoreTag
, pTask, pTask->m_ifd, pTask->m_iTimeoutTime, iTimeoutMS );
AddToQueue ( pTask, pConnection->InNetLoop () );
}

0 comments on commit 53deecf

Please sign in to comment.