-
Notifications
You must be signed in to change notification settings - Fork 522
/
odspDeltaStorageService.ts
255 lines (233 loc) · 9.12 KB
/
odspDeltaStorageService.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
/*!
* Copyright (c) Microsoft Corporation and contributors. All rights reserved.
* Licensed under the MIT License.
*/
import { ITelemetryBaseProperties } from "@fluidframework/core-interfaces";
import { assert } from "@fluidframework/core-utils/internal";
import { validateMessages } from "@fluidframework/driver-base/internal";
import {
IDeltasFetchResult,
IDocumentDeltaStorageService,
type IStream,
} from "@fluidframework/driver-definitions/internal";
import { requestOps, streamObserver } from "@fluidframework/driver-utils/internal";
import { InstrumentedStorageTokenFetcher } from "@fluidframework/odsp-driver-definitions/internal";
import { ISequencedDocumentMessage } from "@fluidframework/protocol-definitions";
import { ITelemetryLoggerExt } from "@fluidframework/telemetry-utils";
import { PerformanceEvent } from "@fluidframework/telemetry-utils/internal";
import { v4 as uuid } from "uuid";
import { IDeltaStorageGetResponse, ISequencedDeltaOpMessage } from "./contracts.js";
import { EpochTracker } from "./epochTracker.js";
import { OdspDocumentStorageService } from "./odspDocumentStorageManager.js";
import { getWithRetryForTokenRefresh } from "./odspUtils.js";
/**
 * Provides access to the underlying delta storage on the server for sharepoint driver.
 *
 * Ops are fetched from the delta feed URL with a POST request whose multipart body carries the
 * authorization token and a GET method override.
 */
export class OdspDeltaStorageService {
    /**
     * How long (in milliseconds) a single ops request may run before it is aborted.
     *
     * Some requests take a long time (1-2 minutes) to complete, where telemetry shows a very small amount
     * of time spent on the server, and usually small payload sizes, i.e. all the time is spent somewhere in
     * networking. Even bigger problem - a lot of requests time out (based on a cursory look - after 1-2 minutes).
     * Aborting early lets the caller retry in the hope of faster success.
     * Please see https://github.com/microsoft/FluidFramework/issues/6997 for details.
     */
    private static readonly fetchTimeoutMs = 30000;

    constructor(
        private readonly deltaFeedUrl: string,
        private readonly getStorageToken: InstrumentedStorageTokenFetcher,
        private readonly epochTracker: EpochTracker,
        private readonly logger: ITelemetryLoggerExt,
    ) {}

    /**
     * Retrieves ops from storage
     * @param from - inclusive
     * @param to - exclusive
     * @param telemetryProps - properties to add when issuing telemetry events
     * @param scenarioName - reason for fetching ops
     * @returns ops retrieved & info if result was partial (i.e. more is available)
     */
    public async get(
        from: number,
        to: number,
        telemetryProps: ITelemetryBaseProperties,
        scenarioName?: string,
    ): Promise<IDeltasFetchResult> {
        return getWithRetryForTokenRefresh(async (options) => {
            // Note - this call ends up in getSocketStorageDiscovery() and can refresh token
            // Thus it needs to be done before we call getStorageToken() to reduce extra calls
            const baseUrl = this.buildUrl(from, to);
            const storageToken = await this.getStorageToken(options, "DeltaStorage");

            return PerformanceEvent.timedExecAsync(
                this.logger,
                {
                    eventName: "OpsFetch",
                    attempts: options.refresh ? 2 : 1,
                    from,
                    to,
                    ...telemetryProps,
                    reason: scenarioName,
                },
                async (event) => {
                    // The token travels in the multipart body rather than in a request header.
                    const formBoundary = uuid();
                    let postBody = `--${formBoundary}\r\n`;
                    postBody += `Authorization: Bearer ${storageToken}\r\n`;
                    postBody += `X-HTTP-Method-Override: GET\r\n`;
                    postBody += `_post: 1\r\n`;
                    postBody += `\r\n--${formBoundary}--`;
                    const headers: { [index: string]: string } = {
                        "Content-Type": `multipart/form-data;boundary=${formBoundary}`,
                    };

                    const response = await this.fetchWithTimeout(
                        baseUrl,
                        headers,
                        postBody,
                        scenarioName,
                    );

                    // The payload is either a list of wrappers that carry the message under `op`,
                    // or a list of the messages themselves; the first element tells which.
                    const deltaStorageResponse = response.content;
                    const messages =
                        deltaStorageResponse.value.length > 0 &&
                        "op" in deltaStorageResponse.value[0]
                            ? (deltaStorageResponse.value as ISequencedDeltaOpMessage[]).map(
                                    (operation) => operation.op,
                                )
                            : (deltaStorageResponse.value as ISequencedDocumentMessage[]);

                    event.end({
                        length: messages.length,
                        ...response.propsToLog,
                    });

                    // It is assumed that server always returns all the ops that it has in the range that was requested.
                    // This may change in the future, if so, we need to adjust and receive "end" value from server in such case.
                    return { messages, partialResult: false };
                },
            );
        });
    }

