Skip to content

Commit

Permalink
fix(mitm): directly handle reused socket closing
Browse files Browse the repository at this point in the history
  • Loading branch information
blakebyrnes committed Mar 31, 2021
1 parent 2d794d9 commit 8651445
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 13 deletions.
9 changes: 6 additions & 3 deletions mitm-socket/index.ts
Expand Up @@ -33,14 +33,15 @@ export default class MitmSocket extends TypedEventEmitter<{
public dialTime: Date;
public connectTime: Date;
public closeTime: Date;

public isConnected = false;
public isReused = false;
public isClosing = false;

public get pid(): number | undefined {
return this.child?.pid;
}

private isClosing = false;
private isConnected = false;
private child: ChildProcess;
private connectError?: string;
private readonly callStack: string;
Expand Down Expand Up @@ -223,7 +224,9 @@ export default class MitmSocket extends TypedEventEmitter<{
host: this.connectOpts?.host,
clientHello: this.connectOpts?.clientHelloId,
});
} else if (message) {
} else if (message.startsWith('[DomainSocketPiper.Closed]')) {
this.close();
} else {
this.logger.info('SocketHandler.onData', {
message,
host: this.connectOpts?.host,
Expand Down
6 changes: 3 additions & 3 deletions mitm-socket/lib/domain_socket_piper.go
Expand Up @@ -78,9 +78,9 @@ func (piper *DomainSocketPiper) Pipe(remoteConn net.Conn, sigc chan os.Signal) {
if readErr == io.EOF {
if n == 0 {
atomic.AddUint32(&piper.completeCounter, 1)
if piper.debug {
fmt.Println("DomainSocket -> 0 Byte EOF")
}

fmt.Println("[DomainSocketPiper.Closed] -> 0 Byte EOF")

return
}
time.Sleep(200 * time.Millisecond)
Expand Down
13 changes: 9 additions & 4 deletions mitm/lib/MitmRequestAgent.ts
Expand Up @@ -301,7 +301,12 @@ export default class MitmRequestAgent {
request.once('response', x => {
response = x;
});
const rebroadcast = (event: string, handler: (result: any) => void): http.ClientRequest => {

// we have to rebroadcast because this function is async, so the handlers can register late
const rebroadcastMissedEvent = (
event: string,
handler: (result: any) => void,
): http.ClientRequest => {
if (event === 'response' && response) {
handler(response);
response = null;
Expand All @@ -314,19 +319,19 @@ export default class MitmRequestAgent {
const originalOnce = request.once.bind(request);
request.on = function onOverride(event, handler): http.ClientRequest {
originalOn(event, handler);
return rebroadcast(event, handler);
return rebroadcastMissedEvent(event, handler);
};
request.once = function onOverride(event, handler): http.ClientRequest {
originalOnce(event, handler);
return rebroadcast(event, handler);
return rebroadcastMissedEvent(event, handler);
};

// if re-using, we need to make sure the connection can still be written to by probing it
if (ctx.proxyToServerMitmSocket.isReused) {
if (!request.headersSent) request.flushHeaders();
// give this 100 ms to flush (go is on a wait timer right now)
await new Promise(resolve => setTimeout(resolve, 100));
if (didHaveFlushErrors) {
if (didHaveFlushErrors || ctx.proxyToServerMitmSocket.isClosing) {
await this.assignSocket(ctx, requestSettings);
return this.http1Request(ctx, requestSettings);
}
Expand Down
21 changes: 18 additions & 3 deletions mitm/test/MitmRequestAgent.test.ts
Expand Up @@ -5,6 +5,7 @@ import * as HttpProxyAgent from 'http-proxy-agent';
import { IncomingHttpHeaders, IncomingMessage } from 'http';
import { URL } from 'url';
import * as https from 'https';
import * as net from 'net';
import MitmServer from '../lib/MitmProxy';
import RequestSession from '../handlers/RequestSession';
import HeadersHandler from '../handlers/HeadersHandler';
Expand Down Expand Up @@ -115,10 +116,13 @@ test('should create new connections as needed when no keepalive', async () => {
});

test('should be able to handle a reused socket that closes on server', async () => {
let serverSocket: net.Socket;
const sockets = new Set<net.Socket>();
const server = await Helpers.runHttpsServer(async (req, res) => {
res.writeHead(200, { Connection: 'keep-alive' });

res.end('Looks good');
serverSocket = res.socket;
sockets.add(res.socket);
});
const mitmServer = await startMitmServer();

Expand All @@ -144,8 +148,15 @@ test('should be able to handle a reused socket that closes on server', async ()
expect(response).toBe('Looks good');
}

// node seems to default reset the connection at 5 seconds
await new Promise(resolve => setTimeout(resolve, 5e3));
// @ts-ignore
const originalFn = session.requestAgent.http1Request.bind(session.requestAgent);

const httpRequestSpy = jest.spyOn<any, any>(session.requestAgent, 'http1Request');
httpRequestSpy.mockImplementationOnce(async (ctx, settings) => {
serverSocket.destroy();
await new Promise(setImmediate);
return await originalFn(ctx, settings);
});

{
const request = https.request({
Expand Down Expand Up @@ -173,6 +184,10 @@ test('should be able to handle a reused socket that closes on server', async ()
}
expect(body.join('')).toBe('Looks good');
}

expect(sockets.size).toBe(2);
expect(httpRequestSpy).toHaveBeenCalledTimes(2);
httpRequestSpy.mockClear();
});

test('it should not put upgrade connections in a pool', async () => {
Expand Down

0 comments on commit 8651445

Please sign in to comment.