Skip to content

Commit

Permalink
feat: Enhance Initial Request Target Selection (#78)
Browse files Browse the repository at this point in the history
Signed-off-by: Ashwin Vasani <ashwin.vasani@granica.ai>
  • Loading branch information
avasani committed Aug 21, 2023
1 parent 9d67bbb commit dbbe3c4
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 28 deletions.
17 changes: 12 additions & 5 deletions boltrouter/bolt_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ func (br *BoltRouter) GetCleanerStatus() (bool, error) {
return cleanerOnBool, nil
}

// SelectInitialRequestTarget based on cluster_health_metrics and client_behavior_params
func (br *BoltRouter) SelectInitialRequestTarget() (target string, reason string, err error) {
// select initial request destination based on cluster_health_metrics and client_behavior_params
func (br *BoltRouter) SelectInitialRequestTarget(boltReq *BoltRequest) (target string, reason string, err error) {
boltInfo := br.boltVars.BoltInfo.Get()

clusterHealthy := boltInfo["cluster_healthy"]
Expand Down Expand Up @@ -195,9 +195,16 @@ func (br *BoltRouter) SelectInitialRequestTarget() (target string, reason string
return "", "", fmt.Errorf("could not parse crunchTrafficPercent to int. %v", err)
}

// Randomly select bolt with crunchTrafficPercentInt % probability.
if rand.Intn(100) < crunchTrafficPercentInt {
return "bolt", "traffic splitting", nil
if br.config.CrunchTrafficSplit == CrunchTrafficSplitByObjectKeyHash {
// Take a mod of hashValue and check if it is less than crunchTrafficPercentInt
if int(boltReq.crcHash)%100 < crunchTrafficPercentInt {
return "bolt", "traffic splitting", nil
}
} else {
// Randomly select bolt with crunchTrafficPercentInt % probability.
if rand.Intn(100) < crunchTrafficPercentInt {
return "bolt", "traffic splitting", nil
}
}

return "s3", "traffic splitting", nil
Expand Down
27 changes: 21 additions & 6 deletions boltrouter/bolt_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package boltrouter

import (
"context"
"github.com/Pallinder/go-randomdata"
"net/http"
"strings"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -72,6 +74,7 @@ func TestSelectInitialRequestTarget(t *testing.T) {
logger := zaptest.NewLogger(t)

testCases := []struct {
URL string
name string
cfg Config
clusterHealthy bool
Expand All @@ -80,10 +83,19 @@ func TestSelectInitialRequestTarget(t *testing.T) {
reason string
intelligentQS bool
}{
{name: "ClusterUnhealthy", cfg: Config{Local: false}, clusterHealthy: false, clientBehaviorParams: map[string]interface{}{"crunch_traffic_percent": "70"}, expected: "s3", reason: "cluster unhealthy", intelligentQS: true},
{name: "ClusterHealthyCrunchTrafficZeroPercent", cfg: Config{Local: false}, clusterHealthy: true, clientBehaviorParams: map[string]interface{}{"crunch_traffic_percent": "0"}, expected: "s3", reason: "traffic splitting", intelligentQS: true},
{name: "ClusterHealthyCrunchTrafficHundredPercent", cfg: Config{Local: false}, clusterHealthy: true, clientBehaviorParams: map[string]interface{}{"crunch_traffic_percent": "100"}, expected: "bolt", reason: "traffic splitting", intelligentQS: true},
{name: "BackwardsCompat", cfg: Config{Local: false}, clusterHealthy: false, clientBehaviorParams: map[string]interface{}{}, expected: "bolt", reason: "backwards compatibility", intelligentQS: false},
// Check with CrunchTrafficSplitByObjectKeyHash
{URL: "pqr.txt", name: "ClusterUnhealthy", cfg: Config{Local: false, CrunchTrafficSplit: CrunchTrafficSplitByObjectKeyHash}, clusterHealthy: false, clientBehaviorParams: map[string]interface{}{"crunch_traffic_percent": "70"}, expected: "s3", reason: "cluster unhealthy", intelligentQS: true},
{URL: "pqr.txt", name: "ClusterHealthyCrunchTrafficZeroPercent", cfg: Config{Local: false, CrunchTrafficSplit: CrunchTrafficSplitByObjectKeyHash}, clusterHealthy: true, clientBehaviorParams: map[string]interface{}{"crunch_traffic_percent": "0"}, expected: "s3", reason: "traffic splitting", intelligentQS: true},
{URL: "pqr.txt", name: "ClusterHealthyCrunchTrafficHundredPercent", cfg: Config{Local: false, CrunchTrafficSplit: CrunchTrafficSplitByObjectKeyHash}, clusterHealthy: true, clientBehaviorParams: map[string]interface{}{"crunch_traffic_percent": "100"}, expected: "bolt", reason: "traffic splitting", intelligentQS: true},
{URL: "pqr.txt", name: "BackwardsCompat", cfg: Config{Local: false, CrunchTrafficSplit: CrunchTrafficSplitByObjectKeyHash}, clusterHealthy: false, clientBehaviorParams: map[string]interface{}{}, expected: "bolt", reason: "backwards compatibility", intelligentQS: false},
{URL: "xyz123.txt", name: "ClusterHealthyCrunchTrafficFiftyPercent", cfg: Config{Local: false, CrunchTrafficSplit: CrunchTrafficSplitByObjectKeyHash}, clusterHealthy: true, clientBehaviorParams: map[string]interface{}{"crunch_traffic_percent": "50"}, expected: "s3", reason: "traffic splitting", intelligentQS: true},
{URL: "abc123.txt", name: "ClusterHealthyCrunchTrafficFiftyPercent", cfg: Config{Local: false, CrunchTrafficSplit: CrunchTrafficSplitByObjectKeyHash}, clusterHealthy: true, clientBehaviorParams: map[string]interface{}{"crunch_traffic_percent": "50"}, expected: "bolt", reason: "traffic splitting", intelligentQS: true},
{URL: "abc/abc123.txt", name: "ClusterHealthyCrunchTrafficFiftyPercent", cfg: Config{Local: false, CrunchTrafficSplit: CrunchTrafficSplitByObjectKeyHash}, clusterHealthy: true, clientBehaviorParams: map[string]interface{}{"crunch_traffic_percent": "50"}, expected: "s3", reason: "traffic splitting", intelligentQS: true},
// Check with CrunchTrafficSplitByRandomRequest
{URL: "pqr.txt", name: "ClusterUnhealthy", cfg: Config{Local: false, CrunchTrafficSplit: CrunchTrafficSplitByRandomRequest}, clusterHealthy: false, clientBehaviorParams: map[string]interface{}{"crunch_traffic_percent": "70"}, expected: "s3", reason: "cluster unhealthy", intelligentQS: true},
{URL: "pqr.txt", name: "ClusterHealthyCrunchTrafficZeroPercent", cfg: Config{Local: false, CrunchTrafficSplit: CrunchTrafficSplitByRandomRequest}, clusterHealthy: true, clientBehaviorParams: map[string]interface{}{"crunch_traffic_percent": "0"}, expected: "s3", reason: "traffic splitting", intelligentQS: true},
{URL: "pqr.txt", name: "ClusterHealthyCrunchTrafficHundredPercent", cfg: Config{Local: false, CrunchTrafficSplit: CrunchTrafficSplitByRandomRequest}, clusterHealthy: true, clientBehaviorParams: map[string]interface{}{"crunch_traffic_percent": "100"}, expected: "bolt", reason: "traffic splitting", intelligentQS: true},
{URL: "pqr.txt", name: "BackwardsCompat", cfg: Config{Local: false, CrunchTrafficSplit: CrunchTrafficSplitByRandomRequest}, clusterHealthy: false, clientBehaviorParams: map[string]interface{}{}, expected: "bolt", reason: "backwards compatibility", intelligentQS: false},
}

for _, tt := range testCases {
Expand All @@ -93,8 +105,11 @@ func TestSelectInitialRequestTarget(t *testing.T) {
require.NoError(t, err)
err = br.RefreshBoltInfo(ctx)
require.NoError(t, err)

target, reason, err := br.SelectInitialRequestTarget()
body := strings.NewReader(randomdata.Paragraph())
req, err := http.NewRequest(http.MethodGet, tt.URL, body)
req.Header.Set("Authorization", "AWS4-HMAC-SHA256 Credential=AKIA3Y7DLM2EYWSYCN5P/20230511/us-west-2/s3/aws4_request, SignedHeaders=accept-encoding;amz-sdk-invocation-id;amz-sdk-request;host;x-amz-content-sha256;x-amz-date, Signature=6447287d46d333a010e224191d64c31b9738cc37886aadb7753a0a579a30edc6")
boltReq, err := br.NewBoltRequest(ctx, logger, req)
target, reason, err := br.SelectInitialRequestTarget(boltReq)
require.NoError(t, err)
require.Equal(t, tt.expected, target)
require.Equal(t, tt.reason, reason)
Expand Down
27 changes: 20 additions & 7 deletions boltrouter/bolt_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"errors"
"fmt"
"hash/crc32"
"io"
"net/http"
"net/url"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
Expand All @@ -14,13 +16,14 @@ import (
)

type BoltRequest struct {
Bolt *http.Request
Aws *http.Request
Bolt *http.Request
Aws *http.Request
crcHash uint32
}

type BoltRequestAnalytics struct {
ObjectKey string
RequestBodySize int
RequestBodySize uint32
Method string
InitialRequestTarget string
InitialRequestTargetReason string
Expand Down Expand Up @@ -60,8 +63,17 @@ func (br *BoltRouter) NewBoltRequest(ctx context.Context, logger *zap.Logger, re
if err != nil {
return nil, fmt.Errorf("could not make signed aws head request: %w", err)
}
bucketAndObjPath := ""
if sourceBucket.Bucket == "n-auth-dummy" {
// Special Case to handle dummy bucket
bucketAndObjPath, _ = url.JoinPath(sourceBucket.Bucket, req.URL.Path)
} else {
bucketAndObjPath = req.URL.Path
}

crcHash := crc32.ChecksumIEEE([]byte(bucketAndObjPath))
BoltURL, err := br.SelectBoltEndpoint(req.Method)

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -109,8 +121,9 @@ func (br *BoltRouter) NewBoltRequest(ctx context.Context, logger *zap.Logger, re
}

return &BoltRequest{
Bolt: req.Clone(ctx),
Aws: awsRequest.Clone(ctx),
Bolt: req.Clone(ctx),
Aws: awsRequest.Clone(ctx),
crcHash: crcHash,
}, nil
}

Expand Down Expand Up @@ -183,11 +196,11 @@ func newFailoverAwsRequest(ctx context.Context, req *http.Request, awsCred aws.C
// DoboltRequest will return a bool indicating if the request was a failover.
// DoRequest will return a BoltRequestAnalytics struct with analytics about the request.
func (br *BoltRouter) DoRequest(logger *zap.Logger, boltReq *BoltRequest) (*http.Response, bool, *BoltRequestAnalytics, error) {
initialRequestTarget, reason, err := br.SelectInitialRequestTarget()
initialRequestTarget, reason, err := br.SelectInitialRequestTarget(boltReq)

boltRequestAnalytics := &BoltRequestAnalytics{
ObjectKey: boltReq.Bolt.URL.Path,
RequestBodySize: int(boltReq.Bolt.ContentLength),
RequestBodySize: uint32(boltReq.Bolt.ContentLength),
Method: boltReq.Bolt.Method,
InitialRequestTarget: initialRequestTarget,
InitialRequestTargetReason: reason,
Expand Down
17 changes: 9 additions & 8 deletions boltrouter/bolt_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,14 @@ func TestBoltRequestFailover(t *testing.T) {
allReadEndpoints := append(mainReadEndpoints, failoverReadEndpoints...)
// register error responders for all read endpoints
for _, endpoint := range allReadEndpoints {
httpmock.RegisterResponder("GET", fmt.Sprintf("https://%s/test.projectn.co", endpoint),
httpmock.RegisterResponder("GET", fmt.Sprintf("https://%s/test.granica.ai123456", endpoint),
func(req *http.Request) (*http.Response, error) {
return httpmock.NewStringResponse(500, "SERVER ERROR"), fmt.Errorf("s3 error")
})
}

httpmock.RegisterResponder("GET", "https://bolt.s3.us-west-2.amazonaws.com/test.projectn.co",
httpmock.RegisterResponder("GET", "https://bolt.s3.us-west-2.amazonaws.com/test.granica.ai123456",

func(req *http.Request) (*http.Response, error) {
return httpmock.NewStringResponse(200, "OK"), nil
})
Expand All @@ -97,15 +98,15 @@ func TestBoltRequestFailover(t *testing.T) {
overrideCrunchTrafficPct(br, "100")

body := strings.NewReader(randomdata.Paragraph())
req, err := http.NewRequest(http.MethodGet, "test.projectn.co", body)
req, err := http.NewRequest(http.MethodGet, "test.granica.ai123456", body)
req.Header.Set("Authorization", "AWS4-HMAC-SHA256 Credential=AKIA3Y7DLM2EYWSYCN5P/20230511/us-west-2/s3/aws4_request, SignedHeaders=accept-encoding;amz-sdk-invocation-id;amz-sdk-request;host;x-amz-content-sha256;x-amz-date, Signature=6447287d46d333a010e224191d64c31b9738cc37886aadb7753a0a579a30edc6")
require.NoError(t, err)

boltReq, err := br.NewBoltRequest(ctx, logger, req)
require.NoError(t, err)
require.NotNil(t, boltReq)

_, failover, _, err := br.DoRequest(logger, boltReq)

require.Error(t, err, failover)
require.False(t, failover, err)

Expand All @@ -129,13 +130,14 @@ func TestBoltRequestPanic(t *testing.T) {
allReadEndpoints := append(mainReadEndpoints, failoverReadEndpoints...)
// register panic responders for all read endpoints
for _, endpoint := range allReadEndpoints {
httpmock.RegisterResponder("GET", fmt.Sprintf("https://%s/test.projectn.co", endpoint),
httpmock.RegisterResponder("GET", fmt.Sprintf("https://%s/test.granica.ai123456", endpoint),
func(req *http.Request) (*http.Response, error) {
panic("Simulated panic during request")
})
}

httpmock.RegisterResponder("GET", "https://bolt.s3.us-west-2.amazonaws.com/test.granica.ai123456",

httpmock.RegisterResponder("GET", "https://bolt.s3.us-west-2.amazonaws.com/test.projectn.co",
func(req *http.Request) (*http.Response, error) {
return httpmock.NewStringResponse(200, "OK"), nil
})
Expand All @@ -151,10 +153,9 @@ func TestBoltRequestPanic(t *testing.T) {
overrideCrunchTrafficPct(br, "100")

body := strings.NewReader(randomdata.Paragraph())
req, err := http.NewRequest(http.MethodGet, "test.projectn.co", body)
req, err := http.NewRequest(http.MethodGet, "test.granica.ai123456", body)
req.Header.Set("Authorization", "AWS4-HMAC-SHA256 Credential=AKIA3Y7DLM2EYWSYCN5P/20230511/us-west-2/s3/aws4_request, SignedHeaders=accept-encoding;amz-sdk-invocation-id;amz-sdk-request;host;x-amz-content-sha256;x-amz-date, Signature=6447287d46d333a010e224191d64c31b9738cc37886aadb7753a0a579a30edc6")
require.NoError(t, err)

boltReq, err := br.NewBoltRequest(ctx, logger, req)
require.NoError(t, err)
require.NotNil(t, boltReq)
Expand Down
3 changes: 1 addition & 2 deletions boltrouter/bolt_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ type BoltRouter struct {
boltHttpClient *http.Client
standardHttpClient *http.Client
boltVars *BoltVars

logger *zap.Logger
logger *zap.Logger
}

// NewBoltRouter creates a new BoltRouter.
Expand Down
14 changes: 14 additions & 0 deletions boltrouter/config.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
package boltrouter

type CrunchTrafficSplitType string

const (
CrunchTrafficSplitByObjectKeyHash CrunchTrafficSplitType = "objectkeyhash"
CrunchTrafficSplitByRandomRequest CrunchTrafficSplitType = "random"
)

type Config struct {
// If set, boltrouter will run in local mode.
// For example, it will not query quicksilver to get endpoints.
Expand All @@ -13,11 +20,18 @@ type Config struct {

// Enable failover to a AWS request if the Bolt request fails or vice-versa.
Failover bool `yaml:"Failover"`

// There are two ways to split the traffic between bolt and object store
// 1. Random Crunch Traffic Split
// 2. Hash Based Crunch Traffic Split
// Random approach could cause data inconsistency if the requests are mix of GET and PUT.
CrunchTrafficSplit CrunchTrafficSplitType `yaml:"CrunchTrafficSplit"`
}

var DefaultConfig = Config{
Local: false,
Passthrough: false,
Failover: true,
BoltEndpointOverride: "",
CrunchTrafficSplit: CrunchTrafficSplitByObjectKeyHash,
}
5 changes: 5 additions & 0 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func initServerFlags(cmd *cobra.Command) {
cmd.Flags().String("bolt-endpoint-override", "", "Specify the local bolt endpoint with port to override in local mode. e.g: <local-bolt-ip>:9000")
cmd.Flags().Bool("passthrough", false, "Set passthrough flag to bolt requests.")
cmd.Flags().BoolP("failover", "f", true, "Enables aws request failover if bolt request fails.")
cmd.Flags().String("crunch-traffic-split", "objectkeyhash", "Specify the crunch traffic split strategy: random or objectkeyhash")
}

var serveCmd = &cobra.Command{
Expand Down Expand Up @@ -86,5 +87,9 @@ func getBoltRouterConfig(cmd *cobra.Command) boltrouter.Config {
if cmd.Flags().Lookup("failover").Changed {
boltRouterConfig.Failover, _ = cmd.Flags().GetBool("failover")
}
if cmd.Flags().Lookup("crunch-traffic-split").Changed {
crunchTrafficSplitStr, _ := cmd.Flags().GetString("crunch-traffic-split")
boltRouterConfig.CrunchTrafficSplit = boltrouter.CrunchTrafficSplitType(crunchTrafficSplitStr)
}
return boltRouterConfig
}

0 comments on commit dbbe3c4

Please sign in to comment.