From 93dfd86dc7c6440378179aed92744a0d60dfbb71 Mon Sep 17 00:00:00 2001 From: Anton Standrik Date: Wed, 17 Sep 2025 20:48:25 +0300 Subject: [PATCH 1/3] fix: shard overload handle --- src/services/api/streaming.ts | 45 ++++++++++++++++++++----------- src/store/reducers/query/utils.ts | 8 +++++- 2 files changed, 37 insertions(+), 16 deletions(-) diff --git a/src/services/api/streaming.ts b/src/services/api/streaming.ts index 08dc2ba636..be9c9d169b 100644 --- a/src/services/api/streaming.ts +++ b/src/services/api/streaming.ts @@ -2,13 +2,19 @@ import {parseMultipart} from '@mjackson/multipart-parser'; import qs from 'qs'; import { + isErrorChunk, isKeepAliveChunk, isQueryResponseChunk, isSessionChunk, isStreamDataChunk, } from '../../store/reducers/query/utils'; import type {Actions, StreamQueryParams} from '../../types/api/query'; -import type {QueryResponseChunk, SessionChunk, StreamDataChunk} from '../../types/store/streaming'; +import type { + QueryResponseChunk, + SessionChunk, + StreamDataChunk, + StreamingChunk, +} from '../../types/store/streaming'; import { BINARY_DATA_IN_PLAIN_TEXT_DISPLAY, DEV_ENABLE_TRACING_FOR_ALL_REQUESTS, @@ -94,24 +100,33 @@ export class StreamingAPI extends BaseYdbAPI { const traceId = response.headers.get('traceresponse')?.split('-')[1]; await parseMultipart(response.body, {boundary: BOUNDARY}, async (part) => { + const text = await part.text(); + + let chunk: unknown; try { - const chunk = JSON.parse(await part.text()); - - if (isSessionChunk(chunk)) { - const sessionChunk = chunk; - sessionChunk.meta.trace_id = traceId; - options.onSessionChunk(chunk); - } else if (isStreamDataChunk(chunk)) { - options.onStreamDataChunk(chunk); - } else if (isQueryResponseChunk(chunk)) { - options.onQueryResponseChunk(chunk); - } else if (isKeepAliveChunk(chunk)) { - // Logging for debug purposes - console.info('Received keep alive chunk'); - } + chunk = JSON.parse(text); } catch (e) { throw new Error(`Error parsing chunk: ${e}`); } + + if (isErrorChunk(chunk)) { + throw chunk; + } + + const streamingChunk = chunk as unknown as StreamingChunk; + + if (isSessionChunk(streamingChunk)) { + const sessionChunk = streamingChunk; + sessionChunk.meta.trace_id = traceId; + options.onSessionChunk(streamingChunk); + } else if (isStreamDataChunk(streamingChunk)) { + options.onStreamDataChunk(streamingChunk); + } else if (isQueryResponseChunk(streamingChunk)) { + options.onQueryResponseChunk(streamingChunk); + } else if (isKeepAliveChunk(streamingChunk)) { + // Logging for debug purposes + console.info('Received keep alive chunk'); + } }); } } diff --git a/src/store/reducers/query/utils.ts b/src/store/reducers/query/utils.ts index 7acc1c2025..dc1f111d1c 100644 --- a/src/store/reducers/query/utils.ts +++ b/src/store/reducers/query/utils.ts @@ -1,4 +1,4 @@ -import type {Actions} from '../../../types/api/query'; +import type {Actions, ErrorResponse} from '../../../types/api/query'; import type {QueryAction, QueryMode, QuerySyntax} from '../../../types/store/query'; import type { QueryResponseChunk, @@ -51,6 +51,12 @@ export function isKeepAliveChunk(content: StreamingChunk): content is SessionChu return content?.meta?.event === 'KeepAlive'; } +export function isErrorChunk(content: unknown): content is ErrorResponse { + return Boolean( + content && typeof content === 'object' && ('error' in content || 'issues' in content), + ); +} + export const prepareQueryWithPragmas = (query: string, pragmas?: string): string => { if (!pragmas || !pragmas.trim()) { return query; From 34cc7a78ea2ae2863bc84b642d0c9618059a1e71 Mon Sep 17 00:00:00 2001 From: Anton Standrik Date: Wed, 17 Sep 2025 20:57:36 +0300 Subject: [PATCH 2/3] fix: nanofix --- src/services/api/streaming.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/services/api/streaming.ts b/src/services/api/streaming.ts index be9c9d169b..2ac7b0345b 100644 --- a/src/services/api/streaming.ts +++ b/src/services/api/streaming.ts @@ -113,7 +113,7 @@ export class StreamingAPI extends BaseYdbAPI { throw chunk; } - const streamingChunk = chunk as unknown as StreamingChunk; + const streamingChunk = chunk as StreamingChunk; if (isSessionChunk(streamingChunk)) { const sessionChunk = streamingChunk; From 01a853fec0c1573afb184c1231785488aac500b7 Mon Sep 17 00:00:00 2001 From: Anton Standrik Date: Thu, 18 Sep 2025 11:45:37 +0300 Subject: [PATCH 3/3] fix: abort connection --- src/services/api/streaming.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/services/api/streaming.ts b/src/services/api/streaming.ts index 2ac7b0345b..dd9a8f0f3e 100644 --- a/src/services/api/streaming.ts +++ b/src/services/api/streaming.ts @@ -110,6 +110,7 @@ export class StreamingAPI extends BaseYdbAPI { } if (isErrorChunk(chunk)) { + await response.body?.cancel().catch(() => {}); throw chunk; }