Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): Reduce memory consumption on BinaryDataManager.init #6633

Merged
merged 1 commit into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export = {
return res.status(404).json({ message: 'Not Found' });
}

await BinaryDataManager.getInstance().deleteBinaryDataByExecutionId(execution.id!);
await BinaryDataManager.getInstance().deleteBinaryDataByExecutionIds([execution.id!]);

await deleteExecution(execution);

Expand Down
13 changes: 5 additions & 8 deletions packages/cli/src/databases/repositories/execution.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {

async deleteExecution(executionId: string) {
// TODO: Should this be awaited? Should we add a catch in case it fails?
await BinaryDataManager.getInstance().deleteBinaryDataByExecutionId(executionId);
await BinaryDataManager.getInstance().deleteBinaryDataByExecutionIds([executionId]);
return this.delete({ id: executionId });
}

Expand Down Expand Up @@ -392,17 +392,14 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
return;
}

const idsToDelete = executions.map(({ id }) => id);

const executionIds = executions.map(({ id }) => id);
const binaryDataManager = BinaryDataManager.getInstance();
await Promise.all(
idsToDelete.map(async (id) => binaryDataManager.deleteBinaryDataByExecutionId(id)),
);
await binaryDataManager.deleteBinaryDataByExecutionIds(executionIds);

do {
// Delete in batches to avoid "SQLITE_ERROR: Expression tree is too large (maximum depth 1000)" error
const batch = idsToDelete.splice(0, 500);
const batch = executionIds.splice(0, 500);
await this.delete(batch);
} while (idsToDelete.length > 0);
} while (executionIds.length > 0);
}
}
136 changes: 65 additions & 71 deletions packages/core/src/BinaryDataManager/FileSystem.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import glob from 'fast-glob';
import { createReadStream } from 'fs';
import fs from 'fs/promises';
import path from 'path';
Expand All @@ -12,6 +13,9 @@ import { FileNotFoundError } from '../errors';
const PREFIX_METAFILE = 'binarymeta';
const PREFIX_PERSISTED_METAFILE = 'persistedmeta';

const executionExtractionRegexp =
/^(\w+)(?:[0-9a-fA-F]{8}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{12})$/;

export class BinaryDataFileSystem implements IBinaryDataManager {
private storagePath: string;

Expand All @@ -36,16 +40,12 @@ export class BinaryDataFileSystem implements IBinaryDataManager {
}, this.persistedBinaryDataTTL * 30000);
}

return fs
.readdir(this.storagePath)
.catch(async () => fs.mkdir(this.storagePath, { recursive: true }))
.then(async () => fs.readdir(this.getBinaryDataMetaPath()))
.catch(async () => fs.mkdir(this.getBinaryDataMetaPath(), { recursive: true }))
.then(async () => fs.readdir(this.getBinaryDataPersistMetaPath()))
.catch(async () => fs.mkdir(this.getBinaryDataPersistMetaPath(), { recursive: true }))
.then(async () => this.deleteMarkedFiles())
.then(async () => this.deleteMarkedPersistedFiles())
.then(() => {});
await this.assertFolder(this.storagePath);
await this.assertFolder(this.getBinaryDataMetaPath());
await this.assertFolder(this.getBinaryDataPersistMetaPath());

await this.deleteMarkedFiles();
await this.deleteMarkedPersistedFiles();
}

async getFileSize(identifier: string): Promise<number> {
Expand Down Expand Up @@ -122,46 +122,37 @@ export class BinaryDataFileSystem implements IBinaryDataManager {
`${PREFIX_PERSISTED_METAFILE}_${executionId}_${timeoutTime}`,
);

return fs
.readFile(filePath)
.catch(async () => fs.writeFile(filePath, identifier))
.then(() => {});
try {
await fs.access(filePath);
} catch {
await fs.writeFile(filePath, identifier);
}
}

private async deleteMarkedFilesByMeta(metaPath: string, filePrefix: string): Promise<void> {
const currentTimeValue = new Date().valueOf();
const metaFileNames = await fs.readdir(metaPath);

const execsAdded: { [key: string]: number } = {};

const promises = metaFileNames.reduce<Array<Promise<void>>>((prev, curr) => {
const [prefix, executionId, ts] = curr.split('_');

if (prefix !== filePrefix) {
return prev;
}

const execTimestamp = parseInt(ts, 10);

if (execTimestamp < currentTimeValue) {
if (execsAdded[executionId]) {
// do not delete data, only meta file
prev.push(this.deleteMetaFileByPath(path.join(metaPath, curr)));
return prev;
}

execsAdded[executionId] = 1;
prev.push(
this.deleteBinaryDataByExecutionId(executionId).then(async () =>
this.deleteMetaFileByPath(path.join(metaPath, curr)),
),
);
}

return prev;
}, []);

await Promise.all(promises);
const metaFileNames = await glob(`${filePrefix}_*`, { cwd: metaPath });

const executionIds = metaFileNames
.map((f) => f.split('_') as [string, string, string])
.filter(([prefix, , ts]) => {
if (prefix !== filePrefix) return false;
const execTimestamp = parseInt(ts, 10);
return execTimestamp < currentTimeValue;
})
.map((e) => e[1]);

const filesToDelete = [];
const deletedIds = await this.deleteBinaryDataByExecutionIds(executionIds);
for (const executionId of deletedIds) {
filesToDelete.push(
...(await glob(`${filePrefix}_${executionId}_`, {
absolute: true,
cwd: metaPath,
})),
);
}
await Promise.all(filesToDelete.map(async (file) => fs.rm(file)));
}

