-
Notifications
You must be signed in to change notification settings - Fork 12
Expand file tree
/
Copy pathindex.ts
More file actions
159 lines (136 loc) · 4.32 KB
/
index.ts
File metadata and controls
159 lines (136 loc) · 4.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import {
type Connection,
type ConnectionContext,
Server,
type WSMessage,
routePartykitRequest,
} from "partyserver";
import { nanoid } from "nanoid";
import { EventSourceParserStream } from "eventsource-parser/stream";
import type { ChatMessage, Message } from "../shared";
type Env = {
Ai: Ai;
};
export class Chat extends Server<Env> {
static options = { hibernate: true };
messages = [] as ChatMessage[];
broadcastMessage(message: Message, exclude?: string[]) {
this.broadcast(JSON.stringify(message), exclude);
}
async onStart() {
// this is where you can initialize things that need to be done before the server starts
// for example, load previous messages from a database or a service
// create the messages table if it doesn't exist
this.ctx.storage.sql.exec(
`CREATE TABLE IF NOT EXISTS messages (id TEXT PRIMARY KEY, user TEXT, role TEXT, content TEXT)`
);
// load the messages from the database
this.messages = this.ctx.storage.sql
.exec(`SELECT * FROM messages`)
.toArray() as ChatMessage[];
}
async onConnect(connection: Connection, ctx: ConnectionContext) {
connection.send(
JSON.stringify({
type: "all",
messages: this.messages,
} satisfies Message)
);
}
saveMessage(message: ChatMessage) {
// check if the message already exists
const existingMessage = this.messages.find((m) => m.id === message.id);
if (existingMessage) {
this.messages = this.messages.map((m) => {
if (m.id === message.id) {
return message;
}
return m;
});
} else {
this.messages.push(message);
}
this.ctx.storage.sql.exec(
`INSERT INTO messages (id, user, role, content) VALUES (?, ?, ?, ?) ON CONFLICT (id) DO UPDATE SET content = ?`,
message.id,
message.user,
message.role,
message.content,
message.content
);
}
async onMessage(connection: Connection, message: WSMessage) {
// let's broadcast the raw message to everyone else
this.broadcast(message);
// let's update our local messages store
const parsed = JSON.parse(message as string) as Message;
if (parsed.type === "add") {
// add the message to the local store
this.saveMessage(parsed);
// let's ask AI to respond as well for fun
const aiMessage = {
id: nanoid(8),
content: "...",
user: "AI",
role: "assistant",
} as const;
this.broadcastMessage({
type: "add",
...aiMessage,
});
const aiMessageStream = (await this.env.Ai.run(
"@cf/meta/llama-2-7b-chat-int8",
{
stream: true,
messages: this.messages.map((m) => ({
content: m.content,
role: m.role,
})),
}
)) as ReadableStream;
this.saveMessage(aiMessage);
const eventStream = aiMessageStream
.pipeThrough(new TextDecoderStream())
.pipeThrough(new EventSourceParserStream());
// We want the AI to respond to the message in real-time
// so we're going to stream every chunk as an "update" message
let buffer = "";
for await (const event of eventStream) {
if (event.data !== "[DONE]") {
// let's append the response to the buffer
buffer += JSON.parse(event.data).response;
// and broadcast the buffer as an update
this.broadcastMessage({
type: "update",
...aiMessage,
content: buffer + "...", // let's add an ellipsis to show it's still typing
});
} else {
// the AI is done responding
// we update our local messages store with the final response
this.saveMessage({
...aiMessage,
content: buffer,
});
// let's update the message with the final response
this.broadcastMessage({
type: "update",
...aiMessage,
content: buffer,
});
}
}
} else if (parsed.type === "update") {
// update the message in the local store
this.saveMessage(parsed);
}
}
}
export default {
async fetch(request, env) {
return (
(await routePartykitRequest(request, env)) ||
new Response("Not Found", { status: 404 })
);
},
} satisfies ExportedHandler<Env>;