11import { HttpService } from '@nestjs/axios' ;
22import { EngineExtension } from './engine.abstract' ;
3- import { stdout } from 'process' ;
4-
5- export type ChatStreamEvent = {
6- type : 'data' | 'error' | 'end' ;
7- data ?: any ;
8- error ?: any ;
9- } ;
3+ import stream from 'stream' ;
104
115export abstract class OAIEngineExtension extends EngineExtension {
126 abstract apiUrl : string ;
@@ -15,120 +9,43 @@ export abstract class OAIEngineExtension extends EngineExtension {
159 super ( ) ;
1610 }
1711
18- inference (
12+ override async inferenceStream (
1913 createChatDto : any ,
2014 headers : Record < string , string > ,
21- writableStream : WritableStream < ChatStreamEvent > ,
22- res ?: any ,
23- ) {
24- if ( createChatDto . stream === true ) {
25- if ( res ) {
26- res . writeHead ( 200 , {
27- 'Content-Type' : 'text/event-stream' ,
28- 'Cache-Control' : 'no-cache' ,
29- Connection : 'keep-alive' ,
30- 'Access-Control-Allow-Origin' : '*' ,
31- } ) ;
32- this . httpService
33- . post ( this . apiUrl , createChatDto , {
34- headers : {
35- 'Content-Type' : headers [ 'content-type' ] ?? 'application/json' ,
36- Authorization : headers [ 'authorization' ] ,
37- } ,
38- responseType : 'stream' ,
39- } )
40- . toPromise ( )
41- . then ( ( response ) => {
42- response ?. data . pipe ( res ) ;
43- } ) ;
44- } else {
45- const decoder = new TextDecoder ( 'utf-8' ) ;
46- const defaultWriter = writableStream . getWriter ( ) ;
47- defaultWriter . ready . then ( ( ) => {
48- this . httpService
49- . post ( this . apiUrl , createChatDto , {
50- headers : {
51- 'Content-Type' : headers [ 'content-type' ] ?? 'application/json' ,
52- Authorization : headers [ 'authorization' ] ,
53- } ,
54- responseType : 'stream' ,
55- } )
56- . subscribe ( {
57- next : ( response ) => {
58- response . data . on ( 'data' , ( chunk : any ) => {
59- let content = '' ;
60- const text = decoder . decode ( chunk ) ;
61- const lines = text . trim ( ) . split ( '\n' ) ;
62- let cachedLines = '' ;
63- for ( const line of lines ) {
64- try {
65- const toParse = cachedLines + line ;
66- if ( ! line . includes ( 'data: [DONE]' ) ) {
67- const data = JSON . parse ( toParse . replace ( 'data: ' , '' ) ) ;
68- content += data . choices [ 0 ] ?. delta ?. content ?? '' ;
69-
70- if ( content . startsWith ( 'assistant: ' ) ) {
71- content = content . replace ( 'assistant: ' , '' ) ;
72- }
73-
74- if ( content !== '' ) {
75- defaultWriter . write ( {
76- type : 'data' ,
77- data : content ,
78- } ) ;
79- }
80- }
81- } catch {
82- cachedLines = line ;
83- }
84- }
85- } ) ;
86-
87- response . data . on ( 'error' , ( error : any ) => {
88- defaultWriter . write ( {
89- type : 'error' ,
90- error,
91- } ) ;
92- } ) ;
15+ ) : Promise < stream . Readable > {
16+ const response = await this . httpService
17+ . post ( this . apiUrl , createChatDto , {
18+ headers : {
19+ 'Content-Type' : headers [ 'content-type' ] ?? 'application/json' ,
20+ Authorization : headers [ 'authorization' ] ,
21+ } ,
22+ responseType : 'stream' ,
23+ } )
24+ . toPromise ( ) ;
25+
26+ if ( ! response ) {
27+ throw new Error ( 'No response' ) ;
28+ }
9329
94- response . data . on ( 'end' , ( ) => {
95- // stdout.write('Stream end');
96- defaultWriter . write ( {
97- type : 'end' ,
98- } ) ;
99- } ) ;
100- } ,
30+ return response . data ;
31+ }
10132
102- error : ( error ) => {
103- stdout . write ( 'Stream error: ' + error ) ;
104- } ,
105- } ) ;
106- } ) ;
107- }
108- } else {
109- const defaultWriter = writableStream . getWriter ( ) ;
110- defaultWriter . ready . then ( ( ) => {
111- this . httpService
112- . post ( this . apiUrl , createChatDto , {
113- headers : {
114- 'Content-Type' : headers [ 'content-type' ] ?? 'application/json' ,
115- Authorization : headers [ 'authorization' ] ,
116- } ,
117- } )
118- . toPromise ( )
119- . then ( ( response ) => {
120- defaultWriter . write ( {
121- type : 'data' ,
122- data : response ?. data ,
123- } ) ;
124- } )
125- . catch ( ( error : any ) => {
126- defaultWriter . write ( {
127- type : 'error' ,
128- error,
129- } ) ;
130- } ) ;
131- } ) ;
33+ override async inference (
34+ createChatDto : any ,
35+ headers : Record < string , string > ,
36+ ) : Promise < any > {
37+ const response = await this . httpService
38+ . post ( this . apiUrl , createChatDto , {
39+ headers : {
40+ 'Content-Type' : headers [ 'content-type' ] ?? 'application/json' ,
41+ Authorization : headers [ 'authorization' ] ,
42+ } ,
43+ } )
44+ . toPromise ( ) ;
45+ if ( ! response ) {
46+ throw new Error ( 'No response' ) ;
13247 }
48+
49+ return response . data ;
13350 }
13451}
0 commit comments