Skip to content
This repository was archived by the owner on Jul 4, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 79 additions & 1 deletion cortex-js/src/infrastructure/controllers/threads.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,24 @@ import {
Param,
Delete,
UseInterceptors,
HttpCode,
Query,
DefaultValuePipe,
} from '@nestjs/common';
import { ThreadsUsecases } from '@/usecases/threads/threads.usecases';
import { CreateThreadDto } from '@/infrastructure/dtos/threads/create-thread.dto';
import { UpdateThreadDto } from '@/infrastructure/dtos/threads/update-thread.dto';
import { DeleteThreadResponseDto } from '@/infrastructure/dtos/threads/delete-thread.dto';
import { GetThreadResponseDto } from '@/infrastructure/dtos/threads/get-thread.dto';
import { ApiOperation, ApiParam, ApiTags, ApiResponse } from '@nestjs/swagger';
import {
ApiOperation,
ApiParam,
ApiTags,
ApiResponse,
ApiQuery,
} from '@nestjs/swagger';
import { TransformInterceptor } from '../interceptors/transform.interceptor';
import { ListMessagesResponseDto } from '../dtos/messages/list-message.dto';

@ApiTags('Threads')
@Controller('threads')
Expand All @@ -41,6 +51,74 @@ export class ThreadsController {
return this.threadsService.findAll();
}

@HttpCode(200)
@ApiResponse({
status: 200,
description: 'A list of message objects.',
type: ListMessagesResponseDto,
})
@ApiOperation({
summary: 'List messages',
description: 'Returns a list of messages for a given thread.',
})
@ApiParam({
name: 'id',
required: true,
description: 'The ID of the thread the messages belong to.',
})
@ApiQuery({
name: 'limit',
type: Number,
required: false,
description:
'A limit on the number of objects to be returned. Limit can range between 1 and 100, and the default is 20.',
})
@ApiQuery({
name: 'order',
type: String,
required: false,
description:
'Sort order by the created_at timestamp of the objects. asc for ascending order and desc for descending order.',
})
@ApiQuery({
name: 'after',
type: String,
required: false,
description:
'A cursor for use in pagination. after is an object ID that defines your place in the list. For instance, if you make a list request and receive 100 objects, ending with obj_foo, your subsequent call can include after=obj_foo in order to fetch the next page of the list.',
})
@ApiQuery({
name: 'before',
type: String,
required: false,
description:
'A cursor for use in pagination. before is an object ID that defines your place in the list. For instance, if you make a list request and receive 100 objects, ending with obj_foo, your subsequent call can include before=obj_foo in order to fetch the previous page of the list.',
})
@ApiQuery({
name: 'run_id',
type: String,
required: false,
description: 'Filter messages by the run ID that generated them.',
})
@Get(':id/messages')
getMessagesOfThread(
@Param('id') id: string,
@Query('limit', new DefaultValuePipe(20)) limit: number,
@Query('order', new DefaultValuePipe('desc')) order: 'asc' | 'desc',
@Query('after') after?: string,
@Query('before') before?: string,
@Query('run_id') runId?: string,
) {
return this.threadsService.getMessagesOfThread(
id,
limit,
order,
after,
before,
runId,
);
}

@ApiResponse({
status: 200,
description: 'Ok',
Expand Down
28 changes: 28 additions & 0 deletions cortex-js/src/infrastructure/dtos/page.dto.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { ApiProperty } from '@nestjs/swagger';
import { IsArray } from 'class-validator';

export class PageDto<T> {
@ApiProperty()
readonly object: string;

@IsArray()
@ApiProperty({ isArray: true })
readonly data: T[];

@ApiProperty()
readonly first_id: string | undefined;

@ApiProperty()
readonly last_id: string | undefined;

@ApiProperty()
readonly has_more: boolean;

constructor(data: T[], hasMore: boolean, firstId?: string, lastId?: string) {
this.object = 'list';
this.data = data;
this.first_id = firstId;
this.last_id = lastId;
this.has_more = hasMore;
}
}
1 change: 1 addition & 0 deletions cortex-js/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ async function bootstrap() {

app.useGlobalPipes(
new ValidationPipe({
transform: true,
enableDebugMessages: true,
}),
);
Expand Down
49 changes: 48 additions & 1 deletion cortex-js/src/usecases/threads/threads.usecases.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
import { Inject, Injectable } from '@nestjs/common';
import { Inject, Injectable, NotFoundException } from '@nestjs/common';
import { CreateThreadDto } from '@/infrastructure/dtos/threads/create-thread.dto';
import { UpdateThreadDto } from '@/infrastructure/dtos/threads/update-thread.dto';
import { ThreadEntity } from '@/infrastructure/entities/thread.entity';
import { Repository } from 'typeorm';
import { v4 as uuidv4 } from 'uuid';
import { MessageEntity } from '@/infrastructure/entities/message.entity';
import { PageDto } from '@/infrastructure/dtos/page.dto';

@Injectable()
export class ThreadsUsecases {
constructor(
@Inject('THREAD_REPOSITORY')
private threadRepository: Repository<ThreadEntity>,
@Inject('MESSAGE_REPOSITORY')
private messageRepository: Repository<MessageEntity>,
) {}

async create(createThreadDto: CreateThreadDto): Promise<ThreadEntity> {
Expand All @@ -29,6 +33,49 @@ export class ThreadsUsecases {
return this.threadRepository.find();
}

async getMessagesOfThread(
id: string,
limit: number,
order: 'asc' | 'desc',
after?: string,
before?: string,
runId?: string,
) {
const thread = await this.findOne(id);
if (!thread) {
throw new NotFoundException(`Thread with id ${id} not found`);
}

const queryBuilder = this.messageRepository.createQueryBuilder();
const normalizedOrder = order === 'asc' ? 'ASC' : 'DESC';

queryBuilder
.where('thread_id = :id', { id })
.orderBy('created', normalizedOrder)
.take(limit + 1); // Fetch one more record than the limit

if (after) {
queryBuilder.andWhere('id > :after', { after });
}

if (before) {
queryBuilder.andWhere('id < :before', { before });
}

const { entities: messages } = await queryBuilder.getRawAndEntities();

let hasMore = false;
if (messages.length > limit) {
hasMore = true;
messages.pop(); // Remove the extra record
}

const firstId = messages[0]?.id ?? undefined;
const lastId = messages[messages.length - 1]?.id ?? undefined;

return new PageDto(messages, hasMore, firstId, lastId);
}

findOne(id: string) {
return this.threadRepository.findOne({ where: { id } });
}
Expand Down