Skip to content

Commit

Permalink
core: fix File already exists error on duplicate downloads
Browse files Browse the repository at this point in the history
  • Loading branch information
thecodrr committed May 7, 2024
1 parent 9d5bec3 commit 79f20d1
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 39 deletions.
6 changes: 5 additions & 1 deletion packages/core/src/api/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class Database {
isInitialized = false;
eventManager = new EventManager();
sseMutex = new Mutex();
_fs?: FileStorage;

storage: StorageAccessor = () => {
if (!this.options?.storage)
Expand All @@ -109,7 +110,10 @@ class Database {
throw new Error(
"Database not initialized. Did you forget to call db.setup()?"
);
return new FileStorage(this.options.fs, this.tokenManager);
return (
this._fs ||
(this._fs = new FileStorage(this.options.fs, this.tokenManager))
);
};

crypto: CryptoAccessor = () => {
Expand Down
145 changes: 107 additions & 38 deletions packages/core/src/database/fs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,22 @@ import { logger } from "../logger";
export type FileStorageAccessor = () => FileStorage;
export type DownloadableFile = {
filename: string;
// metadata: AttachmentMetadata;
chunkSize: number;
};
export type QueueItem = DownloadableFile & {
cancel?: (reason?: string) => Promise<void>;
operation?: Promise<boolean>;
};

export class FileStorage {
downloads = new Map<string, QueueItem[]>();
uploads = new Map<string, QueueItem[]>();
id = Date.now();
downloads = new Map<string, QueueItem>();
uploads = new Map<string, QueueItem>();
groups = {
downloads: new Map<string, Set<string>>(),
uploads: new Map<string, Set<string>>()
};

constructor(
private readonly fs: IFileStorage,
private readonly tokenManager: TokenManager
Expand All @@ -50,12 +56,26 @@ export class FileStorage {
groupId: string,
eventData?: Record<string, unknown>
) {
let current = 0;
const token = await this.tokenManager.getAccessToken();
const total = files.length;
let current = 0;
this.downloads.set(groupId, files);
const group = this.groups.downloads.get(groupId) || new Set();
files.forEach((f) => group.add(f.filename));
this.groups.downloads.set(groupId, group);

for (const file of files as QueueItem[]) {
if (!group.has(file.filename)) continue;

const download = this.downloads.get(file.filename);
if (download && download.operation) {
logger.debug("[queueDownloads] duplicate download", {
filename: file.filename,
groupId
});
await download.operation;
continue;
}

const { filename, chunkSize } = file;
if (await this.exists(filename)) {
current++;
Expand All @@ -68,22 +88,29 @@ export class FileStorage {
continue;
}

EV.publish(EVENTS.fileDownload, {
total,
current,
groupId,
filename
});

const url = `${hosts.API_HOST}/s3?name=${filename}`;
const { execute, cancel } = this.fs.downloadFile(filename, {
url,
chunkSize,
headers: { Authorization: `Bearer ${token}` }
});
file.cancel = cancel;
file.operation = execute()
.catch(() => false)
.finally(() => {
this.downloads.delete(filename);
group.delete(filename);
});

EV.publish(EVENTS.fileDownload, {
total,
current,
groupId,
filename
});

const result = await execute().catch(() => false);
this.downloads.set(filename, file);
const result = await file.operation;
if (eventData)
EV.publish(EVENTS.fileDownloaded, {
success: result,
Expand All @@ -94,24 +121,48 @@ export class FileStorage {
eventData
});
}
this.downloads.delete(groupId);
}

async queueUploads(files: DownloadableFile[], groupId: string) {
let current = 0;
const token = await this.tokenManager.getAccessToken();
const total = files.length;
let current = 0;
this.uploads.set(groupId, files);
const group = this.groups.uploads.get(groupId) || new Set();
files.forEach((f) => group.add(f.filename));
this.groups.uploads.set(groupId, group);

for (const file of files as QueueItem[]) {
if (!group.has(file.filename)) continue;

const upload = this.uploads.get(file.filename);
if (upload && upload.operation) {
logger.debug("[queueUploads] duplicate upload", {
filename: file.filename,
groupId
});
await file.operation;
continue;
}

const { filename, chunkSize } = file;
let error = null;
const url = `${hosts.API_HOST}/s3?name=${filename}`;
const { execute, cancel } = this.fs.uploadFile(filename, {
chunkSize,
url,
headers: { Authorization: `Bearer ${token}` }
});
file.cancel = cancel;
file.operation = execute()
.catch((e) => {
logger.error(e, "failed to upload attachment", { hash: filename });
error = e;
return false;
})
.finally(() => {
this.uploads.delete(filename);
group.delete(filename);
});

EV.publish(EVENTS.fileUpload, {
total,
Expand All @@ -120,13 +171,8 @@ export class FileStorage {
filename
});

let error = null;
const result = await execute().catch((e) => {
logger.error(e, "failed to upload attachment", { hash: filename });
error = e;
return false;
});

this.uploads.set(filename, file);
const result = await file.operation;
EV.publish(EVENTS.fileUploaded, {
error,
success: result,
Expand All @@ -136,44 +182,67 @@ export class FileStorage {
filename
});
}
this.uploads.delete(groupId);
}

async downloadFile(groupId: string, filename: string, chunkSize: number) {
if (await this.exists(filename)) return true;

const download = this.downloads.get(filename);
if (download && download.operation) {
logger.debug("[downloadFile] duplicate download", { filename, groupId });
return await download.operation;
}

logger.debug("[downloadFile] downloading", { filename, groupId });

const url = `${hosts.API_HOST}/s3?name=${filename}`;
const file: QueueItem = { filename, chunkSize };
const token = await this.tokenManager.getAccessToken();
const group = this.groups.downloads.get(groupId) || new Set();
const { execute, cancel } = this.fs.downloadFile(filename, {
url,
chunkSize,
headers: { Authorization: `Bearer ${token}` }
});
this.downloads.set(groupId, [{ cancel, filename, chunkSize }]);
const result = await execute();
this.downloads.delete(groupId);
return result;
file.cancel = cancel;
file.operation = execute().finally(() => {
this.downloads.delete(filename);
group.delete(filename);
});

this.downloads.set(filename, file);
this.groups.downloads.set(groupId, group.add(filename));
return await file.operation;
}

async cancel(groupId: string) {
const queues = [
{ type: "download", files: this.downloads.get(groupId) },
{ type: "upload", files: this.uploads.get(groupId) }
].filter((a) => !!a.files);
{
type: "download",
ids: this.groups.downloads.get(groupId),
files: this.downloads
},
{
type: "upload",
ids: this.groups.uploads.get(groupId),
files: this.uploads
}
].filter((a) => !!a.ids);

for (const queue of queues) {
if (!queue.files) continue;
for (let i = 0; i < queue.files.length; ++i) {
const file = queue.files[i];
if (file.cancel) await file.cancel("Operation canceled.");
queue.files.splice(i, 1);
if (!queue.ids) continue;

for (const filename of queue.ids) {
const file = queue.files.get(filename);
if (file?.cancel) await file.cancel("Operation canceled.");
queue.ids.delete(filename);
}

if (queue.type === "download") {
this.downloads.delete(groupId);
this.groups.downloads.delete(groupId);
EV.publish(EVENTS.downloadCanceled, { groupId, canceled: true });
} else if (queue.type === "upload") {
this.uploads.delete(groupId);
this.groups.uploads.delete(groupId);
EV.publish(EVENTS.uploadCanceled, { groupId, canceled: true });
}
}
Expand Down

0 comments on commit 79f20d1

Please sign in to comment.