    /**
     * Builds the delta feed URL for the requested range of sequence numbers.
     * @param from - first sequence number to fetch (inclusive)
     * @param to - end of the range (exclusive); the filter asks for sequence numbers up to `to - 1`
     * @returns the delta feed URL with the encoded range filter appended as a query string
     */
    public buildUrl(from: number, to: number): string {
        const filter = encodeURIComponent(
            `sequenceNumber ge ${from} and sequenceNumber le ${to - 1}`,
        );
        const queryString = `?ump=1&filter=${filter}`;
        return `${this.deltaFeedUrl}${queryString}`;
    }

    /**
     * Issues the ops POST request and aborts it if it runs longer than `fetchTimeoutMs`.
     *
     * The abort timer is released in `finally`, so it never outlives the request - including
     * when the request rejects.
     */
    private async fetchWithTimeout(
        url: string,
        headers: { [index: string]: string },
        body: string,
        scenarioName?: string,
    ) {
        const abort = new AbortController();
        const timer = setTimeout(() => abort.abort(), OdspDeltaStorageService.fetchTimeoutMs);
        try {
            return await this.epochTracker.fetchAndParseAsJSON<IDeltaStorageGetResponse>(
                url,
                {
                    headers,
                    body,
                    method: "POST",
                    signal: abort.signal,
                },
                "ops",
                true,
                scenarioName,
            );
        } finally {
            clearTimeout(timer);
        }
    }
}
/**
 * Delta storage that serves each requested range of ops from the first source that has them:
 * the ops that were supplied alongside the snapshot, then the local cache, then storage.
 */
export class OdspDeltaStorageWithCache implements IDocumentDeltaStorageService {
    // Once this flips to false the cache is never consulted again by this instance.
    private useCacheForOps = true;

    public constructor(
        private snapshotOps: ISequencedDocumentMessage[] | undefined,
        private readonly logger: ITelemetryLoggerExt,
        private readonly batchSize: number,
        private readonly concurrency: number,
        private readonly getFromStorage: (
            from: number,
            to: number,
            telemetryProps: ITelemetryBaseProperties,
            fetchReason?: string,
        ) => Promise<IDeltasFetchResult>,
        private readonly getCached: (
            from: number,
            to: number,
        ) => Promise<ISequencedDocumentMessage[]>,
        private readonly requestFromSocket: (from: number, to: number) => void,
        private readonly opsReceived: (ops: ISequencedDocumentMessage[]) => void,
        private readonly storageManagerGetter: () => OdspDocumentStorageService | undefined,
    ) {}

