diff --git a/local.js b/local.js index 4518933..4b0effe 100755 --- a/local.js +++ b/local.js @@ -1,6 +1,6 @@ // Generated by CoffeeScript 1.3.3 (function() { - var KEY, PORT, REMOTE_PORT, SERVER, config, configContent, decryptTable, encrypt, encryptTable, fs, getServer, inetAton, inetNtoa, net, server, tables, timeout; + var KEY, PORT, REMOTE_PORT, SERVER, config, configContent, decryptTable, encrypt, encryptTable, fs, getServer, inetAton, inetNtoa, myScheduler, net, scheduler, server, tables, timeout; inetNtoa = function(buf) { return buf[0] + "." + buf[1] + "." + buf[2] + "." + buf[3]; @@ -24,6 +24,8 @@ fs = require("fs"); + scheduler = require('./scheduler'); + configContent = fs.readFileSync("config.json"); config = JSON.parse(configContent); @@ -38,12 +40,10 @@ timeout = Math.floor(config.timeout * 1000); + myScheduler = new scheduler.Scheduler(SERVER); + getServer = function() { - if (SERVER instanceof Array) { - return SERVER[Math.floor(Math.random() * SERVER.length)]; - } else { - return SERVER; - } + return myScheduler.getServer(); }; net = require("net"); @@ -59,8 +59,8 @@ decryptTable = tables[1]; server = net.createServer(function(connection) { - var addrLen, addrToSend, cachedPieces, headerLength, remote, remoteAddr, remotePort, stage; - console.log("server connected"); + var addrLen, addrToSend, cachedPieces, headerLength, remote, remoteAddr, remotePort, serverUsing, stage; + console.log("local connected"); console.log("concurrent connections: " + server.connections); stage = 0; headerLength = 0; @@ -70,8 +70,9 @@ remoteAddr = null; remotePort = null; addrToSend = ""; + serverUsing = getServer(); connection.on("data", function(data) { - var aServer, addrtype, buf, cmd, reply, tempBuf; + var addrtype, buf, cmd, reply, tempBuf; if (stage === 5) { encrypt.encrypt(encryptTable, data); if (!remote.write(data)) { @@ -120,10 +121,9 @@ buf.write("\u0000\u0000\u0000\u0000", 4, 4, "binary"); buf.writeInt16BE(remotePort, 8); connection.write(buf); - aServer = getServer(); - remote = net.connect(REMOTE_PORT, aServer, function() { + remote = net.connect(REMOTE_PORT, serverUsing, function() { var addrToSendBuf, i, piece; - console.log("connecting " + remoteAddr + " via " + aServer); + console.log("connecting " + remoteAddr + " via " + serverUsing); addrToSendBuf = new Buffer(addrToSend, "binary"); encrypt.encrypt(encryptTable, addrToSendBuf); remote.write(addrToSendBuf); @@ -149,13 +149,14 @@ return console.log("concurrent connections: " + server.connections); }); remote.on("error", function() { + myScheduler.serverFailed(serverUsing); if (stage === 4) { console.warn("remote connection refused"); connection.destroy(); - return; + } else { + console.warn("remote error"); + connection.end(); } - console.warn("remote error"); - connection.end(); return console.log("concurrent connections: " + server.connections); }); remote.on("drain", function() { @@ -186,14 +187,15 @@ } }); connection.on("end", function() { - console.log("server disconnected"); + myScheduler.serverSucceeded(serverUsing); + console.log("local disconnected"); if (remote) { - remote.destroy(); + remote.end(); } return console.log("concurrent connections: " + server.connections); }); connection.on("error", function() { - console.warn("server error"); + console.warn("local error"); if (remote) { remote.destroy(); } diff --git a/scheduler.js b/scheduler.js new file mode 100755 index 0000000..d83eb8c --- /dev/null +++ b/scheduler.js @@ -0,0 +1,63 @@ +// Generated by CoffeeScript 1.3.3 +(function() { + var Scheduler; + + Scheduler = (function() { + + function Scheduler(servers) { + if (servers instanceof Array) { + this._servers = servers; + } else { + this._servers = [servers]; + } + } + + Scheduler.prototype._servers = []; + + Scheduler.prototype._failureCount = {}; + + Scheduler.prototype._successCount = {}; + + Scheduler.prototype._ping = {}; + + Scheduler.prototype.toString = function() { + return "[" + this._servers.join(',') + "]"; + }; + + Scheduler.prototype._increaseCounter = function(counter, key) { + if (key in counter) { + return counter[key]++; + } else { + return counter[key] = 1; + } + }; + + Scheduler.prototype.serverFailed = function(server) { + console.log("" + server + " failed"); + return this._increaseCounter(this._failureCount, server); + }; + + Scheduler.prototype.serverSucceeded = function(server) { + console.log("" + server + " succeeded"); + return this._increaseCounter(this._successCount, server); + }; + + Scheduler.prototype.updatePing = function(server, ping) { + if (server in _ping) { + return _ping[server] = ping; + } else { + return _ping[server] = _ping[server] * 0.8 + ping * 0.2; + } + }; + + Scheduler.prototype.getServer = function() { + return this._servers[Math.floor(Math.random() * this._servers.length)]; + }; + + return Scheduler; + + })(); + + exports.Scheduler = Scheduler; + +}).call(this); diff --git a/server.js b/server.js index 196fdf7..28693cb 100755 --- a/server.js +++ b/server.js @@ -149,7 +149,7 @@ connection.on("end", function() { console.log("server disconnected"); if (remote) { - remote.destroy(); + remote.end(); } return console.log("concurrent connections: " + server.connections); }); diff --git a/src/local.coffee b/src/local.coffee index 5221e7a..7b22686 100755 --- a/src/local.coffee +++ b/src/local.coffee @@ -34,6 +34,7 @@ inetAton = (ipStr) -> buf fs = require("fs") +scheduler = require('./scheduler') configContent = fs.readFileSync("config.json") config = JSON.parse(configContent) SERVER = config.server @@ -42,11 +43,9 @@ PORT = config.local_port KEY = config.password timeout = Math.floor(config.timeout * 1000) +myScheduler = new scheduler.Scheduler SERVER getServer = -> - if SERVER instanceof Array - SERVER[Math.floor(Math.random() * SERVER.length)] - else - SERVER + myScheduler.getServer() net = require("net") encrypt = require("./encrypt") @@ -56,7 +55,7 @@ encryptTable = tables[0] decryptTable = tables[1] server = net.createServer((connection) -> - console.log "server connected" + console.log "local connected" console.log "concurrent connections: " + server.connections stage = 0 headerLength = 0 @@ -66,6 +65,7 @@ server = net.createServer((connection) -> remoteAddr = null remotePort = null addrToSend = "" + serverUsing = getServer() connection.on "data", (data) -> if stage is 5 # pipe sockets @@ -118,9 +118,8 @@ server = net.createServer((connection) -> buf.writeInt16BE remotePort, 8 connection.write buf # connect remote server - aServer = getServer() - remote = net.connect(REMOTE_PORT, aServer, -> - console.log "connecting #{remoteAddr} via #{aServer}" + remote = net.connect(REMOTE_PORT, serverUsing, -> + console.log "connecting #{remoteAddr} via #{serverUsing}" addrToSendBuf = new Buffer(addrToSend, "binary") encrypt.encrypt encryptTable, addrToSendBuf remote.write addrToSendBuf @@ -144,12 +143,13 @@ server = net.createServer((connection) -> console.log "concurrent connections: " + server.connections remote.on "error", -> + myScheduler.serverFailed(serverUsing) if stage is 4 console.warn "remote connection refused" connection.destroy() - return - console.warn "remote error" - connection.end() + else + console.warn "remote error" + connection.end() console.log "concurrent connections: " + server.connections remote.on "drain", -> @@ -176,12 +176,13 @@ server = net.createServer((connection) -> # make sure no data is lost connection.on "end", -> - console.log "server disconnected" - remote.destroy() if remote + myScheduler.serverSucceeded(serverUsing) + console.log "local disconnected" + remote.end() if remote console.log "concurrent connections: " + server.connections connection.on "error", -> - console.warn "server error" + console.warn "local error" remote.destroy() if remote console.log "concurrent connections: " + server.connections diff --git a/src/scheduler.coffee b/src/scheduler.coffee new file mode 100755 index 0000000..3f1ebc0 --- /dev/null +++ b/src/scheduler.coffee @@ -0,0 +1,40 @@ + +class Scheduler + constructor: (servers) -> + if servers instanceof Array + this._servers = servers + else + this._servers = [servers] + + _servers: [] + _failureCount: {} + _successCount: {} + _ping: {} + + toString: -> + "[" + this._servers.join(',') + "]" + + _increaseCounter: (counter, key) -> + if key of counter + counter[key]++ + else + counter[key] = 1 + + serverFailed: (server) -> + console.log "#{server} failed" + this._increaseCounter(this._failureCount, server) + + serverSucceeded: (server) -> + console.log "#{server} succeeded" + this._increaseCounter(this._successCount, server) + + updatePing: (server, ping)-> + if server of _ping + _ping[server] = ping + else + _ping[server] = (_ping[server] * 0.8 + ping * 0.2) + + getServer: -> + this._servers[Math.floor(Math.random() * this._servers.length)] + +exports.Scheduler = Scheduler \ No newline at end of file diff --git a/src/server.coffee b/src/server.coffee index 12d3ef6..4ec675a 100755 --- a/src/server.coffee +++ b/src/server.coffee @@ -136,7 +136,7 @@ server = net.createServer((connection) -> connection.on "end", -> console.log "server disconnected" - remote.destroy() if remote + remote.end() if remote console.log "concurrent connections: " + server.connections connection.on "error", ->