diff --git a/cluster/handler.go b/cluster/handler.go index 6ef01ad..47fb58e 100644 --- a/cluster/handler.go +++ b/cluster/handler.go @@ -53,6 +53,9 @@ var ( type rpcHandler func(session *session.Session, msg *message.Message, noCopy bool) +// CustomerRemoteServiceRoute customer remote service route +type CustomerRemoteServiceRoute func(service string, session *session.Session, members []*clusterpb.MemberInfo) *clusterpb.MemberInfo + func cache() { hrdata := map[string]interface{}{ "code": 200, @@ -336,14 +339,29 @@ func (h *LocalHandler) remoteProcess(session *session.Session, msg *message.Mess } // Select a remote service address - // 1. Use the service address directly if the router contains binding item - // 2. Select a remote service address randomly and bind to router + // 1. if exist customer remote service route ,use it, otherwise use default strategy + // 2. Use the service address directly if the router contains binding item + // 3. Select a remote service address randomly and bind to router var remoteAddr string - if addr, found := session.Router().Find(service); found { - remoteAddr = addr + if h.currentNode.Options.RemoteServiceRoute != nil { + if addr, found := session.Router().Find(service); found { + remoteAddr = addr + } else { + member := h.currentNode.Options.RemoteServiceRoute(service, session, members) + if member == nil { + log.Println(fmt.Sprintf("customize remoteServiceRoute handler: %s is not found", msg.Route)) + return + } + remoteAddr = member.ServiceAddr + session.Router().Bind(service, remoteAddr) + } } else { - remoteAddr = members[rand.Intn(len(members))].ServiceAddr - session.Router().Bind(service, remoteAddr) + if addr, found := session.Router().Find(service); found { + remoteAddr = addr + } else { + remoteAddr = members[rand.Intn(len(members))].ServiceAddr + session.Router().Bind(service, remoteAddr) + } } pool, err := h.currentNode.rpcClient.getConnPool(remoteAddr) if err != nil { diff --git a/cluster/node.go b/cluster/node.go index d8508d9..85808e1 100644 --- a/cluster/node.go +++ b/cluster/node.go @@ -55,6 +55,7 @@ type Options struct { TSLCertificate string TSLKey string UnregisterCallback func(Member) + RemoteServiceRoute CustomerRemoteServiceRoute } // Node represents a node in nano cluster, which will contains a group of services. diff --git a/examples/customerroute/README.md b/examples/customerroute/README.md new file mode 100644 index 0000000..ecb49b9 --- /dev/null +++ b/examples/customerroute/README.md @@ -0,0 +1,27 @@ +# Nano cluster example + +## About this example + + + +## How to run the example? + +```shell +cd examples/customerroute +go build + +# run master server +./customerroute master +./customerroute chat --listen "127.0.0.1:34580" +./customerroute chat --listen "127.0.0.1:34581" +./customerroute gate --listen "127.0.0.1:34570" --gate-address "127.0.0.1:34590" +``` + +## open browser and visit url for 4 times +``` +http://127.0.0.1:12345/web/ +http://127.0.0.1:12345/web/ +http://127.0.0.1:12345/web/ +http://127.0.0.1:12345/web/ +``` +input content and send, the same ChatRoomService node will sync the message each other diff --git a/examples/customerroute/main.go b/examples/customerroute/main.go new file mode 100644 index 0000000..fac2b00 --- /dev/null +++ b/examples/customerroute/main.go @@ -0,0 +1,186 @@ +package main + +import ( + "fmt" + "github.com/lonng/nano/cluster/clusterpb" + "log" + "net/http" + "os" + "path/filepath" + "runtime" + + "github.com/lonng/nano" + "github.com/lonng/nano/examples/customerroute/onegate" + "github.com/lonng/nano/examples/customerroute/tworoom" + "github.com/lonng/nano/serialize/json" + "github.com/lonng/nano/session" + "github.com/pingcap/errors" + "github.com/urfave/cli" +) + +func main() { + app := cli.NewApp() + app.Name = "NanoCustomerRouteDemo" + app.Author = "Lonng" + app.Email = "heng@lonng.org" + app.Description = "Nano cluster demo" + app.Commands = []cli.Command{ + { + Name: "master", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "listen,l", + Usage: "Master service listen address", + Value: "127.0.0.1:34567", + }, + }, + Action: runMaster, + }, + { + Name: "gate", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "master", + Usage: "master server address", + Value: "127.0.0.1:34567", + }, + cli.StringFlag{ + Name: "listen,l", + Usage: "Gate service listen address", + Value: "", + }, + cli.StringFlag{ + Name: "gate-address", + Usage: "Client connect address", + Value: "", + }, + }, + Action: runGate, + }, + { + Name: "chat", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "master", + Usage: "master server address", + Value: "127.0.0.1:34567", + }, + cli.StringFlag{ + Name: "listen,l", + Usage: "Chat service listen address", + Value: "", + }, + }, + Action: runChat, + }, + } + log.SetFlags(log.LstdFlags | log.Lshortfile) + if err := app.Run(os.Args); err != nil { + log.Fatalf("Startup server error %+v", err) + } +} + +func srcPath() string { + _, file, _, _ := runtime.Caller(0) + return filepath.Dir(file) +} + +func runMaster(args *cli.Context) error { + listen := args.String("listen") + if listen == "" { + return errors.Errorf("master listen address cannot empty") + } + + webDir := filepath.Join(srcPath(), "onemaster", "web") + log.Println("Nano master server web content directory", webDir) + log.Println("Nano master listen address", listen) + log.Println("Open http://127.0.0.1:12345/web/ in browser") + + http.Handle("/web/", http.StripPrefix("/web/", http.FileServer(http.Dir(webDir)))) + go func() { + if err := http.ListenAndServe(":12345", nil); err != nil { + panic(err) + } + }() + + // Startup Nano server with the specified listen address + nano.Listen(listen, + nano.WithMaster(), + nano.WithSerializer(json.NewSerializer()), + nano.WithDebugMode(), + ) + + return nil +} + +func runGate(args *cli.Context) error { + listen := args.String("listen") + if listen == "" { + return errors.Errorf("gate listen address cannot empty") + } + + masterAddr := args.String("master") + if masterAddr == "" { + return errors.Errorf("master address cannot empty") + } + + gateAddr := args.String("gate-address") + if gateAddr == "" { + return errors.Errorf("gate address cannot empty") + } + + log.Println("Current server listen address", listen) + log.Println("Current gate server address", gateAddr) + log.Println("Remote master server address", masterAddr) + + // Startup Nano server with the specified listen address + nano.Listen(listen, + nano.WithAdvertiseAddr(masterAddr), + nano.WithClientAddr(gateAddr), + nano.WithComponents(onegate.Services), + nano.WithSerializer(json.NewSerializer()), + nano.WithIsWebsocket(true), + nano.WithWSPath("/nano"), + nano.WithCheckOriginFunc(func(_ *http.Request) bool { return true }), + nano.WithDebugMode(), + //set remote service route for gate + nano.WithCustomerRemoteServiceRoute(customerRemoteServiceRoute), + nano.WithNodeId(2), // if you deploy multi gate, option set nodeId, default nodeId = os.Getpid() + ) + return nil +} + +func runChat(args *cli.Context) error { + listen := args.String("listen") + if listen == "" { + return errors.Errorf("chat listen address cannot empty") + } + + masterAddr := args.String("master") + if listen == "" { + return errors.Errorf("master address cannot empty") + } + + log.Println("Current chat server listen address", listen) + log.Println("Remote master server address", masterAddr) + + // Register session closed callback + session.Lifetime.OnClosed(tworoom.OnSessionClosed) + + // Startup Nano server with the specified listen address + nano.Listen(listen, + nano.WithAdvertiseAddr(masterAddr), + nano.WithComponents(tworoom.Services), + nano.WithSerializer(json.NewSerializer()), + nano.WithDebugMode(), + ) + + return nil +} + +func customerRemoteServiceRoute(service string, session *session.Session, members []*clusterpb.MemberInfo) *clusterpb.MemberInfo { + count := int64(len(members)) + var index = session.UID() % count + fmt.Printf("remote service:%s route to :%v \n", service, members[index]) + return members[index] +} diff --git a/examples/customerroute/onegate/gate_service.go b/examples/customerroute/onegate/gate_service.go new file mode 100644 index 0000000..ede2130 --- /dev/null +++ b/examples/customerroute/onegate/gate_service.go @@ -0,0 +1,43 @@ +package onegate + +import ( + "fmt" + "github.com/lonng/nano/component" + "github.com/lonng/nano/examples/cluster/protocol" + "github.com/lonng/nano/session" + "github.com/pingcap/errors" +) + +type RegisterService struct { + component.Base + nextGateUid int64 +} + +func newRegisterService() *RegisterService { + return &RegisterService{} +} + +type ( + RegisterRequest struct { + Nickname string `json:"nickname"` + } + RegisterResponse struct { + Code int `json:"code"` + } +) + +func (bs *RegisterService) Login(s *session.Session, msg *RegisterRequest) error { + bs.nextGateUid++ + uid := bs.nextGateUid + s.Bind(uid) + fmt.Println("Login uid:", uid) + chat := &protocol.JoinRoomRequest{ + Nickname: msg.Nickname, + GateUid: uid, + MasterUid: uid, + } + if err := s.RPC("ChatRoomService.JoinRoom", chat); err != nil { + return errors.Trace(err) + } + return s.Response(&RegisterResponse{}) +} diff --git a/examples/customerroute/onegate/init.go b/examples/customerroute/onegate/init.go new file mode 100644 index 0000000..8b0ba0f --- /dev/null +++ b/examples/customerroute/onegate/init.go @@ -0,0 +1,14 @@ +package onegate + +import "github.com/lonng/nano/component" + +var ( + // All services in master server + Services = &component.Components{} + + bindService = newRegisterService() +) + +func init() { + Services.Register(bindService) +} diff --git a/examples/customerroute/onemaster/web/index.html b/examples/customerroute/onemaster/web/index.html new file mode 100644 index 0000000..9ae34b4 --- /dev/null +++ b/examples/customerroute/onemaster/web/index.html @@ -0,0 +1,80 @@ + + + + + Chat Demo + + +
+ +
+ + + + +
+
+ + + + + + + \ No newline at end of file diff --git a/examples/customerroute/onemaster/web/protocol.js b/examples/customerroute/onemaster/web/protocol.js new file mode 100644 index 0000000..b16420d --- /dev/null +++ b/examples/customerroute/onemaster/web/protocol.js @@ -0,0 +1,349 @@ +(function (exports, ByteArray, global) { + var Protocol = exports; + + var PKG_HEAD_BYTES = 4; + var MSG_FLAG_BYTES = 1; + var MSG_ROUTE_CODE_BYTES = 2; + var MSG_ID_MAX_BYTES = 5; + var MSG_ROUTE_LEN_BYTES = 1; + + var MSG_ROUTE_CODE_MAX = 0xffff; + + var MSG_COMPRESS_ROUTE_MASK = 0x1; + var MSG_TYPE_MASK = 0x7; + + var Package = Protocol.Package = {}; + var Message = Protocol.Message = {}; + + Package.TYPE_HANDSHAKE = 1; + Package.TYPE_HANDSHAKE_ACK = 2; + Package.TYPE_HEARTBEAT = 3; + Package.TYPE_DATA = 4; + Package.TYPE_KICK = 5; + + Message.TYPE_REQUEST = 0; + Message.TYPE_NOTIFY = 1; + Message.TYPE_RESPONSE = 2; + Message.TYPE_PUSH = 3; + + /** + * pomele client encode + * id message id; + * route message route + * msg message body + * socketio current support string + */ + Protocol.strencode = function(str) { + var byteArray = new ByteArray(str.length * 3); + var offset = 0; + for(var i = 0; i < str.length; i++){ + var charCode = str.charCodeAt(i); + var codes = null; + if(charCode <= 0x7f){ + codes = [charCode]; + }else if(charCode <= 0x7ff){ + codes = [0xc0|(charCode>>6), 0x80|(charCode & 0x3f)]; + }else{ + codes = [0xe0|(charCode>>12), 0x80|((charCode & 0xfc0)>>6), 0x80|(charCode & 0x3f)]; + } + for(var j = 0; j < codes.length; j++){ + byteArray[offset] = codes[j]; + ++offset; + } + } + var _buffer = new ByteArray(offset); + copyArray(_buffer, 0, byteArray, 0, offset); + return _buffer; + }; + + /** + * client decode + * msg String data + * return Message Object + */ + Protocol.strdecode = function(buffer) { + var bytes = new ByteArray(buffer); + var array = []; + var offset = 0; + var charCode = 0; + var end = bytes.length; + while(offset < end){ + if(bytes[offset] < 128){ + charCode = bytes[offset]; + offset += 1; + }else if(bytes[offset] < 224){ + charCode = ((bytes[offset] & 0x3f)<<6) + (bytes[offset+1] & 0x3f); + offset += 2; + }else{ + charCode = ((bytes[offset] & 0x0f)<<12) + ((bytes[offset+1] & 0x3f)<<6) + (bytes[offset+2] & 0x3f); + offset += 3; + } + array.push(charCode); + } + return String.fromCharCode.apply(null, array); + }; + + /** + * Package protocol encode. + * + * Pomelo package format: + * +------+-------------+------------------+ + * | type | body length | body | + * +------+-------------+------------------+ + * + * Head: 4bytes + * 0: package type, + * 1 - handshake, + * 2 - handshake ack, + * 3 - heartbeat, + * 4 - data + * 5 - kick + * 1 - 3: big-endian body length + * Body: body length bytes + * + * @param {Number} type package type + * @param {ByteArray} body body content in bytes + * @return {ByteArray} new byte array that contains encode result + */ + Package.encode = function(type, body){ + var length = body ? body.length : 0; + var buffer = new ByteArray(PKG_HEAD_BYTES + length); + var index = 0; + buffer[index++] = type & 0xff; + buffer[index++] = (length >> 16) & 0xff; + buffer[index++] = (length >> 8) & 0xff; + buffer[index++] = length & 0xff; + if(body) { + copyArray(buffer, index, body, 0, length); + } + return buffer; + }; + + /** + * Package protocol decode. + * See encode for package format. + * + * @param {ByteArray} buffer byte array containing package content + * @return {Object} {type: package type, buffer: body byte array} + */ + Package.decode = function(buffer){ + var offset = 0; + var bytes = new ByteArray(buffer); + var length = 0; + var rs = []; + while(offset < bytes.length) { + var type = bytes[offset++]; + length = ((bytes[offset++]) << 16 | (bytes[offset++]) << 8 | bytes[offset++]) >>> 0; + var body = length ? new ByteArray(length) : null; + copyArray(body, 0, bytes, offset, length); + offset += length; + rs.push({'type': type, 'body': body}); + } + return rs.length === 1 ? rs[0]: rs; + }; + + /** + * Message protocol encode. + * + * @param {Number} id message id + * @param {Number} type message type + * @param {Number} compressRoute whether compress route + * @param {Number|String} route route code or route string + * @param {Buffer} msg message body bytes + * @return {Buffer} encode result + */ + Message.encode = function(id, type, compressRoute, route, msg){ + // caculate message max length + var idBytes = msgHasId(type) ? caculateMsgIdBytes(id) : 0; + var msgLen = MSG_FLAG_BYTES + idBytes; + + if(msgHasRoute(type)) { + if(compressRoute) { + if(typeof route !== 'number'){ + throw new Error('error flag for number route!'); + } + msgLen += MSG_ROUTE_CODE_BYTES; + } else { + msgLen += MSG_ROUTE_LEN_BYTES; + if(route) { + route = Protocol.strencode(route); + if(route.length>255) { + throw new Error('route maxlength is overflow'); + } + msgLen += route.length; + } + } + } + + if(msg) { + msgLen += msg.length; + } + + var buffer = new ByteArray(msgLen); + var offset = 0; + + // add flag + offset = encodeMsgFlag(type, compressRoute, buffer, offset); + + // add message id + if(msgHasId(type)) { + offset = encodeMsgId(id, buffer, offset); + } + + // add route + if(msgHasRoute(type)) { + offset = encodeMsgRoute(compressRoute, route, buffer, offset); + } + + // add body + if(msg) { + offset = encodeMsgBody(msg, buffer, offset); + } + + return buffer; + }; + + /** + * Message protocol decode. + * + * @param {Buffer|Uint8Array} buffer message bytes + * @return {Object} message object + */ + Message.decode = function(buffer) { + var bytes = new ByteArray(buffer); + var bytesLen = bytes.length || bytes.byteLength; + var offset = 0; + var id = 0; + var route = null; + + // parse flag + var flag = bytes[offset++]; + var compressRoute = flag & MSG_COMPRESS_ROUTE_MASK; + var type = (flag >> 1) & MSG_TYPE_MASK; + + // parse id + if(msgHasId(type)) { + var m = parseInt(bytes[offset]); + var i = 0; + do{ + var m = parseInt(bytes[offset]); + id = id + ((m & 0x7f) * Math.pow(2,(7*i))); + offset++; + i++; + }while(m >= 128); + } + + // parse route + if(msgHasRoute(type)) { + if(compressRoute) { + route = (bytes[offset++]) << 8 | bytes[offset++]; + } else { + var routeLen = bytes[offset++]; + if(routeLen) { + route = new ByteArray(routeLen); + copyArray(route, 0, bytes, offset, routeLen); + route = Protocol.strdecode(route); + } else { + route = ''; + } + offset += routeLen; + } + } + + // parse body + var bodyLen = bytesLen - offset; + var body = new ByteArray(bodyLen); + + copyArray(body, 0, bytes, offset, bodyLen); + + return {'id': id, 'type': type, 'compressRoute': compressRoute, + 'route': route, 'body': body}; + }; + + var copyArray = function(dest, doffset, src, soffset, length) { + if('function' === typeof src.copy) { + // Buffer + src.copy(dest, doffset, soffset, soffset + length); + } else { + // Uint8Array + for(var index=0; index>= 7; + } while(id > 0); + return len; + }; + + var encodeMsgFlag = function(type, compressRoute, buffer, offset) { + if(type !== Message.TYPE_REQUEST && type !== Message.TYPE_NOTIFY && + type !== Message.TYPE_RESPONSE && type !== Message.TYPE_PUSH) { + throw new Error('unkonw message type: ' + type); + } + + buffer[offset] = (type << 1) | (compressRoute ? 1 : 0); + + return offset + MSG_FLAG_BYTES; + }; + + var encodeMsgId = function(id, buffer, offset) { + do{ + var tmp = id % 128; + var next = Math.floor(id/128); + + if(next !== 0){ + tmp = tmp + 128; + } + buffer[offset++] = tmp; + + id = next; + } while(id !== 0); + + return offset; + }; + + var encodeMsgRoute = function(compressRoute, route, buffer, offset) { + if (compressRoute) { + if(route > MSG_ROUTE_CODE_MAX){ + throw new Error('route number is overflow'); + } + + buffer[offset++] = (route >> 8) & 0xff; + buffer[offset++] = route & 0xff; + } else { + if(route) { + buffer[offset++] = route.length & 0xff; + copyArray(buffer, offset, route, 0, route.length); + offset += route.length; + } else { + buffer[offset++] = 0; + } + } + + return offset; + }; + + var encodeMsgBody = function(msg, buffer, offset) { + copyArray(buffer, offset, msg, 0, msg.length); + return offset + msg.length; + }; + + if(typeof(window) != "undefined") { + window.Protocol = Protocol; + } +})(typeof(window)=="undefined" ? module.exports : (this.Protocol = {}),typeof(window)=="undefined" ? Buffer : Uint8Array, this); diff --git a/examples/customerroute/onemaster/web/starx-wsclient.js b/examples/customerroute/onemaster/web/starx-wsclient.js new file mode 100644 index 0000000..7bf20ad --- /dev/null +++ b/examples/customerroute/onemaster/web/starx-wsclient.js @@ -0,0 +1,573 @@ +(function() { + function Emitter(obj) { + if (obj) return mixin(obj); + } + /** + * Mixin the emitter properties. + * + * @param {Object} obj + * @return {Object} + * @api private + */ + + function mixin(obj) { + for (var key in Emitter.prototype) { + obj[key] = Emitter.prototype[key]; + } + return obj; + } + + /** + * Listen on the given `event` with `fn`. + * + * @param {String} event + * @param {Function} fn + * @return {Emitter} + * @api public + */ + + Emitter.prototype.on = + Emitter.prototype.addEventListener = function(event, fn){ + this._callbacks = this._callbacks || {}; + (this._callbacks[event] = this._callbacks[event] || []) + .push(fn); + return this; + }; + + /** + * Adds an `event` listener that will be invoked a single + * time then automatically removed. + * + * @param {String} event + * @param {Function} fn + * @return {Emitter} + * @api public + */ + + Emitter.prototype.once = function(event, fn){ + var self = this; + this._callbacks = this._callbacks || {}; + + function on() { + self.off(event, on); + fn.apply(this, arguments); + } + + on.fn = fn; + this.on(event, on); + return this; + }; + + /** + * Remove the given callback for `event` or all + * registered callbacks. + * + * @param {String} event + * @param {Function} fn + * @return {Emitter} + * @api public + */ + + Emitter.prototype.off = + Emitter.prototype.removeListener = + Emitter.prototype.removeAllListeners = + Emitter.prototype.removeEventListener = function(event, fn){ + this._callbacks = this._callbacks || {}; + + // all + if (0 == arguments.length) { + this._callbacks = {}; + return this; + } + + // specific event + var callbacks = this._callbacks[event]; + if (!callbacks) return this; + + // remove all handlers + if (1 == arguments.length) { + delete this._callbacks[event]; + return this; + } + + // remove specific handler + var cb; + for (var i = 0; i < callbacks.length; i++) { + cb = callbacks[i]; + if (cb === fn || cb.fn === fn) { + callbacks.splice(i, 1); + break; + } + } + return this; + }; + + /** + * Emit `event` with the given args. + * + * @param {String} event + * @param {Mixed} ... + * @return {Emitter} + */ + + Emitter.prototype.emit = function(event){ + this._callbacks = this._callbacks || {}; + var args = [].slice.call(arguments, 1) + , callbacks = this._callbacks[event]; + + if (callbacks) { + callbacks = callbacks.slice(0); + for (var i = 0, len = callbacks.length; i < len; ++i) { + callbacks[i].apply(this, args); + } + } + + return this; + }; + + /** + * Return array of callbacks for `event`. + * + * @param {String} event + * @return {Array} + * @api public + */ + + Emitter.prototype.listeners = function(event){ + this._callbacks = this._callbacks || {}; + return this._callbacks[event] || []; + }; + + /** + * Check if this emitter has `event` handlers. + * + * @param {String} event + * @return {Boolean} + * @api public + */ + + Emitter.prototype.hasListeners = function(event){ + return !! this.listeners(event).length; + }; + var JS_WS_CLIENT_TYPE = 'js-websocket'; + var JS_WS_CLIENT_VERSION = '0.0.1'; + + var Protocol = window.Protocol; + var decodeIO_encoder = null; + var decodeIO_decoder = null; + var Package = Protocol.Package; + var Message = Protocol.Message; + var EventEmitter = Emitter; + var rsa = window.rsa; + + if(typeof(window) != "undefined" && typeof(sys) != 'undefined' && sys.localStorage) { + window.localStorage = sys.localStorage; + } + + var RES_OK = 200; + var RES_FAIL = 500; + var RES_OLD_CLIENT = 501; + + if (typeof Object.create !== 'function') { + Object.create = function (o) { + function F() {} + F.prototype = o; + return new F(); + }; + } + + var root = window; + var starx = Object.create(EventEmitter.prototype); // object extend from object + root.starx = starx; + var socket = null; + var reqId = 0; + var callbacks = {}; + var handlers = {}; + //Map from request id to route + var routeMap = {}; + var dict = {}; // route string to code + var abbrs = {}; // code to route string + + var heartbeatInterval = 0; + var heartbeatTimeout = 0; + var nextHeartbeatTimeout = 0; + var gapThreshold = 100; // heartbeat gap threashold + var heartbeatId = null; + var heartbeatTimeoutId = null; + var handshakeCallback = null; + + var decode = null; + var encode = null; + + var reconnect = false; + var reconncetTimer = null; + var reconnectUrl = null; + var reconnectAttempts = 0; + var reconnectionDelay = 5000; + var DEFAULT_MAX_RECONNECT_ATTEMPTS = 10; + + var useCrypto; + + var handshakeBuffer = { + 'sys': { + type: JS_WS_CLIENT_TYPE, + version: JS_WS_CLIENT_VERSION, + rsa: {} + }, + 'user': { + } + }; + + var initCallback = null; + + starx.init = function(params, cb) { + initCallback = cb; + var host = params.host; + var port = params.port; + var path = params.path; + + encode = params.encode || defaultEncode; + decode = params.decode || defaultDecode; + + var url = 'ws://' + host; + if(port) { + url += ':' + port; + } + + if(path) { + url += path; + } + + handshakeBuffer.user = params.user; + if(params.encrypt) { + useCrypto = true; + rsa.generate(1024, "10001"); + var data = { + rsa_n: rsa.n.toString(16), + rsa_e: rsa.e + }; + handshakeBuffer.sys.rsa = data; + } + handshakeCallback = params.handshakeCallback; + connect(params, url, cb); + }; + + var defaultDecode = starx.decode = function(data) { + var msg = Message.decode(data); + + if(msg.id > 0){ + msg.route = routeMap[msg.id]; + delete routeMap[msg.id]; + if(!msg.route){ + return; + } + } + + msg.body = deCompose(msg); + return msg; + }; + + var defaultEncode = starx.encode = function(reqId, route, msg) { + var type = reqId ? Message.TYPE_REQUEST : Message.TYPE_NOTIFY; + + if(decodeIO_encoder && decodeIO_encoder.lookup(route)) { + var Builder = decodeIO_encoder.build(route); + msg = new Builder(msg).encodeNB(); + } else { + msg = Protocol.strencode(JSON.stringify(msg)); + } + + var compressRoute = 0; + if(dict && dict[route]) { + route = dict[route]; + compressRoute = 1; + } + + return Message.encode(reqId, type, compressRoute, route, msg); + }; + + var connect = function(params, url, cb) { + console.log('connect to ' + url); + + var params = params || {}; + var maxReconnectAttempts = params.maxReconnectAttempts || DEFAULT_MAX_RECONNECT_ATTEMPTS; + reconnectUrl = url; + + var onopen = function(event) { + if(!!reconnect) { + starx.emit('reconnect'); + } + reset(); + var obj = Package.encode(Package.TYPE_HANDSHAKE, Protocol.strencode(JSON.stringify(handshakeBuffer))); + send(obj); + }; + var onmessage = function(event) { + processPackage(Package.decode(event.data), cb); + // new package arrived, update the heartbeat timeout + if(heartbeatTimeout) { + nextHeartbeatTimeout = Date.now() + heartbeatTimeout; + } + }; + var onerror = function(event) { + starx.emit('io-error', event); + console.error('socket error: ', event); + }; + var onclose = function(event) { + starx.emit('close',event); + starx.emit('disconnect', event); + console.log('socket close: ', event); + if(!!params.reconnect && reconnectAttempts < maxReconnectAttempts) { + reconnect = true; + reconnectAttempts++; + reconncetTimer = setTimeout(function() { + connect(params, reconnectUrl, cb); + }, reconnectionDelay); + reconnectionDelay *= 2; + } + }; + socket = new WebSocket(url); + socket.binaryType = 'arraybuffer'; + socket.onopen = onopen; + socket.onmessage = onmessage; + socket.onerror = onerror; + socket.onclose = onclose; + }; + + starx.disconnect = function() { + if(socket) { + if(socket.disconnect) socket.disconnect(); + if(socket.close) socket.close(); + console.log('disconnect'); + socket = null; + } + + if(heartbeatId) { + clearTimeout(heartbeatId); + heartbeatId = null; + } + if(heartbeatTimeoutId) { + clearTimeout(heartbeatTimeoutId); + heartbeatTimeoutId = null; + } + }; + + var reset = function() { + reconnect = false; + reconnectionDelay = 1000 * 5; + reconnectAttempts = 0; + clearTimeout(reconncetTimer); + }; + + starx.request = function(route, msg, cb) { + if(arguments.length === 2 && typeof msg === 'function') { + cb = msg; + msg = {}; + } else { + msg = msg || {}; + } + route = route || msg.route; + if(!route) { + return; + } + + reqId++; + sendMessage(reqId, route, msg); + + callbacks[reqId] = cb; + routeMap[reqId] = route; + }; + + starx.notify = function(route, msg) { + msg = msg || {}; + sendMessage(0, route, msg); + }; + + var sendMessage = function(reqId, route, msg) { + if(useCrypto) { + msg = JSON.stringify(msg); + var sig = rsa.signString(msg, "sha256"); + msg = JSON.parse(msg); + msg['__crypto__'] = sig; + } + + if(encode) { + msg = encode(reqId, route, msg); + } + + var packet = Package.encode(Package.TYPE_DATA, msg); + send(packet); + }; + + var send = function(packet) { + socket.send(packet.buffer); + }; + + var handler = {}; + + var heartbeat = function(data) { + if(!heartbeatInterval) { + // no heartbeat + return; + } + + var obj = Package.encode(Package.TYPE_HEARTBEAT); + if(heartbeatTimeoutId) { + clearTimeout(heartbeatTimeoutId); + heartbeatTimeoutId = null; + } + + if(heartbeatId) { + // already in a heartbeat interval + return; + } + heartbeatId = setTimeout(function() { + heartbeatId = null; + send(obj); + + nextHeartbeatTimeout = Date.now() + heartbeatTimeout; + heartbeatTimeoutId = setTimeout(heartbeatTimeoutCb, heartbeatTimeout); + }, heartbeatInterval); + }; + + var heartbeatTimeoutCb = function() { + var gap = nextHeartbeatTimeout - Date.now(); + if(gap > gapThreshold) { + heartbeatTimeoutId = setTimeout(heartbeatTimeoutCb, gap); + } else { + console.error('server heartbeat timeout'); + starx.emit('heartbeat timeout'); + starx.disconnect(); + } + }; + + var handshake = function(data) { + data = JSON.parse(Protocol.strdecode(data)); + if(data.code === RES_OLD_CLIENT) { + starx.emit('error', 'client version not fullfill'); + return; + } + + if(data.code !== RES_OK) { + starx.emit('error', 'handshake fail'); + return; + } + + handshakeInit(data); + + var obj = Package.encode(Package.TYPE_HANDSHAKE_ACK); + send(obj); + if(initCallback) { + initCallback(socket); + } + }; + + var onData = function(data) { + var msg = data; + if(decode) { + msg = decode(msg); + } + processMessage(starx, msg); + }; + + var onKick = function(data) { + data = JSON.parse(Protocol.strdecode(data)); + starx.emit('onKick', data); + }; + + handlers[Package.TYPE_HANDSHAKE] = handshake; + handlers[Package.TYPE_HEARTBEAT] = heartbeat; + handlers[Package.TYPE_DATA] = onData; + handlers[Package.TYPE_KICK] = onKick; + + var processPackage = function(msgs) { + if(Array.isArray(msgs)) { + for(var i=0; i