Skip to content

Commit

Permalink
SERVER-2771 add synchronous kill functionality for background indexing support
Browse files Browse the repository at this point in the history
  • Loading branch information
milkie committed Oct 10, 2012
1 parent 7922e03 commit 800eea5
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 19 deletions.
22 changes: 19 additions & 3 deletions src/mongo/db/curop.cpp
Expand Up @@ -18,6 +18,7 @@

#include "mongo/db/curop.h"
#include "mongo/db/database.h"
#include "mongo/db/kill_current_op.h"

namespace mongo {

Expand Down Expand Up @@ -46,7 +47,8 @@ namespace mongo {
_end = 0;
_message = "";
_progressMeter.finished();
_killed = false;
_killPending.store(0);
killCurrentOp.notifyAllWaiters();
_numYields = 0;
_expectedLatencyMs = 0;
_lockStat.reset();
Expand Down Expand Up @@ -97,6 +99,8 @@ namespace mongo {
}

CurOp::~CurOp() {
killCurrentOp.notifyAllWaiters();

if ( _wrapped ) {
scoped_lock bl(Client::clientsMutex);
_client->_curOp = _wrapped;
Expand Down Expand Up @@ -173,15 +177,27 @@ namespace mongo {
}
}

if( killed() )
b.append("killed", true);
if( killPending() )
b.append("killPending", true);

b.append( "numYields" , _numYields );
b.append( "lockStats" , _lockStat.report() );

return b.obj();
}

/**
 * Marks every waiter registered on this operation as notified.
 *
 * Each pointer stored in _notifyList (added through kill()) is dereferenced
 * and set to true, after which the list is emptied so that no flag is
 * written a second time.
 *
 * _notifyList is declared as protected by the global killCurrentOp mutex;
 * this function takes no lock itself.
 */
void CurOp::setKillWaiterFlags() {
    typedef std::vector<bool*>::iterator WaiterFlagIter;

    for (WaiterFlagIter flag = _notifyList.begin(); flag != _notifyList.end(); ++flag) {
        **flag = true;
    }

    // Drop the pointers once signalled; they refer to storage owned by the waiters.
    _notifyList.clear();
}

/**
 * Requests that this operation be killed.
 *
 * Always raises the kill-pending flag. When a notify flag is supplied, it is
 * also recorded in _notifyList so that setKillWaiterFlags() can later set it
 * to true.
 *
 * @param pNotifyFlag optional flag to register for notification; NULL (the
 *                    default) registers nothing.
 */
void CurOp::kill(bool* pNotifyFlag /* = NULL */) {
    _killPending.store(1);

    // Nothing to register when the caller does not want to be notified.
    if (!pNotifyFlag) {
        return;
    }

    _notifyList.push_back(pNotifyFlag);
}

AtomicUInt CurOp::_nextOpNum;

Expand Down
11 changes: 7 additions & 4 deletions src/mongo/db/curop.h
Expand Up @@ -223,8 +223,8 @@ namespace mongo {
string getMessage() const { return _message.toString(); }
ProgressMeter& getProgressMeter() { return _progressMeter; }
CurOp *parent() const { return _wrapped; }
void kill() { _killed = true; }
bool killed() const { return _killed; }
void kill(bool* pNotifyFlag = NULL);
bool killPending() const { return _killPending.load(); }
void yielded() { _numYields++; }
int numYields() const { return _numYields; }
void suppressFromCurop() { _suppressFromCurop = true; }
Expand All @@ -236,6 +236,8 @@ namespace mongo {

const LockStat& lockStat() const { return _lockStat; }
LockStat& lockStat() { return _lockStat; }

void setKillWaiterFlags();
private:
friend class Client;
void _reset();
Expand All @@ -257,14 +259,15 @@ namespace mongo {
OpDebug _debug;
ThreadSafeString _message;
ProgressMeter _progressMeter;
volatile bool _killed;
AtomicInt32 _killPending;
int _numYields;
LockStat _lockStat;
// _notifyList is protected by the global killCurrentOp's mtx.
std::vector<bool*> _notifyList;

// this is how much "extra" time a query might take
// a writebacklisten for example will block for 30s
// so this should be 30000 in that case
long long _expectedLatencyMs;

};
}
46 changes: 37 additions & 9 deletions src/mongo/db/kill_current_op.cpp
Expand Up @@ -41,14 +41,35 @@ namespace mongo {
interruptJs( 0 );
}

void KillCurrentOp::kill(AtomicUInt i) {
bool KillCurrentOp::kill(AtomicUInt i) {
// Non-blocking kill: forwards to killImpl() without a notify flag, so no
// waiter is registered on the target operation. The return value is
// killImpl()'s result, i.e. whether an operation with opid i was found.
return killImpl(i);
}

/**
 * Kills the operation with the given opid and blocks until the kill has been
 * confirmed.
 *
 * A local flag is handed to killImpl(), which registers it on the target
 * operation; the flag is set to true by CurOp::setKillWaiterFlags(). This
 * function then waits on _condvar until the flag is observed as true.
 * If no operation with the given opid is found, it returns without waiting.
 *
 * _mtx is held for the whole call (released only inside the condition
 * variable wait).
 *
 * @param opId opid of the operation to kill
 */
void KillCurrentOp::blockingKill(AtomicUInt opId) {
    LOG(3) << "KillCurrentOp: starting blockingkill" << endl;

    // Set to true through the target op's notify list once the kill is confirmed.
    bool killAcknowledged = false;
    boost::unique_lock<boost::mutex> waitLock(_mtx);

    const bool opWasFound = killImpl(opId, &killAcknowledged);
    if (opWasFound) {
        LOG(3) << "KillCurrentOp: waiting for confirmation of kill" << endl;

        // Loop guards against wakeups that were not meant for this waiter.
        while (!killAcknowledged) {
            _condvar.wait(waitLock);
        }

        LOG(3) << "KillCurrentOp: kill syncing complete" << endl;
    }
}

bool KillCurrentOp::killImpl(AtomicUInt i, bool* pNotifyFlag /* = NULL */) {
bool found = false;
{
scoped_lock l( Client::clientsMutex );
for( set< Client* >::const_iterator j = Client::clients.begin(); !found && j != Client::clients.end(); ++j ) {
for( set< Client* >::const_iterator j = Client::clients.begin();
!found && j != Client::clients.end();
++j ) {
for( CurOp *k = ( *j )->curop(); !found && k; k = k->parent() ) {
if ( k->opNum() == i ) {
k->kill();
k->kill(pNotifyFlag);
for( CurOp *l = ( *j )->curop(); l != k; l = l->parent() ) {
l->kill();
}
Expand All @@ -60,6 +81,16 @@ namespace mongo {
if ( found ) {
interruptJs( &i );
}
return found;
}


void KillCurrentOp::notifyAllWaiters() {
boost::unique_lock<boost::mutex> lck(_mtx);
if (!haveClient())
return;
cc().curop()->setKillWaiterFlags();
_condvar.notify_all();
}

void KillCurrentOp::checkForInterrupt( bool heedMutex ) {
Expand All @@ -68,7 +99,8 @@ namespace mongo {
return;
if( _globalKill )
uasserted(11600,"interrupted at shutdown");
if( c.curop()->killed() ) {
if( c.curop()->killPending() ) {
notifyAllWaiters();
uasserted(11601,"operation was interrupted");
}
}
Expand All @@ -77,12 +109,8 @@ namespace mongo {
Client& c = cc();
if( _globalKill )
return "interrupted at shutdown";
if( c.curop()->killed() )
if( c.curop()->killPending() )
return "interrupted";
return "";
}




}
35 changes: 32 additions & 3 deletions src/mongo/db/kill_current_op.h
Expand Up @@ -22,6 +22,7 @@
#include <boost/thread/mutex.hpp>

#include "mongo/bson/util/atomic_int.h"
#include "mongo/base/disallow_copying.h"

namespace mongo {

Expand All @@ -30,10 +31,23 @@ namespace mongo {
this class does not handle races between interruptJs and the checkForInterrupt functions - those must be
handled by the client of this class
*/
extern class KillCurrentOp {
class KillCurrentOp {
MONGO_DISALLOW_COPYING(KillCurrentOp);
public:
KillCurrentOp() : _globalKill(false) {}
void killAll();
void kill(AtomicUInt i);
/**
* @param i opid of operation to kill
* @return if operation was found
**/
bool kill(AtomicUInt i);

/**
* blocks until kill is acknowledged by the killee.
*
* Note: Does not wait for nested ops, only the top level op.
*/
void blockingKill(AtomicUInt opId);

/** @return true if global interrupt and should terminate the operation */
bool globalInterruptCheck() const { return _globalKill; }
Expand All @@ -46,9 +60,24 @@ namespace mongo {
/** @return "" if not interrupted. otherwise, you should stop. */
const char *checkForInterruptNoAssert();

/** set all flags for all the threads waiting for the current thread's operation to
* end; part of internal synchronous kill mechanism
**/
void notifyAllWaiters();

private:
void interruptJs( AtomicUInt *op );
volatile bool _globalKill;
} killCurrentOp;
boost::condition _condvar;
boost::mutex _mtx;

/**
* @param i opid of operation to kill
* @param pNotifyFlag optional bool to be set to true when kill actually happens
* @return if operation was found
**/
bool killImpl(AtomicUInt i, bool* pNotifyFlag = NULL);
};

extern KillCurrentOp killCurrentOp;
}

0 comments on commit 800eea5

Please sign in to comment.