Skip to content

Commit

Permalink
fix(replay): fix replay launch in new setup on win
Browse files Browse the repository at this point in the history
  • Loading branch information
blakebyrnes committed Jan 15, 2021
1 parent dcc6002 commit add1b97
Show file tree
Hide file tree
Showing 14 changed files with 154 additions and 101 deletions.
10 changes: 7 additions & 3 deletions client/lib/Agent.ts
Expand Up @@ -380,14 +380,18 @@ class SessionConnection {
return err;
});

const session = this.getCoreSessionOrReject();

const defaultShowReplay = Boolean(JSON.parse(process.env.SA_SHOW_REPLAY ?? 'true'));

if (showReplay ?? defaultShowReplay) {
scriptInstance.launchReplay(session);
this._coreSession = this._coreSession.then(async x => {
if (x instanceof Error) return x;
await scriptInstance.launchReplay(x);
return x;
});
}

const session = this.getCoreSessionOrReject();

const coreTab = session.then(x => x.firstTab);
this._activeTab = createTab(this.agent, coreTab);
this._tabs = [this._activeTab];
Expand Down
29 changes: 16 additions & 13 deletions client/lib/ScriptInstance.ts
@@ -1,7 +1,10 @@
import { v1 as uuidv1 } from 'uuid';
import IScriptInstanceMeta from '@secret-agent/core-interfaces/IScriptInstanceMeta';
import Log from '@secret-agent/commons/Logger';
import CoreSession from './CoreSession';

const { log } = Log(module);

export default class ScriptInstance {
public readonly id: string = uuidv1();
public readonly entrypoint: string = process.argv[1];
Expand All @@ -16,21 +19,21 @@ export default class ScriptInstance {
};
}

