/
observatory.ts
162 lines (144 loc) · 5.41 KB
/
observatory.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import { URL } from 'url';
// @ts-ignore
import { Client } from 'rpc-websockets';
import { deserialize } from '../../../finder/nodejs/lib/deserializer';
import { FlutterDriver } from '../driver';
import { log } from '../logger';
/** Upper bound on the number of connection attempts made by `connectSocket`. */
const MAX_RETRY_COUNT = 10;
/**
 * Delay, in milliseconds, inserted before every retry (logged in seconds).
 * NOTE(review): this is five minutes per wait — confirm that is intended.
 */
const RETRY_BACKOFF = 5 * 60 * 1000;
/**
 * Empty placeholder class used only to name the type of the socket value
 * that `connectSocket` resolves with.
 */
class WebSocketDummy {}
/** A connected socket, or `null` when no connection could be established. */
export type NullableWebSocketDummy = null | WebSocketDummy;
// SOCKETS
/**
 * Opens an RPC websocket client against the Dart Observatory and checks that
 * the first isolate reported by `getVM` exposes the `ext.flutter.driver`
 * extension RPC.
 *
 * At most MAX_RETRY_COUNT attempts are made; RETRY_BACKOFF milliseconds are
 * awaited before every attempt except the first. An attempt counts as failed
 * when the client emits `error` or `timeout`, or when the isolate checks done
 * after `open` fail.
 *
 * On success the returned client has `isolateId` set and its `call` method
 * wrapped so that a rejected RPC is reported through `log.errorAndThrow`.
 *
 * @param dartObservatoryURL URL handed to the RPC websocket client
 * @returns the connected client
 * @throws (through `log.errorAndThrow`) once all MAX_RETRY_COUNT attempts failed
 */
export const connectSocket = async (dartObservatoryURL: string) => {
  let retryCount = 0;
  let connectedSocket: NullableWebSocketDummy = null;
  while (retryCount < MAX_RETRY_COUNT && !connectedSocket) {
    if (retryCount > 0) {
      log.info(
        `Waiting ` + RETRY_BACKOFF / 1000 + ` seconds before trying...`,
      );
      await new Promise((r) => setTimeout(r, RETRY_BACKOFF));
    }
    log.info(`Attempt #` + (retryCount + 1));

    // Resolves with the client on success and with null on any failure, so
    // the retry loop never has to deal with a rejection from this promise.
    const connectedPromise = new Promise<NullableWebSocketDummy>((resolve) => {
      log.info(
        `Connecting to Dart Observatory: ${dartObservatoryURL}`,
      );
      const socket = new Client(dartObservatoryURL);

      // Detach the one-shot listeners before settling so that a later event
      // on the same client cannot try to settle this attempt a second time.
      const removeListenerAndResolve = (r: NullableWebSocketDummy) => {
        socket.removeListener(`error`, onErrorListener);
        socket.removeListener(`timeout`, onTimeoutListener);
        socket.removeListener(`open`, onOpenListener);
        resolve(r);
      };

      // Add an 'error' event handler for the client socket
      const onErrorListener = (ex) => {
        log.error(ex);
        log.error(
          `Check Dart Observatory URI ${dartObservatoryURL}`,
        );
        removeListenerAndResolve(null);
      };
      socket.on(`error`, onErrorListener);

      // Add a 'close' event handler for the client socket
      socket.on(`close`, () => {
        log.info(`Connection to ${dartObservatoryURL} closed`);
        // @todo do we need to set this.socket = null?
      });

      // Add a 'timeout' event handler for the client socket
      const onTimeoutListener = () => {
        log.error(`Connection to ${dartObservatoryURL} timed out`);
        removeListenerAndResolve(null);
      };
      socket.on(`timeout`, onTimeoutListener);

      const onOpenListener = async () => {
        try {
          // Wrap `call` so every failed RPC is logged before it propagates.
          // tslint:disable-next-line:ban-types
          const originalSocketCall: Function = socket.call;
          socket.call = async (...args: any) => {
            try {
              // `await` is needed so that rejected promise will be thrown and caught
              return await originalSocketCall.apply(socket, args);
            } catch (e) {
              log.errorAndThrow(JSON.stringify(e));
            }
          };
          log.info(`Connected to ${dartObservatoryURL}`);
          const vm = await socket.call(`getVM`);
          socket.isolateId = vm.isolates[0].id;
          // @todo check extension and do health check
          const isolate = await socket.call(`getIsolate`, {
            isolateId: `${socket.isolateId}`,
          });
          if (!isolate) {
            log.errorAndThrow(`Cannot get Dart Isolate`);
          }
          if (!Array.isArray(isolate.extensionRPCs)) {
            log.errorAndThrow(`Cannot get Dart extensionRPCs from isolate ${JSON.stringify(isolate)}`);
          }
          if (isolate.extensionRPCs.indexOf(`ext.flutter.driver`) < 0) {
            const msg = `"ext.flutter.driver" is not found in "extensionRPCs" ${JSON.stringify(isolate.extensionRPCs)}`;
            log.errorAndThrow(msg);
          }
          removeListenerAndResolve(socket);
        } catch (e) {
          // This listener is invoked by the event emitter, so nothing awaits
          // its promise: rethrowing here would only surface as an unhandled
          // rejection. Log the failure and report it through the resolved
          // value instead, which lets the retry loop carry on.
          log.error(`Cannot get Dart Isolate`);
          log.error(e);
          removeListenerAndResolve(null);
        }
      };
      socket.on(`open`, onOpenListener);
    });

    retryCount++;
    connectedSocket = await connectedPromise;
    // retryCount now equals the number of attempts made so far; give up only
    // after the last allowed attempt has actually been tried.
    if (!connectedSocket && retryCount === MAX_RETRY_COUNT) {
      log.errorAndThrow(
        `Failed to connect ` + MAX_RETRY_COUNT + ` times. Aborting.`,
      );
    }
  }
  return connectedSocket;
};
/**
 * Sends a Flutter driver command through the driver's socket and returns the
 * `response` field of the reply.
 *
 * The payload is built from `command`, then the fields obtained by
 * deserializing `elementBase64` (if given), then `extraArgs`; fields spread
 * later override earlier ones with the same key.
 *
 * @param command name of the command to send
 * @param elementBase64 optional serialized element, decoded with `deserialize`
 * @param extraArgs additional fields merged into the payload
 * @returns the `response` field of the reply
 * @throws Error when the reply is missing or has `isError` set
 */
