diff --git a/build/local/linux/run.sh b/build/local/linux/run.sh index c85fc8e..cabceea 100755 --- a/build/local/linux/run.sh +++ b/build/local/linux/run.sh @@ -3,12 +3,14 @@ # init db echo 'CREATE DATABASE PandoCloud' | mysql -uroot +killall -9 httpaccess registry apiprovider devicemanager controller mqttaccess + # start services $GOPATH/bin/httpaccess -etcd http://localhost:2379 -httphost inner:80 -loglevel debug -usehttps -keyfile $GOPATH/src/github.com/PandoCloud/pando-cloud/pkg/server/testdata/key.pem -cafile $GOPATH/src/github.com/PandoCloud/pando-cloud/pkg/server/testdata/cert.pem & $GOPATH/bin/registry -etcd http://localhost:2379 -rpchost localhost:20034 -aeskey ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP -dbhost localhost -dbname PandoCloud -dbport 3306 -dbuser root -loglevel debug & $GOPATH/bin/apiprovider -etcd http://localhost:2379 -loglevel debug -httphost localhost:8888 & $GOPATH/bin/devicemanager -etcd http://localhost:2379 -loglevel debug -rpchost localhost:20033 & $GOPATH/bin/controller -etcd http://localhost:2379 -loglevel debug -rpchost localhost:20032 & -$GOPATH/bin/mqttaccess -etcd http://localhost:2379 -loglevel debug -rpchost localhost:20030 & +$GOPATH/bin/mqttaccess -etcd http://localhost:2379 -loglevel debug -rpchost localhost:20030 -tcphost localhost:1883& exit 0 \ No newline at end of file diff --git a/pkg/mqtt/connection.go b/pkg/mqtt/connection.go index 40ed53b..79c2b1a 100644 --- a/pkg/mqtt/connection.go +++ b/pkg/mqtt/connection.go @@ -166,7 +166,12 @@ func (c *Connection) RcvMsgFromClient() { } c.DeviceId = deviceid - token, _ := hex.DecodeString(msg.Password) + token, err := hex.DecodeString(msg.Password) + if err != nil { + server.Log.Warn("token format error : %v", err) + ret = RetCodeNotAuthorized + goto CLOSE + } err = c.ValidateToken(token) if err != nil { server.Log.Warn("validate token error : %v", err) diff --git a/services/mqttaccess/mqtt_provider.go b/services/mqttaccess/mqtt_provider.go index 2dd70c8..3196616 100644 --- a/services/mqttaccess/mqtt_provider.go +++ b/services/mqttaccess/mqtt_provider.go @@ -13,8 +13,12 @@ func NewMQTTProvider() *MQTTProvider { } func (mp *MQTTProvider) ValidateDeviceToken(deviceid uint64, token []byte) error { + args := rpcs.ArgsValidateDeviceAccessToken{ + Id: deviceid, + AccessToken: token, + } reply := rpcs.ReplyValidateDeviceAccessToken{} - err := server.RPCCallByName("devicemanager", "DeviceManager.ValidateDeviceAccessToken", deviceid, &reply) + err := server.RPCCallByName("devicemanager", "DeviceManager.ValidateDeviceAccessToken", args, &reply) if err != nil { server.Log.Errorf("validate device token error. deviceid : %v, token : %v, error: %v", deviceid, token, err) } @@ -28,15 +32,21 @@ func (mp *MQTTProvider) OnDeviceOnline(args rpcs.ArgsGetOnline) { } } func (mp *MQTTProvider) OnDeviceOffline(deviceid uint64) { + args := rpcs.ArgsGetOffline{ + Id: deviceid, + } reply := rpcs.ReplyGetOffline{} - err := server.RPCCallByName("devicemanager", "DeviceManager.GetOffline", deviceid, &reply) + err := server.RPCCallByName("devicemanager", "DeviceManager.GetOffline", args, &reply) if err != nil { server.Log.Errorf("device offline error. deviceid: %v, error: %v", deviceid, err) } } func (mp *MQTTProvider) OnDeviceHeartBeat(deviceid uint64) { + args := rpcs.ArgsDeviceId{ + Id: deviceid, + } reply := rpcs.ReplyHeartBeat{} - err := server.RPCCallByName("devicemanager", "DeviceManager.HeartBeat", deviceid, &reply) + err := server.RPCCallByName("devicemanager", "DeviceManager.HeartBeat", args, &reply) if err != nil { server.Log.Errorf("device heartbeat error. deviceid: %v, error: %v", deviceid, err) } diff --git a/tests/device/device.go b/tests/device/device.go index 0ff8d8f..e3f3d09 100644 --- a/tests/device/device.go +++ b/tests/device/device.go @@ -4,6 +4,9 @@ import ( "encoding/hex" "encoding/json" "fmt" + MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git" + "strings" + "time" ) // device register args @@ -54,23 +57,23 @@ type DeviceAuthResponse struct { type Device struct { // API URL - BrokerUrl string + Url string // basic info ProductKey string DeviceCode string Version string - // private thins + // private things id int64 secrect string token []byte access string } -func NewDevice(broker string, productkey string, code string, version string) *Device { +func NewDevice(url string, productkey string, code string, version string) *Device { return &Device{ - BrokerUrl: broker, + Url: url, ProductKey: productkey, DeviceCode: code, Version: version, @@ -83,7 +86,7 @@ func (d *Device) DoRegister() error { DeviceCode: d.DeviceCode, Version: d.Version, } - regUrl := fmt.Sprintf("%v%v", d.BrokerUrl, "/v1/devices/registration") + regUrl := fmt.Sprintf("%v%v", d.Url, "/v1/devices/registration") request, err := json.Marshal(args) if err != nil { return err @@ -112,9 +115,9 @@ func (d *Device) DoLogin() error { args := DeviceAuthArgs{ DeviceId: d.id, DeviceSecret: d.secrect, - Protocol: "http", + Protocol: "mqtt", } - regUrl := fmt.Sprintf("%v%v", d.BrokerUrl, "/v1/devices/authentication") + regUrl := fmt.Sprintf("%v%v", d.Url, "/v1/devices/authentication") request, err := json.Marshal(args) if err != nil { return err @@ -142,3 +145,37 @@ func (d *Device) DoLogin() error { return nil } + +func (d *Device) messageHandler(client *MQTT.Client, msg MQTT.Message) { + fmt.Printf("TOPIC: %s\n", msg.Topic()) + fmt.Printf("MSG: %s\n", msg.Payload()) + topicPieces := strings.Split(msg.Topic()) + clientid := topicPieces[0] + msgtype := topicPieces[1] +} + +func (d *Device) DoAccess() error { + + //create a ClientOptions struct setting the broker address, clientid, turn + //off trace output and set the default message handler + opts := MQTT.NewClientOptions().AddBroker("tcp://" + d.access) + clientid := fmt.Sprintf("%x", d.id) + opts.SetClientID(clientid) + opts.SetUsername(clientid) // clientid as username + opts.SetPassword(hex.EncodeToString(d.token)) + opts.SetKeepAlive(30 * time.Second) + opts.SetDefaultPublishHandler(d.messageHandler) + + //create and start a client using the above ClientOptions + c := MQTT.NewClient(opts) + if token := c.Connect(); token.Wait() && token.Error() != nil { + return token.Error() + } + + // we just pause here to wait for messages + <-make(chan int) + + defer c.Disconnect(250) + + return nil +} diff --git a/tests/device/main.go b/tests/device/main.go index 10c27aa..f8ee5fb 100644 --- a/tests/device/main.go +++ b/tests/device/main.go @@ -1,16 +1,17 @@ package main import ( + "flag" "fmt" ) -const ( - TestBroker = "https://localhost" - TestProductKey = "aec003c9018b9a572ceb19720e589c375ead1a2b1fbd0d089d067128611754ff" +var ( + TestUrl = flag.String("url", "https://localhost", "login url") + TestProductKey = flag.String("productkey", "aec003c9018b9a572ceb19720e589c375ead1a2b1fbd0d089d067128611754ff", "product key") ) func main() { - dev := NewDevice(TestBroker, TestProductKey, "ffe34e", "version") + dev := NewDevice(*TestUrl, *TestProductKey, "ffe34e", "version") err := dev.DoRegister() if err != nil { @@ -24,4 +25,10 @@ func main() { return } + err = dev.DoAccess() + if err != nil { + fmt.Errorf("device access error %s", err) + return + } + }