Skip to content

Commit

Permalink
fix: Quota fixes
Browse files Browse the repository at this point in the history
* Allow blacklisting of namespaces
* Optionally limited default_namespace
* Attach retry info to rate errors
* Fix Datadog connection
* Return throttling statistics in quota usage API
* Return non-retriable error if request hard limit exceeded
* Properly categorize observability, management and admin APIs
* Other bugfixes and refactoring
* Skip DropCollection, DropDatabase, Delete APIs from storage quota check
  • Loading branch information
efirs committed Oct 10, 2022
1 parent c5e3513 commit a061089
Show file tree
Hide file tree
Showing 22 changed files with 349 additions and 133 deletions.
2 changes: 1 addition & 1 deletion api/proto
Submodule proto updated from 96cb7e to bd0c85
41 changes: 24 additions & 17 deletions api/server/v1/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,38 @@ import (
)

const (
methodPrefix = "/tigrisdata.v1.Tigris/"
HealthMethodName = "/HealthAPI/Health"

InsertMethodName = methodPrefix + "Insert"
ReplaceMethodName = methodPrefix + "Replace"
UpdateMethodName = methodPrefix + "Update"
DeleteMethodName = methodPrefix + "Delete"
ReadMethodName = methodPrefix + "Read"
apiMethodPrefix = "/tigrisdata.v1.Tigris/"

SearchMethodName = methodPrefix + "Search"
InsertMethodName = apiMethodPrefix + "Insert"
ReplaceMethodName = apiMethodPrefix + "Replace"
UpdateMethodName = apiMethodPrefix + "Update"
DeleteMethodName = apiMethodPrefix + "Delete"
ReadMethodName = apiMethodPrefix + "Read"

SubscribeMethodName = methodPrefix + "Subscribe"
SearchMethodName = apiMethodPrefix + "Search"

EventsMethodName = methodPrefix + "Events"
SubscribeMethodName = apiMethodPrefix + "Subscribe"

CommitTransactionMethodName = methodPrefix + "CommitTransaction"
RollbackTransactionMethodName = methodPrefix + "RollbackTransaction"
EventsMethodName = apiMethodPrefix + "Events"

CreateOrUpdateCollectionMethodName = methodPrefix + "CreateOrUpdateCollection"
DropCollectionMethodName = methodPrefix + "DropCollection"
CommitTransactionMethodName = apiMethodPrefix + "CommitTransaction"
RollbackTransactionMethodName = apiMethodPrefix + "RollbackTransaction"

ListDatabasesMethodName = methodPrefix + "ListDatabases"
ListCollectionsMethodName = methodPrefix + "ListCollections"
CreateOrUpdateCollectionMethodName = apiMethodPrefix + "CreateOrUpdateCollection"
DropCollectionMethodName = apiMethodPrefix + "DropCollection"

DescribeDatabaseMethodName = methodPrefix + "DescribeDatabase"
DescribeCollectionMethodName = methodPrefix + "DescribeCollection"
DropDatabaseMethodName = apiMethodPrefix + "DropDatabase"

ListDatabasesMethodName = apiMethodPrefix + "ListDatabases"
ListCollectionsMethodName = apiMethodPrefix + "ListCollections"

DescribeDatabaseMethodName = apiMethodPrefix + "DescribeDatabase"
DescribeCollectionMethodName = apiMethodPrefix + "DescribeCollection"

ObservabilityMethodPrefix = "/tigrisdata.observability.v1.Observability/"
ManagementMethodPrefix = "/tigrisdata.management.v1.Management/"
)

