@@ -3,13 +3,14 @@ import postgres from 'postgres';
3
3
import {
4
4
chat ,
5
5
message ,
6
+ type MessageDeprecated ,
6
7
messageDeprecated ,
7
8
vote ,
8
9
voteDeprecated ,
9
10
} from '../schema' ;
10
11
import { drizzle } from 'drizzle-orm/postgres-js' ;
11
12
import { inArray } from 'drizzle-orm' ;
12
- import { appendResponseMessages , UIMessage } from 'ai' ;
13
+ import { appendResponseMessages , type UIMessage } from 'ai' ;
13
14
14
15
config ( {
15
16
path : '.env.local' ,
@@ -22,8 +23,8 @@ if (!process.env.POSTGRES_URL) {
22
23
const client = postgres ( process . env . POSTGRES_URL ) ;
23
24
const db = drizzle ( client ) ;
24
25
25
- const BATCH_SIZE = 50 ; // Process 10 chats at a time
26
- const INSERT_BATCH_SIZE = 100 ; // Insert 100 messages at a time
26
+ const BATCH_SIZE = 100 ; // Process 100 chats at a time
27
+ const INSERT_BATCH_SIZE = 1000 ; // Insert 1000 messages at a time
27
28
28
29
type NewMessageInsert = {
29
30
id : string ;
@@ -40,16 +41,66 @@ type NewVoteInsert = {
40
41
isUpvoted : boolean ;
41
42
} ;
42
43
43
- async function createNewTable ( ) {
44
+ interface MessageDeprecatedContentPart {
45
+ type : string ;
46
+ content : unknown ;
47
+ }
48
+
49
+ function getMessageRank ( message : MessageDeprecated ) : number {
50
+ if (
51
+ message . role === 'assistant' &&
52
+ ( message . content as MessageDeprecatedContentPart [ ] ) . some (
53
+ ( contentPart ) => contentPart . type === 'tool-call' ,
54
+ )
55
+ ) {
56
+ return 0 ;
57
+ }
58
+
59
+ if (
60
+ message . role === 'tool' &&
61
+ ( message . content as MessageDeprecatedContentPart [ ] ) . some (
62
+ ( contentPart ) => contentPart . type === 'tool-result' ,
63
+ )
64
+ ) {
65
+ return 1 ;
66
+ }
67
+
68
+ if ( message . role === 'assistant' ) {
69
+ return 2 ;
70
+ }
71
+
72
+ return 3 ;
73
+ }
74
+
75
+ function dedupeParts < T extends { type : string ; [ k : string ] : any } > (
76
+ parts : T [ ] ,
77
+ ) : T [ ] {
78
+ const seen = new Set < string > ( ) ;
79
+ return parts . filter ( ( p ) => {
80
+ const key = `${ p . type } |${ JSON . stringify ( p . content ?? p ) } ` ;
81
+ if ( seen . has ( key ) ) return false ;
82
+ seen . add ( key ) ;
83
+ return true ;
84
+ } ) ;
85
+ }
86
+
87
+ function sanitizeParts < T extends { type : string ; [ k : string ] : any } > (
88
+ parts : T [ ] ,
89
+ ) : T [ ] {
90
+ return parts . filter (
91
+ ( part ) => ! ( part . type === 'reasoning' && part . reasoning === 'undefined' ) ,
92
+ ) ;
93
+ }
94
+
95
+ async function migrateMessages ( ) {
44
96
const chats = await db . select ( ) . from ( chat ) ;
97
+
45
98
let processedCount = 0 ;
46
99
47
- // Process chats in batches
48
100
for ( let i = 0 ; i < chats . length ; i += BATCH_SIZE ) {
49
101
const chatBatch = chats . slice ( i , i + BATCH_SIZE ) ;
50
102
const chatIds = chatBatch . map ( ( chat ) => chat . id ) ;
51
103
52
- // Fetch all messages and votes for the current batch of chats in bulk
53
104
const allMessages = await db
54
105
. select ( )
55
106
. from ( messageDeprecated )
@@ -60,20 +111,25 @@ async function createNewTable() {
60
111
. from ( voteDeprecated )
61
112
. where ( inArray ( voteDeprecated . chatId , chatIds ) ) ;
62
113
63
- // Prepare batches for insertion
64
114
const newMessagesToInsert : NewMessageInsert [ ] = [ ] ;
65
115
const newVotesToInsert : NewVoteInsert [ ] = [ ] ;
66
116
67
- // Process each chat in the batch
68
117
for ( const chat of chatBatch ) {
69
118
processedCount ++ ;
70
119
console . info ( `Processed ${ processedCount } /${ chats . length } chats` ) ;
71
120
72
- // Filter messages and votes for this specific chat
73
- const messages = allMessages . filter ( ( msg ) => msg . chatId === chat . id ) ;
121
+ const messages = allMessages
122
+ . filter ( ( message ) => message . chatId === chat . id )
123
+ . sort ( ( a , b ) => {
124
+ const differenceInTime =
125
+ new Date ( a . createdAt ) . getTime ( ) - new Date ( b . createdAt ) . getTime ( ) ;
126
+ if ( differenceInTime !== 0 ) return differenceInTime ;
127
+
128
+ return getMessageRank ( a ) - getMessageRank ( b ) ;
129
+ } ) ;
130
+
74
131
const votes = allVotes . filter ( ( v ) => v . chatId === chat . id ) ;
75
132
76
- // Group messages into sections
77
133
const messageSection : Array < UIMessage > = [ ] ;
78
134
const messageSections : Array < Array < UIMessage > > = [ ] ;
79
135
@@ -93,7 +149,6 @@ async function createNewTable() {
93
149
messageSections . push ( [ ...messageSection ] ) ;
94
150
}
95
151
96
- // Process each message section
97
152
for ( const section of messageSections ) {
98
153
const [ userMessage , ...assistantMessages ] = section ;
99
154
@@ -121,10 +176,14 @@ async function createNewTable() {
121
176
attachments : [ ] ,
122
177
} as NewMessageInsert ;
123
178
} else if ( message . role === 'assistant' ) {
179
+ const cleanParts = sanitizeParts (
180
+ dedupeParts ( message . parts || [ ] ) ,
181
+ ) ;
182
+
124
183
return {
125
184
id : message . id ,
126
185
chatId : chat . id ,
127
- parts : message . parts || [ ] ,
186
+ parts : cleanParts ,
128
187
role : message . role ,
129
188
createdAt : message . createdAt ,
130
189
attachments : [ ] ,
@@ -134,7 +193,6 @@ async function createNewTable() {
134
193
} )
135
194
. filter ( ( msg ) : msg is NewMessageInsert => msg !== null ) ;
136
195
137
- // Add messages to batch
138
196
for ( const msg of projectedUISection ) {
139
197
newMessagesToInsert . push ( msg ) ;
140
198
@@ -155,11 +213,9 @@ async function createNewTable() {
155
213
}
156
214
}
157
215
158
- // Batch insert messages
159
216
for ( let j = 0 ; j < newMessagesToInsert . length ; j += INSERT_BATCH_SIZE ) {
160
217
const messageBatch = newMessagesToInsert . slice ( j , j + INSERT_BATCH_SIZE ) ;
161
218
if ( messageBatch . length > 0 ) {
162
- // Ensure all required fields are present
163
219
const validMessageBatch = messageBatch . map ( ( msg ) => ( {
164
220
id : msg . id ,
165
221
chatId : msg . chatId ,
@@ -173,7 +229,6 @@ async function createNewTable() {
173
229
}
174
230
}
175
231
176
- // Batch insert votes
177
232
for ( let j = 0 ; j < newVotesToInsert . length ; j += INSERT_BATCH_SIZE ) {
178
233
const voteBatch = newVotesToInsert . slice ( j , j + INSERT_BATCH_SIZE ) ;
179
234
if ( voteBatch . length > 0 ) {
@@ -185,7 +240,7 @@ async function createNewTable() {
185
240
console . info ( `Migration completed: ${ processedCount } chats processed` ) ;
186
241
}
187
242
188
- createNewTable ( )
243
+ migrateMessages ( )
189
244
. then ( ( ) => {
190
245
console . info ( 'Script completed successfully' ) ;
191
246
process . exit ( 0 ) ;
0 commit comments