From 1e343780eb31579bfd264550c90c7951b3af2626 Mon Sep 17 00:00:00 2001 From: bradub Date: Fri, 31 May 2024 11:02:26 +0300 Subject: [PATCH] feat(pubsub): add type paramter for event field --- pubsub/go.mod | 28 ++++---- pubsub/go.sum | 72 +++++++++---------- pubsub/inmem/pubsub.go | 26 +++---- pubsub/inmem/pubsub_test.go | 8 +-- pubsub/inmem/subscription.go | 12 ++-- pubsub/kafka/kafka_publisher.go | 4 +- pubsub/kafka/kafka_subscriber.go | 14 ++-- pubsub/kafka/kafka_test.go | 4 +- pubsub/kafkasarama/kafkasarama_publisher.go | 4 +- pubsub/kafkasarama/kafkasarama_subscriber.go | 22 +++--- pubsub/kafkasarama/kafkasarama_test.go | 4 +- .../kafkashopifysarama_server_test.go | 4 +- .../kafkashopifysarama_subscriber.go | 14 ++-- pubsub/pubsub.go | 24 +++---- 14 files changed, 121 insertions(+), 119 deletions(-) diff --git a/pubsub/go.mod b/pubsub/go.mod index 836cbbd..0d51297 100644 --- a/pubsub/go.mod +++ b/pubsub/go.mod @@ -5,23 +5,23 @@ go 1.21 toolchain go1.21.2 require ( - github.com/IBM/sarama v1.41.3 + github.com/IBM/sarama v1.43.2 github.com/Shopify/sarama v1.38.1 github.com/ThreeDotsLabs/watermill v1.3.5 github.com/ThreeDotsLabs/watermill-kafka/v2 v2.5.0 - github.com/google/uuid v1.3.1 - github.com/matryer/is v1.4.0 + github.com/google/uuid v1.6.0 + github.com/matryer/is v1.4.1 github.com/xdg-go/scram v1.1.2 - go.uber.org/zap v1.26.0 + go.uber.org/zap v1.27.0 go.uber.org/zap/exp v0.2.0 ) require ( github.com/davecgh/go-spew v1.1.1 // indirect - github.com/eapache/go-resiliency v1.4.0 // indirect + github.com/eapache/go-resiliency v1.6.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect github.com/eapache/queue v1.1.0 // indirect - github.com/go-logr/logr v1.2.4 // indirect + github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect @@ -32,20 +32,20 @@ require ( github.com/jcmturner/gofork v1.7.6 // indirect github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect - github.com/klauspost/compress v1.17.2 // indirect + github.com/klauspost/compress v1.17.8 // indirect github.com/lithammer/shortuuid/v3 v3.0.7 // indirect github.com/oklog/ulid v1.3.1 // indirect - github.com/pierrec/lz4/v4 v4.1.18 // indirect + github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama v0.43.0 // indirect - go.opentelemetry.io/otel v1.19.0 // indirect - go.opentelemetry.io/otel/metric v1.19.0 // indirect - go.opentelemetry.io/otel/trace v1.19.0 // indirect + go.opentelemetry.io/otel v1.27.0 // indirect + go.opentelemetry.io/otel/metric v1.27.0 // indirect + go.opentelemetry.io/otel/trace v1.27.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/crypto v0.14.0 // indirect - golang.org/x/net v0.17.0 // indirect - golang.org/x/text v0.13.0 // indirect + golang.org/x/crypto v0.23.0 // indirect + golang.org/x/net v0.25.0 // indirect + golang.org/x/text v0.15.0 // indirect ) diff --git a/pubsub/go.sum b/pubsub/go.sum index 0bdd014..9e63a77 100644 --- a/pubsub/go.sum +++ b/pubsub/go.sum @@ -1,5 +1,5 @@ -github.com/IBM/sarama v1.41.3 h1:MWBEJ12vHC8coMjdEXFq/6ftO6DUZnQlFYcxtOJFa7c= -github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ= +github.com/IBM/sarama v1.43.2 h1:HABeEqRUh32z8yzY2hGB/j8mHSzC/HA9zlEjqFNCzSw= +github.com/IBM/sarama v1.43.2/go.mod h1:Kyo4WkF24Z+1nz7xeVUFWIuKVV8RS3wM8mkvPKMdXFQ= github.com/Shopify/sarama v1.38.1 h1:lqqPUPQZ7zPqYlWpTh+LQ9bhYNu2xJL6k1SJN4WVe2A= github.com/Shopify/sarama v1.38.1/go.mod h1:iwv9a67Ha8VNa+TifujYoWGxWnu2kNVAQdSdZ4X2o5g= github.com/Shopify/toxiproxy/v2 v2.5.0 h1:i4LPT+qrSlKNtQf5QliVjdP08GyAH8+BUIc9gT0eahc= @@ -11,8 +11,8 @@ github.com/ThreeDotsLabs/watermill-kafka/v2 v2.5.0/go.mod h1:w+9jhI7x5ZP67ceSUII github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/eapache/go-resiliency v1.4.0 h1:3OK9bWpPk5q6pbFAaYSEwD9CLUSHG8bnZuqX2yMt3B0= -github.com/eapache/go-resiliency v1.4.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-resiliency v1.6.0 h1:CqGDTLtpwuWKn6Nj3uNUdflaq+/kIPsg0gfNzHton30= +github.com/eapache/go-resiliency v1.6.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= @@ -20,17 +20,17 @@ github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFP github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= -github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= -github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= -github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -53,16 +53,16 @@ github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh6 github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= -github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= -github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= +github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8= github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= -github.com/matryer/is v1.4.0 h1:sosSmIWwkYITGrxZ25ULNDeKiMNzFSr4V/eqBQP0PeE= -github.com/matryer/is v1.4.0/go.mod h1:8I/i5uYgLzgsgEloJE1U6xx5HkBQpAZvepWuujKwMRU= +github.com/matryer/is v1.4.1 h1:55ehd8zaGABKLXQUe2awZ99BD/PTc2ls+KV/dXphgEQ= +github.com/matryer/is v1.4.1/go.mod h1:8I/i5uYgLzgsgEloJE1U6xx5HkBQpAZvepWuujKwMRU= github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= -github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= -github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= +github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -76,8 +76,8 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= @@ -87,25 +87,25 @@ github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gi github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama v0.43.0 h1:/RxdhdIi0HrKSzdWHLjureinjnGL5YQEYevaC/EAg1k= go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama v0.43.0/go.mod h1:BKzh9a9EE+vHuq99EwD2cEa+T+Ts1fQ6W3ovO80mjkY= -go.opentelemetry.io/otel v1.19.0 h1:MuS/TNf4/j4IXsZuJegVzI1cwut7Qc00344rgH7p8bs= -go.opentelemetry.io/otel v1.19.0/go.mod h1:i0QyjOq3UPoTzff0PJB2N66fb4S0+rSbSB15/oyH9fY= -go.opentelemetry.io/otel/metric v1.19.0 h1:aTzpGtV0ar9wlV4Sna9sdJyII5jTVJEvKETPiOKwvpE= -go.opentelemetry.io/otel/metric v1.19.0/go.mod h1:L5rUsV9kM1IxCj1MmSdS+JQAcVm319EUrDVLrt7jqt8= -go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1Dzxpg= -go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo= -go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= -go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= +go.opentelemetry.io/otel v1.27.0 h1:9BZoF3yMK/O1AafMiQTVu0YDj5Ea4hPhxCs7sGva+cg= +go.opentelemetry.io/otel v1.27.0/go.mod h1:DMpAK8fzYRzs+bi3rS5REupisuqTheUlSZJ1WnZaPAQ= +go.opentelemetry.io/otel/metric v1.27.0 h1:hvj3vdEKyeCi4YaYfNjv2NUje8FqKqUY8IlF0FxV/ik= +go.opentelemetry.io/otel/metric v1.27.0/go.mod h1:mVFgmRlhljgBiuk/MP/oKylr4hs85GZAylncepAX/ak= +go.opentelemetry.io/otel/trace v1.27.0 h1:IqYb813p7cmbHk0a5y6pD5JPakbVfftRXABGt5/Rscw= +go.opentelemetry.io/otel/trace v1.27.0/go.mod h1:6RiD1hkAprV4/q+yd2ln1HG9GoPx39SuvvstaLBl+l4= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= -go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= -go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= go.uber.org/zap/exp v0.2.0 h1:FtGenNNeCATRB3CmB/yEUnjEFeJWpB/pMcy7e2bKPYs= go.uber.org/zap/exp v0.2.0/go.mod h1:t0gqAIdh1MfKv9EwN/dLwfZnJxe9ITAZN78HEWPFWDQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= -golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= -golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= +golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= +golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -113,12 +113,12 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= -golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= -golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= +golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ= -golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -133,8 +133,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= -golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= +golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= diff --git a/pubsub/inmem/pubsub.go b/pubsub/inmem/pubsub.go index 25f0186..643136c 100644 --- a/pubsub/inmem/pubsub.go +++ b/pubsub/inmem/pubsub.go @@ -11,14 +11,14 @@ import ( ) // Ensure type inmem.PubSub implements interface pubsub.PublishSubscriber. -var _ pubsub.PublishSubscriber[any] = (*PubSub[any])(nil) +var _ pubsub.PublishSubscriber[string, any] = (*PubSub[string, any])(nil) // PubSub represents a PubSub backed my an in memory storage. -type PubSub[T any] struct { +type PubSub[T, P any] struct { mu sync.Mutex // map having channels as keys and subscriptions as value - channelsSubs map[string]map[*Subscription[T]]struct{} + channelsSubs map[string]map[*Subscription[T, P]]struct{} // eventBufferSize is the buffer size of the channel for each subscription. eventBufferSize int @@ -26,15 +26,15 @@ type PubSub[T any] struct { // NewPubSub returns a new instance of PubSub backed // by an in memory storage. -func NewPubSub[T any](eventBufferSize int) *PubSub[T] { - return &PubSub[T]{ - channelsSubs: make(map[string]map[*Subscription[T]]struct{}), +func NewPubSub[T, P any](eventBufferSize int) *PubSub[T, P] { + return &PubSub[T, P]{ + channelsSubs: make(map[string]map[*Subscription[T, P]]struct{}), eventBufferSize: eventBufferSize, } } // Publish publishes event to all the subscriptions of the channels provided. -func (ps *PubSub[T]) Publish(event pubsub.Event[T], channels ...string) error { +func (ps *PubSub[T, P]) Publish(event pubsub.Event[T, P], channels ...string) error { // Ensure at least one channel is provided. if len(channels) == 0 { return ErrNoChannel @@ -75,16 +75,16 @@ func (ps *PubSub[T]) Publish(event pubsub.Event[T], channels ...string) error { var ErrNoChannel = errors.New("no channel given") // Subscribe creates a new subscription for the provided channels. -func (ps *PubSub[T]) Subscribe(channels ...string) (pubsub.Subscription[T], error) { +func (ps *PubSub[T, P]) Subscribe(channels ...string) (pubsub.Subscription[T, P], error) { // Ensure at least one channel is provided. if len(channels) == 0 { return nil, ErrNoChannel } // Create a new subscription. - sub := &Subscription[T]{ + sub := &Subscription[T, P]{ channels: channels, - c: make(chan pubsub.Event[T], ps.eventBufferSize), + c: make(chan pubsub.Event[T, P], ps.eventBufferSize), pubsub: ps, } @@ -97,7 +97,7 @@ func (ps *PubSub[T]) Subscribe(channels ...string) (pubsub.Subscription[T], erro subs, ok := ps.channelsSubs[c] if !ok { // Create the subs map if it does not exist. - subs = make(map[*Subscription[T]]struct{}) + subs = make(map[*Subscription[T, P]]struct{}) ps.channelsSubs[c] = subs } @@ -115,7 +115,7 @@ func (ps *PubSub[T]) Subscribe(channels ...string) (pubsub.Subscription[T], erro // This method wraps the removeSubscription method // with the mutexes. So it's safe to be from external // entities. -func (ps *PubSub[T]) Unsubscribe(sub *Subscription[T]) { +func (ps *PubSub[T, P]) Unsubscribe(sub *Subscription[T, P]) { ps.mu.Lock() defer ps.mu.Unlock() @@ -124,7 +124,7 @@ func (ps *PubSub[T]) Unsubscribe(sub *Subscription[T]) { // removeSubscription closes the subscriptions go channel and // removes it from the pubsubs storage. -func (ps *PubSub[T]) removeSubscription(sub *Subscription[T]) { +func (ps *PubSub[T, P]) removeSubscription(sub *Subscription[T, P]) { // Only close the underlying channel once. sub.once.Do(func() { close(sub.c) diff --git a/pubsub/inmem/pubsub_test.go b/pubsub/inmem/pubsub_test.go index 66f0db3..9079c48 100644 --- a/pubsub/inmem/pubsub_test.go +++ b/pubsub/inmem/pubsub_test.go @@ -18,7 +18,7 @@ func TestPubSub_SubscribeSuccess(t *testing.T) { channelC = "c" ) - ps := NewPubSub[string](eventBufferSize) + ps := NewPubSub[string, string](eventBufferSize) subA, err := ps.Subscribe(channelA) i.NoErr(err) @@ -30,7 +30,7 @@ func TestPubSub_SubscribeSuccess(t *testing.T) { i.NoErr(err) // Publish event for first 2 subscriptions. - _ = ps.Publish(pubsub.Event[string]{Type: "test", Payload: "test"}, channelA) + _ = ps.Publish(pubsub.Event[string, string]{Type: "test", Payload: "test"}, channelA) select { case <-subA.C(): @@ -60,12 +60,12 @@ func TestPubSub_UnsubscribeSuccess(t *testing.T) { channelA = "a" ) - ps := NewPubSub[string](eventBufferSize) + ps := NewPubSub[string, string](eventBufferSize) subscription, err := ps.Subscribe(channelA) i.NoErr(err) - err = ps.Publish(pubsub.Event[string]{Type: "test", Payload: "test"}, channelA) + err = ps.Publish(pubsub.Event[string, string]{Type: "test", Payload: "test"}, channelA) i.NoErr(err) err = subscription.Close() diff --git a/pubsub/inmem/subscription.go b/pubsub/inmem/subscription.go index 3d0537c..f490d0d 100644 --- a/pubsub/inmem/subscription.go +++ b/pubsub/inmem/subscription.go @@ -7,30 +7,30 @@ import ( ) // Ensure type inmem.Subscription implements interface pubsub.Subscription. -var _ pubsub.Subscription[any] = (*Subscription[any])(nil) +var _ pubsub.Subscription[string, any] = (*Subscription[string, any])(nil) // Subscription represents a stream of events published to the channels // of this subscription. -type Subscription[T any] struct { +type Subscription[T, P any] struct { // Channels this subscription is subscribed to. channels []string // Ensures c only closed once once sync.Once // Channel of events - c chan pubsub.Event[T] + c chan pubsub.Event[T, P] - pubsub *PubSub[T] + pubsub *PubSub[T, P] } // Close disconnects the subscription from the service it was created from. -func (s *Subscription[T]) Close() error { +func (s *Subscription[T, P]) Close() error { s.pubsub.Unsubscribe(s) return nil } // C returns a receive-only go channel of events published // on the channels this subscription is subscribed to. -func (s *Subscription[T]) C() <-chan pubsub.Event[T] { +func (s *Subscription[T, P]) C() <-chan pubsub.Event[T, P] { return s.c } diff --git a/pubsub/kafka/kafka_publisher.go b/pubsub/kafka/kafka_publisher.go index 46bc358..266bc5b 100644 --- a/pubsub/kafka/kafka_publisher.go +++ b/pubsub/kafka/kafka_publisher.go @@ -11,7 +11,7 @@ import ( "go.uber.org/zap" ) -var _ pubsub.Publisher[[]byte] = (*Publisher)(nil) +var _ pubsub.Publisher[string, []byte] = (*Publisher)(nil) // Publisher represents a kafka publisher. type Publisher struct { @@ -42,7 +42,7 @@ func NewPublisher( } // Publish publishes an event to a kafka topic. -func (p Publisher) Publish(event pubsub.Event[[]byte], channels ...string) error { +func (p Publisher) Publish(event pubsub.Event[string, []byte], channels ...string) error { if len(channels) != 1 { return pubsub.ErrExactlyOneChannelAllowed } diff --git a/pubsub/kafka/kafka_subscriber.go b/pubsub/kafka/kafka_subscriber.go index d690cd5..d75ee72 100644 --- a/pubsub/kafka/kafka_subscriber.go +++ b/pubsub/kafka/kafka_subscriber.go @@ -12,7 +12,7 @@ import ( "go.uber.org/zap" ) -var _ pubsub.Subscriber[[]byte] = (*Subscriber)(nil) +var _ pubsub.Subscriber[string, []byte] = (*Subscriber)(nil) // Subscriber represents a kafka subscriber. type Subscriber struct { @@ -45,7 +45,7 @@ func NewSubscriber( } // Subscribe subscribes to a kafka topic. -func (s Subscriber) Subscribe(channels ...string) (pubsub.Subscription[[]byte], error) { +func (s Subscriber) Subscribe(channels ...string) (pubsub.Subscription[string, []byte], error) { if len(channels) != 1 { return nil, pubsub.ErrExactlyOneChannelAllowed } @@ -63,11 +63,11 @@ func (s Subscriber) Close() error { return s.kafkaSubscriber.Close() } -var _ pubsub.Subscription[[]byte] = (*Subscription)(nil) +var _ pubsub.Subscription[string, []byte] = (*Subscription)(nil) // Subscription represents a stream of events published to a kafka topic. type Subscription struct { - eventCh chan pubsub.Event[[]byte] + eventCh chan pubsub.Event[string, []byte] closeCh chan struct{} } @@ -76,7 +76,7 @@ type Subscription struct { func newSubscription( mesCh <-chan *message.Message, ) *Subscription { - eventCh := make(chan pubsub.Event[[]byte]) + eventCh := make(chan pubsub.Event[string, []byte]) closeCh := make(chan struct{}) go func() { @@ -90,7 +90,7 @@ func newSubscription( return } - eventCh <- pubsub.Event[[]byte]{ + eventCh <- pubsub.Event[string, []byte]{ Type: mes.Metadata.Get("type"), Payload: mes.Payload, } @@ -105,7 +105,7 @@ func newSubscription( } // C returns a receive-only go channel of events published. -func (s Subscription) C() <-chan pubsub.Event[[]byte] { +func (s Subscription) C() <-chan pubsub.Event[string, []byte] { return s.eventCh } diff --git a/pubsub/kafka/kafka_test.go b/pubsub/kafka/kafka_test.go index 9972e98..d48e681 100644 --- a/pubsub/kafka/kafka_test.go +++ b/pubsub/kafka/kafka_test.go @@ -74,7 +74,7 @@ func TestPubSub(t *testing.T) { t.Cleanup(func() { is.NoErr(sub2.Close()) }) - mes := pubsub.Event[[]byte]{ + mes := pubsub.Event[string, []byte]{ Type: "test", Payload: []byte("test"), } @@ -213,7 +213,7 @@ func TestConsumerGroups(t *testing.T) { t.Cleanup(func() { is.NoErr(pub.Close()) }) - err = pub.Publish(pubsub.Event[[]byte]{ + err = pub.Publish(pubsub.Event[string, []byte]{ Type: "", Payload: []byte("brad"), }, topic) diff --git a/pubsub/kafkasarama/kafkasarama_publisher.go b/pubsub/kafkasarama/kafkasarama_publisher.go index 3900f32..6371e90 100644 --- a/pubsub/kafkasarama/kafkasarama_publisher.go +++ b/pubsub/kafkasarama/kafkasarama_publisher.go @@ -9,7 +9,7 @@ import ( "github.com/purposeinplay/go-commons/pubsub" ) -var _ pubsub.Publisher[[]byte] = (*Publisher)(nil) +var _ pubsub.Publisher[string, []byte] = (*Publisher)(nil) // Publisher represents a kafka publisher. type Publisher struct { @@ -45,7 +45,7 @@ func NewPublisher( } // Publish publishes an event to a kafka topic. -func (p Publisher) Publish(event pubsub.Event[[]byte], channels ...string) error { +func (p Publisher) Publish(event pubsub.Event[string, []byte], channels ...string) error { if len(channels) != 1 { return pubsub.ErrExactlyOneChannelAllowed } diff --git a/pubsub/kafkasarama/kafkasarama_subscriber.go b/pubsub/kafkasarama/kafkasarama_subscriber.go index 1e68804..4dfa6c8 100644 --- a/pubsub/kafkasarama/kafkasarama_subscriber.go +++ b/pubsub/kafkasarama/kafkasarama_subscriber.go @@ -11,7 +11,7 @@ import ( "github.com/purposeinplay/go-commons/pubsub" ) -var _ pubsub.Subscriber[[]byte] = (*Subscriber)(nil) +var _ pubsub.Subscriber[string, []byte] = (*Subscriber)(nil) // Subscriber represents a kafka subscriber. type Subscriber struct { @@ -46,12 +46,14 @@ func NewSubscriber( } // Subscribe creates a new subscription that runs in the background. -func (s Subscriber) Subscribe(channels ...string) (pubsub.Subscription[[]byte], error) { +func (s Subscriber) Subscribe(channels ...string) (pubsub.Subscription[string, []byte], error) { if len(channels) != 1 { return nil, pubsub.ErrExactlyOneChannelAllowed } - sarama.NewConsumerGroup(s.brokers, s.consumerGroup, s.cfg) + if _, err := sarama.NewConsumerGroup(s.brokers, s.consumerGroup, s.cfg); err != nil { + return nil, fmt.Errorf("new sarama consumer group: %w", err) + } consumer, err := sarama.NewConsumer(s.brokers, s.cfg) if err != nil { @@ -63,11 +65,11 @@ func (s Subscriber) Subscribe(channels ...string) (pubsub.Subscription[[]byte], return newSubscription(s.logger, consumer, topic) } -var _ pubsub.Subscription[[]byte] = (*Subscription)(nil) +var _ pubsub.Subscription[string, []byte] = (*Subscription)(nil) // Subscription represents a stream of events published to a kafka topic. type Subscription struct { - eventCh chan pubsub.Event[[]byte] + eventCh chan pubsub.Event[string, []byte] cancelFunc context.CancelFunc wg *sync.WaitGroup consumer sarama.Consumer @@ -85,7 +87,7 @@ func newSubscription( return nil, fmt.Errorf("get topic %q partitions: %w", topic, err) } - eventCh := make(chan pubsub.Event[[]byte]) + eventCh := make(chan pubsub.Event[string, []byte]) ctx, cancel := context.WithCancel(context.Background()) @@ -123,7 +125,7 @@ func consumePartition( ctx context.Context, logger *slog.Logger, partitionConsumer sarama.PartitionConsumer, - eventCh chan<- pubsub.Event[[]byte], + eventCh chan<- pubsub.Event[string, []byte], ) { for { select { @@ -138,13 +140,13 @@ func consumePartition( } } - eventCh <- pubsub.Event[[]byte]{ + eventCh <- pubsub.Event[string, []byte]{ Type: typ, Payload: m.Value, } case err := <-partitionConsumer.Errors(): - eventCh <- pubsub.Event[[]byte]{ + eventCh <- pubsub.Event[string, []byte]{ Type: pubsub.EventTypeError, Error: err, } @@ -163,7 +165,7 @@ func consumePartition( } // C returns a receive-only go channel of events published. -func (s Subscription) C() <-chan pubsub.Event[[]byte] { +func (s Subscription) C() <-chan pubsub.Event[string, []byte] { return s.eventCh } diff --git a/pubsub/kafkasarama/kafkasarama_test.go b/pubsub/kafkasarama/kafkasarama_test.go index d3459ce..2c9a385 100644 --- a/pubsub/kafkasarama/kafkasarama_test.go +++ b/pubsub/kafkasarama/kafkasarama_test.go @@ -63,7 +63,7 @@ func TestPubSub(t *testing.T) { t.Cleanup(func() { is.NoErr(sub2.Close()) }) - mes := pubsub.Event[[]byte]{ + mes := pubsub.Event[string, []byte]{ Type: "test", Payload: []byte("test"), } @@ -202,7 +202,7 @@ func TestConsumerGroups(t *testing.T) { t.Cleanup(func() { is.NoErr(pub.Close()) }) - err = pub.Publish(pubsub.Event[[]byte]{ + err = pub.Publish(pubsub.Event[string, []byte]{ Type: "", Payload: []byte("brad"), }, topic) diff --git a/pubsub/kafkashopifysarama/kafkashopifysarama_server_test.go b/pubsub/kafkashopifysarama/kafkashopifysarama_server_test.go index 75fd667..55b5e06 100644 --- a/pubsub/kafkashopifysarama/kafkashopifysarama_server_test.go +++ b/pubsub/kafkashopifysarama/kafkashopifysarama_server_test.go @@ -50,7 +50,7 @@ func newPublisher( } // Publish publishes an event to a kafka topic. -func (p *publisher) Publish(event pubsub.Event[[]byte], channels ...string) error { +func (p *publisher) Publish(event pubsub.Event[string, []byte], channels ...string) error { if len(channels) != 1 { return pubsub.ErrExactlyOneChannelAllowed } @@ -80,7 +80,7 @@ func (ks *kafkaServer) SendMessage(t *testing.T, topic, msg string) { i := is.New(t) i.Helper() - err := ks.publisher.Publish(pubsub.Event[[]byte]{ + err := ks.publisher.Publish(pubsub.Event[string, []byte]{ Type: "test", Payload: []byte(msg), }, topic) diff --git a/pubsub/kafkashopifysarama/kafkashopifysarama_subscriber.go b/pubsub/kafkashopifysarama/kafkashopifysarama_subscriber.go index 1587831..3f06c5a 100644 --- a/pubsub/kafkashopifysarama/kafkashopifysarama_subscriber.go +++ b/pubsub/kafkashopifysarama/kafkashopifysarama_subscriber.go @@ -13,7 +13,7 @@ import ( "github.com/purposeinplay/go-commons/pubsub" ) -var _ pubsub.Subscriber[[]byte] = (*Subscriber)(nil) +var _ pubsub.Subscriber[string, []byte] = (*Subscriber)(nil) // NewConsumerGroup generates a new kafka consumer to be used by the subscriber, // allowing for dependency injection for testing with a Sarama mock. @@ -61,17 +61,17 @@ func NewSubscriber( } // Subscribe creates a new subscription that runs in the background. -func (s Subscriber) Subscribe(channels ...string) (pubsub.Subscription[[]byte], error) { +func (s Subscriber) Subscribe(channels ...string) (pubsub.Subscription[string, []byte], error) { return newSubscription(s.logger, s.consumerGroup, channels) } -var _ pubsub.Subscription[[]byte] = (*Subscription)(nil) +var _ pubsub.Subscription[string, []byte] = (*Subscription)(nil) // Subscription represents a stream of events published to a kafka topic. type Subscription struct { logger *slog.Logger consumerGroup sarama.ConsumerGroup - eventCh chan pubsub.Event[[]byte] + eventCh chan pubsub.Event[string, []byte] cancelFunc context.CancelFunc wg *sync.WaitGroup ready chan bool @@ -82,7 +82,7 @@ func newSubscription( consumerGroup sarama.ConsumerGroup, topics []string, ) (*Subscription, error) { - eventCh := make(chan pubsub.Event[[]byte]) + eventCh := make(chan pubsub.Event[string, []byte]) ctx, cancel := context.WithCancel(context.Background()) @@ -124,7 +124,7 @@ func newSubscription( } // C returns a receive-only go channel of events published. -func (s *Subscription) C() <-chan pubsub.Event[[]byte] { +func (s *Subscription) C() <-chan pubsub.Event[string, []byte] { return s.eventCh } @@ -183,7 +183,7 @@ func (s *Subscription) ConsumeClaim( session.Commit() } - s.eventCh <- pubsub.Event[[]byte]{ + s.eventCh <- pubsub.Event[string, []byte]{ Type: typ, Payload: msg.Value, Ack: markFunc, diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index 4f9f509..77fc829 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -9,30 +9,30 @@ package pubsub // Publisher is the interface that wraps the basic Publish method. -type Publisher[T any] interface { +type Publisher[T, P any] interface { // Publish publishes an event to specified channels. - Publish(event Event[T], channels ...string) error + Publish(event Event[T, P], channels ...string) error } // Subscriber is the interface that wraps the Subscribe method. -type Subscriber[T any] interface { +type Subscriber[T, P any] interface { // Subscribe creates a new subscription for the events published // in the specified channels. - Subscribe(channels ...string) (Subscription[T], error) + Subscribe(channels ...string) (Subscription[T, P], error) } // PublishSubscriber is the interface that groups the basic // Publish and Subscribe methods. -type PublishSubscriber[T any] interface { - Publisher[T] - Subscriber[T] +type PublishSubscriber[T, P any] interface { + Publisher[T, P] + Subscriber[T, P] } // Subscription represents a stream of events for a single user. -type Subscription[T any] interface { +type Subscription[T, P any] interface { // C represents an even stream for all events that are published // in the channels that of this Subscription. - C() <-chan Event[T] + C() <-chan Event[T, P] // Close disconnects the subscription, from the PubSub service. // It also closes the event stream channel, thus the subscription @@ -44,12 +44,12 @@ type Subscription[T any] interface { var EventTypeError = "error" // Event represents an event that occurs in the system. -type Event[T any] struct { +type Event[T, P any] struct { // Specifies the type of event that is occurring. - Type string `json:"type"` + Type T `json:"type"` // The actual data from the event. - Payload T `json:"payload"` + Payload P `json:"payload"` // Carries an error produced by the underlying subscriber. Error error