diff --git a/src/services/api/streaming.ts b/src/services/api/streaming.ts index 08dc2ba636..dd9a8f0f3e 100644 --- a/src/services/api/streaming.ts +++ b/src/services/api/streaming.ts @@ -2,13 +2,19 @@ import {parseMultipart} from '@mjackson/multipart-parser'; import qs from 'qs'; import { + isErrorChunk, isKeepAliveChunk, isQueryResponseChunk, isSessionChunk, isStreamDataChunk, } from '../../store/reducers/query/utils'; import type {Actions, StreamQueryParams} from '../../types/api/query'; -import type {QueryResponseChunk, SessionChunk, StreamDataChunk} from '../../types/store/streaming'; +import type { + QueryResponseChunk, + SessionChunk, + StreamDataChunk, + StreamingChunk, +} from '../../types/store/streaming'; import { BINARY_DATA_IN_PLAIN_TEXT_DISPLAY, DEV_ENABLE_TRACING_FOR_ALL_REQUESTS, @@ -94,24 +100,34 @@ export class StreamingAPI extends BaseYdbAPI { const traceId = response.headers.get('traceresponse')?.split('-')[1]; await parseMultipart(response.body, {boundary: BOUNDARY}, async (part) => { + const text = await part.text(); + + let chunk: unknown; try { - const chunk = JSON.parse(await part.text()); - - if (isSessionChunk(chunk)) { - const sessionChunk = chunk; - sessionChunk.meta.trace_id = traceId; - options.onSessionChunk(chunk); - } else if (isStreamDataChunk(chunk)) { - options.onStreamDataChunk(chunk); - } else if (isQueryResponseChunk(chunk)) { - options.onQueryResponseChunk(chunk); - } else if (isKeepAliveChunk(chunk)) { - // Logging for debug purposes - console.info('Received keep alive chunk'); - } + chunk = JSON.parse(text); } catch (e) { throw new Error(`Error parsing chunk: ${e}`); } + + if (isErrorChunk(chunk)) { + await response.body?.cancel().catch(() => {}); + throw chunk; + } + + const streamingChunk = chunk as StreamingChunk; + + if (isSessionChunk(streamingChunk)) { + const sessionChunk = streamingChunk; + sessionChunk.meta.trace_id = traceId; + options.onSessionChunk(streamingChunk); + } else if (isStreamDataChunk(streamingChunk)) { + options.onStreamDataChunk(streamingChunk); + } else if (isQueryResponseChunk(streamingChunk)) { + options.onQueryResponseChunk(streamingChunk); + } else if (isKeepAliveChunk(streamingChunk)) { + // Logging for debug purposes + console.info('Received keep alive chunk'); + } }); } } diff --git a/src/store/reducers/query/utils.ts b/src/store/reducers/query/utils.ts index 7acc1c2025..dc1f111d1c 100644 --- a/src/store/reducers/query/utils.ts +++ b/src/store/reducers/query/utils.ts @@ -1,4 +1,4 @@ -import type {Actions} from '../../../types/api/query'; +import type {Actions, ErrorResponse} from '../../../types/api/query'; import type {QueryAction, QueryMode, QuerySyntax} from '../../../types/store/query'; import type { QueryResponseChunk, @@ -51,6 +51,12 @@ export function isKeepAliveChunk(content: StreamingChunk): content is SessionChu return content?.meta?.event === 'KeepAlive'; } +export function isErrorChunk(content: unknown): content is ErrorResponse { + return Boolean( + content && typeof content === 'object' && ('error' in content || 'issues' in content), + ); +} + export const prepareQueryWithPragmas = (query: string, pragmas?: string): string => { if (!pragmas || !pragmas.trim()) { return query;