From 75197ca4e79ca22a3ab01032f5b6ec1dce8becf5 Mon Sep 17 00:00:00 2001 From: Romain Beuque <556072+rbeuque74@users.noreply.github.com> Date: Mon, 31 Oct 2022 23:54:22 +0100 Subject: [PATCH] feat(task): add Kafka consumer to create tasks from Kafka topic MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Romain Beuque <556072+rbeuque74@users.noreply.github.com> Signed-off-by: Thomas Bétrancourt --- engine/engine.go | 26 ++ go.mod | 33 ++- go.sum | 89 ++++-- pkg/kafkaconsumer/kafkaconsumer.go | 209 +++++++++++++ pkg/kafkaconsumer/kafkaconsumer_test.go | 277 ++++++++++++++++++ .../templates_tests/kafka-task-template.yaml | 26 ++ 6 files changed, 632 insertions(+), 28 deletions(-) create mode 100644 pkg/kafkaconsumer/kafkaconsumer.go create mode 100644 pkg/kafkaconsumer/kafkaconsumer_test.go create mode 100644 pkg/kafkaconsumer/templates_tests/kafka-task-template.yaml diff --git a/engine/engine.go b/engine/engine.go index ea1e6e5d..b98196e0 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -27,6 +27,7 @@ import ( "github.com/ovh/utask/models/task" "github.com/ovh/utask/models/tasktemplate" "github.com/ovh/utask/pkg/jsonschema" + "github.com/ovh/utask/pkg/kafkaconsumer" "github.com/ovh/utask/pkg/metadata" "github.com/ovh/utask/pkg/now" "github.com/ovh/utask/pkg/taskutils" @@ -147,6 +148,31 @@ func Init(ctx context.Context, wg *sync.WaitGroup, store *configstore.Store) err if err := RetryCollector(ctx); err != nil { return err } + + // init KafkaConsumer + kcItems, err := configstore.Filter().Store(store).Slice(kafkaconsumer.ConfigKey).Unmarshal(func() interface{} { return &kafkaconsumer.KafkaConfig{} }).Squash().GetItemList() + if err != nil { + return err + } + + for _, cfgItem := range kcItems.Items { + i, err := cfgItem.Unmarshaled() + if err != nil { + return err + } + + cfg := i.(*kafkaconsumer.KafkaConfig) + consumer, err := kafkaconsumer.StartNewTaskConsumer(ctx, *cfg) + if err != nil { + return err + } + + if err := consumer.SetDefaultConsumer(ctx); err != nil { + return err + } + + consumer.StartConsumer(ctx) + } } return nil } diff --git a/go.mod b/go.mod index 828c55d9..e4e69887 100644 --- a/go.mod +++ b/go.mod @@ -5,12 +5,14 @@ go 1.19 require ( github.com/Masterminds/sprig/v3 v3.1.0 github.com/Masterminds/squirrel v1.4.0 + github.com/Shopify/sarama v1.37.2 github.com/cenkalti/backoff v2.2.1+incompatible github.com/fabienm/go-logrus-formatters v1.0.0 github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32 github.com/gin-gonic/gin v1.7.7 github.com/go-gorp/gorp v2.2.0+incompatible github.com/go-ping/ping v0.0.0-20210506233800-ff8be3320020 + github.com/go-playground/validator/v10 v10.11.1 github.com/gofrs/uuid v3.3.0+incompatible github.com/jpillora/backoff v1.0.0 github.com/juju/errors v0.0.0-20200330140219-3fe23663418f @@ -29,13 +31,13 @@ require ( github.com/sirupsen/logrus v1.6.0 github.com/spf13/cobra v0.0.6 github.com/spf13/viper v1.4.0 - github.com/stretchr/testify v1.5.1 + github.com/stretchr/testify v1.8.0 github.com/tidwall/gjson v1.9.3 github.com/wI2L/fizz v0.17.0 github.com/ybriffa/go-http-digest-auth-client v0.6.3 - golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 + golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa golang.org/x/net v0.7.0 - golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f + golang.org/x/sync v0.0.0-20220923202941-7f9b1623fab7 gopkg.in/mail.v2 v2.3.1 ) @@ -47,6 +49,9 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/eapache/go-resiliency v1.3.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect + github.com/eapache/queue v1.1.0 // indirect github.com/facebookgo/ensure v0.0.0-20160127193407-b4ab57deab51 // indirect github.com/facebookgo/freeport v0.0.0-20150612182905-d4adf43b75b9 // indirect github.com/facebookgo/httpcontrol v0.0.0-20150708234001-ccde4420e1fe // indirect @@ -54,31 +59,34 @@ require ( github.com/facebookgo/subset v0.0.0-20150612182917-8dac2c3c4870 // indirect github.com/fsnotify/fsnotify v1.4.7 // indirect github.com/gin-contrib/sse v0.1.0 // indirect - github.com/go-playground/locales v0.13.0 // indirect - github.com/go-playground/universal-translator v0.17.0 // indirect - github.com/go-playground/validator/v10 v10.4.1 // indirect + github.com/go-playground/locales v0.14.0 // indirect + github.com/go-playground/universal-translator v0.18.0 // indirect github.com/go-sql-driver/mysql v1.4.1 // indirect github.com/golang/protobuf v1.5.2 // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/google/uuid v1.1.1 // indirect + github.com/hashicorp/errwrap v1.0.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-retryablehttp v0.5.1 // indirect - github.com/hashicorp/go-uuid v1.0.2 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/huandu/xstrings v1.3.1 // indirect github.com/imdario/mergo v0.3.9 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/jcmturner/aescts/v2 v2.0.0 // indirect github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect - github.com/jcmturner/gofork v1.0.0 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect github.com/jcmturner/goidentity/v6 v6.0.1 // indirect - github.com/jcmturner/gokrb5/v8 v8.3.0 // indirect - github.com/jcmturner/rpc/v2 v2.0.2 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.3 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/jinzhu/now v1.0.1 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.15.11 // indirect github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect - github.com/leodido/go-urn v1.2.0 // indirect + github.com/leodido/go-urn v1.2.1 // indirect github.com/magiconair/properties v1.8.1 // indirect github.com/mattn/go-isatty v0.0.12 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect @@ -89,12 +97,14 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pelletier/go-toml v1.4.0 // indirect + github.com/pierrec/lz4/v4 v4.1.17 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/poy/onpar v1.0.0 // indirect github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect + github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337 // indirect github.com/spf13/afero v1.2.2 // indirect github.com/spf13/cast v1.3.1 // indirect @@ -112,4 +122,5 @@ require ( gopkg.in/ini.v1 v1.46.0 // indirect gopkg.in/sourcemap.v1 v1.0.5 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 85f28ac1..a0a19f84 100644 --- a/go.sum +++ b/go.sum @@ -48,6 +48,9 @@ github.com/Pallinder/go-randomdata v1.2.0 h1:DZ41wBchNRb/0GfsePLiSwb0PHZmT67XY00 github.com/Pallinder/go-randomdata v1.2.0/go.mod h1:yHmJgulpD2Nfrm0cR9tI/+oAgRqCQQixsA8HyRZfV9Y= github.com/SSSaaS/sssa-golang v0.0.0-20170502204618-d37d7782d752 h1:NMpC6M+PtNNDYpq7ozB7kINpv10L5yeli5GJpka2PX8= github.com/SSSaaS/sssa-golang v0.0.0-20170502204618-d37d7782d752/go.mod h1:PbJ8S5YaSYAvDPTiEuUsBHQwTUlPs6VM+Av8Oi3v570= +github.com/Shopify/sarama v1.37.2 h1:LoBbU0yJPte0cE5TZCGdlzZRmMgMtZU/XgnUKZg9Cv4= +github.com/Shopify/sarama v1.37.2/go.mod h1:Nxye/E+YPru//Bpaorfhc3JsSGYwCaDDj+R4bK52U5o= +github.com/Shopify/toxiproxy/v2 v2.5.0 h1:i4LPT+qrSlKNtQf5QliVjdP08GyAH8+BUIc9gT0eahc= github.com/a8m/expect v1.0.0/go.mod h1:4IwSCMumY49ScypDnjNbYEjgVeqy1/U2cEs3Lat96eA= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -79,11 +82,18 @@ github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3Ee github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= +github.com/eapache/go-resiliency v1.3.0 h1:RRL0nge+cWGlxXbUzJ7yMcq6w2XBEr19dCN6HECGaT0= +github.com/eapache/go-resiliency v1.3.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -101,6 +111,7 @@ github.com/facebookgo/stack v0.0.0-20160209184415-751773369052/go.mod h1:UbMTZqL github.com/facebookgo/subset v0.0.0-20150612182917-8dac2c3c4870 h1:E2s37DuLxFhQDg5gKsWoLBOB0n+ZW8s599zru8FJ2/Y= github.com/facebookgo/subset v0.0.0-20150612182917-8dac2c3c4870/go.mod h1:5tD+neXqOorC30/tWg0LCSkrqj/AR6gu8yY8/fpw1q0= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= @@ -134,14 +145,17 @@ github.com/go-ping/ping v0.0.0-20210506233800-ff8be3320020/go.mod h1:KmHOjTUmJh/ github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A= github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= github.com/go-playground/locales v0.12.1/go.mod h1:IUMDtCfWo/w/mtMfIE/IG2K+Ey3ygWanZIBtBW0W2TM= -github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q= github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8= +github.com/go-playground/locales v0.14.0 h1:u50s323jtVGugKlcYeyzC0etD1HifMjqmJqb8WugfUU= +github.com/go-playground/locales v0.14.0/go.mod h1:sawfccIbzZTqEDETgFXqTho0QybSa7l++s0DH+LDiLs= github.com/go-playground/universal-translator v0.16.0/go.mod h1:1AnU7NaIRDWWzGEKwgtJRd2xk99HeFyHw3yid4rvQIY= -github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no= github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= +github.com/go-playground/universal-translator v0.18.0 h1:82dyy6p4OuJq4/CByFNOn/jYrnRPArHwAcmLoJZxyho= +github.com/go-playground/universal-translator v0.18.0/go.mod h1:UvRDBj+xPUEGrFYl+lu/H90nyDXpg0fqeB/AQUGNTVA= github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI= -github.com/go-playground/validator/v10 v10.4.1 h1:pH2c5ADXtd66mxoE0Zm9SUhxE20r7aM3F26W0hOn+GE= github.com/go-playground/validator/v10 v10.4.1/go.mod h1:nlOn6nFhuKACm19sB/8EGNn9GlaMV7XkbRSipzJ0Ii4= +github.com/go-playground/validator/v10 v10.11.1 h1:prmOlTVv+YjZjmRmNSF3VmspqJIxJWXmqUsHwfTRRkQ= +github.com/go-playground/validator/v10 v10.11.1/go.mod h1:i+3WkQ1FvaUjjxh1kSvIA4dMGDBiPU55YFDl0WbKdWU= github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= @@ -179,6 +193,8 @@ github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -211,18 +227,24 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGa github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/securecookie v1.1.1 h1:miw7JPhV+b/lAHSXz4qd/nN9jRiAFV5FwjeKyCS8BvQ= github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= -github.com/gorilla/sessions v1.2.0 h1:S7P+1Hm5V/AT9cjEcUD5uDaQSX0OE577aCXgoaKpYbQ= github.com/gorilla/sessions v1.2.0/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= +github.com/gorilla/sessions v1.2.1 h1:DHd3rPN5lE3Ts3D8rKkQ8x/0kqfeNmBAaiSi+o7FsgI= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-cleanhttp v0.5.0 h1:wvCrVc9TjDls6+YGAF2hAifE1E5U1+b4tH6KdvN3Gig= github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hashicorp/go-retryablehttp v0.5.1 h1:Vsx5XKPqPs3M6sM4U4GWyUqFS8aBiL9U5gkgvpkg4SE= github.com/hashicorp/go-retryablehttp v0.5.1/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs= -github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE= github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= +github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= @@ -239,15 +261,17 @@ github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFK github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= -github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8= github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= +github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= +github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= github.com/jcmturner/gokrb5/v8 v8.2.0/go.mod h1:T1hnNppQsBtxW0tCHMHTkAt8n/sABdzZgZdoFrZaZNM= -github.com/jcmturner/gokrb5/v8 v8.3.0 h1:+a/zAxqOO5Ljb5UGIUMOnxf5u6kMh9gWqOG67KBICK8= -github.com/jcmturner/gokrb5/v8 v8.3.0/go.mod h1:T1hnNppQsBtxW0tCHMHTkAt8n/sABdzZgZdoFrZaZNM= -github.com/jcmturner/rpc/v2 v2.0.2 h1:gMB4IwRXYsWw4Bc6o/az2HJgFUA1ffSh90i26ZJ6Xl0= +github.com/jcmturner/gokrb5/v8 v8.4.3 h1:iTonLeSJOn7MVUtyMT+arAn5AKAPrkilzhGw8wE/Tq8= +github.com/jcmturner/gokrb5/v8 v8.4.3/go.mod h1:dqRwJGXznQrzw6cWmyo6kH+E7jksEQG/CyVWsJEsJO0= github.com/jcmturner/rpc/v2 v2.0.2/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/jinzhu/now v1.0.1 h1:HjfetcXq097iXP0uoPCdnM4Efp5/9MsM0/M+XOTeR3M= github.com/jinzhu/now v1.0.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= @@ -275,22 +299,28 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c= +github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 h1:SOEGU9fKiNWd/HOJuq6+3iTQz8KNCLtVX6idSoTLdUw= github.com/lann/builder v0.0.0-20180802200727-47ae307949d0/go.mod h1:dXGbAdH5GtBTC4WfIxhKZfyBF/HBFgRZSWwZ9g/He9o= github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 h1:P6pPBnrTSX3DEVR4fDembhRWSsG5rVo6hYhAB/ADZrk= github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0/go.mod h1:vmVJ0l/dxyfGW6FmdpVm2joNMFikkuWg0EoCKLGUMNw= github.com/leodido/go-urn v1.1.0/go.mod h1:+cyI34gQWZcE1eQU7NVgKkkzdXDQHr1dBMtdAPozLkw= -github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y= github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= +github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w= +github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY= github.com/lib/pq v1.6.0 h1:I5DPxhYJChW9KYc66se+oKFFQX6VuQrKiprsX6ivRZc= github.com/lib/pq v1.6.0/go.mod h1:4vXEAYvW1fRQ2/FhZ78H73A60MHw1geSm145z2mdY1g= github.com/loopfz/gadgeto v0.9.0/go.mod h1:S3tK5SXmKY3l39rUpPZw1B/iiy1CftV13QABFhj32Ss= @@ -349,6 +379,9 @@ github.com/ovh/tat v5.2.5+incompatible/go.mod h1:eRL842a3T+2ept8tEg5IxitkGJL6X8w github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml v1.4.0 h1:u3Z1r+oOXJIkxqw34zVhyPgjBsm6X2wn21NWs/HfSeg= github.com/pelletier/go-toml v1.4.0/go.mod h1:PN7xzY2wHTK0K9p34ErDQMlFxa51Fk0OUruD3k1mMwo= +github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc= +github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -389,10 +422,15 @@ github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1 github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo= github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/robertkrimen/otto v0.0.0-20191219234010-c382bd3c16ff h1:+6NUiITWwE5q1KO6SAfUX918c+Tab0+tGAM/mtdlUyA= github.com/robertkrimen/otto v0.0.0-20191219234010-c382bd3c16ff/go.mod h1:xvqspoSXJTIpemEonrMDFq6XzwHYYgToXWj5eRX1OtY= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8= +github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/santhosh-tekuri/jsonschema v1.2.4 h1:hNhW8e7t+H1vgY+1QeEQpveR6D4+OwKPXCfD2aieJis= github.com/santhosh-tekuri/jsonschema v1.2.4/go.mod h1:TEAUOeZSmIxTTuHatJzrvARHiuO9LYd+cIxzgEHCQI4= @@ -425,11 +463,16 @@ github.com/spf13/viper v1.4.0 h1:yXHLWeravcrgGyFSyCgdYpXQ9dR9c/WED3pg1RhxqEU= github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/tidwall/gjson v1.9.3 h1:hqzS9wAHMO+KVBBkLxYdkEeeFHuqr95GfClRLKlgK0E= github.com/tidwall/gjson v1.9.3/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= @@ -471,8 +514,10 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200117160349-530e935923ad/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200311171314-f7b00557c8c4/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200414173820-0848c9571904/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa h1:zuSxTR4o9y82ebqCUJYNGJbGPo6sKVl54f/TVDObg1c= +golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -536,8 +581,10 @@ golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20200904194848-62affa334b73/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM= golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -557,8 +604,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f h1:Ax0t5p6N38Ga0dThY21weqDEyz2oklo4IvDkpigvkD8= -golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220923202941-7f9b1623fab7 h1:ZrnxWX62AgTKOSagEqxvb3ffipvEDX2pl7E1TdqLqIc= +golang.org/x/sync v0.0.0-20220923202941-7f9b1623fab7/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -601,12 +648,15 @@ golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -750,8 +800,9 @@ gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc h1:2gG gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc/go.mod h1:m7x9LTH6d71AHyAX77c9yqWCCa3UKHcVEj9y7hAtKDk= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/go-playground/assert.v1 v1.2.1 h1:xoYuJVE7KT85PYWrN730RguIQO0ePzVRfFMXadIrXTM= gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8bDuhia5mkpMnE= @@ -784,6 +835,10 @@ gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/pkg/kafkaconsumer/kafkaconsumer.go b/pkg/kafkaconsumer/kafkaconsumer.go new file mode 100644 index 00000000..c9adf99c --- /dev/null +++ b/pkg/kafkaconsumer/kafkaconsumer.go @@ -0,0 +1,209 @@ +package kafkaconsumer + +import ( + "context" + "fmt" + "log" + "regexp" + "time" + + "github.com/Shopify/sarama" + "github.com/go-playground/validator/v10" + "github.com/loopfz/gadgeto/zesty" + "github.com/sirupsen/logrus" + + "github.com/ovh/utask" + "github.com/ovh/utask/models/tasktemplate" + "github.com/ovh/utask/pkg/auth" + "github.com/ovh/utask/pkg/taskutils" +) + +const ( + TimeoutDefault = "10s" + ConfigKey = "kafka-consumer" +) + +// KafkaConfig is the configuration needed to write a message on Kafka topic +type KafkaConfig struct { + Brokers []string `json:"brokers" validate:"required,gt=0"` + KafkaVersion string `json:"kafka_version,omitempty"` + Group string `json:"group" validate:"required,gt=0"` + SASL struct { + User string `json:"user,omitempty"` + Password string `json:"password,omitempty"` + } `json:"sasl,omitempty"` + WithTLS bool `json:"with_tls"` + Timeout string `json:"timeout"` + Topics []string `json:"topics" validate:"required,gt=0"` + OldestOffset bool `json:"oldest_offset"` + + TaskTemplate string `json:"task_template" validate:"required,gt=0"` + Input map[string]interface{} `json:"input"` + RequesterUsername string `json:"requester_username"` + RequesterGroups []string `json:"requester_groups"` + ResolverUsername string `json:"resolver_username"` + WatcherUsernames []string `json:"watcher_usernames"` + WatcherGroups []string `json:"watcher_groups"` +} + +func StartNewTaskConsumer(ctx context.Context, cfg KafkaConfig) (*Consumer, error) { + err := validator.New().Struct(cfg) + if err != nil { + return nil, err + } + + if cfg.Timeout == "" { + cfg.Timeout = TimeoutDefault + } + + td, err := time.ParseDuration(cfg.Timeout) + if err != nil { + return nil, fmt.Errorf("failed to parse timeout: %s", err) + } + + // Kafka config + config := sarama.NewConfig() + config.Net.TLS.Enable = cfg.WithTLS + config.Net.DialTimeout = td + config.ClientID = "uTask-" + clearString(utask.AppName()) + + if cfg.KafkaVersion != "" { + version, err := sarama.ParseKafkaVersion(cfg.KafkaVersion) + if err != nil { + return nil, fmt.Errorf("failed parsing Kafka version: %v", err) + } + + config.Version = version + } + + // SASL authentication + if cfg.SASL.User != "" || cfg.SASL.Password != "" { + config.Net.SASL.Enable = true + config.Net.SASL.User = cfg.SASL.User + config.Net.SASL.Password = cfg.SASL.Password + } + + if cfg.OldestOffset { + config.Consumer.Offsets.Initial = sarama.OffsetOldest + } + + dbp, err := zesty.NewDBProvider(utask.DBName) + if err != nil { + return nil, err + } + + tt, err := tasktemplate.LoadFromName(dbp, cfg.TaskTemplate) + if err != nil { + return nil, err + } + + return &Consumer{ + cfg: cfg, + saramaCfg: config, + tt: *tt, + }, nil +} + +func (c *Consumer) SetDefaultConsumer(ctx context.Context) error { + client, err := sarama.NewConsumerGroup(c.cfg.Brokers, c.cfg.Group, c.saramaCfg) + if err != nil { + return fmt.Errorf("failed creating consumer group client: %v", err) + } + + c.client = client + + return nil +} + +func (c *Consumer) SetCustomConsumer(client sarama.ConsumerGroup) { + c.client = client +} + +func (c *Consumer) StartConsumer(ctx context.Context) { + go func() { + for { + // `Consume` should be called inside an infinite loop, when a + // server-side rebalance happens, the consumer session will need to be + // recreated to get the new claims + if err := c.client.Consume(ctx, c.cfg.Topics, c); err != nil { + logrus.WithError(err).Warn("kafkaconsumer: fail to consume") + continue + } + // check if context was cancelled, signaling that the consumer should stop + if ctx.Err() != nil { + return + } + } + }() + + logrus.Debugf("kafkaconsumer: starting consumption") + + go func(ctx context.Context) { + <-ctx.Done() + if err := c.client.Close(); err != nil { + logrus.WithError(err).Warn("kafkaconsumer: fail to close client") + } + }(ctx) +} + +// Consumer represents a Sarama consumer group consumer +type Consumer struct { + cfg KafkaConfig + tt tasktemplate.TaskTemplate + saramaCfg *sarama.Config + client sarama.ConsumerGroup +} + +// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). +func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + for { + select { + case message := <-claim.Messages(): + ctx := auth.WithIdentity(context.Background(), consumer.cfg.RequesterUsername) + ctx = auth.WithGroups(ctx, consumer.cfg.RequesterGroups) + + dbp, err := zesty.NewDBProvider(utask.DBName) + if err != nil { + return err + } + + _, err = taskutils.CreateTask(ctx, dbp, + &consumer.tt, consumer.cfg.WatcherUsernames, consumer.cfg.WatcherGroups, + []string{}, []string{}, + consumer.cfg.Input, nil, + "created from KafkaConsumer", nil, nil) + if err != nil { + log.Print(err) + return err + } + + session.MarkMessage(message, "") + session.Commit() + + // Should return when `session.Context()` is done. + // If not, will raise `ErrRebalanceInProgress` or `read tcp :: i/o timeout` when kafka rebalance. see: + // https://github.com/Shopify/sarama/issues/1192 + case <-session.Context().Done(): + return nil + } + } +} + +// Setup is run at the beginning of a new session, before ConsumeClaim +func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error { + // Mark the consumer as ready + return nil +} + +// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited +func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error { + return nil +} + +var ( + nonAlphanumericRegex = regexp.MustCompile(`[^a-zA-Z0-9-]+`) +) + +func clearString(str string) string { + return nonAlphanumericRegex.ReplaceAllString(str, "") +} diff --git a/pkg/kafkaconsumer/kafkaconsumer_test.go b/pkg/kafkaconsumer/kafkaconsumer_test.go new file mode 100644 index 00000000..570c93a8 --- /dev/null +++ b/pkg/kafkaconsumer/kafkaconsumer_test.go @@ -0,0 +1,277 @@ +package kafkaconsumer + +import ( + "context" + "encoding/json" + "io/ioutil" + "os" + "path/filepath" + "testing" + "time" + + "github.com/Shopify/sarama" + "github.com/ghodss/yaml" + "github.com/juju/errors" + "github.com/loopfz/gadgeto/zesty" + "github.com/maxatome/go-testdeep/td" + "github.com/ovh/configstore" + "github.com/sirupsen/logrus" + + "github.com/ovh/utask" + "github.com/ovh/utask/db" + "github.com/ovh/utask/db/pgjuju" + "github.com/ovh/utask/engine/step" + "github.com/ovh/utask/models/task" + "github.com/ovh/utask/models/tasktemplate" + "github.com/ovh/utask/pkg/auth" + "github.com/ovh/utask/pkg/now" + "github.com/ovh/utask/pkg/plugins/builtin/echo" + "github.com/ovh/utask/pkg/plugins/builtin/script" +) + +func TestMain(m *testing.M) { + store := configstore.DefaultStore + store.InitFromEnvironment() + + logrus.SetOutput(os.Stdout) + logrus.SetLevel(logrus.ErrorLevel) + + step.RegisterRunner(echo.Plugin.PluginName(), echo.Plugin) + step.RegisterRunner(script.Plugin.PluginName(), script.Plugin) + + if err := db.Init(store); err != nil { + panic(err) + } + + if err := now.Init(); err != nil { + panic(err) + } + + if err := auth.Init(store); err != nil { + panic(err) + } + + os.Exit(m.Run()) +} + +func loadTemplates(t *testing.T, dbp zesty.DBProvider) error { + templateList := map[string][]byte{} + files, err := ioutil.ReadDir("./templates_tests") + if err != nil { + panic(err) + } + + for _, file := range files { + if file.Mode().IsRegular() { + bytesValue, err := os.ReadFile(filepath.Join("./templates_tests", file.Name())) + if err != nil { + panic(err) + } + templateList[file.Name()] = bytesValue + + var tmpl tasktemplate.TaskTemplate + + if err := yaml.Unmarshal(bytesValue, &tmpl); err != nil { + return err + } + if err := tmpl.Valid(); err != nil { + return err + } + tmpl.Normalize() + if err := dbp.DB().Insert(&tmpl); err != nil { + intErr := pgjuju.Interpret(err) + if !errors.IsAlreadyExists(intErr) { + return intErr + } + existing, err := tasktemplate.LoadFromName(dbp, tmpl.Name) + if err != nil { + return err + } + tmpl.ID = existing.ID + if _, err := dbp.DB().Update(&tmpl); err != nil { + return err + } + } + } + } + return nil +} + +func TestKafkaConsumer(t *testing.T) { + assert, require := td.AssertRequire(t) + + dbp, err := zesty.NewDBProvider(utask.DBName) + require.CmpNoError(err) + err = loadTemplates(t, dbp) + require.CmpNoError(err) + + ctx, cancel := context.WithCancel(context.Background()) + + c, err := StartNewTaskConsumer(ctx, KafkaConfig{ + Brokers: []string{"localhost:123"}, + KafkaVersion: "2.8.1", + Group: "my-consumer-group", + Topics: []string{"my-topic"}, + + TaskTemplate: "kafka-task-template", + RequesterUsername: "rb", + Input: map[string]interface{}{ + "quantity": json.Number("12"), + "foo": "hello you", + }, + }) + require.CmpNoError(err) + + consumer := MockKafka{} + c.SetCustomConsumer(&consumer) + c.StartConsumer(ctx) + + template := "kafka-task-template" + filter := task.ListFilter{ + Template: &template, + PageSize: 1, + } + + time.Sleep(time.Second) + cancel() + + time.Sleep(time.Second) + tasks, err := task.ListTasks(dbp, filter) + require.CmpNoError(err) + assert.Len(tasks, 1) + + task, err := task.LoadFromID(dbp, tasks[0].ID) + require.CmpNoError(err) + assert.Cmp(task.Input["quantity"], json.Number("12")) + assert.Cmp(task.Input["foo"], "hello you") +} + +type MockKafka struct{} + +func (mk *MockKafka) Consume(ctx context.Context, topics []string, handler sarama.ConsumerGroupHandler) error { + mgcs := MockConsumerGroupSession{ + ctx: ctx, + } + mcgc := MockConsumerGroupClaim{ + msgs: make(chan *sarama.ConsumerMessage), + } + handler.Setup(&mgcs) + go handler.ConsumeClaim(&mgcs, &mcgc) + mcgc.msgs <- &sarama.ConsumerMessage{ + Topic: "mytopic", + Partition: 1, + Offset: 2, + Value: []byte("foobar coco"), + } + handler.Cleanup(&mgcs) + time.Sleep(20 * time.Second) + return nil +} + +// Errors returns a read channel of errors that occurred during the consumer life-cycle. +// By default, errors are logged and not returned over this channel. +// If you want to implement any custom error handling, set your config's +// Consumer.Return.Errors setting to true, and read from this channel. +func (mk *MockKafka) Errors() <-chan error { + return make(chan error) +} + +// Close stops the ConsumerGroup and detaches any running sessions. It is required to call +// this function before the object passes out of scope, as it will otherwise leak memory. +func (mk *MockKafka) Close() error { + return nil +} +func (mk *MockKafka) Pause(partitions map[string][]int32) {} +func (mk *MockKafka) Resume(partitions map[string][]int32) {} +func (mk *MockKafka) PauseAll() {} +func (mk *MockKafka) ResumeAll() {} + +// ConsumerGroupSession represents a consumer group member session. +type MockConsumerGroupSession struct { + ctx context.Context +} + +// Claims returns information about the claimed partitions by topic. +func (mcgs *MockConsumerGroupSession) Claims() map[string][]int32 { + return map[string][]int32{} +} + +// MemberID returns the cluster member ID. +func (mcgs *MockConsumerGroupSession) MemberID() string { + return "foobar" +} + +// GenerationID returns the current generation ID. +func (mcgs *MockConsumerGroupSession) GenerationID() int32 { + return 42 +} + +// MarkOffset marks the provided offset, alongside a metadata string +// that represents the state of the partition consumer at that point in time. The +// metadata string can be used by another consumer to restore that state, so it +// can resume consumption. +// +// To follow upstream conventions, you are expected to mark the offset of the +// next message to read, not the last message read. Thus, when calling `MarkOffset` +// you should typically add one to the offset of the last consumed message. +// +// Note: calling MarkOffset does not necessarily commit the offset to the backend +// store immediately for efficiency reasons, and it may never be committed if +// your application crashes. This means that you may end up processing the same +// message twice, and your processing should ideally be idempotent. +func (mcgs *MockConsumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string) { +} + +// Commit the offset to the backend +// +// Note: calling Commit performs a blocking synchronous operation. +func (mcgs *MockConsumerGroupSession) Commit() {} + +// ResetOffset resets to the provided offset, alongside a metadata string that +// represents the state of the partition consumer at that point in time. Reset +// acts as a counterpart to MarkOffset, the difference being that it allows to +// reset an offset to an earlier or smaller value, where MarkOffset only +// allows incrementing the offset. cf MarkOffset for more details. +func (mcgs *MockConsumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string) { +} + +// MarkMessage marks a message as consumed. +func (mcgs *MockConsumerGroupSession) MarkMessage(msg *sarama.ConsumerMessage, metadata string) {} + +// Context returns the session context. +func (mcgs *MockConsumerGroupSession) Context() context.Context { + return mcgs.ctx +} + +// ConsumerGroupClaim processes Kafka messages from a given topic and partition within a consumer group. +type MockConsumerGroupClaim struct { + msgs chan *sarama.ConsumerMessage +} + +func (mcgc *MockConsumerGroupClaim) Topic() string { + return "foobar-topic" +} + +func (mcgc *MockConsumerGroupClaim) Partition() int32 { + return 41 +} + +func (mcgc *MockConsumerGroupClaim) InitialOffset() int64 { + return 43 +} + +// HighWaterMarkOffset returns the high water mark offset of the partition, +// i.e. the offset that will be used for the next message that will be produced. +// You can use this to determine how far behind the processing is. +func (mcgc *MockConsumerGroupClaim) HighWaterMarkOffset() int64 { + return 23 +} + +// Messages returns the read channel for the messages that are returned by +// the broker. The messages channel will be closed when a new rebalance cycle +// is due. You must finish processing and mark offsets within +// Config.Consumer.Group.Session.Timeout before the topic/partition is eventually +// re-assigned to another group member. +func (mcgc *MockConsumerGroupClaim) Messages() <-chan *sarama.ConsumerMessage { + return mcgc.msgs +} diff --git a/pkg/kafkaconsumer/templates_tests/kafka-task-template.yaml b/pkg/kafkaconsumer/templates_tests/kafka-task-template.yaml new file mode 100644 index 00000000..fa0d8739 --- /dev/null +++ b/pkg/kafkaconsumer/templates_tests/kafka-task-template.yaml @@ -0,0 +1,26 @@ +name: kafka-task-template +description: Number input should be valid +title_format: "[test] input number test" +auto_runnable: true +inputs: + - name: quantity + description: Options quantity (1 will… streams, 2 for 10 ...) + collection: false + type: number + optional: false + - name: foo + description: A string value + type: string + optional: true + default: "" +steps: + stepOne: + description: first step + idempotent: true + retry_pattern: seconds + action: + type: echo + configuration: + output: + value: "{{.input.quantity}}" + foo: "{{.input.foo}}"