Skip to content

Commit

Permalink
SERVER-71440 Remove OpCtx::setIgnoreInterruptsExceptForReplStateChange
Browse files Browse the repository at this point in the history
  • Loading branch information
starzia authored and Evergreen Agent committed Jan 27, 2023
1 parent 0cda040 commit 212db59
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 147 deletions.
3 changes: 0 additions & 3 deletions jstests/serial_run/allow_partial_results_with_maxTimeMS.js
Expand Up @@ -155,9 +155,6 @@ function searchForAndAssertPartialResults(initialTimeoutMS, queryFunc) {

// Try to get partial results, while nudging timeout value around the expected time.
// This first case will try to get all the results in one big batch.
// Start with half of the full runtime of the query.

// Fetch one big batch of results.
searchForAndAssertPartialResults(Math.round(fullQueryTimeoutMS), runBigBatchQuery);

// Try to get partial results in a getMore.
Expand Down
Expand Up @@ -3,6 +3,9 @@
* with 'maxTimeMS' and only a subset of the shards provide data before the timeout.
* Uses three methods to simulate MaxTimeMSExpired: failpoints, MongoBridge, and $where + sleep.
*
* This is in the serial_run suite because the tests using MongoBridge and $where + sleep depend on
* certain work being completed before a maxTimeMS timeout.
*
* @tags: [
* requires_sharding,
* requires_replication,
Expand Down Expand Up @@ -280,8 +283,6 @@ const shard0SleepFailure = new FindWhereSleepController(st.shard0);
const shard1SleepFailure = new FindWhereSleepController(st.shard1);
const allShardsSleepFailure = new MultiController([shard0SleepFailure, shard1SleepFailure]);

const allshardsMixedFailures = new MultiController([shard0NetworkFailure, shard1Failpoint]);

// Due to the hack with sleepFailures below, this has to be the innermost parameterizing function.
function withEachSingleShardFailure(callback) {
callback(shard0Failpoint);
Expand All @@ -302,7 +303,6 @@ function withEachAllShardFailure(callback) {
callback(allShardsFailpoint);
callback(allshardsNetworkFailure);
callback(allShardsSleepFailure);
callback(allshardsMixedFailures);
}

function getMoreShardTimeout(allowPartialResults, failureController, batchSize) {
Expand Down Expand Up @@ -375,23 +375,23 @@ function partialResultsTrueFirstBatch(failureController) {
assert.eq(0, res.cursor.id);
failureController.disable();
}
withEachSingleShardFailure(failure => partialResultsTrueFirstBatch(failure));
withEachSingleShardFailure(partialResultsTrueFirstBatch);

// With 'allowPartialResults: false', if one shard times out then return a timeout error.
function partialResultsFalseOneFailure(controller) {
    // Arm the simulated failure (failpoint, network fault, or sleep) on one shard.
    controller.enable();
    // With 'allowPartialResults: false', a single shard timing out must surface
    // MaxTimeMSExpired to the client instead of returning a partial result set.
    const result = runQuery(false);
    assert.commandFailedWithCode(result, ErrorCodes.MaxTimeMSExpired);
    // Remove the failure so subsequent cases start from a healthy cluster.
    controller.disable();
}
withEachSingleShardFailure(failure => partialResultsFalseOneFailure(failure));
withEachSingleShardFailure(partialResultsFalseOneFailure);

// With 'allowPartialResults: false', if both shards time out then return a timeout error.
function allowPartialResultsFalseAllFailed(controller) {
    // Arm the simulated failure on every shard at once.
    controller.enable();
    // With 'allowPartialResults: false' and all shards timing out, the query
    // must fail with MaxTimeMSExpired rather than return any results.
    const result = runQuery(false);
    assert.commandFailedWithCode(result, ErrorCodes.MaxTimeMSExpired);
    // Disable the failure so later cases run against a healthy cluster.
    controller.disable();
}
withEachAllShardFailure(failure => allowPartialResultsFalseAllFailed(failure));
withEachAllShardFailure(allowPartialResultsFalseAllFailed);

// With 'allowPartialResults: true', if both shards time out then return empty "partial" results.
function allowPartialResultsTrueAllFailed(failureController) {
Expand All @@ -402,7 +402,7 @@ function allowPartialResultsTrueAllFailed(failureController) {
assert.eq(res.cursor.firstBatch.length, 0);
failureController.disable();
}
withEachAllShardFailure(failure => allowPartialResultsTrueAllFailed(failure));
withEachAllShardFailure(allowPartialResultsTrueAllFailed);

st.stop();
}());
7 changes: 1 addition & 6 deletions src/mongo/db/operation_context.cpp
Expand Up @@ -220,10 +220,6 @@ bool opShouldFail(Client* client, const BSONObj& failPointInfo) {
} // namespace

Status OperationContext::checkForInterruptNoAssert() noexcept {
if (_noReplStateChangeWhileIgnoringOtherInterrupts()) {
return Status::OK();
}

// TODO: Remove the MONGO_likely(hasClientAndServiceContext) once all operation contexts are
// constructed with clients.
const auto hasClientAndServiceContext = getClient() && getServiceContext();
Expand Down Expand Up @@ -313,8 +309,7 @@ StatusWith<stdx::cv_status> OperationContext::waitForConditionOrInterruptNoAsser
// maxTimeNeverTimeOut is set) then we assume that the incongruity is due to a clock mismatch
// and return _timeoutError regardless. To prevent this behaviour, only consider the op's
// deadline in the event that the maxTimeNeverTimeOut failpoint is not set.
bool opHasDeadline = (hasDeadline() && !_noReplStateChangeWhileIgnoringOtherInterrupts() &&
!MONGO_unlikely(maxTimeNeverTimeOut.shouldFail()));
bool opHasDeadline = (hasDeadline() && !MONGO_unlikely(maxTimeNeverTimeOut.shouldFail()));

if (opHasDeadline) {
deadline = std::min(deadline, getDeadline());
Expand Down
19 changes: 0 additions & 19 deletions src/mongo/db/operation_context.h
Expand Up @@ -514,14 +514,6 @@ class OperationContext : public Interruptible, public Decorable<OperationContext
return _alwaysInterruptAtStepDownOrUp.load();
}

/**
* Sets that this operation should ignore interruption except for replication state change. Can
* only be called by the thread executing this on behalf of this OperationContext.
*/
void setIgnoreInterruptsExceptForReplStateChange(bool target) {
_ignoreInterruptsExceptForReplStateChange = target;
}

/**
* Clears metadata associated with a multi-document transaction.
*/
Expand Down Expand Up @@ -687,16 +679,6 @@ class OperationContext : public Interruptible, public Decorable<OperationContext
}
}

/**
* Returns true if ignoring interrupts other than repl state change and no repl state change
* has occurred.
*/
bool _noReplStateChangeWhileIgnoringOtherInterrupts() const {
return _ignoreInterruptsExceptForReplStateChange &&
getKillStatus() != ErrorCodes::InterruptedDueToReplStateChange &&
!_killRequestedForReplStateChange.loadRelaxed();
}

/**
* Returns true if this operation has a deadline and it has passed according to the fast clock
* on ServiceContext.
Expand Down Expand Up @@ -800,7 +782,6 @@ class OperationContext : public Interruptible, public Decorable<OperationContext
bool _shouldIncrementLatencyStats = true;
bool _inMultiDocumentTransaction = false;
bool _isStartingMultiDocumentTransaction = false;
bool _ignoreInterruptsExceptForReplStateChange = false;
// Commands from user applications must run validations and enforce constraints. Operations from
// a trusted source, such as initial sync or consuming an oplog entry generated by a primary
// typically desire to ignore constraints.
Expand Down
68 changes: 0 additions & 68 deletions src/mongo/db/operation_context_test.cpp
Expand Up @@ -730,74 +730,6 @@ TEST_F(OperationDeadlineTests, DuringWaitMaxTimeExpirationDominatesUntilExpirati
ASSERT_TRUE(opCtx->getCancellationToken().isCanceled());
}

// Verifies that once setIgnoreInterruptsExceptForReplStateChange(true) is set, an
// already-reached MaxTimeMS deadline does not interrupt a wait: the wait simply
// times out (returns false) and the op's cancellation token is never canceled.
TEST_F(OperationDeadlineTests,
       MaxTimeExpirationIgnoredWhenIgnoringInterruptsExceptReplStateChange) {
    auto opCtx = client->makeOperationContext();
    // Deadline is "now", so MaxTimeMS is effectively already expired.
    opCtx->setDeadlineByDate(mockClock->now(), ErrorCodes::MaxTimeMSExpired);
    auto m = MONGO_MAKE_LATCH();
    stdx::condition_variable cv;
    stdx::unique_lock<Latch> lk(m);
    ASSERT_FALSE(opCtx->getCancellationToken().isCanceled());
    // Flip the op into "ignore everything except repl state change" mode BEFORE waiting.
    opCtx->setIgnoreInterruptsExceptForReplStateChange(true);
    // Advance the clock so the MaxTimeMS is hit before the timeout.
    mockClock->advance(Milliseconds(100));
    // The wait returns false (condition not satisfied / timed out) instead of
    // throwing MaxTimeMSExpired, because max-time interruption is being ignored.
    ASSERT_FALSE(
        opCtx->waitForConditionOrInterruptUntil(cv, lk, mockClock->now(), [] { return false; }));
    // The op was never killed, so the cancellation token stays uncanceled.
    ASSERT_FALSE(opCtx->getCancellationToken().isCanceled());
}

// Verifies that an op already killed by MaxTimeMSExpired can still make progress
// after enabling setIgnoreInterruptsExceptForReplStateChange(true): interrupt
// checks return OK and waits time out normally, even though the cancellation
// token remains canceled from the earlier kill.
TEST_F(OperationDeadlineTests,
       AlreadyExpiredMaxTimeIgnoredWhenIgnoringInterruptsExceptReplStateChange) {
    auto opCtx = client->makeOperationContext();
    // Deadline is "now", so MaxTimeMS is effectively already expired.
    opCtx->setDeadlineByDate(mockClock->now(), ErrorCodes::MaxTimeMSExpired);
    auto m = MONGO_MAKE_LATCH();
    stdx::condition_variable cv;
    stdx::unique_lock<Latch> lk(m);
    ASSERT_FALSE(opCtx->getCancellationToken().isCanceled());
    // First wait hits the expired deadline and throws, marking the op killed.
    ASSERT_THROWS_CODE(opCtx->waitForConditionOrInterruptUntil(
                           cv, lk, mockClock->now() + Seconds(1), [] { return false; }),
                       DBException,
                       ErrorCodes::MaxTimeMSExpired);

    // The op is now killed: interrupt checks report the error, token is canceled.
    ASSERT_EQ(ErrorCodes::MaxTimeMSExpired, opCtx->checkForInterruptNoAssert());
    ASSERT_TRUE(opCtx->getCancellationToken().isCanceled());
    opCtx->setIgnoreInterruptsExceptForReplStateChange(true);
    // With the flag set, the existing MaxTimeMS kill is masked.
    ASSERT_OK(opCtx->checkForInterruptNoAssert());
    // Advance the clock so the MaxTimeMS is hit before the timeout.
    mockClock->advance(Milliseconds(100));
    // The wait now times out (returns false) rather than throwing.
    ASSERT_FALSE(
        opCtx->waitForConditionOrInterruptUntil(cv, lk, mockClock->now(), [] { return false; }));
    // Masking interrupts does not un-cancel the token set by the earlier kill.
    ASSERT_TRUE(opCtx->getCancellationToken().isCanceled());
}

// Verifies that the interrupt-masking flag stops masking once a repl-state-change
// kill arrives: after markKilled(InterruptedDueToReplStateChange), interrupt
// checks and waits surface MaxTimeMSExpired again (the deadline error takes
// precedence over the repl-state-change kill code here).
TEST_F(OperationDeadlineTests,
       MaxTimeRespectedAfterReplStateChangeWhenIgnoringInterruptsExceptReplStateChange) {
    auto opCtx = client->makeOperationContext();
    // Deadline is "now", so MaxTimeMS is effectively already expired.
    opCtx->setDeadlineByDate(mockClock->now(), ErrorCodes::MaxTimeMSExpired);
    auto m = MONGO_MAKE_LATCH();
    stdx::condition_variable cv;
    stdx::unique_lock<Latch> lk(m);
    ASSERT_FALSE(opCtx->getCancellationToken().isCanceled());
    // First wait hits the expired deadline and throws, marking the op killed.
    ASSERT_THROWS_CODE(opCtx->waitForConditionOrInterruptUntil(
                           cv, lk, mockClock->now() + Seconds(1), [] { return false; }),
                       DBException,
                       ErrorCodes::MaxTimeMSExpired);

    ASSERT_EQ(ErrorCodes::MaxTimeMSExpired, opCtx->checkForInterruptNoAssert());
    ASSERT_TRUE(opCtx->getCancellationToken().isCanceled());
    // While the flag is set and no repl state change has happened, the kill is masked.
    opCtx->setIgnoreInterruptsExceptForReplStateChange(true);
    ASSERT_OK(opCtx->checkForInterruptNoAssert());
    // A repl-state-change kill disables the masking...
    opCtx->markKilled(ErrorCodes::InterruptedDueToReplStateChange);
    // ...and the interrupt check reports MaxTimeMSExpired again.
    ASSERT_EQ(ErrorCodes::MaxTimeMSExpired, opCtx->checkForInterruptNoAssert());
    // Advance the clock so the MaxTimeMS is hit before the timeout.
    mockClock->advance(Milliseconds(100));
    // Waits also throw MaxTimeMSExpired once masking is off.
    ASSERT_THROWS_CODE(
        opCtx->waitForConditionOrInterruptUntil(cv, lk, mockClock->now(), [] { return false; }),
        DBException,
        ErrorCodes::MaxTimeMSExpired);
    ASSERT_TRUE(opCtx->getCancellationToken().isCanceled());
}

class ThreadedOperationDeadlineTests : public OperationDeadlineTests {
public:
using CvPred = std::function<bool()>;
Expand Down
72 changes: 28 additions & 44 deletions src/mongo/s/query/cluster_find.cpp
Expand Up @@ -314,7 +314,28 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx,

try {
// Establish the cursors with a consistent shardVersion across shards.
params.remotes = establishCursorsOnShards(shardIds);

// If we have maxTimeMS and allowPartialResults, then leave some spare time in the opCtx
// deadline so that we have time to return partial results before the opCtx is killed.
auto deadline = opCtx->getDeadline();
if (findCommand.getAllowPartialResults() && findCommand.getMaxTimeMS()) {
// Reserve 10% of the time budget (up to 100,000 microseconds max) for processing
// buffered partial results.
deadline -= Microseconds{std::min(1000 * (*findCommand.getMaxTimeMS()) / 10, 100000)};
LOGV2_DEBUG(
5746901,
0,
"Setting an earlier artificial deadline because the find allows partial results.",
"deadline"_attr = deadline);
}

// The call to establishCursorsOnShards has its own timeout mechanism that is controlled
// by the opCtx, so we don't expect runWithDeadline to throw a timeout at this level. We
// use runWithDeadline because it has the side effect of pushing a temporary
// (artificial) deadline onto the opCtx used by establishCursorsOnShards.
opCtx->runWithDeadline(deadline, ErrorCodes::MaxTimeMSExpired, [&]() -> void {
params.remotes = establishCursorsOnShards(shardIds);
});
} catch (const DBException& ex) {
if (ex.code() == ErrorCodes::CollectionUUIDMismatch &&
!ex.extraInfo<CollectionUUIDMismatchInfo>()->actualCollection() &&
Expand Down Expand Up @@ -352,15 +373,10 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx,

FindCommon::waitInFindBeforeMakingBatch(opCtx, query);

// If we're allowing partial results and we got MaxTimeMSExpired, then temporarily disable
// interrupts in the opCtx so that we can pull already-fetched data from ClusterClientCursor.
bool ignoringInterrupts = false;
if (findCommand.getAllowPartialResults() &&
opCtx->checkForInterruptNoAssert().code() == ErrorCodes::MaxTimeMSExpired) {
// MaxTimeMS is expired, but perhaps remotes not have expired their requests yet.
// Wait for all remote cursors to be exhausted so that we can safely disable interrupts
// in the opCtx. We want to be sure that later calls to ccc->next() do not block on
// more data.
    // MaxTimeMS is expired in the router, but some remotes may still have outstanding requests.
// Wait for all remotes to expire their requests.

// Maximum number of 1ms sleeps to wait for remote cursors to be exhausted.
constexpr int kMaxAttempts = 10;
Expand All @@ -377,18 +393,6 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx,
}
stdx::this_thread::sleep_for(stdx::chrono::milliseconds(1));
}

// The first MaxTimeMSExpired will have called opCtx->markKilled() so any later
// call to opCtx->checkForInterruptNoAssert() will return an error. We need to
// temporarily ignore this while we pull data from the ClusterClientCursor.
LOGV2_DEBUG(
5746901,
0,
"Attempting to return partial results because MaxTimeMS expired and the query set "
"AllowPartialResults. Temporarily disabling interrupts on the OperationContext "
"while partial results are pulled from the ClusterClientCursor.");
opCtx->setIgnoreInterruptsExceptForReplStateChange(true);
ignoringInterrupts = true;
}

auto cursorState = ClusterCursorManager::CursorState::NotExhausted;
Expand All @@ -398,20 +402,13 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx,
// results come from the initial batches that were obtained when establishing cursors, but
// ClusterClientCursor::next will fetch further results if necessary.
while (!FindCommon::enoughForFirstBatch(findCommand, results->size())) {
auto nextWithStatus = ccc->next();
if (findCommand.getAllowPartialResults() &&
(nextWithStatus.getStatus() == ErrorCodes::MaxTimeMSExpired)) {
auto next = uassertStatusOK(ccc->next());
if (findCommand.getAllowPartialResults()) {
if (ccc->remotesExhausted()) {
cursorState = ClusterCursorManager::CursorState::Exhausted;
break;
}
// Continue because there may be results waiting from other remotes.
continue;
} else {
// all error statuses besides permissible remote timeouts should be returned to the user
uassertStatusOK(nextWithStatus);
}
if (nextWithStatus.getValue().isEOF()) {
if (next.isEOF()) {
// We reached end-of-stream. If the cursor is not tailable, then we mark it as
// exhausted. If it is tailable, usually we keep it open (i.e. "NotExhausted") even
// when we reach end-of-stream. However, if all the remote cursors are exhausted, there
Expand All @@ -422,7 +419,7 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx,
break;
}

auto nextObj = *(nextWithStatus.getValue().getResult());
auto nextObj = *next.getResult();

// If adding this object will cause us to exceed the message size limit, then we stash it
// for later.
Expand All @@ -437,19 +434,6 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx,
results->push_back(std::move(nextObj));
}

if (ignoringInterrupts) {
opCtx->setIgnoreInterruptsExceptForReplStateChange(false);
ignoringInterrupts = false;
LOGV2_DEBUG(5746902, 0, "Re-enabled interrupts on the OperationContext.");
}

// Surface any opCtx interrupts, except ignore MaxTimeMSExpired with allowPartialResults.
auto interruptStatus = opCtx->checkForInterruptNoAssert();
if (!(interruptStatus.code() == ErrorCodes::MaxTimeMSExpired &&
findCommand.getAllowPartialResults())) {
uassertStatusOK(interruptStatus);
}

ccc->detachFromOperationContext();

if (findCommand.getSingleBatch() && !ccc->isTailable()) {
Expand Down

0 comments on commit 212db59

Please sign in to comment.