Skip to content

Commit

Permalink
Fanout optimization: WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
valer-cara committed Jan 27, 2021
1 parent 34a19c0 commit 5b54009
Showing 1 changed file with 151 additions and 82 deletions.
233 changes: 151 additions & 82 deletions gubernator.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,115 +112,184 @@ func (s *V1Instance) Close() error {
// rate limit `Name` and `UniqueKey` is not owned by this instance then we forward the request to the
// peer that does.
func (s *V1Instance) GetRateLimits(ctx context.Context, r *GetRateLimitsReq) (*GetRateLimitsResp, error) {
var resp GetRateLimitsResp
if len(r.Requests) > maxBatchSize {
requestCount := len(r.Requests)

if requestCount > maxBatchSize {
return nil, status.Errorf(codes.OutOfRange,
"Requests.RateLimits list too large; max size is '%d'", maxBatchSize)
}

resp := GetRateLimitsResp{
Responses: make([]*RateLimitResp, requestCount),

This comment has been minimized.

Copy link
@valer-cara

valer-cara Jan 27, 2021

Author Owner

pre-allocated slice, results are written directly in here at their corresponding index

}

type InOut struct {
In *RateLimitReq
Idx int
Out *RateLimitResp
}

// Asynchronously fetch rate limits
out := make(chan InOut)
//out := make(chan InOut)

//outSync := make(chan GetRateLimitsResp)

wg := sync.WaitGroup{}
wg.Add(1)

go func() {
fan := syncutil.NewFanOut(1000)
defer wg.Done()
//fan := syncutil.NewFanOut(1000)
// For each item in the request body
for i, item := range r.Requests {
fan.Run(func(data interface{}) error {
inOut := data.(InOut)
if len(item.UniqueKey) == 0 {
resp.Responses[i] = &RateLimitResp{Error: "field 'unique_key' cannot be empty"}
continue
}

globalKey := inOut.In.Name + "_" + inOut.In.UniqueKey
var peer *PeerClient
var err error
if len(item.Name) == 0 {
resp.Responses[i] = &RateLimitResp{Error: "field 'namespace' cannot be empty"}
continue
}

if len(inOut.In.UniqueKey) == 0 {
inOut.Out = &RateLimitResp{Error: "field 'unique_key' cannot be empty"}
out <- inOut
return nil
}
globalKey := item.Name + "_" + item.UniqueKey

if len(inOut.In.Name) == 0 {
inOut.Out = &RateLimitResp{Error: "field 'namespace' cannot be empty"}
out <- inOut
return nil
}

var attempts int
getPeer:
if attempts > 5 {
inOut.Out = &RateLimitResp{
Error: fmt.Sprintf("GetPeer() keeps returning peers that are not connected for '%s' - '%s'", globalKey, err),
}
out <- inOut
return nil
peer, err := s.GetPeer(globalKey)
if err != nil {
resp.Responses[i] = &RateLimitResp{
Error: fmt.Sprintf("while finding peer that owns rate limit '%s' - '%s'", globalKey, err),
}
continue
}

peer, err = s.GetPeer(globalKey)
if peer.Info().IsOwner {
resp.Responses[i], err = s.getRateLimit(item)
if err != nil {
inOut.Out = &RateLimitResp{
Error: fmt.Sprintf("while finding peer that owns rate limit '%s' - '%s'", globalKey, err),
resp.Responses[i] = &RateLimitResp{
Error: fmt.Sprintf("while applying rate limit for '%s' - '%s'", globalKey, err),
}
out <- inOut
return nil
}

// If our server instance is the owner of this rate limit
if peer.Info().IsOwner {
// Apply our rate limit algorithm to the request
inOut.Out, err = s.getRateLimit(inOut.In)
if err != nil {
inOut.Out = &RateLimitResp{
Error: fmt.Sprintf("while applying rate limit for '%s' - '%s'", globalKey, err),
}
}
} else {
if HasBehavior(inOut.In.Behavior, Behavior_GLOBAL) {
inOut.Out, err = s.getGlobalRateLimit(inOut.In)
if err != nil {
inOut.Out = &RateLimitResp{Error: err.Error()}
}

// Inform the client of the owner key of the key
inOut.Out.Metadata = map[string]string{"owner": peer.Info().GRPCAddress}

out <- inOut
return nil
}

// Make an RPC call to the peer that owns this rate limit
inOut.Out, err = peer.GetPeerRateLimit(ctx, inOut.In)
if err != nil {
if IsNotReady(err) {
attempts++
goto getPeer
}
inOut.Out = &RateLimitResp{
Error: fmt.Sprintf("while fetching rate limit '%s' from peer - '%s'", globalKey, err),
}
}

// Inform the client of the owner key of the key
inOut.Out.Metadata = map[string]string{"owner": peer.Info().GRPCAddress}
} else {
resp.Responses[i] = &RateLimitResp{
Error: "Peer Limits currently disabled while testing this",
Metadata: map[string]string{"owner": peer.Info().GRPCAddress},
}

out <- inOut
return nil
}, InOut{In: item, Idx: i})
// Make an RPC call to the peer that owns this rate limit
//for attempts := 0; attempts < 5; attempts++ {
// resp.Responses[i], err = peer.GetPeerRateLimit(ctx, item)
// if err != nil {
// if IsNotReady(err) {
// continue
// } else {
// resp.Responses[i] = &RateLimitResp{
// Error: fmt.Sprintf("while fetching rate limit '%s' from peer - '%s'", globalKey, err),
// }
// break
// }
// }
//}
//if err != nil {

//}

// Inform the client of the owner key of the key
//inOut.Out.Metadata = map[string]string{"owner": peer.Info().GRPCAddress}
}
}
fan.Wait()
close(out)

//for i, item := range r.Requests {
// fan.Run(func(data interface{}) error {
// inOut := data.(InOut)

// globalKey := inOut.In.Name + "_" + inOut.In.UniqueKey
// var peer *PeerClient
// var err error

// if len(inOut.In.UniqueKey) == 0 {
// inOut.Out = &RateLimitResp{Error: "field 'unique_key' cannot be empty"}
// out <- inOut
// return nil
// }

// if len(inOut.In.Name) == 0 {
// inOut.Out = &RateLimitResp{Error: "field 'namespace' cannot be empty"}
// out <- inOut
// return nil
// }

// var attempts int
// getPeer:
// if attempts > 5 {
// inOut.Out = &RateLimitResp{
// Error: fmt.Sprintf("GetPeer() keeps returning peers that are not connected for '%s' - '%s'", globalKey, err),
// }
// out <- inOut
// return nil
// }

// peer, err = s.GetPeer(globalKey)
// if err != nil {
// inOut.Out = &RateLimitResp{
// Error: fmt.Sprintf("while finding peer that owns rate limit '%s' - '%s'", globalKey, err),
// }
// out <- inOut
// return nil
// }

// // If our server instance is the owner of this rate limit
// if peer.Info().IsOwner {
// // Apply our rate limit algorithm to the request
// inOut.Out, err = s.getRateLimit(inOut.In)
// if err != nil {
// inOut.Out = &RateLimitResp{
// Error: fmt.Sprintf("while applying rate limit for '%s' - '%s'", globalKey, err),
// }
// }
// } else {
// if HasBehavior(inOut.In.Behavior, Behavior_GLOBAL) {
// inOut.Out, err = s.getGlobalRateLimit(inOut.In)
// if err != nil {
// inOut.Out = &RateLimitResp{Error: err.Error()}
// }

// // Inform the client of the owner key of the key
// inOut.Out.Metadata = map[string]string{"owner": peer.Info().GRPCAddress}

// out <- inOut
// return nil
// }

// // Make an RPC call to the peer that owns this rate limit
// inOut.Out, err = peer.GetPeerRateLimit(ctx, inOut.In)
// if err != nil {
// if IsNotReady(err) {
// attempts++
// goto getPeer
// }
// inOut.Out = &RateLimitResp{
// Error: fmt.Sprintf("while fetching rate limit '%s' from peer - '%s'", globalKey, err),
// }
// }

// // Inform the client of the owner key of the key
// inOut.Out.Metadata = map[string]string{"owner": peer.Info().GRPCAddress}
// }

// out <- inOut
// return nil
// }, InOut{In: item, Idx: i})
//}
//fan.Wait()
//close(out)
}()

resp.Responses = make([]*RateLimitResp, len(r.Requests))
// Collect the async responses as they return
for i := range out {
resp.Responses[i.Idx] = i.Out
}
//resp.Responses = make([]*RateLimitResp, len(r.Requests))
//// Collect the async responses as they return
//for i := range out {
// resp.Responses[i.Idx] = i.Out
//}

wg.Wait()
return &resp, nil
}

Expand Down

0 comments on commit 5b54009

Please sign in to comment.