-
Notifications
You must be signed in to change notification settings - Fork 5.6k
/
Copy pathwrite_to_change_collection_in_startup_recovery.js
161 lines (136 loc) · 6.25 KB
/
write_to_change_collection_in_startup_recovery.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
// Tests that replaying the oplog entries during the startup recovery also writes to the change
// collection.
// @tags: [
// requires_fcv_62,
// ]
import {configureFailPoint} from "jstests/libs/fail_point_util.js";
import {ReplSetTest} from "jstests/libs/replsettest.js";
import {verifyChangeCollectionEntries} from "jstests/serverless/libs/change_collection_util.js";
/**
 * Runs a 'find' with 'filter' against the collection 'collName' of database 'dbName' on the
 * connection 'conn', asserts that the command succeeded, and returns the number of documents in
 * the first batch of the reply.
 */
function countInFirstBatch(conn, dbName, collName, filter) {
    const findReply =
        assert.commandWorked(conn.getDB(dbName).runCommand({find: collName, filter: filter}));
    return findReply.cursor.firstBatch.length;
}

// Counts the documents in 'test.stockPrice' that match the stock document inserted by this test.
function countStockPriceDocs(conn) {
    return countInFirstBatch(conn, "test", "stockPrice", {_id: "mdb", price: 250});
}

// Counts the 'local.oplog.rs' entries that match the stock document insert.
function countOplogEntries(conn) {
    return countInFirstBatch(
        conn, "local", "oplog.rs", {ns: "test.stockPrice", o: {_id: "mdb", price: 250}});
}

// Counts the 'config.system.change_collection' entries that match the stock document insert.
function countChangeCollectionEntries(conn) {
    return countInFirstBatch(conn,
                             "config",
                             "system.change_collection",
                             {ns: "test.stockPrice", o: {_id: "mdb", price: 250}});
}

// Bring up a single-node serverless replica set with multitenancy and change streams enabled.
const replSetTest =
    new ReplSetTest({nodes: 1, name: "ChangeStreamMultitenantReplicaSetTest", serverless: true});
replSetTest.startSet({
    setParameter: {
        featureFlagServerlessChangeStreams: true,
        multitenancySupport: true,
        featureFlagRequireTenantID: true,
        featureFlagSecurityToken: true
    }
});
replSetTest.initiate();

let primary = replSetTest.getPrimary();

// All tenant-scoped commands below are issued with this tenant's security token.
const tenantId = ObjectId();
const token = _createTenantToken({tenant: tenantId});
primary._setSecurityToken(token);

// Turning the change stream on is what creates the change collection.
const adminDb = primary.getDB("admin");
assert.commandWorked(adminDb.runCommand({setChangeStreamState: 1, enabled: true}));

// Seed a document and remember the operation time of that insert. It is the start of the window
// over which the oplog and the change collection entries are compared at the end of the test.
const seedInsertReply = assert.commandWorked(
    primary.getDB("test").runCommand({insert: "seedCollection", documents: [{_id: "beginTs"}]}));
const startTimestamp = seedInsertReply.operationTime;

// With the checkpoint thread paused, non-journaled collections, including the change collection,
// will not be persisted.
const checkpointPause = configureFailPoint(primary, "pauseCheckpointThread");
checkpointPause.wait();

// Insert the stock document whose recovery this test tracks.
assert.commandWorked(primary.getDB("test").runCommand(
    {insert: "stockPrice", documents: [{_id: "mdb", price: 250}]}));

// Before the crash the insert is visible in 'stockPrice', in 'oplog.rs', and in
// 'system.change_collection'.
assert.eq(countStockPriceDocs(primary), 1);

// The local database is read without a security token.
primary._setSecurityToken(undefined);
assert.eq(countOplogEntries(primary), 1);

primary._setSecurityToken(token);
assert.eq(countChangeCollectionEntries(primary), 1);

// Kill the primary ungracefully (SIGKILL) and keep its db path directory for the restart.
replSetTest.stop(0, 9, {allowedExitCode: MongoRunner.EXIT_SIGKILL}, {forRestart: true});

// Start a standalone mongoD on top of the primary's db directory.
const standaloneOptions = {
    setParameter: {
        featureFlagServerlessChangeStreams: true,
        multitenancySupport: true,
        featureFlagRequireTenantID: true
    },
    dbpath: primary.dbpath,
    noReplSet: true,
    noCleanData: true
};
const standalone = MongoRunner.runMongod(standaloneOptions);
assert.neq(null, standalone, "Fail to restart the node as standalone");

// The standalone is a new connection, so the token has to be set again.
standalone._setSecurityToken(token);

// On the standalone the insert is missing from both 'stockPrice' and 'system.change_collection',
// while the corresponding entry is still present in 'oplog.rs'.
assert.eq(countStockPriceDocs(standalone), 0);

// The local database is read without a security token.
standalone._setSecurityToken(undefined);
assert.eq(countOplogEntries(standalone), 1);

standalone._setSecurityToken(token);
assert.eq(countChangeCollectionEntries(standalone), 0);

// Shut the standalone down, again leaving the db directory in place.
MongoRunner.stopMongod(standalone, null, {noCleanData: true, skipValidation: true, wait: true});

// Bring the replica set primary back on the same db path.
const restartOptions = {
    noCleanData: true,
    serverless: true,
    setParameter: {
        featureFlagServerlessChangeStreams: true,
        multitenancySupport: true,
        featureFlagRequireTenantID: true
    }
};
replSetTest.start(primary, restartOptions);
primary = replSetTest.getPrimary();
primary._setSecurityToken(token);

// After startup recovery has applied the oplog entries, the insert must be present again in both
// 'stockPrice' and 'system.change_collection'.
assert.eq(countStockPriceDocs(primary), 1);
assert.eq(countChangeCollectionEntries(primary), 1);

// Take the timestamp of the last oplog entry as the end of the comparison window. The oplog is
// read without a security token.
primary._setSecurityToken(undefined);
const oplogEntries = primary.getDB("local").oplog.rs.find().toArray();
const endTimestamp = oplogEntries.at(-1).ts;
assert(endTimestamp !== undefined);

// Check that the oplog and the change collection hold exactly the same entries, in the same
// order, over the window delimited by 'startTimestamp' and 'endTimestamp'.
verifyChangeCollectionEntries(primary, startTimestamp, endTimestamp, tenantId, token);

replSetTest.stopSet();