Skip to content

Commit

Permalink
Handle content stream errors in report pre-deletion (elastic#173792)
Browse files Browse the repository at this point in the history
Re-addresses elastic#171363

The bug was still evident, especially when using network throttling to
add slight lag to the request turnaround times.

This PR adds more handling of errors that could be thrown slightly prior
to deleting the report document, when we try to clear all chunks of the
report using the content stream.

<details>
<summary>Before</summary>



https://github.com/elastic/kibana/assets/908371/c27fe314-0f93-42b4-8076-99a1e30b8d2f


</details>

<details>
<summary>After</summary>


https://github.com/elastic/kibana/assets/908371/4c1f5edd-73f1-4ca4-a40a-f900ca5f9c78


</details>

### Checklist
- [x] Unit tests
  • Loading branch information
tsullivan committed Jan 2, 2024
1 parent e62b7ac commit dc813c3
Show file tree
Hide file tree
Showing 3 changed files with 295 additions and 221 deletions.
Expand Up @@ -81,17 +81,43 @@ export const commonJobsRouteHandlerFactory = (reporting: ReportingCore) => {
return jobManagementPreRouting(reporting, res, docId, user, counters, async (doc) => {
const docIndex = doc.index;
const stream = await getContentStream(reporting, { id: docId, index: docIndex });
/** @note Overwriting existing content with an empty buffer to remove all the chunks. */
await new Promise<void>((resolve) => {
stream.end('', 'utf8', () => {
resolve();
});
const reportingSetup = reporting.getPluginSetupDeps();
const logger = reportingSetup.logger.get('delete-report');

// An "error" event is emitted if an error is
// passed to the `stream.end` callback from
// the _final method of the ContentStream.
// This event must be handled.
stream.on('error', (err) => {
logger.error(err);
});
await jobsQuery.delete(docIndex, docId);

return res.ok({
body: { deleted: true },
});
try {
// Overwriting existing content with an
// empty buffer to remove all the chunks.
await new Promise<void>((resolve, reject) => {
stream.end('', 'utf8', (error?: Error) => {
if (error) {
// handle error that could be thrown
// from the _write method of the ContentStream
reject(error);
} else {
resolve();
}
});
});

await jobsQuery.delete(docIndex, docId);

return res.ok({
body: { deleted: true },
});
} catch (error) {
logger.error(error);
return res.customError({
statusCode: 500,
});
}
});
};

Expand Down
Expand Up @@ -36,7 +36,7 @@ import { registerJobInfoRoutesInternal as registerJobInfoRoutes } from '../jobs'

type SetupServerReturn = Awaited<ReturnType<typeof setupServer>>;

describe(`GET ${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}`, () => {
describe(`Reporting Job Management Routes: Internal`, () => {
const reportingSymbol = Symbol('reporting');
let server: SetupServerReturn['server'];
let usageCounter: IUsageCounter;
Expand Down Expand Up @@ -144,148 +144,148 @@ describe(`GET ${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}`, () => {
await server.stop();
});

it('fails on malformed download IDs', async () => {
mockEsClient.search.mockResponseOnce(getHits());
registerJobInfoRoutes(core);
describe('download report', () => {
it('fails on malformed download IDs', async () => {
mockEsClient.search.mockResponseOnce(getHits());
registerJobInfoRoutes(core);

await server.start();
await server.start();

await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/1`)
.expect(400)
.then(({ body }) =>
expect(body.message).toMatchInlineSnapshot(
'"[request params.docId]: value has length [1] but it must have a minimum length of [3]."'
)
);
});
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/1`)
.expect(400)
.then(({ body }) =>
expect(body.message).toMatchInlineSnapshot(
'"[request params.docId]: value has length [1] but it must have a minimum length of [3]."'
)
);
});

it('fails on unauthenticated users', async () => {
mockStartDeps = await createMockPluginStart(
{
licensing: {
...licensingMock.createStart(),
license$: new BehaviorSubject({ isActive: true, isAvailable: true, type: 'gold' }),
it('fails on unauthenticated users', async () => {
mockStartDeps = await createMockPluginStart(
{
licensing: {
...licensingMock.createStart(),
license$: new BehaviorSubject({ isActive: true, isAvailable: true, type: 'gold' }),
},
security: { authc: { getCurrentUser: () => undefined } },
},
security: { authc: { getCurrentUser: () => undefined } },
},
mockConfigSchema
);
core = await createMockReportingCore(mockConfigSchema, mockSetupDeps, mockStartDeps);
registerJobInfoRoutes(core);
mockConfigSchema
);
core = await createMockReportingCore(mockConfigSchema, mockSetupDeps, mockStartDeps);
registerJobInfoRoutes(core);

await server.start();
await server.start();

await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dope`)
.expect(401)
.then(({ body }) =>
expect(body.message).toMatchInlineSnapshot(`"Sorry, you aren't authenticated"`)
);
});
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dope`)
.expect(401)
.then(({ body }) =>
expect(body.message).toMatchInlineSnapshot(`"Sorry, you aren't authenticated"`)
);
});

it('returns 404 if job not found', async () => {
mockEsClient.search.mockResponseOnce(getHits());
registerJobInfoRoutes(core);
it('returns 404 if job not found', async () => {
mockEsClient.search.mockResponseOnce(getHits());
registerJobInfoRoutes(core);

await server.start();
await server.start();

await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/poo`)
.expect(404);
});
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/poo`)
.expect(404);
});

it('returns a 403 if not a valid job type', async () => {
mockEsClient.search.mockResponseOnce(
getHits({
jobtype: 'invalidJobType',
payload: { title: 'invalid!' },
})
);
registerJobInfoRoutes(core);
it('returns a 403 if not a valid job type', async () => {
mockEsClient.search.mockResponseOnce(
getHits({
jobtype: 'invalidJobType',
payload: { title: 'invalid!' },
})
);
registerJobInfoRoutes(core);

await server.start();
await server.start();

await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/poo`)
.expect(403);
});
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/poo`)
.expect(403);
});

it(`returns job's info`, async () => {
mockEsClient.search.mockResponseOnce(
getHits({
jobtype: mockJobTypeBase64Encoded,
payload: {}, // payload is irrelevant
})
);
it(`returns job's info`, async () => {
mockEsClient.search.mockResponseOnce(
getHits({
jobtype: mockJobTypeBase64Encoded,
payload: {}, // payload is irrelevant
})
);

registerJobInfoRoutes(core);
registerJobInfoRoutes(core);

await server.start();
await server.start();

await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.INFO_PREFIX}/test`)
.expect(200);
});
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.INFO_PREFIX}/test`)
.expect(200);
});

it(`returns 403 if a user cannot view a job's info`, async () => {
mockEsClient.search.mockResponseOnce(
getHits({
jobtype: 'customForbiddenJobType',
payload: {}, // payload is irrelevant
})
);
it(`returns 403 if a user cannot view a job's info`, async () => {
mockEsClient.search.mockResponseOnce(
getHits({
jobtype: 'customForbiddenJobType',
payload: {}, // payload is irrelevant
})
);

registerJobInfoRoutes(core);
registerJobInfoRoutes(core);

await server.start();
await server.start();

await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.INFO_PREFIX}/test`)
.expect(403);
});
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.INFO_PREFIX}/test`)
.expect(403);
});

it('when a job is incomplete', async () => {
mockEsClient.search.mockResponseOnce(
getHits({
jobtype: mockJobTypeUnencoded,
status: 'pending',
payload: { title: 'incomplete!' },
})
);
registerJobInfoRoutes(core);

await server.start();
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dank`)
.expect(503)
.expect('Content-Type', 'text/plain; charset=utf-8')
.expect('Retry-After', '30')
.then(({ text }) => expect(text).toEqual('pending'));
});
it('when a job is incomplete', async () => {
mockEsClient.search.mockResponseOnce(
getHits({
jobtype: mockJobTypeUnencoded,
status: 'pending',
payload: { title: 'incomplete!' },
})
);
registerJobInfoRoutes(core);

it('when a job fails', async () => {
mockEsClient.search.mockResponse(
getHits({
jobtype: mockJobTypeUnencoded,
status: 'failed',
output: { content: 'job failure message' },
payload: { title: 'failing job!' },
})
);
registerJobInfoRoutes(core);

await server.start();
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dank`)
.expect(500)
.expect('Content-Type', 'application/json; charset=utf-8')
.then(({ body }) =>
expect(body.message).toEqual('Reporting generation failed: job failure message')
await server.start();
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dank`)
.expect(503)
.expect('Content-Type', 'text/plain; charset=utf-8')
.expect('Retry-After', '30')
.then(({ text }) => expect(text).toEqual('pending'));
});

it('when a job fails', async () => {
mockEsClient.search.mockResponse(
getHits({
jobtype: mockJobTypeUnencoded,
status: 'failed',
output: { content: 'job failure message' },
payload: { title: 'failing job!' },
})
);
});
registerJobInfoRoutes(core);

await server.start();
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dank`)
.expect(500)
.expect('Content-Type', 'application/json; charset=utf-8')
.then(({ body }) =>
expect(body.message).toEqual('Reporting generation failed: job failure message')
);
});

describe('successful downloads', () => {
it('when a known job-type is complete', async () => {
mockEsClient.search.mockResponseOnce(getCompleteHits());
registerJobInfoRoutes(core);
Expand Down Expand Up @@ -483,4 +483,28 @@ describe(`GET ${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}`, () => {
});
});
});

describe('delete report', () => {
it('handles content stream errors', async () => {
stream = new Readable({
read() {
this.push('test');
this.push(null);
},
}) as typeof stream;
stream.end = jest.fn().mockImplementation((_name, _encoding, callback) => {
callback(new Error('An error occurred in ending the content stream'));
});

(getContentStream as jest.MockedFunction<typeof getContentStream>).mockResolvedValue(stream);
mockEsClient.search.mockResponseOnce(getCompleteHits());
registerJobInfoRoutes(core);

await server.start();
await supertest(httpSetup.server.listener)
.delete(`${INTERNAL_ROUTES.JOBS.DELETE_PREFIX}/dank`)
.expect(500)
.expect('Content-Type', 'application/json; charset=utf-8');
});
});
});

0 comments on commit dc813c3

Please sign in to comment.