From 6ee4fd8da8abeb493b475a843bf35c5307063428 Mon Sep 17 00:00:00 2001 From: Idriss Neumann Date: Thu, 16 Nov 2023 18:17:44 +0100 Subject: [PATCH] Issue #12: implement DataSourceWithLogsContextSupport interface --- src/IntervalMap.ts | 18 +++++++ src/datasource.ts | 119 +++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 134 insertions(+), 3 deletions(-) create mode 100644 src/IntervalMap.ts diff --git a/src/IntervalMap.ts b/src/IntervalMap.ts new file mode 100644 index 0000000..08e1164 --- /dev/null +++ b/src/IntervalMap.ts @@ -0,0 +1,18 @@ +import { DurationUnit } from '@grafana/data'; +import { Interval } from './types'; + +type IntervalMap = Record< + Interval, + { + startOf: DurationUnit; + amount: DurationUnit; + } +>; + +export const intervalMap: IntervalMap = { + Hourly: { startOf: 'hour', amount: 'hours' }, + Daily: { startOf: 'day', amount: 'days' }, + Weekly: { startOf: 'isoWeek', amount: 'weeks' }, + Monthly: { startOf: 'month', amount: 'months' }, + Yearly: { startOf: 'year', amount: 'years' }, +}; diff --git a/src/datasource.ts b/src/datasource.ts index 59bf6fd..c33eb3c 100644 --- a/src/datasource.ts +++ b/src/datasource.ts @@ -1,35 +1,44 @@ import { cloneDeep, first as _first, map as _map, groupBy } from 'lodash'; import { Observable, lastValueFrom, from, isObservable, of } from 'rxjs'; import { catchError, mergeMap, map } from 'rxjs/operators'; +import { intervalMap } from './IntervalMap'; +import { Interval } from './types'; import { AbstractQuery, + CoreApp, DataFrame, + DataQueryError, DataQueryRequest, DataQueryResponse, DataSourceApi, DataSourceInstanceSettings, DataSourceJsonData, + DataSourceWithLogsContextSupport, DataSourceWithQueryImportSupport, DataSourceWithSupplementaryQueriesSupport, + dateTime, FieldColorModeId, FieldType, getDefaultTimeRange, LoadingState, LogLevel, + LogRowModel, LogsVolumeCustomMetaData, LogsVolumeType, MetricFindValue, QueryFixAction, + rangeUtil, ScopedVars, SupplementaryQueryType, TimeRange, } from '@grafana/data'; -import { BucketAggregation, DataLinkConfig, ElasticsearchQuery, Field, FieldMapping, IndexMetadata, TermsQuery } from './types'; +import { BucketAggregation, DataLinkConfig, ElasticsearchQuery, Field, FieldMapping, IndexMetadata, Logs, TermsQuery } from './types'; import { + config, DataSourceWithBackend, getTemplateSrv, TemplateSrv, } from '@grafana/runtime'; -import { QuickwitOptions } from 'quickwit'; +import { LogRowContextOptions, LogRowContextQueryDirection, QuickwitOptions } from 'quickwit'; import { ElasticQueryBuilder } from 'QueryBuilder'; import { colors } from '@grafana/ui'; @@ -39,7 +48,7 @@ import { isMetricAggregationWithField } from 'components/QueryEditor/MetricAggre import { bucketAggregationConfig } from 'components/QueryEditor/BucketAggregationsEditor/utils'; import { isBucketAggregationWithField } from 'components/QueryEditor/BucketAggregationsEditor/aggregations'; import ElasticsearchLanguageProvider from 'LanguageProvider'; - +import { ReactNode } from 'react'; export const REF_ID_STARTER_LOG_VOLUME = 'log-volume-'; @@ -48,6 +57,7 @@ export type ElasticDatasource = QuickwitDataSource; export class QuickwitDataSource extends DataSourceWithBackend implements + DataSourceWithLogsContextSupport, DataSourceWithSupplementaryQueriesSupport, DataSourceWithQueryImportSupport { @@ -59,6 +69,7 @@ export class QuickwitDataSource queryBuilder: ElasticQueryBuilder; dataLinks: DataLinkConfig[]; languageProvider: ElasticsearchLanguageProvider; + intervalPattern?: Interval; constructor( instanceSettings: DataSourceInstanceSettings, @@ -427,6 +438,75 @@ export class QuickwitDataSource return text; } + private makeLogContextDataRequest = (row: LogRowModel, options?: LogRowContextOptions) => { + const direction = options?.direction || LogRowContextQueryDirection.Backward; + const logQuery: Logs = { + type: 'logs', + id: '1', + settings: { + limit: options?.limit ? options?.limit.toString() : '10', + // Sorting of results in the context query + sortDirection: direction === LogRowContextQueryDirection.Backward ? 'desc' : 'asc', + // Used to get the next log lines before/after the current log line using sort field of selected log line + searchAfter: row.dataFrame.fields.find((f) => f.name === 'sort')?.values[row.rowIndex] ?? [row.timeEpochMs], + }, + }; + + const query: ElasticsearchQuery = { + refId: `log-context-${row.dataFrame.refId}-${direction}`, + metrics: [logQuery], + query: '', + }; + + const timeRange = createContextTimeRange(row.timeEpochMs, direction, this.intervalPattern); + const range = { + from: timeRange.from, + to: timeRange.to, + raw: timeRange, + }; + + const interval = rangeUtil.calculateInterval(range, 1); + + const contextRequest: DataQueryRequest = { + requestId: `log-context-request-${row.dataFrame.refId}-${options?.direction}`, + targets: [query], + interval: interval.interval, + intervalMs: interval.intervalMs, + range, + scopedVars: {}, + timezone: 'UTC', + app: CoreApp.Explore, + startTime: Date.now(), + hideFromInspector: true, + }; + return contextRequest; + }; + + getLogRowContext = async (row: LogRowModel, options?: LogRowContextOptions): Promise<{ data: DataFrame[] }> => { + const contextRequest = this.makeLogContextDataRequest(row, options); + + return lastValueFrom( + this.query(contextRequest).pipe( + catchError((err) => { + const error: DataQueryError = { + message: 'Error during context query. Please check JS console logs.', + status: err.status, + statusText: err.statusText, + }; + throw error; + }) + ) + ); + }; + + showContextToggle(row?: LogRowModel | undefined): boolean { + throw new Error('Method not implemented.'); + } + + getLogRowContextUi?(row: LogRowModel, runContextQuery?: (() => void) | undefined): ReactNode { + throw new Error('Method not implemented.'); + } + /** * Returns false if the query should be skipped */ @@ -740,3 +820,36 @@ function luceneEscape(value: string) { return value.replace(/([\!\*\+\-\=<>\s\&\|\(\)\[\]\{\}\^\~\?\:\\/"])/g, '\\$1'); } + +function createContextTimeRange(rowTimeEpochMs: number, direction: string, intervalPattern: Interval | undefined) { + const offset = 7; + // For log context, we want to request data from 7 subsequent/previous indices + if (intervalPattern) { + const intervalInfo = intervalMap[intervalPattern]; + if (direction === LogRowContextQueryDirection.Forward) { + return { + from: dateTime(rowTimeEpochMs).utc(), + to: dateTime(rowTimeEpochMs).add(offset, intervalInfo.amount).utc().startOf(intervalInfo.startOf), + }; + } else { + return { + from: dateTime(rowTimeEpochMs).subtract(offset, intervalInfo.amount).utc().startOf(intervalInfo.startOf), + to: dateTime(rowTimeEpochMs).utc(), + }; + } + // If we don't have an interval pattern, we can't do this, so we just request data from 7h before/after + } else { + if (direction === LogRowContextQueryDirection.Forward) { + return { + from: dateTime(rowTimeEpochMs).utc(), + to: dateTime(rowTimeEpochMs).add(offset, 'hours').utc(), + }; + } else { + return { + from: dateTime(rowTimeEpochMs).subtract(offset, 'hours').utc(), + to: dateTime(rowTimeEpochMs).utc(), + }; + } + } +} +