Skip to content
This repository was archived by the owner on May 13, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/api/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const parseParamsToQueryString = (params: Params) => {
export const LOG_STREAM_LIST_URL = `${API_V1}/logstream`;
export const LOG_STREAMS_SCHEMA_URL = (streamName: string) => `${LOG_STREAM_LIST_URL}/${streamName}/schema`;
export const LOG_QUERY_URL = (params?: Params) => `${API_V1}/query` + parseParamsToQueryString(params);
export const LOG_TRINO_QUERY_URL = (params?: Params) => `${API_V1}/trinoquery` + parseParamsToQueryString(params);
export const LOG_STREAMS_ALERTS_URL = (streamName: string) => `${LOG_STREAM_LIST_URL}/${streamName}/alert`;
export const LIST_SAVED_FILTERS_URL = (userId: string) => `${API_V1}/filters/${userId}`;
export const LIST_DASHBOARDS = (userId: string) => `${API_V1}/dashboards/${userId}`;
Expand Down
51 changes: 32 additions & 19 deletions src/api/query.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import _ from 'lodash';
import { Axios } from './axios';
import { LOG_QUERY_URL } from './constants';
import { LOG_QUERY_URL, LOG_TRINO_QUERY_URL } from './constants';
import { Log, LogsQuery, LogsResponseWithHeaders } from '@/@types/parseable/api/query';
import timeRangeUtils from '@/utils/timeRangeUtils';

const { formatDateAsCastType } = timeRangeUtils;
type QueryLogs = {
streamName: string;
startTime: Date;
Expand All @@ -20,22 +23,34 @@ const optimizeTime = (date: Date) => {

// ------ Default sql query

const makeDefaultQueryRequestData = (logsQuery: QueryLogs) => {
const { startTime, endTime, streamName, limit, pageOffset } = logsQuery;
const query = `SELECT * FROM ${streamName} LIMIT ${limit} OFFSET ${pageOffset}`;
return { query, startTime: optimizeTime(startTime), endTime: optimizeTime(endTime) };
type FormQueryOptsType = Omit<QueryLogs, 'pageOffset'> & {
pageOffset?: number;
timePartitionColumn?: string;
};

export const timeRangeSQLCondition = (timePartitionColumn: string, startTime: Date, endTime: Date) => {
return `${timePartitionColumn} >= CAST('${formatDateAsCastType(
optimizeTime(startTime),
)}' AS TIMESTAMP) and ${timePartitionColumn} < CAST('${formatDateAsCastType(optimizeTime(endTime))}' AS TIMESTAMP)`;
};

export const formQueryOpts = (logsQuery: FormQueryOptsType) => {
const { startTime, endTime, streamName, limit, pageOffset, timePartitionColumn = 'p_timestamp' } = logsQuery;
const optimizedStartTime = optimizeTime(startTime);
const optimizedEndTime = optimizeTime(endTime);
const orderBy = `ORDER BY ${timePartitionColumn} desc`;
const timestampClause = timeRangeSQLCondition(timePartitionColumn, optimizedStartTime, optimizedEndTime);
const offsetPart = _.isNumber(pageOffset) ? `OFFSET ${pageOffset}` : '';
const query = `SELECT * FROM ${streamName} where ${timestampClause} ${orderBy} ${offsetPart} LIMIT ${limit} `;
return { query, startTime: optimizedStartTime, endTime: optimizedEndTime };
};

export const getQueryLogs = (logsQuery: QueryLogs) => {
return Axios().post<Log[]>(LOG_QUERY_URL(), makeDefaultQueryRequestData(logsQuery), {});
return Axios().post<Log[]>(LOG_TRINO_QUERY_URL(), formQueryOpts(logsQuery), {});
};

export const getQueryLogsWithHeaders = (logsQuery: QueryLogs) => {
return Axios().post<LogsResponseWithHeaders>(
LOG_QUERY_URL({ fields: true }),
makeDefaultQueryRequestData(logsQuery),
{},
);
return Axios().post<LogsResponseWithHeaders>(LOG_TRINO_QUERY_URL({ fields: true }), formQueryOpts(logsQuery), {});
};

// ------ Custom sql query
Expand All @@ -45,14 +60,12 @@ const makeCustomQueryRequestData = (logsQuery: LogsQuery, query: string) => {
return { query, startTime: optimizeTime(startTime), endTime: optimizeTime(endTime) };
};

export const getQueryResult = (logsQuery: LogsQuery, query = '') => {
return Axios().post<Log[]>(LOG_QUERY_URL(), makeCustomQueryRequestData(logsQuery, query), {});
export const getQueryResult = (logsQuery: LogsQuery, query = '', useTrino = true) => {
const endPoint = useTrino ? LOG_TRINO_QUERY_URL() : LOG_QUERY_URL();
return Axios().post<Log[]>(endPoint, makeCustomQueryRequestData(logsQuery, query), {});
};

export const getQueryResultWithHeaders = (logsQuery: LogsQuery, query = '') => {
return Axios().post<LogsResponseWithHeaders>(
LOG_QUERY_URL({ fields: true }),
makeCustomQueryRequestData(logsQuery, query),
{},
);
export const getQueryResultWithHeaders = (logsQuery: LogsQuery, query = '', useTrino = true) => {
const endPoint = useTrino ? LOG_TRINO_QUERY_URL({ fields: true }) : LOG_QUERY_URL({ fields: true });
return Axios().post<LogsResponseWithHeaders>(endPoint, makeCustomQueryRequestData(logsQuery, query), {});
};
2 changes: 2 additions & 0 deletions src/hooks/useGetStreamInfo.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export const useGetStreamInfo = (currentStream: string) => {
isSuccess: getStreamInfoSuccess,
isLoading: getStreamInfoLoading,
refetch: getStreamInfoRefetch,
isRefetching: getStreamInfoRetching
} = useQuery(['stream-info', currentStream], () => getLogStreamInfo(currentStream), {
retry: false,
refetchOnWindowFocus: false,
Expand All @@ -38,5 +39,6 @@ export const useGetStreamInfo = (currentStream: string) => {
getStreamInfoSuccess,
getStreamInfoLoading,
getStreamInfoRefetch,
getStreamInfoRetching
};
};
45 changes: 27 additions & 18 deletions src/hooks/useQueryLogs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,11 @@ import _ from 'lodash';
import { AxiosError } from 'axios';
import jqSearch from '@/utils/jqSearch';
import { useGetStreamSchema } from '@/hooks/useGetLogStreamSchema';
import { useStreamStore } from '@/pages/Stream/providers/StreamProvider';
import { useFilterStore, filterStoreReducers } from '@/pages/Stream/providers/FilterProvider';

const { setLogData } = logsStoreReducers;

type QueryLogs = {
streamName: string;
startTime: Date;
endTime: Date;
limit: number;
pageOffset: number;
};
const { parseQuery } = filterStoreReducers;

const appendOffsetToQuery = (query: string, offset: number) => {
const hasOffset = query.toLowerCase().includes('offset');
Expand All @@ -40,8 +35,9 @@ export const useQueryLogs = () => {
order: SortOrder.DESCENDING,
},
});

const [streamInfo] = useStreamStore((store) => store.info);
const [currentStream] = useAppStore((store) => store.currentStream);
const timePartitionColumn = _.get(streamInfo, 'time_partition', 'p_timestamp');
const { refetch: refetchSchema } = useGetStreamSchema({ streamName: currentStream || '' });
const [
{
Expand All @@ -51,7 +47,8 @@ export const useQueryLogs = () => {
},
setLogsStore,
] = useLogsStore((store) => store);
const { isQuerySearchActive, custSearchQuery } = custQuerySearchState;
const [appliedQuery] = useFilterStore((store) => store.appliedQuery);
const { isQuerySearchActive, custSearchQuery, activeMode } = custQuerySearchState;

const getColumnFilters = useCallback(
(columnName: string) => {
Expand Down Expand Up @@ -80,19 +77,31 @@ export const useQueryLogs = () => {
endTime: timeRange.endTime,
limit: LOAD_LIMIT,
pageOffset: currentOffset,
timePartitionColumn,
};
const getQueryData = async (logsQuery: QueryLogs = defaultQueryOpts) => {
const getQueryData = async () => {
try {
setLoading(true);
setError(null);
refetchSchema(); // fetch schema parallelly every time we fetch logs
const logsQueryRes = isQuerySearchActive
? await getQueryResultWithHeaders(
{ ...logsQuery, access: [] },
appendOffsetToQuery(custSearchQuery, logsQuery.pageOffset),
)
: await getQueryLogsWithHeaders(logsQuery);

const logsQueryRes = await (async () => {
if (isQuerySearchActive) {
if (activeMode === 'filters') {
const { parsedQuery } = parseQuery(appliedQuery, currentStream || '', {
startTime: timeRange.startTime,
endTime: timeRange.endTime,
timePartitionColumn,
});
const queryStrWithOffset = appendOffsetToQuery(parsedQuery, defaultQueryOpts.pageOffset);
return await getQueryResultWithHeaders({ ...defaultQueryOpts, access: [] }, queryStrWithOffset);
} else {
const queryStrWithOffset = appendOffsetToQuery(custSearchQuery, defaultQueryOpts.pageOffset);
return await getQueryResultWithHeaders({ ...defaultQueryOpts, access: [] }, queryStrWithOffset);
}
} else {
return await getQueryLogsWithHeaders(defaultQueryOpts);
}
})();
const logs = logsQueryRes.data;
const isInvalidResponse = _.isEmpty(logs) || _.isNil(logs) || logsQueryRes.status !== StatusCodes.OK;
if (isInvalidResponse) return setError('Failed to query log');
Expand Down
23 changes: 16 additions & 7 deletions src/hooks/useQueryResult.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ type QueryData = {
logsQuery: LogsQuery;
query: string;
onSuccess?: () => void;
useTrino?: boolean;
};

export const useQueryResult = () => {
const fetchQueryHandler = async (data: QueryData) => {
const response = await getQueryResultWithHeaders(data.logsQuery, data.query);
const response = await getQueryResultWithHeaders(data.logsQuery, data.query, data.useTrino);
if (response.status !== 200) {
throw new Error(response.statusText);
}
Expand Down Expand Up @@ -54,13 +55,21 @@ export const useFetchCount = () => {
const { setTotalCount } = logsStoreReducers;
const [custQuerySearchState] = useLogsStore((store) => store.custQuerySearchState);
const [timeRange, setLogsStore] = useLogsStore((store) => store.timeRange);
const { isQuerySearchActive, custSearchQuery } = custQuerySearchState;
const { isQuerySearchActive, custSearchQuery, activeMode } = custQuerySearchState;

const defaultQuery = `select count(*) as count from ${currentStream}`;
const query = isQuerySearchActive
? custSearchQuery.replace(/SELECT[\s\S]*?FROM/i, 'SELECT COUNT(*) as count FROM')
: defaultQuery;

const query = (() => {
if (isQuerySearchActive) {
const finalQuery = custSearchQuery.replace(/SELECT[\s\S]*?FROM/i, 'SELECT COUNT(*) as count FROM');
if (activeMode === 'filters') {
return finalQuery;
} else {
return finalQuery.replace(/ORDER\s+BY\s+[\w\s,.]+(?:ASC|DESC)?\s*(LIMIT\s+\d+)?\s*;?/i, '');
}
} else {
return defaultQuery;
}
})();
const logsQuery = {
streamName: currentStream || '',
startTime: timeRange.startTime,
Expand All @@ -71,7 +80,7 @@ export const useFetchCount = () => {
isLoading: isCountLoading,
isRefetching: isCountRefetching,
refetch: refetchCount,
} = useQuery(['fetchCount', logsQuery], () => getQueryResult(logsQuery, query), {
} = useQuery(['fetchCount', logsQuery], () => getQueryResult(logsQuery, query, false), {
onSuccess: (resp) => {
const count = _.first(resp.data)?.count;
typeof count === 'number' && setLogsStore((store) => setTotalCount(store, count));
Expand Down
7 changes: 4 additions & 3 deletions src/pages/Stream/Views/Explore/LogsView.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import LogTable from './StaticLogTable';
import useLogsFetcher from './useLogsFetcher';
import LogsViewConfig from './LogsViewConfig';

const LogsView = (props: { schemaLoading: boolean }) => {
const { schemaLoading } = props;
const LogsView = (props: { schemaLoading: boolean, infoLoading: boolean }) => {
const { schemaLoading, infoLoading } = props;
const { errorMessage, hasNoData, showTable, isFetchingCount, logsLoading } = useLogsFetcher({
schemaLoading,
infoLoading
});
const [viewMode] = useLogsStore((store) => store.viewMode);
const viewOpts = {
Expand All @@ -21,7 +22,7 @@ const LogsView = (props: { schemaLoading: boolean }) => {

return (
<Box style={{ display: 'flex', flex: 1, overflow: 'hidden' }}>
{viewMode === 'table' && <LogsViewConfig schemaLoading={schemaLoading} logsLoading={logsLoading} />}
{viewMode === 'table' && <LogsViewConfig schemaLoading={schemaLoading} logsLoading={logsLoading} infoLoading={infoLoading}/>}
{viewMode === 'table' ? <LogTable {...viewOpts} /> : <JsonView {...viewOpts} />}
</Box>
);
Expand Down
6 changes: 3 additions & 3 deletions src/pages/Stream/Views/Explore/LogsViewConfig.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -264,15 +264,15 @@ const ColumnsList = (props: { isLoading: boolean }) => {
);
};

const LogsViewConfig = (props: { schemaLoading: boolean; logsLoading: boolean }) => {
const LogsViewConfig = (props: { schemaLoading: boolean; logsLoading: boolean, infoLoading: boolean }) => {
const [configViewType] = useLogsStore((store) => store.tableOpts.configViewType);
return (
<Stack style={{ width: LOGS_CONFIG_SIDEBAR_WIDTH }} className={classes.container}>
<Header />
{configViewType === 'schema' ? (
<SchemaList isLoading={props.schemaLoading} />
<SchemaList isLoading={props.schemaLoading || props.infoLoading} />
) : (
<ColumnsList isLoading={props.logsLoading} />
<ColumnsList isLoading={props.logsLoading || props.infoLoading} />
)}
</Stack>
);
Expand Down
12 changes: 8 additions & 4 deletions src/pages/Stream/Views/Explore/useLogsFetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import { useFetchCount } from '@/hooks/useQueryResult';

const { setCleanStoreForStreamChange } = logsStoreReducers;

const useLogsFetcher = (props: { schemaLoading: boolean }) => {
const { schemaLoading } = props;
const useLogsFetcher = (props: { schemaLoading: boolean, infoLoading: boolean }) => {
const { schemaLoading, infoLoading } = props;
const [currentStream] = useAppStore((store) => store.currentStream);
const [{ tableOpts, timeRange }, setLogsStore] = useLogsStore((store) => store);
const { currentOffset, currentPage, pageData } = tableOpts;
Expand All @@ -22,17 +22,21 @@ const useLogsFetcher = (props: { schemaLoading: boolean }) => {
}, [currentStream]);

useEffect(() => {
if (infoLoading) return;

if (currentPage === 0 && currentOffset === 0) {
getQueryData();
refetchCount();
}
}, [currentPage, currentStream, timeRange]);
}, [currentPage, currentStream, timeRange, infoLoading]);

useEffect(() => {
if (infoLoading) return;

if (currentOffset !== 0 && currentPage !== 0) {
getQueryData();
}
}, [currentOffset]);
}, [currentOffset, infoLoading]);

return {
logsLoading,
Expand Down
19 changes: 12 additions & 7 deletions src/pages/Stream/components/EventTimeLineGraph.tsx
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Paper, Skeleton, Stack, Text } from '@mantine/core';
import classes from '../styles/EventTimeLineGraph.module.css';
import { useQueryResult } from '@/hooks/useQueryResult';
import { useCallback, useEffect, useMemo } from 'react';
import { useCallback, useEffect, useMemo, useState } from 'react';
import dayjs from 'dayjs';
import { ChartTooltipProps, AreaChart } from '@mantine/charts';
import { HumanizeNumber } from '@/utils/formatBytes';
Expand Down Expand Up @@ -257,26 +257,31 @@ const EventTimeLineGraph = () => {
const [appliedQuery] = useFilterStore((store) => store.appliedQuery);
const [{ activeMode, custSearchQuery }] = useLogsStore((store) => store.custQuerySearchState);
const [{ interval, startTime, endTime }] = useLogsStore((store) => store.timeRange);
const [localStream, setLocalStream] = useState<string | null>('');

useEffect(() => {
if (!currentStream || currentStream.length === 0) return;
setLocalStream(currentStream);
}, [currentStream]);

useEffect(() => {
if (!localStream || localStream.length === 0) return;
const { modifiedEndTime, modifiedStartTime, compactType } = getModifiedTimeRange(startTime, endTime, interval);

const logsQuery = {
streamName: currentStream,
streamName: localStream,
startTime: modifiedStartTime,
endTime: modifiedEndTime,
access: [],
};

const whereClause =
activeMode === 'sql' ? extractWhereClause(custSearchQuery) : parseQuery(appliedQuery, currentStream).where;
const query = generateCountQuery(currentStream, modifiedStartTime, modifiedEndTime, compactType, whereClause);
activeMode === 'sql' ? extractWhereClause(custSearchQuery) : parseQuery(appliedQuery, localStream).where;
const query = generateCountQuery(localStream, modifiedStartTime, modifiedEndTime, compactType, whereClause);
fetchQueryMutation.mutate({
logsQuery,
query,
useTrino: false,
});
}, [currentStream, startTime.toISOString(), endTime.toISOString(), custSearchQuery]);
}, [localStream, startTime.toISOString(), endTime.toISOString(), custSearchQuery]);

const isLoading = fetchQueryMutation.isLoading;
const avgEventCount = useMemo(() => calcAverage(fetchQueryMutation?.data), [fetchQueryMutation?.data]);
Expand Down
Loading