Skip to content

Commit

Permalink
Create task to cleanup action execution failures (elastic#96971)
Browse files Browse the repository at this point in the history
* Initial commit

* Add tests and support for concurrency

* Ability to disable functionality, use bulk APIs

* Fix type check

* Fix jest tests

* Cleanup

* Cleanup pt2

* Add unit tests

* Fix type check

* Fixes

* Update test failures

* Split schedule between cleanup and idle

* Add functional tests

* Add one more test

* Cleanup repeated code

* Remove duplicate actions plugin requirement

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
  • Loading branch information
2 people authored and madirey committed May 11, 2021
1 parent 96c4dd9 commit 3aad429
Show file tree
Hide file tree
Showing 32 changed files with 1,247 additions and 25 deletions.
6 changes: 6 additions & 0 deletions x-pack/plugins/actions/server/actions_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,12 @@ describe('create()', () => {
proxyOnlyHosts: undefined,
maxResponseContentLength: new ByteSizeValue(1000000),
responseTimeout: moment.duration('60s'),
cleanupFailedExecutionsTask: {
enabled: true,
cleanupInterval: schema.duration().validate('5m'),
idleInterval: schema.duration().validate('1h'),
pageSize: 100,
},
});

const localActionTypeRegistryParams = {
Expand Down
7 changes: 7 additions & 0 deletions x-pack/plugins/actions/server/actions_config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* 2.0.
*/

import { schema } from '@kbn/config-schema';
import { ByteSizeValue } from '@kbn/config-schema';
import { ActionsConfig } from './config';
import {
Expand All @@ -24,6 +25,12 @@ const defaultActionsConfig: ActionsConfig = {
rejectUnauthorized: true,
maxResponseContentLength: new ByteSizeValue(1000000),
responseTimeout: moment.duration(60000),
cleanupFailedExecutionsTask: {
enabled: true,
cleanupInterval: schema.duration().validate('5m'),
idleInterval: schema.duration().validate('1h'),
pageSize: 100,
},
};

describe('ensureUriAllowed', () => {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { SavedObjectsFindResult, SavedObjectsSerializer } from 'kibana/server';
import { loggingSystemMock, elasticsearchServiceMock } from '../../../../../src/core/server/mocks';
import { spacesMock } from '../../../spaces/server/mocks';
import { CleanupTasksOpts, cleanupTasks } from './cleanup_tasks';
import { TaskInstance } from '../../../task_manager/server';
import { ApiResponse, estypes } from '@elastic/elasticsearch';

// Unit tests for cleanupTasks: checks that nothing is sent to Elasticsearch when there are
// no tasks, that both the action_task_params and the task documents are bulk-deleted on the
// happy path, and that a task is left alone when its action_task_params failed to delete.
describe('cleanupTasks', () => {
const logger = loggingSystemMock.create().get();
const esClient = elasticsearchServiceMock.createElasticsearchClient();
const spaces = spacesMock.createStart();
// Minimal serializer stub: only generateRawId is provided. It builds "<type>:<id>",
// prefixed with "<namespace>:" when a truthy namespace is given.
const savedObjectsSerializer = ({
generateRawId: jest
.fn()
.mockImplementation((namespace: string | undefined, type: string, id: string) => {
const namespacePrefix = namespace ? `${namespace}:` : '';
return `${namespacePrefix}${type}:${id}`;
}),
} as unknown) as SavedObjectsSerializer;

// Baseline options with an empty task list; individual tests override `tasks`.
const cleanupTasksOpts: CleanupTasksOpts = {
logger,
esClient,
spaces,
savedObjectsSerializer,
kibanaIndex: '.kibana',
taskManagerIndex: '.kibana_task_manager',
tasks: [],
};

// Task saved object fixture whose params point at action_task_params "123" and carry
// no space ID.
const taskSO: SavedObjectsFindResult<TaskInstance> = {
id: '123',
type: 'task',
references: [],
score: 0,
attributes: {
id: '123',
taskType: 'foo',
scheduledAt: new Date(),
state: {},
runAt: new Date(),
startedAt: new Date(),
retryAt: new Date(),
ownerId: '234',
params: { spaceId: undefined, actionTaskParamsId: '123' },
schedule: { interval: '5m' },
},
};

// Reset the bulk mock so recorded calls and stubbed responses do not leak between tests.
beforeEach(() => {
esClient.bulk.mockReset();
});

// With the baseline (empty) task list no bulk request may be issued at all.
it('should skip cleanup when there are no tasks to cleanup', async () => {
const result = await cleanupTasks(cleanupTasksOpts);
expect(result).toEqual({
success: true,
successCount: 0,
failureCount: 0,
});
expect(esClient.bulk).not.toHaveBeenCalled();
});

// Happy path: every bulk call answers without item errors, so one delete goes to the
// Kibana index (action_task_params) and one to the task manager index (task).
it('should delete action_task_params and task objects', async () => {
esClient.bulk.mockResolvedValue(({
body: { items: [], errors: false, took: 1 },
} as unknown) as ApiResponse<estypes.BulkResponse, unknown>);
const result = await cleanupTasks({
...cleanupTasksOpts,
tasks: [taskSO],
});
expect(esClient.bulk).toHaveBeenCalledWith({
body: [{ delete: { _index: cleanupTasksOpts.kibanaIndex, _id: 'action_task_params:123' } }],
});
expect(esClient.bulk).toHaveBeenCalledWith({
body: [{ delete: { _index: cleanupTasksOpts.taskManagerIndex, _id: 'task:123' } }],
});
expect(result).toEqual({
success: true,
successCount: 1,
failureCount: 0,
});
});

// Failure path: the bulk response reports the action_task_params delete as failed, so the
// matching task must not be submitted for deletion and the run is reported as a failure.
it('should not delete the task if the action_task_params failed to delete', async () => {
esClient.bulk.mockResolvedValue(({
body: {
items: [
{
delete: {
_index: cleanupTasksOpts.kibanaIndex,
_id: 'action_task_params:123',
status: 500,
result: 'Failure',
error: true,
},
},
],
errors: true,
took: 1,
},
} as unknown) as ApiResponse<estypes.BulkResponse, unknown>);
const result = await cleanupTasks({
...cleanupTasksOpts,
tasks: [taskSO],
});
expect(esClient.bulk).toHaveBeenCalledWith({
body: [{ delete: { _index: cleanupTasksOpts.kibanaIndex, _id: 'action_task_params:123' } }],
});
expect(esClient.bulk).not.toHaveBeenCalledWith({
body: [{ delete: { _index: cleanupTasksOpts.taskManagerIndex, _id: 'task:123' } }],
});
expect(result).toEqual({
success: false,
successCount: 0,
failureCount: 1,
});
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import {
Logger,
ElasticsearchClient,
SavedObjectsFindResult,
SavedObjectsSerializer,
} from 'kibana/server';
import { TaskInstance } from '../../../task_manager/server';
import { SpacesPluginStart } from '../../../spaces/server';
import {
bulkDelete,
extractBulkResponseDeleteFailures,
getRawActionTaskParamsIdFromTask,
} from './lib';

/**
 * Options accepted by `cleanupTasks`.
 */
export interface CleanupTasksOpts {
  /** Failed action execution tasks to clean up. */
  tasks: Array<SavedObjectsFindResult<TaskInstance>>;

  /** Elasticsearch client used to issue the bulk delete requests. */
  esClient: ElasticsearchClient;
  /** Index the linked "action_task_params" documents are deleted from. */
  kibanaIndex: string;
  /** Index the task documents are deleted from. */
  taskManagerIndex: string;

  /** Serializer used to turn saved object type/id pairs into raw document IDs. */
  savedObjectsSerializer: SavedObjectsSerializer;
  /** Optional spaces plugin contract, forwarded when resolving raw action_task_params IDs. */
  spaces?: SpacesPluginStart;

  /** Receives debug messages listing documents that could not be deleted. */
  logger: Logger;
}

/**
 * Summary of a single `cleanupTasks` run.
 */
export interface CleanupTasksResult {
  /** Number of received tasks minus every failed deletion counted in `failureCount`. */
  successCount: number;
  /** Failed "action_task_params" deletions plus failed task deletions. */
  failureCount: number;
  /** True only when neither kind of deletion reported a failure. */
  success: boolean;
}

/**
 * Cleanup tasks
 *
 * This function receives action execution tasks that are in a failed state, removes
 * the linked "action_task_params" object first and then, only for tasks whose linked
 * object did not fail to delete, the task manager's task.
 *
 * Deletion failures are not thrown; they are logged at debug level and reflected in
 * the returned counters.
 *
 * @param opts - See {@link CleanupTasksOpts}: the tasks to clean up, the Elasticsearch
 *   client and index names to delete from, the serializer (and optional spaces contract)
 *   used to build raw document IDs, and the logger.
 * @returns Whether every deletion succeeded, plus success and failure counts.
 */
export async function cleanupTasks({
  logger,
  esClient,
  tasks,
  spaces,
  savedObjectsSerializer,
  kibanaIndex,
  taskManagerIndex,
}: CleanupTasksOpts): Promise<CleanupTasksResult> {
  // Task params may arrive as a JSON string; parse them so the linked
  // action_task_params ID can be resolved from the params object.
  const deserializedTasks = tasks.map((task) => ({
    ...task,
    attributes: {
      ...task.attributes,
      params:
        typeof task.attributes.params === 'string'
          ? JSON.parse(task.attributes.params)
          : task.attributes.params || {},
    },
  }));

  // Resolve the raw action_task_params ID once per task; it is needed both for the
  // first bulk delete and for matching failures back to their task afterwards.
  const tasksWithParamIds = deserializedTasks.map((task) => ({
    task,
    rawActionTaskParamsId: getRawActionTaskParamsIdFromTask({
      task,
      spaces,
      savedObjectsSerializer,
    }),
  }));

  // Remove accumulated action task params
  const actionTaskParamIdsToDelete = tasksWithParamIds.map(
    ({ rawActionTaskParamsId }) => rawActionTaskParamsId
  );
  const actionTaskParamBulkDeleteResult = await bulkDelete(
    esClient,
    kibanaIndex,
    actionTaskParamIdsToDelete
  );
  const failedActionTaskParams = actionTaskParamBulkDeleteResult
    ? extractBulkResponseDeleteFailures(actionTaskParamBulkDeleteResult)
    : [];
  if (failedActionTaskParams.length > 0) {
    logger.debug(
      `Failed to delete the following action_task_params [${JSON.stringify(
        failedActionTaskParams
      )}]`
    );
  }

  // Remove accumulated tasks, avoiding tasks that failed to remove linked objects
  const failedActionTaskParamIds = new Set(failedActionTaskParams.map((item) => item._id));
  const taskIdsToDelete: string[] = [];
  for (const { task, rawActionTaskParamsId } of tasksWithParamIds) {
    if (!failedActionTaskParamIds.has(rawActionTaskParamsId)) {
      taskIdsToDelete.push(savedObjectsSerializer.generateRawId(undefined, 'task', task.id));
    }
  }
  const taskBulkDeleteResult = await bulkDelete(esClient, taskManagerIndex, taskIdsToDelete);
  const failedTasks = taskBulkDeleteResult
    ? extractBulkResponseDeleteFailures(taskBulkDeleteResult)
    : [];
  if (failedTasks.length > 0) {
    logger.debug(`Failed to delete the following tasks [${JSON.stringify(failedTasks)}]`);
  }

  const failureCount = failedActionTaskParams.length + failedTasks.length;
  return {
    success: failureCount === 0,
    successCount: tasks.length - failureCount,
    failureCount,
  };
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

/** Task type of the job that cleans up failed action executions. */
export const TASK_TYPE = 'cleanup_failed_action_executions';

/** ID of that task: the task type prefixed with "Actions-". */
export const TASK_ID = 'Actions-' + TASK_TYPE;
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { schema } from '@kbn/config-schema';
import { ActionsConfig } from '../config';
import { ensureScheduled } from './ensure_scheduled';
import { taskManagerMock } from '../../../task_manager/server/mocks';
import { loggingSystemMock } from '../../../../../src/core/server/mocks';

// Unit tests for ensureScheduled: checks the task definition handed to task manager and
// that a scheduling failure is logged instead of being rethrown.
describe('ensureScheduled', () => {
const logger = loggingSystemMock.create().get();
const taskManager = taskManagerMock.createStart();

// Cleanup task configuration fixture; durations are built with the config schema so they
// have the same type as validated config values.
const config: ActionsConfig['cleanupFailedExecutionsTask'] = {
enabled: true,
cleanupInterval: schema.duration().validate('5m'),
idleInterval: schema.duration().validate('1h'),
pageSize: 100,
};

// Reset every mock so call counts and stubbed rejections do not leak between tests.
beforeEach(() => jest.resetAllMocks());

// The snapshot pins the full task definition: fixed ID and task type, the "5m" cleanup
// interval as the schedule, zeroed state counters and empty params.
it(`should call task manager's ensureScheduled function with proper params`, async () => {
await ensureScheduled(taskManager, logger, config);
expect(taskManager.ensureScheduled).toHaveBeenCalledTimes(1);
expect(taskManager.ensureScheduled.mock.calls[0]).toMatchInlineSnapshot(`
Array [
Object {
"id": "Actions-cleanup_failed_action_executions",
"params": Object {},
"schedule": Object {
"interval": "5m",
},
"state": Object {
"runs": 0,
"total_cleaned_up": 0,
},
"taskType": "cleanup_failed_action_executions",
},
]
`);
});

// A rejection from task manager must resolve normally here and only surface as an
// error log containing the rejection's message.
it('should log an error and not throw when ensureScheduled function throws', async () => {
taskManager.ensureScheduled.mockRejectedValue(new Error('Fail'));
await ensureScheduled(taskManager, logger, config);
expect(logger.error).toHaveBeenCalledWith(
'Error scheduling Actions-cleanup_failed_action_executions, received Fail'
);
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { Logger } from 'kibana/server';
import { TASK_ID, TASK_TYPE } from './constants';
import { ActionsConfig } from '../config';
import { TaskManagerStartContract, asInterval } from '../../../task_manager/server';

/**
 * Asks task manager to make sure the failed-action-executions cleanup task is scheduled.
 *
 * The task is registered under `TASK_ID` / `TASK_TYPE`, runs on the configured
 * `cleanupInterval`, and starts with zeroed `runs` and `total_cleaned_up` state.
 *
 * Scheduling is best-effort: any failure is logged as an error and never rethrown.
 *
 * @param taskManager - Task manager start contract used to schedule the task.
 * @param logger - Receives the error message when scheduling fails.
 * @param config - Cleanup task configuration; only `cleanupInterval` is read here.
 */
export async function ensureScheduled(
  taskManager: TaskManagerStartContract,
  logger: Logger,
  { cleanupInterval }: ActionsConfig['cleanupFailedExecutionsTask']
): Promise<void> {
  try {
    await taskManager.ensureScheduled({
      id: TASK_ID,
      taskType: TASK_TYPE,
      schedule: {
        interval: asInterval(cleanupInterval.asMilliseconds()),
      },
      state: {
        runs: 0,
        total_cleaned_up: 0,
      },
      params: {},
    });
  } catch (e) {
    // A rejection value is not guaranteed to be an Error (it may even be null or
    // undefined); guard the property access so this handler can never throw itself.
    const reason = e?.message ?? String(e);
    logger.error(`Error scheduling ${TASK_ID}, received ${reason}`);
  }
}
Loading

0 comments on commit 3aad429

Please sign in to comment.