Skip to content

Commit

Permalink
fix client dead locks and racing condition, fix benchmark tests
Browse files Browse the repository at this point in the history
  • Loading branch information
felipejfc committed May 18, 2018
1 parent e1162b0 commit bc5ecce
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 50 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
*.dll
*.so
*.dylib
*.PID

# Test binary, build with `go test -c`
*.test
Expand Down
9 changes: 9 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,16 @@ e2e-test: ensure-testing-deps ensure-testing-bin

benchmark-test: ensure-testing-deps ensure-testing-bin
@echo "===============RUNNING BENCHMARK TESTS==============="
@echo "--- starting testing servers"
@./examples/testing/server -type game -frontend=false > /dev/null 2>&1 & echo $$! > back.PID
@./examples/testing/server -type connector -frontend=true > /dev/null 2>&1 & echo $$! > front.PID
@echo "--- sleeping for 5 seconds"
@sleep 5
@go test ./benchmark/benchmark_test.go -bench=.
@echo "--- killing testing servers"
@kill `cat back.PID` && rm back.PID
@kill `cat front.PID` && rm front.PID


unit-test-coverage: kill-testing-deps
@echo "===============RUNNING UNIT TESTS==============="
Expand Down
56 changes: 11 additions & 45 deletions benchmark/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@ import (
"os"
"testing"

"github.com/google/uuid"
"github.com/sirupsen/logrus"
"github.com/topfreegames/pitaya/client"
"github.com/topfreegames/pitaya/helpers"
)

var clients []*client.Client
Expand All @@ -52,14 +50,11 @@ func TestMain(m *testing.M) {
}

