@@ -67,6 +67,19 @@ class Stream extends Base {
6767 progressiveChunkSize_ : false
6868 }
6969
70+ /**
71+ * @member {AbortController|null} abortController=null
72+ * @protected
73+ */
74+ abortController = null
75+
76+ /**
77+ * Aborts the current stream request.
78+ */
79+ abort ( ) {
80+ this . abortController ?. abort ( )
81+ }
82+
7083 /**
7184 * @param {Object } operation
7285 * @returns {Promise }
@@ -84,76 +97,90 @@ class Stream extends Base {
8497 throw new Error ( 'No URL specified' )
8598 }
8699
100+ me . abortController = new AbortController ( ) ;
87101 me . store . isStreaming = true ;
88- const response = await fetch ( url ) ;
89102
90- if ( ! response . ok ) {
91- throw new Error ( `HTTP error! status: ${ response . status } ` )
92- }
103+ try {
104+ const response = await fetch ( url , { signal : me . abortController . signal } ) ;
93105
94- if ( ! response . body ) {
95- throw new Error ( 'ReadableStream not supported in this environment.' )
96- }
106+ if ( ! response . ok ) {
107+ throw new Error ( `HTTP error! status: ${ response . status } ` )
108+ }
97109
98- const reader = response . body . getReader ( ) ;
99- const decoder = new TextDecoder ( ) ;
110+ if ( ! response . body ) {
111+ throw new Error ( 'ReadableStream not supported in this environment.' )
112+ }
100113
101- let buffer = '' ;
102- let loaded = 0 ;
103- const total = parseInt ( response . headers . get ( 'content-length' ) || 0 , 10 ) ;
114+ const reader = response . body . getReader ( ) ;
115+ const decoder = new TextDecoder ( ) ;
104116
105- while ( true ) {
106- const { value, done } = await reader . read ( ) ;
117+ let buffer = '' ;
118+ let loaded = 0 ;
119+ const total = parseInt ( response . headers . get ( 'content-length' ) || 0 , 10 ) ;
107120
108- if ( done ) {
109- // Process any remaining buffer
110- if ( buffer . trim ( ) ) {
111- me . processLine ( buffer , chunk ) ;
112- count ++
113- }
114- // Flush remaining chunk
115- if ( chunk . length > 0 ) {
116- me . fire ( 'data' , chunk ) ;
117- me . store . isStreaming = false
121+ while ( true ) {
122+ const { value, done} = await reader . read ( ) ;
123+
124+ if ( done ) {
125+ // Process any remaining buffer
126+ if ( buffer . trim ( ) ) {
127+ me . processLine ( buffer , chunk ) ;
128+ count ++
129+ }
130+ // Flush remaining chunk
131+ if ( chunk . length > 0 ) {
132+ me . fire ( 'data' , chunk ) ;
133+ me . store . isStreaming = false
134+ }
135+ break
118136 }
119- break
120- }
121137
122- loaded += value . byteLength ;
123- me . fire ( 'progress' , { loaded, total} ) ;
138+ loaded += value . byteLength ;
139+ me . fire ( 'progress' , { loaded, total} ) ;
124140
125- buffer += decoder . decode ( value , { stream : true } ) ;
126- const lines = buffer . split ( '\n' ) ;
127- // Keep the last partial line in the buffer
128- buffer = lines . pop ( ) ;
141+ buffer += decoder . decode ( value , { stream : true } ) ;
142+ const lines = buffer . split ( '\n' ) ;
143+ // Keep the last partial line in the buffer
144+ buffer = lines . pop ( ) ;
129145
130- for ( const line of lines ) {
131- if ( line . trim ( ) ) {
132- me . processLine ( line , chunk ) ;
133- count ++ ;
146+ for ( const line of lines ) {
147+ if ( line . trim ( ) ) {
148+ me . processLine ( line , chunk ) ;
149+ count ++ ;
134150
135- if ( chunk . length >= currentChunkSize ) {
136- me . fire ( 'data' , chunk ) ;
151+ if ( chunk . length >= currentChunkSize ) {
152+ me . fire ( 'data' , chunk ) ;
137153
138- burstCount ++ ;
154+ burstCount ++ ;
139155
140- if ( progressiveChunkSize ) {
141- currentChunkSize = me . getProgressiveChunkSize ( count ) ;
142- } else if ( burstCount >= me . initialBurstCount ) {
143- currentChunkSize = chunkSize
144- }
156+ if ( progressiveChunkSize ) {
157+ currentChunkSize = me . getProgressiveChunkSize ( count ) ;
158+ } else if ( burstCount >= me . initialBurstCount ) {
159+ currentChunkSize = chunkSize
160+ }
145161
146- // Give the App Worker a minimal amount of time to breathe,
147- // so that logic can act upon events (e.g. sending out vdom updates).
148- await me . timeout ( 1 ) ;
162+ // Give the App Worker a minimal amount of time to breathe,
163+ // so that logic can act upon events (e.g. sending out vdom updates).
164+ await me . timeout ( 1 ) ;
149165
150- chunk = [ ]
166+ chunk = [ ]
167+ }
151168 }
152169 }
153170 }
154- }
155171
156- return { success : true , count}
172+ return { success : true , count}
173+ } catch ( e ) {
174+ if ( e . name === 'AbortError' ) {
175+ console . log ( 'Stream request aborted' ) ;
176+ me . store . isStreaming = false ;
177+ // Treat abort as a valid "partial success"
178+ return { success : true , count, aborted : true }
179+ }
180+ throw e
181+ } finally {
182+ me . abortController = null
183+ }
157184 }
158185
159186 /**
0 commit comments