func IsTxSupported(ctx context.Context) bool {
Expand Down
2 changes: 1 addition & 1 deletion errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func Unauthenticated(format string, args ...any) error {
}

// ResourceExhausted constructs a "too many requests" error (HTTP 429).
func ResourceExhausted(format string, args ...any) error {
func ResourceExhausted(format string, args ...any) *api.TigrisError {
return api.Errorf(api.Code_RESOURCE_EXHAUSTED, format, args...)
}

Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.18

require (
github.com/DataDog/datadog-api-client-go v1.16.0
github.com/DataDog/datadog-api-client-go/v2 v2.3.1
github.com/apple/foundationdb/bindings/go v0.0.0-20220521054011-a88e049b28d8
github.com/auth0/go-auth0 v0.9.3
github.com/auth0/go-jwt-middleware/v2 v2.0.1
Expand Down Expand Up @@ -37,6 +38,7 @@ require (
github.com/ugorji/go/codec v1.2.7
github.com/valyala/bytebufferpool v1.0.0
go.uber.org/atomic v1.9.0
golang.org/x/net v0.0.0-20220726230323-06994584191e
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9
google.golang.org/genproto v0.0.0-20220725144611-272f38e5d71b
google.golang.org/grpc v1.48.0
Expand Down Expand Up @@ -118,7 +120,6 @@ require (
github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 // indirect
github.com/yudai/pp v2.0.1+incompatible // indirect
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 // indirect
golang.org/x/net v0.0.0-20220726230323-06994584191e // indirect
golang.org/x/oauth2 v0.0.0-20220722155238-128564f6959c // indirect
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
golang.org/x/text v0.3.7 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ github.com/DataDog/datadog-agent/pkg/obfuscate v0.38.0 h1:Mk1GqUitDrsZBvNIR4G/GF
github.com/DataDog/datadog-agent/pkg/obfuscate v0.38.0/go.mod h1:MxVcCIC42tBIjPm93BHdh9/vw2LivRiptj3HygI+GGQ=
github.com/DataDog/datadog-api-client-go v1.16.0 h1:5jOZv1m98criCvYTa3qpW8Hzv301nbZX3K9yJtwGyWY=
github.com/DataDog/datadog-api-client-go v1.16.0/go.mod h1:PgrP2ABuJWL3Auw2iEkemAJ/r72ghG4DQQmb5sgnKW4=
github.com/DataDog/datadog-api-client-go/v2 v2.3.1 h1:+0FHme5n4AuJEGmzaN8+n3OWKFLiJoBP+FNI60EqvuU=
github.com/DataDog/datadog-api-client-go/v2 v2.3.1/go.mod h1:98b/MtTwSAr/yhTfhCR1oxAqQ/4tMkdrgKH7fYiDA0g=
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/DataDog/datadog-go v4.8.2+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/DataDog/datadog-go/v5 v5.0.2/go.mod h1:ZI9JFB4ewXbw1sBnF4sxsR2k1H3xjV+PUAOUsHvKpcU=
Expand Down
24 changes: 22 additions & 2 deletions server/config/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ var DefaultConfig = Config{
Observability: ObservabilityConfig{
Enabled: false,
Provider: "datadog",
ProviderUrl: "https://us3.datadoghq.com",
ProviderUrl: "us3.datadoghq.com",
},
Management: ManagementConfig{
Enabled: true,
Expand All @@ -338,6 +338,14 @@ type LimitsConfig struct {
WriteUnits int `mapstructure:"write_units" yaml:"write_units" json:"write_units"`
}

// Limit reports the configured unit limit for one direction of traffic:
// WriteUnits when isWrite is true, ReadUnits otherwise.
func (l *LimitsConfig) Limit(isWrite bool) int {
	units := l.ReadUnits
	if isWrite {
		units = l.WriteUnits
	}

	return units
}

type NamespaceLimitsConfig struct {
Enabled bool
Default LimitsConfig // default per namespace limit
Expand All @@ -355,13 +363,25 @@ func (n *NamespaceLimitsConfig) NamespaceLimits(ns string) *LimitsConfig {
return &n.Default
}

// NamespaceStorageLimitsConfig holds the storage limit configured for a
// single namespace. It is the value type of StorageLimitsConfig.Namespaces.
type NamespaceStorageLimitsConfig struct {
// Size is the limit StorageLimitsConfig.NamespaceLimits returns for the
// namespace instead of the global DataSizeLimit.
// NOTE(review): presumably expressed in bytes, like DataSizeLimit — confirm.
Size int64
}

type StorageLimitsConfig struct {
Enabled bool
DataSizeLimit int64 `mapstructure:"data_size_limit" yaml:"data_size_limit" json:"data_size_limit"`
RefreshInterval time.Duration `mapstructure:"refresh_interval" yaml:"refresh_interval" json:"refresh_interval"`

// Per namespace limits
Namespaces map[string]int64
Namespaces map[string]NamespaceStorageLimitsConfig
}

// NamespaceLimits returns the storage size limit that applies to namespace ns.
// When ns has an entry in Namespaces, that entry's Size is returned (even if
// it is zero); otherwise the global DataSizeLimit is used.
func (n *StorageLimitsConfig) NamespaceLimits(ns string) int64 {
	override, found := n.Namespaces[ns]
	if !found {
		return n.DataSizeLimit
	}

	return override.Size
}

type QuotaConfig struct {
Expand Down
15 changes: 10 additions & 5 deletions server/metrics/datadog.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var (

// Datadog bundles the Datadog API client with the server variables that are
// attached to the context of each metrics query.
type Datadog struct {
// apiClient is the client built by InitDatadog via datadog.NewAPIClient.
apiClient *datadog.APIClient
// host is the server-variables map ("site" -> Observability.ProviderUrl)
// that Query stores in the context under datadog.ContextServerVariables.
host map[string]string
}

func InitDatadog(cfg *config.Config) *Datadog {
Expand All @@ -50,11 +51,14 @@ func InitDatadog(cfg *config.Config) *Datadog {
c.AddDefaultHeader(dDAppKey, cfg.Observability.AppKey)

d.apiClient = datadog.NewAPIClient(c)
d.host = map[string]string{"site": cfg.Observability.ProviderUrl}

return &d
}

func (d *Datadog) Query(ctx context.Context, from int64, to int64, query string) (*datadog.MetricsQueryResponse, error) {
ctx = context.WithValue(ctx, datadog.ContextServerVariables, d.host)

resp, hResp, err := d.apiClient.MetricsApi.QueryMetrics(ctx, from, to, query)
if ulog.E(err) {
return nil, errors.Internal("Failed to query metrics: reason = " + err.Error())
Expand All @@ -73,7 +77,7 @@ func (d *Datadog) Query(ctx context.Context, from int64, to int64, query string)

if resp.HasError() {
log.Error().Msgf("Datadog response status code=%d", hResp.StatusCode)
return nil, api.Errorf(api.FromHttpCode(hResp.StatusCode), "Failed to get query metrics: reason = "+resp.GetError())
return nil, api.Errorf(api.Code_INTERNAL, "Failed to get query metrics: reason = "+resp.GetError())
}

return &resp, nil
Expand Down Expand Up @@ -182,8 +186,8 @@ func convertToDDAggregatorFunc(aggregator api.RollupAggregator) string {
}

func (d *Datadog) GetCurrentMetricValue(ctx context.Context, namespace string, metric string, tp api.TigrisOperation, avgLength time.Duration) (int64, error) {
from := time.Now()
to := time.Now().Add(-avgLength)
to := time.Now()
from := time.Now().Add(-avgLength)

rateQuery := &api.QueryTimeSeriesMetricsRequest{
TigrisOperation: tp,
Expand All @@ -210,12 +214,13 @@ func (d *Datadog) GetCurrentMetricValue(ctx context.Context, namespace string, m
return 0, err
}

if len(resp.GetSeries()) == 1 && len(resp.GetSeries()[0].Pointlist[0]) == 2 &&
if len(resp.GetSeries()) > 0 && len(resp.GetSeries()[0].Pointlist) > 0 &&
len(resp.GetSeries()[0].Pointlist[0]) > 1 &&
resp.GetSeries()[0].Pointlist[0][1] != nil {
return int64(*resp.GetSeries()[0].Pointlist[0][1]), nil
}

log.Debug().Interface("series", resp.GetSeries()).Msg("Unexpected series len")

return 0, errors.Internal("Broken remote metric response")
return 0, nil
}
2 changes: 1 addition & 1 deletion server/metrics/quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,5 +81,5 @@ func UpdateQuotaCurrentNodeLimit(namespaceName string, value int, isWrite bool)
counter = "write_limit"
}

QuotaSet.Tagged(getQuotaUsageTags(namespaceName)).Counter(counter).Inc(int64(value))
QuotaSet.Tagged(getQuotaUsageTags(namespaceName)).Gauge(counter).Update(float64(value))
}
2 changes: 1 addition & 1 deletion server/middleware/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var (
headerAuthorize = "authorization"
UnknownNamespace = "unknown"
BypassAuthForTheseMethods = container.NewHashSet(
"/HealthAPI/Health",
api.HealthMethodName,
"/tigrisdata.auth.v1.Auth/GetAccessToken",
"/grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo",
)
Expand Down
3 changes: 2 additions & 1 deletion server/middleware/measure.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"

middleware "github.com/grpc-ecosystem/go-grpc-middleware/v2"
api "github.com/tigrisdata/tigris/api/server/v1"
"github.com/tigrisdata/tigris/server/metrics"
"github.com/tigrisdata/tigris/server/request"
"github.com/tigrisdata/tigris/util"
Expand All @@ -37,7 +38,7 @@ type wrappedStream struct {

func getNoMeasurementMethods() []string {
return []string{
"/HealthAPI/Health",
api.HealthMethodName,
}
}

Expand Down
3 changes: 2 additions & 1 deletion server/middleware/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ import (
"context"

middleware "github.com/grpc-ecosystem/go-grpc-middleware"
api "github.com/tigrisdata/tigris/api/server/v1"
"github.com/tigrisdata/tigris/lib/container"
"github.com/tigrisdata/tigris/server/request"
"google.golang.org/grpc"
)

var (
excludedMethods = container.NewHashSet(
"/HealthAPI/Health",
api.HealthMethodName,
"/tigrisdata.admin.v1.Admin/createNamespace",
"/tigrisdata.admin.v1.Admin/listNamespaces",
)
Expand Down
22 changes: 15 additions & 7 deletions server/middleware/quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"

middleware "github.com/grpc-ecosystem/go-grpc-middleware/v2"
api "github.com/tigrisdata/tigris/api/server/v1"
"github.com/tigrisdata/tigris/server/quota"
"github.com/tigrisdata/tigris/server/request"
"google.golang.org/grpc"
Expand All @@ -32,22 +33,29 @@ type quotaStream struct {
func quotaUnaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
ns, _ := request.GetNamespace(ctx)
if err := quota.Allow(ctx, ns, proto.Size(req.(proto.Message)), request.IsWrite(ctx)); err != nil {
return nil, err

if m := info.FullMethod; m != api.HealthMethodName && !request.IsAdminApi(m) {
if err := quota.Allow(ctx, ns, proto.Size(req.(proto.Message)), request.IsWrite(ctx)); err != nil {
return nil, err
}
}

return handler(ctx, req)
}
}

func quotaStreamServerInterceptor() grpc.StreamServerInterceptor {
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
ns, _ := request.GetNamespace(stream.Context())
wrapped := &quotaStream{
WrappedServerStream: middleware.WrapServerStream(stream),
namespace: ns,
if m := info.FullMethod; m != api.HealthMethodName && !request.IsAdminApi(m) {
ns, _ := request.GetNamespace(stream.Context())
wrapped := &quotaStream{
WrappedServerStream: middleware.WrapServerStream(stream),
namespace: ns,
}
return handler(srv, wrapped)
}

return handler(srv, wrapped)
return handler(srv, stream)
}
}

Expand Down
4 changes: 2 additions & 2 deletions server/quota/datadog.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ type Datadog struct {
}

func (d *Datadog) CurRates(ctx context.Context, namespace string) (int64, int64, error) {
r, err := d.Datadog.GetCurrentMetricValue(ctx, namespace, "tigris.quota.usage.read_units", api.TigrisOperation_ALL, RunningAverageLength)
r, err := d.Datadog.GetCurrentMetricValue(ctx, namespace, "tigris.quota_usage_read_units.count", api.TigrisOperation_ALL, RunningAverageLength)
if err != nil {
return 0, 0, err
}

w, err := d.Datadog.GetCurrentMetricValue(ctx, namespace, "tigris.quota.usage.write_units", api.TigrisOperation_ALL, RunningAverageLength)
w, err := d.Datadog.GetCurrentMetricValue(ctx, namespace, "tigris.quota_usage_write_units.count", api.TigrisOperation_ALL, RunningAverageLength)
if err != nil {
return 0, 0, err
}
Expand Down
8 changes: 4 additions & 4 deletions server/quota/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ func (l *Limiter) Allow(size int) (err error) {

if !rt.OK() || rt.Delay() > 0 {
if l.isWrite {
return ErrWriteUnitsExceeded
return ErrWriteUnitsExceeded.WithRetry(rt.Delay())
}
return ErrReadUnitsExceeded
return ErrReadUnitsExceeded.WithRetry(rt.Delay())
}

return nil
Expand Down Expand Up @@ -97,9 +97,9 @@ func (l *Limiter) Wait(ctx context.Context, size int) (err error) {

if !rt.OK() || dur < rt.DelayFrom(now) {
if l.isWrite {
return ErrWriteUnitsExceeded
return ErrWriteUnitsExceeded.WithRetry(rt.DelayFrom(now))
}
return ErrReadUnitsExceeded
return ErrReadUnitsExceeded.WithRetry(rt.DelayFrom(now))
}

delay := rt.DelayFrom(now)
Expand Down
Loading

0 comments on commit a061089

Please sign in to comment.