Skip to content

Commit

Permalink
fix(core): shutdown the daemon when it is invalid (#15550)
Browse files Browse the repository at this point in the history
  • Loading branch information
FrozenPandaz committed Mar 10, 2023
1 parent d5261fa commit ad12ab2
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 93 deletions.
4 changes: 2 additions & 2 deletions packages/nx/src/command-line/reset.ts
Expand Up @@ -3,12 +3,12 @@ import { daemonClient } from '../daemon/client/client';
import { cacheDir, projectGraphCacheDirectory } from '../utils/cache-directory';
import { output } from '../utils/output';

export function resetHandler() {
export async function resetHandler() {
output.note({
title: 'Resetting the Nx workspace cache and stopping the Nx Daemon.',
bodyLines: [`This might take a few minutes.`],
});
daemonClient.stop();
await daemonClient.stop();
removeSync(cacheDir);
if (projectGraphCacheDirectory !== cacheDir) {
removeSync(projectGraphCacheDirectory);
Expand Down
22 changes: 15 additions & 7 deletions packages/nx/src/daemon/cache.ts
@@ -1,26 +1,34 @@
import { existsSync, readJson, unlinkSync, writeJson } from 'fs-extra';
import {
existsSync,
readJson,
readJsonSync,
unlinkSync,
writeJson,
} from 'fs-extra';
import { join } from 'path';
import { DAEMON_DIR_FOR_CURRENT_WORKSPACE } from './tmp-dir';

export interface DaemonProcessJson {
processId: number;
}

const serverProcessJsonPath = join(
export const serverProcessJsonPath = join(
DAEMON_DIR_FOR_CURRENT_WORKSPACE,
'server-process.json'
);

async function readDaemonProcessJsonCache(): Promise<DaemonProcessJson | null> {
export async function readDaemonProcessJsonCache(): Promise<DaemonProcessJson | null> {
if (!existsSync(serverProcessJsonPath)) {
return null;
}
return await readJson(serverProcessJsonPath);
}

function deleteDaemonJsonProcessCache(): void {
export function deleteDaemonJsonProcessCache(): void {
try {
unlinkSync(serverProcessJsonPath);
if (getDaemonProcessIdSync() === process.pid) {
unlinkSync(serverProcessJsonPath);
}
} catch {}
}

Expand All @@ -41,12 +49,12 @@ export async function safelyCleanUpExistingProcess(): Promise<void> {
}

// Must be sync for the help output use case
export function getDaemonProcessId(): number | null {
export function getDaemonProcessIdSync(): number | null {
if (!existsSync(serverProcessJsonPath)) {
return null;
}
try {
const daemonProcessJson = require(serverProcessJsonPath);
const daemonProcessJson = readJsonSync(serverProcessJsonPath);
return daemonProcessJson.processId;
} catch {
return null;
Expand Down
30 changes: 13 additions & 17 deletions packages/nx/src/daemon/client/client.ts
@@ -1,15 +1,11 @@
import { workspaceRoot } from '../../utils/workspace-root';
import { ChildProcess, spawn, spawnSync } from 'child_process';
import { ChildProcess, spawn } from 'child_process';
import { openSync, readFileSync, statSync } from 'fs';
import { ensureDirSync, ensureFileSync } from 'fs-extra';
import { connect, Socket } from 'net';
import { connect } from 'net';
import { join } from 'path';
import { performance } from 'perf_hooks';
import { output } from '../../utils/output';
import {
safelyCleanUpExistingProcess,
writeDaemonJsonProcessCache,
} from '../cache';
import { FULL_OS_SOCKET_PATH, killSocketOrPath } from '../socket-utils';
import {
DAEMON_DIR_FOR_CURRENT_WORKSPACE,
Expand All @@ -24,6 +20,7 @@ import { readNxJson } from '../../config/configuration';
import { PromisedBasedQueue } from '../../utils/promised-based-queue';
import { Workspaces } from '../../config/workspaces';
import { Message, SocketMessenger } from './socket-messenger';
import { safelyCleanUpExistingProcess } from '../cache';

const DAEMON_ENV_SETTINGS = {
...process.env,
Expand Down Expand Up @@ -312,7 +309,6 @@ export class DaemonClient {
}

async startInBackground(): Promise<ChildProcess['pid']> {
await safelyCleanUpExistingProcess();
ensureDirSync(DAEMON_DIR_FOR_CURRENT_WORKSPACE);
ensureFileSync(DAEMON_OUTPUT_LOG_FILE);

Expand All @@ -333,11 +329,6 @@ export class DaemonClient {
backgroundProcess.unref();
//

// Persist metadata about the background process so that it can be cleaned up later if needed
await writeDaemonJsonProcessCache({
processId: backgroundProcess.pid,
});

/**
* Ensure the server is actually available to connect to via IPC before resolving
*/
Expand All @@ -362,11 +353,16 @@ export class DaemonClient {
});
}

stop(): void {
spawnSync(process.execPath, ['../server/stop.js'], {
cwd: __dirname,
stdio: 'inherit',
});
async stop(): Promise<void> {
try {
await safelyCleanUpExistingProcess();
} catch (err) {
output.error({
title:
err?.message ||
'Something unexpected went wrong when stopping the server',
});
}

removeSocketDir();
output.log({ title: 'Daemon Server - Stopped' });
Expand Down
4 changes: 2 additions & 2 deletions packages/nx/src/daemon/client/generate-help-output.ts
@@ -1,5 +1,5 @@
import { spawnSync } from 'child_process';
import { getDaemonProcessId } from '../cache';
import { getDaemonProcessIdSync } from '../cache';
import { DAEMON_OUTPUT_LOG_FILE } from '../tmp-dir';

export function generateDaemonHelpOutput(): string {
Expand All @@ -16,7 +16,7 @@ export function generateDaemonHelpOutput(): string {
return 'Nx Daemon is not running.';
}

const pid = getDaemonProcessId();
const pid = getDaemonProcessIdSync();
return `Nx Daemon is currently running:
- Logs: ${DAEMON_OUTPUT_LOG_FILE}${
pid
Expand Down
110 changes: 66 additions & 44 deletions packages/nx/src/daemon/server/server.ts
Expand Up @@ -17,13 +17,15 @@ import {
respondWithErrorAndExit,
SERVER_INACTIVITY_TIMEOUT_MS,
storeOutputsWatcherSubscription,
storeProcessJsonSubscription,
storeSourceWatcherSubscription,
} from './shutdown-utils';
import {
convertChangeEventsToLogMessage,
subscribeToOutputsChanges,
subscribeToWorkspaceChanges,
FileWatcherCallback,
subscribeToServerProcessJsonChanges,
} from './watcher';
import { addUpdatedAndDeletedFiles } from './project-graph-incremental-recomputation';
import { existsSync, statSync } from 'fs';
Expand All @@ -45,6 +47,10 @@ import {
registeredFileWatcherSockets,
removeRegisteredFileWatcherSocket,
} from './file-watching/file-watcher-sockets';
import { nxVersion } from '../../utils/versions';
import { readJsonFile } from '../../utils/fileutils';
import { PackageJson } from '../../utils/package-json';
import { getDaemonProcessIdSync, writeDaemonJsonProcessCache } from '../cache';

let performanceObserver: PerformanceObserver | undefined;
let workspaceWatcherError: Error | undefined;
Expand Down Expand Up @@ -93,6 +99,8 @@ const server = createServer(async (socket) => {
removeRegisteredFileWatcherSocket(socket);
});
});
registerProcessTerminationListeners();
registerProcessServerJsonTracking();

async function handleMessage(socket, data: string) {
if (workspaceWatcherError) {
Expand All @@ -103,7 +111,7 @@ async function handleMessage(socket, data: string) {
);
}

if (lockFileChanged()) {
if (daemonIsOutdated()) {
await respondWithErrorAndExit(socket, `Lock files changed`, {
name: '',
message: 'LOCK-FILES-CHANGED',
Expand Down Expand Up @@ -175,38 +183,67 @@ function handleInactivityTimeout() {
}
}

process
.on('SIGINT', () =>
handleServerProcessTermination({
server,
reason: 'received process SIGINT',
})
)
.on('SIGTERM', () =>
handleServerProcessTermination({
server,
reason: 'received process SIGTERM',
})
)
.on('SIGHUP', () =>
handleServerProcessTermination({
server,
reason: 'received process SIGHUP',
function registerProcessTerminationListeners() {
process
.on('SIGINT', () =>
handleServerProcessTermination({
server,
reason: 'received process SIGINT',
})
)
.on('SIGTERM', () =>
handleServerProcessTermination({
server,
reason: 'received process SIGTERM',
})
)
.on('SIGHUP', () =>
handleServerProcessTermination({
server,
reason: 'received process SIGHUP',
})
);
}

async function registerProcessServerJsonTracking() {
storeProcessJsonSubscription(
await subscribeToServerProcessJsonChanges(async () => {
if (getDaemonProcessIdSync() !== process.pid) {
await handleServerProcessTermination({
server,
reason: 'this process is no longer the current daemon',
});
}
})
);
}

let existingLockHash: string | undefined;
const hasher = new HashingImpl();

function lockFileChanged(): boolean {
const hash = new HashingImpl();
function daemonIsOutdated(): boolean {
return nxVersionChanged() || lockFileHashChanged();
}

function nxVersionChanged(): boolean {
return nxVersion !== getInstalledNxVersion();
}

const nxPackageJsonPath = require.resolve('nx/package.json');
function getInstalledNxVersion() {
const { version } = readJsonFile<PackageJson>(nxPackageJsonPath);
return version;
}

function lockFileHashChanged(): boolean {
const lockHashes = [
join(workspaceRoot, 'package-lock.json'),
join(workspaceRoot, 'yarn.lock'),
join(workspaceRoot, 'pnpm-lock.yaml'),
]
.filter((file) => existsSync(file))
.map((file) => hash.hashFile(file));
const newHash = hash.hashArray(lockHashes);
.map((file) => hasher.hashFile(file));
const newHash = hasher.hashArray(lockHashes);
if (existingLockHash && newHash != existingLockHash) {
existingLockHash = newHash;
return true;
Expand Down Expand Up @@ -235,7 +272,7 @@ const handleWorkspaceChanges: FileWatcherCallback = async (
try {
resetInactivityTimeout(handleInactivityTimeout);

if (lockFileChanged()) {
if (daemonIsOutdated()) {
await handleServerProcessTermination({
server,
reason: 'Lock file changed',
Expand Down Expand Up @@ -312,6 +349,11 @@ const handleOutputsChanges: FileWatcherCallback = async (err, changeEvents) => {
};

export async function startServer(): Promise<Server> {
// Persist metadata about the background process so that it can be cleaned up later if needed
await writeDaemonJsonProcessCache({
processId: process.pid,
});

// See notes in socket-command-line-utils.ts on OS differences regarding clean up of existings connections.
if (!isWindows) {
killSocketOrPath();
Expand All @@ -323,7 +365,7 @@ export async function startServer(): Promise<Server> {
try {
serverLogger.log(`Started listening on: ${FULL_OS_SOCKET_PATH}`);
// this triggers the storage of the lock file hash
lockFileChanged();
daemonIsOutdated();

if (!getSourceWatcherSubscription()) {
storeSourceWatcherSubscription(
Expand Down Expand Up @@ -356,23 +398,3 @@ export async function startServer(): Promise<Server> {
}
});
}

export async function stopServer(): Promise<void> {
return new Promise((resolve, reject) => {
server.close((err) => {
if (err) {
/**
* If the server is running in a detached background process then server.close()
* will throw this error even if server is actually alive. We therefore only reject
* in case of any other unexpected errors.
*/
if (!err.message.startsWith('Server is not running')) {
return reject(err);
}
}

killSocketOrPath();
return resolve();
});
});
}
22 changes: 18 additions & 4 deletions packages/nx/src/daemon/server/shutdown-utils.ts
Expand Up @@ -3,6 +3,7 @@ import type { Server, Socket } from 'net';
import { serverLogger } from './logger';
import { serializeResult } from '../socket-utils';
import type { AsyncSubscription } from '@parcel/watcher';
import { deleteDaemonJsonProcessCache } from '../cache';

export const SERVER_INACTIVITY_TIMEOUT_MS = 10800000 as const; // 10800000 ms = 3 hours

Expand All @@ -13,18 +14,24 @@ export function getSourceWatcherSubscription() {
return sourceWatcherSubscription;
}

export function getOutputsWatcherSubscription() {
return outputsWatcherSubscription;
}

export function storeSourceWatcherSubscription(s: AsyncSubscription) {
sourceWatcherSubscription = s;
}

export function getOutputsWatcherSubscription() {
return outputsWatcherSubscription;
}

export function storeOutputsWatcherSubscription(s: AsyncSubscription) {
outputsWatcherSubscription = s;
}

let processJsonSubscription: AsyncSubscription | undefined;

export function storeProcessJsonSubscription(s: AsyncSubscription) {
processJsonSubscription = s;
}

interface HandleServerProcessTerminationParams {
server: Server;
reason: string;
Expand All @@ -36,6 +43,7 @@ export async function handleServerProcessTermination({
}: HandleServerProcessTerminationParams) {
try {
server.close();
deleteDaemonJsonProcessCache();
if (sourceWatcherSubscription) {
await sourceWatcherSubscription.unsubscribe();
serverLogger.watcherLog(
Expand All @@ -48,6 +56,12 @@ export async function handleServerProcessTermination({
`Unsubscribed from changes within: ${workspaceRoot} (outputs)`
);
}
if (processJsonSubscription) {
await processJsonSubscription.unsubscribe();
serverLogger.watcherLog(
`Unsubscribed from changes within: ${workspaceRoot} (server-process.json)`
);
}
serverLogger.log(`Server stopped because: "${reason}"`);
} finally {
process.exit(0);
Expand Down

1 comment on commit ad12ab2

@vercel
Copy link

@vercel vercel bot commented on ad12ab2 Mar 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

nx-dev – ./

nx-dev-nrwl.vercel.app
nx-five.vercel.app
nx.dev
nx-dev-git-master-nrwl.vercel.app

Please sign in to comment.