Skip to content

Commit

Permalink
feat(sessions): track dirty state of sessions, drop after use
Browse files Browse the repository at this point in the history
If a network error occurs during an operation using a session, that
session must now be marked dirty and discarded after the operation
completes (allowing for retryability).

NODE-1977
  • Loading branch information
mbroadst authored and daprahamian committed Aug 13, 2019
1 parent 06689df commit f61df16
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 2 deletions.
24 changes: 24 additions & 0 deletions lib/core/sdam/server.js
Expand Up @@ -240,6 +240,12 @@ class Server extends EventEmitter {
}

wireProtocol.command(this, ns, cmd, options, (err, result) => {
if (err && err instanceof MongoNetworkError) {
if (options.session) {
options.session.serverSession.isDirty = true;
}
}

if (err && isSDAMUnrecoverableError(err)) {
this.emit('error', err);
}
Expand All @@ -258,6 +264,12 @@ class Server extends EventEmitter {
*/
query(ns, cmd, cursorState, options, callback) {
wireProtocol.query(this, ns, cmd, cursorState, options, (err, result) => {
if (err && err instanceof MongoNetworkError) {
if (options.session) {
options.session.serverSession.isDirty = true;
}
}

if (err && isSDAMUnrecoverableError(err)) {
this.emit('error', err);
}
Expand All @@ -276,6 +288,12 @@ class Server extends EventEmitter {
*/
getMore(ns, cursorState, batchSize, options, callback) {
wireProtocol.getMore(this, ns, cursorState, batchSize, options, (err, result) => {
if (err && err instanceof MongoNetworkError) {
if (options.session) {
options.session.serverSession.isDirty = true;
}
}

if (err && isSDAMUnrecoverableError(err)) {
this.emit('error', err);
}
Expand Down Expand Up @@ -406,6 +424,12 @@ function executeWriteOperation(args, options, callback) {
}

return wireProtocol[op](server, ns, ops, options, (err, result) => {
if (err && err instanceof MongoNetworkError) {
if (options.session) {
options.session.serverSession.isDirty = true;
}
}

if (err && isSDAMUnrecoverableError(err)) {
server.emit('error', err);
}
Expand Down
10 changes: 8 additions & 2 deletions lib/core/sessions.js
Expand Up @@ -527,6 +527,7 @@ class ServerSession {
this.id = { id: new Binary(uuidV4(), Binary.SUBTYPE_UUID) };
this.lastUse = Date.now();
this.txnNumber = 0;
this.isDirty = false;
}

/**
Expand Down Expand Up @@ -603,15 +604,20 @@ class ServerSessionPool {
release(session) {
const sessionTimeoutMinutes = this.topology.logicalSessionTimeoutMinutes;
while (this.sessions.length) {
const session = this.sessions[this.sessions.length - 1];
if (session.hasTimedOut(sessionTimeoutMinutes)) {
const pooledSession = this.sessions[this.sessions.length - 1];
if (pooledSession.hasTimedOut(sessionTimeoutMinutes)) {
this.sessions.pop();
} else {
break;
}
}

if (!session.hasTimedOut(sessionTimeoutMinutes)) {
if (session.isDirty) {
return;
}

// otherwise, readd this session to the session pool
this.sessions.unshift(session);
}
}
Expand Down

0 comments on commit f61df16

Please sign in to comment.