Skip to content

Commit

Permalink
make a long running operation terminate if the client has disconnecte…
Browse files Browse the repository at this point in the history
…d. We check for this at the beginning

of every request, but not intra-request. If a request will run for a very long time, this is bad.

Probably not for backporting -- there might be nuances, so this needs to smoke for a while.

Also some yield code cleanup, really just making it easier to read.
  • Loading branch information
dwight committed Oct 11, 2011
1 parent 9701445 commit 08c3888
Show file tree
Hide file tree
Showing 17 changed files with 97 additions and 33 deletions.
4 changes: 2 additions & 2 deletions db/client.cpp
Expand Up @@ -578,9 +578,9 @@ namespace mongo {

time = min( time , 1000000 );

// there has been a kill request for this op - we should yield to allow the op to stop
// if there has been a kill request for this op - we should yield to allow the op to stop
// This function returns empty string if we aren't interrupted
if ( killCurrentOp.checkForInterruptNoAssert( false )[0] != '\0' ) {
if ( *killCurrentOp.checkForInterruptNoAssert() ) {
return 100;
}

Expand Down
13 changes: 11 additions & 2 deletions db/client.h
Expand Up @@ -131,6 +131,16 @@ namespace mongo {

friend class CurOp;

unsigned _sometimes;

public:
/** Same idea as MONGO_SOMETIMES: returns true once every howOften calls.
    That macro uses a static counter that is shared by all threads, and each
    increment might eject the line from the other CPUs' caches (?), so the
    idea is that a counter stored on this object is better.
    @param howOften period, in calls, between true results
*/
bool sometimes(unsigned howOften) {
    _sometimes += 1;
    return ( _sometimes % howOften ) == 0;
}

public:

/* set _god=true temporarily, safely */
Expand Down Expand Up @@ -214,8 +224,7 @@ namespace mongo {

}; // class Client::Context


};
}; // class Client

/** get the Client object for this thread. */
inline Client& cc() {
Expand Down
23 changes: 14 additions & 9 deletions db/clientcursor.cpp
Expand Up @@ -407,17 +407,19 @@ namespace mongo {
_c->noteLocation();
}

int ClientCursor::yieldSuggest() {
int ClientCursor::suggestYieldMicros() {
int writers = 0;
int readers = 0;

int micros = Client::recommendedYieldMicros( &writers , &readers );

if ( micros > 0 && writers == 0 && dbMutex.getState() <= 0 ) {
// we have a read lock, and only reads are coming on, so why bother unlocking
micros = 0;
return 0;
}

wassert( micros < 10000000 );
dassert( micros < 1000001 );
return micros;
}

Expand Down Expand Up @@ -452,18 +454,19 @@ namespace mongo {
if ( yielded ) {
*yielded = false;
}
if ( ! _yieldSometimesTracker.ping() ) {
if ( ! _yieldSometimesTracker.intervalHasElapsed() ) {
Record* rec = _recordForYield( need );
if ( rec ) {
// yield for page fault
if ( yielded ) {
*yielded = true;
}
return yield( yieldSuggest() , rec );
return yield( suggestYieldMicros() , rec );
}
return true;
}

int micros = yieldSuggest();
int micros = suggestYieldMicros();
if ( micros > 0 ) {
if ( yielded ) {
*yielded = true;
Expand All @@ -477,8 +480,10 @@ namespace mongo {
killCurrentOp.checkForInterrupt( false );
{
auto_ptr<RWLockRecursive::Shared> lk;
if ( rec )
if ( rec ) {
// need to lock this, else rec->touch won't be safe: the file could disappear, etc.
lk.reset( new RWLockRecursive::Shared( MongoFile::mmmutex) );
}

dbtempreleasecond unlock;
if ( unlock.unlocked() ) {
Expand Down Expand Up @@ -554,15 +559,15 @@ namespace mongo {
return true;
}

/** Yield on behalf of this cursor.
    Returns true immediately, without yielding, when the underlying cursor does
    not support yields. Otherwise saves state with prepareToYield(), calls
    staticYield( micros , _ns , recordToLoad ), and returns the result of
    recoverFromYield().
    @param micros passed through to staticYield
    @param recordToLoad passed through to staticYield
    @return true if cursor is still ok */
bool ClientCursor::yield( int micros , Record * recordToLoad ) {
if ( ! _c->supportYields() )

if ( ! _c->supportYields() ) // some cursors (geo@oct2011) don't support yielding
return true;

YieldData data;
prepareToYield( data );

staticYield( micros , _ns , recordToLoad );

return ClientCursor::recoverFromYield( data );
}

Expand Down
2 changes: 1 addition & 1 deletion db/clientcursor.h
Expand Up @@ -191,7 +191,7 @@ namespace mongo {
*/
bool yieldSometimes( RecordNeeds need, bool *yielded = 0 );

static int yieldSuggest();
static int suggestYieldMicros();
static void staticYield( int micros , const StringData& ns , Record * rec );

struct YieldData { CursorId _id; bool _doingDeletes; };
Expand Down
2 changes: 1 addition & 1 deletion db/compact.cpp
Expand Up @@ -110,7 +110,7 @@ namespace mongo {

// remove the old records (orphan them) periodically so our commit block doesn't get too large
bool stopping = false;
RARELY stopping = *killCurrentOp.checkForInterruptNoAssert(false) != 0;
RARELY stopping = *killCurrentOp.checkForInterruptNoAssert() != 0;
if( stopping || getDur().aCommitIsNeeded() ) {
e->firstRecord.writing() = L;
Record *r = L.rec();
Expand Down
29 changes: 24 additions & 5 deletions db/curop.h
Expand Up @@ -349,22 +349,41 @@ namespace mongo {
bool globalInterruptCheck() const { return _globalKill; }

/** Raise (via uasserted) if the current operation should stop running.
 *  Raises 11600 when _globalKill is set and 11601 when the current op has been
 *  killed. In addition, once every 1024 calls counted on the Client returned by
 *  cc(), asks that client's messaging port (if it has one) to assert that it is
 *  still connected.
 *  @param heedMutex if true and dbMutex is write locked, return without checking anything
 */
void checkForInterrupt( bool heedMutex = true ) {
Client& c = cc();
if ( heedMutex && dbMutex.isWriteLocked() )
return;
if( _globalKill )
uasserted(11600,"interrupted at shutdown");
if( cc().curop()->killed() )
if( c.curop()->killed() )
uasserted(11601,"interrupted");
// only probe the connection occasionally rather than on every call
if( c.sometimes(1024) ) {
AbstractMessagingPort *p = cc().port();
// port() may be null, in which case there is nothing to check
if( p )
p->assertStillConnected();
}
}

/** @return "" if not interrupted. otherwise, you should stop. */
const char *checkForInterruptNoAssert( bool heedMutex = true ) {
if ( heedMutex && dbMutex.isWriteLocked() )
return "";
const char *checkForInterruptNoAssert( /*bool heedMutex = true*/ ) {
Client& c = cc();
// always called with false, so commented out:
/*if ( heedMutex && dbMutex.isWriteLocked() )
return "";*/
if( _globalKill )
return "interrupted at shutdown";
if( cc().curop()->killed() )
if( c.curop()->killed() )
return "interrupted";
if( c.sometimes(1024) ) {
try {
AbstractMessagingPort *p = cc().port();
if( p )
p->assertStillConnected();
}
catch(...) {
log() << "no longer connected to client";
return "no longer connected to client";
}
}
return "";
}

Expand Down
2 changes: 1 addition & 1 deletion db/db.cpp
Expand Up @@ -393,7 +393,7 @@ namespace mongo {

/** Interrupt-check callback (by its name, presumably for the JS engine -- confirm at the registration site).
 *  @return whatever killCurrentOp.checkForInterruptNoAssert reports
 */
const char * jsInterruptCallback() {
// should be safe to interrupt in js code, even if we have a write lock
return killCurrentOp.checkForInterruptNoAssert( false );
return killCurrentOp.checkForInterruptNoAssert();
}

unsigned jsGetInterruptSpecCallback() {
Expand Down
2 changes: 1 addition & 1 deletion db/ops/query.cpp
Expand Up @@ -102,7 +102,7 @@ namespace mongo {
int n = 0;

if ( unlikely(!cc) ) {
log() << "getMore: cursorid not found " << ns << " " << cursorid << endl;
LOGSOME << "getMore: cursorid not found " << ns << " " << cursorid << endl;
cursorid = 0;
resultFlags = ResultFlag_CursorNotFound;
}
Expand Down
2 changes: 1 addition & 1 deletion db/pdfile.cpp
Expand Up @@ -1167,7 +1167,7 @@ namespace mongo {
if( tot > 1000 ) {
static int n;
DEV if( n++ == 0 )
log() << "warning already writable too often" << endl;
log() << "warning upgradeToWritable: already in writable too often" << endl;
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions db/queryoptimizer.cpp
Expand Up @@ -661,10 +661,10 @@ namespace mongo {
if ( ! _plans._mayYield )
return;

if ( ! _plans._yieldSometimesTracker.ping() )
if ( ! _plans._yieldSometimesTracker.intervalHasElapsed() )
return;

int micros = ClientCursor::yieldSuggest();
int micros = ClientCursor::suggestYieldMicros();
if ( micros <= 0 )
return;

Expand Down
2 changes: 1 addition & 1 deletion s/d_migrate.cpp
Expand Up @@ -488,7 +488,7 @@ namespace mongo {
scoped_spinlock lk( _trackerLocks );
set<DiskLoc>::iterator i = _cloneLocs.begin();
for ( ; i!=_cloneLocs.end(); ++i ) {
if (tracker.ping()) // should I yield?
if (tracker.intervalHasElapsed()) // should I yield?
break;

DiskLoc dl = *i;
Expand Down
2 changes: 0 additions & 2 deletions server.h
Expand Up @@ -6,8 +6,6 @@
Over time we should move more here, and more out of pch.h. And get rid of pch.h at some point.
*/

// todo is there a boost thing for this already?

#pragma once

#include "bson/inline_decls.h"
Expand Down
6 changes: 3 additions & 3 deletions util/net/listen.h
Expand Up @@ -108,7 +108,7 @@ namespace mongo {
* call this for every iteration
* returns true if one of the triggers has gone off
*/
bool ping() {
bool intervalHasElapsed() {
if ( ( ++_pings % _h ) == 0 ) {
_last = Listener::getElapsedTimeMillis();
return true;
Expand All @@ -124,8 +124,8 @@ namespace mongo {
}

private:
int _h;
int _ms;
const int _h;
const int _ms;

unsigned long long _pings;

Expand Down
4 changes: 4 additions & 0 deletions util/net/message_port.cpp
Expand Up @@ -248,6 +248,10 @@ namespace mongo {
return true;
}

/** uasserts (code 15901) unless Socket::stillConnected() reports the socket as connected. */
void MessagingPort::assertStillConnected() {
    const bool connected = Socket::stillConnected();
    uassert(15901, "client disconnected during operation", connected);
}

void MessagingPort::say(Message& toSend, int responseTo) {
assert( !toSend.empty() );
mmm( log() << "* say() sock:" << this->sock << " thr:" << GetCurrentThreadId() << endl; )
Expand Down
3 changes: 2 additions & 1 deletion util/net/message_port.h
Expand Up @@ -37,7 +37,7 @@ namespace mongo {
virtual HostAndPort remote() const = 0;
virtual unsigned remotePort() const = 0;

private:
virtual void assertStillConnected() = 0;

public:
// TODO make this private with some helpers
Expand Down Expand Up @@ -88,6 +88,7 @@ namespace mongo {
unsigned remotePort() const { return Socket::remotePort(); }
virtual HostAndPort remote() const;

void assertStillConnected();

private:

Expand Down
25 changes: 24 additions & 1 deletion util/net/sock.cpp
Expand Up @@ -524,6 +524,27 @@ namespace mongo {
return ::send( _fd , data , len , portSendFlags );
}

/** Probe the connection with a zero-length send.
 *  @return false only when the probe fails with an error other than the
 *          platform's "try again" codes (WSAETIMEDOUT on Windows,
 *          EAGAIN / EWOULDBLOCK elsewhere); true otherwise.
 *          With MONGO_SSL the check is not implemented yet and this always
 *          returns true.
 */
bool Socket::stillConnected() {
#ifdef MONGO_SSL
    DEV log() << "TODO stillConnected() w/SSL" << endl;
    return true;
#else
    // a non-negative result means the probe went through
    if ( _send("", 0) >= 0 )
        return true;

    // the probe failed: a "try again" style error is not treated as a disconnect
#if defined(_WIN32)
    const bool transientError = ( WSAGetLastError() == WSAETIMEDOUT );
#else
    const bool transientError = ( errno == EAGAIN || errno == EWOULDBLOCK );
#endif
    return transientError;
#endif
}

// sends all data or throws an exception
void Socket::send( const char * data , int len, const char *context ) {
while( len > 0 ) {
Expand Down Expand Up @@ -571,7 +592,9 @@ namespace mongo {
}
}

// sends all data or throws an exception
/** sends all data or throws an exception
* @param context descriptive for logging
*/
void Socket::send( const vector< pair< char *, int > > &data, const char *context ) {

#ifdef MONGO_SSL
Expand Down
5 changes: 5 additions & 0 deletions util/net/sock.h
Expand Up @@ -212,6 +212,8 @@ namespace mongo {

void setTimeout( double secs );

bool stillConnected();

#ifdef MONGO_SSL
/** secures inline */
void secure( SSLManager * ssl );
Expand All @@ -226,8 +228,11 @@ namespace mongo {

private:
void _init();

/** raw send, same semantics as ::send */
public:
int _send( const char * data , int len );
private:

/** sends dumbly, just each buffer at a time */
void _send( const vector< pair< char *, int > > &data, const char *context );
Expand Down

0 comments on commit 08c3888

Please sign in to comment.