Skip to content

Commit

Permalink
acceptor wrapper and rate limiting implementation (#102)
Browse files Browse the repository at this point in the history
  • Loading branch information
henrod committed May 29, 2019
1 parent 3064590 commit d1c207f
Show file tree
Hide file tree
Showing 18 changed files with 776 additions and 0 deletions.
6 changes: 6 additions & 0 deletions Makefile
Expand Up @@ -49,6 +49,9 @@ run-cluster-worker-example-worker:
run-custom-metrics-example:
@cd examples/demo/custom_metrics && go run main.go --port 3250

run-rate-limiting-example:
@go run examples/demo/rate_limiting/main.go

protos-compile:
@cd benchmark/testdata && ./gen_proto.sh
@protoc -I pitaya-protos/ pitaya-protos/*.proto --go_out=plugins=grpc:protos
Expand Down Expand Up @@ -139,3 +142,6 @@ test-coverage-func coverage-func: test-coverage merge-profiles

serializer-mock:
@mockgen github.com/topfreegames/pitaya/serialize Serializer | sed 's/mock_serialize/mocks/' > serialize/mocks/serializer.go

acceptor-mock:
@mockgen github.com/topfreegames/pitaya/acceptor Acceptor | sed 's/mock_acceptor/mocks/' > mocks/acceptor.go
69 changes: 69 additions & 0 deletions acceptorwrapper/base.go
@@ -0,0 +1,69 @@
// Copyright (c) TFG Co. All Rights Reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package acceptorwrapper

import (
"net"

"github.com/topfreegames/pitaya/acceptor"
)

// BaseWrapper implements Wrapper by saving the acceptor as an attribute.
// Conns from acceptor.GetConnChan are processed by wrapConn and
// forwarded to its own connChan.
// Any new wrapper can inherit from BaseWrapper and just implement wrapConn.
type BaseWrapper struct {
acceptor.Acceptor
connChan chan net.Conn
wrapConn func(net.Conn) net.Conn
}

// NewBaseWrapper returns an instance of BaseWrapper.
func NewBaseWrapper(wrapConn func(net.Conn) net.Conn) BaseWrapper {
return BaseWrapper{
connChan: make(chan net.Conn),
wrapConn: wrapConn,
}
}

// Wrap saves acceptor as an attribute
func (b *BaseWrapper) Wrap(a acceptor.Acceptor) acceptor.Acceptor {
b.Acceptor = a
return b
}

// ListenAndServe starts a goroutine that wraps acceptor's conn
// and calls acceptor's listenAndServe
func (b *BaseWrapper) ListenAndServe() {
go b.pipe()
b.Acceptor.ListenAndServe()
}

// GetConnChan returns the wrapper conn chan
func (b *BaseWrapper) GetConnChan() chan net.Conn {
return b.connChan
}

func (b *BaseWrapper) pipe() {
for conn := range b.Acceptor.GetConnChan() {
b.connChan <- b.wrapConn(conn)
}
}
74 changes: 74 additions & 0 deletions acceptorwrapper/base_test.go
@@ -0,0 +1,74 @@
// Copyright (c) TFG Co. All Rights Reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package acceptorwrapper

import (
"net"
"testing"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/topfreegames/pitaya/mocks"
)

func TestListenAndServe(t *testing.T) {
t.Parallel()

ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockAcceptor := mocks.NewMockAcceptor(ctrl)
mockConn := mocks.NewMockConn(ctrl)

conns := make(chan net.Conn)
exit := make(chan struct{})
reads := 3
go func() {
for i := 0; i < reads; i++ {
mockConn.EXPECT().Read([]byte{})
conns <- mockConn
}
}()

mockAcceptor.EXPECT().GetConnChan().Return(conns)
wrapper := &BaseWrapper{
Acceptor: mockAcceptor,
connChan: make(chan net.Conn),
wrapConn: func(c net.Conn) net.Conn {
_, err := c.Read([]byte{})
assert.NoError(t, err)
return c
},
}

go func() {
i := 0
for range wrapper.GetConnChan() {
i++
if i == reads {
close(exit)
}
}
}()

mockAcceptor.EXPECT().ListenAndServe().Do(func() { <-exit })
wrapper.ListenAndServe()
}
108 changes: 108 additions & 0 deletions acceptorwrapper/rate_limiter.go
@@ -0,0 +1,108 @@
// Copyright (c) TFG Co. All Rights Reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package acceptorwrapper

import (
"container/list"
"net"
"time"

"github.com/topfreegames/pitaya"
"github.com/topfreegames/pitaya/constants"
"github.com/topfreegames/pitaya/logger"
"github.com/topfreegames/pitaya/metrics"
)

// RateLimiter wraps net.Conn by applying rate limiting and return empty
// if exceeded. It uses the leaky bucket
// algorithm (https://en.wikipedia.org/wiki/Leaky_bucket).
// Here, "limit" is the number of requests it accepts during an "interval" duration.
// After making a request, a slot is occupied and only freed after "interval"
// duration. If a new request comes when no slots are available, the buffer from
// Read is droped and ignored by pitaya.
// On the client side, this will yield a timeout error and the client must
// be prepared to handle it.
type RateLimiter struct {
net.Conn
limit int
interval time.Duration
times list.List
forceDisable bool
}

// NewRateLimiter returns an initialized *RateLimiting
func NewRateLimiter(
conn net.Conn,
limit int,
interval time.Duration,
forceDisable bool,
) *RateLimiter {
r := &RateLimiter{
Conn: conn,
limit: limit,
interval: interval,
forceDisable: forceDisable,
}

r.times.Init()

return r
}

func (r *RateLimiter) Read(b []byte) (n int, err error) {
if r.forceDisable {
return r.Conn.Read(b)
}

for {
n, err = r.Conn.Read(b)
if err != nil {
return n, err
}

now := time.Now()
if r.shouldRateLimit(now) {
logger.Log.Errorf("Data=%s, Error=%s", b, constants.ErrRateLimitExceeded)
metrics.ReportExceededRateLimiting(pitaya.GetMetricsReporters())
continue
}

return n, err
}
}

// shouldRateLimit saves the now as time taken or returns an error if
// in the limit of rate limiting
func (r *RateLimiter) shouldRateLimit(now time.Time) bool {
if r.times.Len() < r.limit {
r.times.PushBack(now)
return false
}

front := r.times.Front()
if diff := now.Sub(front.Value.(time.Time)); diff < r.interval {
return true
}

front.Value = now
r.times.MoveToBack(front)
return false
}

0 comments on commit d1c207f

Please sign in to comment.