diff --git a/go.mod b/go.mod index f2234633..c4724eb5 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,12 @@ module github.com/suborbital/hive go 1.14 require ( - github.com/google/uuid v1.1.2 + github.com/google/uuid v1.1.3 github.com/pkg/errors v0.9.1 - github.com/suborbital/grav v0.1.0 + github.com/schollz/progressbar/v2 v2.15.0 // indirect + github.com/suborbital/grav v0.3.0 github.com/suborbital/vektor v0.2.2 - golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 + golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad // indirect + golang.org/x/net v0.0.0-20201224014010-6772e930b67b // indirect + golang.org/x/sync v0.0.0-20201207232520-09787c993a3a ) diff --git a/go.sum b/go.sum index 2647763d..920f171b 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,8 @@ github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.1.3 h1:twObb+9XcuH5B9V1TBCvvvZoO6iEdILi2a76PYn5rJI= +github.com/google/uuid v1.1.3/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= @@ -14,6 +16,7 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/schollz/peerdiscovery v1.6.0/go.mod h1:hSU7N/NkfNH6AZwU/WBcDZtMABVbTfAWk/XD3XKxN+s= +github.com/schollz/peerdiscovery v1.6.1/go.mod h1:bq5/NB9o9/jyEwiW4ubehfToBa2LwdQQMoNiy/vSdYg= github.com/schollz/progressbar/v2 v2.15.0/go.mod h1:UdPq3prGkfQ7MOzZKlDRpYKcFqEMczbD7YmbPgpzKMI= github.com/sethvargo/go-envconfig v0.3.0 h1:9xW3N/jvX6TkJzY99pW4WPq8tMYQElwWZinf0P9fpXY= github.com/sethvargo/go-envconfig v0.3.0/go.mod h1:XZ2JRR7vhlBEO5zMmOpLgUhgYltqYqq4d4tKagtPUv0= @@ -21,12 +24,15 @@ github.com/sethvargo/go-envconfig v0.3.2 h1:277Lb2iTpUZjUZu1qLoLa/aetwvtZbKh8wNW github.com/sethvargo/go-envconfig v0.3.2/go.mod h1:XZ2JRR7vhlBEO5zMmOpLgUhgYltqYqq4d4tKagtPUv0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/suborbital/grav v0.0.10 h1:guf0PEBwqnwFCUOReStx+RddqSN1j81x+78c1bX/MI4= github.com/suborbital/grav v0.0.10/go.mod h1:fN837ibcYZILUd/nKoSaEbo+oTSGRtTbbm/MiwmM3Pw= github.com/suborbital/grav v0.0.11 h1:5S63w/Z/2ZsiekIDhg+CQxiVcxLp0vM0UaREOdul3I0= github.com/suborbital/grav v0.0.11/go.mod h1:fN837ibcYZILUd/nKoSaEbo+oTSGRtTbbm/MiwmM3Pw= github.com/suborbital/grav v0.1.0 h1:rTQV8TsEYf3f7xKZtEBdvKlj4MWWDuRlSN8v/kpQ/44= github.com/suborbital/grav v0.1.0/go.mod h1:KhweoYFOVx408ZyPttmDuKghbglfRnnzHHByFOGzbHE= +github.com/suborbital/grav v0.3.0 h1:dxe3YCKIblSlZ0Pl+uy0qW/xtXrmRD2gQNLSY4ErgvY= +github.com/suborbital/grav v0.3.0/go.mod h1:PapJ62PtT9dPmW37WaCD+UMhoZiNPp0N9E3nUfEujC4= github.com/suborbital/vektor v0.1.1 h1:F3n9rS1F3nc+1Q2HZxeVNinvVkCRliVQ01+jRASctH4= github.com/suborbital/vektor v0.1.1/go.mod h1:tJ4gA2P8NWC4Pdu0TgpSBWZHuZ1Qhp+r5RlsqyIqdyw= github.com/suborbital/vektor v0.1.2 h1:d4BvshbMl4wRVYPKO21vka7r89nlRrrZXidYQz07N9Q= @@ -41,19 +47,34 @@ golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de h1:ikNHVSjEfnvz6sxdSPCaPt golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 h1:pLI5jrR7OSLijeIDcmRxNmw2api+jEfxLoykJVice/E= golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20201217014255-9d1352758620/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= +golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad h1:DN0cp81fZ3njFcrLCytUHRSUkqBjfTo4Tx9RJTWs0EY= +golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20200707034311-ab3426394381 h1:VXak5I6aEWmAXeQjA+QSZzlgNrpq9mjcfDemuexIKsU= golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201029221708-28c70e62bb1d h1:dOiJ2n2cMwGLce/74I/QHMbnpk5GfY7InR8rczoMqRM= golang.org/x/net v0.0.0-20201029221708-28c70e62bb1d/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20201216054612-986b41b23924/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20201224014010-6772e930b67b h1:iFwSg7t5GZmB/Q5TjiEAsdoLDrdJRC1RiF2WhuV29Qw= +golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 h1:qwRHBd0NqMbJxfbotnDhm2ByMI1Shq4Y6oRJo21SGJA= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs= +golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201029080932-201ba4db2418/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201113135734-0a15ea8d9b02/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201214210602-f9fddec55a1e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -62,3 +83,5 @@ golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/hive/hive.go b/hive/hive.go index 119abe5e..45808d8a 100644 --- a/hive/hive.go +++ b/hive/hive.go @@ -9,9 +9,11 @@ import ( "github.com/suborbital/vektor/vlog" ) +// MsgTypeHiveJobErr and others are Grav message types used for Hive job const ( - msgTypeHiveJobErr = "hive.joberr" - msgTypeHiveResult = "hive.result" + MsgTypeHiveJobErr = "hive.joberr" + MsgTypeHiveResult = "hive.result" + MsgTypeHiveNilResult = "hive.nil" ) // JobFunc is a function that runs a job of a predetermined type @@ -71,39 +73,43 @@ func (h *Hive) Listen(pod *grav.Pod, msgType string) { return h.Do(job) } - pod.OnType(func(msg grav.Message) error { + pod.OnType(msgType, func(msg grav.Message) error { var replyMsg grav.Message result, err := helper(msg.Data()).Then() if err != nil { - h.log.Error(errors.Wrap(err, "job returned error result")) - replyMsg = grav.NewMsgReplyTo(msg.Ticket(), msgTypeHiveJobErr, []byte(err.Error())) + h.log.Error(errors.Wrapf(err, "job from message %s returned error result", msg.UUID())) + replyMsg = grav.NewMsg(MsgTypeHiveJobErr, []byte(err.Error())) } else { if result == nil { - return nil - } - - if resultMsg, isMsg := result.(grav.Message); isMsg { + // if the job returned no result + replyMsg = grav.NewMsg(MsgTypeHiveNilResult, []byte{}) + } else if resultMsg, isMsg := result.(grav.Message); isMsg { + // if the job returned a Grav message resultMsg.SetReplyTo(msg.UUID()) replyMsg = resultMsg } else if bytes, isBytes := result.([]byte); isBytes { - replyMsg = grav.NewMsgReplyTo(msg.Ticket(), msgTypeHiveResult, bytes) + // if the job returned bytes + replyMsg = grav.NewMsg(MsgTypeHiveResult, bytes) } else if resultString, isString := result.(string); isString { - replyMsg = grav.NewMsgReplyTo(msg.Ticket(), msgTypeHiveResult, []byte(resultString)) + // if the job returned a string + replyMsg = grav.NewMsg(MsgTypeHiveResult, []byte(resultString)) } else { + // if the job returned something else like a struct resultJSON, err := json.Marshal(result) if err != nil { - replyMsg = grav.NewMsgReplyTo(msg.Ticket(), msgTypeHiveJobErr, []byte(errors.Wrap(err, "failed to Marshal job result").Error())) + h.log.Error(errors.Wrapf(err, "job from message %s returned result that could not be JSON marshalled", msg.UUID())) + replyMsg = grav.NewMsg(MsgTypeHiveJobErr, []byte(errors.Wrap(err, "failed to Marshal job result").Error())) } - replyMsg = grav.NewMsgReplyTo(msg.Ticket(), msgTypeHiveResult, resultJSON) + replyMsg = grav.NewMsg(MsgTypeHiveResult, resultJSON) } } - pod.Send(replyMsg) + pod.ReplyTo(msg, replyMsg) return nil - }, msgType) + }) } // Job is a shorter alias for NewJob diff --git a/hive/message_test.go b/hive/message_test.go index 8b9da78a..30469892 100644 --- a/hive/message_test.go +++ b/hive/message_test.go @@ -10,7 +10,9 @@ import ( ) const msgTypeTester = "hive.test" +const msgTypeNil = "hive.testnil" +// to test jobs listening to a Grav message type msgRunner struct{} func (m *msgRunner) Run(job Job, do DoFunc) (interface{}, error) { @@ -23,6 +25,15 @@ func (m *msgRunner) Run(job Job, do DoFunc) (interface{}, error) { func (m *msgRunner) OnStart() error { return nil } +// to test jobs with a nil result +type nilRunner struct{} + +func (m *nilRunner) Run(job Job, do DoFunc) (interface{}, error) { + return nil, nil +} + +func (m *nilRunner) OnStart() error { return nil } + func TestHandleMessage(t *testing.T) { hive := New() g := grav.New() @@ -33,10 +44,10 @@ func TestHandleMessage(t *testing.T) { sender := g.Connect() - sender.OnType(func(msg grav.Message) error { + sender.OnType(msgTypeTester, func(msg grav.Message) error { counter.Count() return nil - }, msgTypeTester) + }) sender.Send(grav.NewMsg(msgTypeTester, []byte("charlie brown"))) @@ -55,10 +66,10 @@ func TestHandleMessagePt2(t *testing.T) { sender := g.Connect() - sender.OnType(func(msg grav.Message) error { + sender.OnType(msgTypeTester, func(msg grav.Message) error { counter.Count() return nil - }, msgTypeTester) + }) for i := 0; i < 9876; i++ { sender.Send(grav.NewMsg(msgTypeTester, []byte("charlie brown"))) @@ -68,3 +79,27 @@ func TestHandleMessagePt2(t *testing.T) { t.Error(errors.Wrap(err, "failed to counter.Wait")) } } + +func TestHandleMessageNilResult(t *testing.T) { + hive := New() + g := grav.New() + + hive.HandleMsg(g.Connect(), msgTypeNil, &nilRunner{}) + + counter := testutil.NewAsyncCounter(10) + + pod := g.Connect() + + pod.OnType(MsgTypeHiveNilResult, func(msg grav.Message) error { + counter.Count() + return nil + }) + + for i := 0; i < 5; i++ { + pod.Send(grav.NewMsg(msgTypeNil, []byte("hi"))) + } + + if err := counter.Wait(5, 1); err != nil { + t.Error(errors.Wrap(err, "failed to counter.Wait")) + } +}