diff --git a/cmd/api-server/main.go b/cmd/api-server/main.go index 11984d883ec..5f489feed2f 100644 --- a/cmd/api-server/main.go +++ b/cmd/api-server/main.go @@ -315,6 +315,7 @@ func main() { clientset, cfg.TestkubeRegistry, cfg.TestkubePodStartTimeout, + clusterId, ) if err != nil { ui.ExitOnError("Creating executor client", err) @@ -338,6 +339,7 @@ func main() { testsClientV3, cfg.TestkubeRegistry, cfg.TestkubePodStartTimeout, + clusterId, ) if err != nil { ui.ExitOnError("Creating container executor", err) @@ -389,6 +391,7 @@ func main() { storageClient, cfg.GraphqlPort, artifactStorage, + cfg.CDEventsTarget, ) if mode == common.ModeAgent { diff --git a/go.mod b/go.mod index 71a1426b7fb..7b8a246a76d 100644 --- a/go.mod +++ b/go.mod @@ -6,13 +6,16 @@ require ( github.com/99designs/gqlgen v0.17.27 github.com/Masterminds/semver v1.5.0 github.com/adhocore/gronx v1.1.2 + github.com/cdevents/sdk-go v0.3.0 github.com/cli/cli/v2 v2.20.2 + github.com/cloudevents/sdk-go/v2 v2.14.0 github.com/coreos/go-oidc v2.2.1+incompatible github.com/creasty/defaults v1.7.0 github.com/denisbrodbeck/machineid v1.0.1 github.com/dustinkirkland/golang-petname v0.0.0-20191129215211-8e5a1ed0cff0 github.com/fasthttp/websocket v1.5.0 github.com/fluxcd/pkg/apis/event v0.2.0 + github.com/gabriel-vasile/mimetype v1.4.2 github.com/gofiber/adaptor/v2 v2.1.29 github.com/gofiber/fiber/v2 v2.39.0 github.com/gofiber/websocket/v2 v2.1.1 @@ -77,6 +80,9 @@ require ( github.com/go-openapi/jsonpointer v0.19.6 // indirect github.com/go-openapi/jsonreference v0.20.1 // indirect github.com/go-openapi/swag v0.22.3 // indirect + github.com/go-playground/locales v0.14.0 // indirect + github.com/go-playground/universal-translator v0.18.0 // indirect + github.com/go-playground/validator/v10 v10.11.1 // indirect github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect github.com/golang-jwt/jwt/v4 v4.4.2 // indirect github.com/google/gnostic v0.6.9 // indirect @@ -91,6 +97,7 @@ require ( github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect github.com/klauspost/cpuid/v2 v2.2.3 // indirect github.com/kr/pretty v0.3.1 // indirect + github.com/leodido/go-urn v1.2.1 // indirect github.com/lithammer/fuzzysearch v1.1.5 // indirect github.com/lucasb-eyer/go-colorful v1.2.0 // indirect github.com/mailru/easyjson v0.7.7 // indirect @@ -106,8 +113,10 @@ require ( github.com/nats-io/nats-server/v2 v2.8.4 // indirect github.com/nats-io/nkeys v0.3.0 // indirect github.com/nats-io/nuid v1.0.1 // indirect + github.com/package-url/packageurl-go v0.1.0 // indirect github.com/pquerna/cachecontrol v0.1.0 // indirect github.com/rivo/uniseg v0.4.4 // indirect + github.com/santhosh-tekuri/jsonschema/v5 v5.0.0 // indirect github.com/savsgio/gotils v0.0.0-20220530130905-52f3993e8d6d // indirect github.com/segmentio/backo-go v1.0.1 // indirect github.com/shurcooL/githubv4 v0.0.0-20220922232305-70b4d362a8cb // indirect diff --git a/go.sum b/go.sum index 403b901236a..2419793378f 100644 --- a/go.sum +++ b/go.sum @@ -111,6 +111,8 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dR github.com/briandowns/spinner v1.19.0 h1:s8aq38H+Qju89yhp89b4iIiMzMm8YN3p6vGpwyh/a8E= github.com/briandowns/spinner v1.19.0/go.mod h1:mQak9GHqbspjC/5iUx3qMlIho8xBS/ppAL/hX5SmPJU= github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= +github.com/cdevents/sdk-go v0.3.0 h1:YHb47qIVi3qV+HmkyW3e0gqCQaqKW0rnL4EejSDuMFs= +github.com/cdevents/sdk-go v0.3.0/go.mod h1:8EFl9VDZkxEmO/sr06Phzr501OiU6B5d04+eYpf1tF0= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= @@ -132,6 +134,8 @@ github.com/cli/safeexec v1.0.0/go.mod h1:Z/D4tTN8Vs5gXYHDCbaM1S/anmEDnJb1iW0+EJ5 github.com/cli/shurcooL-graphql v0.0.2 h1:rwP5/qQQ2fM0TzkUTwtt6E2LbIYf6R+39cUXTa04NYk= github.com/cli/shurcooL-graphql v0.0.2/go.mod h1:tlrLmw/n5Q/+4qSvosT+9/W5zc8ZMjnJeYBxSdb4nWA= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cloudevents/sdk-go/v2 v2.14.0 h1:Nrob4FwVgi5L4tV9lhjzZcjYqFVyJzsA56CwPaPfv6s= +github.com/cloudevents/sdk-go/v2 v2.14.0/go.mod h1:xDmKfzNjM8gBvjaF8ijFjM1VYOVUEeUfapHMUX1T5To= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= @@ -184,6 +188,8 @@ github.com/fluxcd/pkg/apis/event v0.2.0 h1:cmAtkZfoEaNVYegI4SFM8XstdRAil3O9AoP+8 github.com/fluxcd/pkg/apis/event v0.2.0/go.mod h1:OyzKqs90J+MK7rQaEOFMMCkALpPkfmxlkabgyY2wSFQ= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= +github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU= +github.com/gabriel-vasile/mimetype v1.4.2/go.mod h1:zApsH/mKG4w07erKIaJPFiX0Tsq9BFQgN3qGY5GnNgA= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA= github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= @@ -208,6 +214,14 @@ github.com/go-openapi/jsonreference v0.20.1 h1:FBLnyygC4/IZZr893oiomc9XaghoveYTr github.com/go-openapi/jsonreference v0.20.1/go.mod h1:Bl1zwGIM8/wsvqjsOQLJ/SH+En5Ap4rVB5KVcIDZG2k= github.com/go-openapi/swag v0.22.3 h1:yMBqmnQ0gyZvEb/+KzuWZOXgllrXT4SADYbvDaXHv/g= github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14= +github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A= +github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= +github.com/go-playground/locales v0.14.0 h1:u50s323jtVGugKlcYeyzC0etD1HifMjqmJqb8WugfUU= +github.com/go-playground/locales v0.14.0/go.mod h1:sawfccIbzZTqEDETgFXqTho0QybSa7l++s0DH+LDiLs= +github.com/go-playground/universal-translator v0.18.0 h1:82dyy6p4OuJq4/CByFNOn/jYrnRPArHwAcmLoJZxyho= +github.com/go-playground/universal-translator v0.18.0/go.mod h1:UvRDBj+xPUEGrFYl+lu/H90nyDXpg0fqeB/AQUGNTVA= +github.com/go-playground/validator/v10 v10.11.1 h1:prmOlTVv+YjZjmRmNSF3VmspqJIxJWXmqUsHwfTRRkQ= +github.com/go-playground/validator/v10 v10.11.1/go.mod h1:i+3WkQ1FvaUjjxh1kSvIA4dMGDBiPU55YFDl0WbKdWU= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= @@ -369,6 +383,7 @@ github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFB github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -379,6 +394,8 @@ github.com/lithammer/fuzzysearch v1.1.5/go.mod h1:1R1LRNk7yKid1BaQkmuLQaHruxcC4H github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kubeshop/testkube-operator v1.10.8-0.20230504145001-42249450d65a h1:hBHA3iMgIO9dqa+8aNQ/6NhgV9RFLWAo36Eo7VRVD/4= github.com/kubeshop/testkube-operator v1.10.8-0.20230504145001-42249450d65a/go.mod h1:6Rs8MugOzaMcthGzobf6GBlRzbOFiK/GJiuYN6MCfEw= +github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w= +github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY= github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY= github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= @@ -462,6 +479,8 @@ github.com/otiai10/curr v1.0.0/go.mod h1:LskTG5wDwr8Rs+nNQ+1LlxRjAtTZZjtJW4rMXl6 github.com/otiai10/mint v1.3.0/go.mod h1:F5AjcsTsWUqX+Na9fpHb52P8pcRX2CI6A3ctIT91xUo= github.com/otiai10/mint v1.4.0 h1:umwcf7gbpEwf7WFzqmWwSv0CzbeMsae2u9ZvpP8j2q4= github.com/otiai10/mint v1.4.0/go.mod h1:gifjb2MYOoULtKLqUAEILUG/9KONW6f7YsJ6vQLTlFI= +github.com/package-url/packageurl-go v0.1.0 h1:efWBc98O/dBZRg1pw2xiDzovnlMjCa9NPnfaiBduh8I= +github.com/package-url/packageurl-go v0.1.0/go.mod h1:C/ApiuWpmbpni4DIOECf6WCjFUZV7O1Fx7VAzrZHgBw= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -516,12 +535,16 @@ github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rs/xid v1.4.0 h1:qd7wPTDkN6KQx2VmMBLrpHkiyQwgFXRnkOLacUiaSNY= github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/santhosh-tekuri/jsonschema/v5 v5.0.0 h1:TToq11gyfNlrMFZiYujSekIsPd9AmsA2Bj/iv+s4JHE= +github.com/santhosh-tekuri/jsonschema/v5 v5.0.0/go.mod h1:FKdcjfQW6rpZSnxxUvEA5H/cDPdvJ/SZJQLWWXWGrZ0= github.com/savsgio/gotils v0.0.0-20211223103454-d0aaa54c5899/go.mod h1:oejLrk1Y/5zOF+c/aHtXqn3TFlzzbAgPWg8zBiAHDas= github.com/savsgio/gotils v0.0.0-20220530130905-52f3993e8d6d h1:Q+gqLBOPkFGHyCJxXMRqtUgUbTjI8/Ze8vu8GGyNFwo= github.com/savsgio/gotils v0.0.0-20220530130905-52f3993e8d6d/go.mod h1:Gy+0tqhJvgGlqnTF8CVGP0AaGRjwBtXs/a5PA0Y3+A4= @@ -636,6 +659,7 @@ golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= @@ -785,6 +809,7 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211013075003-97ac67df715c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/internal/app/api/v1/server.go b/internal/app/api/v1/server.go index 08325ff61a9..dd64b892149 100644 --- a/internal/app/api/v1/server.go +++ b/internal/app/api/v1/server.go @@ -13,6 +13,7 @@ import ( "github.com/pkg/errors" + "github.com/kubeshop/testkube/pkg/api/v1/testkube" "github.com/kubeshop/testkube/pkg/repository/config" "github.com/kubeshop/testkube/pkg/version" @@ -35,6 +36,7 @@ import ( testkubeclientset "github.com/kubeshop/testkube-operator/pkg/clientset/versioned" "github.com/kubeshop/testkube/internal/app/api/metrics" "github.com/kubeshop/testkube/pkg/event" + "github.com/kubeshop/testkube/pkg/event/kind/cdevent" "github.com/kubeshop/testkube/pkg/event/kind/slack" "github.com/kubeshop/testkube/pkg/event/kind/webhook" ws "github.com/kubeshop/testkube/pkg/event/kind/websocket" @@ -77,6 +79,7 @@ func NewTestkubeAPI( storage storage.Client, graphqlPort string, artifactsStorage storage.ArtifactsStorage, + cdeventsTarget string, ) TestkubeAPI { var httpConfig server.Config @@ -125,6 +128,15 @@ func NewTestkubeAPI( s.Events.Loader.Register(s.WebsocketLoader) s.Events.Loader.Register(s.slackLoader) + if cdeventsTarget != "" { + cdeventLoader, err := cdevent.NewCDEventLoader(cdeventsTarget, clusterId, namespace, testkube.AllEventTypes) + if err == nil { + s.Events.Loader.Register(cdeventLoader) + } else { + s.Log.Debug("cdevents init error", "error", err.Error()) + } + } + s.InitEnvs() s.InitRoutes() diff --git a/internal/config/config.go b/internal/config/config.go index fa966fb39a3..4f1c47d5f48 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -58,6 +58,7 @@ type Config struct { GraphqlPort string `envconfig:"TESTKUBE_GRAPHQL_PORT" default:"8070"` TestkubeRegistry string `envconfig:"TESTKUBE_REGISTRY" default:""` TestkubePodStartTimeout time.Duration `envconfig:"TESTKUBE_POD_START_TIMEOUT" default:"30m"` + CDEventsTarget string `envconfig:"CDEVENTS_TARGET" default:""` } func Get() (*Config, error) { diff --git a/pkg/api/v1/testkube/model_execution_extended.go b/pkg/api/v1/testkube/model_execution_extended.go index fba45a44108..1e8c226043a 100644 --- a/pkg/api/v1/testkube/model_execution_extended.go +++ b/pkg/api/v1/testkube/model_execution_extended.go @@ -188,3 +188,11 @@ func (e Execution) IsTimeout() bool { return *e.ExecutionResult.Status == TIMEOUT_ExecutionStatus } + +func (e Execution) IsPassed() bool { + if e.ExecutionResult == nil { + return true + } + + return *e.ExecutionResult.Status == PASSED_ExecutionStatus +} diff --git a/pkg/api/v1/testkube/model_test_suite_execution_extended.go b/pkg/api/v1/testkube/model_test_suite_execution_extended.go index 52e23e94597..8a32a492c7d 100644 --- a/pkg/api/v1/testkube/model_test_suite_execution_extended.go +++ b/pkg/api/v1/testkube/model_test_suite_execution_extended.go @@ -135,3 +135,11 @@ func (e *TestSuiteExecution) IsPassed() bool { func (e *TestSuiteExecution) IsFailed() bool { return *e.Status == FAILED_TestSuiteExecutionStatus } + +func (e *TestSuiteExecution) IsAborted() bool { + return *e.Status == ABORTED_TestSuiteExecutionStatus +} + +func (e *TestSuiteExecution) IsTimeout() bool { + return *e.Status == TIMEOUT_TestSuiteExecutionStatus +} diff --git a/pkg/cloud/data/artifact/scraper_integration_test.go b/pkg/cloud/data/artifact/scraper_integration_test.go index 924474b27f2..6ebe784afa7 100644 --- a/pkg/cloud/data/artifact/scraper_integration_test.go +++ b/pkg/cloud/data/artifact/scraper_integration_test.go @@ -10,6 +10,7 @@ import ( "github.com/kubeshop/testkube/pkg/utils/test" + cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" @@ -82,7 +83,20 @@ func TestCloudScraper_ArchiveFilesystemExtractor_Integration(t *testing.T) { Execute(gomock.Any(), cloudscraper.CmdScraperPutObjectSignedURL, gomock.Eq(req2)). Return([]byte(`{"URL":"`+testServer.URL+`/dummy"}`), nil) - s := scraper.NewExtractLoadScraper(extractor, cloudLoader) + // given + testHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, err := cloudevents.NewEventFromHTTPRequest(r) + // then + assert.NoError(t, err) + }) + + svr := httptest.NewServer(testHandler) + defer svr.Close() + + client, err := cloudevents.NewClientHTTP(cloudevents.WithTarget(svr.URL)) + assert.NoError(t, err) + + s := scraper.NewExtractLoadScraper(extractor, cloudLoader, client, "") execution := testkube.Execution{ Id: "my-execution-id", TestName: "my-test", @@ -168,7 +182,20 @@ func TestCloudScraper_RecursiveFilesystemExtractor_Integration(t *testing.T) { Execute(gomock.Any(), cloudscraper.CmdScraperPutObjectSignedURL, gomock.Eq(req3)). Return([]byte(`{"URL":"`+testServer.URL+`/dummy"}`), nil) - s := scraper.NewExtractLoadScraper(extractor, cloudLoader) + // given + testHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, err := cloudevents.NewEventFromHTTPRequest(r) + // then + assert.NoError(t, err) + }) + + svr := httptest.NewServer(testHandler) + defer svr.Close() + + client, err := cloudevents.NewClientHTTP(cloudevents.WithTarget(svr.URL)) + assert.NoError(t, err) + + s := scraper.NewExtractLoadScraper(extractor, cloudLoader, client, "") execution := testkube.Execution{ Id: "my-execution-id", TestName: "my-test", diff --git a/pkg/envs/variables.go b/pkg/envs/variables.go index 0720e7ed549..04acf042937 100644 --- a/pkg/envs/variables.go +++ b/pkg/envs/variables.go @@ -22,6 +22,8 @@ type Params struct { DataDir string // RUNNER_DATADIR GitUsername string // RUNNER_GITUSERNAME GitToken string // RUNNER_GITTOKEN + ClusterID string `envconfig:"RUNNER_CLUSTERID"` // RUNNER_CLUSTERID + CDEventsTarget string `envconfig:"RUNNER_CDEVENTS_TARGET"` // RUNNER_CDEVENTS_TARGET CloudMode bool `envconfig:"RUNNER_CLOUD_MODE"` // RUNNER_CLOUD_MODE CloudAPIKey string `envconfig:"RUNNER_CLOUD_API_KEY"` // RUNNER_CLOUD_API_KEY CloudAPITLSInsecure bool `envconfig:"RUNNER_CLOUD_API_TLS_INSECURE"` // RUNNER_CLOUD_API_TLS_INSECURE @@ -56,6 +58,8 @@ func printParams(params Params) { output.PrintLogf("RUNNER_GITUSERNAME=\"%s\"", params.GitUsername) printSensitiveParam("RUNNER_GITTOKEN", params.GitToken) output.PrintLogf("RUNNER_DATADIR=\"%s\"", params.DataDir) + output.PrintLogf("RUNNER_CLUSTERID=\"%s\"", params.ClusterID) + output.PrintLogf("RUNNER_CDEVENTS_TARGET=\"%s\"", params.CDEventsTarget) output.PrintLogf("RUNNER_CLOUD_MODE=\"%t\"", params.CloudMode) output.PrintLogf("RUNNER_CLOUD_API_TLS_INSECURE=\"%t\"", params.CloudAPITLSInsecure) output.PrintLogf("RUNNER_CLOUD_API_URL=\"%s\"", params.CloudAPIURL) diff --git a/pkg/event/emitter.go b/pkg/event/emitter.go index ff6278999a2..5cdb7bcf6b3 100644 --- a/pkg/event/emitter.go +++ b/pkg/event/emitter.go @@ -170,7 +170,7 @@ func (e *Emitter) notifyHandler(l common.Listener) bus.Handler { log := e.Log.With("listen-on", l.Events(), "queue-group", l.Name(), "selector", l.Selector(), "metadata", l.Metadata()) return func(event testkube.Event) error { if event.Valid(l.Selector(), l.Events()) { - l.Notify(event) + log.Infow("notification result", l.Notify(event)) log.Infow("listener notified", event.Log()...) } else { log.Infow("dropping event not matching selector or type", event.Log()...) diff --git a/pkg/event/kind/cdevent/listener.go b/pkg/event/kind/cdevent/listener.go new file mode 100644 index 00000000000..6119c8e0ebc --- /dev/null +++ b/pkg/event/kind/cdevent/listener.go @@ -0,0 +1,84 @@ +package cdevent + +import ( + "context" + "fmt" + "strings" + + cdevents "github.com/cdevents/sdk-go/pkg/api" + cloudevents "github.com/cloudevents/sdk-go/v2" + "go.uber.org/zap" + + "github.com/kubeshop/testkube/pkg/api/v1/testkube" + "github.com/kubeshop/testkube/pkg/event/kind/common" + "github.com/kubeshop/testkube/pkg/log" + cde "github.com/kubeshop/testkube/pkg/mapper/cdevents" +) + +var _ common.Listener = (*CDEventListener)(nil) + +func NewCDEventListener(name, selector, clusterID, defaultNamespace string, events []testkube.EventType, client cloudevents.Client) *CDEventListener { + return &CDEventListener{ + name: name, + Log: log.DefaultLogger, + selector: selector, + events: events, + client: client, + clusterID: clusterID, + defaultNamespace: defaultNamespace, + } +} + +type CDEventListener struct { + name string + Log *zap.SugaredLogger + events []testkube.EventType + selector string + client cloudevents.Client + clusterID string + defaultNamespace string +} + +func (l *CDEventListener) Name() string { + return l.name +} + +func (l *CDEventListener) Selector() string { + return l.selector +} + +func (l *CDEventListener) Events() []testkube.EventType { + return l.events +} +func (l *CDEventListener) Metadata() map[string]string { + return map[string]string{ + "name": l.Name(), + "events": fmt.Sprintf("%v", l.Events()), + "selector": l.Selector(), + } +} + +func (l *CDEventListener) Notify(event testkube.Event) (result testkube.EventResult) { + // Create the base event + ev, err := cde.MapTestkubeEventToCDEvent(event, l.clusterID, l.defaultNamespace) + if err != nil { + return testkube.NewFailedEventResult(event.Id, err) + } + + ce, err := cdevents.AsCloudEvent(ev) + if err != nil { + return testkube.NewFailedEventResult(event.Id, err) + } + + if result := l.client.Send(context.Background(), *ce); cloudevents.IsUndelivered(result) { + return testkube.NewFailedEventResult(event.Id, fmt.Errorf("failed to deliver, %v", result)) + } else if msg := result.Error(); msg != "" && !strings.Contains(msg, "200") { + return testkube.NewFailedEventResult(event.Id, fmt.Errorf("failed to send, %s", msg)) + } + + return testkube.NewSuccessEventResult(event.Id, "event sent to cd event") +} + +func (l *CDEventListener) Kind() string { + return "cdevent" +} diff --git a/pkg/event/kind/cdevent/listener_test.go b/pkg/event/kind/cdevent/listener_test.go new file mode 100644 index 00000000000..502ef7a09e7 --- /dev/null +++ b/pkg/event/kind/cdevent/listener_test.go @@ -0,0 +1,100 @@ +package cdevent + +import ( + "net/http" + "net/http/httptest" + "testing" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/stretchr/testify/assert" + + "github.com/kubeshop/testkube/pkg/api/v1/testkube" +) + +var testEventTypes = []testkube.EventType{*testkube.EventStartTest} + +func TestCDEventListener_Notify(t *testing.T) { + t.Parallel() + + t.Run("send event success response", func(t *testing.T) { + t.Parallel() + + // given + testHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, err := cloudevents.NewEventFromHTTPRequest(r) + // then + assert.NoError(t, err) + }) + + svr := httptest.NewServer(testHandler) + defer svr.Close() + + client, err := cloudevents.NewClientHTTP(cloudevents.WithTarget(svr.URL)) + assert.NoError(t, err) + + l := NewCDEventListener("cdeli", "", "clusterID", "", testEventTypes, client) + + // when + r := l.Notify(testkube.Event{ + Type_: testkube.EventStartTest, + TestExecution: exampleExecution(), + }) + + assert.Equal(t, "", r.Error()) + }) + + t.Run("send event failed response", func(t *testing.T) { + t.Parallel() + + // given + testHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadGateway) + }) + + svr := httptest.NewServer(testHandler) + defer svr.Close() + + client, err := cloudevents.NewClientHTTP(cloudevents.WithTarget(svr.URL)) + assert.NoError(t, err) + + l := NewCDEventListener("cdeli", "", "clusterID", "", testEventTypes, client) + + // when + r := l.Notify(testkube.Event{ + Type_: testkube.EventStartTest, + TestExecution: exampleExecution(), + }) + + // then + assert.NotEqual(t, "", r.Error()) + + }) + + t.Run("send event bad uri", func(t *testing.T) { + t.Parallel() + // given + + client, err := cloudevents.NewClientHTTP(cloudevents.WithTarget("abcdef")) + assert.NoError(t, err) + + l := NewCDEventListener("cdeli", "", "clusterID", "", testEventTypes, client) + + // when + r := l.Notify(testkube.Event{ + Type_: testkube.EventStartTest, + TestExecution: exampleExecution(), + }) + + // then + assert.NotEqual(t, "", r.Error()) + }) + +} + +func exampleExecution() *testkube.Execution { + execution := testkube.NewQueuedExecution() + execution.Name = "test-1" + execution.TestName = "test" + execution.TestNamespace = "testkube" + return execution +} diff --git a/pkg/event/kind/cdevent/loader.go b/pkg/event/kind/cdevent/loader.go new file mode 100644 index 00000000000..36499e98bce --- /dev/null +++ b/pkg/event/kind/cdevent/loader.go @@ -0,0 +1,45 @@ +package cdevent + +import ( + cloudevents "github.com/cloudevents/sdk-go/v2" + "go.uber.org/zap" + + "github.com/kubeshop/testkube/pkg/api/v1/testkube" + "github.com/kubeshop/testkube/pkg/event/kind/common" + "github.com/kubeshop/testkube/pkg/log" +) + +var _ common.ListenerLoader = (*CDEventLoader)(nil) + +func NewCDEventLoader(target, clusterID, defaultNamespace string, events []testkube.EventType) (*CDEventLoader, error) { + c, err := cloudevents.NewClientHTTP(cloudevents.WithTarget(target)) + if err != nil { + return nil, err + } + + return &CDEventLoader{ + Log: log.DefaultLogger, + events: events, + client: c, + clusterID: clusterID, + defaultNamespace: defaultNamespace, + }, nil +} + +// CDEventLoader is a reconciler for cdevent events for now it returns single listener for cdevent +type CDEventLoader struct { + Log *zap.SugaredLogger + events []testkube.EventType + client cloudevents.Client + clusterID string + defaultNamespace string +} + +func (r *CDEventLoader) Kind() string { + return "cdevent" +} + +// Load returns single listener for cd eventt +func (r *CDEventLoader) Load() (listeners common.Listeners, err error) { + return common.Listeners{NewCDEventListener("cdevent", "", r.clusterID, r.defaultNamespace, r.events, r.client)}, nil +} diff --git a/pkg/event/kind/cdevent/loader_test.go b/pkg/event/kind/cdevent/loader_test.go new file mode 100644 index 00000000000..fd1826700f1 --- /dev/null +++ b/pkg/event/kind/cdevent/loader_test.go @@ -0,0 +1,19 @@ +package cdevent + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestWebhookLoader(t *testing.T) { + t.Parallel() + + cdeventLoader, err := NewCDEventLoader("target", "", "", nil) + assert.NoError(t, err) + + listeners, err := cdeventLoader.Load() + + assert.Equal(t, 1, len(listeners)) + assert.NoError(t, err) +} diff --git a/pkg/event/kind/common/interface_test.go b/pkg/event/kind/common/interface_test.go index a6dbac36b5f..a374d935c90 100644 --- a/pkg/event/kind/common/interface_test.go +++ b/pkg/event/kind/common/interface_test.go @@ -9,8 +9,10 @@ import ( ) func TestCompareListeners(t *testing.T) { - t.Run("both nil metada", func(t *testing.T) { + t.Parallel() + t.Run("both nil metada", func(t *testing.T) { + t.Parallel() l1 := &NilListener{} l2 := &NilListener{} @@ -20,7 +22,7 @@ func TestCompareListeners(t *testing.T) { }) t.Run("one nil metada and one not nil metada", func(t *testing.T) { - + t.Parallel() l1 := &NilListener{} l2 := &FakeListener{} @@ -30,7 +32,7 @@ func TestCompareListeners(t *testing.T) { }) t.Run("equal metada", func(t *testing.T) { - + t.Parallel() l1 := &FakeListener{field1: "1", field2: "2"} l2 := &FakeListener{field1: "1", field2: "2"} @@ -40,7 +42,7 @@ func TestCompareListeners(t *testing.T) { }) t.Run("not equal metada", func(t *testing.T) { - + t.Parallel() l1 := &FakeListener{field1: "2", field2: "1"} l2 := &FakeListener{field1: "1", field2: "2"} diff --git a/pkg/event/kind/common/name_test.go b/pkg/event/kind/common/name_test.go index 4775c5f3c0b..8b77bec67cb 100644 --- a/pkg/event/kind/common/name_test.go +++ b/pkg/event/kind/common/name_test.go @@ -7,7 +7,7 @@ import ( ) func TestListenerName(t *testing.T) { - + t.Parallel() // given in := "webhooks.http://localhost:8080/something/else.start-test" diff --git a/pkg/event/kind/slack/loader.go b/pkg/event/kind/slack/loader.go index 433d33a9d9f..70d87b99d96 100644 --- a/pkg/event/kind/slack/loader.go +++ b/pkg/event/kind/slack/loader.go @@ -26,7 +26,7 @@ func NewSlackLoader(messageTemplate, configString string, events []testkube.Even } } -// SlackLoader is a reconciler for websocket events for now it returns single listener for slack +// SlackLoader is a reconciler for slack events for now it returns single listener for slack type SlackLoader struct { Log *zap.SugaredLogger events []testkube.EventType diff --git a/pkg/event/kind/slack/loader_test.go b/pkg/event/kind/slack/loader_test.go index ad86dc59426..477a85c39f3 100644 --- a/pkg/event/kind/slack/loader_test.go +++ b/pkg/event/kind/slack/loader_test.go @@ -9,8 +9,9 @@ import ( ) func TestSlackLoader_Load(t *testing.T) { - + t.Parallel() t.Run("loads Slack listeners for all event types", func(t *testing.T) { + t.Parallel() // given // default slack notifier is not ready by default l := NewSlackLoader("", "", testkube.AllEventTypes) diff --git a/pkg/event/kind/webhook/listener_test.go b/pkg/event/kind/webhook/listener_test.go index fd47da8c45f..83fbce81e73 100644 --- a/pkg/event/kind/webhook/listener_test.go +++ b/pkg/event/kind/webhook/listener_test.go @@ -17,8 +17,9 @@ const executionID = "id-1" var testEventTypes = []testkube.EventType{testkube.EventType("")} func TestWebhookListener_Notify(t *testing.T) { - + t.Parallel() t.Run("send event success response", func(t *testing.T) { + t.Parallel() // given testHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { var event testkube.Event @@ -44,6 +45,7 @@ func TestWebhookListener_Notify(t *testing.T) { }) t.Run("send event failed response", func(t *testing.T) { + t.Parallel() // given testHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadGateway) @@ -66,6 +68,7 @@ func TestWebhookListener_Notify(t *testing.T) { }) t.Run("send event bad uri", func(t *testing.T) { + t.Parallel() // given s := NewWebhookListener("l1", "http://baduri.badbadbad", "", testEventTypes, "") @@ -81,6 +84,7 @@ func TestWebhookListener_Notify(t *testing.T) { }) t.Run("send event success response using payload field", func(t *testing.T) { + t.Parallel() // given testHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { body := bytes.NewBuffer([]byte{}) diff --git a/pkg/event/kind/webhook/loader_test.go b/pkg/event/kind/webhook/loader_test.go index 92cf6fb9327..ebbecb69417 100644 --- a/pkg/event/kind/webhook/loader_test.go +++ b/pkg/event/kind/webhook/loader_test.go @@ -20,7 +20,7 @@ func (l DummyLoader) List(selector string) (*executorsv1.WebhookList, error) { } func TestWebhookLoader(t *testing.T) { - + t.Parallel() webhooksLoader := NewWebhookLoader(&DummyLoader{}) listeners, err := webhooksLoader.Load() diff --git a/pkg/executor/client/job.go b/pkg/executor/client/job.go index 7048f9335c9..078b780ceac 100644 --- a/pkg/executor/client/job.go +++ b/pkg/executor/client/job.go @@ -75,6 +75,7 @@ func NewJobExecutor( clientset kubernetes.Interface, registry string, podStartTimeout time.Duration, + clusterID string, ) (client *JobExecutor, err error) { return &JobExecutor{ ClientSet: clientset, @@ -90,6 +91,7 @@ func NewJobExecutor( testsClient: testsClient, registry: registry, podStartTimeout: podStartTimeout, + clusterID: clusterID, }, nil } @@ -113,6 +115,7 @@ type JobExecutor struct { testsClient testsv3.Interface registry string podStartTimeout time.Duration + clusterID string } type JobOptions struct { @@ -140,6 +143,7 @@ type JobOptions struct { EnvSecrets []testkube.EnvReference Labels map[string]string Registry string + ClusterID string } // Logs returns job logs stream channel using kubernetes api @@ -289,7 +293,7 @@ func (c *JobExecutor) MonitorJobForTimeout(ctx context.Context, jobName string) // CreateJob creates new Kubernetes job based on execution and execute options func (c *JobExecutor) CreateJob(ctx context.Context, execution testkube.Execution, options ExecuteOptions) error { jobs := c.ClientSet.BatchV1().Jobs(c.Namespace) - jobOptions, err := NewJobOptions(c.images.Init, c.jobTemplate, c.serviceAccountName, c.registry, execution, options) + jobOptions, err := NewJobOptions(c.images.Init, c.jobTemplate, c.serviceAccountName, c.registry, c.clusterID, execution, options) if err != nil { return err } @@ -711,7 +715,8 @@ func NewJobSpec(log *zap.SugaredLogger, options JobOptions) (*batchv1.Job, error job.Spec.Template.Labels[key] = value } - envs := append(executor.RunnerEnvVars, secretEnvVars...) + envs := append(executor.RunnerEnvVars, corev1.EnvVar{Name: "RUNNER_CLUSTERID", Value: options.ClusterID}) + envs = append(envs, secretEnvVars...) if options.HTTPProxy != "" { envs = append(envs, corev1.EnvVar{Name: "HTTP_PROXY", Value: options.HTTPProxy}) } @@ -737,7 +742,8 @@ func NewJobSpec(log *zap.SugaredLogger, options JobOptions) (*batchv1.Job, error return &job, nil } -func NewJobOptions(initImage, jobTemplate string, serviceAccountName, registry string, execution testkube.Execution, options ExecuteOptions) (jobOptions JobOptions, err error) { +func NewJobOptions(initImage, jobTemplate string, serviceAccountName, registry, clusterID string, + execution testkube.Execution, options ExecuteOptions) (jobOptions JobOptions, err error) { jsn, err := json.Marshal(execution) if err != nil { return jobOptions, err @@ -755,6 +761,7 @@ func NewJobOptions(initImage, jobTemplate string, serviceAccountName, registry s jobOptions.Variables = execution.Variables jobOptions.ServiceAccountName = serviceAccountName jobOptions.Registry = registry + jobOptions.ClusterID = clusterID return } diff --git a/pkg/executor/common.go b/pkg/executor/common.go index 0fcd8a76b2a..c8bfa436adb 100644 --- a/pkg/executor/common.go +++ b/pkg/executor/common.go @@ -76,6 +76,10 @@ var RunnerEnvVars = []corev1.EnvVar{ Name: "RUNNER_DATADIR", Value: "/data", }, + { + Name: "RUNNER_CDEVENTS_TARGET", + Value: os.Getenv("CDEVENTS_TARGET"), + }, { Name: "RUNNER_CLOUD_MODE", Value: getRunnerCloudMode(), diff --git a/pkg/executor/containerexecutor/containerexecutor.go b/pkg/executor/containerexecutor/containerexecutor.go index 17bc4b3f763..55c56c4dbe0 100644 --- a/pkg/executor/containerexecutor/containerexecutor.go +++ b/pkg/executor/containerexecutor/containerexecutor.go @@ -60,6 +60,7 @@ func NewContainerExecutor( testsClient testsv3.Interface, registry string, podStartTimeout time.Duration, + clusterID string, ) (client *ContainerExecutor, err error) { clientSet, err := k8sclient.ConnectToK8s() if err != nil { @@ -81,6 +82,7 @@ func NewContainerExecutor( executorsClient: executorsClient, registry: registry, podStartTimeout: podStartTimeout, + clusterID: clusterID, }, nil } @@ -104,6 +106,7 @@ type ContainerExecutor struct { executorsClient executorsclientv1.Interface registry string podStartTimeout time.Duration + clusterID string } type JobOptions struct { @@ -140,6 +143,7 @@ type JobOptions struct { EnvSecrets []testkube.EnvReference Labels map[string]string Registry string + ClusterID string } // Logs returns job logs stream channel using kubernetes api @@ -275,7 +279,7 @@ func (c *ContainerExecutor) ExecuteSync(ctx context.Context, execution *testkube func (c *ContainerExecutor) createJob(ctx context.Context, execution testkube.Execution, options client.ExecuteOptions) (*JobOptions, error) { jobsClient := c.clientSet.BatchV1().Jobs(c.namespace) - jobOptions, err := NewJobOptions(c.images, c.templates, c.serviceAccountName, c.registry, execution, options) + jobOptions, err := NewJobOptions(c.images, c.templates, c.serviceAccountName, c.registry, c.clusterID, execution, options) if err != nil { return nil, err } diff --git a/pkg/executor/containerexecutor/containerexecutor_test.go b/pkg/executor/containerexecutor/containerexecutor_test.go index 6295963eb7e..763d7b958f4 100644 --- a/pkg/executor/containerexecutor/containerexecutor_test.go +++ b/pkg/executor/containerexecutor/containerexecutor_test.go @@ -120,10 +120,12 @@ func TestNewExecutorJobSpecWithArgs(t *testing.T) { {Name: "RUNNER_SSL", Value: "false"}, {Name: "RUNNER_SCRAPPERENABLED", Value: "false"}, {Name: "RUNNER_DATADIR", Value: "/data"}, + {Name: "RUNNER_CDEVENTS_TARGET", Value: ""}, {Name: "RUNNER_CLOUD_MODE", Value: "false"}, {Name: "RUNNER_CLOUD_API_KEY", Value: ""}, {Name: "RUNNER_CLOUD_API_URL", Value: ""}, {Name: "RUNNER_CLOUD_API_TLS_INSECURE", Value: "false"}, + {Name: "RUNNER_CLUSTERID", Value: ""}, {Name: "key", Value: "value"}, {Name: "aa", Value: "bb"}, } @@ -156,6 +158,7 @@ func TestNewExecutorJobSpecWithWorkingDirRelative(t *testing.T) { executor.Templates{}, "", "", + "", testkube.Execution{ Id: "name", TestName: "name-test-1", @@ -189,6 +192,7 @@ func TestNewExecutorJobSpecWithWorkingDirAbsolute(t *testing.T) { executor.Templates{}, "", "", + "", testkube.Execution{ Id: "name", TestName: "name-test-1", @@ -222,6 +226,7 @@ func TestNewExecutorJobSpecWithoutWorkingDir(t *testing.T) { executor.Templates{}, "", "", + "", testkube.Execution{ Id: "name", TestName: "name-test-1", diff --git a/pkg/executor/containerexecutor/tmpl.go b/pkg/executor/containerexecutor/tmpl.go index 84ed5dd70e4..a997a23e6a9 100644 --- a/pkg/executor/containerexecutor/tmpl.go +++ b/pkg/executor/containerexecutor/tmpl.go @@ -92,7 +92,8 @@ func NewExecutorJobSpec(log *zap.SugaredLogger, options *JobOptions) (*batchv1.J job.Spec.Template.Labels[key] = value } - envs := append(executor.RunnerEnvVars, secretEnvVars...) + envs := append(executor.RunnerEnvVars, corev1.EnvVar{Name: "RUNNER_CLUSTERID", Value: options.ClusterID}) + envs = append(envs, secretEnvVars...) if options.HTTPProxy != "" { envs = append(envs, corev1.EnvVar{Name: "HTTP_PROXY", Value: options.HTTPProxy}) } @@ -155,7 +156,7 @@ func NewScraperJobSpec(log *zap.SugaredLogger, options *JobOptions) (*batchv1.Jo return nil, fmt.Errorf("decoding scraper job spec error: %w", err) } - envs := executor.RunnerEnvVars + envs := append(executor.RunnerEnvVars, corev1.EnvVar{Name: "RUNNER_CLUSTERID", Value: options.ClusterID}) if options.HTTPProxy != "" { envs = append(envs, corev1.EnvVar{Name: "HTTP_PROXY", Value: options.HTTPProxy}) } @@ -196,7 +197,7 @@ func NewPersistentVolumeClaimSpec(log *zap.SugaredLogger, options *JobOptions) ( } // NewJobOptions provides job options for templates -func NewJobOptions(images executor.Images, templates executor.Templates, serviceAccountName, registry string, +func NewJobOptions(images executor.Images, templates executor.Templates, serviceAccountName, registry, clusterID string, execution testkube.Execution, options client.ExecuteOptions) (*JobOptions, error) { jsn, err := json.Marshal(execution) if err != nil { @@ -220,5 +221,6 @@ func NewJobOptions(images executor.Images, templates executor.Templates, service jobOptions.Variables = execution.Variables jobOptions.ServiceAccountName = serviceAccountName jobOptions.Registry = registry + jobOptions.ClusterID = clusterID return jobOptions, nil } diff --git a/pkg/executor/output/parser_test.go b/pkg/executor/output/parser_test.go index e74e4ddc72e..adfdcc049fc 100644 --- a/pkg/executor/output/parser_test.go +++ b/pkg/executor/output/parser_test.go @@ -125,6 +125,8 @@ func TestParseRunnerOutput(t *testing.T) { {"type":"line","content":"RUNNER_GITUSERNAME=\"\"","time":"2023-01-17T15:29:17.921800138Z"} {"type":"line","content":"RUNNER_GITTOKEN=\"\"","time":"2023-01-17T15:29:17.921801596Z"} {"type":"line","content":"RUNNER_DATADIR=\"/data\"","time":"2023-01-17T15:29:17.921803138Z"} +{"type":"line","content":"RUNNER_CDEVENTS_TARGET=\"\"","time":"2023-01-17T15:29:17.921801596Z"} +{"type":"line","content":"RUNNER_CLUSTERID=\"\"","time":"2023-01-17T15:29:17.921801596Z"} {"type":"error","content":"❌ can't find branch or commit in params, repo:\u0026{Type_:git-file Uri:https://github.com/kubeshop/testkube.git Branch: Commit: Path:test/cypress/executor-smoke/cypress-11 Username: Token: UsernameSecret:\u003cnil\u003e TokenSecret:\u003cnil\u003e WorkingDir:}","time":"2023-01-17T15:29:17.921940304Z"} {"type":"error","content":"can't find branch or commit in params, repo:\u0026{Type_:git-file Uri:https://github.com/kubeshop/testkube.git Branch: Commit: Path:test/cypress/executor-smoke/cypress-11 Username: Token: UsernameSecret:\u003cnil\u003e TokenSecret:\u003cnil\u003e WorkingDir:}","time":"2023-01-17T15:29:17.921946638Z"} {"type":"event","content":"running test [63c6bec1790802b7e3e57048]","time":"2023-01-17T15:29:17.921920596Z"} @@ -143,6 +145,8 @@ RUNNER_SCRAPPERENABLED="true" RUNNER_GITUSERNAME="" RUNNER_GITTOKEN="" RUNNER_DATADIR="/data" +RUNNER_CDEVENTS_TARGET="" +RUNNER_CLUSTERID="" ❌ can't find branch or commit in params, repo:&{Type_:git-file Uri:https://github.com/kubeshop/testkube.git Branch: Commit: Path:test/cypress/executor-smoke/cypress-11 Username: Token: UsernameSecret: TokenSecret: WorkingDir:} can't find branch or commit in params, repo:&{Type_:git-file Uri:https://github.com/kubeshop/testkube.git Branch: Commit: Path:test/cypress/executor-smoke/cypress-11 Username: Token: UsernameSecret: TokenSecret: WorkingDir:} running test [63c6bec1790802b7e3e57048] @@ -191,6 +195,8 @@ could not start process: fork/exec ./zap-api-scan.py: no such file or directory {"type":"line","content":"RUNNER_GITUSERNAME=\"\"","time":"2023-01-19T15:22:25.867984179Z"} {"type":"line","content":"RUNNER_GITTOKEN=\"\"","time":"2023-01-19T15:22:25.867986013Z"} {"type":"line","content":"RUNNER_DATADIR=\"/data\"","time":"2023-01-19T15:22:25.867987596Z"} +{"type":"line","content":"RUNNER_CDEVENTS_TARGET=\"\"","time":"2023-01-17T15:29:17.921801596Z"} +{"type":"line","content":"RUNNER_CLUSTERID=\"\"","time":"2023-01-17T15:29:17.921801596Z"} {"type":"event","content":"running test [63c960287104b0fa0b7a45ef]","time":"2023-01-19T15:22:25.868132888Z"} {"type":"line","content":"🚚 Preparing for test run","time":"2023-01-19T15:22:25.868161346Z"} {"type":"line","content":"❌ can't find branch or commit in params, repo:\u0026{Type_:git-file Uri:https://github.com/kubeshop/testkube.git Branch: Commit: Path:test/cypress/executor-smoke/cypress-11 Username: Token: UsernameSecret:\u003cnil\u003e TokenSecret:\u003cnil\u003e WorkingDir:}","time":"2023-01-19T15:22:25.868183971Z"} @@ -209,6 +215,8 @@ RUNNER_SCRAPPERENABLED="true" RUNNER_GITUSERNAME="" RUNNER_GITTOKEN="" RUNNER_DATADIR="/data" +RUNNER_CDEVENTS_TARGET="" +RUNNER_CLUSTERID="" running test [63c960287104b0fa0b7a45ef] 🚚 Preparing for test run ❌ can't find branch or commit in params, repo:&{Type_:git-file Uri:https://github.com/kubeshop/testkube.git Branch: Commit: Path:test/cypress/executor-smoke/cypress-11 Username: Token: UsernameSecret: TokenSecret: WorkingDir:} @@ -240,6 +248,8 @@ can't find branch or commit in params, repo:&{Type_:git-file Uri:https://github. {"type":"line","content":"RUNNER_GITUSERNAME=\"\"","time":"2023-01-19T15:22:25.867984179Z"} {"type":"line","content":"RUNNER_GITTOKEN=\"\"","time":"2023-01-19T15:22:25.867986013Z"} {"type":"line","content":"RUNNER_DATADIR=\"/data\"","time":"2023-01-19T15:22:25.867987596Z"} +{"type":"line","content":"RUNNER_CDEVENTS_TARGET=\"\"","time":"2023-01-17T15:29:17.921801596Z"} +{"type":"line","content":"RUNNER_CLUSTERID=\"\"","time":"2023-01-17T15:29:17.921801596Z"} {"type":"event","content":"running test [63c960287104b0fa0b7a45ef]","time":"2023-01-19T15:22:25.868132888Z"} {"type":"line","content":"🚚 Preparing for test run","time":"2023-01-19T15:22:25.868161346Z"} {"type":"line","content":"❌ can't find branch or commit in params, repo:\u0026{Type_:git-file Uri:https://github.com/kubeshop/testkube.git Branch: Commit: Path:test/cypress/executor-smoke/cypress-11 Username: Token: UsernameSecret:\u003cnil\u003e TokenSecret:\u003cnil\u003e WorkingDir:}","time":"2023-01-19T15:22:25.868183971Z"} @@ -264,6 +274,8 @@ RUNNER_SCRAPPERENABLED="true" RUNNER_GITUSERNAME="" RUNNER_GITTOKEN="" RUNNER_DATADIR="/data" +RUNNER_CDEVENTS_TARGET="" +RUNNER_CLUSTERID="" running test [63c960287104b0fa0b7a45ef] 🚚 Preparing for test run ❌ can't find branch or commit in params, repo:&{Type_:git-file Uri:https://github.com/kubeshop/testkube.git Branch: Commit: Path:test/cypress/executor-smoke/cypress-11 Username: Token: UsernameSecret: TokenSecret: WorkingDir:} @@ -296,6 +308,8 @@ can't find branch or commit in params, repo:&{Type_:git-file Uri:https://github. {"type":"line","content":"RUNNER_GITUSERNAME=\"\"","time":"2023-01-20T12:44:15.718983549Z"} {"type":"line","content":"RUNNER_GITTOKEN=\"\"","time":"2023-01-20T12:44:15.718986174Z"} {"type":"line","content":"RUNNER_DATADIR=\"/data\"","time":"2023-01-20T12:44:15.718989049Z"} +{"type":"line","content":"RUNNER_CDEVENTS_TARGET=\"\"","time":"2023-01-17T15:29:17.921801596Z"} +{"type":"line","content":"RUNNER_CLUSTERID=\"\"","time":"2023-01-17T15:29:17.921801596Z"} {"type":"event","content":"running test [63ca8c8988564860327a16b5]","time":"2023-01-20T12:44:15.719276383Z"} {"type":"line","content":"🚚 Preparing for test run","time":"2023-01-20T12:44:15.719285633Z"} {"type":"line","content":"❌ can't find branch or commit in params, repo:\u0026{Type_:git-file Uri:https://github.com/kubeshop/testkube.git Branch: Commit: Path:test/cypress/executor-smoke/cypress-11 Username: Token: UsernameSecret:\u003cnil\u003e TokenSecret:\u003cnil\u003e WorkingDir:}","time":"2023-01-20T12:44:15.719302049Z"} @@ -314,6 +328,8 @@ RUNNER_SCRAPPERENABLED="true" RUNNER_GITUSERNAME="" RUNNER_GITTOKEN="" RUNNER_DATADIR="/data" +RUNNER_CDEVENTS_TARGET="" +RUNNER_CLUSTERID="" running test [63ca8c8988564860327a16b5] 🚚 Preparing for test run ❌ can't find branch or commit in params, repo:&{Type_:git-file Uri:https://github.com/kubeshop/testkube.git Branch: Commit: Path:test/cypress/executor-smoke/cypress-11 Username: Token: UsernameSecret: TokenSecret: WorkingDir:} diff --git a/pkg/executor/scraper/extractor.go b/pkg/executor/scraper/extractor.go index 2e194cade71..00d83680c17 100644 --- a/pkg/executor/scraper/extractor.go +++ b/pkg/executor/scraper/extractor.go @@ -16,11 +16,13 @@ const ( //go:generate mockgen -destination=./mock_extractor.go -package=scraper "github.com/kubeshop/testkube/pkg/executor/scraper" Extractor type Extractor interface { - Extract(ctx context.Context, paths []string, process ProcessFn) error + Extract(ctx context.Context, paths []string, process ProcessFn, notify NotifyFn) error } type ProcessFn func(ctx context.Context, object *Object) error +type NotifyFn func(ctx context.Context, path string) error + type Object struct { Name string Size int64 diff --git a/pkg/executor/scraper/factory/factory.go b/pkg/executor/scraper/factory/factory.go index 81deaea83dc..e043b15f2d9 100644 --- a/pkg/executor/scraper/factory/factory.go +++ b/pkg/executor/scraper/factory/factory.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/pkg/errors" "github.com/kubeshop/testkube/pkg/agent" @@ -76,7 +77,15 @@ func GetScraper(ctx context.Context, params envs.Params, extractorType Extractor return nil, errors.Errorf("unknown uploader type: %s", uploaderType) } - return scraper.NewExtractLoadScraper(extractor, loader), nil + var cdeventsClient cloudevents.Client + if params.CDEventsTarget != "" { + cdeventsClient, err = cloudevents.NewClientHTTP(cloudevents.WithTarget(params.CDEventsTarget)) + if err != nil { + log.DefaultLogger.Warnf("failed to create cloud event client %w", err) + } + } + + return scraper.NewExtractLoadScraper(extractor, loader, cdeventsClient, params.ClusterID), nil } func getCloudLoader(ctx context.Context, params envs.Params) (uploader *cloudscraper.CloudUploader, err error) { diff --git a/pkg/executor/scraper/filesystem_extractor.go b/pkg/executor/scraper/filesystem_extractor.go index bcf3f851ded..cff2e84e5e6 100644 --- a/pkg/executor/scraper/filesystem_extractor.go +++ b/pkg/executor/scraper/filesystem_extractor.go @@ -44,7 +44,7 @@ func GenerateTarballMetaFile() ArchiveFilesystemExtractorOpts { } } -func (e *ArchiveFilesystemExtractor) Extract(ctx context.Context, paths []string, process ProcessFn) error { +func (e *ArchiveFilesystemExtractor) Extract(ctx context.Context, paths []string, process ProcessFn, notify NotifyFn) error { var archiveFiles []*archive.File for _, dir := range paths { log.DefaultLogger.Infof("scraping artifacts in directory: %v", dir) @@ -67,11 +67,16 @@ func (e *ArchiveFilesystemExtractor) Extract(ctx context.Context, paths []string return nil } + if err := notify(ctx, path); err != nil { + log.DefaultLogger.Warnf("error notifying for file %s", path) + } + archiveFile, err := e.newArchiveFile(dir, path) if err != nil { return errors.Wrapf(err, "error creating archive file for path %s", path) } archiveFiles = append(archiveFiles, archiveFile) + return nil }, ) @@ -189,7 +194,7 @@ func NewRecursiveFilesystemExtractor(fs filesystem.FileSystem) *RecursiveFilesys return &RecursiveFilesystemExtractor{fs: fs} } -func (e *RecursiveFilesystemExtractor) Extract(ctx context.Context, paths []string, process ProcessFn) error { +func (e *RecursiveFilesystemExtractor) Extract(ctx context.Context, paths []string, process ProcessFn, notify NotifyFn) error { for _, dir := range paths { log.DefaultLogger.Infof("scraping artifacts in directory: %v", dir) @@ -211,6 +216,10 @@ func (e *RecursiveFilesystemExtractor) Extract(ctx context.Context, paths []stri return nil } + if err := notify(ctx, path); err != nil { + log.DefaultLogger.Warnf("error notifying for file %s", path) + } + reader, err := e.fs.OpenFileBuffered(path) if err != nil { return errors.Wrapf(err, "error opening buffered %s", path) diff --git a/pkg/executor/scraper/filesystem_extractor_integration_test.go b/pkg/executor/scraper/filesystem_extractor_integration_test.go index c44a3c434da..d7e0e000291 100644 --- a/pkg/executor/scraper/filesystem_extractor_integration_test.go +++ b/pkg/executor/scraper/filesystem_extractor_integration_test.go @@ -6,6 +6,7 @@ import ( "io" "os" "path/filepath" + "strings" "testing" "github.com/kubeshop/testkube/pkg/utils/test" @@ -51,9 +52,16 @@ func TestArchiveFilesystemExtractor_Extract_NoMeta_Integration(t *testing.T) { return nil } + notifyFn := func(ctx context.Context, path string) error { + if !strings.Contains(path, "file1.txt") && !strings.Contains(path, "file2.txt") && !strings.Contains(path, "subdir/file3.txt") { + t.Fatalf("Unexpected path: %s", path) + } + return nil + } + extractor := scraper.NewArchiveFilesystemExtractor(filesystem.NewOSFileSystem()) scrapeDirs := []string{tempDir} - err = extractor.Extract(context.Background(), scrapeDirs, processFn) + err = extractor.Extract(context.Background(), scrapeDirs, processFn, notifyFn) require.NoError(t, err) assert.Equal(t, 1, processCallCount) } @@ -116,9 +124,16 @@ func TestArchiveFilesystemExtractor_Extract_Meta_Integration(t *testing.T) { return nil } + notifyFn := func(ctx context.Context, path string) error { + if !strings.Contains(path, "file1.txt") && !strings.Contains(path, "file2.txt") && !strings.Contains(path, "subdir/file3.txt") { + t.Fatalf("Unexpected path: %s", path) + } + return nil + } + extractor := scraper.NewArchiveFilesystemExtractor(filesystem.NewOSFileSystem(), scraper.GenerateTarballMetaFile()) scrapeDirs := []string{tempDir} - err = extractor.Extract(context.Background(), scrapeDirs, processFn) + err = extractor.Extract(context.Background(), scrapeDirs, processFn, notifyFn) require.NoError(t, err) assert.Equal(t, 2, processCallCount) } @@ -170,9 +185,17 @@ func TestRecursiveFilesystemExtractor_Extract_Integration(t *testing.T) { return nil } + notifyFn := func(ctx context.Context, path string) error { + if !strings.Contains(path, "file1.txt") && !strings.Contains(path, "file2.txt") && !strings.Contains(path, "subdir/file3.txt") { + t.Fatalf("unexpected file: %s", path) + } + + return nil + } + extractor := scraper.NewRecursiveFilesystemExtractor(filesystem.NewOSFileSystem()) scrapeDirs := []string{tempDir, "/nonexistent"} - err = extractor.Extract(context.Background(), scrapeDirs, processFn) + err = extractor.Extract(context.Background(), scrapeDirs, processFn, notifyFn) require.NoError(t, err) assert.Equal(t, processCallCount, 3) } @@ -206,9 +229,16 @@ func TestRecursiveFilesystemExtractor_Extract_RelPath_Integration(t *testing.T) return nil } + notifyFn := func(ctx context.Context, path string) error { + if !strings.Contains(path, "file1.txt") { + t.Fatalf("unexpected path: %s", path) + } + return nil + } + extractor := scraper.NewRecursiveFilesystemExtractor(filesystem.NewOSFileSystem()) scrapeDirs := []string{filepath.Join(tempDir, "file1.txt"), "/nonexistent"} - err = extractor.Extract(context.Background(), scrapeDirs, processFn) + err = extractor.Extract(context.Background(), scrapeDirs, processFn, notifyFn) require.NoError(t, err) assert.Equal(t, processCallCount, 1) } diff --git a/pkg/executor/scraper/filesystem_extractor_test.go b/pkg/executor/scraper/filesystem_extractor_test.go index e1a874402e9..da3937ee9f4 100644 --- a/pkg/executor/scraper/filesystem_extractor_test.go +++ b/pkg/executor/scraper/filesystem_extractor_test.go @@ -44,8 +44,13 @@ func TestRecursiveFilesystemExtractor_Extract(t *testing.T) { return nil } + notifyFn := func(ctx context.Context, path string) error { + assert.Equal(t, "/my/directory/file1", path) + return nil + } + // Call the Extract function - err := extractor.Extract(context.Background(), []string{"/my/directory"}, processFn) + err := extractor.Extract(context.Background(), []string{"/my/directory"}, processFn, notifyFn) assert.NoErrorf(t, err, "Extract failed: %v", err) } @@ -91,8 +96,13 @@ func TestArchiveFilesystemExtractor_Extract_NoMeta(t *testing.T) { return nil } + notifyFn := func(ctx context.Context, path string) error { + assert.Equal(t, "/my/directory/file1", path) + return nil + } + // Call the Extract function - err := extractor.Extract(context.Background(), []string{"/my/directory"}, processFn) + err := extractor.Extract(context.Background(), []string{"/my/directory"}, processFn, notifyFn) assert.NoErrorf(t, err, "Extract failed: %v", err) assert.Equal(t, 1, processFnCallCount) } @@ -154,8 +164,13 @@ func TestArchiveFilesystemExtractor_Extract_Meta(t *testing.T) { return nil } + notifyFn := func(ctx context.Context, path string) error { + assert.Equal(t, "/my/directory/file1", path) + return nil + } + // Call the Extract function - err := extractor.Extract(context.Background(), []string{"/my/directory"}, processFn) + err := extractor.Extract(context.Background(), []string{"/my/directory"}, processFn, notifyFn) assert.NoErrorf(t, err, "Extract failed: %v", err) assert.Equal(t, 2, processFnCallCount) } @@ -175,8 +190,13 @@ func TestRecursiveFilesystemExtractor_ExtractEmpty(t *testing.T) { return nil } + notifyFn := func(ctx context.Context, path string) error { + t.Fatalf("notifyFn should not be called when no files were scraped") + return nil + } + // Call the Extract function - err := extractor.Extract(context.Background(), []string{"/my/directory"}, processFn) + err := extractor.Extract(context.Background(), []string{"/my/directory"}, processFn, notifyFn) assert.NoErrorf(t, err, "Extract failed: %v", err) } @@ -195,7 +215,12 @@ func TestArchiveFilesystemExtractor_ExtractEmpty(t *testing.T) { return nil } + notifyFn := func(ctx context.Context, path string) error { + t.Fatalf("notifyFn should not be called when no files were scraped") + return nil + } + // Call the Extract function - err := extractor.Extract(context.Background(), []string{"/my/directory"}, processFn) + err := extractor.Extract(context.Background(), []string{"/my/directory"}, processFn, notifyFn) assert.NoErrorf(t, err, "Extract failed: %v", err) } diff --git a/pkg/executor/scraper/minio_scraper_integration_test.go b/pkg/executor/scraper/minio_scraper_integration_test.go index 6c88a435690..ef942616deb 100644 --- a/pkg/executor/scraper/minio_scraper_integration_test.go +++ b/pkg/executor/scraper/minio_scraper_integration_test.go @@ -2,12 +2,15 @@ package scraper_test import ( "context" + "net/http" + "net/http/httptest" "os" "path/filepath" "testing" "github.com/kubeshop/testkube/pkg/utils/test" + cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -54,7 +57,21 @@ func TestMinIOScraper_Archive_Integration(t *testing.T) { } execution := testkube.Execution{Id: "minio-test"} - s := scraper.NewExtractLoadScraper(extractor, loader) + + // given + testHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, err := cloudevents.NewEventFromHTTPRequest(r) + // then + assert.NoError(t, err) + }) + + svr := httptest.NewServer(testHandler) + defer svr.Close() + + client, err := cloudevents.NewClientHTTP(cloudevents.WithTarget(svr.URL)) + assert.NoError(t, err) + + s := scraper.NewExtractLoadScraper(extractor, loader, client, "") err = s.Scrape(context.Background(), []string{tempDir}, execution) if err != nil { t.Fatalf("error scraping: %v", err) @@ -108,7 +125,21 @@ func TestMinIOScraper_Recursive_Integration(t *testing.T) { } execution := testkube.Execution{Id: "minio-test"} - s := scraper.NewExtractLoadScraper(extractor, loader) + + // given + testHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, err := cloudevents.NewEventFromHTTPRequest(r) + // then + assert.NoError(t, err) + }) + + svr := httptest.NewServer(testHandler) + defer svr.Close() + + client, err := cloudevents.NewClientHTTP(cloudevents.WithTarget(svr.URL)) + assert.NoError(t, err) + + s := scraper.NewExtractLoadScraper(extractor, loader, client, "") err = s.Scrape(context.Background(), []string{tempDir}, execution) if err != nil { t.Fatalf("error scraping: %v", err) diff --git a/pkg/executor/scraper/mock_extractor.go b/pkg/executor/scraper/mock_extractor.go index 6d18429aa42..25b9b80416f 100644 --- a/pkg/executor/scraper/mock_extractor.go +++ b/pkg/executor/scraper/mock_extractor.go @@ -35,15 +35,15 @@ func (m *MockExtractor) EXPECT() *MockExtractorMockRecorder { } // Extract mocks base method. -func (m *MockExtractor) Extract(arg0 context.Context, arg1 ProcessFn) error { +func (m *MockExtractor) Extract(arg0 context.Context, arg1 []string, arg2 ProcessFn, arg3 NotifyFn) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Extract", arg0, arg1) + ret := m.ctrl.Call(m, "Extract", arg0, arg1, arg2, arg3) ret0, _ := ret[0].(error) return ret0 } // Extract indicates an expected call of Extract. -func (mr *MockExtractorMockRecorder) Extract(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockExtractorMockRecorder) Extract(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Extract", reflect.TypeOf((*MockExtractor)(nil).Extract), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Extract", reflect.TypeOf((*MockExtractor)(nil).Extract), arg0, arg1, arg2, arg3) } diff --git a/pkg/executor/scraper/scraper.go b/pkg/executor/scraper/scraper.go index e77dd5f3e44..43c269417ad 100644 --- a/pkg/executor/scraper/scraper.go +++ b/pkg/executor/scraper/scraper.go @@ -2,8 +2,15 @@ package scraper import ( "context" + "fmt" + + cdevents "github.com/cdevents/sdk-go/pkg/api" + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/gabriel-vasile/mimetype" "github.com/kubeshop/testkube/pkg/api/v1/testkube" + "github.com/kubeshop/testkube/pkg/log" + cde "github.com/kubeshop/testkube/pkg/mapper/cdevents" ) // Scraper is responsible for collecting and persisting the execution artifacts @@ -16,25 +23,62 @@ type Scraper interface { } type ExtractLoadScraper struct { - extractor Extractor - loader Uploader + extractor Extractor + loader Uploader + cdeventsClient cloudevents.Client + clusterID string } -func NewExtractLoadScraper(extractor Extractor, loader Uploader) *ExtractLoadScraper { +func NewExtractLoadScraper(extractor Extractor, loader Uploader, cdeventsClient cloudevents.Client, clusterID string) *ExtractLoadScraper { return &ExtractLoadScraper{ - extractor: extractor, - loader: loader, + extractor: extractor, + loader: loader, + cdeventsClient: cdeventsClient, + clusterID: clusterID, } } func (s *ExtractLoadScraper) Scrape(ctx context.Context, paths []string, execution testkube.Execution) error { return s. extractor. - Extract(ctx, paths, func(ctx context.Context, object *Object) error { - return s.loader.Upload(ctx, object, execution) - }) + Extract(ctx, paths, + func(ctx context.Context, object *Object) error { + return s.loader.Upload(ctx, object, execution) + }, + func(ctx context.Context, path string) error { + if s.cdeventsClient != nil { + if err := s.sendCDEvent(execution, path); err != nil { + return err + } + } + + return nil + }) } func (s *ExtractLoadScraper) Close() error { return s.loader.Close() } + +func (s *ExtractLoadScraper) sendCDEvent(execution testkube.Execution, path string) error { + mtype, err := mimetype.DetectFile(path) + if err != nil { + log.DefaultLogger.Warnf("failed to detect mime type %w", err) + } + + ev, err := cde.MapTestkubeArtifactToCDEvent(&execution, s.clusterID, mtype.String()) + if err != nil { + return err + } + + ce, err := cdevents.AsCloudEvent(ev) + if err != nil { + return err + } + + if result := s.cdeventsClient.Send(context.Background(), *ce); cloudevents.IsUndelivered(result) { + return fmt.Errorf("failed to send, %v", result) + } + + return nil +} diff --git a/pkg/mapper/cdevents/mapper.go b/pkg/mapper/cdevents/mapper.go new file mode 100644 index 00000000000..43873d7a314 --- /dev/null +++ b/pkg/mapper/cdevents/mapper.go @@ -0,0 +1,414 @@ +package cdevents + +import ( + "errors" + "fmt" + "strings" + + cdevents "github.com/cdevents/sdk-go/pkg/api" + + "github.com/kubeshop/testkube/pkg/api/v1/testkube" +) + +// MapTestkubeEventToCDEvent maps OpenAPI spec Event to CDEvent CDEventReader +func MapTestkubeEventToCDEvent(tkEvent testkube.Event, clusterID, defaultNamespace string) (cdevents.CDEventReader, error) { + if tkEvent.Type_ == nil { + return nil, errors.New("empty event type") + } + + switch *tkEvent.Type_ { + case *testkube.EventStartTest: + return MapTestkubeEventStartTestToCDEvent(tkEvent, clusterID, defaultNamespace) + case *testkube.EventEndTestAborted, *testkube.EventEndTestFailed, *testkube.EventEndTestTimeout, *testkube.EventEndTestSuccess: + return MapTestkubeEventFinishTestToCDEvent(tkEvent, clusterID, defaultNamespace) + case *testkube.EventStartTestSuite: + return MapTestkubeEventStartTestSuiteToCDEvent(tkEvent, clusterID) + case *testkube.EventEndTestSuiteAborted, *testkube.EventEndTestSuiteFailed, *testkube.EventEndTestSuiteTimeout, *testkube.EventEndTestSuiteSuccess: + return MapTestkubeEventFinishTestSuiteToCDEvent(tkEvent, clusterID) + } + + return nil, fmt.Errorf("not supported event type %s", tkEvent.Type_) +} + +// MapTestkubeEventQueuedTestToCDEvent maps OpenAPI spec Queued Test Event to CDEvent CDEventReader +func MapTestkubeEventQueuedTestToCDEvent(event testkube.Event, clusterID, defaultNamespace string) (cdevents.CDEventReader, error) { + // Create the base event + ev, err := cdevents.NewTestCaseRunQueuedEvent() + if err != nil { + return nil, err + } + + if event.TestExecution != nil { + ev.SetSubjectId(event.TestExecution.Name) + } + + ev.SetSubjectSource(clusterID) + ev.SetSource(clusterID) + if event.TestExecution != nil { + ev.SetSubjectTestCase(&cdevents.TestCaseRunQueuedSubjectContentTestCase{ + Id: event.TestExecution.TestName, + Type: MapTestkubeTestTypeToCDEventTestCaseType(event.TestExecution.TestType), + }) + + namespace := event.TestExecution.TestNamespace + if namespace == "" { + namespace = defaultNamespace + } + + ev.SetSubjectEnvironment(&cdevents.Reference{ + Id: namespace, + Source: clusterID, + }) + + if event.TestExecution.RunningContext != nil { + ev.SetSubjectTrigger(&cdevents.TestCaseRunQueuedSubjectContentTrigger{ + Type: MapTestkubeRunningContextTypeToCDEventTiggerType(event.TestExecution.RunningContext.Type_), + }) + } + + if event.TestExecution.TestSuiteName != "" { + ev.SetSubjectTestSuiteRun(&cdevents.Reference{ + Id: event.TestExecution.TestSuiteName, + Source: clusterID, + }) + } + } + + return ev, nil +} + +// MapTestkubeEventStartTestToCDEvent maps OpenAPI spec Start Test Event to CDEvent CDEventReader +func MapTestkubeEventStartTestToCDEvent(event testkube.Event, clusterID, defaultNamespace string) (cdevents.CDEventReader, error) { + // Create the base event + ev, err := cdevents.NewTestCaseRunStartedEvent() + if err != nil { + return nil, err + } + + if event.TestExecution != nil { + ev.SetSubjectId(event.TestExecution.Name) + } + + ev.SetSubjectSource(clusterID) + ev.SetSource(clusterID) + if event.TestExecution != nil { + ev.SetSubjectTestCase(&cdevents.TestCaseRunStartedSubjectContentTestCase{ + Id: event.TestExecution.TestName, + Type: MapTestkubeTestTypeToCDEventTestCaseType(event.TestExecution.TestType), + }) + + namespace := event.TestExecution.TestNamespace + if namespace == "" { + namespace = defaultNamespace + } + + ev.SetSubjectEnvironment(&cdevents.Reference{ + Id: namespace, + Source: clusterID, + }) + + if event.TestExecution.RunningContext != nil { + ev.SetSubjectTrigger(&cdevents.TestCaseRunStartedSubjectContentTrigger{ + Type: MapTestkubeRunningContextTypeToCDEventTiggerType(event.TestExecution.RunningContext.Type_), + }) + } + + if event.TestExecution.TestSuiteName != "" { + ev.SetSubjectTestSuiteRun(&cdevents.Reference{ + Id: event.TestExecution.TestSuiteName, + Source: clusterID, + }) + } + } + + return ev, nil +} + +// MapTestkubeEventFinishTestToCDEvent maps OpenAPI spec Failed, Aborted, Timeout, Success Test Event to CDEvent CDEventReader +func MapTestkubeEventFinishTestToCDEvent(event testkube.Event, clusterID, defaultNamespace string) (cdevents.CDEventReader, error) { + // Create the base event + ev, err := cdevents.NewTestCaseRunFinishedEvent() + if err != nil { + return nil, err + } + + if event.TestExecution != nil { + ev.SetSubjectId(event.TestExecution.Name) + } + + ev.SetSubjectSource(clusterID) + ev.SetSource(clusterID) + if event.TestExecution != nil { + ev.SetSubjectTestCase(&cdevents.TestCaseRunFinishedSubjectContentTestCase{ + Id: event.TestExecution.TestName, + Type: MapTestkubeTestTypeToCDEventTestCaseType(event.TestExecution.TestType), + }) + + namespace := event.TestExecution.TestNamespace + if namespace == "" { + namespace = defaultNamespace + } + + ev.SetSubjectEnvironment(&cdevents.Reference{ + Id: namespace, + Source: clusterID, + }) + + if event.TestExecution.IsAborted() || event.TestExecution.IsTimeout() { + ev.SetSubjectOutcome("cancel") + if event.TestExecution.ExecutionResult != nil { + ev.SetSubjectReason(event.TestExecution.ExecutionResult.ErrorMessage) + } + } + + if event.TestExecution.IsFailed() { + ev.SetSubjectOutcome("fail") + if event.TestExecution.ExecutionResult != nil { + ev.SetSubjectReason(event.TestExecution.ExecutionResult.ErrorMessage) + } + } + + if event.TestExecution.IsPassed() { + ev.SetSubjectOutcome("pass") + } + + if event.TestExecution.TestSuiteName != "" { + ev.SetSubjectTestSuiteRun(&cdevents.Reference{ + Id: event.TestExecution.TestSuiteName, + Source: clusterID, + }) + } + } + + return ev, nil +} + +// MapTestkubeArtifactToCDEvent maps OpenAPI spec Artifact to CDEvent CDEventReader +func MapTestkubeArtifactToCDEvent(execution *testkube.Execution, clusterID, format string) (cdevents.CDEventReader, error) { + // Create the base event + ev, err := cdevents.NewTestOutputPublishedEvent() + if err != nil { + return nil, err + } + + ev.SetSubjectId(execution.Name) + ev.SetSubjectSource(clusterID) + ev.SetSource(clusterID) + ev.SetSubjectTestCaseRun(&cdevents.Reference{ + Id: execution.TestName, + Source: clusterID, + }) + + ev.SetSubjectFormat(format) + ev.SetSubjectOutputType(MapMimeTypeToCDEventOutputType(format)) + + return ev, nil +} + +// MapTestkubeEventQueuedTestSuiteToCDEvent maps OpenAPI spec Queued Test Suite Event to CDEvent CDEventReader +func MapTestkubeEventQueuedTestSuiteToCDEvent(event testkube.Event, clusterID string) (cdevents.CDEventReader, error) { + // Create the base event + ev, err := cdevents.NewTestSuiteRunQueuedEvent() + if err != nil { + return nil, err + } + + if event.TestSuiteExecution != nil { + ev.SetSubjectId(event.TestSuiteExecution.Name) + } + + ev.SetSubjectSource(clusterID) + ev.SetSource(clusterID) + if event.TestSuiteExecution != nil { + if event.TestSuiteExecution.TestSuite != nil { + ev.SetSubjectTestSuite(&cdevents.TestSuiteRunQueuedSubjectContentTestSuite{ + Id: event.TestSuiteExecution.TestSuite.Name, + }) + + ev.SetSubjectEnvironment(&cdevents.Reference{ + Id: event.TestSuiteExecution.TestSuite.Namespace, + Source: clusterID, + }) + } + + if event.TestSuiteExecution.RunningContext != nil { + ev.SetSubjectTrigger(&cdevents.TestSuiteRunQueuedSubjectContentTrigger{ + Type: MapTestkubeRunningContextTypeToCDEventTiggerType(event.TestSuiteExecution.RunningContext.Type_), + }) + } + } + + return ev, nil +} + +// MapTestkubeEventStartTestSuiteToCDEvent maps OpenAPI spec Start Test Suite Event to CDEvent CDEventReader +func MapTestkubeEventStartTestSuiteToCDEvent(event testkube.Event, clusterID string) (cdevents.CDEventReader, error) { + // Create the base event + ev, err := cdevents.NewTestSuiteRunStartedEvent() + if err != nil { + return nil, err + } + + if event.TestSuiteExecution != nil { + ev.SetSubjectId(event.TestSuiteExecution.Name) + } + + ev.SetSubjectSource(clusterID) + ev.SetSource(clusterID) + if event.TestSuiteExecution != nil { + if event.TestSuiteExecution.TestSuite != nil { + ev.SetSubjectTestSuite(&cdevents.TestSuiteRunStartedSubjectContentTestSuite{ + Id: event.TestSuiteExecution.TestSuite.Name, + }) + + ev.SetSubjectEnvironment(&cdevents.Reference{ + Id: event.TestSuiteExecution.TestSuite.Namespace, + Source: clusterID, + }) + } + + if event.TestSuiteExecution.RunningContext != nil { + ev.SetSubjectTrigger(&cdevents.TestSuiteRunStartedSubjectContentTrigger{ + Type: MapTestkubeRunningContextTypeToCDEventTiggerType(event.TestSuiteExecution.RunningContext.Type_), + }) + } + } + + return ev, nil +} + +// MapTestkubeEventFinishTestSuiteToCDEvent maps OpenAPI spec Failed, Aborted, Timeout, Success Test Event to CDEvent CDEventReader +func MapTestkubeEventFinishTestSuiteToCDEvent(event testkube.Event, clusterID string) (cdevents.CDEventReader, error) { + // Create the base event + ev, err := cdevents.NewTestSuiteRunFinishedEvent() + if err != nil { + return nil, err + } + + if event.TestSuiteExecution != nil { + ev.SetSubjectId(event.TestSuiteExecution.Name) + } + + ev.SetSubjectSource(clusterID) + ev.SetSource(clusterID) + if event.TestSuiteExecution != nil { + if event.TestSuiteExecution.TestSuite != nil { + ev.SetSubjectTestSuite(&cdevents.TestSuiteRunFinishedSubjectContentTestSuite{ + Id: event.TestSuiteExecution.TestSuite.Name, + }) + + ev.SetSubjectEnvironment(&cdevents.Reference{ + Id: event.TestSuiteExecution.TestSuite.Namespace, + Source: clusterID, + }) + } + + if event.TestSuiteExecution.IsAborted() || event.TestSuiteExecution.IsTimeout() { + ev.SetSubjectOutcome("cancel") + var errs []string + for _, result := range event.TestSuiteExecution.StepResults { + if result.Execution != nil && result.Execution.ExecutionResult != nil { + errs = append(errs, result.Execution.ExecutionResult.ErrorMessage) + } + } + + ev.SetSubjectReason(strings.Join(errs, ",")) + } + + if event.TestSuiteExecution.IsFailed() { + ev.SetSubjectOutcome("fail") + var errs []string + for _, result := range event.TestSuiteExecution.StepResults { + if result.Execution != nil && result.Execution.ExecutionResult != nil { + errs = append(errs, result.Execution.ExecutionResult.ErrorMessage) + } + } + + ev.SetSubjectReason(strings.Join(errs, ",")) + } + + if event.TestSuiteExecution.IsPassed() { + ev.SetSubjectOutcome("pass") + } + } + + return ev, nil +} + +// MapTestkubeRunningContextTypeToCDEventTiggerType maps OpenAPI spec Running Context Type to CDEvent Trigger Type +func MapTestkubeRunningContextTypeToCDEventTiggerType(contextType string) string { + switch testkube.RunningContextType(contextType) { + case testkube.RunningContextTypeUserCLI, testkube.RunningContextTypeUserUI: + return "manual" + case testkube.RunningContextTypeTestTrigger, testkube.RunningContextTypeTestSuite: + return "event" + case testkube.RunningContextTypeScheduler: + return "schedule" + } + + return "other" +} + +// MapTestkubeTestTypeToCDEventTestCaseType maps OpenAPI spec Test Type to CDEvent Test Case Type +func MapTestkubeTestTypeToCDEventTestCaseType(testType string) string { + var types = map[string]string{ + "artillery/": "performance", + "curl/": "functional", + "cypress/": "functional", + "ginkgo/": "unit", + "gradle/": "integration", + "jmeter/": "performance", + "k6/": "performance", + "kubepug/": "compliance", + "maven/": "integration", + "playwright/": "functional", + "postman/": "functional", + "soapui/": "functional", + "zap/": "security", + } + + for key, value := range types { + if strings.Contains(testType, key) { + return value + } + } + + return "other" +} + +// MapMimeTypeToCDEventOutputType maps mime type to CDEvent Output Type +func MapMimeTypeToCDEventOutputType(mimeType string) string { + if strings.Contains(mimeType, "video/") || strings.Contains(mimeType, "audio/") { + return "video" + } + + if strings.Contains(mimeType, "image/") { + return "image" + } + + if strings.Contains(mimeType, "text/") { + return "report" + } + + var types = map[string]string{ + "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": "report", + "application/vnd.openxmlformats-officedocument.wordprocessingml.document": "report", + "application/vnd.openxmlformats-officedocument.presentationml.presentation": "report", + "application/vnd.oasis.opendocument.text": "report", + "application/vnd.oasis.opendocument.spreadsheet": "report", + "application/vnd.oasis.opendocument.presentation": "report", + "application/pdf": "report", + "application/vnd.ms-excel": "report", + "application/vnd.ms-powerpoint": "report", + "application/msword": "report", + "application/json": "log", + } + + for key, value := range types { + if mimeType == key { + return value + } + } + + return "other" +} diff --git a/pkg/mapper/cdevents/mapper_test.go b/pkg/mapper/cdevents/mapper_test.go new file mode 100644 index 00000000000..db85eeb3295 --- /dev/null +++ b/pkg/mapper/cdevents/mapper_test.go @@ -0,0 +1,450 @@ +package cdevents + +import ( + "errors" + "testing" + + cdevents "github.com/cdevents/sdk-go/pkg/api" + "github.com/stretchr/testify/assert" + + "github.com/kubeshop/testkube/pkg/api/v1/testkube" +) + +func TestMapTestkubeEventQueuedTestToCDEvent(t *testing.T) { + t.Parallel() + + event := testkube.Event{ + TestExecution: &testkube.Execution{ + Name: "test-1", + TestName: "Test 1", + TestType: "ginkgo/test", + TestNamespace: "default", + RunningContext: &testkube.RunningContext{ + Type_: "scheduler", + }, + TestSuiteName: "Suite 1", + }, + } + clusterID := "cluster-1" + defaultNamespace := "default" + + ev, err := MapTestkubeEventQueuedTestToCDEvent(event, clusterID, defaultNamespace) + if err != nil { + t.Errorf("Error mapping event: %v", err) + return + } + + subjectID := ev.GetSubjectId() + if subjectID != "test-1" { + t.Errorf("Unexpected subject ID: %s", subjectID) + } + + subjectSource := ev.GetSubjectSource() + if subjectSource != clusterID { + t.Errorf("Unexpected subject source: %s", subjectSource) + } + + source := ev.GetSource() + if source != clusterID { + t.Errorf("Unexpected source: %s", source) + } + + cde, ok := ev.(*cdevents.TestCaseRunQueuedEvent) + assert.Equal(t, true, ok) + + testID := cde.Subject.Content.TestCase.Id + if testID != "Test 1" { + t.Errorf("Unexpected test case id: %s", testID) + } + + testType := cde.Subject.Content.TestCase.Type + if testType != "unit" { + t.Errorf("Unexpected test case type: %s", testType) + } + + envID := cde.Subject.Content.Environment.Id + if envID != defaultNamespace { + t.Errorf("Unexpected environment id: %s", envID) + } + + envSource := cde.Subject.Content.Environment.Source + if envSource != clusterID { + t.Errorf("Unexpected environment source: %s", envSource) + } + + triggerType := cde.Subject.Content.Trigger.Type + if triggerType != "schedule" { + t.Errorf("Unexpected trigger type: %s", triggerType) + } + + suiteID := cde.Subject.Content.TestSuiteRun.Id + if suiteID != "Suite 1" { + t.Errorf("Unexpected test suite id: %s", suiteID) + } + + suiteSource := cde.Subject.Content.TestSuiteRun.Source + if suiteSource != clusterID { + t.Errorf("Unexpected test suite source: %s", suiteSource) + } +} + +func TestMapTestkubeEventStatTestToCDEvent(t *testing.T) { + t.Parallel() + + event := testkube.Event{ + TestExecution: &testkube.Execution{ + Name: "test-1", + TestName: "Test 1", + TestType: "ginkgo/test", + TestNamespace: "default", + RunningContext: &testkube.RunningContext{ + Type_: "scheduler", + }, + TestSuiteName: "Suite 1", + }, + } + clusterID := "cluster-1" + defaultNamespace := "default" + + ev, err := MapTestkubeEventStartTestToCDEvent(event, clusterID, defaultNamespace) + if err != nil { + t.Errorf("Error mapping event: %v", err) + return + } + + subjectID := ev.GetSubjectId() + if subjectID != "test-1" { + t.Errorf("Unexpected subject ID: %s", subjectID) + } + + subjectSource := ev.GetSubjectSource() + if subjectSource != clusterID { + t.Errorf("Unexpected subject source: %s", subjectSource) + } + + source := ev.GetSource() + if source != clusterID { + t.Errorf("Unexpected source: %s", source) + } + + cde, ok := ev.(*cdevents.TestCaseRunStartedEvent) + assert.Equal(t, true, ok) + + testID := cde.Subject.Content.TestCase.Id + if testID != "Test 1" { + t.Errorf("Unexpected test case id: %s", testID) + } + + testType := cde.Subject.Content.TestCase.Type + if testType != "unit" { + t.Errorf("Unexpected test case type: %s", testType) + } + + envID := cde.Subject.Content.Environment.Id + if envID != defaultNamespace { + t.Errorf("Unexpected environment id: %s", envID) + } + + envSource := cde.Subject.Content.Environment.Source + if envSource != clusterID { + t.Errorf("Unexpected environment source: %s", envSource) + } + + triggerType := cde.Subject.Content.Trigger.Type + if triggerType != "schedule" { + t.Errorf("Unexpected trigger type: %s", triggerType) + } + + suiteID := cde.Subject.Content.TestSuiteRun.Id + if suiteID != "Suite 1" { + t.Errorf("Unexpected test suite id: %s", suiteID) + } + + suiteSource := cde.Subject.Content.TestSuiteRun.Source + if suiteSource != clusterID { + t.Errorf("Unexpected test suite source: %s", suiteSource) + } +} + +func TestMapTestkubeEventFinishTestToCDEvent(t *testing.T) { + t.Parallel() + + result := testkube.NewErrorExecutionResult(errors.New("fake")) + event := testkube.Event{ + TestExecution: &testkube.Execution{ + Name: "test-1", + TestName: "Test 1", + TestType: "ginkgo/test", + TestNamespace: "default", + RunningContext: &testkube.RunningContext{ + Type_: "scheduler", + }, + TestSuiteName: "Suite 1", + ExecutionResult: &result, + }, + } + clusterID := "cluster-1" + defaultNamespace := "default" + + ev, err := MapTestkubeEventFinishTestToCDEvent(event, clusterID, defaultNamespace) + if err != nil { + t.Errorf("Error mapping event: %v", err) + return + } + + subjectID := ev.GetSubjectId() + if subjectID != "test-1" { + t.Errorf("Unexpected subject ID: %s", subjectID) + } + + subjectSource := ev.GetSubjectSource() + if subjectSource != clusterID { + t.Errorf("Unexpected subject source: %s", subjectSource) + } + + source := ev.GetSource() + if source != clusterID { + t.Errorf("Unexpected source: %s", source) + } + + cde, ok := ev.(*cdevents.TestCaseRunFinishedEvent) + assert.Equal(t, true, ok) + + testID := cde.Subject.Content.TestCase.Id + if testID != "Test 1" { + t.Errorf("Unexpected test case id: %s", testID) + } + + testType := cde.Subject.Content.TestCase.Type + if testType != "unit" { + t.Errorf("Unexpected test case type: %s", testType) + } + + envID := cde.Subject.Content.Environment.Id + if envID != defaultNamespace { + t.Errorf("Unexpected environment id: %s", envID) + } + + envSource := cde.Subject.Content.Environment.Source + if envSource != clusterID { + t.Errorf("Unexpected environment source: %s", envSource) + } + + suiteID := cde.Subject.Content.TestSuiteRun.Id + if suiteID != "Suite 1" { + t.Errorf("Unexpected test suite id: %s", suiteID) + } + + suiteSource := cde.Subject.Content.TestSuiteRun.Source + if suiteSource != clusterID { + t.Errorf("Unexpected test suite source: %s", suiteSource) + } + + outcome := cde.Subject.Content.Outcome + if outcome != "fail" { + t.Errorf("Unexpected outcome: %s", outcome) + } + + reason := cde.Subject.Content.Reason + if reason != "fake" { + t.Errorf("Unexpected reason: %s", reason) + } +} + +func TestMapTestkubeEventQueuedTestSuiteToCDEvent(t *testing.T) { + t.Parallel() + + event := testkube.Event{ + TestSuiteExecution: &testkube.TestSuiteExecution{ + Name: "suite-1", + TestSuite: &testkube.ObjectRef{ + Namespace: "default", + Name: "Suite 1", + }, + RunningContext: &testkube.RunningContext{ + Type_: "scheduler", + }, + }, + } + clusterID := "cluster-1" + + ev, err := MapTestkubeEventQueuedTestSuiteToCDEvent(event, clusterID) + if err != nil { + t.Errorf("Error mapping event: %v", err) + return + } + + subjectID := ev.GetSubjectId() + if subjectID != "suite-1" { + t.Errorf("Unexpected subject ID: %s", subjectID) + } + + subjectSource := ev.GetSubjectSource() + if subjectSource != clusterID { + t.Errorf("Unexpected subject source: %s", subjectSource) + } + + source := ev.GetSource() + if source != clusterID { + t.Errorf("Unexpected source: %s", source) + } + + cde, ok := ev.(*cdevents.TestSuiteRunQueuedEvent) + assert.Equal(t, true, ok) + + suiteID := cde.Subject.Content.TestSuite.Id + if suiteID != "Suite 1" { + t.Errorf("Unexpected test suite id: %s", suiteID) + } + + envID := cde.Subject.Content.Environment.Id + if envID != "default" { + t.Errorf("Unexpected environment id: %s", envID) + } + + envSource := cde.Subject.Content.Environment.Source + if envSource != clusterID { + t.Errorf("Unexpected environment source: %s", envSource) + } + + triggerType := cde.Subject.Content.Trigger.Type + if triggerType != "schedule" { + t.Errorf("Unexpected trigger type: %s", triggerType) + } +} + +func TestMapTestkubeEventStartTestSuiteToCDEvent(t *testing.T) { + t.Parallel() + + event := testkube.Event{ + TestSuiteExecution: &testkube.TestSuiteExecution{ + Name: "suite-1", + TestSuite: &testkube.ObjectRef{ + Namespace: "default", + Name: "Suite 1", + }, + RunningContext: &testkube.RunningContext{ + Type_: "scheduler", + }, + }, + } + clusterID := "cluster-1" + + ev, err := MapTestkubeEventStartTestSuiteToCDEvent(event, clusterID) + if err != nil { + t.Errorf("Error mapping event: %v", err) + return + } + + subjectID := ev.GetSubjectId() + if subjectID != "suite-1" { + t.Errorf("Unexpected subject ID: %s", subjectID) + } + + subjectSource := ev.GetSubjectSource() + if subjectSource != clusterID { + t.Errorf("Unexpected subject source: %s", subjectSource) + } + + source := ev.GetSource() + if source != clusterID { + t.Errorf("Unexpected source: %s", source) + } + + cde, ok := ev.(*cdevents.TestSuiteRunStartedEvent) + assert.Equal(t, true, ok) + + suiteID := cde.Subject.Content.TestSuite.Id + if suiteID != "Suite 1" { + t.Errorf("Unexpected test suite id: %s", suiteID) + } + + envID := cde.Subject.Content.Environment.Id + if envID != "default" { + t.Errorf("Unexpected environment id: %s", envID) + } + + envSource := cde.Subject.Content.Environment.Source + if envSource != clusterID { + t.Errorf("Unexpected environment source: %s", envSource) + } + + triggerType := cde.Subject.Content.Trigger.Type + if triggerType != "schedule" { + t.Errorf("Unexpected trigger type: %s", triggerType) + } +} + +func TestMapTestkubeEventFinishTestSuiteToCDEvent(t *testing.T) { + t.Parallel() + + execution := testkube.NewFailedExecution(errors.New("fake")) + event := testkube.Event{ + TestSuiteExecution: &testkube.TestSuiteExecution{ + Name: "suite-1", + TestSuite: &testkube.ObjectRef{ + Namespace: "default", + Name: "Suite 1", + }, + RunningContext: &testkube.RunningContext{ + Type_: "scheduler", + }, + Status: testkube.TestSuiteExecutionStatusFailed, + StepResults: []testkube.TestSuiteStepExecutionResult{ + { + Execution: &execution, + }, + }, + }, + } + clusterID := "cluster-1" + + ev, err := MapTestkubeEventFinishTestSuiteToCDEvent(event, clusterID) + if err != nil { + t.Errorf("Error mapping event: %v", err) + return + } + + subjectID := ev.GetSubjectId() + if subjectID != "suite-1" { + t.Errorf("Unexpected subject ID: %s", subjectID) + } + + subjectSource := ev.GetSubjectSource() + if subjectSource != clusterID { + t.Errorf("Unexpected subject source: %s", subjectSource) + } + + source := ev.GetSource() + if source != clusterID { + t.Errorf("Unexpected source: %s", source) + } + + cde, ok := ev.(*cdevents.TestSuiteRunFinishedEvent) + assert.Equal(t, true, ok) + + suiteID := cde.Subject.Content.TestSuite.Id + if suiteID != "Suite 1" { + t.Errorf("Unexpected test suite id: %s", suiteID) + } + + envID := cde.Subject.Content.Environment.Id + if envID != "default" { + t.Errorf("Unexpected environment id: %s", envID) + } + + envSource := cde.Subject.Content.Environment.Source + if envSource != clusterID { + t.Errorf("Unexpected environment source: %s", envSource) + } + + outcome := cde.Subject.Content.Outcome + if outcome != "fail" { + t.Errorf("Unexpected outcome: %s", outcome) + } + + reason := cde.Subject.Content.Reason + if reason != "fake" { + t.Errorf("Unexpected reason: %s", reason) + } +} diff --git a/pkg/scheduler/test_scheduler.go b/pkg/scheduler/test_scheduler.go index f9ecab55736..c9a828b2da3 100644 --- a/pkg/scheduler/test_scheduler.go +++ b/pkg/scheduler/test_scheduler.go @@ -115,11 +115,6 @@ func (s *Scheduler) executeTest(ctx context.Context, test testkube.Test, request s.logger.Infow("test started", "executionId", execution.Id, "status", execution.ExecutionResult.Status) - // notify immediately only when sync run otherwise job results handler need notify about test finish - if options.Sync && execution.ExecutionResult != nil && *execution.ExecutionResult.Status != testkube.RUNNING_ExecutionStatus { - s.events.Notify(testkube.NewEventEndTestSuccess(&execution)) - } - return execution, nil }