public launchReplay(coreSession: Promise<CoreSession>): void {
public async launchReplay(session: CoreSession): Promise<void> {
// eslint-disable-next-line global-require
const { replay } = require('@secret-agent/replay/index');
coreSession
.then(async session => {
return replay({
scriptInstanceId: this.id,
scriptStartDate: this.startDate,
sessionsDataLocation: session.sessionsDataLocation,
replayApiUrl: await session.replayApiUrl,
sessionId: session.sessionId,
sessionName: session.sessionName,
});
})
.catch(() => null);
try {
await replay({
scriptInstanceId: this.id,
scriptStartDate: this.startDate,
sessionsDataLocation: session.sessionsDataLocation,
replayApiUrl: await session.replayApiUrl,
sessionId: session.sessionId,
sessionName: session.sessionName,
});
} catch (error) {
log.warn('Error launching Replay application', { sessionId: session.sessionId, error });
}
}

public generateSessionName(name: string, shouldCleanName = true): string {
Expand Down
6 changes: 5 additions & 1 deletion core/index.ts
Expand Up @@ -66,8 +66,12 @@ export default class Core {

const host = await this.server.address;

log.info('Core started', {
coreHost: await Core.server.address,
sessionId: null,
});
// if started as a subprocess, send back the host
if (process.ppid) process.send(host);
if (process.send) process.send(host);
}

public static async shutdown(force = false): Promise<void> {
Expand Down
47 changes: 24 additions & 23 deletions core/server/ConnectionToReplay.ts
Expand Up @@ -47,7 +47,13 @@ export default class ConnectionToReplay {

this.subscribeToTables();

await waitUntilAllComplete(this.pendingPushes);
let resolved = -1;
// sort of complicated, but we're checking that everything has been sent and completed
while (this.pendingPushes.length > resolved) {
resolved = this.pendingPushes.length;
await Promise.all([...this.pendingPushes]).catch(() => null);
await new Promise(resolve => setTimeout(resolve, 100));
}

await this.send('trailer', { messages: this.pendingPushes.length });
} catch (error) {
Expand All @@ -57,6 +63,8 @@ export default class ConnectionToReplay {
...this.lookupArgs,
});
}
// do one last wait to catch errors and everything else
await Promise.all(this.pendingPushes).catch(() => null);
}

public close(error?: Error) {
Expand All @@ -83,12 +91,6 @@ export default class ConnectionToReplay {
const db = sessionLookup.sessionDb;
this.session = db.session.get();

if (!this.session.closeDate) {
db.frameNavigations.subscribe(() => this.checkState());
db.session.subscribe(() => this.checkState());
this.checkState();
}

db.tabs.subscribe(tabs => {
for (const tab of tabs) {
if (!this.tabs.has(tab.tabId)) {
Expand Down Expand Up @@ -192,21 +194,29 @@ export default class ConnectionToReplay {
headers: resource.responseHeaders ? JSON.parse(resource.responseHeaders) : {},
});
}
if (resourcesToSend.length) this.send('resources', resourcesToSend);
while (resourcesToSend.length) {
const toSend = resourcesToSend.splice(0, 50);
this.send('resources', toSend);
}
});

db.frameNavigations.subscribe(() => this.checkState());
db.session.subscribe(() => this.checkState());
this.checkState();
if (this.session.closeDate) {
this.sessionClosedPromise.resolve();
setImmediate(() => this.sessionClosedPromise.resolve());
}
}

private checkState(): void {
if (!this.sessionLookup?.sessionState || this.sessionClosedPromise.isResolved) return;
if (!this.sessionLookup?.sessionState) return;
const scriptState = this.sessionLookup.sessionState.checkForResponsive();

if (scriptState.closeDate && !this.sessionClosedPromise.isResolved) {
this.send('script-state', scriptState);
// give sqlite time to flush out published changes
setTimeout(() => this.sessionClosedPromise.resolve(), 500);
return;
}

this.lastScriptState = scriptState;
Expand Down Expand Up @@ -234,28 +244,19 @@ export default class ConnectionToReplay {
}

private send(event: string, data: any): void {
if (Array.isArray(data) && data.length === 0) return;
if (Array.isArray(data) && data.length === 0) {
return;
}

const json = JSON.stringify({ event, data }, (_, value) => {
if (value !== null) return value;
});

const sendPromise = this.sendMessage(json);
const sendPromise = this.sendMessage(json).catch(err => err);
if (sendPromise) this.pendingPushes.push(sendPromise);
}
}

async function waitUntilAllComplete(pendingPushes: Promise<any>[]) {
const resolvedPromises = new Set<Promise<any>>();
// sort of complicated, but we're checking that everything has been sent and completed
while (pendingPushes.length > resolvedPromises.size) {
const allPending = [...pendingPushes];
await Promise.all(allPending.map(x => x.catch(err => err)));
for (const pending of allPending) resolvedPromises.add(pending);
await new Promise(setImmediate);
}
}

interface IScriptState {
lastCommandName: string;
lastActivityDate: Date;
Expand Down
14 changes: 10 additions & 4 deletions core/server/index.ts
Expand Up @@ -46,7 +46,10 @@ export default class CoreServer {

public async close(waitForOpenConnections = true): Promise<void> {
try {
log.info('ReplayServer.closeSessions', { waitForOpenConnections, sessionId: null });
const logid = log.info('CoreServer.ClosingSessions', {
waitForOpenConnections,
sessionId: null,
});

const closeReplayPromises = [...this.wsServer.clients].map(async ws => {
if (waitForOpenConnections) {
Expand All @@ -61,6 +64,7 @@ export default class CoreServer {
this.httpServer.close(() => setImmediate(resolve));
}),
]);
log.info('CoreServer.ClosedSessions', { parentLogId: logid, sessionId: null });
} catch (error) {
log.error('Error closing socket connections', {
error,
Expand Down Expand Up @@ -119,9 +123,11 @@ function isOpen(ws: WebSocket) {
return ws.readyState === WebSocket.OPEN;
}

function wsSend(ws: WebSocket, json: string): Promise<void> {
if (!isOpen(ws)) return null;
return new Promise<void>((resolve, reject) => {
async function wsSend(ws: WebSocket, json: string): Promise<void> {
// give it a second to breath
await new Promise(process.nextTick);
if (!isOpen(ws)) return;
await new Promise<void>((resolve, reject) => {
ws.send(json, error => {
if (error) reject(error);
else resolve();
Expand Down
30 changes: 14 additions & 16 deletions core/start.ts
Expand Up @@ -6,20 +6,18 @@ import Core from '.';
const { log } = Log(module);

(async () => {
try {
const startOptions: ICoreConfigureOptions =
process.argv.length > 2 ? JSON.parse(process.argv[2]) : {};
const startOptions: ICoreConfigureOptions =
process.argv.length > 2 ? JSON.parse(process.argv[2]) : {};

Core.onShutdown = () => {
log.info('Exiting Core Process');
process.exit();
};
await Core.start(startOptions, !process.env.SA_TEMPORARY_CORE);
} catch (error) {
log.error('ERROR starting core', {
error,
sessionId: null,
});
process.exit(1);
}
})();
Core.onShutdown = () => {
log.info('Exiting Core Process');
process.exit();
};
await Core.start(startOptions, !process.env.SA_TEMPORARY_CORE);
})().catch(error => {
log.error('ERROR starting core', {
error,
sessionId: null,
});
process.exit(1);
});
1 change: 1 addition & 0 deletions package.json
Expand Up @@ -13,6 +13,7 @@
"build:ci": "yarn tsc && cd build && yarn install",
"copy:node_modules": "shx cp -r node_modules \"build/node_modules\"",
"copy:build": "node copy-build.js",
"replay": "yarn workspace @secret-agent/replay start",
"tsc": "tsc -b tsconfig.json && yarn copy:build && node prepare-build.js && yarn workspace @secret-agent/replay build:backend",
"watch": "tsc-watch -b -w tsconfig.json --onSuccess \"yarn workspace @secret-agent/replay build:backend-paths\"",
"watch:dist": "tsc -b -w tsconfig.dist.json",
Expand Down
1 change: 1 addition & 0 deletions replay/backend/Application.ts
Expand Up @@ -47,6 +47,7 @@ export default class Application {
await this.overlayManager.start();
this.registrationServer = new ScriptRegistrationServer(this.registerScript.bind(this));
Menu.setApplicationMenu(generateAppMenu());
if (process.argv.length <= 2) this.createWindowIfNeeded();
}

public getPageUrl(page: string) {
Expand Down
8 changes: 7 additions & 1 deletion replay/backend/api/ScriptRegistrationServer.ts
@@ -1,8 +1,12 @@
import * as Http from 'http';
import * as Fs from 'fs';
import { IncomingMessage, ServerResponse } from 'http';
import { AddressInfo } from 'net';
import IReplayMeta from '~shared/interfaces/IReplayMeta';
import ReplayApi from '~backend/api/index';
import { getInstallDirectory } from '~install/Utils';

const apiPath = `${getInstallDirectory()}/api.txt`;

export default class ScriptRegistrationServer {
private server: Http.Server;
Expand All @@ -13,11 +17,13 @@ export default class ScriptRegistrationServer {
this.server = new Http.Server(this.handleRequest.bind(this));
this.server.listen(0, () => {
const port = (this.server.address() as AddressInfo).port;
console.error(`REPLAY REGISTRATION API [http://localhost:${port}]`);
console.log('ScriptRegistrationServer.started', port);
Fs.writeFileSync(apiPath, Buffer.from(`http://localhost:${port}`));
});
}

public close() {
Fs.writeFileSync(apiPath, '');
this.server.close();
}

Expand Down
8 changes: 6 additions & 2 deletions replay/backend/api/index.ts
Expand Up @@ -203,12 +203,12 @@ export default class ReplayApi {
const args = [];
if (!this.serverStartPath) {
const replayDir = __dirname.split(`${Path.sep}replay${Path.sep}`).shift();
this.serverStartPath = Path.resolve(replayDir, 'core');
this.serverStartPath = Path.resolve(replayDir, 'core', 'start');
}
if (!this.nodePath) this.nodePath = 'node';
console.log('Launching Replay API Server at %s', this.serverStartPath);
const child = spawn(`${this.nodePath} "${this.serverStartPath}"`, args, {
stdio: ['ignore', 'inherit', 'inherit', 'ipc'],
stdio: ['ignore', 'pipe', 'pipe', 'ipc'],
shell: true,
windowsHide: true,
env: {
Expand All @@ -217,6 +217,10 @@ export default class ReplayApi {
DEBUG: process.env.DEBUG,
},
});

child.on('error', console.error);
child.stdout.pipe(process.stdout);
child.stderr.pipe(process.stderr);
this.serverProcess = child;
this.serverProcess.once('exit', () => {
this.serverProcess = null;
Expand Down
1 change: 1 addition & 0 deletions replay/backend/models/Window.ts
Expand Up @@ -173,6 +173,7 @@ export default class Window {

public async loadReplayTab(id: string) {
await this.replayView.loadTab(id);
await this.fixBounds();
}

public replayOnFocus() {
Expand Down

0 comments on commit add1b97

Please sign in to comment.