Skip to content

Commit

Permalink
Merge f692c0e into 63b0e63
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed Jun 27, 2018
2 parents 63b0e63 + f692c0e commit 9821476
Show file tree
Hide file tree
Showing 9 changed files with 184 additions and 101 deletions.
57 changes: 0 additions & 57 deletions server/client_test.go
Expand Up @@ -1021,60 +1021,3 @@ func TestQueueAutoUnsubscribe(t *testing.T) {
t.Fatalf("Did not receive all %d queue messages, received %d for 'bar' and %d for 'baz'\n",
expected, atomic.LoadInt32(&rbar), atomic.LoadInt32(&rbaz))
}

func TestAvoidSlowConsumerBigMessages(t *testing.T) {
opts := DefaultOptions() // Use defaults to make sure they avoid pending slow consumer.
s := RunServer(opts)
defer s.Shutdown()

nc1, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc1.Close()

nc2, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc2.Close()

data := make([]byte, 1024*1024) // 1MB payload
rand.Read(data)

expected := int32(1000)
received := int32(0)

done := make(chan bool)

// Create Subscription.
nc1.Subscribe("slow.consumer", func(m *nats.Msg) {
// Just eat it so that we are not measuring
// code time, just delivery.
atomic.AddInt32(&received, 1)
if received >= expected {
done <- true
}
})

// Create Error handler
nc1.SetErrorHandler(func(c *nats.Conn, s *nats.Subscription, err error) {
t.Fatalf("Received an error on the subscription's connection: %v\n", err)
})

for i := int32(0); i < expected; i++ {
nc2.Publish("slow.consumer", data)
}
nc2.Flush()

select {
case <-done:
return
case <-time.After(10 * time.Second):
r := atomic.LoadInt32(&received)
if s.NumSlowConsumers() > 0 {
t.Fatalf("Did not receive all large messages due to slow consumer status: %d of %d", r, expected)
}
t.Fatalf("Failed to receive all large messages: %d of %d\n", r, expected)
}
}
4 changes: 4 additions & 0 deletions server/closed_conns_test.go
Expand Up @@ -81,6 +81,10 @@ func TestClosedConnsAccounting(t *testing.T) {
t.Fatalf("Error on connect: %v", err)
}
nc.Close()
// FIXME: For now just sleep a bit to ensure that closed connections
// are added in the expected order for tests down below where we
// check for cid.
time.Sleep(15 * time.Millisecond)
}

