Skip to content
This repository was archived by the owner on Jul 4, 2025. It is now read-only.

Commit ef04bf5

Browse files
committed
chore: clean up chat stream
1 parent: e65ad47 · commit: ef04bf5

File tree

17 files changed

+196
-198
lines changed

17 files changed

+196
-198
lines changed

cortex-js/src/command.module.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import { ModelUpdateCommand } from './infrastructure/commanders/models/model-upd
3333
DatabaseModule,
3434
ModelsModule,
3535
CortexModule,
36-
ChatModule,
3736
ExtensionModule,
3837
HttpModule,
3938
CliUsecasesModule,

cortex-js/src/command.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { CommandFactory } from 'nest-commander';
33
import { CommandModule } from './command.module';
44

55
async function bootstrap() {
6-
await CommandFactory.run(CommandModule);
6+
await CommandFactory.run(CommandModule, ['warn', 'error']);
77
}
88

99
bootstrap();

cortex-js/src/domain/abstracts/engine.abstract.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
/* eslint-disable no-unused-vars, @typescript-eslint/no-unused-vars */
2+
import stream from 'stream';
23
import { Model, ModelSettingParams } from '../models/model.interface';
34
import { Extension } from './extension.abstract';
45

56
export abstract class EngineExtension extends Extension {
67
abstract provider: string;
78

8-
abstract inference(completion: any, req: any, stream: any, res?: any): void;
9+
abstract inference(dto: any, headers: Record<string, string>): Promise<any>;
10+
11+
abstract inferenceStream(dto: any, headers: any): Promise<stream.Readable>;
912

1013
async loadModel(
1114
model: Model,
Lines changed: 34 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,6 @@
11
import { HttpService } from '@nestjs/axios';
22
import { EngineExtension } from './engine.abstract';
3-
import { stdout } from 'process';
4-
5-
export type ChatStreamEvent = {
6-
type: 'data' | 'error' | 'end';
7-
data?: any;
8-
error?: any;
9-
};
3+
import stream from 'stream';
104

115
export abstract class OAIEngineExtension extends EngineExtension {
126
abstract apiUrl: string;
@@ -15,120 +9,43 @@ export abstract class OAIEngineExtension extends EngineExtension {
159
super();
1610
}
1711

18-
inference(
12+
override async inferenceStream(
1913
createChatDto: any,
2014
headers: Record<string, string>,
21-
writableStream: WritableStream<ChatStreamEvent>,
22-
res?: any,
23-
) {
24-
if (createChatDto.stream === true) {
25-
if (res) {
26-
res.writeHead(200, {
27-
'Content-Type': 'text/event-stream',
28-
'Cache-Control': 'no-cache',
29-
Connection: 'keep-alive',
30-
'Access-Control-Allow-Origin': '*',
31-
});
32-
this.httpService
33-
.post(this.apiUrl, createChatDto, {
34-
headers: {
35-
'Content-Type': headers['content-type'] ?? 'application/json',
36-
Authorization: headers['authorization'],
37-
},
38-
responseType: 'stream',
39-
})
40-
.toPromise()
41-
.then((response) => {
42-
response?.data.pipe(res);
43-
});
44-
} else {
45-
const decoder = new TextDecoder('utf-8');
46-
const defaultWriter = writableStream.getWriter();
47-
defaultWriter.ready.then(() => {
48-
this.httpService
49-
.post(this.apiUrl, createChatDto, {
50-
headers: {
51-
'Content-Type': headers['content-type'] ?? 'application/json',
52-
Authorization: headers['authorization'],
53-
},
54-
responseType: 'stream',
55-
})
56-
.subscribe({
57-
next: (response) => {
58-
response.data.on('data', (chunk: any) => {
59-
let content = '';
60-
const text = decoder.decode(chunk);
61-
const lines = text.trim().split('\n');
62-
let cachedLines = '';
63-
for (const line of lines) {
64-
try {
65-
const toParse = cachedLines + line;
66-
if (!line.includes('data: [DONE]')) {
67-
const data = JSON.parse(toParse.replace('data: ', ''));
68-
content += data.choices[0]?.delta?.content ?? '';
69-
70-
if (content.startsWith('assistant: ')) {
71-
content = content.replace('assistant: ', '');
72-
}
73-
74-
if (content !== '') {
75-
defaultWriter.write({
76-
type: 'data',
77-
data: content,
78-
});
79-
}
80-
}
81-
} catch {
82-
cachedLines = line;
83-
}
84-
}
85-
});
86-
87-
response.data.on('error', (error: any) => {
88-
defaultWriter.write({
89-
type: 'error',
90-
error,
91-
});
92-
});
15+
): Promise<stream.Readable> {
16+
const response = await this.httpService
17+
.post(this.apiUrl, createChatDto, {
18+
headers: {
19+
'Content-Type': headers['content-type'] ?? 'application/json',
20+
Authorization: headers['authorization'],
21+
},
22+
responseType: 'stream',
23+
})
24+
.toPromise();
25+
26+
if (!response) {
27+
throw new Error('No response');
28+
}
9329

94-
response.data.on('end', () => {
95-
// stdout.write('Stream end');
96-
defaultWriter.write({
97-
type: 'end',
98-
});
99-
});
100-
},
30+
return response.data;
31+
}
10132

102-
error: (error) => {
103-
stdout.write('Stream error: ' + error);
104-
},
105-
});
106-
});
107-
}
108-
} else {
109-
const defaultWriter = writableStream.getWriter();
110-
defaultWriter.ready.then(() => {
111-
this.httpService
112-
.post(this.apiUrl, createChatDto, {
113-
headers: {
114-
'Content-Type': headers['content-type'] ?? 'application/json',
115-
Authorization: headers['authorization'],
116-
},
117-
})
118-
.toPromise()
119-
.then((response) => {
120-
defaultWriter.write({
121-
type: 'data',
122-
data: response?.data,
123-
});
124-
})
125-
.catch((error: any) => {
126-
defaultWriter.write({
127-
type: 'error',
128-
error,
129-
});
130-
});
131-
});
33+
override async inference(
34+
createChatDto: any,
35+
headers: Record<string, string>,
36+
): Promise<any> {
37+
const response = await this.httpService
38+
.post(this.apiUrl, createChatDto, {
39+
headers: {
40+
'Content-Type': headers['content-type'] ?? 'application/json',
41+
Authorization: headers['authorization'],
42+
},
43+
})
44+
.toPromise();
45+
if (!response) {
46+
throw new Error('No response');
13247
}
48+
49+
return response.data;
13350
}
13451
}

cortex-js/src/infrastructure/commanders/chat.command.ts

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
1-
import { ChatUsecases } from '@/usecases/chat/chat.usecases';
21
import { CommandRunner, SubCommand, Option } from 'nest-commander';
32
import { ChatCliUsecases } from './usecases/chat.cli.usecases';
4-
import { CortexUsecases } from '@/usecases/cortex/cortex.usecases';
53
import { exit } from 'node:process';
64

75
type ChatOptions = {
@@ -10,10 +8,7 @@ type ChatOptions = {
108

119
@SubCommand({ name: 'chat', description: 'Start a chat with a model' })
1210
export class ChatCommand extends CommandRunner {
13-
constructor(
14-
private readonly chatUsecases: ChatUsecases,
15-
private readonly cortexUsecases: CortexUsecases,
16-
) {
11+
constructor(private readonly chatCliUsecases: ChatCliUsecases) {
1712
super();
1813
}
1914

@@ -24,11 +19,7 @@ export class ChatCommand extends CommandRunner {
2419
exit(1);
2520
}
2621

27-
const chatCliUsecases = new ChatCliUsecases(
28-
this.chatUsecases,
29-
this.cortexUsecases,
30-
);
31-
return chatCliUsecases.chat(modelId);
22+
return this.chatCliUsecases.chat(modelId);
3223
}
3324

3425
@Option({

cortex-js/src/infrastructure/commanders/prompt-constants.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
//// HF Chat template
2-
export const OPEN_CHAT_3_5_JINJA = ``;
2+
export const OPEN_CHAT_3_5_JINJA = `{{ bos_token }}{% for message in messages %}{{ 'GPT4 Correct ' + message['role'].title() + ': ' + message['content'] + '<|end_of_turn|>'}}{% endfor %}{% if add_generation_prompt %}{{ 'GPT4 Correct Assistant:' }}{% endif %}`;
33

44
export const ZEPHYR_JINJA = `{% for message in messages %}
55
{% if message['role'] == 'user' %}

cortex-js/src/infrastructure/commanders/serve.command.ts

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,6 @@ type ServeOptions = {
1313
description: 'Providing API endpoint for Cortex backend',
1414
})
1515
export class ServeCommand extends CommandRunner {
16-
constructor() {
17-
super();
18-
}
19-
2016
async run(_input: string[], options?: ServeOptions): Promise<void> {
2117
const host = options?.host || defaultCortexJsHost;
2218
const port = options?.port || defaultCortexJsPort;
@@ -34,15 +30,15 @@ export class ServeCommand extends CommandRunner {
3430
}
3531

3632
@Option({
37-
flags: '--host <host>',
33+
flags: '-h, --host <host>',
3834
description: 'Host to serve the application',
3935
})
4036
parseHost(value: string) {
4137
return value;
4238
}
4339

4440
@Option({
45-
flags: '--port <port>',
41+
flags: '-p, --port <port>',
4642
description: 'Port to serve the application',
4743
})
4844
parsePort(value: string) {

cortex-js/src/infrastructure/commanders/shortcuts/run.command.ts

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ import { CortexUsecases } from '@/usecases/cortex/cortex.usecases';
22
import { ModelsUsecases } from '@/usecases/models/models.usecases';
33
import { CommandRunner, SubCommand, Option } from 'nest-commander';
44
import { exit } from 'node:process';
5-
import { ChatUsecases } from '@/usecases/chat/chat.usecases';
65
import { ChatCliUsecases } from '../usecases/chat.cli.usecases';
76
import { defaultCortexCppHost, defaultCortexCppPort } from 'constant';
87

@@ -18,7 +17,7 @@ export class RunCommand extends CommandRunner {
1817
constructor(
1918
private readonly modelsUsecases: ModelsUsecases,
2019
private readonly cortexUsecases: CortexUsecases,
21-
private readonly chatUsecases: ChatUsecases,
20+
private readonly chatCliUsecases: ChatCliUsecases,
2221
) {
2322
super();
2423
}
@@ -36,11 +35,7 @@ export class RunCommand extends CommandRunner {
3635
false,
3736
);
3837
await this.modelsUsecases.startModel(modelId);
39-
const chatCliUsecases = new ChatCliUsecases(
40-
this.chatUsecases,
41-
this.cortexUsecases,
42-
);
43-
await chatCliUsecases.chat(modelId);
38+
await this.chatCliUsecases.chat(modelId);
4439
}
4540

4641
@Option({

cortex-js/src/infrastructure/commanders/usecases/chat.cli.usecases.ts

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@ import { ChatUsecases } from '@/usecases/chat/chat.usecases';
22
import { ChatCompletionRole } from '@/domain/models/message.interface';
33
import { exit, stdin, stdout } from 'node:process';
44
import * as readline from 'node:readline/promises';
5-
import { ChatStreamEvent } from '@/domain/abstracts/oai.abstract';
65
import { ChatCompletionMessage } from '@/infrastructure/dtos/chat/chat-completion-message.dto';
76
import { CreateChatCompletionDto } from '@/infrastructure/dtos/chat/create-chat-completion.dto';
87
import { CortexUsecases } from '@/usecases/cortex/cortex.usecases';
8+
import { Injectable } from '@nestjs/common';
99

10-
// TODO: make this class injectable
10+
@Injectable()
1111
export class ChatCliUsecases {
1212
private exitClause = 'exit()';
1313
private userIndicator = '>> ';
@@ -59,26 +59,34 @@ export class ChatCliUsecases {
5959
top_p: 0.7,
6060
};
6161

62-
let llmFullResponse = '';
63-
const writableStream = new WritableStream<ChatStreamEvent>({
64-
write(chunk) {
65-
if (chunk.type === 'data') {
66-
stdout.write(chunk.data ?? '');
67-
llmFullResponse += chunk.data ?? '';
68-
} else if (chunk.type === 'error') {
69-
console.log('Error!!');
70-
} else {
71-
messages.push({
72-
content: llmFullResponse,
73-
role: ChatCompletionRole.Assistant,
74-
});
75-
llmFullResponse = '';
76-
console.log('\n');
62+
const decoder = new TextDecoder('utf-8');
63+
this.chatUsecases.inferenceStream(chatDto, {}).then((response) => {
64+
response.on('data', (chunk) => {
65+
let content = '';
66+
const text = decoder.decode(chunk);
67+
const lines = text.trim().split('\n');
68+
let cachedLines = '';
69+
for (const line of lines) {
70+
try {
71+
const toParse = cachedLines + line;
72+
if (!line.includes('data: [DONE]')) {
73+
const data = JSON.parse(toParse.replace('data: ', ''));
74+
content += data.choices[0]?.delta?.content ?? '';
75+
76+
if (content.startsWith('assistant: ')) {
77+
content = content.replace('assistant: ', '');
78+
}
79+
80+
if (content.trim().length > 0) {
81+
stdout.write(content);
82+
}
83+
}
84+
} catch {
85+
cachedLines = line;
86+
}
7787
}
78-
},
88+
});
7989
});
80-
81-
this.chatUsecases.createChatCompletions(chatDto, {}, writableStream);
8290
});
8391
}
8492
}

cortex-js/src/infrastructure/commanders/usecases/cli.usecases.module.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@ import { InitCliUsecases } from './init.cli.usecases';
33
import { HttpModule } from '@nestjs/axios';
44
import { ModelsCliUsecases } from './models.cli.usecases';
55
import { ModelsModule } from '@/usecases/models/models.module';
6+
import { ChatCliUsecases } from './chat.cli.usecases';
7+
import { ChatModule } from '@/usecases/chat/chat.module';
8+
import { CortexModule } from '@/usecases/cortex/cortex.module';
69

710
@Module({
8-
imports: [HttpModule, ModelsModule],
9-
controllers: [],
10-
providers: [InitCliUsecases, ModelsCliUsecases],
11-
exports: [InitCliUsecases, ModelsCliUsecases],
11+
imports: [HttpModule, ModelsModule, ChatModule, CortexModule],
12+
providers: [InitCliUsecases, ModelsCliUsecases, ChatCliUsecases],
13+
exports: [InitCliUsecases, ModelsCliUsecases, ChatCliUsecases],
1214
})
1315
export class CliUsecasesModule {}

0 commit comments

Comments (0)