Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): add lifecycle to record task history & retrieve via daemon #26593

Merged
merged 3 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions packages/nx/src/daemon/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ import {
} from '../message-types/get-files-in-directory';
import { HASH_GLOB, HandleHashGlobMessage } from '../message-types/hash-glob';
import { NxWorkspaceFiles } from '../../native';
import { TaskRun } from '../../utils/task-history';
import {
HandleGetTaskHistoryForHashesMessage,
HandleWriteTaskRunsToHistoryMessage,
} from '../message-types/task-history';

const DAEMON_ENV_SETTINGS = {
NX_PROJECT_GLOB_CACHE: 'false',
Expand Down Expand Up @@ -312,6 +317,25 @@ export class DaemonClient {
return this.sendToDaemonViaQueue(message);
}

getTaskHistoryForHashes(hashes: string[]): Promise<{
[hash: string]: TaskRun[];
}> {
const message: HandleGetTaskHistoryForHashesMessage = {
type: 'GET_TASK_HISTORY_FOR_HASHES',
hashes,
};

return this.sendToDaemonViaQueue(message);
}

writeTaskRunsToHistory(taskRuns: TaskRun[]): Promise<void> {
const message: HandleWriteTaskRunsToHistoryMessage = {
type: 'WRITE_TASK_RUNS_TO_HISTORY',
taskRuns,
};
return this.sendMessageToDaemon(message);
}

async isServerAvailable(): Promise<boolean> {
return new Promise((resolve) => {
try {
Expand Down
38 changes: 38 additions & 0 deletions packages/nx/src/daemon/message-types/task-history.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { TaskRun } from '../../utils/task-history';

export const GET_TASK_HISTORY_FOR_HASHES =
'GET_TASK_HISTORY_FOR_HASHES' as const;

export type HandleGetTaskHistoryForHashesMessage = {
type: typeof GET_TASK_HISTORY_FOR_HASHES;
hashes: string[];
};

export function isHandleGetTaskHistoryForHashesMessage(
message: unknown
): message is HandleGetTaskHistoryForHashesMessage {
return (
typeof message === 'object' &&
message !== null &&
'type' in message &&
message['type'] === GET_TASK_HISTORY_FOR_HASHES
);
}

export const WRITE_TASK_RUNS_TO_HISTORY = 'WRITE_TASK_RUNS_TO_HISTORY' as const;

export type HandleWriteTaskRunsToHistoryMessage = {
type: typeof WRITE_TASK_RUNS_TO_HISTORY;
taskRuns: TaskRun[];
};

export function isHandleWriteTaskRunsToHistoryMessage(
message: unknown
): message is HandleWriteTaskRunsToHistoryMessage {
return (
typeof message === 'object' &&
message !== null &&
'type' in message &&
message['type'] === WRITE_TASK_RUNS_TO_HISTORY
);
}
9 changes: 9 additions & 0 deletions packages/nx/src/daemon/server/handle-get-task-history.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { getHistoryForHashes } from '../../utils/task-history';

export async function handleGetTaskHistoryForHashes(hashes: string[]) {
const history = await getHistoryForHashes(hashes);
return {
response: JSON.stringify(history),
description: 'handleGetTaskHistoryForHashes',
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { TaskRun, writeTaskRunsToHistory } from '../../utils/task-history';

export async function handleWriteTaskRunsToHistory(taskRuns: TaskRun[]) {
await writeTaskRunsToHistory(taskRuns);
return {
response: 'true',
description: 'handleWriteTaskRunsToHistory',
};
}
14 changes: 14 additions & 0 deletions packages/nx/src/daemon/server/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ import {
import { handleGetFilesInDirectory } from './handle-get-files-in-directory';
import { HASH_GLOB, isHandleHashGlobMessage } from '../message-types/hash-glob';
import { handleHashGlob } from './handle-hash-glob';
import {
isHandleGetTaskHistoryForHashesMessage,
isHandleWriteTaskRunsToHistoryMessage,
} from '../message-types/task-history';
import { handleGetTaskHistoryForHashes } from './handle-get-task-history';
import { handleWriteTaskRunsToHistory } from './handle-write-task-runs-to-history';

let performanceObserver: PerformanceObserver | undefined;
let workspaceWatcherError: Error | undefined;
Expand Down Expand Up @@ -202,6 +208,14 @@ async function handleMessage(socket, data: string) {
await handleResult(socket, HASH_GLOB, () =>
handleHashGlob(payload.globs, payload.exclude)
);
} else if (isHandleGetTaskHistoryForHashesMessage(payload)) {
await handleResult(socket, 'GET_TASK_HISTORY_FOR_HASHES', () =>
handleGetTaskHistoryForHashes(payload.hashes)
);
} else if (isHandleWriteTaskRunsToHistoryMessage(payload)) {
await handleResult(socket, 'WRITE_TASK_RUNS_TO_HISTORY', () =>
handleWriteTaskRunsToHistory(payload.taskRuns)
);
} else {
await respondWithErrorAndExit(
socket,
Expand Down
4 changes: 2 additions & 2 deletions packages/nx/src/tasks-runner/default-tasks-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ export const defaultTasksRunner: TasksRunner<
(options as any)['parallel'] = Number((options as any)['maxParallel'] || 3);
}

options.lifeCycle.startCommand();
await options.lifeCycle.startCommand();
try {
return await runAllTasks(tasks, options, context);
} finally {
options.lifeCycle.endCommand();
await options.lifeCycle.endCommand();
}
};

Expand Down
36 changes: 21 additions & 15 deletions packages/nx/src/tasks-runner/life-cycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ export interface TaskMetadata {
}

export interface LifeCycle {
startCommand?(): void;
startCommand?(): void | Promise<void>;

endCommand?(): void;
endCommand?(): void | Promise<void>;

scheduleTask?(task: Task): void;
scheduleTask?(task: Task): void | Promise<void>;

/**
* @deprecated use startTasks
Expand All @@ -33,9 +33,12 @@ export interface LifeCycle {
*/
endTask?(task: Task, code: number): void;

startTasks?(task: Task[], metadata: TaskMetadata): void;
startTasks?(task: Task[], metadata: TaskMetadata): void | Promise<void>;

endTasks?(taskResults: TaskResult[], metadata: TaskMetadata): void;
endTasks?(
taskResults: TaskResult[],
metadata: TaskMetadata
): void | Promise<void>;

printTaskTerminalOutput?(
task: Task,
Expand All @@ -47,26 +50,26 @@ export interface LifeCycle {
export class CompositeLifeCycle implements LifeCycle {
constructor(private readonly lifeCycles: LifeCycle[]) {}

startCommand(): void {
async startCommand(): Promise<void> {
for (let l of this.lifeCycles) {
if (l.startCommand) {
l.startCommand();
await l.startCommand();
}
}
}

endCommand(): void {
async endCommand(): Promise<void> {
for (let l of this.lifeCycles) {
if (l.endCommand) {
l.endCommand();
await l.endCommand();
}
}
}

scheduleTask(task: Task): void {
async scheduleTask(task: Task): Promise<void> {
for (let l of this.lifeCycles) {
if (l.scheduleTask) {
l.scheduleTask(task);
await l.scheduleTask(task);
}
}
}
Expand All @@ -87,20 +90,23 @@ export class CompositeLifeCycle implements LifeCycle {
}
}

startTasks(tasks: Task[], metadata: TaskMetadata): void {
async startTasks(tasks: Task[], metadata: TaskMetadata): Promise<void> {
for (let l of this.lifeCycles) {
if (l.startTasks) {
l.startTasks(tasks, metadata);
await l.startTasks(tasks, metadata);
} else if (l.startTask) {
tasks.forEach((t) => l.startTask(t));
}
}
}

endTasks(taskResults: TaskResult[], metadata: TaskMetadata): void {
async endTasks(
taskResults: TaskResult[],
metadata: TaskMetadata
): Promise<void> {
for (let l of this.lifeCycles) {
if (l.endTasks) {
l.endTasks(taskResults, metadata);
await l.endTasks(taskResults, metadata);
} else if (l.endTask) {
taskResults.forEach((t) => l.endTask(t.task, t.code));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import { serializeTarget } from '../../utils/serialize-target';
import { Task } from '../../config/task-graph';
import { output } from '../../utils/output';
import {
getHistoryForHashes,
TaskRun,
writeTaskRunsToHistory as writeTaskRunsToHistory,
} from '../../utils/task-history';
import { LifeCycle, TaskResult } from '../life-cycle';

export class TaskHistoryLifeCycle implements LifeCycle {
private startTimings: Record<string, number> = {};
private taskRuns: TaskRun[] = [];

startTasks(tasks: Task[]): void {
for (let task of tasks) {
this.startTimings[task.id] = new Date().getTime();
}
}

async endTasks(taskResults: TaskResult[]) {
const taskRuns: TaskRun[] = taskResults.map((taskResult) => ({
project: taskResult.task.target.project,
target: taskResult.task.target.target,
configuration: taskResult.task.target.configuration,
hash: taskResult.task.hash,
code: taskResult.code.toString(),
status: taskResult.status,
start: (
taskResult.task.startTime ?? this.startTimings[taskResult.task.id]
).toString(),
end: (taskResult.task.endTime ?? new Date().getTime()).toString(),
}));
this.taskRuns.push(...taskRuns);
}

async endCommand() {
await writeTaskRunsToHistory(this.taskRuns);
const history = await getHistoryForHashes(this.taskRuns.map((t) => t.hash));
const flakyTasks: string[] = [];

// check if any hash has different exit codes => flaky
for (let hash in history) {
if (
history[hash].length > 1 &&
history[hash].some((run) => run.code !== history[hash][0].code)
) {
flakyTasks.push(
serializeTarget(
history[hash][0].project,
history[hash][0].target,
history[hash][0].configuration
)
);
}
}
if (flakyTasks.length > 0) {
output.warn({
title: `Nx detected ${
flakyTasks.length === 1 ? 'a flaky task' : ' flaky tasks'
}`,
bodyLines: [
,
...flakyTasks.map((t) => ` ${t}`),
'',
`Flaky tasks can disrupt your CI pipeline. Automatically retry them with Nx Cloud. Learn more at https://nx.dev/ci/features/flaky-tasks`,
],
});
}
}
}
6 changes: 6 additions & 0 deletions packages/nx/src/tasks-runner/run-command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { createRunOneDynamicOutputRenderer } from './life-cycles/dynamic-run-one
import { ProjectGraph, ProjectGraphProjectNode } from '../config/project-graph';
import {
NxJsonConfiguration,
readNxJson,
TargetDefaults,
TargetDependencies,
} from '../config/nx-json';
Expand All @@ -28,6 +29,8 @@ import { hashTasksThatDoNotDependOnOutputsOfOtherTasks } from '../hasher/hash-ta
import { daemonClient } from '../daemon/client/client';
import { StoreRunInformationLifeCycle } from './life-cycles/store-run-information-life-cycle';
import { createTaskHasher } from '../hasher/create-task-hasher';
import { TaskHistoryLifeCycle } from './life-cycles/task-history-life-cycle';
import { isNxCloudUsed } from '../utils/nx-cloud-utils';

async function getTerminalOutputLifeCycle(
initiatingProject: string,
Expand Down Expand Up @@ -325,6 +328,9 @@ function constructLifeCycles(lifeCycle: LifeCycle) {
if (process.env.NX_PROFILE) {
lifeCycles.push(new TaskProfilingLifeCycle(process.env.NX_PROFILE));
}
if (!isNxCloudUsed(readNxJson())) {
lifeCycles.push(new TaskHistoryLifeCycle());
}
return lifeCycles;
}

Expand Down
8 changes: 4 additions & 4 deletions packages/nx/src/tasks-runner/task-orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ export class TaskOrchestrator {
);
}

this.options.lifeCycle.scheduleTask(task);
await this.options.lifeCycle.scheduleTask(task);

return taskSpecificEnv;
}
Expand All @@ -176,7 +176,7 @@ export class TaskOrchestrator {
this.batchEnv
);
}
this.options.lifeCycle.scheduleTask(task);
await this.options.lifeCycle.scheduleTask(task);
})
);
}
Expand Down Expand Up @@ -520,7 +520,7 @@ export class TaskOrchestrator {

// region Lifecycle
private async preRunSteps(tasks: Task[], metadata: TaskMetadata) {
this.options.lifeCycle.startTasks(tasks, metadata);
await this.options.lifeCycle.startTasks(tasks, metadata);
}

private async postRunSteps(
Expand Down Expand Up @@ -573,7 +573,7 @@ export class TaskOrchestrator {
'cache-results-end'
);
}
this.options.lifeCycle.endTasks(
await this.options.lifeCycle.endTasks(
results.map((result) => {
const code =
result.status === 'success' ||
Expand Down
3 changes: 3 additions & 0 deletions packages/nx/src/utils/serialize-target.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export function serializeTarget(project, target, configuration) {
return [project, target, configuration].filter((part) => !!part).join(':');
}
Loading