if !closedConnsEqual(s, opts.MaxClosedClients, wait) {
Expand Down
5 changes: 4 additions & 1 deletion server/monitor_test.go
Expand Up @@ -491,7 +491,7 @@ func TestConnzRTT(t *testing.T) {

rtt, err := time.ParseDuration(ci.RTT)
if err != nil {
t.Fatalf("Could not parse RTT properly, %v", err)
t.Fatalf("Could not parse RTT properly, %v (ci.RTT=%v)", err, ci.RTT)
}
if rtt <= 0 {
t.Fatal("Expected RTT to be valid and non-zero\n")
Expand Down Expand Up @@ -1436,6 +1436,9 @@ func TestConnzTLSInHandshake(t *testing.T) {
}
defer c.Close()

// Wait for the connection to be registered
waitForClientConnCount(t, s, 1)

start := time.Now()
endpoint := fmt.Sprintf("http://%s:%d/connz", opts.HTTPHost, s.MonitorAddr().Port)
for mode := 0; mode < 2; mode++ {
Expand Down
85 changes: 85 additions & 0 deletions server/norace_test.go
@@ -0,0 +1,85 @@
// Copyright 2018 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +build !race

package server

import (
"fmt"
"math/rand"
"sync/atomic"
"testing"
"time"

"github.com/nats-io/go-nats"
)

// IMPORTANT: Tests in this file are not executed when running with the -race flag.

func TestAvoidSlowConsumerBigMessages(t *testing.T) {
opts := DefaultOptions() // Use defaults to make sure they avoid pending slow consumer.
s := RunServer(opts)
defer s.Shutdown()

nc1, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc1.Close()

nc2, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc2.Close()

data := make([]byte, 1024*1024) // 1MB payload
rand.Read(data)

expected := int32(1000)
received := int32(0)

done := make(chan bool)

// Create Subscription.
nc1.Subscribe("slow.consumer", func(m *nats.Msg) {
// Just eat it so that we are not measuring
// code time, just delivery.
atomic.AddInt32(&received, 1)
if received >= expected {
done <- true
}
})

// Create Error handler
nc1.SetErrorHandler(func(c *nats.Conn, s *nats.Subscription, err error) {
t.Fatalf("Received an error on the subscription's connection: %v\n", err)
})

for i := int32(0); i < expected; i++ {
nc2.Publish("slow.consumer", data)
}
nc2.Flush()

select {
case <-done:
return
case <-time.After(10 * time.Second):
r := atomic.LoadInt32(&received)
if s.NumSlowConsumers() > 0 {
t.Fatalf("Did not receive all large messages due to slow consumer status: %d of %d", r, expected)
}
t.Fatalf("Failed to receive all large messages: %d of %d\n", r, expected)
}
}
2 changes: 1 addition & 1 deletion server/reload_test.go
Expand Up @@ -1855,12 +1855,12 @@ func TestConfigReloadRotateFiles(t *testing.T) {
func runServerWithSymlinkConfig(t *testing.T, symlinkName, configName string) (*Server, *Options, string) {
opts, config := newOptionsWithSymlinkConfig(t, symlinkName, configName)
opts.NoLog = true
opts.NoSigs = true
return RunServer(opts), opts, config
}

func newServerWithSymlinkConfig(t *testing.T, symlinkName, configName string) (*Server, *Options, string) {
opts, config := newOptionsWithSymlinkConfig(t, symlinkName, configName)
opts.NoSigs = true
return New(opts), opts, config
}

Expand Down
13 changes: 11 additions & 2 deletions server/server_test.go
Expand Up @@ -419,7 +419,7 @@ func TestProcessCommandLineArgs(t *testing.T) {

func TestWriteDeadline(t *testing.T) {
opts := DefaultOptions()
opts.WriteDeadline = 1 * time.Millisecond
opts.WriteDeadline = 30 * time.Millisecond
s := RunServer(opts)
defer s.Shutdown()

Expand Down Expand Up @@ -593,7 +593,16 @@ func TestCustomRouterAuthentication(t *testing.T) {
opts2.Routes = RoutesFromStr(fmt.Sprintf("nats://invalid@127.0.0.1:%d", clusterPort))
s2 := RunServer(opts2)
defer s2.Shutdown()
if nr := s2.NumRoutes(); nr != 0 {
timeout := time.Now().Add(2 * time.Second)
nr := 0
for time.Now().Before(timeout) {
nr = s2.NumRoutes()
if nr == 0 {
break
}
time.Sleep(15 * time.Millisecond)
}
if nr != 0 {
t.Fatalf("Expected no route, got %v", nr)
}

Expand Down
40 changes: 0 additions & 40 deletions test/cluster_test.go
Expand Up @@ -488,43 +488,3 @@ func TestAutoUnsubscribePropagationOnClientDisconnect(t *testing.T) {
t.Fatalf("%v", err)
}
}

func TestRouteFormTimeWithHighSubscriptions(t *testing.T) {
srvA, optsA := RunServerWithConfig("./configs/srv_a.conf")
defer srvA.Shutdown()

clientA := createClientConn(t, optsA.Host, optsA.Port)
defer clientA.Close()

sendA, expectA := setupConn(t, clientA)

// Now add lots of subscriptions. These will need to be forwarded
// to new routes when they are added.
subsTotal := 100000
for i := 0; i < subsTotal; i++ {
subject := fmt.Sprintf("FOO.BAR.BAZ.%d", i)
sendA(fmt.Sprintf("SUB %s %d\r\n", subject, i))
}
sendA("PING\r\n")
expectA(pongRe)

srvB, _ := RunServerWithConfig("./configs/srv_b.conf")
defer srvB.Shutdown()

checkClusterFormed(t, srvA, srvB)

// Now wait for all subscriptions to be processed.
if err := checkExpectedSubs(subsTotal, srvB); err != nil {
// Make sure we are not a slow consumer
// Check for slow consumer status
if srvA.NumSlowConsumers() > 0 {
t.Fatal("Did not receive all subscriptions due to slow consumer")
} else {
t.Fatalf("%v", err)
}
}
// Just double check the slow consumer status.
if srvA.NumSlowConsumers() > 0 {
t.Fatalf("Received a slow consumer notification: %d", srvA.NumSlowConsumers())
}
}
42 changes: 42 additions & 0 deletions test/fanout_test.go
Expand Up @@ -24,6 +24,8 @@ import (
"github.com/nats-io/go-nats"
)

// IMPORTANT: Tests in this file are not executed when running with the -race flag.

// As we look to improve high fanout situations make sure we
// have a test that checks ordering for all subscriptions from a single subscriber.
func TestHighFanoutOrdering(t *testing.T) {
Expand Down Expand Up @@ -84,3 +86,43 @@ func TestHighFanoutOrdering(t *testing.T) {

wg.Wait()
}

func TestRouteFormTimeWithHighSubscriptions(t *testing.T) {
srvA, optsA := RunServerWithConfig("./configs/srv_a.conf")
defer srvA.Shutdown()

clientA := createClientConn(t, optsA.Host, optsA.Port)
defer clientA.Close()

sendA, expectA := setupConn(t, clientA)

// Now add lots of subscriptions. These will need to be forwarded
// to new routes when they are added.
subsTotal := 100000
for i := 0; i < subsTotal; i++ {
subject := fmt.Sprintf("FOO.BAR.BAZ.%d", i)
sendA(fmt.Sprintf("SUB %s %d\r\n", subject, i))
}
sendA("PING\r\n")
expectA(pongRe)

srvB, _ := RunServerWithConfig("./configs/srv_b.conf")
defer srvB.Shutdown()

checkClusterFormed(t, srvA, srvB)

// Now wait for all subscriptions to be processed.
if err := checkExpectedSubs(subsTotal, srvB); err != nil {
// Make sure we are not a slow consumer
// Check for slow consumer status
if srvA.NumSlowConsumers() > 0 {
t.Fatal("Did not receive all subscriptions due to slow consumer")
} else {
t.Fatalf("%v", err)
}
}
// Just double check the slow consumer status.
if srvA.NumSlowConsumers() > 0 {
t.Fatalf("Received a slow consumer notification: %d", srvA.NumSlowConsumers())
}
}
37 changes: 37 additions & 0 deletions test/routes_test.go
Expand Up @@ -890,6 +890,9 @@ func TestRouteBasicPermissions(t *testing.T) {
}
defer subBbaz.Unsubscribe()
ncb.Flush()
if err := checkExpectedSubs(2, srvA, srvB); err != nil {
t.Fatal(err.Error())
}

// Create a connection to server A
nca, err := nats.Connect(fmt.Sprintf("nats://127.0.0.1:%d", optsA.Port))
Expand Down Expand Up @@ -920,6 +923,14 @@ func TestRouteBasicPermissions(t *testing.T) {
}
defer subBfoo.Unsubscribe()
ncb.Flush()
// B should have now 3 subs
if err := checkExpectedSubs(3, srvB); err != nil {
t.Fatal(err.Error())
}
// and A still 2.
if err := checkExpectedSubs(2, srvA); err != nil {
t.Fatal(err.Error())
}
// So producing on "foo" from A should not be forwarded to B.
if err := nca.Publish("foo", []byte("hello")); err != nil {
t.Fatalf("Error on publish: %v", err)
Expand All @@ -938,6 +949,10 @@ func TestRouteBasicPermissions(t *testing.T) {
}
defer subAbat.Unsubscribe()
nca.Flush()
// A should have 3 subs
if err := checkExpectedSubs(3, srvA); err != nil {
t.Fatal(err.Error())
}
// And from B, send a message on that subject and make sure it is not received.
if err := ncb.Publish("bat", []byte("hello")); err != nil {
t.Fatalf("Error on publish: %v", err)
Expand All @@ -951,6 +966,10 @@ func TestRouteBasicPermissions(t *testing.T) {
// Stop subscription on foo from B
subBfoo.Unsubscribe()
ncb.Flush()
// Back to 2 subs on B
if err := checkExpectedSubs(2, srvB); err != nil {
t.Fatal(err.Error())
}

// Create subscription on foo from A, this should be forwared to B.
subAfoo, err := nca.Subscribe("foo", cb)
Expand All @@ -965,6 +984,14 @@ func TestRouteBasicPermissions(t *testing.T) {
}
defer subAfoo2.Unsubscribe()
nca.Flush()
// A should have 5 subs
if err := checkExpectedSubs(5, srvA); err != nil {
t.Fatal(err.Error())
}
// B should have 4
if err := checkExpectedSubs(4, srvB); err != nil {
t.Fatal(err.Error())
}
// Send a message from B and check that it is received.
if err := ncb.Publish("foo", []byte("hello")); err != nil {
t.Fatalf("Error on publish: %v", err)
Expand All @@ -982,9 +1009,19 @@ func TestRouteBasicPermissions(t *testing.T) {
ncb.Close()
srvB.Shutdown()

// Since B had 2 local subs, A should go from 5 to 3
if err := checkExpectedSubs(3, srvA); err != nil {
t.Fatal(err.Error())
}

// Restart server B
srvB, optsB = RunServerWithConfig("./configs/srv_b.conf")
defer srvB.Shutdown()
// Check that subs from A that can be sent to B are sent.
// That would be 2 (the 2 subscriptions on foo).
if err := checkExpectedSubs(2, srvB); err != nil {
t.Fatal(err.Error())
}

// Connect to B and send on "foo" and make sure we receive
ncb, err = nats.Connect(fmt.Sprintf("nats://127.0.0.1:%d", optsB.Port))
Expand Down

0 comments on commit 9821476

Please sign in to comment.