Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Exponential Retry Mechanism with Idempotency Headers #62

Merged
merged 1 commit into from
Oct 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ require (

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-retryablehttp v0.7.4 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4=
github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48=
github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
github.com/hashicorp/go-retryablehttp v0.7.4 h1:ZQgVdpTdAL7WpMIwLzCfbalOcSUdkDZnpUv3/+BxzFA=
github.com/hashicorp/go-retryablehttp v0.7.4/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.5 h1:s5PTfem8p8EbKQOctVV53k6jCJt3UX4IEJzwh+C324Q=
github.com/stretchr/testify v1.7.5/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
Expand Down
49 changes: 46 additions & 3 deletions lib/novu.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@ import (
"encoding/json"
"fmt"
"io"
"math"
"net/http"
"net/url"
"strconv"
"strings"
"time"

"github.com/google/uuid"
"github.com/hashicorp/go-retryablehttp"
"github.com/pkg/errors"
)

Expand All @@ -17,9 +21,17 @@ const (
NovuVersion = "v1"
)

type RetryConfigType struct {
InitialDelay time.Duration // inital delay
WaitMin time.Duration // Minimum time to wait
WaitMax time.Duration // Maximum time to wait
RetryMax int // Maximum number of retries
}

type Config struct {
BackendURL *url.URL
HttpClient *http.Client
BackendURL *url.URL
HttpClient *http.Client
RetryConfig *RetryConfigType
}

type APIClient struct {
Expand Down Expand Up @@ -47,7 +59,37 @@ func NewAPIClient(apiKey string, cfg *Config) *APIClient {
cfg.BackendURL = buildBackendURL(cfg)

if cfg.HttpClient == nil {
cfg.HttpClient = &http.Client{Timeout: 20 * time.Second}
retyableClient := retryablehttp.NewClient()
if cfg.RetryConfig != nil {
retyableClient.RetryWaitMin = cfg.RetryConfig.WaitMin
retyableClient.RetryWaitMax = cfg.RetryConfig.WaitMax
retyableClient.RetryMax = cfg.RetryConfig.RetryMax
retyableClient.Backoff = func(min, max time.Duration, attemptNum int, resp *http.Response) time.Duration {
if resp != nil {
if resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode == http.StatusServiceUnavailable {
if s, ok := resp.Header["Retry-After"]; ok {
if sleep, err := strconv.ParseInt(s[0], 10, 64); err == nil {
return time.Second * time.Duration(sleep)
}
}
}
}
if attemptNum == 0 {
return cfg.RetryConfig.InitialDelay //wait for InitialDelay on 1st retry
}
mult := math.Pow(2, float64(attemptNum)) * float64(min)
sleep := time.Duration(mult)
//float64(sleep) != mult is to make sure there is no conversion error
//if there is a conversion error, number is huge and we set the sleep to max
if float64(sleep) != mult || sleep > max {
sleep = max
}
return sleep
}
} else {
retyableClient.RetryMax = 0 //by default no retry
}
cfg.HttpClient = retyableClient.StandardClient()
}

c := &APIClient{apiKey: apiKey}
Expand All @@ -70,6 +112,7 @@ func NewAPIClient(apiKey string, cfg *Config) *APIClient {
func (c APIClient) sendRequest(req *http.Request, resp interface{}) (*http.Response, error) {
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", fmt.Sprintf("ApiKey %s", c.apiKey))
req.Header.Set("Idempotency-Key", uuid.New().String())

res, err := c.config.HttpClient.Do(req)
if err != nil {
Expand Down
168 changes: 168 additions & 0 deletions lib/novu_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package lib_test

import (
"context"
"encoding/json"
"log"
"net/http"
"net/http/httptest"
"path/filepath"
"strings"
"testing"
"time"

"github.com/novuhq/go-novu/lib"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestError_Retry_With_Custom_Config(t *testing.T) {
var (
subscriberBulkPayload lib.SubscriberBulkPayload
receivedBody lib.SubscriberBulkPayload
expectedRequest lib.SubscriberBulkPayload
)
reqCount := 0
var idempotencyHeader []string
allElementsSame := func(arr []string) bool {
if len(arr) == 0 {
return true // An empty array is considered to have all elements the same.
}
firstElement := arr[0]
for _, element := range arr {
if element != firstElement {
return false
}
}
return true
}
subscriberService := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if err := json.NewDecoder(req.Body).Decode(&receivedBody); err != nil {
log.Printf("error in unmarshalling %+v", err)
w.WriteHeader(http.StatusBadRequest)
return
}

reqCount++

t.Run("Header must contain Idempotency-Key", func(t *testing.T) {
idKey := req.Header.Get("Idempotency-Key")
idempotencyHeader = append(idempotencyHeader, idKey)
assert.NotNil(t, idKey)
})
t.Run("Header must contain ApiKey", func(t *testing.T) {
authKey := req.Header.Get("Authorization")
assert.True(t, strings.Contains(authKey, novuApiKey))
assert.True(t, strings.HasPrefix(authKey, "ApiKey"))
})

t.Run("URL and request method is as expected", func(t *testing.T) {
expectedURL := "/v1/subscribers/bulk"
assert.Equal(t, http.MethodPost, req.Method)
assert.Equal(t, expectedURL, req.RequestURI)
})

t.Run("Request is as expected", func(t *testing.T) {
fileToStruct(filepath.Join("../testdata", "subscriber_bulk.json"), &expectedRequest)
assert.Equal(t, expectedRequest, receivedBody)
})

var resp lib.SubscriberResponse
fileToStruct(filepath.Join("../testdata", "subscriber_bulk_response.json"), &resp)

w.WriteHeader(http.StatusInternalServerError)
bb, _ := json.Marshal(resp)
w.Write(bb)
}))

defer subscriberService.Close()

ctx := context.Background()
fileToStruct(filepath.Join("../testdata", "subscriber_bulk.json"), &subscriberBulkPayload)

c := lib.NewAPIClient(novuApiKey, &lib.Config{BackendURL: lib.MustParseURL(subscriberService.URL), RetryConfig: &lib.RetryConfigType{RetryMax: 5, InitialDelay: 0 * time.Second}})

resp, err := c.SubscriberApi.BulkCreate(ctx, subscriberBulkPayload)
require.NotNil(t, err)
assert.NotNil(t, resp)

//idempotency and retry tests
assert.Equal(t, reqCount, 6)
assert.Equal(t, len(idempotencyHeader), 6)
assert.True(t, allElementsSame(idempotencyHeader))
}
func TestError_Retry_With_Default_Config(t *testing.T) {
var (
subscriberBulkPayload lib.SubscriberBulkPayload
receivedBody lib.SubscriberBulkPayload
expectedRequest lib.SubscriberBulkPayload
)
reqCount := 0
var idempotencyHeader []string
allElementsSame := func(arr []string) bool {
if len(arr) == 0 {
return true // An empty array is considered to have all elements the same.
}
firstElement := arr[0]
for _, element := range arr {
if element != firstElement {
return false
}
}
return true
}
subscriberService := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if err := json.NewDecoder(req.Body).Decode(&receivedBody); err != nil {
log.Printf("error in unmarshalling %+v", err)
w.WriteHeader(http.StatusBadRequest)
return
}

reqCount++

t.Run("Header must contain Idempotency-Key", func(t *testing.T) {
idKey := req.Header.Get("Idempotency-Key")
idempotencyHeader = append(idempotencyHeader, idKey)
assert.NotNil(t, idKey)
})
t.Run("Header must contain ApiKey", func(t *testing.T) {
authKey := req.Header.Get("Authorization")
assert.True(t, strings.Contains(authKey, novuApiKey))
assert.True(t, strings.HasPrefix(authKey, "ApiKey"))
})

t.Run("URL and request method is as expected", func(t *testing.T) {
expectedURL := "/v1/subscribers/bulk"
assert.Equal(t, http.MethodPost, req.Method)
assert.Equal(t, expectedURL, req.RequestURI)
})

t.Run("Request is as expected", func(t *testing.T) {
fileToStruct(filepath.Join("../testdata", "subscriber_bulk.json"), &expectedRequest)
assert.Equal(t, expectedRequest, receivedBody)
})

var resp lib.SubscriberResponse
fileToStruct(filepath.Join("../testdata", "subscriber_bulk_response.json"), &resp)

w.WriteHeader(http.StatusInternalServerError)
bb, _ := json.Marshal(resp)
w.Write(bb)
}))

defer subscriberService.Close()

ctx := context.Background()
fileToStruct(filepath.Join("../testdata", "subscriber_bulk.json"), &subscriberBulkPayload)

c := lib.NewAPIClient(novuApiKey, &lib.Config{BackendURL: lib.MustParseURL(subscriberService.URL)})

resp, err := c.SubscriberApi.BulkCreate(ctx, subscriberBulkPayload)
require.NotNil(t, err)
assert.NotNil(t, resp)

//idempotency and retry tests
assert.Equal(t, reqCount, 1)
assert.True(t, allElementsSame(idempotencyHeader))
assert.Equal(t, len(idempotencyHeader), 1)
}
Loading