From 2af12b2e5ddbd4719db1191f83f39ea554d8683d Mon Sep 17 00:00:00 2001 From: Zhao Xiaohong Date: Mon, 12 Jan 2015 00:26:20 +0800 Subject: [PATCH] pause/resume socket properly closes issue #2 --- README.md | 4 ++-- config.json | 1 + local.js | 27 ++++++++++++++++++++------- server.js | 21 +++++++++++++++++---- src/local.coffee | 19 ++++++++++++++----- src/server.coffee | 17 +++++++++++++---- 6 files changed, 67 insertions(+), 22 deletions(-) diff --git a/README.md b/README.md index 3befa4dd..de0cae39 100644 --- a/README.md +++ b/README.md @@ -54,8 +54,8 @@ ws@0.6.4 node_modules/ws Then run: ``` -$ node local.js -s still-tor-8707.herokuapp.com -l 1080 -m rc4 -k foobar -shadowsocks-heroku v0.9.6 +$ node local.js -s still-tor-8707.herokuapp.com -l 1080 -m rc4 -k foobar -r 80 +server listening at { address: '0.0.0.0', family: 'IPv4', port: 1080 } ``` Change proxy settings of your browser into: diff --git a/config.json b/config.json index d548e058..ffef71be 100644 --- a/config.json +++ b/config.json @@ -1,5 +1,6 @@ { "server": "127.0.0.1", + "local_address": "127.0.0.1", "local_port": 1080, "remote_port": 8080, "password": "`try*(^^$some^$%^complex>:<>?~password/", diff --git a/local.js b/local.js index cd9345f1..385a2dcb 100644 --- a/local.js +++ b/local.js @@ -90,9 +90,14 @@ var addrtype, buf, cmd, e, reply, tempBuf; if (stage === 5) { data = encryptor.encrypt(data); - ws.send(data, { - binary: true - }); + if (ws.readyState === WebSocket.OPEN) { + ws.send(data, { + binary: true + }); + if (ws.bufferedAmount > 0) { + connection.pause(); + } + } return; } if (stage === 0) { @@ -159,10 +164,15 @@ ping = setInterval(function() { return ws.ping("", null, true); }, 50 * 1000); + ws._socket.on("drain", function() { + return connection.resume(); + }); }); ws.on("message", function(data, flags) { data = encryptor.decrypt(data); - return connection.write(data); + if (!connection.write(data)) { + return ws._socket.pause(); + } }); ws.on("close", function() { clearInterval(ping); @@ -197,7 +207,7 @@ connection.on("end", function() { console.log("local disconnected"); if (ws) { - ws.close(); + ws.terminate(); } return server.getConnections(function(err, count) { console.log("concurrent connections:", count); @@ -206,17 +216,20 @@ connection.on("error", function(e) { console.log("local error: " + e); if (ws) { - ws.close(); + ws.terminate(); } return server.getConnections(function(err, count) { console.log("concurrent connections:", count); }); }); + connection.on("drain", function() { + return ws.resume(); + }); return connection.setTimeout(timeout, function() { console.log("local timeout"); connection.destroy(); if (ws) { - return ws.close(); + return ws.terminate(); } }); }); diff --git a/server.js b/server.js index 6f33dd08..6392a897 100644 --- a/server.js +++ b/server.js @@ -94,7 +94,9 @@ var addrtype, buf, e; data = encryptor.decrypt(data); if (stage === 5) { - remote.write(data); + if (!remote.write(data)) { + ws.pause(); + } return; } if (stage === 0) { @@ -131,17 +133,23 @@ remote.on("data", function(data) { data = encryptor.encrypt(data); if (ws.readyState === WebSocket.OPEN) { - return ws.send(data, { + ws.send(data, { binary: true }); + if (ws.bufferedAmount > 0) { + remote.pause(); + } } }); remote.on("end", function() { - ws.emit("close"); + ws.close(); return console.log("remote disconnected"); }); + remote.on("drain", function() { + return ws.resume(); + }); remote.on("error", function(e) { - ws.emit("close"); + ws.close(); return console.log("remote: " + e); }); remote.setTimeout(timeout, function() { @@ -173,6 +181,11 @@ ws.on("ping", function() { return ws.pong('', null, true); }); + ws._socket.on("drain", function() { + if (remote) { + return remote.resume(); + } + }); ws.on("close", function() { console.log("server disconnected"); console.log("concurrent connections:", wss.clients.length); diff --git a/src/local.coffee b/src/local.coffee index 72aebb37..12d65f32 100644 --- a/src/local.coffee +++ b/src/local.coffee @@ -63,7 +63,9 @@ server = net.createServer (connection) -> if stage is 5 # pipe sockets data = encryptor.encrypt data - ws.send data, { binary: true } + if ws.readyState is WebSocket.OPEN + ws.send data, { binary: true } + connection.pause() if ws.bufferedAmount > 0 return if stage is 0 tempBuf = new Buffer(2) @@ -130,11 +132,15 @@ server = net.createServer (connection) -> ping = setInterval(-> ws.ping "", null, true , 50 * 1000) + + ws._socket.on "drain", -> + connection.resume() + return ws.on "message", (data, flags) -> data = encryptor.decrypt data - connection.write(data) + ws._socket.pause() unless connection.write(data) ws.on "close", -> clearInterval ping @@ -165,22 +171,25 @@ server = net.createServer (connection) -> connection.on "end", -> console.log "local disconnected" - ws.close() if ws + ws.terminate() if ws server.getConnections (err, count) -> console.log "concurrent connections:", count return connection.on "error", (e)-> console.log "local error: #{e}" - ws.close() if ws + ws.terminate() if ws server.getConnections (err, count) -> console.log "concurrent connections:", count return + connection.on "drain", -> + ws.resume() + connection.setTimeout timeout, -> console.log "local timeout" connection.destroy() - ws.close() if ws + ws.terminate() if ws server.listen PORT, LOCAL_ADDRESS, -> address = server.address() diff --git a/src/server.coffee b/src/server.coffee index 14794173..7e5a1f0f 100644 --- a/src/server.coffee +++ b/src/server.coffee @@ -57,7 +57,7 @@ wss.on "connection", (ws) -> ws.on "message", (data, flags) -> data = encryptor.decrypt data if stage is 5 - remote.write(data) + ws.pause() unless remote.write(data) return if stage is 0 try @@ -92,14 +92,20 @@ wss.on "connection", (ws) -> ) remote.on "data", (data) -> data = encryptor.encrypt data - ws.send data, { binary: true } if ws.readyState is WebSocket.OPEN + if ws.readyState is WebSocket.OPEN + ws.send data, { binary: true } + remote.pause() if ws.bufferedAmount > 0 + return remote.on "end", -> - ws.emit "close" + ws.close() console.log "remote disconnected" + remote.on "drain", -> + ws.resume() + remote.on "error", (e)-> - ws.emit "close" + ws.close() console.log "remote: #{e}" remote.setTimeout timeout, -> @@ -127,6 +133,9 @@ wss.on "connection", (ws) -> ws.on "ping", -> ws.pong '', null, true + ws._socket.on "drain", -> + remote.resume() if remote + ws.on "close", -> console.log "server disconnected" console.log "concurrent connections:", wss.clients.length