From a684e40488433dc34a2f1267aac88a2328e5e287 Mon Sep 17 00:00:00 2001 From: Connor Hicks Date: Thu, 12 Nov 2020 20:47:25 -0500 Subject: [PATCH] split HandleMsg up and added Listen --- go.mod | 4 ++-- go.sum | 24 +++++++++++++++++++++++ hive/hive.go | 45 ++++++++++++++++++++++++++++++-------------- hive/message_test.go | 7 +------ 4 files changed, 58 insertions(+), 22 deletions(-) diff --git a/go.mod b/go.mod index e13da9d4..f2234633 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.14 require ( github.com/google/uuid v1.1.2 github.com/pkg/errors v0.9.1 - github.com/suborbital/grav v0.0.11 - github.com/suborbital/vektor v0.1.3 + github.com/suborbital/grav v0.1.0 + github.com/suborbital/vektor v0.2.2 golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 ) diff --git a/go.sum b/go.sum index 2e9fb0bd..2647763d 100644 --- a/go.sum +++ b/go.sum @@ -1,40 +1,64 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/google/go-cmp v0.4.1 h1:/exdXoGamhu5ONeUJH0deniYLWYvQwW66yvlfiiKTu0= github.com/google/go-cmp v0.4.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= +github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db/go.mod h1:l0dey0ia/Uv7NcFFVbCLtqEBQbrT4OCwCSKTEv6enCw= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/schollz/peerdiscovery v1.6.0/go.mod h1:hSU7N/NkfNH6AZwU/WBcDZtMABVbTfAWk/XD3XKxN+s= +github.com/schollz/progressbar/v2 v2.15.0/go.mod h1:UdPq3prGkfQ7MOzZKlDRpYKcFqEMczbD7YmbPgpzKMI= github.com/sethvargo/go-envconfig v0.3.0 h1:9xW3N/jvX6TkJzY99pW4WPq8tMYQElwWZinf0P9fpXY= github.com/sethvargo/go-envconfig v0.3.0/go.mod h1:XZ2JRR7vhlBEO5zMmOpLgUhgYltqYqq4d4tKagtPUv0= +github.com/sethvargo/go-envconfig v0.3.2 h1:277Lb2iTpUZjUZu1qLoLa/aetwvtZbKh8wNWXmc6dSk= +github.com/sethvargo/go-envconfig v0.3.2/go.mod h1:XZ2JRR7vhlBEO5zMmOpLgUhgYltqYqq4d4tKagtPUv0= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/suborbital/grav v0.0.10 h1:guf0PEBwqnwFCUOReStx+RddqSN1j81x+78c1bX/MI4= github.com/suborbital/grav v0.0.10/go.mod h1:fN837ibcYZILUd/nKoSaEbo+oTSGRtTbbm/MiwmM3Pw= github.com/suborbital/grav v0.0.11 h1:5S63w/Z/2ZsiekIDhg+CQxiVcxLp0vM0UaREOdul3I0= github.com/suborbital/grav v0.0.11/go.mod h1:fN837ibcYZILUd/nKoSaEbo+oTSGRtTbbm/MiwmM3Pw= +github.com/suborbital/grav v0.1.0 h1:rTQV8TsEYf3f7xKZtEBdvKlj4MWWDuRlSN8v/kpQ/44= +github.com/suborbital/grav v0.1.0/go.mod h1:KhweoYFOVx408ZyPttmDuKghbglfRnnzHHByFOGzbHE= github.com/suborbital/vektor v0.1.1 h1:F3n9rS1F3nc+1Q2HZxeVNinvVkCRliVQ01+jRASctH4= github.com/suborbital/vektor v0.1.1/go.mod h1:tJ4gA2P8NWC4Pdu0TgpSBWZHuZ1Qhp+r5RlsqyIqdyw= github.com/suborbital/vektor v0.1.2 h1:d4BvshbMl4wRVYPKO21vka7r89nlRrrZXidYQz07N9Q= github.com/suborbital/vektor v0.1.2/go.mod h1:tJ4gA2P8NWC4Pdu0TgpSBWZHuZ1Qhp+r5RlsqyIqdyw= github.com/suborbital/vektor v0.1.3 h1:rC5ic4FnjmcbizmV/WAQt67QkF6eJ7jHSsuy8IFC2bc= github.com/suborbital/vektor v0.1.3/go.mod h1:tJ4gA2P8NWC4Pdu0TgpSBWZHuZ1Qhp+r5RlsqyIqdyw= +github.com/suborbital/vektor v0.2.2 h1:x3yit9RMXcP8LirkKb1f/psNev/G/iTIHo6eL/f1OBI= +github.com/suborbital/vektor v0.2.2/go.mod h1:6YQE7r6t1JcVs3twpqjXDftsLUaTNUk5YorRKHcDamI= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de h1:ikNHVSjEfnvz6sxdSPCaPt572qowuyMDMJLLm3Db3ig= golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 h1:pLI5jrR7OSLijeIDcmRxNmw2api+jEfxLoykJVice/E= +golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20200707034311-ab3426394381 h1:VXak5I6aEWmAXeQjA+QSZzlgNrpq9mjcfDemuexIKsU= golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20201029221708-28c70e62bb1d h1:dOiJ2n2cMwGLce/74I/QHMbnpk5GfY7InR8rczoMqRM= +golang.org/x/net v0.0.0-20201029221708-28c70e62bb1d/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 h1:qwRHBd0NqMbJxfbotnDhm2ByMI1Shq4Y6oRJo21SGJA= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201029080932-201ba4db2418/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.4 h1:0YWbFKbhXG/wIiuHDSKpS0Iy7FSA+u45VtBMfQcFTTc= +golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/hive/hive.go b/hive/hive.go index 19bee4d5..119abe5e 100644 --- a/hive/hive.go +++ b/hive/hive.go @@ -1,6 +1,8 @@ package hive import ( + "encoding/json" + "github.com/pkg/errors" "github.com/suborbital/grav/grav" "github.com/suborbital/vektor/vk" @@ -8,8 +10,8 @@ import ( ) const ( - msgTypeHiveJobErr = "hive.joberr" - msgTypeHiveTypeErr = "hive.typeerr" + msgTypeHiveJobErr = "hive.joberr" + msgTypeHiveResult = "hive.result" ) // JobFunc is a function that runs a job of a predetermined type @@ -51,12 +53,18 @@ func (h *Hive) Handle(jobType string, runner Runnable, options ...Option) JobFun } // HandleMsg registers a Runnable with the Hive and triggers that job whenever the provided Grav pod -// receives a message of a particular type. The message is passed to the runnable as the job data. -// The job's result is then emitted as a message. If the result cannot be cast to type grav.Message, -// or if an error occurs, it is logged and an error is sent. If the result is nil, nothing is sent. +// receives a message of a particular type. func (h *Hive) HandleMsg(pod *grav.Pod, msgType string, runner Runnable, options ...Option) { h.handle(msgType, runner, options...) + h.Listen(pod, msgType) +} + +// Listen causes Hive to listen for messages of the given type and trigger the job of the same type. +// The message's data is passed to the runnable as the job data. +// The job's result is then emitted as a message. If an error occurs, it is logged and an error is sent. +// If the result is nil, nothing is sent. +func (h *Hive) Listen(pod *grav.Pod, msgType string) { helper := func(data interface{}) *Result { job := NewJob(msgType, data) @@ -64,26 +72,35 @@ func (h *Hive) HandleMsg(pod *grav.Pod, msgType string, runner Runnable, options } pod.OnType(func(msg grav.Message) error { - var resultMsg grav.Message + var replyMsg grav.Message - result, err := helper(msg).Then() + result, err := helper(msg.Data()).Then() if err != nil { h.log.Error(errors.Wrap(err, "job returned error result")) - resultMsg = grav.NewMsg(msgTypeHiveJobErr, []byte(err.Error())) + replyMsg = grav.NewMsgReplyTo(msg.Ticket(), msgTypeHiveJobErr, []byte(err.Error())) } else { if result == nil { return nil } - var ok bool - resultMsg, ok = result.(grav.Message) - if !ok { - h.log.Error(errors.Wrap(err, "job result is not a grav.Message, discarding")) - resultMsg = grav.NewMsg(msgTypeHiveTypeErr, []byte("failed to convert job result to grav.Message type")) + if resultMsg, isMsg := result.(grav.Message); isMsg { + resultMsg.SetReplyTo(msg.UUID()) + replyMsg = resultMsg + } else if bytes, isBytes := result.([]byte); isBytes { + replyMsg = grav.NewMsgReplyTo(msg.Ticket(), msgTypeHiveResult, bytes) + } else if resultString, isString := result.(string); isString { + replyMsg = grav.NewMsgReplyTo(msg.Ticket(), msgTypeHiveResult, []byte(resultString)) + } else { + resultJSON, err := json.Marshal(result) + if err != nil { + replyMsg = grav.NewMsgReplyTo(msg.Ticket(), msgTypeHiveJobErr, []byte(errors.Wrap(err, "failed to Marshal job result").Error())) + } + + replyMsg = grav.NewMsgReplyTo(msg.Ticket(), msgTypeHiveResult, resultJSON) } } - pod.Send(resultMsg) + pod.Send(replyMsg) return nil }, msgType) diff --git a/hive/message_test.go b/hive/message_test.go index 5f18414d..8b9da78a 100644 --- a/hive/message_test.go +++ b/hive/message_test.go @@ -14,12 +14,7 @@ const msgTypeTester = "hive.test" type msgRunner struct{} func (m *msgRunner) Run(job Job, do DoFunc) (interface{}, error) { - msg := job.Msg() - if msg == nil { - return nil, errors.New("not a message") - } - - name := string(msg.Data()) + name := string(job.Bytes()) reply := grav.NewMsg(msgTypeTester, []byte(fmt.Sprintf("hello, %s", name)))