Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugs For arpc.js #16

Closed
spxvszero opened this issue Sep 6, 2021 · 8 comments
Closed

Bugs For arpc.js #16

spxvszero opened this issue Sep 6, 2021 · 8 comments
Labels
bug Something isn't working

Comments

@spxvszero
Copy link

您好,请问一下,有关 websocket 的消息截断的问题,在 onMessage 函数下,通过 offset 去循环处理一段消息,这里使用 offset 的原因是什么?

this._onMessage = function (event) {
try {
var offset = 0;
while (offset < event.data.byteLength) {
var headArr = new Uint8Array(event.data.slice(offset, offset + 16));
var bodyLen = 0;
for (var i = _HeaderIndexBodyLenBegin; i < _HeaderIndexBodyLenEnd; i++) {
bodyLen |= (headArr[i] << ((i - _HeaderIndexBodyLenBegin) * 8)) & 0xFF;
}
var cmd = headArr[_HeaderIndexCmd];

我在使用的时候,发现 bodyLen 的长度计算不正确,例如我从服务器推送一条 Notify 的消息,在这里断点,得到的 event.data.byteLength 长度是 645,然而通过 bodyLen 计算出来的结果是 119,即使这是在 while 循环中,offset 会在下一次循环开始前偏移,但是消息已经在第一次循环里就发送给 handle 了

if (handler) {
var data = client.codec.Unmarshal(bodyArr);
handler.h(new Context(client, headArr, bodyArr, method, data));
} else {

还有另一个问题是,第二次循环中,header 的内容并不存在,计算的 method 或者其他信息都是原来消息体的一部分信息,即使想要组装数据,似乎也没法完成。

var isAsync = headArr[_HeaderIndexFlag] & _HeaderFlagMaskAsync;
var methodLen = headArr[_HeaderIndexMethodLen];
var method = new TextDecoder("utf-8").decode(event.data.slice(offset + 16, offset + 16 + methodLen));

以下是我使用的部分代码:

       //JS:
       client = new ArpcClient("ws://localhost:8888/ws", null)
       //订阅模式的 Handler
       client.handle("/broadcast/info", function(ctx) {
            console.log("[Info] ", ctx)
       });
       //请求订阅,成功之后通过上面的 Handler 获取服务器推送的消息
       client.call("/registerBroadcast", "xxxx", 5000, function(resp) {
            console.log("response ", resp)
       })


//Server:
func InitServer(port string)  {
	ln, _ := websocket.Listen("localhost:"+port, nil)
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		fmt.Println("url: %v", r.URL.String())
		if r.URL.Path == "/" {
			http.ServeFile(w, r, "chat.html")
		} else if r.URL.Path == "/arpc.js" {
			http.ServeFile(w, r, "arpc.js")
		} else {
			http.NotFound(w, r)
		}
	})

	http.HandleFunc("/ws", ln.(*websocket.Listener).Handler)
	go func() {
		err := http.ListenAndServe("localhost:"+port, nil)
		if err != nil {
			fmt.Println("ListenAndServe: ", err)
			panic(err)
		}
	}()

	server := arpc.NewServer()

	/*
		Broadcast auth

		request  :
			AccessKey	string
		response :
			true or false
	*/
	server.Handler.Handle(pinApi.AddressForApi(pinApi.BROADCAST_AUTH), func(context *arpc.Context) {
		var accessKey string
		if err := context.Bind(&accessKey); err != nil {
			context.Write(code.Failed)
			return
		}
		if len(accessKey) > 0 && accessKey == default.AccessKey  {
			BroadcastClients = append(BroadcastClients, context.Client)
			context.Write(define.RPC_SUCCESS)
		}else {
			context.Write(code.Failed)
		}
	})

	//broadcast info
	go broadcastInfo()

	server.Serve(ln)
}
func broadcastServerInfo() {
	for true {
		if len(BroadcastClients) > 0 {
			log.Println(len(BroadcastClients), " clients listen Broad casting...")
                        // jsonData 是个较长的 json 数据
			msg := RPCServer.NewMessage(arpc.CmdNotify,"/broadcast/info", jsonData)
			for _, client := range BroadcastClients {
				client.PushMsg(msg, arpc.TimeZero)
			}
		}
		time.Sleep(time.Second * 1)
	}
}

@lesismal
Copy link
Owner

lesismal commented Sep 6, 2021

是否有遇到您疑惑的点相关的bug,如果遇到,请提供下能够复现的完整示例代码我这边排查下。

我先对于您疑惑的几个点做一下解释:

您好,请问一下,有关 websocket 的消息截断的问题,在 onMessage 函数下,通过 offset 去循环处理一段消息,这里使用 offset 的原因是什么?
我在使用的时候,发现 bodyLen 的长度计算不正确,例如我从服务器推送一条 Notify 的消息,在这里断点,得到的 event.data.byteLength 长度是 645,然而通过 bodyLen 计算出来的结果是 119,即使这是在 while 循环中,offset 会在下一次循环开始前偏移,但是消息已经在第一次循环里就发送给 handle 了

这是由于 server 端默认是 批次发送 的,默认提供的websocket扩展是基于gorilla/websocket.Conn进行封装实现了 net.Conn 的接口,websocket.Conn 只是作为 net.Conn 接口,当发生批次发送时,server会把多个 arpc Message 合并成一段 buffer 执行 websocket.Conn 封装的 Write:
https://github.com/lesismal/arpc/blob/master/client.go#L781
https://github.com/lesismal/arpc/blob/master/client.go#L845

所以,虽然 websocket 本身不需要处理所谓的 粘包,但在默认提供的扩展中仍存在一个完整 websocket Message 可能包含多个 arpc Message 情况,所以这里需要循环处理、而不是把一个 websocket Message buffer 就当成一个 arpc Message 的 buffer

还有另一个问题是,第二次循环中,header 的内容并不存在,计算的 method 或者其他信息都是原来消息体的一部分信息,即使想要组装数据,似乎也没法完成。

上面已经讲到,一个 websocket Message 可能包含多个 arpc Message,并且本身封装的 Write 也是调用 WriteMessage,所以可以保证一个 websocket Message 中一个或者多个 arpc Message 的完整性,不需要担心 js client 解析时遇到半个 arpc Message 的问题,所以要么 offset 已经大于 websocket Message 长度结束循环,要么 headerArr 就正常拿到完整包头:
https://github.com/lesismal/arpc/blob/master/extension/jsclient/arpc.js#L182

@spxvszero
Copy link
Author

上面我写的部分代码就是例子了……

可能比较麻烦,我重写了一个 Demo 打包发给您,在写的过程中我发现了另一个问题,就是如果使用这种订阅模式,另一个客户端如果不是 web 端,而是 server 端的话,会出现 timeout 的情况,在代码里我也一并打包给您了。

单纯运行 server 的程序,然后打开网页端就是我说的上述没有正确分段导致 websocket Message 截断的问题;
在运行 server 程序的前提下,在另一个终端运行 client 程序,就会出现新发现的 timeout 问题;

代码均在 main.go 中,要编译 client 程序,将 main() 里面的 server [部分代码注释,其余部分解开注释即可。
pinDemo.zip

@lesismal
Copy link
Owner

lesismal commented Sep 6, 2021

上面我写的部分代码就是例子了……

可能比较麻烦,我重写了一个 Demo 打包发给您,在写的过程中我发现了另一个问题,就是如果使用这种订阅模式,另一个客户端如果不是 web 端,而是 server 端的话,会出现 timeout 的情况,在代码里我也一并打包给您了。

单纯运行 server 的程序,然后打开网页端就是我说的上述没有正确分段导致 websocket Message 截断的问题;
在运行 server 程序的前提下,在另一个终端运行 client 程序,就会出现新发现的 timeout 问题;

代码均在 main.go 中,要编译 client 程序,将 main() 里面的 server [部分代码注释,其余部分解开注释即可。
pinDemo.zip

你例子中的server用的是websocket,你指的超时应该是说go client的超时吧?因为你的go client用的是tcp协议,tcp连到go的 websocket上,websocket msg解析都没通。

js client我试了下是能正常收到消息的:
image

另外代码中还有一些问题,比如:

  1. BroadcastClients 没加锁
  2. 不应该为每个新连接上来的client单独设置HandleDisconnected:
context.Client.Handler.HandleDisconnected(func(clientWhichDisconnected *arpc.Client) {
	//remove from BroadcastClients
	idx := indexOfClient(clientWhichDisconnected,BroadcastClients)
	if idx >= 0 {
		if length := len(BroadcastClients); length != 1 {
			BroadcastClients[idx] = BroadcastClients[length - 1]
			BroadcastClients = BroadcastClients[:length - 1]
		}else {
			BroadcastClients = BroadcastClients[:0]
		}
	}
});

而是应该为 server 的Handler 统一设置:

server.Handler.HandleDisconnected(...)

@lesismal
Copy link
Owner

lesismal commented Sep 6, 2021

如果想既服务 ws 又服务 tcp,可以开两个端口分别处理不同的协议,如:

arpc.DefaultHandler.Handle("/auth", func(context *arpc.Context) {...}
arpc.DefaultHandler.HandleDisconnected(...)

ln, _ := websocket.Listen("ws addr", nil)
http.HandleFunc("/ws", ln.(*websocket.Listener).Handler)
wsServer := arpc.NewServer()
go wsServer.Serve(ln)

tcpServer := arpc.NewServer()
tcpServer.Run("tcp addr")

当然你也可以自己实现一个 listener,封装一层,根据Accept后根据读到的前几个字节判断是否为http upgrade ws,是则转给 ws server 否则转给 tcp server,这样可以实现共用一个端口来服务两个不同的协议,但是会有性能损失和不必要的复杂度。

当然,你也可以使用 websocket 作为 go client 的协议,但不如直接用tcp性能好。

@spxvszero
Copy link
Author

感谢您的回复

JS Client 是能收到消息,但是被截断了,以下是浏览器的输出:

image

实际 Server 端发送的消息是:

image

也非常感谢您的建议

  1. 没有加锁是我的疏忽,由于写的是 Demo,没有考虑太多了。
  2. Server 的 HandleDisconnected 我原以为是 Server 端终止的时候会调用的回调,感谢您的建议。
  3. 我的确是想着同一个端口处理 ws 和 rpc,也感谢您的建议。

@lesismal
Copy link
Owner

lesismal commented Sep 6, 2021

JS Client 是能收到消息,但是被截断了,以下是浏览器的输出:

确实是,看了下代码,计算 body length 时多写了个 & 0xFF,导致只计算了第一个字节,稍等我修复更新上去。

感谢反馈!

@lesismal lesismal added the bug Something isn't working label Sep 6, 2021
@lesismal
Copy link
Owner

lesismal commented Sep 6, 2021

已修复: 0f639ef

请尝试最新版本的 js client

@spxvszero
Copy link
Author

赞👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

2 participants