Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: extended completed file #380

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions examples/express-gcs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ const app = express();

const storage = new GCStorage({ maxUploadSize: '1GB' });

storage.onComplete = ({ uri, id }) => {
console.log(`File upload complete, storage path: ${uri}`);
// send gcs link to client
return { id, link: uri };
storage.onComplete = async file => {
const info = await file.get().catch(console.error);
console.log(info);
return file.move(file.originalName).catch(console.error);
};

app.use('/files', uploadx({ storage }));
Expand Down
66 changes: 18 additions & 48 deletions examples/express.ts
Original file line number Diff line number Diff line change
@@ -1,57 +1,27 @@
import * as express from 'express';
import { DiskFile, DiskStorage, OnComplete, uploadx, UploadxResponse } from 'node-uploadx';
import { DiskFile, uploadx } from 'node-uploadx';
import { join } from 'path';

const app = express();

const auth: express.Handler = (req, res, next) => {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
(req as any)['user'] = { id: '92be348f-172d-5f69-840d-100f79e4d1ef' };
next();
const onComplete: express.RequestHandler = async (req, res, next) => {
const file = req.body as DiskFile;
await file.lock(() => res.status(423).json({ message: 'processing' }));
const sha1 = await file.hash('sha1');
await file.move(join('upload', file.originalName));
await file.lock(() => res.json({ ...file, sha1 }));
await file.delete();
return res.json({ ...file, sha1 });
};

app.use(auth);

type OnCompleteBody = {
message: string;
id: string;
};

const onComplete: OnComplete<DiskFile, UploadxResponse<OnCompleteBody>> = file => {
const message = `File upload is finished, path: ${file.name}`;
console.log(message);
return {
statusCode: 200,
message,
id: file.id,
headers: { ETag: file.id }
};
};

const storage = new DiskStorage({
directory: 'upload',
maxMetadataSize: '1mb',
onComplete,
expiration: { maxAge: '1h', purgeInterval: '10min' },
validation: {
mime: { value: ['video/*'], response: [415, { message: 'video only' }] },
size: {
value: 500_000_000,
isValid(file) {
this.response = [
412,
{ message: `The file size(${file.size}) is larger than ${this.value as number} bytes` }
];
return file.size <= this.value;
}
},
mtime: {
isValid: file => !!file.metadata.lastModified,
response: [403, { message: 'Missing `lastModified` property' }]
}
}
});

app.use('/files', uploadx({ storage }));
app.all(
'/files',
uploadx.upload({
directory: 'upload',
expiration: { maxAge: '12h', purgeInterval: '1h' }
}),
onComplete
);

app.listen(3002, () => {
console.log('listening on port:', 3002);
Expand Down
9 changes: 9 additions & 0 deletions packages/core/src/handlers/base-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
fail,
getBaseUrl,
isUploadxError,
isUploadxResponse,
isValidationError,
Logger,
pick,
Expand Down Expand Up @@ -133,6 +134,14 @@ export abstract class BaseHandler<TFile extends Readonly<File>>
if ('status' in file && file.status) {
this.log('[%s]: %s', file.status, file.name);
this.listenerCount(file.status) && this.emit(file.status, file);
if (file.lockedBy) {
const lockResponse =
file.lockedBy instanceof Function
? ((await file.lockedBy()) as UploadxResponse)
: file.lockedBy;
if (lockResponse && isUploadxResponse(lockResponse)) this.send(res, lockResponse);
return;
}
if (file.status === 'completed') {
req['_body'] = true;
req['body'] = file;
Expand Down
35 changes: 31 additions & 4 deletions packages/core/src/storages/disk-storage.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,28 @@
import * as http from 'http';
import { resolve as pathResolve } from 'path';
import { ensureFile, ERRORS, fail, fsp, getWriteStream, HttpError } from '../utils';
import {
ensureFile,
ERRORS,
fail,
fileChecksum,
fsp,
getWriteStream,
HttpError,
move
} from '../utils';
import { File, FileInit, FilePart, hasContent, isCompleted, isValidPart } from './file';
import { BaseStorage, BaseStorageOptions } from './storage';
import { METAFILE_EXTNAME, MetaStorage } from './meta-storage';
import { LocalMetaStorage, LocalMetaStorageOptions } from './local-meta-storage';

const INVALID_OFFSET = -1;

export class DiskFile extends File {}
export interface DiskFile extends File {
move: (dest: string) => Promise<any>;
copy: (dest: string) => Promise<any>;
delete: () => Promise<any>;
hash: (algorithm?: 'sha1' | 'md5', encoding?: 'hex' | 'base64') => Promise<string>;
}

export type DiskStorageOptions = BaseStorageOptions<DiskFile> & {
/**
Expand Down Expand Up @@ -51,8 +65,19 @@ export class DiskStorage extends BaseStorage<DiskFile> {
return super.normalizeError(error);
}

buildCompletedFile(file: DiskFile): DiskFile {
const completed = { ...file };
completed.lock = token => (completed.lockedBy = token);
completed.delete = () => this.delete(file.name);
completed.hash = (algorithm?: 'sha1' | 'md5', encoding?: 'hex' | 'base64') =>
fileChecksum(this.getFilePath(file.name), algorithm, encoding);
completed.copy = async (dest: string) => fsp.copyFile(this.getFilePath(file.name), dest);
completed.move = async (dest: string) => move(this.getFilePath(file.name), dest);
return completed;
}

async create(req: http.IncomingMessage, fileInit: FileInit): Promise<DiskFile> {
const file = new DiskFile(fileInit);
const file = new File(fileInit) as DiskFile;
file.name = this.namingFunction(file);
await this.validate(file);
const path = this.getFilePath(file.name);
Expand All @@ -64,14 +89,16 @@ export class DiskStorage extends BaseStorage<DiskFile> {

async write(part: FilePart): Promise<DiskFile> {
const file = await this.getMeta(part.name);
await this.checkIfExpired(file);
if (file.status === 'completed') return file;
if (file.lockedBy) return file;
await this.checkIfExpired(file);
if (!isValidPart(part, file)) return fail(ERRORS.FILE_CONFLICT);
try {
file.bytesWritten = await this._write({ ...file, ...part });
if (file.bytesWritten === INVALID_OFFSET) return fail(ERRORS.FILE_CONFLICT);
if (isCompleted(file)) {
await this.saveMeta(file);
return this.buildCompletedFile(file);
}
return file;
} catch (err) {
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/storages/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ export class File implements FileInit {
userId?;
expiredAt?: string | Date | number;
createdAt?: string | Date | number;
lock!: (token: unknown) => any;
lockedBy?: unknown;

constructor({ metadata = {}, originalName, contentType, size, userId }: FileInit) {
this.metadata = metadata;
Expand Down
38 changes: 35 additions & 3 deletions packages/core/src/utils/fs.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,44 @@
import { createWriteStream, promises as fsp, WriteStream } from 'fs';
import { createHash, HexBase64Latin1Encoding } from 'crypto';
import { createReadStream, createWriteStream, promises as fsp, WriteStream } from 'fs';
import { dirname, posix } from 'path';

function isError(error: unknown): error is NodeJS.ErrnoException {
return error instanceof Error;
}

export function fileChecksum(
filePath: string,
algorithm: 'sha1' | 'md5' = 'md5',
encoding: HexBase64Latin1Encoding = 'hex'
): Promise<string> {
return new Promise(resolve => {
const hash = createHash(algorithm);
createReadStream(filePath)
.on('data', data => hash.update(data))
.on('end', () => resolve(hash.digest(encoding)));
});
}

export async function copy(src: string, dest: string): Promise<void> {
return fsp.copyFile(src, dest);
}

export async function move(src: string, dest: string): Promise<void> {
try {
await fsp.rename(src, dest);
} catch (e) {
if (isError(e) && e.code === 'EXDEV') {
await copy(src, dest);
await fsp.unlink(src);
}
}
}
/**
* Ensures that the directory exists
* @param dir
*/
export async function ensureDir(dir: string): Promise<void> {
await fsp.mkdir(dir, { recursive: true });
export function ensureDir(dir: string): Promise<void> {
return fsp.mkdir(dir, { recursive: true });
}

/**
Expand Down
6 changes: 6 additions & 0 deletions packages/core/src/utils/http.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as http from 'http';
import { Readable } from 'stream';
import { Metadata } from '../storages';
import { isNumber } from './primitives';

export const typeis = (req: http.IncomingMessage, types: string[]): string | false => {
const contentType = req.headers['content-type'] || '';
Expand Down Expand Up @@ -73,6 +74,11 @@ export interface UploadxResponse<T = ResponseBody> extends Record<string, any> {
headers?: Headers;
body?: T;
}

export function isUploadxResponse(value: unknown): value is UploadxResponse {
return !!(typeof value === 'object' && isNumber((value as UploadxResponse).statusCode));
}

export type ResponseBody = string | Record<string, any>;
export type ResponseBodyType = 'text' | 'json';
export type ResponseTuple<T = ResponseBody> = [statusCode: number, body?: T, headers?: Headers];
Expand Down
76 changes: 67 additions & 9 deletions packages/gcs/src/gcs-storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import * as http from 'http';
import request from 'node-fetch';
import { authScopes, BUCKET_NAME, storageAPI, uploadAPI } from './constants';
import { GCSMetaStorage, GCSMetaStorageOptions } from './gcs-meta-storage';
import { resolve } from 'url';

export interface ClientError extends Error {
code: string;
Expand Down Expand Up @@ -72,9 +73,13 @@ export interface GCStorageOptions extends BaseStorageOptions<GCSFile>, GoogleAut
metaStorageConfig?: LocalMetaStorageOptions | GCSMetaStorageOptions;
}

export class GCSFile extends File {
export interface GCSFile extends File {
GCSUploadURI?: string;
uri = '';
uri: string;
move: (dest: string) => Promise<Record<string, string>>;
copy: (dest: string) => Promise<Record<string, string>>;
get: () => Promise<Record<string, string>>;
delete: () => Promise<any>;
}

/**
Expand All @@ -95,6 +100,7 @@ export class GCStorage extends BaseStorage<GCSFile> {
storageBaseURI: string;
uploadBaseURI: string;
meta: MetaStorage<GCSFile>;
private readonly bucket: string;

constructor(public config: GCStorageOptions = {}) {
super(config);
Expand All @@ -109,11 +115,11 @@ export class GCStorage extends BaseStorage<GCSFile> {
}
config.scopes ||= authScopes;
config.keyFile ||= process.env.GCS_KEYFILE;
const bucketName = config.bucket || process.env.GCS_BUCKET || BUCKET_NAME;
this.storageBaseURI = [storageAPI, bucketName, 'o'].join('/');
this.uploadBaseURI = [uploadAPI, bucketName, 'o'].join('/');
this.bucket = config.bucket || process.env.GCS_BUCKET || BUCKET_NAME;
this.storageBaseURI = [storageAPI, this.bucket, 'o'].join('/');
this.uploadBaseURI = [uploadAPI, this.bucket, 'o'].join('/');
this.authClient = new GoogleAuth(config);
this._checkBucket(bucketName);
this._checkBucket();
}

normalizeError(error: ClientError): HttpError {
Expand All @@ -131,7 +137,7 @@ export class GCStorage extends BaseStorage<GCSFile> {
}

async create(req: http.IncomingMessage, config: FileInit): Promise<GCSFile> {
const file = new GCSFile(config);
const file = new File(config) as GCSFile;
file.name = this.namingFunction(file);
await this.validate(file);
try {
Expand Down Expand Up @@ -173,6 +179,7 @@ export class GCStorage extends BaseStorage<GCSFile> {
if (isCompleted(file)) {
file.uri = `${this.storageBaseURI}/${file.name}`;
await this._onComplete(file);
return this.buildCompletedFile(file);
}
return file;
}
Expand All @@ -190,6 +197,57 @@ export class GCStorage extends BaseStorage<GCSFile> {
return [{ name } as GCSFile];
}

async copy(name: string, dest: string): Promise<Record<string, string>> {
type CopyProgress = {
rewriteToken?: string;
kind: string;
objectSize: number;
totalBytesRewritten: number;
done: boolean;
resource: Record<string, any>;
};
const newPath = resolve(`/${this.bucket}/${name}`, encodeURI(dest));
const [, bucket, ...pathSegments] = newPath.split('/');
const filename = pathSegments.join('/');
const url = `${this.storageBaseURI}/${name}/rewriteTo/b/${bucket}/o/${filename}`;
let progress = {} as CopyProgress;
const opts = {
body: '',
headers: { 'Content-Type': 'application/json' },
method: 'POST' as const,
url
};
do {
opts.body = progress.rewriteToken
? JSON.stringify({ rewriteToken: progress.rewriteToken })
: '';
progress = (await this.authClient.request<CopyProgress>(opts)).data;
} while (progress.rewriteToken);
return progress.resource;
}

async move(name: string, dest: string): Promise<Record<string, string>> {
const resource = await this.copy(name, dest);
const url = `${this.storageBaseURI}/${name}`;
await this.authClient.request({ method: 'DELETE' as const, url });
return resource;
}

async _get(name: string): Promise<Record<string, string>> {
const url = `${this.storageBaseURI}/${name}`;
return (await this.authClient.request<Record<string, string>>({ url })).data;
}

buildCompletedFile(file: GCSFile): GCSFile {
const completed = { ...file };
completed.lock = token => (completed.lockedBy = token);
completed.get = () => this._get(file.name);
completed.delete = () => this.delete(file.name);
completed.copy = async (dest: string) => this.copy(file.name, dest);
completed.move = async (dest: string) => this.move(file.name, dest);
return completed;
}

protected async _write(part: FilePart & GCSFile): Promise<number> {
const { size, uri, body } = part;
const contentRange = buildContentRange(part);
Expand Down Expand Up @@ -228,9 +286,9 @@ export class GCStorage extends BaseStorage<GCSFile> {
return this.deleteMeta(file.name);
};

private _checkBucket(bucketName: string): void {
private _checkBucket(): void {
this.authClient
.request({ url: `${storageAPI}/${bucketName}` })
.request({ url: this.storageBaseURI })
.then(() => (this.isReady = true))
.catch((err: ClientError) => {
// eslint-disable-next-line no-console
Expand Down
Loading