Skip to content

Commit

Permalink
Merge 8b60c25 into 35d3a49
Browse files Browse the repository at this point in the history
  • Loading branch information
henrod committed May 24, 2019
2 parents 35d3a49 + 8b60c25 commit 2280bc7
Show file tree
Hide file tree
Showing 18 changed files with 704 additions and 0 deletions.
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ run-cluster-worker-example-worker:
run-custom-metrics-example:
@cd examples/demo/custom_metrics && go run main.go --port 3250

run-rate-limiting-example:
@go run examples/demo/rate_limiting/main.go

protos-compile:
@cd benchmark/testdata && ./gen_proto.sh
@protoc -I pitaya-protos/ pitaya-protos/*.proto --go_out=plugins=grpc:protos
Expand Down Expand Up @@ -139,3 +142,6 @@ test-coverage-func coverage-func: test-coverage merge-profiles

serializer-mock:
@mockgen github.com/topfreegames/pitaya/serialize Serializer | sed 's/mock_serialize/mocks/' > serialize/mocks/serializer.go

acceptor-mock:
@mockgen github.com/topfreegames/pitaya/acceptor Acceptor | sed 's/mock_acceptor/mocks/' > mocks/acceptor.go
69 changes: 69 additions & 0 deletions acceptorwrapper/base.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright (c) TFG Co. All Rights Reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package acceptorwrapper

import (
"net"

"github.com/topfreegames/pitaya/acceptor"
)

// BaseWrapper implements Wrapper by saving the acceptor as an attribute.
// Conns from acceptor.GetConnChan are processed by wrapConn and
// forwarded to its own connChan.
// Any new wrapper can inherit from BaseWrapper and just implement wrapConn.
type BaseWrapper struct {
acceptor.Acceptor
connChan chan net.Conn
wrapConn func(net.Conn) net.Conn
}

// NewBaseWrapper returns an instance of BaseWrapper.
func NewBaseWrapper(wrapConn func(net.Conn) net.Conn) BaseWrapper {
return BaseWrapper{
connChan: make(chan net.Conn),
wrapConn: wrapConn,
}
}

// Wrap saves acceptor as an attribute
func (b *BaseWrapper) Wrap(a acceptor.Acceptor) acceptor.Acceptor {
b.Acceptor = a
return b
}

// ListenAndServe starts a goroutine that wraps acceptor's conn
// and calls acceptor's listenAndServe
func (b *BaseWrapper) ListenAndServe() {
go b.pipe()
b.Acceptor.ListenAndServe()
}

// GetConnChan returns the wrapper conn chan
func (b *BaseWrapper) GetConnChan() chan net.Conn {
return b.connChan
}

func (b *BaseWrapper) pipe() {
for conn := range b.Acceptor.GetConnChan() {
b.connChan <- b.wrapConn(conn)
}
}
52 changes: 52 additions & 0 deletions acceptorwrapper/base_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package acceptorwrapper

import (
"net"
"testing"

"github.com/golang/mock/gomock"
"github.com/topfreegames/pitaya/mocks"
)

func TestListenAndServe(t *testing.T) {
t.Parallel()

ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockAcceptor := mocks.NewMockAcceptor(ctrl)
mockConn := mocks.NewMockConn(ctrl)

conns := make(chan net.Conn)
exit := make(chan struct{})
reads := 3
go func() {
for i := 0; i < reads; i++ {
mockConn.EXPECT().Read([]byte{})
conns <- mockConn
}
}()

mockAcceptor.EXPECT().GetConnChan().Return(conns)
wrapper := &BaseWrapper{
Acceptor: mockAcceptor,
connChan: make(chan net.Conn),
wrapConn: func(c net.Conn) net.Conn {
c.Read([]byte{})
return c
},
}

go func() {
i := 0
for range wrapper.GetConnChan() {
i++
if i == reads {
close(exit)
}
}
}()

mockAcceptor.EXPECT().ListenAndServe().Do(func() { <-exit })
wrapper.ListenAndServe()
}
50 changes: 50 additions & 0 deletions acceptorwrapper/rate_limiting.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright (c) TFG Co. All Rights Reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package acceptorwrapper

import (
"net"

"github.com/topfreegames/pitaya/config"
)

// RateLimitingWrapper rate limits for each connection
// received
type RateLimitingWrapper struct {
BaseWrapper
}

// NewRateLimitingWrapper returns an instance of *RateLimitingWrapper
func NewRateLimitingWrapper(c *config.Config) *RateLimitingWrapper {
r := &RateLimitingWrapper{}

r.BaseWrapper = NewBaseWrapper(func(conn net.Conn) net.Conn {
var (
limit = c.GetInt("pitaya.conn.ratelimiting.limit")
interval = c.GetDuration("pitaya.conn.ratelimiting.interval")
forceDisable = c.GetBool("pitaya.conn.ratelimiting.forcedisable")
)

return NewRateLimitingConn(conn, limit, interval, forceDisable)
})

return r
}
100 changes: 100 additions & 0 deletions acceptorwrapper/rate_limiting_conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright (c) TFG Co. All Rights Reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package acceptorwrapper

import (
"container/list"
"net"
"time"

"github.com/topfreegames/pitaya"
"github.com/topfreegames/pitaya/constants"
"github.com/topfreegames/pitaya/logger"
"github.com/topfreegames/pitaya/metrics"
)

// RateLimitingConn wraps net.Conn by applying rate limiting
// and return empty if exceeded
type RateLimitingConn struct {
net.Conn
limit int
interval time.Duration
times list.List
forceDisable bool
}

// NewRateLimitingConn returns an initialized *RateLimiting
func NewRateLimitingConn(
conn net.Conn,
limit int,
interval time.Duration,
forceDisable bool,
) *RateLimitingConn {
r := &RateLimitingConn{
Conn: conn,
limit: limit,
interval: interval,
forceDisable: forceDisable,
}

r.times.Init()

return r
}

func (r *RateLimitingConn) Read(b []byte) (n int, err error) {
if r.forceDisable {
return r.Conn.Read(b)
}

n, err = r.Conn.Read(b)
if err != nil {
return n, err
}

now := time.Now()
err = r.take(now)
if err != nil {
logger.Log.Errorf("Data=%s, Error=%s", b, err)
metrics.ReportExceededRateLimiting(pitaya.GetMetricsReporters())
return 0, nil
}

return n, err
}

// take saves the now as time taken or returns an error if
// in the limit of rate limiting
func (r *RateLimitingConn) take(now time.Time) error {
if r.times.Len() < r.limit {
r.times.PushBack(now)
return nil
}

front := r.times.Front()
if diff := now.Sub(front.Value.(time.Time)); diff < r.interval {
return constants.ErrExceededRateLimiting
}

front.Value = now
r.times.MoveToBack(front)
return nil
}
Loading

0 comments on commit 2280bc7

Please sign in to comment.