Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): fix "too many open files" #2498

Merged
merged 6 commits into from
Sep 25, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions packages/core/src/input/InputFileResolver.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import * as path from 'path';
import { promises as fs } from 'fs';
import { StringDecoder } from 'string_decoder';
import * as path from 'path';
import fs = require('fs');

import { from } from 'rxjs';
import { filter, mergeMap, toArray } from 'rxjs/operators';
import { File, StrykerOptions } from '@stryker-mutator/api/core';
import { Logger } from '@stryker-mutator/api/logging';
import { commonTokens, tokens } from '@stryker-mutator/api/plugin';
Expand All @@ -10,7 +12,7 @@ import { childProcessAsPromised, isErrnoException, normalizeWhitespaces, Stryker

import { coreTokens } from '../di';
import StrictReporter from '../reporters/StrictReporter';
import { glob } from '../utils/fileUtils';
import { glob, MAX_CONCURRENT_FILE_IO } from '../utils/fileUtils';
import { defaultOptions } from '../config/OptionsValidator';

import InputFileCollection from './InputFileCollection';
Expand Down Expand Up @@ -157,13 +159,20 @@ export default class InputFileResolver {
}

private async readFiles(fileNames: string[]): Promise<File[]> {
const files = await Promise.all(fileNames.map((fileName) => this.readFile(fileName)));
return files.filter(notEmpty);
const files = from(fileNames)
.pipe(
mergeMap((fileName) => this.readFile(fileName), MAX_CONCURRENT_FILE_IO),
filter(notEmpty),
toArray()
)
.toPromise();
return files;
}

private async readFile(fileName: string): Promise<File | null> {
try {
const content = await fs.readFile(fileName);
const p = fs.promises.readFile(fileName);
nicojs marked this conversation as resolved.
Show resolved Hide resolved
const content = await p;
const file = new File(fileName, content);
this.reportSourceFilesRead(file);
return file;
Expand Down
14 changes: 11 additions & 3 deletions packages/core/src/sandbox/sandbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@ import * as mkdirp from 'mkdirp';
import { Logger, LoggerFactoryMethod } from '@stryker-mutator/api/logging';
import { tokens, commonTokens } from '@stryker-mutator/api/plugin';

import { from } from 'rxjs';

import { mergeMap, toArray } from 'rxjs/operators';

import { TemporaryDirectory } from '../utils/TemporaryDirectory';
import { findNodeModules, symlinkJunction, writeFile } from '../utils/fileUtils';
import { findNodeModules, MAX_CONCURRENT_FILE_IO, symlinkJunction, writeFile } from '../utils/fileUtils';
import { coreTokens } from '../di';

interface SandboxFactory {
Expand Down Expand Up @@ -75,8 +79,12 @@ export class Sandbox {
}

private fillSandbox(): Promise<void[]> {
const copyPromises = this.files.map((file) => this.fillFile(file));
return Promise.all(copyPromises);
return from(this.files)
.pipe(
mergeMap((file) => this.fillFile(file), MAX_CONCURRENT_FILE_IO),
toArray()
)
.toPromise();
}

private async runBuildCommand() {
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/utils/fileUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import * as nodeGlob from 'glob';
import * as mkdirp from 'mkdirp';
import * as rimraf from 'rimraf';

export const MAX_CONCURRENT_FILE_IO = 256;

export function glob(expression: string): Promise<string[]> {
return new Promise<string[]>((resolve, reject) => {
nodeGlob(expression, { nodir: true }, (error, matches) => {
Expand Down
57 changes: 43 additions & 14 deletions packages/core/test/unit/input/InputFileResolver.spec.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import * as os from 'os';
import * as path from 'path';
import * as fs from 'fs';
import fs = require('fs');

import { File } from '@stryker-mutator/api/core';
import { SourceFile } from '@stryker-mutator/api/report';
import { testInjector, factory, assertions } from '@stryker-mutator/test-helpers';
import { testInjector, factory, assertions, tick } from '@stryker-mutator/test-helpers';
import { createIsDirError, fileNotFoundError } from '@stryker-mutator/test-helpers/src/factory';
import { childProcessAsPromised, errorToString } from '@stryker-mutator/util';
import { childProcessAsPromised, errorToString, Task } from '@stryker-mutator/util';
import { expect } from 'chai';
import * as sinon from 'sinon';

Expand All @@ -29,8 +29,8 @@ describe(InputFileResolver.name, () => {
beforeEach(() => {
reporterMock = mock(BroadcastReporter);
globStub = sinon.stub(fileUtils, 'glob');
readFileStub = sinon
.stub(fs.promises, 'readFile')
readFileStub = sinon.stub(fs.promises, 'readFile');
readFileStub
.withArgs(sinon.match.string)
.resolves(Buffer.from('')) // fallback
.withArgs(sinon.match('file1'))
Expand All @@ -43,6 +43,7 @@ describe(InputFileResolver.name, () => {
.resolves(Buffer.from('mutate 1 content'))
.withArgs(sinon.match('mute2'))
.resolves(Buffer.from('mutate 2 content'));

globStub.withArgs('mute*').resolves(['/mute1.js', '/mute2.js']);
globStub.withArgs('mute1').resolves(['/mute1.js']);
globStub.withArgs('mute2').resolves(['/mute2.js']);
Expand Down Expand Up @@ -152,6 +153,43 @@ describe(InputFileResolver.name, () => {
]);
});

it('should reject when a globbing expression results in a reject', () => {
testInjector.options.files = ['fileError', 'fileError'];
testInjector.options.mutate = ['file1'];
sut = createSut();
const expectedError = new Error('ERROR: something went wrong');
globStub.withArgs('fileError').rejects(expectedError);
return expect(sut.resolve()).rejectedWith(expectedError);
});

it('should not open too many file handles', async () => {
// Arrange
const maxFileIO = 256;
sut = createSut();
const fileHandles: Array<{ fileName: string; task: Task<Buffer> }> = [];
for (let i = 0; i < maxFileIO + 1; i++) {
const fileName = `file_${i}.js`;
const readFileTask = new Task<Buffer>();
fileHandles.push({ fileName, task: readFileTask });
readFileStub.withArgs(sinon.match(fileName)).returns(readFileTask.promise);
}
childProcessExecStub.resolves({
stdout: Buffer.from(fileHandles.map(({ fileName }) => fileName).join(os.EOL)),
});

// Act
const onGoingWork = sut.resolve();
await tick();
expect(readFileStub).callCount(maxFileIO);
fileHandles[0].task.resolve(Buffer.from('content'));
await tick();

// Assert
expect(readFileStub).callCount(maxFileIO + 1);
fileHandles.forEach(({ task }) => task.resolve(Buffer.from('content')));
await onGoingWork;
});

describe('with mutate file expressions', () => {
it('should result in the expected mutate files', async () => {
testInjector.options.mutate = ['mute*'];
Expand Down Expand Up @@ -256,15 +294,6 @@ describe(InputFileResolver.name, () => {
});
});

it('should reject when a globbing expression results in a reject', () => {
testInjector.options.files = ['fileError', 'fileError'];
testInjector.options.mutate = ['file1'];
sut = createSut();
const expectedError = new Error('ERROR: something went wrong');
globStub.withArgs('fileError').rejects(expectedError);
return expect(sut.resolve()).rejectedWith(expectedError);
});

describe('when excluding files with "!"', () => {
it('should exclude the files that were previously included', async () => {
testInjector.options.files = ['file2', 'file1', '!file2'];
Expand Down
30 changes: 27 additions & 3 deletions packages/core/test/unit/sandbox/sandbox.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import { expect } from 'chai';
import sinon = require('sinon');
import * as mkdirp from 'mkdirp';

import { testInjector } from '@stryker-mutator/test-helpers';
import { testInjector, tick } from '@stryker-mutator/test-helpers';
import { File } from '@stryker-mutator/api/core';
import { fileAlreadyExistsError } from '@stryker-mutator/test-helpers/src/factory';
import { normalizeWhitespaces } from '@stryker-mutator/util';
import { normalizeWhitespaces, Task } from '@stryker-mutator/util';

import { Sandbox } from '../../../src/sandbox/sandbox';
import { coreTokens } from '../../../src/di';
Expand Down Expand Up @@ -80,7 +80,31 @@ describe(Sandbox.name, () => {
it('should be able to copy a local file', async () => {
files.push(new File('localFile.txt', 'foobar'));
await createSut();
expect(fileUtils.writeFile).calledWith(path.join(SANDBOX_WORKING_DIR, 'localFile.txt'), Buffer.from('foobar'));
expect(writeFileStub).calledWith(path.join(SANDBOX_WORKING_DIR, 'localFile.txt'), Buffer.from('foobar'));
});

it('should not open too many file handles', async () => {
const maxFileIO = 256;
const fileHandles: Array<{ fileName: string; task: Task }> = [];
for (let i = 0; i < maxFileIO + 1; i++) {
const fileName = `file_${i}.js`;
const task = new Task();
fileHandles.push({ fileName, task });
writeFileStub.withArgs(sinon.match(fileName)).returns(task.promise);
files.push(new File(fileName, ''));
}

// Act
const onGoingWork = createSut();
await tick();
expect(writeFileStub).callCount(maxFileIO);
fileHandles[0].task.resolve();
await tick();

// Assert
expect(writeFileStub).callCount(maxFileIO + 1);
fileHandles.forEach(({ task }) => task.resolve());
await onGoingWork;
});

it('should symlink node modules in sandbox directory if exists', async () => {
Expand Down