Skip to content

Commit

Permalink
cleaning ClientCursor to obey kernel rules and encapsulate more so ca…
Browse files Browse the repository at this point in the history
…n add logic later
  • Loading branch information
erh committed Oct 29, 2010
1 parent dab6e79 commit 6157d16
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 122 deletions.
56 changes: 28 additions & 28 deletions db/clientcursor.cpp
Expand Up @@ -43,27 +43,27 @@ namespace mongo {
if( clientCursorsById.size() ) {
log() << "ERROR clientcursors exist but should not at this point" << endl;
ClientCursor *cc = clientCursorsById.begin()->second;
log() << "first one: " << cc->cursorid << ' ' << cc->ns << endl;
log() << "first one: " << cc->_cursorid << ' ' << cc->_ns << endl;
clientCursorsById.clear();
assert(false);
}
}


void ClientCursor::setLastLoc_inlock(DiskLoc L) {
assert( pos != -2 ); // defensive - see ~ClientCursor
assert( _pos != -2 ); // defensive - see ~ClientCursor

if ( L == _lastLoc )
return;

CCByLoc& bl = byLoc();

if ( !_lastLoc.isNull() ) {
bl.erase( ByLocKey( _lastLoc, cursorid ) );
bl.erase( ByLocKey( _lastLoc, _cursorid ) );
}

if ( !L.isNull() )
bl[ByLocKey(L,cursorid)] = this;
bl[ByLocKey(L,_cursorid)] = this;
_lastLoc = L;
}

