Skip to content

Commit

Permalink
monitor: new listener v1.2 with reusing en/decoder
Browse files Browse the repository at this point in the history
We previously encoded the payload with a new gob encoder per message.
This had a significant overhead because gob introspected and generated
type information for the encoding. This increased CPU usage and the
amount of data sent to listeners. The new v1.2 encoding scheme reuses a
gob encoder per listener, thus only sending the type information once per
session.

Signed-off-by: Ray Bejjani <ray@covalent.io>
  • Loading branch information
raybejjani authored and tgraf committed Aug 10, 2018
1 parent c2011e6 commit b88afa2
Show file tree
Hide file tree
Showing 9 changed files with 208 additions and 20 deletions.
21 changes: 21 additions & 0 deletions cilium/cmd/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,13 @@ func setupSigHandler() {
func openMonitorSock() (conn net.Conn, version listener.Version, err error) {
errors := make([]string, 0)

// try the 1.2 socket
conn, err = net.Dial("unix", defaults.MonitorSockPath1_2)
if err == nil {
return conn, listener.Version1_2, nil
}
errors = append(errors, defaults.MonitorSockPath1_2+": "+err.Error())

// try the 1.1 socket
conn, err = net.Dial("unix", defaults.MonitorSockPath1_0)
if err == nil {
Expand Down Expand Up @@ -384,6 +391,20 @@ func getMonitorParser(conn net.Conn, version listener.Version) (parser eventPars
return &pl, nil
}, nil

case listener.Version1_2:
var (
pl payload.Payload
dec = gob.NewDecoder(conn)
)
// This implemenents the newer 1.2 API. Each listener maintains its own gob
// session, and type information is only ever sent once.
return func() (*payload.Payload, error) {
if err := pl.DecodeBinary(dec); err != nil {
return nil, err
}
return &pl, nil
}, nil

default:
return nil, fmt.Errorf("unsupported version %s", version)
}
Expand Down
7 changes: 1 addition & 6 deletions monitor/launch/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,7 @@ func (nm *NodeMonitor) send(data []byte) error {

p := payload.Payload{Data: data, CPU: 0, Lost: nm.lostSinceLastTime(), Type: payload.EventSample}

buf, err := p.BuildMessage()
if err != nil {
return err
}

if _, err := nm.pipe.Write(buf); err != nil {
if err := p.WriteBinary(nm.pipe); err != nil {
nm.pipe.Close()
nm.pipe = nil
return fmt.Errorf("Unable to write message buffer to pipe: %s", err)
Expand Down
3 changes: 3 additions & 0 deletions monitor/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ const (

// Version1_0 is the API 1.0 version of the protocol (see above).
Version1_0 = Version("1.0")

// Version1_2 is the API 1.0 version of the protocol (see above).
Version1_2 = Version("1.2")
)

// MonitorListener is a generic consumer of monitor events. Implementers are
Expand Down
80 changes: 80 additions & 0 deletions monitor/listener1_2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2018 Authors of Cilium
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"encoding/gob"
"net"

"github.com/cilium/cilium/monitor/listener"
"github.com/cilium/cilium/monitor/payload"
)

// listenerv1_2 implements the ciliim-node-monitor API protocol compatible with
// cilium 1.2
// cleanupFn is called on exit
type listenerv1_2 struct {
conn net.Conn
queue chan *payload.Payload
cleanupFn func(listener.MonitorListener)
}

func newListenerv1_2(c net.Conn, queueSize int, cleanupFn func(listener.MonitorListener)) *listenerv1_2 {
ml := &listenerv1_2{
conn: c,
queue: make(chan *payload.Payload, queueSize),
cleanupFn: cleanupFn,
}

go ml.drainQueue()

return ml
}

func (ml *listenerv1_2) Enqueue(pl *payload.Payload) {
select {
case ml.queue <- pl:
default:
log.Debugf("Per listener queue is full, dropping message")
}
}

// drainQueue encodes and sends monitor payloads to the listener. It is
// intended to be a goroutine.
func (ml *listenerv1_2) drainQueue() {
defer func() {
ml.conn.Close()
ml.cleanupFn(ml)
}()

enc := gob.NewEncoder(ml.conn)
for pl := range ml.queue {
if err := pl.EncodeBinary(enc); err != nil {
switch {
case listener.IsDisconnected(err):
log.Info("Listener disconnected")
return

default:
log.WithError(err).Warn("Removing listener due to write failure")
return
}
}
}
}

func (ml *listenerv1_2) Version() listener.Version {
return listener.Version1_2
}
6 changes: 5 additions & 1 deletion monitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,13 @@ func runNodeMonitor() {
defer server1_0.Close() // Stop accepting new v1.0 connections
log.Infof("Serving cilium node monitor v1.0 API at unix://%s", defaults.MonitorSockPath1_0)

server1_2 := buildServerOrExit(defaults.MonitorSockPath1_2)
defer server1_2.Close() // Stop accepting new v1.2 connections
log.Infof("Serving cilium node monitor v1.2 API at unix://%s", defaults.MonitorSockPath1_2)

mainCtx, mainCtxCancel := context.WithCancel(context.Background())

monitorSingleton, err = NewMonitor(mainCtx, npages, pipe, server1_0)
monitorSingleton, err = NewMonitor(mainCtx, npages, pipe, server1_0, server1_2)
if err != nil {
log.WithError(err).Fatal("Error initialising monitor handlers")
}
Expand Down
36 changes: 31 additions & 5 deletions monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ func (m *Monitor) agentPipeReader(ctx context.Context, agentPipe io.Reader) {
log.Info("Beginning to read cilium agent events")
defer log.Info("Stopped reading cilium agent events")

meta, p := payload.Meta{}, payload.Payload{}
p := payload.Payload{}
for !isCtxDone(ctx) {
err := payload.ReadMetaPayload(agentPipe, &meta, &p)
err := p.ReadBinary(agentPipe)
switch {
// this captures the case where we are shutting down and main closes the
// pipe socket
Expand All @@ -100,7 +100,7 @@ func (m *Monitor) agentPipeReader(ctx context.Context, agentPipe io.Reader) {
// handling.
// Note that the perf buffer reader is started only when listeners are
// connected.
func NewMonitor(ctx context.Context, nPages int, agentPipe io.Reader, server1_0 net.Listener) (m *Monitor, err error) {
func NewMonitor(ctx context.Context, nPages int, agentPipe io.Reader, server1_0, server1_2 net.Listener) (m *Monitor, err error) {
m = &Monitor{
ctx: ctx,
listeners: make(map[listener.MonitorListener]struct{}),
Expand All @@ -110,6 +110,7 @@ func NewMonitor(ctx context.Context, nPages int, agentPipe io.Reader, server1_0

// start new MonitorListener handler
go m.connectionHandler1_0(ctx, server1_0)
go m.connectionHandler1_2(ctx, server1_2)

// start agent event pipe reader
go m.agentPipeReader(ctx, agentPipe)
Expand Down Expand Up @@ -139,7 +140,9 @@ func (m *Monitor) registerNewListener(parentCtx context.Context, conn net.Conn,
newListener := newListenerv1_0(conn, queueSize, m.removeListener)
m.listeners[newListener] = struct{}{}

log.WithField("count.listener", len(m.listeners)).Info("New listener connected.")
case listener.Version1_2:
newListener := newListenerv1_2(conn, queueSize, m.removeListener)
m.listeners[newListener] = struct{}{}

default:
conn.Close()
Expand Down Expand Up @@ -248,7 +251,7 @@ func (m *Monitor) dumpStat() {
fmt.Println(string(mp))
}

// connectionHandler handles all the incoming connections and sets up the
// connectionHandler1_0 handles all the incoming connections and sets up the
// listener objects. It will block on Accept, but expects the caller to close
// server, inducing a return.
func (m *Monitor) connectionHandler1_0(parentCtx context.Context, server net.Listener) {
Expand All @@ -271,6 +274,29 @@ func (m *Monitor) connectionHandler1_0(parentCtx context.Context, server net.Lis
}
}

// connectionHandler1_2 handles all the incoming connections and sets up the
// listener objects. It will block on Accept, but expects the caller to close
// server, inducing a return.
func (m *Monitor) connectionHandler1_2(parentCtx context.Context, server net.Listener) {
for !isCtxDone(parentCtx) {
conn, err := server.Accept()
switch {
case isCtxDone(parentCtx) && conn != nil:
conn.Close()
fallthrough

case isCtxDone(parentCtx) && conn == nil:
return

case err != nil:
log.WithError(err).Warn("error accepting connection")
continue
}

m.registerNewListener(parentCtx, conn, listener.Version1_2)
}
}

// send enqueues the payload to all listeners.
func (m *Monitor) send(pl *payload.Payload) {
m.Lock()
Expand Down
12 changes: 11 additions & 1 deletion monitor/payload/monitor_payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,25 @@ func (pl *Payload) Encode() ([]byte, error) {
// ReadBinary reads the payload from its binary representation.
func (pl *Payload) ReadBinary(r io.Reader) error {
dec := gob.NewDecoder(r)
return dec.Decode(pl)
return pl.DecodeBinary(dec)
}

// WriteBinary writes the payload into its binary representation.
func (pl *Payload) WriteBinary(w io.Writer) error {
enc := gob.NewEncoder(w)
return pl.EncodeBinary(enc)
}

// EncodeBinary writes the payload into its binary representation.
func (pl *Payload) EncodeBinary(enc *gob.Encoder) error {
return enc.Encode(pl)
}

// DecodeBinary reads the payload from its binary representation.
func (pl *Payload) DecodeBinary(dec *gob.Decoder) error {
return dec.Decode(pl)
}

// ReadMetaPayload reads a Meta and Payload from a Cilium monitor connection.
func ReadMetaPayload(r io.Reader, meta *Meta, pl *Payload) error {
if err := meta.ReadBinary(r); err != nil {
Expand Down
58 changes: 51 additions & 7 deletions monitor/payload/monitor_payload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package payload

import (
"bytes"
"encoding/gob"
"testing"

"github.com/cilium/cilium/pkg/comparator"
Expand Down Expand Up @@ -101,23 +102,66 @@ func (s *PayloadSuite) BenchmarkWriteMetaPayload(c *C) {
}

func (s *PayloadSuite) BenchmarkReadMetaPayload(c *C) {
meta1 := Meta{Size: 1234}
payload1 := Payload{
payload := Payload{
Data: []byte{1, 2, 3, 4},
Lost: 5243,
CPU: 12,
Type: 9,
}

var buf bytes.Buffer
err := WriteMetaPayload(&buf, &meta1, &payload1)
for i := 0; i < c.N; i++ {
err := payload.WriteBinary(&buf)
c.Assert(err, Equals, nil)
}

for i := 0; i < c.N; i++ {
err := payload.ReadBinary(&buf)
c.Assert(err, Equals, nil)
}
}

func (s *PayloadSuite) BenchmarkWritePayloadReuseEncoder(c *C) {
payload := Payload{
Data: []byte{1, 2, 3, 4},
Lost: 5243,
CPU: 12,
Type: 9,
}

// Do a first dry run to pre-allocate the buffer capacity.
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
err := payload.EncodeBinary(enc)
c.Assert(err, Equals, nil)

var meta2 Meta
var payload2 Payload
for i := 0; i < c.N; i++ {
readBuf := bytes.NewBuffer(buf.Bytes())
err = ReadMetaPayload(readBuf, &meta2, &payload2)
buf.Reset()
err := payload.EncodeBinary(enc)
c.Assert(err, Equals, nil)
}
}

func (s *PayloadSuite) BenchmarkReadPayloadRuseEncoder(c *C) {
payload := Payload{
Data: []byte{1, 2, 3, 4},
Lost: 5243,
CPU: 12,
Type: 9,
}

var buf bytes.Buffer

// prefill an encoded stream
enc := gob.NewEncoder(&buf)
for i := 0; i < c.N; i++ {
err := payload.EncodeBinary(enc)
c.Assert(err, Equals, nil)
}

dec := gob.NewDecoder(&buf)
for i := 0; i < c.N; i++ {
err := payload.DecodeBinary(dec)
c.Assert(err, Equals, nil)
}
}
5 changes: 5 additions & 0 deletions pkg/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ const (
// This is the 1.0 protocol version.
MonitorSockPath1_0 = RuntimePath + "/monitor.sock"

// MonitorSockPath1_2 is the path to the UNIX domain socket used to
// distribute BPF and agent events to listeners.
// This is the 1.2 protocol version.
MonitorSockPath1_2 = RuntimePath + "/monitor1_2.sock"

// PidFilePath is the path to the pid file for the agent.
PidFilePath = RuntimePath + "/cilium.pid"

Expand Down

0 comments on commit b88afa2

Please sign in to comment.