This repository was archived by the owner on May 13, 2025. It is now read-only.

Merged
3 changes: 3 additions & 0 deletions src/@types/parseable/api/about.ts
@@ -1,3 +1,5 @@
export type QueryEngineType = 'Trino' | 'Parseable' | undefined;

export type AboutData = {
commit: string;
deploymentId: string;
@@ -17,4 +19,5 @@ export type AboutData = {
analytics: {
clarityTag: string;
};
queryEngine: QueryEngineType;
};
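
For context on how the new field is meant to be consumed, here is a minimal sketch (not part of this PR) of resolving the active engine from the server's about response; the helper name and the fallback to 'Parseable' are assumptions:

import { AboutData, QueryEngineType } from '@/@types/parseable/api/about';

// Hypothetical helper: pick the engine reported by the about response,
// falling back to the native Parseable engine when none is reported.
const resolveQueryEngine = (about?: AboutData): QueryEngineType => {
	return about?.queryEngine ?? 'Parseable';
};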
2 changes: 2 additions & 0 deletions src/@types/parseable/api/query.ts
@@ -1,4 +1,6 @@
import { QueryEngineType } from '@/@types/parseable/api/about';
export type LogsQuery = {
queryEngine?: QueryEngineType;
streamName: string;
startTime: Date;
endTime: Date;
6 changes: 3 additions & 3 deletions src/api/constants.ts
@@ -20,11 +20,11 @@ const parseParamsToQueryString = (params: Params) => {
// Streams Management
export const LOG_STREAM_LIST_URL = `${API_V1}/logstream`;
export const LOG_STREAMS_SCHEMA_URL = (streamName: string) => `${LOG_STREAM_LIST_URL}/${streamName}/schema`;
export const LOG_QUERY_URL = (params?: Params) => `${API_V1}/query` + parseParamsToQueryString(params);
export const LOG_TRINO_QUERY_URL = (params?: Params) => `${API_V1}/trinoquery` + parseParamsToQueryString(params);
export const LOG_QUERY_URL = (params?: Params, resourcePath: string = 'query') =>
`${API_V1}/${resourcePath}` + parseParamsToQueryString(params);
export const LOG_STREAMS_ALERTS_URL = (streamName: string) => `${LOG_STREAM_LIST_URL}/${streamName}/alert`;
export const LIST_SAVED_FILTERS_URL = `${API_V1}/filters`;
export const LIST_DASHBOARDS = `${API_V1}/dashboards`;
export const LIST_DASHBOARDS = `${API_V1}/dashboards`;
export const UPDATE_SAVED_FILTERS_URL = (filterId: string) => `${API_V1}/filters/${filterId}`;
export const UPDATE_DASHBOARDS_URL = (dashboardId: string) => `${API_V1}/dashboards/${dashboardId}`;
export const DELETE_DASHBOARDS_URL = (dashboardId: string) => `${API_V1}/dashboards/${dashboardId}`;
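
As a rough illustration (not part of the diff), the reworked helper can now target either engine's endpoint through its second argument; 'trinoquery' is the path that was previously hard-coded in LOG_TRINO_QUERY_URL, and the exact query-string serialization depends on parseParamsToQueryString:

import { LOG_QUERY_URL } from '@/api/constants';

// Default resource path: roughly `${API_V1}/query?fields=true`
const parseableUrl = LOG_QUERY_URL({ fields: true });

// Explicit resource path: roughly `${API_V1}/trinoquery?fields=true`
const trinoUrl = LOG_QUERY_URL({ fields: true }, 'trinoquery');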
35 changes: 20 additions & 15 deletions src/api/query.ts
@@ -1,11 +1,15 @@
import _ from 'lodash';
import { Axios } from './axios';
import { LOG_QUERY_URL, LOG_TRINO_QUERY_URL } from './constants';
import { LOG_QUERY_URL } from './constants';
import { Log, LogsQuery, LogsResponseWithHeaders } from '@/@types/parseable/api/query';
import { QueryEngineType } from '@/@types/parseable/api/about';
import timeRangeUtils from '@/utils/timeRangeUtils';
import { QueryBuilder } from '@/utils/queryBuilder';

const { formatDateAsCastType } = timeRangeUtils;
type QueryEngine = QueryEngineType;
type QueryLogs = {
queryEngine: QueryEngine;
streamName: string;
startTime: Date;
endTime: Date;
@@ -35,22 +39,22 @@ export const timeRangeSQLCondition = (timePartitionColumn: string, startTime: Da
};

export const formQueryOpts = (logsQuery: FormQueryOptsType) => {
const { startTime, endTime, streamName, limit, pageOffset, timePartitionColumn = 'p_timestamp' } = logsQuery;
const optimizedStartTime = optimizeTime(startTime);
const optimizedEndTime = optimizeTime(endTime);
const orderBy = `ORDER BY ${timePartitionColumn} desc`;
const timestampClause = timeRangeSQLCondition(timePartitionColumn, optimizedStartTime, optimizedEndTime);
const offsetPart = _.isNumber(pageOffset) ? `OFFSET ${pageOffset}` : '';
const query = `SELECT * FROM \"${streamName}\" where ${timestampClause} ${orderBy} ${offsetPart} LIMIT ${limit} `;
return { query, startTime: optimizedStartTime, endTime: optimizedEndTime };
const queryBuilder = new QueryBuilder(logsQuery);
nativ-labs (Contributor) commented on Sep 14, 2024:
Why did we go with a class implementation? The created instance is never passed outside this function, so there is no need for a class here.
Give "separation of concerns" a read.

Member:
The problem we want to solve is that the useTrino flag was previously passed around everywhere, which is error prone. How do you propose to fix that?

Contributor:
Oh, ok then.

Member:
I am more than happy if you can propose a better / more idiomatic approach to this.

Contributor:
  1. Even now, we are sending 'queryEngine' in place of 'useTrino', so there is no difference.
  2. The app initializes only once. We should determine the query engine at that point and save it as a boolean (useTrino: boolean, since we are not going to include any other connector anytime soon) in the AppProvider, so we don't have to compare the string every time, everywhere.
  3. The trinoquery and query endpoint definitions should remain separate.
Given all of the above, we don't really need a class. Instead of passing the string everywhere, pass just the boolean, since that is what we need in the end. Revert the changes and just set useTrino (boolean) in the app provider.

Member:
> Instead of passing the string everywhere, pass just the boolean, since that is what we need in the end.
We have to avoid this. It has to decide dynamically which engine to use based on the API response.

Contributor:
That is a very valid point if we are going to add more connectors.

Member:
Right now there is one, but we have to make it modular so we can extend it as needed.

Contributor Author:
About the endpoint definition: one function that decides dynamically, or separate endpoints for every connector?

Contributor:
If we are going to include more connectors, then the class should provide everything needed to make a request. All of the logic related to the query engine should live inside the class: define separate URLs for the separate engines, and let the class decide and return the URL as well.

const query = queryBuilder.getQuery();
const startTime = queryBuilder.getStartTime();
const endTime = queryBuilder.getEndTime();

return { query, startTime, endTime };
};

export const getQueryLogs = (logsQuery: QueryLogs) => {
return Axios().post<Log[]>(LOG_TRINO_QUERY_URL(), formQueryOpts(logsQuery), {});
return Axios().post<Log[]>(LOG_QUERY_URL(), formQueryOpts(logsQuery), {});
};

export const getQueryLogsWithHeaders = (logsQuery: QueryLogs) => {
return Axios().post<LogsResponseWithHeaders>(LOG_TRINO_QUERY_URL({ fields: true }), formQueryOpts(logsQuery), {});
const queryBuilder = new QueryBuilder(logsQuery);
const endPoint = LOG_QUERY_URL({ fields: true }, queryBuilder.getResourcePath());
return Axios().post<LogsResponseWithHeaders>(endPoint, formQueryOpts(logsQuery), {});
};

// ------ Custom sql query
@@ -60,12 +64,13 @@ const makeCustomQueryRequestData = (logsQuery: LogsQuery, query: string) => {
return { query, startTime: optimizeTime(startTime), endTime: optimizeTime(endTime) };
};

export const getQueryResult = (logsQuery: LogsQuery, query = '', useTrino = true) => {
const endPoint = useTrino ? LOG_TRINO_QUERY_URL() : LOG_QUERY_URL();
export const getQueryResult = (logsQuery: LogsQuery, query = '') => {
const endPoint = LOG_QUERY_URL();
return Axios().post<Log[]>(endPoint, makeCustomQueryRequestData(logsQuery, query), {});
};

export const getQueryResultWithHeaders = (logsQuery: LogsQuery, query = '', useTrino = true) => {
const endPoint = useTrino ? LOG_TRINO_QUERY_URL({ fields: true }) : LOG_QUERY_URL({ fields: true });
export const getQueryResultWithHeaders = (logsQuery: LogsQuery, query = '') => {
const queryBuilder = new QueryBuilder(logsQuery);
const endPoint = LOG_QUERY_URL({ fields: true }, queryBuilder.getResourcePath());
return Axios().post<LogsResponseWithHeaders>(endPoint, makeCustomQueryRequestData(logsQuery, query), {});
};
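
The review thread above lands on having the builder own all engine-specific logic, including which resource path to hit. The actual src/utils/queryBuilder implementation is not included in this diff, so the following is only a sketch of the shape implied by the calls used here (getQuery, getStartTime, getEndTime, getResourcePath); the class name, the Trino branch, and the simplified SQL are assumptions:

import { QueryEngineType } from '@/@types/parseable/api/about';

type BuilderOpts = {
	queryEngine?: QueryEngineType;
	streamName: string;
	startTime: Date;
	endTime: Date;
	limit: number;
	pageOffset?: number;
	timePartitionColumn?: string;
};

// Illustrative builder: chooses the API resource path and assembles the SQL
// from the configured engine. The real builder also applies the time-range
// WHERE clause built by timeRangeSQLCondition, omitted here for brevity.
class QueryBuilderSketch {
	constructor(private opts: BuilderOpts) {}

	getResourcePath(): string {
		// 'trinoquery' was previously hard-coded in LOG_TRINO_QUERY_URL.
		return this.opts.queryEngine === 'Trino' ? 'trinoquery' : 'query';
	}

	getStartTime(): Date {
		return this.opts.startTime;
	}

	getEndTime(): Date {
		return this.opts.endTime;
	}

	getQuery(): string {
		const { streamName, limit, pageOffset, timePartitionColumn = 'p_timestamp' } = this.opts;
		const offsetPart = typeof pageOffset === 'number' ? `OFFSET ${pageOffset}` : '';
		return `SELECT * FROM "${streamName}" ORDER BY ${timePartitionColumn} DESC ${offsetPart} LIMIT ${limit}`;
	}
}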
11 changes: 6 additions & 5 deletions src/hooks/useGetStreamInfo.tsx
@@ -6,22 +6,23 @@ import { useQuery } from 'react-query';

const { setStreamInfo } = streamStoreReducers;

export const useGetStreamInfo = (currentStream: string) => {
export const useGetStreamInfo = (currentStream: string, initialFetch: boolean) => {
const [, setStreamStore] = useStreamStore((_store) => null);
const {
data: getStreamInfoData,
isError: getStreamInfoError,
isSuccess: getStreamInfoSuccess,
isLoading: getStreamInfoLoading,
refetch: getStreamInfoRefetch,
isRefetching: getStreamInfoRetching
isRefetching: getStreamInfoRefetching,
} = useQuery(['stream-info', currentStream], () => getLogStreamInfo(currentStream), {
retry: false,
refetchOnWindowFocus: false,
refetchOnMount: true,
enabled: currentStream !== '',
enabled: initialFetch,
// currentStream !== '',
onSuccess: (data) => {
setStreamStore((store) => setStreamInfo(store, data))
setStreamStore((store) => setStreamInfo(store, data));
},
onError: (data: AxiosError) => {
if (isAxiosError(data) && data.response) {
@@ -39,6 +40,6 @@ export const useGetStreamInfo = (currentStream: string) => {
getStreamInfoSuccess,
getStreamInfoLoading,
getStreamInfoRefetch,
getStreamInfoRetching
getStreamInfoRefetching,
};
};
4 changes: 3 additions & 1 deletion src/hooks/useQueryLogs.ts
@@ -35,6 +35,7 @@ export const useQueryLogs = () => {
order: SortOrder.DESCENDING,
},
});
const [queryEngine] = useAppStore((store) => store.instanceConfig?.queryEngine);
const [streamInfo] = useStreamStore((store) => store.info);
const [currentStream] = useAppStore((store) => store.currentStream);
const timePartitionColumn = _.get(streamInfo, 'time_partition', 'p_timestamp');
@@ -72,6 +73,7 @@ export const useQueryLogs = () => {

// refactor
const defaultQueryOpts = {
queryEngine,
streamName: currentStream || '',
startTime: timeRange.startTime,
endTime: timeRange.endTime,
@@ -87,7 +89,7 @@ export const useQueryLogs = () => {
const logsQueryRes = await (async () => {
if (isQuerySearchActive) {
if (activeMode === 'filters') {
const { parsedQuery } = parseQuery(appliedQuery, currentStream || '', {
const { parsedQuery } = parseQuery(queryEngine, appliedQuery, currentStream || '', {
startTime: timeRange.startTime,
endTime: timeRange.endTime,
timePartitionColumn,
33 changes: 23 additions & 10 deletions src/hooks/useQueryResult.tsx
@@ -8,8 +8,10 @@ import { logsStoreReducers, useLogsStore } from '@/pages/Stream/providers/LogsPr
import _ from 'lodash';
import { useAppStore } from '@/layouts/MainLayout/providers/AppProvider';
import { notifyError } from '@/utils/notification';
import { QueryEngineType } from '@/@types/parseable/api/about';

type QueryData = {
queryEngine: QueryEngineType;
logsQuery: LogsQuery;
query: string;
onSuccess?: () => void;
@@ -18,7 +20,7 @@ type QueryData = {

export const useQueryResult = () => {
const fetchQueryHandler = async (data: QueryData) => {
const response = await getQueryResultWithHeaders(data.logsQuery, data.query, data.useTrino);
const response = await getQueryResultWithHeaders(data.logsQuery, data.query);
if (response.status !== 200) {
throw new Error(response.statusText);
}
@@ -62,9 +64,14 @@ export const useFetchCount = () => {
if (isQuerySearchActive) {
const finalQuery = custSearchQuery.replace(/SELECT[\s\S]*?FROM/i, 'SELECT COUNT(*) as count FROM');
if (activeMode === 'filters') {
return finalQuery;
return finalQuery
.replace(/LIMIT\s*\d+\s*(OFFSET\s*\d+)?\s*;?/i, '') // Removes LIMIT and optional OFFSET
.replace(/OFFSET\s*\d+\s*;?/i, '');
} else {
return finalQuery.replace(/ORDER\s+BY\s+[\w\s,.]+(?:ASC|DESC)?\s*(LIMIT\s+\d+)?\s*;?/i, '');
return finalQuery
.replace(/ORDER\s+BY\s+[\w\s,.]+(?:ASC|DESC)?\s*(LIMIT\s*\d+)?\s*(OFFSET\s*\d+)?\s*;?/i, '') // Removes ORDER BY, LIMIT, and OFFSET
.replace(/LIMIT\s*\d+\s*(OFFSET\s*\d+)?\s*;?/i, '') // Removes LIMIT and optional OFFSET
.replace(/OFFSET\s*\d+\s*;?/i, '');
Member:
Why remove LIMIT / OFFSET in the normal query?

Contributor Author:
This is for the count query.

}
} else {
return defaultQuery;
@@ -80,15 +87,21 @@ export const useFetchCount = () => {
isLoading: isCountLoading,
isRefetching: isCountRefetching,
refetch: refetchCount,
} = useQuery(['fetchCount', logsQuery], () => getQueryResult(logsQuery, query, false), {
onSuccess: (resp) => {
const count = _.first(resp.data)?.count;
} = useQuery(
['fetchCount', logsQuery],
async () => {
const data = await getQueryResult(logsQuery, query);
const count = _.first(data.data)?.count;
typeof count === 'number' && setLogsStore((store) => setTotalCount(store, count));
return data;
},
refetchOnWindowFocus: false,
retry: false,
enabled: false,
});
{
// query for count should always hit the endpoint for parseable query
refetchOnWindowFocus: false,
retry: false,
enabled: false,
},
);

return {
isCountLoading,
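
To make the count-query rewriting above concrete, here is a worked example (the input query is illustrative) showing the same substitutions used by useFetchCount: the projection is swapped for COUNT(*), then ORDER BY, LIMIT, and OFFSET are stripped, since a count query does not need them:

const custSearchQuery =
	'SELECT * FROM "frontend" WHERE status = 500 ORDER BY p_timestamp DESC OFFSET 9000 LIMIT 9000';

const countQuery = custSearchQuery
	.replace(/SELECT[\s\S]*?FROM/i, 'SELECT COUNT(*) as count FROM')
	.replace(/ORDER\s+BY\s+[\w\s,.]+(?:ASC|DESC)?\s*(LIMIT\s*\d+)?\s*(OFFSET\s*\d+)?\s*;?/i, '')
	.replace(/LIMIT\s*\d+\s*(OFFSET\s*\d+)?\s*;?/i, '')
	.replace(/OFFSET\s*\d+\s*;?/i, '');

// countQuery is now roughly:
// SELECT COUNT(*) as count FROM "frontend" WHERE status = 500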
8 changes: 5 additions & 3 deletions src/pages/Stream/Views/Explore/LogsView.tsx
@@ -5,11 +5,11 @@ import LogTable from './StaticLogTable';
import useLogsFetcher from './useLogsFetcher';
import LogsViewConfig from './LogsViewConfig';

const LogsView = (props: { schemaLoading: boolean, infoLoading: boolean }) => {
const LogsView = (props: { schemaLoading: boolean; infoLoading: boolean }) => {
const { schemaLoading, infoLoading } = props;
const { errorMessage, hasNoData, showTable, isFetchingCount, logsLoading } = useLogsFetcher({
schemaLoading,
infoLoading
infoLoading,
});
const [viewMode] = useLogsStore((store) => store.viewMode);
const viewOpts = {
@@ -22,7 +22,9 @@ const LogsView = (props: { schemaLoading: boolean, infoLoading: boolean }) => {

return (
<Box style={{ display: 'flex', flex: 1, overflow: 'hidden' }}>
{viewMode === 'table' && <LogsViewConfig schemaLoading={schemaLoading} logsLoading={logsLoading} infoLoading={infoLoading}/>}
{viewMode === 'table' && (
<LogsViewConfig schemaLoading={schemaLoading} logsLoading={logsLoading} infoLoading={infoLoading} />
)}
{viewMode === 'table' ? <LogTable {...viewOpts} /> : <JsonView {...viewOpts} />}
</Box>
);
2 changes: 1 addition & 1 deletion src/pages/Stream/Views/Explore/useLogsFetcher.ts
@@ -39,7 +39,7 @@ const useLogsFetcher = (props: { schemaLoading: boolean; infoLoading: boolean })
}, [currentOffset, infoLoading]);

return {
logsLoading,
logsLoading: infoLoading || logsLoading,
errorMessage,
hasContentLoaded,
hasNoData,
10 changes: 6 additions & 4 deletions src/pages/Stream/Views/Manage/Management.tsx
@@ -17,20 +17,22 @@ const Management = (props: { schemaLoading: boolean }) => {
const getStreamAlertsConfig = useAlertsQuery(currentStream || '');
const getStreamStats = useLogStreamStats(currentStream || '');
const getRetentionConfig = useRetentionQuery(currentStream || '');
const getStreamInfo = useGetStreamInfo(currentStream || '');
const getStreamInfo = useGetStreamInfo(currentStream || '', currentStream !== null);
const hotTierFetch = useHotTier(currentStream || '');

// todo - handle loading and error states separately
const isAlertsLoading = getStreamAlertsConfig.isError || getStreamAlertsConfig.isLoading;
const isRetentionLoading =
getRetentionConfig.getLogRetentionIsLoading || instanceConfig === null;
const isRetentionLoading = getRetentionConfig.getLogRetentionIsLoading || instanceConfig === null;
const isHotTierLoading = hotTierFetch.getHotTierInfoLoading;

return (
<Stack style={{ padding: '1rem', paddingTop: '0', height: '90%' }}>
<DeleteStreamModal />
<Stack style={{ flexDirection: 'row', height: '40%' }} gap={24}>
<Stats isLoading={getStreamStats.getLogStreamStatsDataIsLoading} isError={getStreamStats.getLogStreamStatsDataIsError} />
<Stats
isLoading={getStreamStats.getLogStreamStatsDataIsLoading}
isError={getStreamStats.getLogStreamStatsDataIsError}
/>
<Info isLoading={getStreamInfo.getStreamInfoLoading} isError={getStreamInfo.getStreamInfoError} />
</Stack>
<Stack style={{ flexDirection: 'row', height: '57%' }} gap={24}>
src/pages/Stream/Views/Manage/UpdateCustomPartitionField.tsx
@@ -37,7 +37,7 @@ export default function UpdateCustomPartitionField(props: { timePartition: strin
const [showEditField, setShowEditField] = useState<boolean>(false);
const [error, setError] = useState<string | null>(null);
const { updateLogStreamMutation } = useLogStream();
const { getStreamInfoRefetch } = useGetStreamInfo(props.currentStream);
const { getStreamInfoRefetch } = useGetStreamInfo(props.currentStream, props.currentStream !== null);

useEffect(() => {
const customPartition: string = _.get(info, 'custom_partition', 'EMPTY_VALUE');
2 changes: 1 addition & 1 deletion src/pages/Stream/Views/Manage/UpdateTimePartitionLimit.tsx
@@ -35,7 +35,7 @@ function UpdateTimePartitionLimit(props: { timePartition: string; currentStream:
const [error, setError] = useState<string | null>(null);
const [showEditField, setShowEditField] = useState<boolean>(false);
const { updateLogStreamMutation } = useLogStream();
const { getStreamInfoRefetch } = useGetStreamInfo(props.currentStream);
const { getStreamInfoRefetch } = useGetStreamInfo(props.currentStream, props.currentStream !== null);

useEffect(() => {
setValue(timePartitonLimit);
7 changes: 5 additions & 2 deletions src/pages/Stream/components/EventTimeLineGraph.tsx
@@ -254,6 +254,7 @@ function ChartTooltip({ payload }: ChartTooltipProps) {
const EventTimeLineGraph = () => {
const { fetchQueryMutation } = useQueryResult();
const [currentStream] = useAppStore((store) => store.currentStream);
const [queryEngine] = useAppStore((store) => store.instanceConfig?.queryEngine);
const [appliedQuery] = useFilterStore((store) => store.appliedQuery);
const [{ activeMode, custSearchQuery }] = useLogsStore((store) => store.custQuerySearchState);
const [{ interval, startTime, endTime }] = useLogsStore((store) => store.timeRange);
@@ -274,12 +275,14 @@ const EventTimeLineGraph = () => {
access: [],
};
const whereClause =
activeMode === 'sql' ? extractWhereClause(custSearchQuery) : parseQuery(appliedQuery, localStream).where;
activeMode === 'sql'
? extractWhereClause(custSearchQuery)
: parseQuery(queryEngine, appliedQuery, localStream).where;
const query = generateCountQuery(localStream, modifiedStartTime, modifiedEndTime, compactType, whereClause);
fetchQueryMutation.mutate({
queryEngine: 'Parseable', // query for graph should always hit the endpoint for parseable query
logsQuery,
query,
useTrino: false,
});
}, [localStream, startTime.toISOString(), endTime.toISOString(), custSearchQuery]);

8 changes: 7 additions & 1 deletion src/pages/Stream/components/Querier/QueryCodeEditor.tsx
@@ -1,6 +1,7 @@
import React, { FC, MutableRefObject, useCallback, useEffect, useState, useRef } from 'react';
import Editor from '@monaco-editor/react';
import { Box, Button, Flex, ScrollArea, Stack, Text, TextInput } from '@mantine/core';
import { QueryEngineType } from '@/@types/parseable/api/about';
import { ErrorMarker, errChecker } from '../ErrorMarker';
import useMountedState from '@/hooks/useMountedState';
import { notify } from '@/utils/notification';
@@ -32,13 +33,15 @@ const genColumnConfig = (fields: Field[]) => {
};

export const defaultCustSQLQuery = (
queryEngine: QueryEngineType,
streamName: string | null,
startTime: Date,
endTime: Date,
timePartitionColumn: string,
) => {
if (streamName && streamName.length > 0) {
const { query } = formQueryOpts({
queryEngine,
streamName: streamName || '',
limit: LOAD_LIMIT,
startTime,
@@ -57,7 +60,9 @@ const QueryCodeEditor: FC<{
onSqlSearchApply: (query: string) => void;
onClear: () => void;
}> = (props) => {
const [llmActive] = useAppStore((store) => store.instanceConfig?.llmActive);
const [instanceConfig] = useAppStore((store) => store.instanceConfig);
const llmActive = _.get(instanceConfig, 'llmActive', false);
const queryEngine = _.get(instanceConfig, 'queryEngine', 'Parseable');
const [streamInfo] = useStreamStore((store) => store.info);
const timePartitionColumn = _.get(streamInfo, 'time_partition', 'p_timestamp');
const [{ isQuerySearchActive, activeMode, savedFilterId, custSearchQuery }] = useLogsStore(
@@ -80,6 +85,7 @@ const QueryCodeEditor: FC<{
useEffect(() => {
if (props.queryCodeEditorRef.current === '' || currentStream !== localStreamName) {
props.queryCodeEditorRef.current = defaultCustSQLQuery(
queryEngine,
currentStream,
timeRange.startTime,
timeRange.endTime,