Skip to content

Commit

Permalink
Update mqtt/client/client.go
Browse files Browse the repository at this point in the history
  • Loading branch information
yosssi committed Dec 24, 2014
1 parent 274c2cf commit a7e4625
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 5 deletions.
67 changes: 63 additions & 4 deletions mqtt/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,23 @@ import (
"sync"
"time"

"github.com/yosssi/gmq/mqtt"
"github.com/yosssi/gmq/mqtt/packet"
)

// Multiple errors string format
const strErrMulti = "error (%q) occurred while handling the other error (%q)"

// Maximum Packet Identifier
const maxPacketID = 65535

// Error values
var (
ErrAlreadyConnected = errors.New("the Client has already connected to the Server")
ErrNotYetConnected = errors.New("the Client has not yet connected to the Server")
ErrCONNACKTimeout = errors.New("the CONNACK Packet was not received within a reasonalbe amount of time")
ErrPINGRESPTimeout = errors.New("the PINGRESP Packet was not received within a reasonalbe amount of time")
ErrPacketIDExhaused = errors.New("Packet Identifiers are exhausted")
)

// Client represents a Client.
Expand Down Expand Up @@ -81,10 +86,7 @@ func (cli *Client) Connect(opts *ConnectOptions) error {
// Create a Session or reuse the current Session.
if opts.CleanSession || cli.sess == nil {
// Create a Session and set it to the Client.
cli.sess = &session{
cleanSession: opts.CleanSession,
clientID: opts.ClientID,
}
cli.sess = newSession(opts.CleanSession, opts.ClientID)
} else {
// Reuse the Session and set its Client Identifier to the options.
opts.ClientID = cli.sess.clientID
Expand Down Expand Up @@ -212,17 +214,51 @@ func (cli *Client) Publish(opts *PublishOptions) error {
opts = &PublishOptions{}
}

// Define a Packet Identifier.
var packetID uint16

if opts.QoS != mqtt.QoS0 {
// Lock for reading and updating the Session.
cli.muSess.Lock()

// Define an error.
var err error

// Generate a Packet Identifer.
packetID, err = cli.generatePacketID()
if err != nil {
// Unlock.
cli.muSess.Unlock()

return err
}
}

// Create a PUBLISH Packet.
p, err := packet.NewPUBLISH(&packet.PUBLISHOptions{
QoS: opts.QoS,
Retain: opts.Retain,
TopicName: []byte(opts.TopicName),
PacketID: packetID,
Message: []byte(opts.Message),
})
if err != nil {
if opts.QoS != mqtt.QoS0 {
// Unlock.
cli.muSess.Unlock()
}

return err
}

if opts.QoS != mqtt.QoS0 {
// Set the Packet to the Session.
cli.sess.sendingPackets[packetID] = p

// Unlock.
cli.muSess.Unlock()
}

// Send the Packet to the Server.
cli.conn.send <- p

Expand Down Expand Up @@ -556,6 +592,29 @@ func (cli *Client) sendPackets(keepAlive time.Duration, pingrespTimeout time.Dur
}
}

// generatePacketID generates and returns a Packet Identifier.
func (cli *Client) generatePacketID() (uint16, error) {
// Define a Packet Identifier.
var id uint16

for {
// Find a Packet Identifier which does not used.
if _, exist := cli.sess.sendingPackets[id]; !exist {
// Return the Packet Identifier.
return id, nil
}

if id == maxPacketID {
break
}

id++
}

// Return an error if available ids are not found.
return 0, ErrPacketIDExhaused
}

// New creates and returns a Client.
func New(opts *Options) *Client {
// Initialize the options.
Expand Down
18 changes: 17 additions & 1 deletion mqtt/client/session.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,25 @@
package client

// session represents a Session which is a stateful interaction between a Client and a Server.
import "github.com/yosssi/gmq/mqtt/packet"

// session represents a Session which is a stateful interaction
// between a Client and a Server.
type session struct {
// cleanSession is the Clean Session.
cleanSession bool
// clientID is the Client Identifier.
clientID []byte

// sendingPackets contains the pairs of the Packet Identifier
// and the Packet.
sendingPackets map[uint16]packet.Packet
}

// newSession creates and returns a Session.
func newSession(cleanSession bool, clientID []byte) *session {
return &session{
cleanSession: cleanSession,
clientID: clientID,
sendingPackets: make(map[uint16]packet.Packet),
}
}

0 comments on commit a7e4625

Please sign in to comment.