Skip to content

Commit

Permalink
refactor to by able to unsubscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
billettc committed Nov 9, 2020
1 parent 56dfbad commit d49de8c
Showing 1 changed file with 27 additions and 24 deletions.
51 changes: 27 additions & 24 deletions rpc/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,29 @@ import (
"go.uber.org/zap"
)

type reqID uint64
type subscription int
type result interface{}
type callBackInfo struct {
stream chan result
reflectType reflect.Type
requestID uint64
subscription uint64
stream chan result
reflectType reflect.Type
}

type WSClient struct {
currentID uint64
rpcURL string
conn *websocket.Conn
lock sync.Mutex
pendingCallbacks map[reqID]*callBackInfo
callbacks map[subscription]*callBackInfo
currentID uint64
rpcURL string
conn *websocket.Conn
lock sync.Mutex
callbacksByRequestID map[uint64]*callBackInfo
callbacksBySubscription map[uint64]*callBackInfo
}

func NewWSClient(rpcURL string) (*WSClient, error) {
c := &WSClient{
currentID: 0,
rpcURL: rpcURL,
pendingCallbacks: map[reqID]*callBackInfo{},
callbacks: map[subscription]*callBackInfo{},
currentID: 0,
rpcURL: rpcURL,
callbacksByRequestID: map[uint64]*callBackInfo{},
callbacksBySubscription: map[uint64]*callBackInfo{},
}

conn, _, err := websocket.DefaultDialer.Dial(rpcURL, nil)
Expand Down Expand Up @@ -82,23 +82,23 @@ func (c *WSClient) receiveMessages() {
// that number will be associated to all future message destine to this request
if gjson.GetBytes(message, "id").Exists() {
zlog.Info("received subscription for id")
id := reqID(gjson.GetBytes(message, "id").Int())
sub := subscription(gjson.GetBytes(message, "result").Int())
id := uint64(gjson.GetBytes(message, "id").Int())
sub := uint64(gjson.GetBytes(message, "result").Int())

//moving pending callback info to the actual callbacks list
//moving pending callback info to the actual callbacksBySubscription list
c.lock.Lock()
callBack := c.pendingCallbacks[id]
delete(c.pendingCallbacks, id)
c.callbacks[sub] = callBack
callBack := c.callbacksByRequestID[id]
callBack.subscription = sub
c.callbacksBySubscription[sub] = callBack
c.lock.Unlock()

zlog.Info("move sub from pending to callback", zap.Uint64("id", uint64(id)), zap.Uint64("subscription", uint64(sub)))
continue
}

//getting the callback
sub := subscription(gjson.GetBytes(message, "params.subscription").Int())
callBack := c.callbacks[sub]
sub := uint64(gjson.GetBytes(message, "params.subscription").Int())
callBack := c.callbacksBySubscription[sub]

//getting and instantiate the return type for the call back.
resultType := reflect.New(callBack.reflectType)
Expand Down Expand Up @@ -203,12 +203,15 @@ func (c *WSClient) ProgramSubscribe(programID string, commitment CommitmentType)
return nil, 0, fmt.Errorf("program subscribe: write message: %c", err)
}

c.pendingCallbacks[reqID(id)] = &callBackInfo{
c.callbacksByRequestID[id] = &callBackInfo{
requestID: id,
stream: stream,
reflectType: reflect.TypeOf(ProgramWSResult{}),
}

return stream, id, nil
}

func (c *WSClient) ProgramUnsubscribe(reqID int) {}
func (c *WSClient) ProgramUnsubscribe(reqID int) {

}

0 comments on commit d49de8c

Please sign in to comment.