diff --git a/.evergreen/connectivity-tests/run.sh b/.evergreen/connectivity-tests/run.sh index 213bbed0b2d..04895b027c2 100644 --- a/.evergreen/connectivity-tests/run.sh +++ b/.evergreen/connectivity-tests/run.sh @@ -14,6 +14,7 @@ echo running connectivity tests image docker run \ --rm \ + -e DEBUG="${DEBUG}" \ -e E2E_TESTS_ATLAS_HOST="${E2E_TESTS_ATLAS_HOST}" \ -e E2E_TESTS_DATA_LAKE_HOST="${E2E_TESTS_DATA_LAKE_HOST}" \ -e E2E_TESTS_ANALYTICS_NODE_HOST="${E2E_TESTS_ANALYTICS_NODE_HOST}" \ diff --git a/package-lock.json b/package-lock.json index c5e66b13bb0..488832952ab 100644 --- a/package-lock.json +++ b/package-lock.json @@ -103745,12 +103745,14 @@ "@mongodb-js/prettier-config-compass": "^0.5.0", "@mongodb-js/tsconfig-compass": "^0.6.0", "@types/chai": "^4.2.21", + "@types/chai-as-promised": "^7.1.4", "@types/debug": "^4.1.7", "@types/mocha": "^9.0.0", "@types/node-fetch": "^2.5.8", "@types/sinon-chai": "^3.2.5", "@types/ssh2": "^0.5.46", "chai": "^4.3.4", + "chai-as-promised": "*", "depcheck": "^1.4.1", "eslint": "^7.25.0", "gen-esm-wrapper": "^1.1.0", @@ -136986,12 +136988,14 @@ "@mongodb-js/prettier-config-compass": "^0.5.0", "@mongodb-js/tsconfig-compass": "^0.6.0", "@types/chai": "^4.2.21", + "@types/chai-as-promised": "^7.1.4", "@types/debug": "^4.1.7", "@types/mocha": "^9.0.0", "@types/node-fetch": "^2.5.8", "@types/sinon-chai": "^3.2.5", "@types/ssh2": "^0.5.46", "chai": "^4.3.4", + "chai-as-promised": "*", "debug": "4.3.0", "depcheck": "^1.4.1", "eslint": "^7.25.0", diff --git a/packages/ssh-tunnel/package.json b/packages/ssh-tunnel/package.json index 73ef28c6ff3..baf0667c16b 100644 --- a/packages/ssh-tunnel/package.json +++ b/packages/ssh-tunnel/package.json @@ -54,12 +54,14 @@ "@mongodb-js/prettier-config-compass": "^0.5.0", "@mongodb-js/tsconfig-compass": "^0.6.0", "@types/chai": "^4.2.21", + "@types/chai-as-promised": "^7.1.4", "@types/debug": "^4.1.7", "@types/mocha": "^9.0.0", "@types/node-fetch": "^2.5.8", "@types/sinon-chai": "^3.2.5", "@types/ssh2": "^0.5.46", "chai": "^4.3.4", + "chai-as-promised": "*", "depcheck": "^1.4.1", "eslint": "^7.25.0", "gen-esm-wrapper": "^1.1.0", diff --git a/packages/ssh-tunnel/src/index.spec.ts b/packages/ssh-tunnel/src/index.spec.ts index 9276e6e2cc6..71fb2490ca8 100644 --- a/packages/ssh-tunnel/src/index.spec.ts +++ b/packages/ssh-tunnel/src/index.spec.ts @@ -1,4 +1,5 @@ /* eslint-disable @typescript-eslint/restrict-template-expressions */ +import { once } from 'events'; import type { ServerConfig } from 'ssh2'; import { Server as SSHServer } from 'ssh2'; import type { Server as HttpServer } from 'http'; @@ -7,12 +8,16 @@ import { promisify } from 'util'; import { readFileSync } from 'fs'; import path from 'path'; import { Socket } from 'net'; -import fetch from 'node-fetch'; +import fetch, { FetchError } from 'node-fetch'; import { expect } from 'chai'; import { SocksClient } from 'socks'; - +import chai from 'chai'; +import chaiAsPromised from 'chai-as-promised'; import type { SshTunnelConfig } from './index'; import SSHTunnel from './index'; +import sinon from 'sinon'; + +chai.use(chaiAsPromised); function sleep(ms: number) { return new Promise((resolve) => setTimeout(resolve, ms)); @@ -94,6 +99,7 @@ async function createTestSshTunnel(config: Partial = {}) { localPort: 0, ...config, }); + sinon.spy(sshTunnel.sshClient, 'connect'); await sshTunnel.listen(); } @@ -106,6 +112,12 @@ async function stopTestSshTunnel() { } } +function breakSshTunnelConnection() { + const promise = once(sshTunnel.sshClient, 'close'); + sshTunnel.sshClient.end(); + return promise; +} + interface Socks5ProxyOptions { proxyHost: string; proxyPort: number; @@ -273,4 +285,129 @@ describe('SSHTunnel', function () { expect(sshTunnel.server.address()).to.equal(null); } }); + + it('does not reconnect if the tunnel is already connected', async function () { + await createTestSshTunnel(); + + const address = `http://localhost:${httpServer.address().port}/`; + const options = { + proxyHost: sshTunnel.config.localAddr, + proxyPort: sshTunnel.config.localPort, + }; + const expected = 'Hello from http server'; + + const res1 = await httpFetchWithSocks5(address, options); + expect(await res1.text()).to.equal(expected); + + const res2 = await httpFetchWithSocks5(address, options); + expect(await res2.text()).to.equal(expected); + + expect(sshTunnel.sshClient.connect.callCount).to.equal(1); + }); + + it('reconnects tunnel if it got accidentally disconnected', async function () { + await createTestSshTunnel(); + + await breakSshTunnelConnection(); + + const address = `http://localhost:${httpServer.address().port}/`; + const options = { + proxyHost: sshTunnel.config.localAddr, + proxyPort: sshTunnel.config.localPort, + }; + const expected = 'Hello from http server'; + + const res1 = await httpFetchWithSocks5(address, options); + expect(await res1.text()).to.equal(expected); + + const res2 = await httpFetchWithSocks5(address, options); + expect(await res2.text()).to.equal(expected); + + expect(sshTunnel.sshClient.connect.callCount).to.equal(2); + }); + + it('reuses the connection promise if a request comes in before the tunnel connects', async function () { + await createTestSshTunnel(); + + await breakSshTunnelConnection(); + + const address = `http://localhost:${httpServer.address().port}/`; + const options = { + proxyHost: sshTunnel.config.localAddr, + proxyPort: sshTunnel.config.localPort, + }; + const expected = 'Hello from http server'; + + const [res1, res2] = await Promise.all([ + httpFetchWithSocks5(address, options), + httpFetchWithSocks5(address, options), + ]); + expect(await res1.text()).to.equal(expected); + expect(await res2.text()).to.equal(expected); + + expect(sshTunnel.sshClient.connect.callCount).to.equal(2); + }); + + it('does not reconnect the tunnel after it was deliberately closed', async function () { + await createTestSshTunnel(); + + // NOTE: normally you'd call close(), but that also closes the server so + // you'd get a different error first. Trying to trigger the race condition + // where the request made it to the socks5 server in time. + await sshTunnel.closeSshClient(); + + const remotePort = httpServer.address().port; + const address = `http://localhost:${remotePort}/`; + + const options = { + proxyHost: sshTunnel.config.localAddr, + proxyPort: sshTunnel.config.localPort, + }; + + const promise = httpFetchWithSocks5(address, options); + + await expect(promise).to.be.rejectedWith( + FetchError, + `request to http://localhost:${remotePort}/ failed, reason: Socket closed` + ); + + expect(sshTunnel.sshClient.connect.callCount).to.equal(1); + }); + + it('reconnects if the ssh connection times out while we try and open the channel', async function () { + await createTestSshTunnel(); + + const forwardOut = sshTunnel.forwardOut; + sinon + .stub(sshTunnel, 'forwardOut') + .callsFake(async function ( + srcAddr: string, + srcPort: number, + dstAddr: string, + dstPort: number + ) { + await breakSshTunnelConnection(); + const promise = forwardOut.call( + this, + srcAddr, + srcPort, + dstAddr, + dstPort + ); + sshTunnel.forwardOut.restore(); + return promise; + }); + + const address = `http://localhost:${httpServer.address().port}/`; + const options = { + proxyHost: sshTunnel.config.localAddr, + proxyPort: sshTunnel.config.localPort, + }; + const expected = 'Hello from http server'; + + const res = await httpFetchWithSocks5(address, options); + expect(await res.text()).to.equal(expected); + + expect(sshTunnel.sshClient.connect.callCount).to.equal(2); + }); }); diff --git a/packages/ssh-tunnel/src/index.ts b/packages/ssh-tunnel/src/index.ts index 419eacf87fc..e3e4c3573eb 100644 --- a/packages/ssh-tunnel/src/index.ts +++ b/packages/ssh-tunnel/src/index.ts @@ -50,6 +50,9 @@ function getSshTunnelConfig(config: Partial): SshTunnelConfig { } export class SshTunnel extends EventEmitter { + private connected = false; + private closed = false; + private connectingPromise?: Promise; private connections: Set = new Set(); private server: any; private rawConfig: SshTunnelConfig; @@ -70,57 +73,16 @@ export class SshTunnel extends EventEmitter { this.sshClient = new SshClient(); + this.sshClient.on('close', () => { + debug('sshClient closed'); + this.connected = false; + }); + this.forwardOut = promisify(this.sshClient.forwardOut.bind(this.sshClient)); - // eslint-disable-next-line @typescript-eslint/no-misused-promises - this.server = socks5Server.createServer( - async ( - info: any, - accept: (intercept: true) => Socket, - deny: () => void - ) => { - debug('receiving socks5 forwarding request', info); - let socket: Socket | null = null; - - try { - const channel = await this.forwardOut( - info.srcAddr, - info.srcPort, - info.dstAddr, - info.dstPort - ); - debug('channel opened, accepting socks5 request', info); - - socket = accept(true); - this.connections.add(socket); - - socket.on('error', (err: ErrorWithOrigin) => { - debug('error on socksv5 socket', info, err); - err.origin = err.origin ?? 'connection'; - this.server.emit('error', err); - }); - - socket.once('close', () => { - debug('socksv5 socket closed, removing from set'); - this.connections.delete(socket as Socket); - }); - - socket.pipe(channel).pipe(socket); - } catch (err) { - debug('caught error, rejecting socks5 request', info, err); - deny(); - if (socket) { - (err as any).origin = 'ssh-client'; - socket.destroy(err as any); - } - } - } - ); + this.server = socks5Server.createServer(this.socks5Request.bind(this)); - if (!this.rawConfig.socks5Username) { - debug('skipping auth setup for this server'); - this.server.useAuth(socks5AuthNone()); - } else { + if (this.rawConfig.socks5Username) { this.server.useAuth( socks5AuthUserPassword( (user: string, pass: string, cb: (success: boolean) => void) => { @@ -132,6 +94,9 @@ export class SshTunnel extends EventEmitter { } ) ); + } else { + debug('skipping auth setup for this server'); + this.server.useAuth(socks5AuthNone()); } this.serverListen = promisify(this.server.listen.bind(this.server)); @@ -159,24 +124,7 @@ export class SshTunnel extends EventEmitter { debug('starting to listen', { localAddr, localPort }); await this.serverListen(localPort, localAddr); - try { - debug('creating SSH connection'); - await Promise.race([ - once(this.sshClient, 'error').then(([err]) => { - throw err; - }), - (() => { - const waitForReady = once(this.sshClient, 'ready') as Promise<[void]>; - this.sshClient.connect(getConnectConfig(this.rawConfig)); - return waitForReady; - })(), - ]); - debug('created SSH connection'); - } catch (err) { - debug('failed to establish SSH connection', err); - await this.serverClose(); - throw err; - } + await this.connectSsh(); } async close(): Promise { @@ -195,12 +143,60 @@ export class SshTunnel extends EventEmitter { } } - private async closeSshClient() { + private async connectSsh(): Promise { + if (this.connected) { + debug('already connected'); + return; + } + + if (this.connectingPromise) { + debug('reusing connectingPromise'); + return this.connectingPromise; + } + + if (this.closed) { + // A socks5 request could come in after we deliberately closed the connection. Don't reconnect in that case. + throw new Error('Disconnected.'); + } + + debug('creating SSH connection'); + + this.connectingPromise = Promise.race([ + once(this.sshClient, 'error').then(([err]) => { + throw err; + }), + (() => { + const waitForReady = once(this.sshClient, 'ready').then(() => {}); // eslint-disable-line @typescript-eslint/no-empty-function + this.sshClient.connect(getConnectConfig(this.rawConfig)); + return waitForReady; + })(), + ]); + try { - return once(this.sshClient, 'close'); - } finally { - this.sshClient.end(); + await this.connectingPromise; + } catch (err) { + debug('failed to establish SSH connection', err); + delete this.connectingPromise; + await this.serverClose(); + throw err; + } + + delete this.connectingPromise; + this.connected = true; + debug('created SSH connection'); + } + + private async closeSshClient() { + if (!this.connected) { + return; } + + // don't automatically reconnect if another request comes in + this.closed = true; + + const promise = once(this.sshClient, 'close'); + this.sshClient.end(); + return promise; } private async closeOpenConnections() { @@ -212,6 +208,68 @@ export class SshTunnel extends EventEmitter { await Promise.all(waitForClose); this.connections.clear(); } + + private async socks5Request( + info: any, + accept: (intercept: true) => Socket, + deny: () => void + ): Promise { + debug('receiving socks5 forwarding request', info); + let socket: Socket | null = null; + + try { + await this.connectSsh(); + + let channel; + try { + channel = await this.forwardOut( + info.srcAddr, + info.srcPort, + info.dstAddr, + info.dstPort + ); + } catch (err) { + if ((err as Error).message === 'Not connected') { + this.connected = false; + debug('error forwarding. retrying..', info, err); + await this.connectSsh(); + channel = await this.forwardOut( + info.srcAddr, + info.srcPort, + info.dstAddr, + info.dstPort + ); + } else { + throw err; + } + } + + debug('channel opened, accepting socks5 request', info); + + socket = accept(true); + this.connections.add(socket); + + socket.on('error', (err: ErrorWithOrigin) => { + debug('error on socksv5 socket', info, err); + err.origin = err.origin ?? 'connection'; + this.server.emit('error', err); + }); + + socket.once('close', () => { + debug('socksv5 socket closed, removing from set'); + this.connections.delete(socket as Socket); + }); + + socket.pipe(channel).pipe(socket); + } catch (err) { + debug('caught error, rejecting socks5 request', info, err); + deny(); + if (socket) { + (err as any).origin = 'ssh-client'; + socket.destroy(err as any); + } + } + } } export default SshTunnel;