Skip to content
This repository was archived by the owner on May 13, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/@types/parseable/api/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ export type Log = {
[key: string]: string | number | null | Date;
};

export type LogsResponseWithHeaders = {
fields: string[];
records: Log[];
} | null;

export type LogSelectedTimeRange = {
state: 'fixed' | 'custom';
value: string;
Expand Down
2 changes: 1 addition & 1 deletion src/api/cluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export const getIngestorInfo = (domain_name: string | null, startTime: Date, end
const query = `SELECT * FROM pmeta where address = '${domain_name}' ORDER BY event_time DESC LIMIT 10 OFFSET 0`;

return Axios().post<IngestorQueryRecord[]>(
LOG_QUERY_URL,
LOG_QUERY_URL(),
{
query,
startTime,
Expand Down
19 changes: 18 additions & 1 deletion src/api/constants.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,26 @@
import _ from 'lodash';

const API_V1 = 'api/v1';

export type Params = Record<string, string> | null | {} | undefined;

const parseParamsToQueryString = (params: Params) => {
if (_.isEmpty(params) || _.isNil(params) || !params) return '';

return _.reduce(
params,
(acc, value, key) => {
const slugPartPrefix = acc === '?' ? '' : '&';
return acc + slugPartPrefix + key + '=' + value;
},
'?',
);
};

// Streams Management
export const LOG_STREAM_LIST_URL = `${API_V1}/logstream`;
export const LOG_STREAMS_SCHEMA_URL = (streamName: string) => `${LOG_STREAM_LIST_URL}/${streamName}/schema`;
export const LOG_QUERY_URL = `${API_V1}/query`;
export const LOG_QUERY_URL = (params?: Params) => `${API_V1}/query` + parseParamsToQueryString(params);
export const LOG_STREAMS_ALERTS_URL = (streamName: string) => `${LOG_STREAM_LIST_URL}/${streamName}/alert`;
export const LIST_SAVED_FILTERS_URL = (userId: string) => `${API_V1}/filters/${userId}`;
export const UPDATE_SAVED_FILTERS_URL = (filterId: string) => `${API_V1}/filters/filter/${filterId}`;
Expand Down
47 changes: 28 additions & 19 deletions src/api/query.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import dayjs from 'dayjs';
import { Axios } from './axios';
import { LOG_QUERY_URL } from './constants';
import { LogsQuery } from '@/@types/parseable/api/query';
import { Log, LogsQuery, LogsResponseWithHeaders } from '@/@types/parseable/api/query';

type QueryLogs = {
streamName: string;
Expand All @@ -11,38 +11,47 @@ type QueryLogs = {
pageOffset: number;
};

// to optimize performace, it has been decided to round off the time at the given level
// to optimize query performace, it has been decided to round off the time at the given level
// so making the end-time inclusive
const optimizeEndTime = (endTime: Date) => {
return dayjs(endTime).add(1, 'minute').toDate();
};

export const getQueryLogs = (logsQuery: QueryLogs) => {
const { startTime, endTime, streamName, limit, pageOffset } = logsQuery;
// ------ Default sql query

const makeDefaultQueryRequestData = (logsQuery: QueryLogs) => {
const { startTime, endTime, streamName, limit, pageOffset } = logsQuery;
const query = `SELECT * FROM ${streamName} LIMIT ${limit} OFFSET ${pageOffset}`;
return { query, startTime, endTime: optimizeEndTime(endTime) };
};

export const getQueryLogs = (logsQuery: QueryLogs) => {
return Axios().post<Log[]>(LOG_QUERY_URL(), makeDefaultQueryRequestData(logsQuery), {});
};

return Axios().post(
LOG_QUERY_URL,
{
query,
startTime,
endTime: optimizeEndTime(endTime),
},
export const getQueryLogsWithHeaders = (logsQuery: QueryLogs) => {
return Axios().post<LogsResponseWithHeaders>(
LOG_QUERY_URL({ fields: true }),
makeDefaultQueryRequestData(logsQuery),
{},
);
};

export const getQueryResult = (logsQuery: LogsQuery, query = '') => {
// ------ Custom sql query

const makeCustomQueryRequestData = (logsQuery: LogsQuery, query: string) => {
const { startTime, endTime } = logsQuery;
return { query, startTime, endTime: optimizeEndTime(endTime) };
};

export const getQueryResult = (logsQuery: LogsQuery, query = '') => {
return Axios().post<Log[]>(LOG_QUERY_URL(), makeCustomQueryRequestData(logsQuery, query), {});
};

return Axios().post(
LOG_QUERY_URL,
{
query,
startTime,
endTime: optimizeEndTime(endTime),
},
export const getQueryResultWithHeaders = (logsQuery: LogsQuery, query = '') => {
return Axios().post<LogsResponseWithHeaders>(
LOG_QUERY_URL({ fields: true }),
makeCustomQueryRequestData(logsQuery, query),
{},
);
};
4 changes: 0 additions & 4 deletions src/hooks/useGetLogStreamSchema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,17 @@ import { StatusCodes } from 'http-status-codes';
import useMountedState from './useMountedState';
import { Field } from '@/@types/parseable/dataType';
import { useAppStore } from '@/layouts/MainLayout/providers/AppProvider';
import { useLogsStore, logsStoreReducers } from '@/pages/Stream/providers/LogsProvider';
import { useStreamStore, streamStoreReducers } from '@/pages/Stream/providers/StreamProvider';
import { AxiosError } from 'axios';
import _ from 'lodash';

const { setStreamSchema } = streamStoreReducers;
const { setTableHeaders } = logsStoreReducers;

export const useGetLogStreamSchema = () => {
const [data, setData] = useMountedState<LogStreamSchemaData | null>(null);
const [error, setError] = useMountedState<string | null>(null);
const [loading, setLoading] = useMountedState<boolean>(false);
const [currentStream] = useAppStore((store) => store.currentStream);
const [, setLogsStore] = useLogsStore((_store) => null);
const [, setStreamStore] = useStreamStore((_store) => null);

const getDataSchema = async (stream: string | null = currentStream) => {
Expand All @@ -34,7 +31,6 @@ export const useGetLogStreamSchema = () => {

setData(schema);
setStreamStore((store) => setStreamSchema(store, schema));
setLogsStore((store) => setTableHeaders(store, schema));
break;
}
default: {
Expand Down
45 changes: 25 additions & 20 deletions src/hooks/useQueryLogs.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import { SortOrder, type Log, type LogsData, type LogsSearch } from '@/@types/parseable/api/query';
import { getQueryLogs, getQueryResult } from '@/api/query';
import { getQueryLogsWithHeaders, getQueryResultWithHeaders } from '@/api/query';
import { StatusCodes } from 'http-status-codes';
import useMountedState from './useMountedState';
import { useCallback, useEffect, useRef } from 'react';
import { useLogsStore, logsStoreReducers, LOAD_LIMIT, isJqSearch } from '@/pages/Stream/providers/LogsProvider';
import { useAppStore } from '@/layouts/MainLayout/providers/AppProvider';
import { useQueryResult } from './useQueryResult';
import _ from 'lodash';
import { useStreamStore } from '@/pages/Stream/providers/StreamProvider';
import { AxiosError } from 'axios';
import jqSearch from '@/utils/jqSearch';
import { useGetLogStreamSchema } from './useGetLogStreamSchema';

const { setData, setTotalCount } = logsStoreReducers;
const { setLogData, setTotalCount } = logsStoreReducers;

type QueryLogs = {
streamName: string;
Expand All @@ -34,6 +34,7 @@ export const useQueryLogs = () => {
const [loading, setLoading] = useMountedState<boolean>(false);
const [isFetchingCount, setIsFetchingCount] = useMountedState<boolean>(false);
const [pageLogData, setPageLogData] = useMountedState<LogsData | null>(null);
const { getDataSchema } = useGetLogStreamSchema();
const [querySearch, setQuerySearch] = useMountedState<LogsSearch>({
search: '',
filters: {},
Expand All @@ -43,7 +44,6 @@ export const useQueryLogs = () => {
},
});
const [currentStream] = useAppStore((store) => store.currentStream);
const [schema] = useStreamStore((store) => store.schema);
const [
{
timeRange,
Expand Down Expand Up @@ -86,36 +86,41 @@ export const useQueryLogs = () => {
try {
setLoading(true);
setError(null);

getDataSchema(); // fetch schema parallelly every time we fetch logs
const logsQueryRes = isQuerySearchActive
? await getQueryResult({ ...logsQuery, access: [] }, appendOffsetToQuery(custSearchQuery, logsQuery.pageOffset))
: await getQueryLogs(logsQuery);
? await getQueryResultWithHeaders(
{ ...logsQuery, access: [] },
appendOffsetToQuery(custSearchQuery, logsQuery.pageOffset),
)
: await getQueryLogsWithHeaders(logsQuery);

const data = logsQueryRes.data;
const logs = logsQueryRes.data;
const isInvalidResponse = _.isEmpty(logs) || _.isNil(logs) || logsQueryRes.status !== StatusCodes.OK;
if (isInvalidResponse) return setError('Failed to query log');

if (logsQueryRes.status === StatusCodes.OK) {
const jqFilteredData = isJqSearch(instantSearchValue) ? await jqSearch(data, instantSearchValue) : [];
return setLogsStore((store) => setData(store, data, schema, jqFilteredData));
}
if (typeof data === 'string' && data.includes('Stream is not initialized yet')) {
return setLogsStore((store) => setData(store, [], schema));
}
setError('Failed to query log');
const { records, fields } = logs;
const jqFilteredData = isJqSearch(instantSearchValue) ? await jqSearch(records, instantSearchValue) : [];
return setLogsStore((store) => setLogData(store, records, fields, jqFilteredData));
} catch (e) {
const axiosError = e as AxiosError;
const errorMessage = axiosError?.response?.data;
setError(_.isString(errorMessage) && !_.isEmpty(errorMessage) ? errorMessage : 'Failed to query log');
return setLogsStore((store) => setData(store, [], schema));
return setLogsStore((store) => setLogData(store, [], []));
} finally {
setLoading(false);
}
};

// fetchQueryMutation is used only on fetching count
// refactor this hook if you want to use mutation anywhere else
const { fetchQueryMutation } = useQueryResult();

useEffect(() => {
const response = _.first(fetchQueryMutation?.data) as { count: number };
if (response) {
setLogsStore((store) => setTotalCount(store, response?.count));
const { fields = [], records = [] } = fetchQueryMutation.data || {};
const firstRecord = _.first(records);
if (_.includes(fields, 'count') && _.includes(_.keys(firstRecord), 'count')) {
const count = _.get(firstRecord, 'count', 0);
setLogsStore((store) => setTotalCount(store, _.toInteger(count)));
}
}, [fetchQueryMutation.data]);

Expand Down
4 changes: 2 additions & 2 deletions src/hooks/useQueryResult.tsx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { getQueryResult } from '@/api/query';
import { getQueryResultWithHeaders } from '@/api/query';
import { LogsQuery } from '@/@types/parseable/api/query';
import { notifications } from '@mantine/notifications';
import { isAxiosError, AxiosError } from 'axios';
Expand All @@ -13,7 +13,7 @@ type QueryData = {

export const useQueryResult = () => {
const fetchQueryHandler = async (data: QueryData) => {
const response = await getQueryResult(data.logsQuery, data.query);
const response = await getQueryResultWithHeaders(data.logsQuery, data.query);
if (response.status !== 200) {
throw new Error(response.statusText);
}
Expand Down
37 changes: 23 additions & 14 deletions src/pages/Stream/components/EventTimeLineGraph.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import { HumanizeNumber } from '@/utils/formatBytes';
import { logsStoreReducers, useLogsStore } from '../providers/LogsProvider';
import { useAppStore } from '@/layouts/MainLayout/providers/AppProvider';
import { useFilterStore, filterStoreReducers } from '../providers/FilterProvider';
import { LogsResponseWithHeaders } from '@/@types/parseable/api/query';
import _ from 'lodash';
const { setTimeRange } = logsStoreReducers;

const { parseQuery } = filterStoreReducers;
Expand Down Expand Up @@ -156,29 +158,36 @@ const NoDataView = () => {
);
};

type GraphRecord = {
date_bin_timestamp: string;
log_count: number;
};
const calcAverage = (data: LogsResponseWithHeaders | undefined) => {
if (!data || !Array.isArray(data?.records)) return 0;

const calcAverage = (data: GraphRecord[]) => {
if (!Array.isArray(data) || data.length === 0) return 0;
const { fields, records } = data;
if (_.isEmpty(records) || !_.includes(fields, 'log_count')) return 0;

const total = data.reduce((acc, d) => {
return acc + d.log_count;
const total = records.reduce((acc, d) => {
return acc + _.toNumber(d.log_count) || 0;
}, 0);
return parseInt(Math.abs(total / data.length).toFixed(0));
return parseInt(Math.abs(total / records.length).toFixed(0));
};

// date_bin removes tz info
// filling data with empty values where there is no rec
const parseGraphData = (data: GraphRecord[] = [], avg: number, startTime: Date, endTime: Date, interval: number) => {
if (!Array.isArray(data)) return [];
const { modifiedEndTime, modifiedStartTime, compactType } = getModifiedTimeRange(startTime, endTime, interval);
const parseGraphData = (
data: LogsResponseWithHeaders | undefined,
avg: number,
startTime: Date,
endTime: Date,
interval: number,
) => {
if (!data || !Array.isArray(data?.records)) return [];

const { fields, records } = data;
if (_.isEmpty(records) || !_.includes(fields, 'log_count') || !_.includes(fields, 'date_bin_timestamp')) return [];

const { modifiedEndTime, modifiedStartTime, compactType } = getModifiedTimeRange(startTime, endTime, interval);
const allTimestamps = getAllIntervals(modifiedStartTime, modifiedEndTime, compactType);
const parsedData = allTimestamps.map((ts) => {
const countData = data.find((d) => {
const countData = records.find((d) => {
return new Date(`${d.date_bin_timestamp}Z`).toISOString() === ts.toISOString();
});

Expand All @@ -190,7 +199,7 @@ const parseGraphData = (data: GraphRecord[] = [], avg: number, startTime: Date,
compactType,
};
} else {
const aboveAvgCount = countData.log_count - avg;
const aboveAvgCount = _.toNumber(countData.log_count) - avg;
const aboveAvgPercent = parseInt(((aboveAvgCount / avg) * 100).toFixed(2));
return {
events: countData.log_count,
Expand Down
2 changes: 1 addition & 1 deletion src/pages/Stream/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ const Logs: FC = () => {
}, [currentStream]);

useEffect(() => {
if (!_.isEmpty(currentStream)) {
if (!_.isEmpty(currentStream) && view !== 'explore') {
fetchSchema();
}
}, [currentStream]);
Expand Down
Loading