async duplicateBinaryDataByIdentifier(binaryDataId: string, prefix: string): Promise<string> {
Expand All @@ -174,37 +165,44 @@ export class BinaryDataFileSystem implements IBinaryDataManager {
return newBinaryDataId;
}

async deleteBinaryDataByExecutionId(executionId: string): Promise<void> {
const regex = new RegExp(`${executionId}_*`);
const filenames = await fs.readdir(this.storagePath);

const promises = filenames.reduce<Array<Promise<void>>>((allProms, filename) => {
if (regex.test(filename)) {
allProms.push(fs.rm(this.resolveStoragePath(filename)));
async deleteBinaryDataByExecutionIds(executionIds: string[]): Promise<string[]> {
const set = new Set(executionIds);
const fileNames = await fs.readdir(this.storagePath);
const deletedIds = [];
for (const fileName of fileNames) {
const executionId = fileName.match(executionExtractionRegexp)?.[1];
Copy link
Contributor

@krynble krynble Jul 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that the regexp also matches the .metadata that is the extension as part of this? If so, it'd cause the deletion below to fail as it'd try to delete something.metadata.metadata

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the regexp has a $ after the uuid matching, so it should not be matching the metadata file names.

if (executionId && set.has(executionId)) {
const filePath = this.resolveStoragePath(fileName);
await Promise.all([fs.rm(filePath), fs.rm(`${filePath}.metadata`)]);
deletedIds.push(executionId);
}
return allProms;
}, []);

await Promise.all(promises);
}
return deletedIds;
}

async deleteBinaryDataByIdentifier(identifier: string): Promise<void> {
return this.deleteFromLocalStorage(identifier);
}

async persistBinaryDataForExecutionId(executionId: string): Promise<void> {
return fs.readdir(this.getBinaryDataPersistMetaPath()).then(async (metaFiles) => {
const promises = metaFiles.reduce<Array<Promise<void>>>((prev, curr) => {
if (curr.startsWith(`${PREFIX_PERSISTED_METAFILE}_${executionId}_`)) {
prev.push(fs.rm(path.join(this.getBinaryDataPersistMetaPath(), curr)));
return prev;
}

const metaFiles = await fs.readdir(this.getBinaryDataPersistMetaPath());
const promises = metaFiles.reduce<Array<Promise<void>>>((prev, curr) => {
if (curr.startsWith(`${PREFIX_PERSISTED_METAFILE}_${executionId}_`)) {
prev.push(fs.rm(path.join(this.getBinaryDataPersistMetaPath(), curr)));
return prev;
}, []);
}

await Promise.all(promises);
});
return prev;
}, []);
await Promise.all(promises);
}

private async assertFolder(folder: string): Promise<void> {
try {
await fs.access(folder);
} catch {
await fs.mkdir(folder, { recursive: true });
}
}

private generateFileName(prefix: string): string {
Expand All @@ -219,10 +217,6 @@ export class BinaryDataFileSystem implements IBinaryDataManager {
return path.join(this.storagePath, 'persistMeta');
}

private async deleteMetaFileByPath(metaFilePath: string): Promise<void> {
return fs.rm(metaFilePath);
}

private async deleteFromLocalStorage(identifier: string) {
return fs.rm(this.getBinaryPath(identifier));
}
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/BinaryDataManager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,9 @@ export class BinaryDataManager {
}
}

async deleteBinaryDataByExecutionId(executionId: string): Promise<void> {
async deleteBinaryDataByExecutionIds(executionIds: string[]): Promise<void> {
if (this.managers[this.binaryDataMode]) {
await this.managers[this.binaryDataMode].deleteBinaryDataByExecutionId(executionId);
await this.managers[this.binaryDataMode].deleteBinaryDataByExecutionIds(executionIds);
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/Interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ export interface IBinaryDataManager {
deleteMarkedFiles(): Promise<unknown>;
deleteBinaryDataByIdentifier(identifier: string): Promise<void>;
duplicateBinaryDataByIdentifier(binaryDataId: string, prefix: string): Promise<string>;
deleteBinaryDataByExecutionId(executionId: string): Promise<void>;
deleteBinaryDataByExecutionIds(executionIds: string[]): Promise<string[]>;
persistBinaryDataForExecutionId(executionId: string): Promise<void>;
}

Expand Down