-
Notifications
You must be signed in to change notification settings - Fork 71
/
multipart.parser.file.ts
49 lines (46 loc) · 2.18 KB
/
multipart.parser.file.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import { HttpRequest, HttpError, HttpStatus } from '@marblejs/core';
import { isNonNullable } from '@marblejs/core/dist/+internal/utils';
import { fromReadableStream } from '@marblejs/core/dist/+internal/observable';
import { fromEvent, Observable, of, throwError, merge, iif } from 'rxjs';
import { mapTo, map, mergeMap, takeUntil, toArray, tap, mergeMapTo, ignoreElements } from 'rxjs/operators';
import { Readable } from 'stream';
import { FileIncomingData, ParserOpts } from './multipart.interface';
import { setRequestData, shouldParseFieldname } from './multipart.util';
type FileEvent = [string, NodeJS.ReadableStream, string, string, string];
const fileSizeLimit = (maxBytes: number | undefined) => (data: FileIncomingData) => (finish$: Observable<any>) =>
fromEvent(data.file, 'limit').pipe(
takeUntil(finish$),
mergeMapTo(throwError(
new HttpError(`Reached file size limit for "${data.fieldname}" [${maxBytes} bytes]`, HttpStatus.PRECONDITION_FAILED),
)),
ignoreElements(),
);
export const parseFile = (req: HttpRequest) => (opts: ParserOpts) => (event$: Observable<FileEvent>, finish$: Observable<any>) =>
event$.pipe(
takeUntil(finish$),
map(([ fieldname, file, filename, encoding, mimetype ]) => ({ fieldname, file, filename, encoding, mimetype })),
mergeMap(data => iif(
() => !shouldParseFieldname(opts.files)(data.fieldname),
throwError(new HttpError(`File "${data.fieldname}" is not acceptable`, HttpStatus.PRECONDITION_FAILED)),
of(data),
)),
mergeMap(data => merge(fileSizeLimit(opts.maxFileSize)(data)(finish$), of(data))),
mergeMap(data => isNonNullable(opts.stream)
? of(data).pipe(
mergeMap(opts.stream),
tap(setRequestData(req)(data)),
tap(() => data.file.resume()),
mapTo(data),
)
: of(data).pipe(
map(data => data.file as Readable),
mergeMap(fromReadableStream),
toArray(),
map(chunks => ({ buffer: Buffer.concat(chunks) })),
map(({ buffer }) => ({ buffer, size: Buffer.byteLength(buffer) })),
tap(setRequestData(req)(data)),
tap(() => data.file.resume()),
mapTo(data),
),
),
);