Skip to content

Commit

Permalink
Fix aws_msk_iam_v2 module Authentication failure bug
Browse files Browse the repository at this point in the history
Use `CredentialsProvider` instead of static `Credentials`. See segmentio#976
  • Loading branch information
kikyomits committed Sep 3, 2022
1 parent ba6f442 commit 54fe6c1
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 110 deletions.
47 changes: 1 addition & 46 deletions sasl/aws_msk_iam_v2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,49 +12,4 @@ You can add this module to your dependency by running the command below.
go get github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2
```

You can use the `Mechanism` for SASL authentication, like below.

```go
package main

import (
"context"
"crypto/tls"
"time"

signer "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
awsCfg "github.com/aws/aws-sdk-go-v2/config"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2"
)

func main() {
ctx := context.Background()

// using aws-sdk-go-v2
// NOTE: address error properly

cfg, _ := awsCfg.LoadDefaultConfig(ctx)
creds, _ := cfg.Credentials.Retrieve(ctx)
m := &aws_msk_iam_v2.Mechanism{
Signer: signer.NewSigner(),
Credentials: creds,
Region: "us-east-1",
SignTime: time.Now(),
Expiry: time.Minute * 5,
}
config := kafka.ReaderConfig{
Brokers: []string{"https://localhost"},
GroupID: "some-consumer-group",
GroupTopics: []string{"some-topic"},
Dialer: &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
SASLMechanism: m,
TLS: &tls.Config{},
},
}
}


