Skip to content

Commit

Permalink
fix(core): Prevent error messages due to statistics about data loading (
Browse files Browse the repository at this point in the history
#7824)

Statistics collection about the first time a workflow loads data simply
attempts an insert to db, and if it fails, we just ignore.

This was causing this query to fire against production workflows
multiple times, and since we want to insert only and detect whether the
insertion failed, performing a select first provides gains both in terms
of performance, as it's usually faster than trying an insertion as well
as preventing unnecessary noise in logs.

Github issue / Community forum post (link here to close automatically):

https://community.n8n.io/t/duplicate-key-value-violates-unique-constraint-workflow-statistics-pkey-still-happening/29283
#7256
https://community.n8n.io/t/error-log-arriving-in-postgres/30191
#7256

https://community.n8n.io/t/cant-launch-webhooks-unable-to-find-data-of-execution/31867

---------

Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
  • Loading branch information
krynble and netroy committed Nov 29, 2023
1 parent 6a8e7b1 commit c7d600a
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import config from '@/config';
import type { StatisticsNames } from '../entities/WorkflowStatistics';
import { WorkflowStatistics } from '../entities/WorkflowStatistics';

type StatisticsInsertResult = 'insert' | 'failed';
type StatisticsInsertResult = 'insert' | 'failed' | 'alreadyExists';
type StatisticsUpsertResult = StatisticsInsertResult | 'update';

@Service()
Expand All @@ -21,6 +21,13 @@ export class WorkflowStatisticsRepository extends Repository<WorkflowStatistics>
): Promise<StatisticsInsertResult> {
// Try to insert the data loaded statistic
try {
const exists = await this.findOne({
where: {
workflowId,
name: eventName,
},
});
if (exists) return 'alreadyExists';
await this.insert({
workflowId,
name: eventName,
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/services/events.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ export class EventsService extends EventEmitter {
StatisticsNames.dataLoaded,
workflowId,
);
if (insertResult === 'failed') return;
if (insertResult === 'failed' || insertResult === 'alreadyExists') return;

// Compile the metrics since this was a new data loaded event
const owner = await this.ownershipService.getWorkflowOwnerCached(workflowId);
Expand Down
53 changes: 53 additions & 0 deletions packages/cli/test/unit/repositories/workflowStatistics.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { WorkflowStatisticsRepository } from '@db/repositories/workflowStatistics.repository';
import { DataSource, EntityManager, InsertResult, QueryFailedError } from 'typeorm';
import { mockInstance } from '../../shared/mocking';
import { mock, mockClear } from 'jest-mock-extended';
import { StatisticsNames, WorkflowStatistics } from '@/databases/entities/WorkflowStatistics';

const entityManager = mockInstance(EntityManager);
const dataSource = mockInstance(DataSource, { manager: entityManager });
dataSource.getMetadata.mockReturnValue(mock());
Object.assign(entityManager, { connection: dataSource });
const workflowStatisticsRepository = new WorkflowStatisticsRepository(dataSource);

describe('insertWorkflowStatistics', () => {
beforeEach(() => {
mockClear(entityManager.insert);
});
it('Successfully inserts data when it is not yet present', async () => {
entityManager.findOne.mockResolvedValueOnce(null);
entityManager.insert.mockResolvedValueOnce(mockInstance(InsertResult));

const insertionResult = await workflowStatisticsRepository.insertWorkflowStatistics(
StatisticsNames.dataLoaded,
'workflowId',
);

expect(insertionResult).toBe('insert');
});

it('Does not insert when data is present', async () => {
entityManager.findOne.mockResolvedValueOnce(mockInstance(WorkflowStatistics));
const insertionResult = await workflowStatisticsRepository.insertWorkflowStatistics(
StatisticsNames.dataLoaded,
'workflowId',
);

expect(insertionResult).toBe('alreadyExists');
expect(entityManager.insert).not.toHaveBeenCalled();
});

it('throws an error when insertion fails', async () => {
entityManager.findOne.mockResolvedValueOnce(null);
entityManager.insert.mockImplementation(async () => {
throw new QueryFailedError('Query', [], 'driver error');
});

const insertionResult = await workflowStatisticsRepository.insertWorkflowStatistics(
StatisticsNames.dataLoaded,
'workflowId',
);

expect(insertionResult).toBe('failed');
});
});

0 comments on commit c7d600a

Please sign in to comment.