diff --git a/src/containers/Tenant/ObjectSummary/SchemaTree/SchemaTree.tsx b/src/containers/Tenant/ObjectSummary/SchemaTree/SchemaTree.tsx
index d3331f0c61..6196d8f1c9 100644
--- a/src/containers/Tenant/ObjectSummary/SchemaTree/SchemaTree.tsx
+++ b/src/containers/Tenant/ObjectSummary/SchemaTree/SchemaTree.tsx
@@ -12,6 +12,7 @@ import {
} from '../../../../store/reducers/capabilities/hooks';
import {selectIsDirty, selectUserInput} from '../../../../store/reducers/query/query';
import {schemaApi} from '../../../../store/reducers/schema/schema';
+import {streamingQueriesApi} from '../../../../store/reducers/streamingQuery/streamingQuery';
import {tableSchemaDataApi} from '../../../../store/reducers/tableSchemaData';
import type {EPathType, TEvDescribeSchemeResult} from '../../../../types/api/schema';
import {uiFactory} from '../../../../uiFactory/uiFactory';
@@ -22,6 +23,7 @@ import {getSchemaControls} from '../../utils/controls';
import {
isChildlessPathType,
mapPathTypeToNavigationTreeType,
+ nodeStreamingQueryTypeToPathType,
nodeTableTypeToPathType,
} from '../../utils/schema';
import {getActions} from '../../utils/schemaActions';
@@ -49,6 +51,10 @@ export function SchemaTree(props: SchemaTreeProps) {
getTableSchemaDataQuery,
{currentData: actionsSchemaData, isFetching: isActionsDataFetching},
] = tableSchemaDataApi.useLazyGetTableSchemaDataQuery();
+ const [
+ getStreamingQueryInfo,
+ {currentData: streamingSysData, isFetching: isStreamingInfoFetching},
+ ] = streamingQueriesApi.useLazyGetStreamingQueryInfoQuery();
const isTopicPreviewAvailable = useTopicDataAvailable();
@@ -146,6 +152,8 @@ export function SchemaTree(props: SchemaTreeProps) {
schemaData: actionsSchemaData,
isSchemaDataLoading: isActionsDataFetching,
hasMonitoring: typeof uiFactory.renderMonitoring === 'function',
+ streamingQueryData: streamingSysData,
+ isStreamingQueryTextLoading: isStreamingInfoFetching,
},
databaseFullPath,
database,
@@ -157,9 +165,11 @@ export function SchemaTree(props: SchemaTreeProps) {
input,
isActionsDataFetching,
isDirty,
+ isStreamingInfoFetching,
onActivePathUpdate,
databaseFullPath,
database,
+ streamingSysData,
]);
return (
@@ -188,6 +198,11 @@ export function SchemaTree(props: SchemaTreeProps) {
getTableSchemaDataQuery({path, database, type: pathType, databaseFullPath});
}
+ const streamingPathType = nodeStreamingQueryTypeToPathType[type];
+ if (isOpen && streamingPathType) {
+ getStreamingQueryInfo({database, path}, true); // preferCacheValue = true
+ }
+
return [];
}}
renderAdditionalNodeElements={getSchemaControls(
diff --git a/src/containers/Tenant/Query/NewSQL/NewSQL.tsx b/src/containers/Tenant/Query/NewSQL/NewSQL.tsx
index a0710fb4aa..8331b018e8 100644
--- a/src/containers/Tenant/Query/NewSQL/NewSQL.tsx
+++ b/src/containers/Tenant/Query/NewSQL/NewSQL.tsx
@@ -3,7 +3,13 @@ import React from 'react';
import {ChevronDown, Persons} from '@gravity-ui/icons';
import type {DropdownMenuItem} from '@gravity-ui/uikit';
import {Button, DropdownMenu} from '@gravity-ui/uikit';
-import {AsyncReplicationIcon, TableIcon, TopicIcon, TransferIcon} from 'ydb-ui-components';
+import {
+ AsyncReplicationIcon,
+ StreamingQueryIcon,
+ TableIcon,
+ TopicIcon,
+ TransferIcon,
+} from 'ydb-ui-components';
import {useChangeInputWithConfirmation} from '../../../../utils/hooks/withConfirmation/useChangeInputWithConfirmation';
import {insertSnippetToEditor} from '../../../../utils/monaco/insertSnippet';
@@ -93,6 +99,28 @@ export function NewSQL() {
},
],
},
+ {
+ text: i18n('menu.streaming-query'),
+ iconStart: ,
+ items: [
+ {
+ text: i18n('action.create-streaming-query'),
+ action: actions.createStreamingQuery,
+ },
+ {
+ text: i18n('action.alter-streaming-query-settings'),
+ action: actions.alterStreamingQuerySettings,
+ },
+ {
+ text: i18n('action.alter-streaming-query-text'),
+ action: actions.alterStreamingQueryText,
+ },
+ {
+ text: i18n('action.drop-streaming-query'),
+ action: actions.dropStreamingQuery,
+ },
+ ],
+ },
{
text: i18n('menu.replication'),
iconStart: ,
diff --git a/src/containers/Tenant/Query/NewSQL/i18n/en.json b/src/containers/Tenant/Query/NewSQL/i18n/en.json
index 9e6916686d..e2730d7a3d 100644
--- a/src/containers/Tenant/Query/NewSQL/i18n/en.json
+++ b/src/containers/Tenant/Query/NewSQL/i18n/en.json
@@ -14,6 +14,7 @@
"action.drop-external-table": "Drop external table",
"menu.tables": "Tables",
"menu.topics": "Topics",
+ "menu.streaming-query": "Streaming query",
"menu.capture": "Change data capture",
"menu.replication": "Async replication",
"menu.transfer": "Transfer",
@@ -33,5 +34,9 @@
"action.alter-async-replication": "Alter async replication",
"action.drop-async-replication": "Drop async replication",
"action.alter-transfer": "Alter transfer",
- "action.drop-transfer": "Drop transfer"
+ "action.drop-transfer": "Drop transfer",
+ "action.create-streaming-query": "Create streaming query",
+ "action.alter-streaming-query-settings": "Alter query settings",
+ "action.alter-streaming-query-text": "Alter query text",
+ "action.drop-streaming-query": "Drop query"
}
diff --git a/src/containers/Tenant/i18n/en.json b/src/containers/Tenant/i18n/en.json
index 4ff6ad3ca3..46e11c5be9 100644
--- a/src/containers/Tenant/i18n/en.json
+++ b/src/containers/Tenant/i18n/en.json
@@ -36,6 +36,7 @@
"actions.createAsyncReplication": "Create async replication...",
"actions.createTransfer": "Create transfer...",
"actions.createView": "Create view...",
+ "actions.createStreamingQuery": "Create streaming query...",
"actions.dropTable": "Drop table...",
"actions.dropTopic": "Drop topic...",
"actions.dropView": "Drop view...",
@@ -51,6 +52,9 @@
"actions.alterTransfer": "Alter transfer...",
"actions.dropReplication": "Drop async replicaton...",
"actions.dropTransfer": "Drop transfer...",
+ "actions.dropStreamingQuery": "Drop query...",
+ "actions.alterStreamingQuerySettings": "Alter query settings...",
+ "actions.alterStreamingQueryText": "Alter query text...",
"actions.createDirectory": "Create directory",
"schema.tree.dialog.placeholder": "Relative path",
"schema.tree.dialog.invalid": "Invalid path",
diff --git a/src/containers/Tenant/utils/newSQLQueryActions.ts b/src/containers/Tenant/utils/newSQLQueryActions.ts
index ed1526c24e..61aa045ecf 100644
--- a/src/containers/Tenant/utils/newSQLQueryActions.ts
+++ b/src/containers/Tenant/utils/newSQLQueryActions.ts
@@ -1,6 +1,8 @@
import {
addTableIndex,
alterAsyncReplicationTemplate,
+ alterStreamingQuerySettingsTemplate,
+ alterStreamingQueryText,
alterTableTemplate,
alterTopicTemplate,
alterTransferTemplate,
@@ -9,6 +11,7 @@ import {
createColumnTableTemplate,
createExternalTableTemplate,
createGroupTemplate,
+ createStreamingQueryTemplate,
createTableTemplate,
createTopicTemplate,
createTransferTemplate,
@@ -18,6 +21,7 @@ import {
dropAsyncReplicationTemplate,
dropExternalTableTemplate,
dropGroupTemplate,
+ dropStreamingQueryTemplate,
dropTableIndex,
dropTableTemplate,
dropTopicTemplate,
@@ -39,6 +43,7 @@ export const bindActions = (changeUserInput: (input: string) => void) => {
createRowTable: inputQuery(createTableTemplate),
createColumnTable: inputQuery(createColumnTableTemplate),
createAsyncReplication: inputQuery(createAsyncReplicationTemplate),
+ createStreamingQuery: inputQuery(createStreamingQueryTemplate),
alterAsyncReplication: inputQuery(alterAsyncReplicationTemplate),
dropAsyncReplication: inputQuery(dropAsyncReplicationTemplate),
createTransfer: inputQuery(createTransferTemplate),
@@ -56,6 +61,9 @@ export const bindActions = (changeUserInput: (input: string) => void) => {
dropTable: inputQuery(dropTableTemplate),
deleteRows: inputQuery(deleteRowsTemplate),
updateTable: inputQuery(updateTableTemplate),
+ alterStreamingQueryText: inputQuery(alterStreamingQueryText),
+ alterStreamingQuerySettings: inputQuery(alterStreamingQuerySettingsTemplate),
+ dropStreamingQuery: inputQuery(dropStreamingQueryTemplate),
createUser: inputQuery(createUserTemplate),
createGroup: inputQuery(createGroupTemplate),
createCdcStream: inputQuery(createCdcStreamTemplate),
diff --git a/src/containers/Tenant/utils/schema.ts b/src/containers/Tenant/utils/schema.ts
index ed833b2bb0..54a822fa1b 100644
--- a/src/containers/Tenant/utils/schema.ts
+++ b/src/containers/Tenant/utils/schema.ts
@@ -57,6 +57,11 @@ export const nodeTableTypeToPathType: Partial> =
+ {
+ streaming_query: EPathType.EPathTypeStreamingQuery,
+ };
+
export const mapPathTypeToNavigationTreeType = (
type: EPathType = EPathType.EPathTypeDir,
subType?: EPathSubType,
diff --git a/src/containers/Tenant/utils/schemaActions.tsx b/src/containers/Tenant/utils/schemaActions.tsx
index 27a8be54a3..4b63ddf2f7 100644
--- a/src/containers/Tenant/utils/schemaActions.tsx
+++ b/src/containers/Tenant/utils/schemaActions.tsx
@@ -11,6 +11,7 @@ import {
TENANT_QUERY_TABS_ID,
} from '../../../store/reducers/tenant/constants';
import {setDiagnosticsTab, setQueryTab, setTenantPage} from '../../../store/reducers/tenant/tenant';
+import type {IQueryResult} from '../../../types/store/query';
import createToast from '../../../utils/createToast';
import {insertSnippetToEditor} from '../../../utils/monaco/insertSnippet';
import {transformPath} from '../ObjectSummary/transformPath';
@@ -21,6 +22,8 @@ import type {TemplateFn} from './schemaQueryTemplates';
import {
addTableIndex,
alterAsyncReplicationTemplate,
+ alterStreamingQuerySettingsTemplate,
+ alterStreamingQueryText,
alterTableTemplate,
alterTopicTemplate,
alterTransferTemplate,
@@ -28,12 +31,14 @@ import {
createCdcStreamTemplate,
createColumnTableTemplate,
createExternalTableTemplate,
+ createStreamingQueryTemplate,
createTableTemplate,
createTopicTemplate,
createTransferTemplate,
createViewTemplate,
dropAsyncReplicationTemplate,
dropExternalTableTemplate,
+ dropStreamingQueryTemplate,
dropTableIndex,
dropTableTemplate,
dropTopicTemplate,
@@ -53,6 +58,8 @@ interface ActionsAdditionalParams {
schemaData?: SchemaData[];
isSchemaDataLoading?: boolean;
hasMonitoring?: boolean;
+ streamingQueryData?: IQueryResult;
+ isStreamingQueryTextLoading?: boolean;
}
interface BindActionParams {
@@ -74,6 +81,7 @@ const bindActions = (
getConfirmation,
getConnectToDBDialog,
schemaData,
+ streamingQueryData,
} = additionalEffects;
const inputQuery = (tmpl: TemplateFn) => () => {
@@ -82,7 +90,7 @@ const bindActions = (
dispatch(setTenantPage(TENANT_PAGES_IDS.query));
dispatch(setQueryTab(TENANT_QUERY_TABS_ID.newQuery));
setActivePath(params.path);
- insertSnippetToEditor(tmpl({...params, schemaData}));
+ insertSnippetToEditor(tmpl({...params, schemaData, streamingQueryData}));
};
if (getConfirmation) {
const confirmedPromise = getConfirmation();
@@ -129,6 +137,10 @@ const bindActions = (
dropTopic: inputQuery(dropTopicTemplate),
createView: inputQuery(createViewTemplate),
dropView: inputQuery(dropViewTemplate),
+ createStreamingQuery: inputQuery(createStreamingQueryTemplate),
+ alterStreamingQuerySettings: inputQuery(alterStreamingQuerySettingsTemplate),
+ alterStreamingQueryText: inputQuery(alterStreamingQueryText),
+ dropStreamingQuery: inputQuery(dropStreamingQueryTemplate),
dropIndex: inputQuery(dropTableIndex),
addTableIndex: inputQuery(addTableIndex),
createCdcStream: inputQuery(createCdcStreamTemplate),
@@ -219,6 +231,7 @@ export const getActions =
},
{text: i18n('actions.createTopic'), action: actions.createTopic},
{text: i18n('actions.createView'), action: actions.createView},
+ {text: i18n('actions.createStreamingQuery'), action: actions.createStreamingQuery},
];
const alterTableGroupItem = {
@@ -334,6 +347,25 @@ export const getActions =
[copyItem, {text: i18n('actions.dropIndex'), action: actions.dropIndex}],
];
+ const STREAMING_QUERY_SET: ActionsSet = [
+ [copyItem],
+ [
+ {
+ text: i18n('actions.alterStreamingQuerySettings'),
+ action: actions.alterStreamingQuerySettings,
+ },
+ getActionWithLoader({
+ text: i18n('actions.alterStreamingQueryText'),
+ action: actions.alterStreamingQueryText,
+ isLoading: additionalEffects.isStreamingQueryTextLoading,
+ }),
+ {
+ text: i18n('actions.dropStreamingQuery'),
+ action: actions.dropStreamingQuery,
+ },
+ ],
+ ];
+
const JUST_COPY: ActionsSet = [copyItem];
// verbose mapping to guarantee a correct actions set for new node types
@@ -362,7 +394,7 @@ export const getActions =
view: VIEW_SET,
- streaming_query: JUST_COPY,
+ streaming_query: STREAMING_QUERY_SET,
};
return nodeTypeToActions[type];
diff --git a/src/containers/Tenant/utils/schemaQueryTemplates.ts b/src/containers/Tenant/utils/schemaQueryTemplates.ts
index bcbbded193..c28a664ec8 100644
--- a/src/containers/Tenant/utils/schemaQueryTemplates.ts
+++ b/src/containers/Tenant/utils/schemaQueryTemplates.ts
@@ -1,9 +1,12 @@
+import type {IQueryResult} from '../../../types/store/query';
+import {getStringifiedData} from '../../../utils/dataFormatters/dataFormatters';
import type {SchemaData} from '../Schema/SchemaViewer/types';
export interface SchemaQueryParams {
path: string;
relativePath: string;
schemaData?: SchemaData[];
+ streamingQueryData?: IQueryResult;
}
export type TemplateFn = (params?: SchemaQueryParams) => string;
@@ -12,6 +15,21 @@ function normalizeParameter(param: string) {
return param.replace(/\$/g, '\\$');
}
+function toLF(str: string) {
+ return str.replace(/\r\n?/g, '\n');
+}
+
+function stripAllIndent(str: string) {
+ return str
+ .split('\n')
+ .map((line) => line.trim())
+ .join('\n');
+}
+
+function indentBlock(str: string, pad = ' ') {
+ return str.replace(/^/gm, pad);
+}
+
export const createTableTemplate = (params?: SchemaQueryParams) => {
const tableName = params?.relativePath
? `\`${normalizeParameter(params.relativePath)}/my_row_table\``
@@ -298,6 +316,59 @@ ALTER TRANSFER ${path}
SET USING \\$l;`;
};
+export const createStreamingQueryTemplate = (params?: SchemaQueryParams) => {
+ const streamingQueryName = params?.relativePath
+ ? `\`${normalizeParameter(params.relativePath)}/my_streaming_query\``
+ : '${1:}';
+ return `CREATE STREAMING QUERY ${streamingQueryName} WITH (
+ RUN = TRUE -- Run the query after creation
+) AS
+DO BEGIN
+ INSERT INTO \${2:}.\${3:}
+ SELECT * FROM \${2:}.\${4:};
+END DO;`;
+};
+
+export const alterStreamingQuerySettingsTemplate = (params?: SchemaQueryParams) => {
+ const streamingQueryName = params?.relativePath
+ ? `\`${normalizeParameter(params.relativePath)}\``
+ : '${1:}';
+ return `ALTER STREAMING QUERY ${streamingQueryName} SET (
+ RUN = FALSE, -- Stop query execution
+ RESOURCE_POOL = "default" -- Workload manager pool for query
+);`;
+};
+
+export const alterStreamingQueryText = (params?: SchemaQueryParams) => {
+ const streamingQueryName = params?.relativePath
+ ? `\`${normalizeParameter(params.relativePath)}\``
+ : '${1:}';
+
+ const sysData = params?.streamingQueryData;
+ const rawQueryText = getStringifiedData(sysData?.resultSets?.[0]?.result?.[0]?.Text);
+ let queryText = toLF(rawQueryText);
+ queryText = queryText.trim();
+ queryText = stripAllIndent(queryText);
+ queryText = normalizeParameter(queryText);
+
+ const bodyQueryText = queryText
+ ? indentBlock(queryText)
+ : indentBlock('${2:}');
+ return `ALTER STREAMING QUERY ${streamingQueryName} SET (
+ FORCE = TRUE, -- Allow to drop last query checkpoint if query state can't be loaded
+) AS
+DO BEGIN
+${bodyQueryText}
+END DO;`;
+};
+
+export const dropStreamingQueryTemplate = (params?: SchemaQueryParams) => {
+ const streamingQueryName = params?.relativePath
+ ? `\`${normalizeParameter(params.relativePath)}\``
+ : '${1:}';
+ return `DROP STREAMING QUERY ${streamingQueryName};`;
+};
+
export const addTableIndex = (params?: SchemaQueryParams) => {
const path = params?.relativePath
? `\`${normalizeParameter(params.relativePath)}\``
diff --git a/src/store/reducers/streamingQuery/streamingQuery.ts b/src/store/reducers/streamingQuery/streamingQuery.ts
new file mode 100644
index 0000000000..9901d23bfb
--- /dev/null
+++ b/src/store/reducers/streamingQuery/streamingQuery.ts
@@ -0,0 +1,46 @@
+import {QUERY_TECHNICAL_MARK} from '../../../utils/constants';
+import {isQueryErrorResponse, parseQueryAPIResponse} from '../../../utils/query';
+import {api} from '../api';
+
+function getStreamingQueryInfoSQL(path: string) {
+ const safePath = path.replace(/'/g, "''");
+ return `${QUERY_TECHNICAL_MARK}
+SELECT
+ Status AS State,
+ Issues AS Error,
+ Text
+FROM \`.sys/streaming_queries\`
+WHERE Path = '${safePath}'
+LIMIT 1`;
+}
+
+export const streamingQueriesApi = api.injectEndpoints({
+ endpoints: (build) => ({
+ getStreamingQueryInfo: build.query({
+ queryFn: async ({database, path}: {database: string; path: string}, {signal}) => {
+ try {
+ const response = await window.api.viewer.sendQuery(
+ {
+ query: getStreamingQueryInfoSQL(path),
+ database,
+ action: 'execute-scan',
+ internal_call: true,
+ },
+ {signal, withRetries: true},
+ );
+
+ if (isQueryErrorResponse(response)) {
+ return {error: response};
+ }
+
+ const data = parseQueryAPIResponse(response);
+ return {data};
+ } catch (error) {
+ return {error};
+ }
+ },
+ providesTags: ['All'],
+ }),
+ }),
+ overrideExisting: 'throw',
+});