Skip to content

Commit

Permalink
Add Ability to filter datafeed for ml jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
phillipb committed Jan 29, 2021
1 parent 6b74bd5 commit 2799b5b
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 15 deletions.
9 changes: 4 additions & 5 deletions x-pack/plugins/infra/public/containers/ml/infra_ml_module.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*/

import { useCallback, useMemo } from 'react';
import { DatasetFilter } from '../../../common/infra_ml';
import { useKibanaContextForPlugin } from '../../hooks/use_kibana';
import { useTrackedPromise } from '../../utils/use_tracked_promise';
import { useModuleStatus } from './infra_ml_module_status';
Expand Down Expand Up @@ -51,15 +50,15 @@ export const useInfraMLModule = <JobType extends string>({
selectedIndices: string[],
start: number | undefined,
end: number | undefined,
datasetFilter: DatasetFilter,
filter: string,
partitionField?: string
) => {
dispatchModuleStatus({ type: 'startedSetup' });
const setupResult = await moduleDescriptor.setUpModule(
{
start,
end,
datasetFilter,
filter,
moduleSourceConfiguration: {
indices: selectedIndices,
sourceId,
Expand Down Expand Up @@ -113,13 +112,13 @@ export const useInfraMLModule = <JobType extends string>({
selectedIndices: string[],
start: number | undefined,
end: number | undefined,
datasetFilter: DatasetFilter,
filter: string,
partitionField?: string
) => {
dispatchModuleStatus({ type: 'startedSetup' });
cleanUpModule()
.then(() => {
setUpModule(selectedIndices, start, end, datasetFilter, partitionField);
setUpModule(selectedIndices, start, end, filter, partitionField);
})
.catch(() => {
dispatchModuleStatus({ type: 'failedSetup' });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import {
ValidateLogEntryDatasetsResponsePayload,
ValidationIndicesResponsePayload,
} from '../../../common/http_api/log_analysis';
import { DatasetFilter } from '../../../common/infra_ml';
import { DeleteJobsResponsePayload } from './api/ml_cleanup';
import { FetchJobStatusResponsePayload } from './api/ml_get_jobs_summary_api';
import { GetMlModuleResponsePayload } from './api/ml_get_module';
Expand All @@ -19,7 +18,7 @@ export { JobModelSizeStats, JobSummary } from './api/ml_get_jobs_summary_api';
export interface SetUpModuleArgs {
start?: number | undefined;
end?: number | undefined;
datasetFilter?: DatasetFilter;
filter?: any;
moduleSourceConfiguration: ModuleSourceConfiguration;
partitionField?: string;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ const setUpModule = async (setUpModuleArgs: SetUpModuleArgs, fetch: HttpHandler)
const {
start,
end,
filter,
moduleSourceConfiguration: { spaceId, sourceId, indices, timestampField },
partitionField,
} = setUpModuleArgs;
Expand Down Expand Up @@ -106,10 +107,18 @@ const setUpModule = async (setUpModuleArgs: SetUpModuleArgs, fetch: HttpHandler)

const datafeedOverrides = jobIds.map((id) => {
const { datafeed: defaultDatafeedConfig } = getDefaultJobConfigs(id);
const config = { ...defaultDatafeedConfig };

if (filter) {
config.query = JSON.parse(filter);
}

if (!partitionField || id === 'hosts_memory_usage') {
// Since the host memory usage doesn't have custom aggs, we don't need to do anything to add a partition field
return defaultDatafeedConfig;
return {
...config,
job_id: id,
};
}

// If we have a partition field, we need to change the aggregation to do a terms agg at the top level
Expand All @@ -125,7 +134,7 @@ const setUpModule = async (setUpModuleArgs: SetUpModuleArgs, fetch: HttpHandler)
};

return {
...defaultDatafeedConfig,
...config,
job_id: id,
aggregations,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ const setUpModule = async (setUpModuleArgs: SetUpModuleArgs, fetch: HttpHandler)
const {
start,
end,
filter,
moduleSourceConfiguration: { spaceId, sourceId, indices, timestampField },
partitionField,
} = setUpModuleArgs;
Expand Down Expand Up @@ -106,10 +107,18 @@ const setUpModule = async (setUpModuleArgs: SetUpModuleArgs, fetch: HttpHandler)

const datafeedOverrides = jobIds.map((id) => {
const { datafeed: defaultDatafeedConfig } = getDefaultJobConfigs(id);
const config = { ...defaultDatafeedConfig };

if (filter) {
config.query = JSON.parse(filter);
}

if (!partitionField || id === 'k8s_memory_usage') {
// Since the host memory usage doesn't have custom aggs, we don't need to do anything to add a partition field
return defaultDatafeedConfig;
return {
...config,
job_id: id,
};
}

// Because the ML K8s jobs ship with a default partition field of {kubernetes.namespace}, ignore that agg and wrap it in our own agg.
Expand All @@ -130,7 +139,7 @@ const setUpModule = async (setUpModuleArgs: SetUpModuleArgs, fetch: HttpHandler)
};

return {
...defaultDatafeedConfig,
...config,
job_id: id,
aggregations,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { debounce } from 'lodash';
import React, { useState, useCallback, useMemo, useEffect } from 'react';
import { EuiForm, EuiDescribedFormGroup, EuiFormRow } from '@elastic/eui';
import { EuiText, EuiSpacer } from '@elastic/eui';
Expand All @@ -21,6 +21,8 @@ import { useMetricK8sModuleContext } from '../../../../../../containers/ml/modul
import { useMetricHostsModuleContext } from '../../../../../../containers/ml/modules/metrics_hosts/module';
import { FixedDatePicker } from '../../../../../../components/fixed_datepicker';
import { DEFAULT_K8S_PARTITION_FIELD } from '../../../../../../containers/ml/modules/metrics_k8s/module_descriptor';
import { MetricsExplorerKueryBar } from '../../../../metrics_explorer/components/kuery_bar';
import { convertKueryToElasticSearchQuery } from '../../../../../../utils/kuery';

interface Props {
jobType: 'hosts' | 'kubernetes';
Expand All @@ -35,6 +37,8 @@ export const JobSetupScreen = (props: Props) => {
const [partitionField, setPartitionField] = useState<string[] | null>(null);
const h = useMetricHostsModuleContext();
const k = useMetricK8sModuleContext();
const [filter, setFilter] = useState<string>('');
const [filterQuery, setFilterQuery] = useState<string>('');
const { createDerivedIndexPattern } = useSourceViaHttp({
sourceId: 'default',
type: 'metrics',
Expand Down Expand Up @@ -88,19 +92,38 @@ export const JobSetupScreen = (props: Props) => {
indicies,
moment(startDate).toDate().getTime(),
undefined,
{ type: 'includeAll' },
filterQuery,
partitionField ? partitionField[0] : undefined
);
} else {
setUpModule(
indicies,
moment(startDate).toDate().getTime(),
undefined,
{ type: 'includeAll' },
filterQuery,
partitionField ? partitionField[0] : undefined
);
}
}, [cleanUpAndSetUpModule, setUpModule, hasSummaries, indicies, partitionField, startDate]);
}, [
cleanUpAndSetUpModule,
filterQuery,
setUpModule,
hasSummaries,
indicies,
partitionField,
startDate,
]);

const onFilterChange = useCallback(
(f: string) => {
setFilter(f || '');
setFilterQuery(convertKueryToElasticSearchQuery(f, derivedIndexPattern) || '');
},
[derivedIndexPattern]
);

/* eslint-disable-next-line react-hooks/exhaustive-deps */
const debouncedOnFilterChange = useCallback(debounce(onFilterChange, 500), [onFilterChange]);

const onPartitionFieldChange = useCallback((value: Array<{ label: string }>) => {
setPartitionField(value.map((v) => v.label));
Expand Down Expand Up @@ -249,6 +272,40 @@ export const JobSetupScreen = (props: Props) => {
/>
</EuiFormRow>
</EuiDescribedFormGroup>

<EuiDescribedFormGroup
title={
<h3>
<FormattedMessage
id="xpack.infra.ml.steps.setupProcess.filter.title"
defaultMessage="Filter data"
/>
</h3>
}
description={
<FormattedMessage
id="xpack.infra.ml.steps.setupProcess.filter.description"
defaultMessage="Filter description"
/>
}
>
<EuiFormRow
display="rowCompressed"
label={
<FormattedMessage
id="xpack.infra.ml.steps.setupProcess.filter.label"
defaultMessage="Filter (optional)"
/>
}
>
<MetricsExplorerKueryBar
derivedIndexPattern={derivedIndexPattern}
onSubmit={onFilterChange}
onChange={debouncedOnFilterChange}
value={filter}
/>
</EuiFormRow>
</EuiDescribedFormGroup>
</EuiForm>
</>
)}
Expand Down

0 comments on commit 2799b5b

Please sign in to comment.