Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions backend/src/entities/ai/ai-use-cases.interface.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { InTransactionEnum } from '../../enums/in-transaction.enum.js';
import { FindOneConnectionDs } from '../connection/application/data-structures/find-one-connection.ds.js';
import { RequestAISettingsCreationDs } from './application/data-structures/request-ai-settings-creation.ds.js';
import { RequestInfoFromTableDSV2 } from './application/data-structures/request-info-from-table.ds.js';

export interface IRequestInfoFromTableV2 {
execute(inputData: RequestInfoFromTableDSV2, inTransaction: InTransactionEnum): Promise<void>;
}

export interface IAISettingsAndWidgetsCreation {
execute(connectionData: FindOneConnectionDs, inTransaction: InTransactionEnum): Promise<void>;
execute(inputData: RequestAISettingsCreationDs, inTransaction: InTransactionEnum): Promise<void>;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { Response } from 'express';
import { FindOneConnectionDs } from '../../../connection/application/data-structures/find-one-connection.ds.js';

export class RequestAISettingsCreationDs extends FindOneConnectionDs {
response: Response;
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
import { BadRequestException, Inject, Injectable, Scope } from '@nestjs/common';
import Sentry from '@sentry/minimal';
import { Response } from 'express';
import AbstractUseCase from '../../../common/abstract-use.case.js';
import { IGlobalDatabaseContext } from '../../../common/application/global-database-context.interface.js';
import { BaseType } from '../../../common/data-injection.tokens.js';
import { Messages } from '../../../exceptions/text/messages.js';
import { FindOneConnectionDs } from '../../connection/application/data-structures/find-one-connection.ds.js';
import { getErrorMessage } from '../../../helpers/get-error-message.js';
import { SharedJobsService } from '../../shared-jobs/shared-jobs.service.js';
import { IAISettingsAndWidgetsCreation } from '../ai-use-cases.interface.js';
import { RequestAISettingsCreationDs } from '../application/data-structures/request-ai-settings-creation.ds.js';

@Injectable({ scope: Scope.REQUEST })
export class RequestAISettingsAndWidgetsCreationUseCase
extends AbstractUseCase<FindOneConnectionDs, void>
extends AbstractUseCase<RequestAISettingsCreationDs, void>
implements IAISettingsAndWidgetsCreation
{
constructor(
Expand All @@ -20,14 +23,43 @@ export class RequestAISettingsAndWidgetsCreationUseCase
super();
}

public async implementation(connectionData: FindOneConnectionDs): Promise<void> {
const { connectionId, masterPwd } = connectionData;
public async implementation(inputData: RequestAISettingsCreationDs): Promise<void> {
const { connectionId, masterPwd, response } = inputData;

const connection = await this._dbContext.connectionRepository.findAndDecryptConnection(connectionId, masterPwd);
if (!connection) {
throw new BadRequestException(Messages.CONNECTION_NOT_FOUND);
}

await this.sharedJobsService.scanDatabaseAndCreateSettingsAndWidgetsWithAI(connection);
this.setupResponseHeaders(response);

try {
await this.sharedJobsService.scanDatabaseAndCreateSettingsAndWidgetsWithAI(connection, (chunk) =>
this.writeChunk(response, chunk),
);
this.writeChunk(response, { type: 'complete' });
response.end();
} catch (error) {
Sentry.captureException(error);
if (!response.headersSent) {
response.status(500).send({ error: 'An error occurred while processing your request.' });
return;
}
this.writeChunk(response, { type: 'error', message: getErrorMessage(error) });
response.end();
Comment on lines +37 to +49
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify write/end calls and absence of writable-state guards in this use case.
rg -n --type=ts -C2 'response\.write\(|response\.end\(|headersSent|writableEnded|destroyed' \
  backend/src/entities/ai/use-cases/request-ai-settings-and-widgets-creation.use.case.ts

Repository: rocket-admin/rocketadmin

Length of output: 676


🏁 Script executed:

#!/bin/bash
set -euo pipefail

FILE="backend/src/entities/ai/use-cases/request-ai-settings-and-widgets-creation.use.case.ts"
echo "== $FILE =="
wc -l "$FILE"
sed -n '1,140p' "$FILE" | nl -ba | sed -n '1,140p'

Repository: rocket-admin/rocketadmin

Length of output: 312


🏁 Script executed:

#!/bin/bash
set -euo pipefail

FILE="backend/src/entities/ai/use-cases/request-ai-settings-and-widgets-creation.use.case.ts"
echo "== $FILE (line count) =="
wc -l "$FILE"

echo "== Implementation + catch (lines ~1-80) =="
awk 'NR>=1 && NR<=80 {print NR ":" $0}' "$FILE"

echo "== writeChunk + helpers (lines ~50-120) =="
awk 'NR>=45 && NR<=120 {print NR ":" $0}' "$FILE"

Repository: rocket-admin/rocketadmin

Length of output: 3790


Guard streaming writes/ends when the client disconnects; also replace string concatenation in writeChunk.

  • implementation calls writeChunk(...) and response.end() with no writable-state checks (including in the catch path), so a mid-scan disconnect can trigger write after end-style failures and break error handling.
  • writeChunk uses string concatenation (JSON.stringify(chunk) + '\n') instead of a template literal.
Proposed fix
 	public async implementation(inputData: RequestAISettingsCreationDs): Promise<void> {
 		const { connectionId, masterPwd, response } = inputData;
@@
 		try {
 			await this.sharedJobsService.scanDatabaseAndCreateSettingsAndWidgetsWithAI(connection, (chunk) =>
 				this.writeChunk(response, chunk),
 			);
-			this.writeChunk(response, { type: 'complete' });
-			response.end();
+			if (this.isWritable(response)) {
+				this.writeChunk(response, { type: 'complete' });
+				response.end();
+			}
 		} catch (error) {
 			Sentry.captureException(error);
+			if (!this.isWritable(response)) {
+				return;
+			}
 			if (!response.headersSent) {
 				response.status(500).send({ error: 'An error occurred while processing your request.' });
 				return;
 			}
 			this.writeChunk(response, { type: 'error', message: getErrorMessage(error) });
 			response.end();
 		}
 	}
@@
 	private writeChunk(
 		response: Response,
 		chunk: { type: 'message'; text: string } | { type: 'complete' } | { type: 'error'; message: string },
 	): void {
-		response.write(JSON.stringify(chunk) + '\n');
+		if (!this.isWritable(response)) {
+			return;
+		}
+		response.write(`${JSON.stringify(chunk)}\n`);
 	}
+
+	private isWritable(response: Response): boolean {
+		return !response.writableEnded && !response.destroyed;
+	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
await this.sharedJobsService.scanDatabaseAndCreateSettingsAndWidgetsWithAI(connection, (chunk) =>
this.writeChunk(response, chunk),
);
this.writeChunk(response, { type: 'complete' });
response.end();
} catch (error) {
Sentry.captureException(error);
if (!response.headersSent) {
response.status(500).send({ error: 'An error occurred while processing your request.' });
return;
}
this.writeChunk(response, { type: 'error', message: getErrorMessage(error) });
response.end();
await this.sharedJobsService.scanDatabaseAndCreateSettingsAndWidgetsWithAI(connection, (chunk) =>
this.writeChunk(response, chunk),
);
if (this.isWritable(response)) {
this.writeChunk(response, { type: 'complete' });
response.end();
}
} catch (error) {
Sentry.captureException(error);
if (!this.isWritable(response)) {
return;
}
if (!response.headersSent) {
response.status(500).send({ error: 'An error occurred while processing your request.' });
return;
}
this.writeChunk(response, { type: 'error', message: getErrorMessage(error) });
response.end();
}
}
private writeChunk(
response: Response,
chunk: { type: 'message'; text: string } | { type: 'complete' } | { type: 'error'; message: string },
): void {
if (!this.isWritable(response)) {
return;
}
response.write(`${JSON.stringify(chunk)}\n`);
}
private isWritable(response: Response): boolean {
return !response.writableEnded && !response.destroyed;
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@backend/src/entities/ai/use-cases/request-ai-settings-and-widgets-creation.use.case.ts`
around lines 37 - 49, The try/catch must avoid writing or ending the HTTP stream
after the client disconnects: before calling this.writeChunk(...) or
response.end() (both in the happy path and inside the catch) check the response
writable state (e.g., response.writableEnded || response.finished ||
response.destroyed) and bail out early if closed; apply this guard around uses
of sharedJobsService.scanDatabaseAndCreateSettingsAndWidgetsWithAI callbacks as
well so the chunk callback returns immediately when the response is no longer
writable. Also update writeChunk to use a template literal instead of string
concatenation when serializing (replace JSON.stringify(chunk) + '\n' with a
template literal) so serialization is consistent.

}
}

private setupResponseHeaders(response: Response): void {
response.setHeader('Content-Type', 'text/event-stream');
response.setHeader('Cache-Control', 'no-cache');
response.setHeader('Connection', 'keep-alive');
}

private writeChunk(
response: Response,
chunk: { type: 'message'; text: string } | { type: 'complete' } | { type: 'error'; message: string },
): void {
response.write(JSON.stringify(chunk) + '\n');
}
}
9 changes: 6 additions & 3 deletions backend/src/entities/ai/user-ai-requests-v2.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { isTest } from '../../helpers/app/is-test.js';
import { ValidationHelper } from '../../helpers/validators/validation-helper.js';
import { SentryInterceptor } from '../../interceptors/sentry.interceptor.js';
import { IAISettingsAndWidgetsCreation, IRequestInfoFromTableV2 } from './ai-use-cases.interface.js';
import { RequestAISettingsCreationDs } from './application/data-structures/request-ai-settings-creation.ds.js';
import { RequestInfoFromTableDSV2 } from './application/data-structures/request-info-from-table.ds.js';
import { RequestInfoFromTableBodyDTO } from './application/dto/request-info-from-table-body.dto.js';

Expand Down Expand Up @@ -88,7 +89,7 @@ export class UserAIRequestsControllerV2 {
})
@ApiResponse({
status: 200,
description: 'AI settings and widgets creation job has been queued.',
description: 'Streams progress of the AI settings and widgets creation job as newline-delimited JSON chunks.',
})
@UseGuards(ConnectionEditGuard)
@Timeout(!isTest() ? TimeoutDefaults.AI : TimeoutDefaults.AI_TEST)
Expand All @@ -97,12 +98,14 @@ export class UserAIRequestsControllerV2 {
@SlugUuid('connectionId') connectionId: string,
@MasterPassword() masterPassword: string,
@UserId() userId: string,
@Res({ passthrough: true }) response: Response,
): Promise<void> {
const connectionData = {
const inputData: RequestAISettingsCreationDs = {
connectionId,
masterPwd: masterPassword,
cognitoUserName: userId,
response,
};
return await this.requestAISettingsAndWidgetsCreationUseCase.execute(connectionData, InTransactionEnum.OFF);
return await this.requestAISettingsAndWidgetsCreationUseCase.execute(inputData, InTransactionEnum.OFF);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { IGlobalDatabaseContext } from '../../../common/application/global-datab
import { BaseType } from '../../../common/data-injection.tokens.js';
import { AccessLevelEnum } from '../../../enums/access-level.enum.js';
import { Messages } from '../../../exceptions/text/messages.js';
import { isTest } from '../../../helpers/app/is-test.js';
import { Encryptor } from '../../../helpers/encryption/encryptor.js';
import { isConnectionTypeAgent } from '../../../helpers/is-connection-entity-agent.js';
import { slackPostMessage } from '../../../helpers/slack/slack-post-message.js';
Expand Down Expand Up @@ -125,7 +126,12 @@ export class CreateConnectionUseCase
const connectionRO = buildCreatedConnectionDs(savedConnection, token, masterPwd);
return connectionRO;
} finally {
if (connectionCopy && isConnectionTestedSuccessfully && !isConnectionTypeAgent(connectionCopy.type)) {
if (
connectionCopy &&
isConnectionTestedSuccessfully &&
!isConnectionTypeAgent(connectionCopy.type) &&
!isTest()
) {
// Fire-and-forget: run AI scan in background without blocking response
this.sharedJobsService.scanDatabaseAndCreateSettingsAndWidgetsWithAI(connectionCopy).catch((error) => {
console.error('Background AI scan failed:', error);
Expand Down
42 changes: 39 additions & 3 deletions backend/src/entities/shared-jobs/shared-jobs.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import PQueue from 'p-queue';
import { IGlobalDatabaseContext } from '../../common/application/global-database-context.interface.js';
import { BaseType } from '../../common/data-injection.tokens.js';
import { WidgetTypeEnum } from '../../enums/widget-type.enum.js';
import { isTest } from '../../helpers/app/is-test.js';
import { getErrorMessage } from '../../helpers/get-error-message.js';
import { ValidationHelper } from '../../helpers/validators/validation-helper.js';
import { AiService } from '../ai/ai.service.js';
Expand All @@ -18,6 +17,11 @@ import { TableSettingsEntity } from '../table-settings/common-table-settings/tab
import { buildEmptyTableSettings } from '../table-settings/common-table-settings/utils/build-empty-table-settings.js';
import { buildNewTableSettingsEntity } from '../table-settings/common-table-settings/utils/build-new-table-settings-entity.js';
import { TableWidgetEntity } from '../widget/table-widget.entity.js';
import { emitSettingsMessages } from './utils/emit-settings-messages.util.js';

export type AIScanProgressChunk = { type: 'message'; text: string };

export type AIScanProgressCallback = (chunk: AIScanProgressChunk) => void;

@Injectable()
export class SharedJobsService {
Expand All @@ -27,11 +31,22 @@ export class SharedJobsService {
private readonly aiService: AiService,
) {}

public async scanDatabaseAndCreateSettingsAndWidgetsWithAI(connection: ConnectionEntity): Promise<void> {
if (!connection || isTest()) {
public async scanDatabaseAndCreateSettingsAndWidgetsWithAI(
connection: ConnectionEntity,
onProgress?: AIScanProgressCallback,
): Promise<void> {
if (!connection) {
return;
}
const emit: AIScanProgressCallback = (chunk) => {
if (onProgress) {
onProgress(chunk);
}
};
const message = (text: string): void => emit({ type: 'message', text });

console.info(`Starting AI scan for connection with id "${connection.id}"`);
message(`Starting AI scan for connection "${connection.title || connection.id}"`);
try {
const existingTableSettings = await this._dbContext.tableSettingsRepository.find({
where: {
Expand All @@ -43,14 +58,18 @@ export class SharedJobsService {

const existingTableNames = new Set(existingTableSettings.map((setting) => setting.table_name));
const dao = getDataAccessObject(connection);
message('Fetching tables from database');
const tables: Array<TableDS> = await dao.getTablesFromDB();
const tablesToScan = tables.filter((table) => !existingTableNames.has(table.tableName));

if (tablesToScan.length === 0) {
console.info(`No new tables to scan for connection with id "${connection.id}"`);
message('No new tables to scan — all tables already have settings');
return;
}

message(`Found ${tablesToScan.length} new ${tablesToScan.length === 1 ? 'table' : 'tables'} to scan`);

const queue = new PQueue({ concurrency: 4 });
const tablesInformationResults = await Promise.all(
tablesToScan.map((table) =>
Expand All @@ -59,13 +78,15 @@ export class SharedJobsService {
const structure = await dao.getTableStructure(table.tableName, null);
const primaryColumns = await dao.getTablePrimaryColumns(table.tableName, null);
const foreignKeys = await dao.getTableForeignKeys(table.tableName, null);
message(`Inspected structure of table "${table.tableName}"`);
return {
table_name: table.tableName,
structure,
primaryColumns,
foreignKeys,
};
} catch (error) {
message(`Failed to inspect table "${table.tableName}": ${getErrorMessage(error)}`);
console.error(`Error getting table information for "${table.tableName}": ${getErrorMessage(error)}`);
return null;
}
Expand All @@ -77,18 +98,26 @@ export class SharedJobsService {

if (tablesInformation.length === 0) {
console.info(`No valid tables to process for connection with id "${connection.id}"`);
message('No valid tables to process');
return;
}

console.info(`Processing ${tablesInformation.length} tables with AI for connection "${connection.id}"`);
message(
`Generating settings with AI for ${tablesInformation.length} ${tablesInformation.length === 1 ? 'table' : 'tables'}`,
);
const generatedTableSettings = await this.aiService.generateNewTableSettingsWithAI(tablesInformation);

if (generatedTableSettings.length === 0) {
console.info(`No table settings generated by AI for connection with id "${connection.id}"`);
message('AI did not produce any table settings');
return;
}

console.info(`AI generated settings for ${generatedTableSettings.length} tables`);
message(
`AI returned settings for ${generatedTableSettings.length} ${generatedTableSettings.length === 1 ? 'table' : 'tables'}`,
);

const widgetsByTable = new Map<string, Array<TableWidgetEntity>>();
for (const setting of generatedTableSettings) {
Expand All @@ -106,6 +135,7 @@ export class SharedJobsService {
const validateSettingsDS = buildValidateTableSettingsDS(setting);
const errors = await dao.validateSettings(validateSettingsDS, setting.table_name, undefined);
if (errors.length > 0) {
message(`Validation failed for table "${setting.table_name}", skipping`);
console.error(`Validation errors for table "${setting.table_name}":`, errors);
return null;
}
Expand All @@ -119,6 +149,7 @@ export class SharedJobsService {
const savedSettings = await this._dbContext.tableSettingsRepository.save(settingsToSave);
const widgetsToSave: Array<TableWidgetEntity> = [];
for (const savedSetting of savedSettings) {
emitSettingsMessages(savedSetting, message);
const widgets = widgetsByTable.get(savedSetting.table_name);
if (widgets && widgets.length > 0) {
for (const widget of widgets) {
Expand All @@ -131,17 +162,22 @@ export class SharedJobsService {
widgetEntity.description = widget.description || null;
widgetEntity.settings = savedSetting;
widgetsToSave.push(widgetEntity);
message(
`Added ${widget.widget_type} widget for table "${savedSetting.table_name}" on column "${widget.field_name}"`,
);
}
}
}

if (widgetsToSave.length > 0) {
await this._dbContext.tableWidgetsRepository.save(widgetsToSave);
}
message(`Finished setup for ${savedSettings.length} ${savedSettings.length === 1 ? 'table' : 'tables'}`);
}
} catch (error) {
console.error('Error during AI scan and creation of settings/widgets: ', error);
Sentry.captureException(error);
throw error;
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { TableSettingsEntity } from '../../table-settings/common-table-settings/table-settings.entity.js';

export function emitSettingsMessages(setting: TableSettingsEntity, emit: (text: string) => void): void {
const tableName = setting.table_name;
const params: Array<[string, string]> = [];
if (setting.display_name) {
params.push(['display_name', `"${setting.display_name}"`]);
}
if (setting.search_fields && setting.search_fields.length > 0) {
params.push(['search_fields', setting.search_fields.join(', ')]);
}
if (setting.readonly_fields && setting.readonly_fields.length > 0) {
params.push(['readonly_fields', setting.readonly_fields.join(', ')]);
}
if (setting.columns_view && setting.columns_view.length > 0) {
params.push(['columns_view', setting.columns_view.join(', ')]);
}
if (setting.ordering) {
params.push(['ordering', String(setting.ordering)]);
}
if (setting.ordering_field) {
params.push(['ordering_field', `"${setting.ordering_field}"`]);
}
if (setting.identity_column) {
params.push(['identity_column', `"${setting.identity_column}"`]);
}

if (params.length === 0) {
emit(`Set up settings for table "${tableName}" with default parameters`);
return;
}
for (const [name, value] of params) {
emit(`Set up settings for table "${tableName}", ${name} parameter set to ${value}`);
}
}
Loading
Loading