export const executeElementCommand = async function(
  this: FlutterDriver,
  command: string,
  elementBase64?: string,
  extraArgs = {}) {
  const elementObject = elementBase64 ? deserialize(elementBase64) : {};
  const serializedCommand = { command, ...elementObject, ...extraArgs };
  log.debug(`>>> ${JSON.stringify(serializedCommand)}`);
  const data = await executeSocketCommand(this.socket, serializedCommand);
  log.debug(`<<< ${JSON.stringify(data)} | previous command ${command}`);
  // A missing reply is reported with the same descriptive error as an
  // explicit server-side failure, rather than a TypeError on property access.
  if (data === null || data === undefined || data.isError) {
    throw new Error(
      `Cannot execute command ${command}, server response ${JSON.stringify(data, null, 2)}`,
    );
  }
  return data.response;
};
/**
 * Invokes the `ext.flutter.driver` RPC method on `socket`.
 *
 * The parameters are the fields of `cmd` plus `isolateId`, which is always
 * taken from `socket.isolateId` and therefore overrides any `isolateId`
 * present in `cmd`.
 *
 * @param socket object providing `call` and `isolateId`
 * @param cmd fields to send as RPC parameters
 * @returns whatever `socket.call` yields
 */
export const executeSocketCommand = async (socket, cmd) => {
  const rpcParams = { ...cmd, isolateId: socket.isolateId };
  return socket.call(`ext.flutter.driver`, rpcParams);
};
/**
 * Extracts the Observatory URI from device log entries and converts it into
 * the websocket URL of the Observatory.
 *
 * Entries are scanned from last to first, so the most recent
 * `Observatory listening on ...` line wins. The URL protocol is replaced with
 * `ws` and `ws` is appended to the path.
 *
 * @param adbLogs log entries, each carrying its text in `message`
 * @returns the websocket URL derived from the matched URI
 * @throws Error `can't find Observatory` when no entry matches
 */
export const processLogToGetobservatory = (adbLogs: Array<{message: string}>) => {
  const observatoryUriRegEx = new RegExp(
    `Observatory listening on ((http|\/\/)[a-zA-Z0-9:/=_\\-\.\\[\\]]+)`,
  );
  // Walk backwards and stop at the first hit. Unlike chaining `.find().match()`
  // this stays well defined when nothing matches, so the descriptive error
  // below is raised instead of a TypeError on `undefined`.
  let observatoryMatch: RegExpMatchArray | null = null;
  for (let i = adbLogs.length - 1; i >= 0 && !observatoryMatch; i--) {
    observatoryMatch = adbLogs[i].message.match(observatoryUriRegEx);
  }
  if (!observatoryMatch) {
    throw new Error(`can't find Observatory`);
  }
  const dartObservatoryURI = observatoryMatch[1];
  const dartObservatoryURL = new URL(dartObservatoryURI);
  dartObservatoryURL.protocol = `ws`;
  dartObservatoryURL.pathname += `ws`;
  return dartObservatoryURL;
};