Skip to content

Commit

Permalink
websocket-bench: Count received broadcast messages
Browse files Browse the repository at this point in the history
Counting stops when no message arrives during 1 second.
  • Loading branch information
zyla committed Sep 3, 2016
1 parent f1c8f16 commit b4c7104
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 28 deletions.
46 changes: 25 additions & 21 deletions go/src/hashrocket/websocket-bench/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,19 @@ const (
)

type Client struct {
conn *websocket.Conn
config *websocket.Config
laddr *net.TCPAddr
dest string
origin string
serverType string
serverAdapter ServerAdapter
cmdChan <-chan int
rxErrChan chan error
rttResultChan chan time.Duration
doneChan chan error
payloadPadding string
conn *websocket.Conn
config *websocket.Config
laddr *net.TCPAddr
dest string
origin string
serverType string
serverAdapter ServerAdapter
cmdChan <-chan int
rxErrChan chan error
rttResultChan chan time.Duration
broadcastReceivedChan chan struct{}
doneChan chan error
payloadPadding string
}

type ServerAdapter interface {
Expand All @@ -53,6 +54,7 @@ func NewClient(
dest, origin, serverType string,
cmdChan <-chan int,
rttResultChan chan time.Duration,
broadcastReceivedChan chan struct{},
doneChan chan error,
padding string,
) (*Client, error) {
Expand All @@ -61,14 +63,15 @@ func NewClient(
}

c := &Client{
laddr: laddr,
dest: dest,
origin: origin,
cmdChan: cmdChan,
rxErrChan: make(chan error),
rttResultChan: rttResultChan,
doneChan: doneChan,
payloadPadding: padding,
laddr: laddr,
dest: dest,
origin: origin,
cmdChan: cmdChan,
rxErrChan: make(chan error),
rttResultChan: rttResultChan,
broadcastReceivedChan: broadcastReceivedChan,
doneChan: doneChan,
payloadPadding: padding,
}

config, err := websocket.NewConfig(dest, origin)
Expand Down Expand Up @@ -144,7 +147,7 @@ func (c *Client) Run() {
case clientResetCmd:
c.conn.Close()
<-c.rxErrChan
if c2, err := NewClient(c.laddr, c.dest, c.origin, c.serverType, c.cmdChan, c.rttResultChan, c.doneChan, c.payloadPadding); err == nil {
if c2, err := NewClient(c.laddr, c.dest, c.origin, c.serverType, c.cmdChan, c.rttResultChan, c.broadcastReceivedChan, c.doneChan, c.payloadPadding); err == nil {
go c2.Run()
} else {
c.doneChan <- err
Expand Down Expand Up @@ -182,6 +185,7 @@ func (c *Client) rx() {
c.rxErrChan <- fmt.Errorf("received unparsable %s payload: %v", msg.Type, msg.Payload)
}
case "broadcast":
c.broadcastReceivedChan <- struct{}{}
default:
c.rxErrChan <- fmt.Errorf("received unknown message type: %v", msg.Type)
}
Expand Down
34 changes: 27 additions & 7 deletions go/src/hashrocket/websocket-bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,15 @@ func Stress(cmd *cobra.Command, args []string) {
localAddrs := parseTCPAddrs(options.localAddrs)
cmdChan := make(chan int)
rttResultChan := make(chan time.Duration)
bcReceivedChan := make(chan struct{})
doneChan := make(chan error)

payloadPadding := strings.Repeat("1234567890", options.payloadPaddingSize/10+1)
payloadPadding = payloadPadding[:options.payloadPaddingSize]

clientCount := 0
for {
if err := startClients(options.serverType, options.stepSize, localAddrs, cmdChan, rttResultChan, doneChan, payloadPadding); err != nil {
if err := startClients(options.serverType, options.stepSize, localAddrs, cmdChan, rttResultChan, bcReceivedChan, doneChan, payloadPadding); err != nil {
log.Fatal(err)
}
clientCount += options.stepSize
Expand All @@ -100,29 +101,48 @@ func Stress(cmd *cobra.Command, args []string) {
inProgress += 1
}

expectedBcs := options.concurrent * clientCount
receivedBcs := 0

timeout := false

var rttAgg rttAggregate
for rttAgg.Count() < options.sampleSize {
for !timeout {
select {
case result := <-rttResultChan:
rttAgg.Add(result)
inProgress -= 1
if rttAgg.Count() < options.sampleSize {
rttAgg.Add(result)
inProgress -= 1
}
case <-bcReceivedChan:
receivedBcs += 1
case err := <-doneChan:
fmt.Println("doneChan err:", err)
clientCount--
case <-time.After(1 * time.Second):
timeout = true
}

if rttAgg.Count()+inProgress < options.sampleSize {
cmdChan <- clientCmd
inProgress += 1
expectedBcs += clientCount
}
}

if rttAgg.Count() == 0 {
fmt.Printf("No replies received!\n")
continue
}

if options.limitRTT < rttAgg.Percentile(options.limitPercentile) {
return
}

fmt.Printf("clients: %5d %dper-rtt: %3dms min-rtt: %3dms median-rtt: %3dms max-rtt: %3dms\n",
fmt.Printf("clients: %5d expected: %5d rcvd: %5d %dper-rtt: %3dms min-rtt: %3dms median-rtt: %3dms max-rtt: %3dms\n",
clientCount,
expectedBcs,
receivedBcs,
options.limitPercentile,
roundToMS(rttAgg.Percentile(options.limitPercentile)),
roundToMS(rttAgg.Min()),
Expand All @@ -135,10 +155,10 @@ func roundToMS(d time.Duration) int64 {
return int64((d + (500 * time.Microsecond)) / time.Millisecond)
}

func startClients(serverType string, count int, localAddrs []*net.TCPAddr, cmdChan <-chan int, rttResultChan chan time.Duration, doneChan chan error, padding string) error {
func startClients(serverType string, count int, localAddrs []*net.TCPAddr, cmdChan <-chan int, rttResultChan chan time.Duration, bcReceivedChan chan struct{}, doneChan chan error, padding string) error {
for i := 0; i < count; i++ {
laddr := localAddrs[i%len(localAddrs)]
c, err := NewClient(laddr, options.websocketURL, options.websocketOrigin, serverType, cmdChan, rttResultChan, doneChan, padding)
c, err := NewClient(laddr, options.websocketURL, options.websocketOrigin, serverType, cmdChan, rttResultChan, bcReceivedChan, doneChan, padding)
if err != nil {
return err
}
Expand Down

0 comments on commit b4c7104

Please sign in to comment.