func BenchmarkCreateManyClients(b *testing.B) {
port := helpers.GetFreePort(b)
sdPrefix := fmt.Sprintf("%s/", uuid.New().String())
defer helpers.StartServer(b, true, false, "connector", port, sdPrefix)()

b.ResetTimer()
for i := 0; i < b.N; i++ {
g := client.New(logrus.FatalLevel)
err := g.ConnectTo(fmt.Sprintf("%s:%d", "localhost", port))
err := g.ConnectTo(fmt.Sprintf("%s:%d", "localhost", 32222))
defer g.Disconnect()
if err != nil {
b.Logf("failed to connect")
Expand All @@ -70,10 +65,7 @@ func BenchmarkCreateManyClients(b *testing.B) {
}

func BenchmarkFrontHandlerWithSessionAndRawReturnsRaw(b *testing.B) {
port := helpers.GetFreePort(b)
sdPrefix := fmt.Sprintf("%s/", uuid.New().String())
defer helpers.StartServer(b, true, false, "connector", port, sdPrefix)()
clients := getClients(1, port)
clients := getClients(1, 32222)

b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand All @@ -87,10 +79,7 @@ func BenchmarkFrontHandlerWithSessionAndRawReturnsRaw(b *testing.B) {
}

func BenchmarkFrontHandlerWithSessionAndPtrReturnsPtr(b *testing.B) {
port := helpers.GetFreePort(b)
sdPrefix := fmt.Sprintf("%s/", uuid.New().String())
defer helpers.StartServer(b, true, false, "connector", port, sdPrefix)()
clients := getClients(1, port)
clients := getClients(1, 32222)

b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand All @@ -105,10 +94,7 @@ func BenchmarkFrontHandlerWithSessionAndPtrReturnsPtr(b *testing.B) {

func BenchmarkFrontHandlerWithSessionAndPtrReturnsPtrManyClientsParallel(b *testing.B) {
numClients := 1000
port := helpers.GetFreePort(b)
sdPrefix := fmt.Sprintf("%s/", uuid.New().String())
defer helpers.StartServer(b, true, false, "connector", port, sdPrefix)()
clients := getClients(numClients, port)
clients := getClients(numClients, 32222)

b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
Expand All @@ -124,10 +110,7 @@ func BenchmarkFrontHandlerWithSessionAndPtrReturnsPtrManyClientsParallel(b *test
}

func BenchmarkFrontHandlerWithSessionAndPtrReturnsPtrParallel(b *testing.B) {
port := helpers.GetFreePort(b)
sdPrefix := fmt.Sprintf("%s/", uuid.New().String())
defer helpers.StartServer(b, true, false, "connector", port, sdPrefix)()
clients := getClients(1, port)
clients := getClients(1, 32222)

b.ResetTimer()

Expand All @@ -144,10 +127,7 @@ func BenchmarkFrontHandlerWithSessionAndPtrReturnsPtrParallel(b *testing.B) {
}

func BenchmarkFrontHandlerWithSessionOnlyReturnsPtr(b *testing.B) {
port := helpers.GetFreePort(b)
sdPrefix := fmt.Sprintf("%s/", uuid.New().String())
defer helpers.StartServer(b, true, false, "connector", port, sdPrefix)()
clients := getClients(1, port)
clients := getClients(1, 32222)

b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand All @@ -161,10 +141,7 @@ func BenchmarkFrontHandlerWithSessionOnlyReturnsPtr(b *testing.B) {
}

func BenchmarkFrontHandlerWithSessionOnlyReturnsPtrParallel(b *testing.B) {
port := helpers.GetFreePort(b)
sdPrefix := fmt.Sprintf("%s/", uuid.New().String())
defer helpers.StartServer(b, true, false, "connector", port, sdPrefix)()
clients := getClients(1, port)
clients := getClients(1, 32222)

b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
Expand All @@ -180,11 +157,7 @@ func BenchmarkFrontHandlerWithSessionOnlyReturnsPtrParallel(b *testing.B) {
}

func BenchmarkBackHandlerWithSessionOnlyReturnsPtr(b *testing.B) {
port := helpers.GetFreePort(b)
sdPrefix := fmt.Sprintf("%s/", uuid.New().String())
defer helpers.StartServer(b, true, false, "connector", port, sdPrefix)()
defer helpers.StartServer(b, false, false, "game", port, sdPrefix)()
clients := getClients(1, port)
clients := getClients(1, 32222)

b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand All @@ -193,16 +166,13 @@ func BenchmarkBackHandlerWithSessionOnlyReturnsPtr(b *testing.B) {
b.Logf("failed to send request to server")
b.FailNow()
}

<-clients[0].IncomingMsgChan
}
}

func BenchmarkBackHandlerWithSessionOnlyReturnsPtrParallel(b *testing.B) {
port := helpers.GetFreePort(b)
sdPrefix := fmt.Sprintf("%s/", uuid.New().String())
defer helpers.StartServer(b, true, false, "connector", port, sdPrefix)()
defer helpers.StartServer(b, false, false, "game", port, sdPrefix)()
clients := getClients(1, port)
clients := getClients(1, 32222)

b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
Expand All @@ -219,11 +189,7 @@ func BenchmarkBackHandlerWithSessionOnlyReturnsPtrParallel(b *testing.B) {

func BenchmarkBackHandlerWithSessionOnlyReturnsPtrParallelMultipleClients(b *testing.B) {
numClients := 100
port := helpers.GetFreePort(b)
sdPrefix := fmt.Sprintf("%s/", uuid.New().String())
defer helpers.StartServer(b, true, false, "connector", port, sdPrefix)()
defer helpers.StartServer(b, false, false, "game", port, sdPrefix)()
clients := getClients(numClients, port)
clients := getClients(numClients, 32222)

b.ResetTimer()

Expand Down
10 changes: 5 additions & 5 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ func (c *Client) handlePackets() {
delete(c.pendingRequests, m.ID)
<-c.pendingChan
} else {
c.pendingReqMutex.Unlock()
continue // do not process msg for already timedout request
}
c.pendingReqMutex.Unlock()
Expand Down Expand Up @@ -351,24 +352,23 @@ func (c *Client) buildPacket(msg message.Message) ([]byte, error) {

// sendMsg sends the request to the server
func (c *Client) sendMsg(msgType message.Type, route string, data []byte) error {
atomic.AddUint32(&c.nextID, 1)
// TODO mount msg and encode
m := message.Message{
Type: msgType,
ID: uint(c.nextID),
ID: uint(atomic.AddUint32(&c.nextID, 1)),
Route: route,
Data: data,
Err: false,
}
p, err := c.buildPacket(m)
if msgType == message.Request {
c.pendingChan <- true
c.pendingReqMutex.Lock()
if _, ok := c.pendingRequests[uint(c.nextID)]; !ok {
c.pendingRequests[uint(c.nextID)] = &pendingRequest{
if _, ok := c.pendingRequests[m.ID]; !ok {
c.pendingRequests[m.ID] = &pendingRequest{
msg: &m,
sentAt: time.Now(),
}
c.pendingChan <- true
}
c.pendingReqMutex.Unlock()
}
Expand Down

0 comments on commit bc5ecce

Please sign in to comment.