-
Notifications
You must be signed in to change notification settings - Fork 1.8k
/
Copy pathcreate_collection.ts
213 lines (193 loc) · 7.45 KB
/
create_collection.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
import type { Document } from '../bson';
import {
MIN_SUPPORTED_QE_SERVER_VERSION,
MIN_SUPPORTED_QE_WIRE_VERSION
} from '../cmap/wire_protocol/constants';
import { Collection } from '../collection';
import type { Db } from '../db';
import { MongoCompatibilityError } from '../error';
import type { PkFactory } from '../mongo_client';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import { type TimeoutContext } from '../timeout';
import { CommandOperation, type CommandOperationOptions } from './command';
import { CreateIndexesOperation } from './indexes';
import { Aspect, defineAspects } from './operation';
const ILLEGAL_COMMAND_FIELDS = new Set([
'w',
'wtimeout',
'timeoutMS',
'j',
'fsync',
'autoIndexId',
'pkFactory',
'raw',
'readPreference',
'session',
'readConcern',
'writeConcern',
'raw',
'fieldsAsRaw',
'useBigInt64',
'promoteLongs',
'promoteValues',
'promoteBuffers',
'bsonRegExp',
'serializeFunctions',
'ignoreUndefined',
'enableUtf8Validation'
]);
/** @public
* Configuration options for timeseries collections
* @see https://www.mongodb.com/docs/manual/core/timeseries-collections/
*/
export interface TimeSeriesCollectionOptions extends Document {
timeField: string;
metaField?: string;
granularity?: 'seconds' | 'minutes' | 'hours' | string;
bucketMaxSpanSeconds?: number;
bucketRoundingSeconds?: number;
}
/** @public
* Configuration options for clustered collections
* @see https://www.mongodb.com/docs/manual/core/clustered-collections/
*/
export interface ClusteredCollectionOptions extends Document {
name?: string;
key: Document;
unique: boolean;
}
/** @public */
export interface CreateCollectionOptions extends CommandOperationOptions {
/** Create a capped collection */
capped?: boolean;
/** @deprecated Create an index on the _id field of the document. This option is deprecated in MongoDB 3.2+ and will be removed once no longer supported by the server. */
autoIndexId?: boolean;
/** The size of the capped collection in bytes */
size?: number;
/** The maximum number of documents in the capped collection */
max?: number;
/** Available for the MMAPv1 storage engine only to set the usePowerOf2Sizes and the noPadding flag */
flags?: number;
/** Allows users to specify configuration to the storage engine on a per-collection basis when creating a collection */
storageEngine?: Document;
/** Allows users to specify validation rules or expressions for the collection. For more information, see Document Validation */
validator?: Document;
/** Determines how strictly MongoDB applies the validation rules to existing documents during an update */
validationLevel?: string;
/** Determines whether to error on invalid documents or just warn about the violations but allow invalid documents to be inserted */
validationAction?: string;
/** Allows users to specify a default configuration for indexes when creating a collection */
indexOptionDefaults?: Document;
/** The name of the source collection or view from which to create the view. The name is not the full namespace of the collection or view (i.e., does not include the database name and implies the same database as the view to create) */
viewOn?: string;
/** An array that consists of the aggregation pipeline stage. Creates the view by applying the specified pipeline to the viewOn collection or view */
pipeline?: Document[];
/** A primary key factory function for generation of custom _id keys. */
pkFactory?: PkFactory;
/** A document specifying configuration options for timeseries collections. */
timeseries?: TimeSeriesCollectionOptions;
/** A document specifying configuration options for clustered collections. For MongoDB 5.3 and above. */
clusteredIndex?: ClusteredCollectionOptions;
/** The number of seconds after which a document in a timeseries or clustered collection expires. */
expireAfterSeconds?: number;
/** @experimental */
encryptedFields?: Document;
/**
* If set, enables pre-update and post-update document events to be included for any
* change streams that listen on this collection.
*/
changeStreamPreAndPostImages?: { enabled: boolean };
}
/* @internal */
const INVALID_QE_VERSION =
'Driver support of Queryable Encryption is incompatible with server. Upgrade server to use Queryable Encryption.';
/** @internal */
export class CreateCollectionOperation extends CommandOperation<Collection> {
override options: CreateCollectionOptions;
db: Db;
name: string;
constructor(db: Db, name: string, options: CreateCollectionOptions = {}) {
super(db, options);
this.options = options;
this.db = db;
this.name = name;
}
override get commandName() {
return 'create' as const;
}
override async execute(
server: Server,
session: ClientSession | undefined,
timeoutContext: TimeoutContext
): Promise<Collection> {
const db = this.db;
const name = this.name;
const options = this.options;
const encryptedFields: Document | undefined =
options.encryptedFields ??
db.client.s.options.autoEncryption?.encryptedFieldsMap?.[`${db.databaseName}.${name}`];
if (encryptedFields) {
// Creating a QE collection required min server of 7.0.0
// TODO(NODE-5353): Get wire version information from connection.
if (
!server.loadBalanced &&
server.description.maxWireVersion < MIN_SUPPORTED_QE_WIRE_VERSION
) {
throw new MongoCompatibilityError(
`${INVALID_QE_VERSION} The minimum server version required is ${MIN_SUPPORTED_QE_SERVER_VERSION}`
);
}
// Create auxilliary collections for queryable encryption support.
const escCollection = encryptedFields.escCollection ?? `enxcol_.${name}.esc`;
const ecocCollection = encryptedFields.ecocCollection ?? `enxcol_.${name}.ecoc`;
for (const collectionName of [escCollection, ecocCollection]) {
const createOp = new CreateCollectionOperation(db, collectionName, {
clusteredIndex: {
key: { _id: 1 },
unique: true
}
});
await createOp.executeWithoutEncryptedFieldsCheck(server, session, timeoutContext);
}
if (!options.encryptedFields) {
this.options = { ...this.options, encryptedFields };
}
}
const coll = await this.executeWithoutEncryptedFieldsCheck(server, session, timeoutContext);
if (encryptedFields) {
// Create the required index for queryable encryption support.
const createIndexOp = CreateIndexesOperation.fromIndexSpecification(
db,
name,
{ __safeContent__: 1 },
{}
);
await createIndexOp.execute(server, session, timeoutContext);
}
return coll;
}
private async executeWithoutEncryptedFieldsCheck(
server: Server,
session: ClientSession | undefined,
timeoutContext: TimeoutContext
): Promise<Collection> {
const db = this.db;
const name = this.name;
const options = this.options;
const cmd: Document = { create: name };
for (const n in options) {
if (
(options as any)[n] != null &&
typeof (options as any)[n] !== 'function' &&
!ILLEGAL_COMMAND_FIELDS.has(n)
) {
cmd[n] = (options as any)[n];
}
}
// otherwise just execute the command
await super.executeCommand(server, session, cmd, timeoutContext);
return new Collection(db, name, options);
}
}
defineAspects(CreateCollectionOperation, [Aspect.WRITE_OPERATION]);