Skip to content

Commit

Permalink
WIP [ResponseOps]: Refactor alerting task runner - combine loadRuleAt…
Browse files Browse the repository at this point in the history
…tributesAndRun() and validateAndExecuteRule()

resolves elastic#131544

Extract the `loadRuleAttributesAndRun()` and `validateAndExecuteRule()`
methods from the alerting task manager, into a separate module.

meta issue: elastic#124206
  • Loading branch information
pmuellr committed May 18, 2022
1 parent afe71c7 commit 06b2769
Showing 1 changed file with 260 additions and 0 deletions.
260 changes: 260 additions & 0 deletions x-pack/plugins/alerting/server/task_runner/load-validate-rule.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import apm from 'elastic-apm-node';
import uuid from 'uuid';

import { KibanaRequest, Logger } from '@kbn/core/server';
import { addSpaceIdToPath } from '@kbn/spaces-plugin/server';
import type { Request } from '@hapi/hapi';

import { asOk } from '../lib/result_type';
import { RuleTaskInstance } from './types';
import { ErrorWithReason, validateRuleTypeParams } from '../lib';
import {
Rule,
RuleExecutionStatusErrorReasons,
RawRule,
SanitizedRule,
RuleTypeParams,
RuleTypeRegistry,
} from '../types';
import {
MONITORING_HISTORY_LIMIT,
RuleTypeState,
AlertInstanceState,
AlertInstanceContext,
} from '../../common';
import { NormalizedRuleType } from '../rule_type_registry';
import { TaskRunnerContext } from './task_runner_factory';
import { createExecutionHandler } from './create_execution_handler';
import { AlertingEventLogger } from '../lib/alerting_event_logger/alerting_event_logger';

type Params = RuleTypeParams;
type ExtractedParams = RuleTypeParams;
type State = RuleTypeState;
type InstanceState = AlertInstanceState;
type InstanceContext = AlertInstanceContext;
type ActionGroupIds = string;
type RecoveryActionGroupId = string;

type RuleType = NormalizedRuleType<
Params,
ExtractedParams,
State,
InstanceState,
InstanceContext,
ActionGroupIds,
RecoveryActionGroupId
>;

interface LoadAndValidateRuleOptions {
context: TaskRunnerContext;
taskInstance: RuleTaskInstance;
alertingEventLogger: AlertingEventLogger;
ruleTypeRegistry: RuleTypeRegistry;
ruleType: RuleType;
}

export async function loadRuleAttributesAndRun(options: LoadAndValidateRuleOptions) {
const { context, taskInstance, alertingEventLogger, ruleTypeRegistry, ruleType } = options;
const {
params: { alertId: ruleId, spaceId },
} = taskInstance;
let enabled: boolean;
let apiKey: string | null;
let consumer: string;
const executionId = uuid.v4();

try {
const decryptedAttributes = await getDecryptedAttributes(context, ruleId, spaceId);
apiKey = decryptedAttributes.apiKey;
enabled = decryptedAttributes.enabled;
consumer = decryptedAttributes.consumer;
} catch (err) {
throw new ErrorWithReason(RuleExecutionStatusErrorReasons.Decrypt, err);
}

// hmmm what is this? used to be this.ruleConsumer = consumer
// options.ruleConsumer = consumer;

if (!enabled) {
throw new ErrorWithReason(
RuleExecutionStatusErrorReasons.Disabled,
new Error(`Rule failed to execute because rule ran after it was disabled.`)
);
}

const fakeRequest = getFakeKibanaRequest(context, spaceId, apiKey);

// Get rules client with space level permissions
const rulesClient = options.context.getRulesClientWithRequest(fakeRequest);

let rule: SanitizedRule<Params>;

// Ensure API key is still valid and user has access
try {
rule = await rulesClient.get({ id: ruleId });

if (apm.currentTransaction) {
apm.currentTransaction.name = `Execute Alerting Rule: "${rule.name}"`;
apm.currentTransaction.addLabels({
alerting_rule_consumer: rule.consumer,
alerting_rule_name: rule.name,
alerting_rule_tags: rule.tags.join(', '),
alerting_rule_type_id: rule.alertTypeId,
alerting_rule_params: JSON.stringify(rule.params),
});
}
} catch (err) {
throw new ErrorWithReason(RuleExecutionStatusErrorReasons.Read, err);
}

alertingEventLogger.setRuleName(rule.name);

try {
ruleTypeRegistry.ensureRuleTypeEnabled(rule.alertTypeId);
} catch (err) {
throw new ErrorWithReason(RuleExecutionStatusErrorReasons.License, err);
}

if (rule.monitoring) {
if (rule.monitoring.execution.history.length >= MONITORING_HISTORY_LIMIT) {
// Remove the first (oldest) record
rule.monitoring.execution.history.shift();
}
}

const validatedParams = validateRuleTypeParams(rule.params, ruleType.validate?.params);

const executionHandler = getExecutionHandler(
context,
ruleId,
rule.name,
rule.tags,
spaceId,
apiKey,
context.kibanaBaseUrl,
rule.actions,
rule.params,
fakeRequest,
consumer,
context.logger,
executionId,
ruleType,
alertingEventLogger
);

// fetch the rule again to ensure we return the correct schedule as it may have
// changed during the task execution
const schedule = (await rulesClient.get({ id: ruleId })).schedule;

return {
validatedParams,
executionHandler,
monitoring: asOk(rule.monitoring),
schedule: asOk(schedule),
};
}