    /**
     * Streams ops starting at `fromTotal` (inclusive) up to `toTotal` (exclusive, open-ended when undefined).
     * @param fromTotal - first sequence number to fetch (inclusive)
     * @param toTotal - end of the range (exclusive); must be undefined when `cachedOnly` is set
     * @param abortSignal - forwarded to the underlying ops request loop
     * @param cachedOnly - when set, storage is never queried; an empty, non-partial result is returned instead
     * @param fetchReason - forwarded to storage requests and reported in telemetry
     * @returns stream of op batches; a "CacheOpsRetrieved" event is logged when it completes with at least one op
     */
    public fetchMessages(
        fromTotal: number,
        toTotal: number | undefined,
        abortSignal?: AbortSignal,
        cachedOnly?: boolean,
        fetchReason?: string,
    ): IStream<ISequencedDocumentMessage[]> {
        // The content of the cache is outside of our control, and the current API expects fetchMessages()
        // to keep hitting storage / cache until the requested ops show up. Asking the cache for a fixed
        // range it does not hold would therefore never finish, so cache-only reads must be open-ended.
        // Returning just what the cache has would be nicer, but it would break the API as well.
        assert(!cachedOnly || toTotal === undefined, 0x1e3);

        // The cache is only consulted while the first snapshot is known to have not come from the network
        // (i.e. not when it was fetched from the network, and not when it was not fetched at all).
        if (this.useCacheForOps) {
            this.useCacheForOps =
                this.storageManagerGetter()?.isFirstSnapshotFromNetwork === false;
        }

        // How many ops each source contributed over the lifetime of this stream.
        const served = { snapshot: 0, cache: 0, storage: 0 };

        const fetchRange = async (
            from: number,
            to: number,
            telemetryProps: ITelemetryBaseProperties,
        ): Promise<IDeltasFetchResult> => {
            // Source 1: ops that came with the snapshot.
            const pendingSnapshotOps = this.snapshotOps;
            if (pendingSnapshotOps !== undefined && pendingSnapshotOps.length > 0) {
                const inRange = pendingSnapshotOps.filter(
                    ({ sequenceNumber }) => from <= sequenceNumber && sequenceNumber < to,
                );
                validateMessages("cached", inRange, from, this.logger);

                const startsAtFrom = inRange.length > 0 && inRange[0].sequenceNumber === from;
                if (startsAtFrom) {
                    // Keep only what lies past this range for later requests.
                    this.snapshotOps = pendingSnapshotOps.filter(
                        ({ sequenceNumber }) => sequenceNumber >= to,
                    );
                    served.snapshot += inRange.length;
                    return { messages: inRange, partialResult: true };
                }
                // Snapshot ops do not start at the requested position - stop using them.
                this.snapshotOps = undefined;
            }

            // Ask PUSH for the same range in case it has the ops.
            this.requestFromSocket(from, to);

            // Source 2: the cache. In normal flow it is continuous, so after the first miss there is
            // no point in consulting it again - that saves a bit of processing time.
            if (this.useCacheForOps) {
                const cachedOps = await this.getCached(from, to);
                validateMessages("cached", cachedOps, from, this.logger);

                // Anything short of the full range counts as a miss and switches the cache off,
                // which spares an extra cache read on "DocumentOpen" or "PostDocumentOpen".
                this.useCacheForOps = from + cachedOps.length >= to;

                if (cachedOps.length > 0) {
                    served.cache += cachedOps.length;
                    return { messages: cachedOps, partialResult: true };
                }
            }

            if (cachedOnly) {
                return { messages: [], partialResult: false };
            }

            // Source 3: storage.
            const fromStorage = await this.getFromStorage(from, to, telemetryProps, fetchReason);
            validateMessages("storage", fromStorage.messages, from, this.logger);
            served.storage += fromStorage.messages.length;
            this.opsReceived(fromStorage.messages);
            return fromStorage;
        };

        // Final safety net: validate once more whatever source produced the batch.
        const fetchAndRevalidate = async (
            from: number,
            to: number,
            telemetryProps: ITelemetryBaseProperties,
        ): Promise<IDeltasFetchResult> => {
            const batch = await fetchRange(from, to, telemetryProps);
            validateMessages("catch all", batch.messages, from, this.logger);
            return batch;
        };

        const opsStream = requestOps(
            fetchAndRevalidate,
            // Staging: starting with no concurrency, listening for feedback first.
            // In future releases we will switch to actual concurrency
            this.concurrency,
            fromTotal, // inclusive
            toTotal, // exclusive
            this.batchSize,
            this.logger,
            abortSignal,
            fetchReason,
        );

        return streamObserver(opsStream, (result) => {
            const totalServed = served.snapshot + served.cache + served.storage;
            if (!result.done || totalServed === 0) {
                return;
            }
            this.logger.sendPerformanceEvent({
                eventName: "CacheOpsRetrieved",
                opsFromSnapshot: served.snapshot,
                opsFromCache: served.cache,
                opsFromStorage: served.storage,
                reason: fetchReason,
            });
        });
    }
}