```
Please find the sample code in [example_test.go](./example_test.go), you can use the `Mechanism` for SASL authentication of `Reader` and `Writer`.
30 changes: 30 additions & 0 deletions sasl/aws_msk_iam_v2/example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package aws_msk_iam_v2_test

import (
"context"
"crypto/tls"
"time"

"github.com/aws/aws-sdk-go-v2/config"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2"
)

func main() {
cfg, err := config.LoadDefaultConfig(context.TODO())
if err != nil {
panic(err)
}
mechanism := aws_msk_iam_v2.NewMechanism(cfg)
_ = kafka.ReaderConfig{
Brokers: []string{"https://localhost"},
GroupID: "some-consumer-group",
GroupTopics: []string{"some-topic"},
Dialer: &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
SASLMechanism: mechanism,
TLS: &tls.Config{},
},
}
}
9 changes: 5 additions & 4 deletions sasl/aws_msk_iam_v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ module github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2
go 1.15

require (
github.com/aws/aws-sdk-go-v2 v1.16.7
github.com/aws/aws-sdk-go-v2/credentials v1.12.9
github.com/segmentio/kafka-go v0.4.32
github.com/stretchr/testify v1.7.1
github.com/aws/aws-sdk-go-v2 v1.16.12
github.com/aws/aws-sdk-go-v2/config v1.17.2
github.com/aws/aws-sdk-go-v2/credentials v1.12.15
github.com/segmentio/kafka-go v0.4.34
github.com/stretchr/testify v1.8.0
)
88 changes: 54 additions & 34 deletions sasl/aws_msk_iam_v2/go.sum
Original file line number Diff line number Diff line change
@@ -1,49 +1,69 @@
github.com/aws/aws-sdk-go-v2 v1.16.7 h1:zfBwXus3u14OszRxGcqCDS4MfMCv10e8SMJ2r8Xm0Ns=
github.com/aws/aws-sdk-go-v2 v1.16.7/go.mod h1:6CpKuLXg2w7If3ABZCl/qZ6rEgwtjZTn4eAf4RcEyuw=
github.com/aws/aws-sdk-go-v2/credentials v1.12.9 h1:DloAJr0/jbvm0iVRFDFh8GlWxrOd9XKyX82U+dfVeZs=
github.com/aws/aws-sdk-go-v2/credentials v1.12.9/go.mod h1:2Vavxl1qqQXJ8MUcQZTsIEW8cwenFCWYXtLRPba3L/o=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.8/go.mod h1:oL1Q3KuCq1D4NykQnIvtRiBGLUXhcpY5pl6QZB2XEPU=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.14/go.mod h1:kdjrMwHwrC3+FsKhNcCMJ7tUVj/8uSD5CZXeQ4wV6fM=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.8/go.mod h1:ZIV8GYoC6WLBW5KGs+o4rsc65/ozd+eQ0L31XF5VDwk=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.8/go.mod h1:rDVhIMAX9N2r8nWxDUlbubvvaFMnfsm+3jAV7q+rpM4=
github.com/aws/aws-sdk-go-v2/service/sso v1.11.12/go.mod h1:MO4qguFjs3wPGcCSpQ7kOFTwRvb+eu+fn+1vKleGHUk=
github.com/aws/aws-sdk-go-v2/service/sts v1.16.9/go.mod h1:O1IvkYxr+39hRf960Us6j0x1P8pDqhTX+oXM5kQNl/Y=
github.com/aws/smithy-go v1.12.0 h1:gXpeZel/jPoWQ7OEmLIgCUnhkFftqNfwWUwAHSlp1v0=
github.com/aws/smithy-go v1.12.0/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
github.com/aws/aws-sdk-go-v2 v1.16.12 h1:wbMYa2PlFysFx2GLIQojr6FJV5+OWCM/BwyHXARxETA=
github.com/aws/aws-sdk-go-v2 v1.16.12/go.mod h1:C+Ym0ag2LIghJbXhfXZ0YEEp49rBWowxKzJLUoob0ts=
github.com/aws/aws-sdk-go-v2/config v1.17.2 h1:V96WPd2a1H/MXGZjk4zto+KpYnwZI2kdIdy/cI8kYnQ=
github.com/aws/aws-sdk-go-v2/config v1.17.2/go.mod h1:jumS/AMwul4WaG8vyXsF6kUndG9zndR+yfYBwl4i9ds=
github.com/aws/aws-sdk-go-v2/credentials v1.12.15 h1:6DONxG9cR3pAuISj1Irh5u2SRqCfIJwyHNyDDes7SZw=
github.com/aws/aws-sdk-go-v2/credentials v1.12.15/go.mod h1:41zTC6U/78fUD7ZCa5NymTJANDjfqySg5YEAYVFl2Ic=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.13 h1:+uferi8SUDZtMloCDt24Zenyy/i71C/ua5mjUCpbpN0=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.13/go.mod h1:y0eXmsNBFIVjUE8ZBjES8myOHlMsXDz7qGT93+MVdjk=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.19 h1:gC5mudiFrWGhzcdoWj1iCGUfrzCpQG0MQIQf0CXFFQQ=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.19/go.mod h1:llxE6bwUZhuCas0K7qGiu5OgMis3N7kdWtFSxoHmJ7E=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.13 h1:qezY57na06d6kSE7uuB0N7XEflu914AXx/hg2L8Ykcw=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.13/go.mod h1:lB12mkZqCSo5PsdBFLNqc2M/OOYgNAy8UtaktyuWvE8=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.20 h1:GvszACAU8GSV3+Tant5GutW6smY8WavrP8ZuRS9Ku4Q=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.20/go.mod h1:bfTcsThj5a9P5pIGRy0QudJ8k4+issxXX+O6Djnd5Cs=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.13 h1:ObfthqDyhe7rMAOa7pqft6974VHIk8BAJB7kYdoIfTA=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.13/go.mod h1:V390DK4MQxLpDdXxFqizyz8KUxuWImkW/xzgXMz0yyk=
github.com/aws/aws-sdk-go-v2/service/sso v1.11.18 h1:gTn1a/FbcOXK5LQS88dD5k+PKwyjVvhAEEwyN4c6eW8=
github.com/aws/aws-sdk-go-v2/service/sso v1.11.18/go.mod h1:ytmEi5+qwcSNcV2pVA8PIb1DnKT/0Bu/K4nfJHwoM6c=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.1 h1:p48IfndYbRk3iDsoQAmVXdCKEM5+7Y50JAPikjwk8gI=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.1/go.mod h1:NY+G+8PW0ISyJ7/6t5mgOe6qpJiwZa9Jix05WPscJjg=
github.com/aws/aws-sdk-go-v2/service/sts v1.16.14 h1:7kxso8VZLQ86Jg27QRBw6fjrQhQ8CMNMZ7SB0w7RQiA=
github.com/aws/aws-sdk-go-v2/service/sts v1.16.14/go.mod h1:Y+BUV19q3OmQVqNUlbZ40zVi3NM6Biuxwkx/qdSD/CY=
github.com/aws/smithy-go v1.13.0 h1:YfyEmSJLo7fAv8FbuDK4R8F9aAmi9DZ88Zb/KJJmUl0=
github.com/aws/smithy-go v1.13.0/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/klauspost/compress v1.14.2 h1:S0OHlFk/Gbon/yauFJ4FfJJF5V0fc5HbBTJazi28pRw=
github.com/klauspost/compress v1.14.2/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/pierrec/lz4/v4 v4.1.14 h1:+fL8AQEZtz/ijeNnpduH0bROTu0O3NZAlPjQxGn8LwE=
github.com/pierrec/lz4/v4 v4.1.14/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/klauspost/compress v1.15.7 h1:7cgTQxJCU/vy+oP/E3B9RGbQTgbiVzIJWIKOLoAsPok=
github.com/klauspost/compress v1.15.7/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/segmentio/kafka-go v0.4.32 h1:Ohr+9E+kDv/Ld2UPJN9hnKZRd2qgiqCmI8v2e1qlfLM=
github.com/segmentio/kafka-go v0.4.32/go.mod h1:JAPPIiY3MQIwVHj64CWOP0LsFFfQ7H0w69kuoxnMIS0=
github.com/segmentio/kafka-go v0.4.34 h1:Dm6YlLMiVSiwwav20KY0AoY63s661FXevwJ3CVHUERo=
github.com/segmentio/kafka-go v0.4.34/go.mod h1:GAjxBQJdQMB5zfNA21AhpaqOB2Mu+w3De4ni3Gbm8y0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284 h1:rlLehGeYg6jfoyz/eDqDU1iRXLKfR42nnNh57ytKEWo=
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/xdg/scram v1.0.5 h1:TuS0RFmt5Is5qm9Tm2SoD89OPqe4IRiFtyFY4iwWXsw=
github.com/xdg/scram v1.0.5/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.3 h1:cmL5Enob4W83ti/ZHuZLuKD/xqJfus4fVPwE+/BDm+4=
github.com/xdg/stringprep v1.0.3/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220706163947-c90051bbdb60 h1:8NSylCMxLW4JvserAndSgFL7aPli6A68yf0bYFTcWCM=
golang.org/x/net v0.0.0-20220706163947-c90051bbdb60/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20220512140231-539c8e751b99 h1:dbuHpmKjkDzSOMKAWl10QNlgaZUd3V1q99xc81tt2Kc=
gopkg.in/yaml.v3 v3.0.0-20220512140231-539c8e751b99/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
50 changes: 32 additions & 18 deletions sasl/aws_msk_iam_v2/msk_iam.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"runtime"
Expand Down Expand Up @@ -32,15 +31,15 @@ const (
queryExpiryKey = "X-Amz-Expires"
)

var signUserAgent = fmt.Sprintf("kafka-go/sasl/aws_msk_iam/%s", runtime.Version())
var signUserAgent = "kafka-go/sasl/aws_msk_iam_v2/" + runtime.Version()

// Mechanism implements sasl.Mechanism for the AWS_MSK_IAM mechanism, based on the official java implementation:
// https://github.com/aws/aws-msk-iam-auth
type Mechanism struct {
// The sigv4.Signer of aws-sdk-go-v2 to use when signing the request. Required.
Signer *signer.Signer
// The aws.Credentials of aws-sdk-go-v2. Required.
Credentials aws.Credentials
// The aws.Config.Credentials or config.CredentialsProvider of aws-sdk-go-v2. Required.
Credentials aws.CredentialsProvider
// The region where the msk cluster is hosted, e.g. "us-east-1". Required.
Region string
// The time the request is planned for. Optional, defaults to time.Now() at time of authentication.
Expand All @@ -62,19 +61,20 @@ func (m *Mechanism) Next(ctx context.Context, challenge []byte) (bool, []byte, e

// Start produces the authentication values required for AWS_MSK_IAM. It produces the following json as a byte array,
// making use of the aws-sdk to produce the signed output.
// {
// "version" : "2020_10_22",
// "host" : "<broker host>",
// "user-agent": "<user agent string from the client>",
// "action": "kafka-cluster:Connect",
// "x-amz-algorithm" : "<algorithm>",
// "x-amz-credential" : "<clientAWSAccessKeyID>/<date in yyyyMMdd format>/<region>/kafka-cluster/aws4_request",
// "x-amz-date" : "<timestamp in yyyyMMdd'T'HHmmss'Z' format>",
// "x-amz-security-token" : "<clientAWSSessionToken if any>",
// "x-amz-signedheaders" : "host",
// "x-amz-expires" : "<expiration in seconds>",
// "x-amz-signature" : "<AWS SigV4 signature computed by the client>"
// }
//
// {
// "version" : "2020_10_22",
// "host" : "<broker host>",
// "user-agent": "<user agent string from the client>",
// "action": "kafka-cluster:Connect",
// "x-amz-algorithm" : "<algorithm>",
// "x-amz-credential" : "<clientAWSAccessKeyID>/<date in yyyyMMdd format>/<region>/kafka-cluster/aws4_request",
// "x-amz-date" : "<timestamp in yyyyMMdd'T'HHmmss'Z' format>",
// "x-amz-security-token" : "<clientAWSSessionToken if any>",
// "x-amz-signedheaders" : "host",
// "x-amz-expires" : "<expiration in seconds>",
// "x-amz-signature" : "<AWS SigV4 signature computed by the client>"
// }
func (m *Mechanism) Start(ctx context.Context) (sess sasl.StateMachine, ir []byte, err error) {
signedMap, err := m.preSign(ctx)
if err != nil {
Expand All @@ -92,7 +92,12 @@ func (m *Mechanism) preSign(ctx context.Context) (map[string]string, error) {
return nil, err
}

signedUrl, header, err := m.Signer.PresignHTTP(ctx, m.Credentials, req, signPayload, signService, m.Region, defaultSignTime(m.SignTime))
creds, err := m.Credentials.Retrieve(ctx)
if err != nil {
return nil, err
}

signedUrl, header, err := m.Signer.PresignHTTP(ctx, creds, req, signPayload, signService, m.Region, defaultSignTime(m.SignTime))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -164,3 +169,12 @@ func defaultSignTime(v time.Time) time.Time {
}
return v
}

// NewMechanism provides
func NewMechanism(awsCfg aws.Config) *Mechanism {
return &Mechanism{
Signer: signer.NewSigner(),
Credentials: awsCfg.Credentials,
Region: awsCfg.Region,
}
}
23 changes: 15 additions & 8 deletions sasl/aws_msk_iam_v2/msk_iam_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ import (
"testing"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/segmentio/kafka-go/sasl"
"github.com/stretchr/testify/assert"

signer "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
credentialsv2 "github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/credentials"
)

const (
Expand All @@ -23,17 +24,12 @@ const (
var signTime = time.Date(2021, 10, 14, 13, 5, 0, 0, time.UTC)

func TestAwsMskIamMechanism(t *testing.T) {
creds, err := credentialsv2.NewStaticCredentialsProvider(accessKeyId, secretAccessKey, "").Retrieve(context.Background())
if err != nil {
t.Fatal(err)
}

creds := credentials.NewStaticCredentialsProvider(accessKeyId, secretAccessKey, "")
ctxWithMetadata := func() context.Context {
return sasl.WithMetadata(context.Background(), &sasl.Metadata{
Host: "localhost",
Port: 9092,
})

}

tests := []struct {
Expand Down Expand Up @@ -64,7 +60,6 @@ func TestAwsMskIamMechanism(t *testing.T) {
Region: "us-east-1",
SignTime: signTime,
}

sess, auth, err := mskMechanism.Start(ctx)
if tt.shouldFail { // if error is expected
if err == nil { // but we don't find one
Expand Down Expand Up @@ -154,3 +149,15 @@ func TestDefaultSignTime(t *testing.T) {
})
}
}

func TestNewMechanism(t *testing.T) {
region := "us-east-1"
creds := credentials.StaticCredentialsProvider{}
awsCfg := aws.Config{
Region: region,
Credentials: creds,
}
m := NewMechanism(awsCfg)
assert.Equal(t, m.Region, region)
assert.Equal(t, m.Credentials, creds)
}

0 comments on commit 54fe6c1

Please sign in to comment.