From a74a03b2a4e564f0435dc6dd77c6cd31105a1eaa Mon Sep 17 00:00:00 2001 From: razr Date: Mon, 30 Nov 2015 17:24:45 +0800 Subject: [PATCH] pando protocol binary implementation. --- pkg/mqtt/mqtt_svr.go | 2 +- pkg/protocol/protocol.go | 142 +++++++++++++++++++----- pkg/protocol/protocol_test.go | 150 ++++++++++++++++++++++++++ pkg/protocol/structure.go | 47 ++++++++ services/mqttaccess/access.go | 9 +- services/mqttaccess/main.go | 10 +- services/mqttaccess/topic_handlers.go | 1 + 7 files changed, 329 insertions(+), 32 deletions(-) create mode 100644 pkg/protocol/protocol_test.go create mode 100644 pkg/protocol/structure.go create mode 100644 services/mqttaccess/topic_handlers.go diff --git a/pkg/mqtt/mqtt_svr.go b/pkg/mqtt/mqtt_svr.go index e1755fd..30f9355 100644 --- a/pkg/mqtt/mqtt_svr.go +++ b/pkg/mqtt/mqtt_svr.go @@ -13,7 +13,7 @@ type MqttSvrHandler struct { Wildcards []Wild } -func NewMqttSvrHanler() *MqttSvrHandler { +func NewMqttSvrHandler() *MqttSvrHandler { return &MqttSvrHandler{} } diff --git a/pkg/protocol/protocol.go b/pkg/protocol/protocol.go index a2f3096..a41be86 100644 --- a/pkg/protocol/protocol.go +++ b/pkg/protocol/protocol.go @@ -1,47 +1,133 @@ package protocol import ( + "bytes" + "encoding/binary" "github.com/PandoCloud/pando-cloud/pkg/tlv" ) -type CommandEventHead struct { - Flag uint8 - Timestamp uint64 - Token [16]byte - SubDeviceid uint16 - No uint16 - Priority uint16 - ParamsCount uint16 +type Payload interface { + Marshal() ([]byte, error) + UnMarshal([]byte) error } -type Command struct { - Head CommandEventHead - Params []tlv.TLV +func (c *Command) Marshal() ([]byte, error) { + buffer := new(bytes.Buffer) + err := binary.Write(buffer, binary.BigEndian, c.Head) + if err != nil { + return nil, err + } + + for _, param := range c.Params { + err = binary.Write(buffer, binary.BigEndian, param.ToBinary()) + if err != nil { + return nil, err + } + } + + return buffer.Bytes(), nil } -type Event struct { - Head CommandEventHead - Params []tlv.TLV +func (c *Command) UnMarshal(buf []byte) error { + n := len(buf) + r := bytes.NewReader(buf) + err := binary.Read(r, binary.BigEndian, &c.Head) + if err != nil { + return err + } + c.Params = []tlv.TLV{} + for i := binary.Size(c.Head); i < n; { + tlv := tlv.TLV{} + tlv.FromBinary(r) + i += int(tlv.Length()) + c.Params = append(c.Params, tlv) + } + + return nil } -type DataHead struct { - Flag uint8 - Timestamp uint64 - Token [16]byte +func (e *Event) Marshal() ([]byte, error) { + buffer := new(bytes.Buffer) + err := binary.Write(buffer, binary.BigEndian, e.Head) + if err != nil { + return nil, err + } + + for _, param := range e.Params { + err = binary.Write(buffer, binary.BigEndian, param.ToBinary()) + if err != nil { + return nil, err + } + } + + return buffer.Bytes(), nil } -type Data struct { - Head DataHead - SubData []SubData +func (e *Event) UnMarshal(buf []byte) error { + n := len(buf) + r := bytes.NewReader(buf) + err := binary.Read(r, binary.BigEndian, &e.Head) + if err != nil { + return err + } + e.Params = []tlv.TLV{} + for i := binary.Size(e.Head); i < n; { + tlv := tlv.TLV{} + tlv.FromBinary(r) + i += int(tlv.Length()) + e.Params = append(e.Params, tlv) + } + + return nil } -type SubDataHead struct { - SubDeviceid uint16 - PropertyNum uint16 - ParamsCount uint16 +func (d *Data) Marshal() ([]byte, error) { + buffer := new(bytes.Buffer) + err := binary.Write(buffer, binary.BigEndian, d.Head) + if err != nil { + return nil, err + } + + for _, sub := range d.SubData { + err = binary.Write(buffer, binary.BigEndian, sub.Head) + if err != nil { + return nil, err + } + for _, param := range sub.Params { + err = binary.Write(buffer, binary.BigEndian, param.ToBinary()) + if err != nil { + return nil, err + } + } + } + + return buffer.Bytes(), nil } -type SubData struct { - Head SubDataHead - Params []tlv.TLV +func (d *Data) UnMarshal(buf []byte) error { + n := len(buf) + r := bytes.NewReader(buf) + err := binary.Read(r, binary.BigEndian, &d.Head) + if err != nil { + return err + } + d.SubData = []SubData{} + for i := binary.Size(d.Head); i < n; { + sub := SubData{} + err = binary.Read(r, binary.BigEndian, &sub.Head) + if err != nil { + return err + } + i += int(binary.Size(sub.Head)) + sub.Params = []tlv.TLV{} + for j := 0; j < int(sub.Head.ParamsCount); j++ { + param := tlv.TLV{} + param.FromBinary(r) + i += int(param.Length()) + sub.Params = append(sub.Params, param) + } + d.SubData = append(d.SubData, sub) + } + + return nil } diff --git a/pkg/protocol/protocol_test.go b/pkg/protocol/protocol_test.go new file mode 100644 index 0000000..9af7ecb --- /dev/null +++ b/pkg/protocol/protocol_test.go @@ -0,0 +1,150 @@ +package protocol + +import ( + "github.com/PandoCloud/pando-cloud/pkg/tlv" + "reflect" + "testing" + "time" +) + +func TestCommand(t *testing.T) { + param := []interface{}{uint32(1), float32(3.2), []byte{'1', '2'}} + params, err := tlv.MakeTLVs(param) + if err != nil { + t.Fatal(err) + } + + payloadHead := CommandEventHead{ + Flag: 0, + Timestamp: uint64(time.Now().Unix()) * 1000, + SubDeviceid: uint16(2), + No: uint16(12), + Priority: uint16(1), + ParamsCount: uint16(len(param)), + } + payload := &Command{ + Head: payloadHead, + Params: params, + } + + buf, err := payload.Marshal() + if err != nil { + if err != nil { + t.Error(err) + } + } + + payload2 := &Command{} + + err = payload2.UnMarshal(buf) + if err != nil { + if err != nil { + t.Error(err) + } + } + + if !reflect.DeepEqual(payload, payload2) { + t.Errorf("test command payload failed, want %v, got %v", payload, payload2) + } +} + +func TestEvent(t *testing.T) { + param := []interface{}{uint32(1), float32(3.2), []byte{'1', '2'}} + params, err := tlv.MakeTLVs(param) + if err != nil { + t.Fatal(err) + } + + payloadHead := CommandEventHead{ + Flag: 0, + Timestamp: uint64(time.Now().Unix()) * 1000, + SubDeviceid: uint16(2), + No: uint16(12), + Priority: uint16(1), + ParamsCount: uint16(len(param)), + } + payload := &Event{ + Head: payloadHead, + Params: params, + } + + buf, err := payload.Marshal() + if err != nil { + if err != nil { + t.Error(err) + } + } + + payload2 := &Event{} + + err = payload2.UnMarshal(buf) + if err != nil { + if err != nil { + t.Error(err) + } + } + + if !reflect.DeepEqual(payload, payload2) { + t.Errorf("test event payload failed, want %v, got %v", payload, payload2) + } +} + +func TestData(t *testing.T) { + payloadHead := DataHead{ + Flag: 0, + Timestamp: uint64(time.Now().Unix() * 1000), + } + param1 := []interface{}{uint32(3), float32(1.2), int64(10)} + params1, err := tlv.MakeTLVs(param1) + if err != nil { + t.Fatal(err) + } + sub1 := SubData{ + Head: SubDataHead{ + SubDeviceid: uint16(1), + PropertyNum: uint16(1), + ParamsCount: uint16(len(params1)), + }, + Params: params1, + } + param2 := []interface{}{uint32(4), int64(11)} + params2, err := tlv.MakeTLVs(param2) + if err != nil { + t.Fatal(err) + } + sub2 := SubData{ + Head: SubDataHead{ + SubDeviceid: uint16(1), + PropertyNum: uint16(2), + ParamsCount: uint16(len(params2)), + }, + Params: params2, + } + + payload := &Data{ + Head: payloadHead, + SubData: []SubData{}, + } + payload.SubData = append(payload.SubData, sub1) + payload.SubData = append(payload.SubData, sub2) + + buf, err := payload.Marshal() + if err != nil { + if err != nil { + t.Error(err) + } + } + + payload2 := &Data{} + + err = payload2.UnMarshal(buf) + if err != nil { + if err != nil { + t.Error(err) + } + } + + if !reflect.DeepEqual(payload, payload2) { + t.Errorf("test data payload failed, want %v, got %v", payload, payload2) + } +} diff --git a/pkg/protocol/structure.go b/pkg/protocol/structure.go new file mode 100644 index 0000000..a2f3096 --- /dev/null +++ b/pkg/protocol/structure.go @@ -0,0 +1,47 @@ +package protocol + +import ( + "github.com/PandoCloud/pando-cloud/pkg/tlv" +) + +type CommandEventHead struct { + Flag uint8 + Timestamp uint64 + Token [16]byte + SubDeviceid uint16 + No uint16 + Priority uint16 + ParamsCount uint16 +} + +type Command struct { + Head CommandEventHead + Params []tlv.TLV +} + +type Event struct { + Head CommandEventHead + Params []tlv.TLV +} + +type DataHead struct { + Flag uint8 + Timestamp uint64 + Token [16]byte +} + +type Data struct { + Head DataHead + SubData []SubData +} + +type SubDataHead struct { + SubDeviceid uint16 + PropertyNum uint16 + ParamsCount uint16 +} + +type SubData struct { + Head SubDataHead + Params []tlv.TLV +} diff --git a/services/mqttaccess/access.go b/services/mqttaccess/access.go index 48c818b..60226bb 100644 --- a/services/mqttaccess/access.go +++ b/services/mqttaccess/access.go @@ -1,14 +1,19 @@ package main import ( + "github.com/PandoCloud/pando-cloud/pkg/mqtt" "github.com/PandoCloud/pando-cloud/pkg/rpcs" "github.com/PandoCloud/pando-cloud/pkg/server" ) -type Access struct{} +type Access struct { + MqttHandler *mqtt.MqttSvrHandler +} func NewAccess() (*Access, error) { - return &Access{}, nil + return &Access{ + mqtt.NewMqttSvrHandler(), + }, nil } func (a *Access) SetStatus(args rpcs.ArgsSetStatus, reply *rpcs.ReplySetStatus) error { diff --git a/services/mqttaccess/main.go b/services/mqttaccess/main.go index 176341b..24ec7c6 100644 --- a/services/mqttaccess/main.go +++ b/services/mqttaccess/main.go @@ -12,18 +12,26 @@ func main() { return } - // register a rpc service a, err := NewAccess() if err != nil { server.Log.Fatal(err) return } + + // register a rpc service err = server.RegisterRPCHandler(a) if err != nil { server.Log.Errorf("Register RPC service Error: %s", err) return } + // register a tcp service for mqtt + err = server.RegisterTCPHandler(a.MqttHandler) + if err != nil { + server.Log.Errorf("Register TCP service Error: %s", err) + return + } + // start to run err = server.Run() if err != nil { diff --git a/services/mqttaccess/topic_handlers.go b/services/mqttaccess/topic_handlers.go new file mode 100644 index 0000000..06ab7d0 --- /dev/null +++ b/services/mqttaccess/topic_handlers.go @@ -0,0 +1 @@ +package main