async function getDecryptedAttributes(
context: TaskRunnerContext,
ruleId: string,
spaceId: string
): Promise<{ apiKey: string | null; enabled: boolean; consumer: string }> {
const namespace = context.spaceIdToNamespace(spaceId);
// Only fetch encrypted attributes here, we'll create a saved objects client
// scoped with the API key to fetch the remaining data.
const {
attributes: { apiKey, enabled, consumer },
} = await context.encryptedSavedObjectsClient.getDecryptedAsInternalUser<RawRule>(
'alert',
ruleId,
{ namespace }
);

return { apiKey, enabled, consumer };
}

function getFakeKibanaRequest(
context: TaskRunnerContext,
spaceId: string,
apiKey: RawRule['apiKey']
) {
const requestHeaders: Record<string, string> = {};

if (apiKey) {
requestHeaders.authorization = `ApiKey ${apiKey}`;
}

const path = addSpaceIdToPath('/', spaceId);

const fakeRequest = KibanaRequest.from({
headers: requestHeaders,
path: '/',
route: { settings: {} },
url: {
href: '/',
},
raw: {
req: {
url: '/',
},
},
} as unknown as Request);

context.basePathService.set(fakeRequest, path);

return fakeRequest;
}

function getExecutionHandler(
context: TaskRunnerContext,
ruleId: string,
ruleName: string,
tags: string[] | undefined,
spaceId: string,
apiKey: RawRule['apiKey'],
kibanaBaseUrl: string | undefined,
actions: Rule<Params>['actions'],
ruleParams: Params,
request: KibanaRequest,
ruleConsumer: string,
logger: Logger,
executionId: string,
ruleType: RuleType,
alertingEventLogger: AlertingEventLogger
) {
return createExecutionHandler<
Params,
ExtractedParams,
State,
InstanceState,
InstanceContext,
ActionGroupIds,
RecoveryActionGroupId
>({
ruleId,
ruleName,
ruleConsumer,
tags,
executionId,
logger,
actionsPlugin: context.actionsPlugin,
apiKey,
actions,
spaceId,
ruleType,
kibanaBaseUrl,
alertingEventLogger,
request,
ruleParams,
supportsEphemeralTasks: context.supportsEphemeralTasks,
maxEphemeralActionsPerRule: context.maxEphemeralActionsPerRule,
actionsConfigMap: context.actionsConfigMap,
});
}

0 comments on commit 06b2769

Please sign in to comment.