Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 10 additions & 1 deletion pkg/web/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,13 @@ func getStringFromMap(jmsg map[string]any, key string) string {

func processWSCommand(jmsg map[string]any, outputCh chan any, rpcInputCh chan []byte) {
var rtnErr error
var cmdType string
defer func() {
panicErr := panichandler.PanicHandler("processWSCommand", recover())
panicCtx := "processWSCommand"
if cmdType != "" {
panicCtx = fmt.Sprintf("processWSCommand:%s", cmdType)
}
panicErr := panichandler.PanicHandler(panicCtx, recover())
if panicErr != nil {
rtnErr = panicErr
}
Expand All @@ -97,6 +102,7 @@ func processWSCommand(jmsg map[string]any, outputCh chan any, rpcInputCh chan []
rtnErr = fmt.Errorf("cannot parse wscommand: %v", err)
return
}
cmdType = wsCommand.GetWSCommand()
switch cmd := wsCommand.(type) {
case *webcmd.SetBlockTermSizeWSCommand:
data := wshrpc.CommandBlockInputData{
Expand Down Expand Up @@ -137,6 +143,9 @@ func processWSCommand(jmsg map[string]any, outputCh chan any, rpcInputCh chan []
if rpcMsg == nil {
return
}
if rpcMsg.Command != "" {
cmdType = fmt.Sprintf("%s:%s", cmdType, rpcMsg.Command)
}
msgBytes, err := json.Marshal(rpcMsg)
if err != nil {
// this really should never fail since we just unmarshalled this value
Expand Down
14 changes: 9 additions & 5 deletions pkg/wshutil/wshproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (p *WshRpcProxy) sendResponseError(msg RpcMessage, sendErr error) {
Error: sendErr.Error(),
}
respBytes, _ := json.Marshal(resp)
p.SendRpcMessage(respBytes)
p.SendRpcMessage(respBytes, "resp-error")
}

func (p *WshRpcProxy) sendAuthenticateResponse(msg RpcMessage, routeId string) {
Expand All @@ -80,7 +80,7 @@ func (p *WshRpcProxy) sendAuthenticateResponse(msg RpcMessage, routeId string) {
Data: wshrpc.CommandAuthenticateRtnData{RouteId: routeId},
}
respBytes, _ := json.Marshal(resp)
p.SendRpcMessage(respBytes)
p.SendRpcMessage(respBytes, "auth-resp")
}

func (p *WshRpcProxy) sendAuthenticateTokenResponse(msg RpcMessage, entry *shellutil.TokenSwapEntry) {
Expand All @@ -99,7 +99,7 @@ func (p *WshRpcProxy) sendAuthenticateTokenResponse(msg RpcMessage, entry *shell
},
}
respBytes, _ := json.Marshal(resp)
p.SendRpcMessage(respBytes)
p.SendRpcMessage(respBytes, "auth-token-resp")
}

func validateRpcContextFromAuth(newCtx *wshrpc.RpcContext) (string, error) {
Expand Down Expand Up @@ -249,9 +249,13 @@ func (p *WshRpcProxy) HandleAuthentication() (*wshrpc.RpcContext, error) {
}

// TODO: Figure out who is sending to closed routes and why we're not catching it
func (p *WshRpcProxy) SendRpcMessage(msg []byte) {
func (p *WshRpcProxy) SendRpcMessage(msg []byte, debugStr string) {
defer func() {
panichandler.PanicHandler("WshRpcProxy.SendRpcMessage", recover())
panicCtx := "WshRpcProxy.SendRpcMessage"
if debugStr != "" {
panicCtx = fmt.Sprintf("%s:%s", panicCtx, debugStr)
}
panichandler.PanicHandler(panicCtx, recover())
}()
p.ToRemoteCh <- msg
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/wshutil/wshrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (router *WshRouter) SendEvent(routeId string, event wps.WaveEvent) {
// nothing to do
return
}
rpc.SendRpcMessage(msgBytes)
rpc.SendRpcMessage(msgBytes, "eventrecv")
}

func (router *WshRouter) handleNoRoute(msg RpcMessage) {
Expand Down Expand Up @@ -173,7 +173,7 @@ func (router *WshRouter) handleAnnounceMessage(msg RpcMessage, input msgAndRoute
// if we don't (we are the terminal router), then add it to our announced route map
upstream := router.GetUpstreamClient()
if upstream != nil {
upstream.SendRpcMessage(input.msgBytes)
upstream.SendRpcMessage(input.msgBytes, "announce-upstream")
return
}
if msg.Source == input.fromRouteId {
Expand Down Expand Up @@ -201,12 +201,12 @@ func (router *WshRouter) getAnnouncedRoute(routeId string) string {
func (router *WshRouter) sendRoutedMessage(msgBytes []byte, routeId string) bool {
rpc := router.GetRpc(routeId)
if rpc != nil {
rpc.SendRpcMessage(msgBytes)
rpc.SendRpcMessage(msgBytes, "route")
return true
}
upstream := router.GetUpstreamClient()
if upstream != nil {
upstream.SendRpcMessage(msgBytes)
upstream.SendRpcMessage(msgBytes, "route-upstream")
return true
} else {
// we are the upstream, so consult our announced routes map
Expand All @@ -216,7 +216,7 @@ func (router *WshRouter) sendRoutedMessage(msgBytes []byte, routeId string) bool
log.Printf("[router] no rpc for route id %q\n", routeId)
return false
}
rpc.SendRpcMessage(msgBytes)
rpc.SendRpcMessage(msgBytes, "route-local")
return true
}
}
Expand Down Expand Up @@ -321,7 +321,7 @@ func (router *WshRouter) RegisterRoute(routeId string, rpc AbstractRpcClient, sh
if shouldAnnounce && !alreadyExists && router.GetUpstreamClient() != nil {
announceMsg := RpcMessage{Command: wshrpc.Command_RouteAnnounce, Source: routeId}
announceBytes, _ := json.Marshal(announceMsg)
router.GetUpstreamClient().SendRpcMessage(announceBytes)
router.GetUpstreamClient().SendRpcMessage(announceBytes, "route-announce")
}
for {
msgBytes, ok := rpc.RecvRpcMessage()
Expand Down
4 changes: 2 additions & 2 deletions pkg/wshutil/wshrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type ServerImpl interface {
}

type AbstractRpcClient interface {
SendRpcMessage(msg []byte)
SendRpcMessage(msg []byte, debugStr string)
RecvRpcMessage() ([]byte, bool) // blocking
}

Expand Down Expand Up @@ -103,7 +103,7 @@ func GetRpcResponseHandlerFromContext(ctx context.Context) *RpcResponseHandler {
return rtn.(*RpcResponseHandler)
}

func (w *WshRpc) SendRpcMessage(msg []byte) {
func (w *WshRpc) SendRpcMessage(msg []byte, debugStr string) {
w.InputCh <- msg
}

Expand Down
Loading