From 08c3888312f26a5bc0560bead47059992009826a Mon Sep 17 00:00:00 2001 From: dwight Date: Tue, 11 Oct 2011 15:13:51 -0400 Subject: [PATCH] make a long running operation terminate if the client has disconnected. we check for this at the beginning of every request but not intra request. if a request will run for a very long time this is bad. probably not for backporting -- there might be nuances, so this needs to smoke for a while. also some yield code cleaning, really just making it easier to read. --- db/client.cpp | 4 ++-- db/client.h | 13 +++++++++++-- db/clientcursor.cpp | 23 ++++++++++++++--------- db/clientcursor.h | 2 +- db/compact.cpp | 2 +- db/curop.h | 29 ++++++++++++++++++++++++----- db/db.cpp | 2 +- db/ops/query.cpp | 2 +- db/pdfile.cpp | 2 +- db/queryoptimizer.cpp | 4 ++-- s/d_migrate.cpp | 2 +- server.h | 2 -- util/net/listen.h | 6 +++--- util/net/message_port.cpp | 4 ++++ util/net/message_port.h | 3 ++- util/net/sock.cpp | 25 ++++++++++++++++++++++++- util/net/sock.h | 5 +++++ 17 files changed, 97 insertions(+), 33 deletions(-) diff --git a/db/client.cpp b/db/client.cpp index c082a3d73bf55..fa114d830223e 100644 --- a/db/client.cpp +++ b/db/client.cpp @@ -578,9 +578,9 @@ namespace mongo { time = min( time , 1000000 ); - // there has been a kill request for this op - we should yield to allow the op to stop + // if there has been a kill request for this op - we should yield to allow the op to stop // This function returns empty string if we aren't interrupted - if ( killCurrentOp.checkForInterruptNoAssert( false )[0] != '\0' ) { + if ( *killCurrentOp.checkForInterruptNoAssert() ) { return 100; } diff --git a/db/client.h b/db/client.h index 235f97ce694d4..81025f7ce22b8 100644 --- a/db/client.h +++ b/db/client.h @@ -131,6 +131,16 @@ namespace mongo { friend class CurOp; + unsigned _sometimes; + + public: + /** the concept here is the same as MONGO_SOMETIMES. 
however that + macro uses a static that will be shared by all threads, and each + time incremented it might eject that line from the other cpu caches (?), + so idea is that this is better. + */ + bool sometimes(unsigned howOften) { return ++_sometimes % howOften == 0; } + public: /* set _god=true temporarily, safely */ @@ -214,8 +224,7 @@ namespace mongo { }; // class Client::Context - - }; + }; // class Client /** get the Client object for this thread. */ inline Client& cc() { diff --git a/db/clientcursor.cpp b/db/clientcursor.cpp index 0452cdb0473e0..d35d8eed3d959 100644 --- a/db/clientcursor.cpp +++ b/db/clientcursor.cpp @@ -407,7 +407,7 @@ namespace mongo { _c->noteLocation(); } - int ClientCursor::yieldSuggest() { + int ClientCursor::suggestYieldMicros() { int writers = 0; int readers = 0; @@ -415,9 +415,11 @@ namespace mongo { if ( micros > 0 && writers == 0 && dbMutex.getState() <= 0 ) { // we have a read lock, and only reads are coming on, so why bother unlocking - micros = 0; + return 0; } + wassert( micros < 10000000 ); + dassert( micros < 1000001 ); return micros; } @@ -452,18 +454,19 @@ namespace mongo { if ( yielded ) { *yielded = false; } - if ( ! _yieldSometimesTracker.ping() ) { + if ( ! _yieldSometimesTracker.intervalHasElapsed() ) { Record* rec = _recordForYield( need ); if ( rec ) { + // yield for page fault if ( yielded ) { *yielded = true; } - return yield( yieldSuggest() , rec ); + return yield( suggestYieldMicros() , rec ); } return true; } - int micros = yieldSuggest(); + int micros = suggestYieldMicros(); if ( micros > 0 ) { if ( yielded ) { *yielded = true; @@ -477,8 +480,10 @@ namespace mongo { killCurrentOp.checkForInterrupt( false ); { auto_ptr lk; - if ( rec ) + if ( rec ) { + // need to lock this else rec->touch won't be safe file could disapear etc. 
lk.reset( new RWLockRecursive::Shared( MongoFile::mmmutex) ); + } dbtempreleasecond unlock; if ( unlock.unlocked() ) { @@ -554,15 +559,15 @@ namespace mongo { return true; } + /** @return true if cursor is still ok */ bool ClientCursor::yield( int micros , Record * recordToLoad ) { - if ( ! _c->supportYields() ) + + if ( ! _c->supportYields() ) // so me cursors (geo@oct2011) don't support yielding return true; YieldData data; prepareToYield( data ); - staticYield( micros , _ns , recordToLoad ); - return ClientCursor::recoverFromYield( data ); } diff --git a/db/clientcursor.h b/db/clientcursor.h index 4e0d1b381027e..1e9f928a47725 100644 --- a/db/clientcursor.h +++ b/db/clientcursor.h @@ -191,7 +191,7 @@ namespace mongo { */ bool yieldSometimes( RecordNeeds need, bool *yielded = 0 ); - static int yieldSuggest(); + static int suggestYieldMicros(); static void staticYield( int micros , const StringData& ns , Record * rec ); struct YieldData { CursorId _id; bool _doingDeletes; }; diff --git a/db/compact.cpp b/db/compact.cpp index a0010e9df99df..35e87806edf41 100644 --- a/db/compact.cpp +++ b/db/compact.cpp @@ -110,7 +110,7 @@ namespace mongo { // remove the old records (orphan them) periodically so our commit block doesn't get too large bool stopping = false; - RARELY stopping = *killCurrentOp.checkForInterruptNoAssert(false) != 0; + RARELY stopping = *killCurrentOp.checkForInterruptNoAssert() != 0; if( stopping || getDur().aCommitIsNeeded() ) { e->firstRecord.writing() = L; Record *r = L.rec(); diff --git a/db/curop.h b/db/curop.h index 2717d78cc6248..14a2bb1f631ef 100644 --- a/db/curop.h +++ b/db/curop.h @@ -349,22 +349,41 @@ namespace mongo { bool globalInterruptCheck() const { return _globalKill; } void checkForInterrupt( bool heedMutex = true ) { + Client& c = cc(); if ( heedMutex && dbMutex.isWriteLocked() ) return; if( _globalKill ) uasserted(11600,"interrupted at shutdown"); - if( cc().curop()->killed() ) + if( c.curop()->killed() ) 
uasserted(11601,"interrupted"); + if( c.sometimes(1024) ) { + AbstractMessagingPort *p = cc().port(); + if( p ) + p->assertStillConnected(); + } } /** @return "" if not interrupted. otherwise, you should stop. */ - const char *checkForInterruptNoAssert( bool heedMutex = true ) { - if ( heedMutex && dbMutex.isWriteLocked() ) - return ""; + const char *checkForInterruptNoAssert( /*bool heedMutex = true*/ ) { + Client& c = cc(); + // always called withi false so commented out: + /*if ( heedMutex && dbMutex.isWriteLocked() ) + return "";*/ if( _globalKill ) return "interrupted at shutdown"; - if( cc().curop()->killed() ) + if( c.curop()->killed() ) return "interrupted"; + if( c.sometimes(1024) ) { + try { + AbstractMessagingPort *p = cc().port(); + if( p ) + p->assertStillConnected(); + } + catch(...) { + log() << "no longer connected to client"; + return "no longer connected to client"; + } + } return ""; } diff --git a/db/db.cpp b/db/db.cpp index 10914b78c7447..0d52bbb2210ad 100644 --- a/db/db.cpp +++ b/db/db.cpp @@ -393,7 +393,7 @@ namespace mongo { const char * jsInterruptCallback() { // should be safe to interrupt in js code, even if we have a write lock - return killCurrentOp.checkForInterruptNoAssert( false ); + return killCurrentOp.checkForInterruptNoAssert(); } unsigned jsGetInterruptSpecCallback() { diff --git a/db/ops/query.cpp b/db/ops/query.cpp index cf4dc98a64904..aa50e2952f16a 100644 --- a/db/ops/query.cpp +++ b/db/ops/query.cpp @@ -102,7 +102,7 @@ namespace mongo { int n = 0; if ( unlikely(!cc) ) { - log() << "getMore: cursorid not found " << ns << " " << cursorid << endl; + LOGSOME << "getMore: cursorid not found " << ns << " " << cursorid << endl; cursorid = 0; resultFlags = ResultFlag_CursorNotFound; } diff --git a/db/pdfile.cpp b/db/pdfile.cpp index 4162eedc243cf..d449278fd4012 100644 --- a/db/pdfile.cpp +++ b/db/pdfile.cpp @@ -1167,7 +1167,7 @@ namespace mongo { if( tot > 1000 ) { static int n; DEV if( n++ == 0 ) - log() << "warning already 
writable too often" << endl; + log() << "warning upgradeToWritable: already in writable too often" << endl; } } } diff --git a/db/queryoptimizer.cpp b/db/queryoptimizer.cpp index 0ce57006309f9..02e2a9dc74750 100644 --- a/db/queryoptimizer.cpp +++ b/db/queryoptimizer.cpp @@ -661,10 +661,10 @@ namespace mongo { if ( ! _plans._mayYield ) return; - if ( ! _plans._yieldSometimesTracker.ping() ) + if ( ! _plans._yieldSometimesTracker.intervalHasElapsed() ) return; - int micros = ClientCursor::yieldSuggest(); + int micros = ClientCursor::suggestYieldMicros(); if ( micros <= 0 ) return; diff --git a/s/d_migrate.cpp b/s/d_migrate.cpp index ea11ac7508e2e..ebc6b9166d69a 100644 --- a/s/d_migrate.cpp +++ b/s/d_migrate.cpp @@ -488,7 +488,7 @@ namespace mongo { scoped_spinlock lk( _trackerLocks ); set::iterator i = _cloneLocs.begin(); for ( ; i!=_cloneLocs.end(); ++i ) { - if (tracker.ping()) // should I yield? + if (tracker.intervalHasElapsed()) // should I yield? break; DiskLoc dl = *i; diff --git a/server.h b/server.h index a614bb45f1820..35c472c23dd2e 100644 --- a/server.h +++ b/server.h @@ -6,8 +6,6 @@ Over time we should move more here, and more out of pch.h. And get rid of pch.h at some point. */ -// todo is there a boost thign for this already? 
- #pragma once #include "bson/inline_decls.h" diff --git a/util/net/listen.h b/util/net/listen.h index 5312f6bbe7564..ca90e835b97a9 100644 --- a/util/net/listen.h +++ b/util/net/listen.h @@ -108,7 +108,7 @@ namespace mongo { * call this for every iteration * returns true if one of the triggers has gone off */ - bool ping() { + bool intervalHasElapsed() { if ( ( ++_pings % _h ) == 0 ) { _last = Listener::getElapsedTimeMillis(); return true; @@ -124,8 +124,8 @@ namespace mongo { } private: - int _h; - int _ms; + const int _h; + const int _ms; unsigned long long _pings; diff --git a/util/net/message_port.cpp b/util/net/message_port.cpp index 9abfaf7c97528..cb9dcc46ee03d 100644 --- a/util/net/message_port.cpp +++ b/util/net/message_port.cpp @@ -248,6 +248,10 @@ namespace mongo { return true; } + void MessagingPort::assertStillConnected() { + uassert(15901, "client disconnected during operation", Socket::stillConnected()); + } + void MessagingPort::say(Message& toSend, int responseTo) { assert( !toSend.empty() ); mmm( log() << "* say() sock:" << this->sock << " thr:" << GetCurrentThreadId() << endl; ) diff --git a/util/net/message_port.h b/util/net/message_port.h index 22ecafecfbcae..5d404d84f8a2c 100644 --- a/util/net/message_port.h +++ b/util/net/message_port.h @@ -37,7 +37,7 @@ namespace mongo { virtual HostAndPort remote() const = 0; virtual unsigned remotePort() const = 0; - private: + virtual void assertStillConnected() = 0; public: // TODO make this private with some helpers @@ -88,6 +88,7 @@ namespace mongo { unsigned remotePort() const { return Socket::remotePort(); } virtual HostAndPort remote() const; + void assertStillConnected(); private: diff --git a/util/net/sock.cpp b/util/net/sock.cpp index b4bbf524c6e14..e84cc4ab1bbc4 100644 --- a/util/net/sock.cpp +++ b/util/net/sock.cpp @@ -524,6 +524,27 @@ namespace mongo { return ::send( _fd , data , len , portSendFlags ); } + bool Socket::stillConnected() { +#ifdef MONGO_SSL + DEV log() << "TODO stillConnected() 
w/SSL" << endl; +#else + int r = _send("", 0); + if( r < 0 ) { +#if defined(_WIN32) + if ( WSAGetLastError() == WSAETIMEDOUT ) { +#else + if ( ( errno == EAGAIN || errno == EWOULDBLOCK ) ) { +#endif + ; + } + else { + return false; + } + } +#endif + return true; + } + // sends all data or throws an exception void Socket::send( const char * data , int len, const char *context ) { while( len > 0 ) { @@ -571,7 +592,9 @@ namespace mongo { } } - // sends all data or throws an exception + /** sends all data or throws an exception + * @param context descriptive for logging + */ void Socket::send( const vector< pair< char *, int > > &data, const char *context ) { #ifdef MONGO_SSL diff --git a/util/net/sock.h b/util/net/sock.h index 1cd513335257b..2053768cbd541 100644 --- a/util/net/sock.h +++ b/util/net/sock.h @@ -212,6 +212,8 @@ namespace mongo { void setTimeout( double secs ); + bool stillConnected(); + #ifdef MONGO_SSL /** secures inline */ void secure( SSLManager * ssl ); @@ -226,8 +228,11 @@ namespace mongo { private: void _init(); + /** raw send, same semantics as ::send */ + public: int _send( const char * data , int len ); + private: /** sends dumbly, just each buffer at a time */ void _send( const vector< pair< char *, int > > &data, const char *context );