Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Upsert workflow statistics to suppress unnecessary error messages #5863

Merged
merged 2 commits into from Apr 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
110 changes: 83 additions & 27 deletions packages/cli/src/events/WorkflowStatistics.ts
@@ -1,10 +1,79 @@
import type { INode, IRun, IWorkflowBase } from 'n8n-workflow';
import { LoggerProxy } from 'n8n-workflow';
import * as Db from '@/Db';
import { StatisticsNames } from '@db/entities/WorkflowStatistics';
import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper';
import { QueryFailedError } from 'typeorm';
import { Container } from 'typedi';
import { InternalHooks } from '@/InternalHooks';
import config from '@/config';

enum StatisticsUpsertResult {
insert = 'insert',
update = 'update',
failed = 'failed',
}

async function upsertWorkflowStatistics(
eventName: StatisticsNames,
workflowId: string,
): Promise<StatisticsUpsertResult> {
const dbType = config.getEnv('database.type');
const tablePrefix = config.getEnv('database.tablePrefix');
try {
if (dbType === 'sqlite') {
await Db.collections.WorkflowStatistics
.query(`INSERT INTO "${tablePrefix}workflow_statistics" ("count", "name", "workflowId", "latestEvent")
VALUES (1, "${eventName}", "${workflowId}", CURRENT_TIMESTAMP)
ON CONFLICT (workflowId, name) DO UPDATE SET
count = count + 1,
latestEvent = CURRENT_TIMESTAMP returning count
`);
// SQLite does not offer a reliable way to know whether or not an insert or update happened.
// We'll use a naive approach in this case. Query again after and it might cause us to miss the
// first production execution sometimes due to concurrency, but it's the only way.

const counter = await Db.collections.WorkflowStatistics.findOne({
where: {
name: eventName,
workflowId,
},
});

if (counter?.count === 1) {
return StatisticsUpsertResult.insert;
}
return StatisticsUpsertResult.update;
} else if (dbType === 'postgresdb') {
const queryResult = (await Db.collections.WorkflowStatistics

Check warning on line 48 in packages/cli/src/events/WorkflowStatistics.ts

View check run for this annotation

Codecov / codecov/patch

packages/cli/src/events/WorkflowStatistics.ts#L48

Added line #L48 was not covered by tests
.query(`insert into "${tablePrefix}workflow_statistics" ("count", "name", "workflowId", "latestEvent")
values (1, '${eventName}', '${workflowId}', CURRENT_TIMESTAMP) on conflict ("name", "workflowId")
do update set "count" = "${tablePrefix}workflow_statistics"."count" + 1, "latestEvent" = CURRENT_TIMESTAMP returning *;`)) as Array<{
count: number;
}>;
if (queryResult[0].count === 1) {
return StatisticsUpsertResult.insert;

Check warning on line 55 in packages/cli/src/events/WorkflowStatistics.ts

View check run for this annotation

Codecov / codecov/patch

packages/cli/src/events/WorkflowStatistics.ts#L55

Added line #L55 was not covered by tests
}
return StatisticsUpsertResult.update;

Check warning on line 57 in packages/cli/src/events/WorkflowStatistics.ts

View check run for this annotation

Codecov / codecov/patch

packages/cli/src/events/WorkflowStatistics.ts#L57

Added line #L57 was not covered by tests
} else {
const queryResult = (await Db.collections.WorkflowStatistics

Check warning on line 59 in packages/cli/src/events/WorkflowStatistics.ts

View check run for this annotation

Codecov / codecov/patch

packages/cli/src/events/WorkflowStatistics.ts#L59

Added line #L59 was not covered by tests
.query(`insert into \`${tablePrefix}workflow_statistics\` (count,
latestEvent,
name,
workflowId)
values (1, NOW(), "${eventName}", "${workflowId}") ON DUPLICATE KEY UPDATE count = count + 1, latestEvent = NOW();`)) as {
affectedRows: number;
};
if (queryResult.affectedRows === 1) {
return StatisticsUpsertResult.insert;

Check warning on line 68 in packages/cli/src/events/WorkflowStatistics.ts

View check run for this annotation

Codecov / codecov/patch

packages/cli/src/events/WorkflowStatistics.ts#L68

Added line #L68 was not covered by tests
}
// MySQL returns 2 affected rows on update
return StatisticsUpsertResult.update;

Check warning on line 71 in packages/cli/src/events/WorkflowStatistics.ts

View check run for this annotation

Codecov / codecov/patch

packages/cli/src/events/WorkflowStatistics.ts#L71

Added line #L71 was not covered by tests
}
} catch (error) {
return StatisticsUpsertResult.failed;

Check warning on line 74 in packages/cli/src/events/WorkflowStatistics.ts

View check run for this annotation

Codecov / codecov/patch

packages/cli/src/events/WorkflowStatistics.ts#L74

Added line #L74 was not covered by tests
}
}

export async function workflowExecutionCompleted(
workflowData: IWorkflowBase,
Expand All @@ -27,36 +96,23 @@
const workflowId = workflowData.id;
if (!workflowId) return;

// Try insertion and if it fails due to key conflicts then update the existing entry instead
try {
await Db.collections.WorkflowStatistics.insert({
count: 1,
name,
workflowId,
latestEvent: new Date(),
});

// If we're here we can check if we're sending the first production success metric
if (name !== StatisticsNames.productionSuccess) return;

// Get the owner of the workflow so we can send the metric
const owner = await getWorkflowOwner(workflowId);
const metrics = {
user_id: owner.id,
workflow_id: workflowId,
};
const upsertResult = await upsertWorkflowStatistics(name, workflowId);

// Send the metrics
await Container.get(InternalHooks).onFirstProductionWorkflowSuccess(metrics);
} catch (error) {
if (!(error instanceof QueryFailedError)) {
throw error;
if (
name === StatisticsNames.productionSuccess &&
upsertResult === StatisticsUpsertResult.insert
) {
const owner = await getWorkflowOwner(workflowId);
const metrics = {
user_id: owner.id,
workflow_id: workflowId,
};
// Send the metrics
await Container.get(InternalHooks).onFirstProductionWorkflowSuccess(metrics);
}

await Db.collections.WorkflowStatistics.update(
{ workflowId, name },
{ count: () => 'count + 1', latestEvent: new Date() },
);
} catch (error) {
LoggerProxy.verbose('Unable to fire first workflow success telemetry event');

Check warning on line 115 in packages/cli/src/events/WorkflowStatistics.ts

View check run for this annotation

Codecov / codecov/patch

packages/cli/src/events/WorkflowStatistics.ts#L115

Added line #L115 was not covered by tests
}
}

Expand Down
12 changes: 8 additions & 4 deletions packages/cli/test/unit/Events.test.ts
Expand Up @@ -17,7 +17,11 @@ type WorkflowStatisticsRepository = Repository<WorkflowStatistics>;
jest.mock('@/Db', () => {
return {
collections: {
WorkflowStatistics: mock<WorkflowStatisticsRepository>(),
WorkflowStatistics: mock<WorkflowStatisticsRepository>({
findOne: jest.fn(() => ({
count: 1,
})),
}),
},
};
});
Expand Down Expand Up @@ -101,9 +105,9 @@ describe('Events', () => {

test('should not send metrics for updated entries', async () => {
// Call the function with a fail insert, ensure update is called *and* metrics aren't sent
workflowStatisticsRepository.insert.mockImplementationOnce(() => {
throw new QueryFailedError('invalid insert', [], '');
});
workflowStatisticsRepository.findOne.mockImplementationOnce(() => ({
count: 2,
}));
const workflow = {
id: '1',
name: '',
Expand Down