Expand Down Expand Up @@ -96,7 +96,7 @@ namespace mongo {
ClientCursor *cc = i->second;
if( cc->_db != db )
continue;
if ( strncmp(nsPrefix, cc->ns.c_str(), len) == 0 ) {
if ( strncmp(nsPrefix, cc->_ns.c_str(), len) == 0 ) {
toDelete.push_back(i->second);
}
}
Expand Down Expand Up @@ -140,7 +140,7 @@ namespace mongo {
i++;
if( j->second->shouldTimeout( millis ) ){
numberTimedOut++;
log(1) << "killing old cursor " << j->second->cursorid << ' ' << j->second->ns
log(1) << "killing old cursor " << j->second->_cursorid << ' ' << j->second->_ns
<< " idle:" << j->second->idleTime() << "ms\n";
delete j->second;
}
Expand All @@ -158,7 +158,7 @@ namespace mongo {
log() << "perf warning: byLoc.size=" << bl.size() << " in aboutToDeleteBucket\n";
}
for ( CCByLoc::iterator i = bl.begin(); i != bl.end(); i++ )
i->second->c->aboutToDeleteBucket(b);
i->second->_c->aboutToDeleteBucket(b);
}
void aboutToDeleteBucket(const DiskLoc& b) {
ClientCursor::informAboutToDeleteBucket(b);
Expand Down Expand Up @@ -192,12 +192,12 @@ namespace mongo {
if( toAdvance.size() >= 3000 ) {
log() << "perf warning MPW101: " << toAdvance.size() << " cursors for one diskloc "
<< dl.toString()
<< ' ' << toAdvance[1000]->ns
<< ' ' << toAdvance[2000]->ns
<< ' ' << toAdvance[1000]->_ns
<< ' ' << toAdvance[2000]->_ns
<< ' ' << toAdvance[1000]->_pinValue
<< ' ' << toAdvance[2000]->_pinValue
<< ' ' << toAdvance[1000]->pos
<< ' ' << toAdvance[2000]->pos
<< ' ' << toAdvance[1000]->_pos
<< ' ' << toAdvance[2000]->_pos
<< ' ' << toAdvance[1000]->_idleAgeMillis
<< ' ' << toAdvance[2000]->_idleAgeMillis
<< ' ' << toAdvance[1000]->_doingDeletes
Expand All @@ -212,7 +212,7 @@ namespace mongo {

if ( cc->_doingDeletes ) continue;

Cursor *c = cc->c.get();
Cursor *c = cc->_c.get();
if ( c->capped() ){
delete cc;
continue;
Expand Down Expand Up @@ -240,16 +240,16 @@ namespace mongo {
void aboutToDelete(const DiskLoc& dl) { ClientCursor::aboutToDelete(dl); }

ClientCursor::~ClientCursor() {
assert( pos != -2 );
assert( _pos != -2 );

{
recursive_scoped_lock lock(ccmutex);
setLastLoc_inlock( DiskLoc() ); // removes us from bylocation multimap
clientCursorsById.erase(cursorid);
clientCursorsById.erase(_cursorid);

// defensive:
(CursorId&) cursorid = -1;
pos = -2;
(CursorId&)_cursorid = -1;
_pos = -2;
}
}

Expand All @@ -258,17 +258,17 @@ namespace mongo {
need to call when you are ready to "unlock".
*/
void ClientCursor::updateLocation() {
assert( cursorid );
assert( _cursorid );
_idleAgeMillis = 0;
DiskLoc cl = c->refLoc();
DiskLoc cl = _c->refLoc();
if ( lastLoc() == cl ) {
//log() << "info: lastloc==curloc " << ns << '\n';
} else {
recursive_scoped_lock lock(ccmutex);
setLastLoc_inlock(cl);
}
// may be necessary for MultiCursor even when cl hasn't changed
c->noteLocation();
_c->noteLocation();
}

int ClientCursor::yieldSuggest() {
Expand Down Expand Up @@ -310,10 +310,10 @@ namespace mongo {
}

bool ClientCursor::prepareToYield( YieldData &data ) {
if ( ! c->supportYields() )
if ( ! _c->supportYields() )
return false;
// need to store in case 'this' gets deleted
data._id = cursorid;
data._id = _cursorid;

data._doingDeletes = _doingDeletes;
_doingDeletes = false;
Expand All @@ -331,13 +331,13 @@ namespace mongo {
inEmpty = true;
log() << "TEST: manipulate collection during cc:yield" << endl;
if( test == 1 )
Helpers::emptyCollection(ns.c_str());
Helpers::emptyCollection(_ns.c_str());
else if( test == 2 ) {
BSONObjBuilder b; string m;
dropCollection(ns.c_str(), m, b);
dropCollection(_ns.c_str(), m, b);
}
else {
dropDatabase(ns.c_str());
dropDatabase(_ns.c_str());
}
}
}
Expand All @@ -352,12 +352,12 @@ namespace mongo {
}

cc->_doingDeletes = data._doingDeletes;
cc->c->checkLocation();
cc->_c->checkLocation();
return true;
}

bool ClientCursor::yield( int micros ) {
if ( ! c->supportYields() )
if ( ! _c->supportYields() )
return true;
YieldData data;
prepareToYield( data );
Expand Down Expand Up @@ -404,7 +404,7 @@ namespace mongo {
void ClientCursor::updateSlaveLocation( CurOp& curop ){
if ( _slaveReadTill.isNull() )
return;
mongo::updateSlaveLocation( curop , ns.c_str() , _slaveReadTill );
mongo::updateSlaveLocation( curop , _ns.c_str() , _slaveReadTill );
}


Expand Down Expand Up @@ -451,7 +451,7 @@ namespace mongo {
recursive_scoped_lock lock(ccmutex);

for ( CCById::iterator i=clientCursorsById.begin(); i!=clientCursorsById.end(); ++i ){
if ( i->second->ns == ns )
if ( i->second->_ns == ns )
all.insert( i->first );
}
}
Expand Down
145 changes: 89 additions & 56 deletions db/clientcursor.h
Expand Up @@ -70,23 +70,6 @@ namespace mongo {

class ClientCursor {
friend class CmdCursorInfo;
DiskLoc _lastLoc; // use getter and setter not this (important)
unsigned _idleAgeMillis; // how long has the cursor been around, relative to server idle time

/* 0 = normal
1 = no timeout allowed
100 = in use (pinned) -- see Pointer class
*/
unsigned _pinValue;

bool _doingDeletes;
ElapsedTracker _yieldSometimesTracker;

static CCById clientCursorsById;
static long long numberTimedOut;
static boost::recursive_mutex ccmutex; // must use this for all statics above!
static CursorId allocCursorId_inlock();

public:
static void assertNoCursors();

Expand Down Expand Up @@ -136,7 +119,7 @@ namespace mongo {
}
if ( c ) {
_c = c;
_id = c->cursorid;
_id = c->_cursorid;
} else {
_c = 0;
_id = -1;
Expand All @@ -152,40 +135,34 @@ namespace mongo {
CursorId _id;
};

/*const*/ CursorId cursorid;
const string ns;
const shared_ptr<Cursor> c;
int pos; // # objects into the cursor so far
const BSONObj query; // used for logging diags only; optional in constructor
const int _queryOptions; // see enum QueryOptions dbclient.h
OpTime _slaveReadTill;
Database * const _db;

ClientCursor(int queryOptions, const shared_ptr<Cursor>& _c, const string& _ns, BSONObj _query = BSONObj()) :
ClientCursor(int queryOptions, const shared_ptr<Cursor>& c, const string& ns, BSONObj query = BSONObj()) :
_ns(ns), _db( cc().database() ),
_c(c), _pos(0),
_query(query), _queryOptions(queryOptions),
_idleAgeMillis(0), _pinValue(0),
_doingDeletes(false), _yieldSometimesTracker(128,10),
ns(_ns), c(_c),
pos(0), query(_query),
_queryOptions(queryOptions),
_db( cc().database() )
_doingDeletes(false), _yieldSometimesTracker(128,10)
{
assert( _db );
assert( str::startsWith(_ns, _db->name) );
if( queryOptions & QueryOption_NoCursorTimeout )
noTimeout();
recursive_scoped_lock lock(ccmutex);
cursorid = allocCursorId_inlock();
clientCursorsById.insert( make_pair(cursorid, this) );
_cursorid = allocCursorId_inlock();
clientCursorsById.insert( make_pair(_cursorid, this) );
}

~ClientCursor();

// *************** basic accessors *******************

CursorId cursorid() const { return _cursorid; }
string ns() const { return _ns; }
Database * db() const { return _db; }
const BSONObj& query() const { return _query; }
int queryOptions() const { return _queryOptions; }

DiskLoc lastLoc() const { return _lastLoc; }

shared_ptr< ParsedQuery > pq;
shared_ptr< FieldMatcher > fields; // which fields query wants returned
Message originalMessage; // this is effectively an auto ptr for data the matcher points to

/* Get rid of cursors for namespaces that begin with nsprefix.
Used by drop, dropIndexes, dropDatabase.
*/
Expand Down Expand Up @@ -218,7 +195,7 @@ namespace mongo {

struct YieldLock : boost::noncopyable {
explicit YieldLock( ptr<ClientCursor> cc )
: _canYield(cc->c->supportYields()) {
: _canYield(cc->_c->supportYields()) {
if ( _canYield ){
cc->prepareToYield( _data );
_unlock.reset(new dbtempreleasecond());
Expand Down Expand Up @@ -246,17 +223,24 @@ namespace mongo {
};

// --- some pass through helpers for Cursor ---

BSONObj indexKeyPattern() { return c->indexKeyPattern(); }
bool ok() { return c->ok(); }
bool advance(){ return c->advance(); }
BSONObj current() { return c->current(); }
DiskLoc currLoc() { return c->currLoc(); }

Cursor* c() const { return _c.get(); }
int pos() const { return _pos; }

void incPos( int n ) { _pos += n; } // TODO: this is bad
void setPos( int n ) { _pos = n; } // TODO : this is bad too

BSONObj indexKeyPattern() { return _c->indexKeyPattern(); }
bool ok() { return _c->ok(); }
bool advance(){ return _c->advance(); }
BSONObj current() { return _c->current(); }
DiskLoc currLoc() { return _c->currLoc(); }
BSONObj currKey() { return _c->currKey(); }

bool currentMatches(){
if ( ! c->matcher() )
if ( ! _c->matcher() )
return true;
return c->matcher()->matchesCurrent( c.get() );
return _c->matcher()->matchesCurrent( _c.get() );
}

private:
Expand Down Expand Up @@ -321,21 +305,70 @@ namespace mongo {

unsigned idleTime() const { return _idleAgeMillis; }

static void idleTimeReport(unsigned millis);
private:
// cursors normally timeout after an inactivy period to prevent excess memory use
// setting this prevents timeout of the cursor in question.
void noTimeout() { _pinValue++; }

CCByLoc& byLoc() { return _db->ccByLoc; }
public:
void setDoingDeletes( bool doingDeletes ) {_doingDeletes = doingDeletes; }

void slaveReadTill( const OpTime& t ) { _slaveReadTill = t; }

public: // static methods

static void idleTimeReport(unsigned millis);

static void appendStats( BSONObjBuilder& result );
static unsigned numCursors() { return clientCursorsById.size(); }
static void informAboutToDeleteBucket(const DiskLoc& b);
static void aboutToDelete(const DiskLoc& dl);
static void find( const string& ns , set<CursorId>& all );


private: // methods

// cursors normally timeout after an inactivy period to prevent excess memory use
// setting this prevents timeout of the cursor in question.
void noTimeout() { _pinValue++; }

CCByLoc& byLoc() { return _db->ccByLoc; }

private:

CursorId _cursorid;

const string _ns;
Database * _db;

const shared_ptr<Cursor> _c;
int _pos; // # objects into the cursor so far

const BSONObj _query; // used for logging diags only; optional in constructor
int _queryOptions; // see enum QueryOptions dbclient.h

OpTime _slaveReadTill;

DiskLoc _lastLoc; // use getter and setter not this (important)
unsigned _idleAgeMillis; // how long has the cursor been around, relative to server idle time

/* 0 = normal
1 = no timeout allowed
100 = in use (pinned) -- see Pointer class
*/
unsigned _pinValue;

bool _doingDeletes;
ElapsedTracker _yieldSometimesTracker;

public:
shared_ptr< ParsedQuery > pq;
shared_ptr< FieldMatcher > fields; // which fields query wants returned
Message originalMessage; // this is effectively an auto ptr for data the matcher points to



private: // static members

static CCById clientCursorsById;
static long long numberTimedOut;
static boost::recursive_mutex ccmutex; // must use this for all statics above!
static CursorId allocCursorId_inlock();

};

class ClientCursorMonitor : public BackgroundJob {
Expand Down

0 comments on commit 6157d16

Please sign in to comment.