Skip to content

Commit

Permalink
Merge pull request #3 from alehed/feature/new_slowlink_msg
Browse files Browse the repository at this point in the history
New slowlink msg
  • Loading branch information
notedit committed May 17, 2020
2 parents a152adf + bd0773c commit 10eb8b9
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 24 deletions.
38 changes: 23 additions & 15 deletions janus.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ func (gateway *Gateway) send(msg map[string]interface{}, transaction chan interf
return
}

if debug {
// log message being sent
var log bytes.Buffer
json.Indent(&log, data, ">", " ")
log.Write([]byte("\n"))
log.WriteTo(os.Stdout)
}

gateway.writeMu.Lock()
err = gateway.conn.WriteMessage(websocket.TextMessage, data)
gateway.writeMu.Unlock()
Expand Down Expand Up @@ -140,7 +148,7 @@ func (gateway *Gateway) recv() {
if debug {
// log message being sent
var log bytes.Buffer
json.Indent(&log, data, ">", " ")
json.Indent(&log, data, "<", " ")
log.Write([]byte("\n"))
log.WriteTo(os.Stdout)
}
Expand All @@ -158,7 +166,7 @@ func (gateway *Gateway) recv() {
}

// Pass message on from here
if base.Id == "" {
if base.ID == "" {
// Is this a Handle event?
if base.Handle == 0 {
// Error()
Expand All @@ -185,7 +193,7 @@ func (gateway *Gateway) recv() {
go passMsg(handle.Events, msg)
}
} else {
id, _ := strconv.ParseUint(base.Id, 10, 64)
id, _ := strconv.ParseUint(base.ID, 10, 64)
// Lookup Transaction
gateway.Lock()
transaction := gateway.transactions[id]
Expand Down Expand Up @@ -235,22 +243,22 @@ func (gateway *Gateway) Create() (*Session, error) {
// Create new session
session := new(Session)
session.gateway = gateway
session.Id = success.Data.Id
session.ID = success.Data.ID
session.Handles = make(map[uint64]*Handle)
session.Events = make(chan interface{}, 2)

// Store this session
gateway.Lock()
gateway.Sessions[session.Id] = session
gateway.Sessions[session.ID] = session
gateway.Unlock()

return session, nil
}

// Session represents a session instance on the Janus Gateway.
type Session struct {
// Id is the session_id of this session
Id uint64
// ID is the session_id of this session
ID uint64

// Handles is a map of plugin handles within this session
Handles map[uint64]*Handle
Expand All @@ -265,7 +273,7 @@ type Session struct {
}

func (session *Session) send(msg map[string]interface{}, transaction chan interface{}) {
msg["session_id"] = session.Id
msg["session_id"] = session.ID
session.gateway.send(msg, transaction)
}

Expand All @@ -288,11 +296,11 @@ func (session *Session) Attach(plugin string) (*Handle, error) {

handle := new(Handle)
handle.session = session
handle.Id = success.Data.Id
handle.ID = success.Data.ID
handle.Events = make(chan interface{}, 8)

session.Lock()
session.Handles[handle.Id] = handle
session.Handles[handle.ID] = handle
session.Unlock()

return handle, nil
Expand Down Expand Up @@ -333,16 +341,16 @@ func (session *Session) Destroy() (*AckMsg, error) {

// Remove this session from the gateway
session.gateway.Lock()
delete(session.gateway.Sessions, session.Id)
delete(session.gateway.Sessions, session.ID)
session.gateway.Unlock()

return ack, nil
}

// Handle represents a handle to a plugin instance on the Gateway.
type Handle struct {
// Id is the handle_id of this plugin handle
Id uint64
// ID is the handle_id of this plugin handle
ID uint64

// Type // pub or sub
Type string
Expand All @@ -358,7 +366,7 @@ type Handle struct {
}

func (handle *Handle) send(msg map[string]interface{}, transaction chan interface{}) {
msg["handle_id"] = handle.Id
msg["handle_id"] = handle.ID
handle.session.send(msg, transaction)
}

Expand Down Expand Up @@ -471,7 +479,7 @@ func (handle *Handle) Detach() (*AckMsg, error) {

// Remove this handle from the session
handle.session.Lock()
delete(handle.session.Handles, handle.Id)
delete(handle.session.Handles, handle.ID)
handle.session.Unlock()

return ack, nil
Expand Down
17 changes: 8 additions & 9 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,19 @@
// type. The BaseMsg type extracts the following JSON from the message:
// {
// "janus": <Type>,
// "transaction": <Id>,
// "transaction": <ID>,
// "session_id": <Session>,
// "sender": <Handle>
// }
// The Type field is inspected to determine which concrete type
// to decode the message to, while the other fields (Id/Session/Handle) are
// to decode the message to, while the other fields (ID/Session/Handle) are
// inspected to determine where the message should be delivered. Messages
// with an Id field defined are considered responses to previous requests, and
// will be passed directly to requester. Messages without an Id field are
// with an ID field defined are considered responses to previous requests, and
// will be passed directly to requester. Messages without an ID field are
// considered unsolicited events from the gateway and are expected to have
// both Session and Handle fields defined. They will be passed to the Events
// channel of the related Handle and can be read from there.


package janus

var msgtypes = map[string]func() interface{}{
Expand All @@ -36,7 +35,7 @@ var msgtypes = map[string]func() interface{}{

type BaseMsg struct {
Type string `json:"janus"`
Id string `json:"transaction"`
ID string `json:"transaction"`
Session uint64 `json:"session_id"`
Handle uint64 `json:"sender"`
}
Expand All @@ -62,7 +61,7 @@ type SuccessMsg struct {
}

type SuccessData struct {
Id uint64
ID uint64
}

type DetachedMsg struct{}
Expand All @@ -75,7 +74,7 @@ type InfoMsg struct {
DataChannels bool `json:"data_channels"`
IPv6 bool `json:"ipv6"`
LocalIP string `json:"local-ip"`
ICE_TCP bool `json:"ice-tcp"`
IceTCP bool `json:"ice-tcp"`
Transports map[string]PluginInfo
Plugins map[string]PluginInfo
}
Expand Down Expand Up @@ -113,7 +112,7 @@ type TimeoutMsg struct {

type SlowLinkMsg struct {
Uplink bool
Nacks int64
Lost int64
}

type MediaMsg struct {
Expand Down

0 comments on commit 10eb8b9

Please sign in to comment.