Skip to content

Commit

Permalink
UDP service listener performance enhancements
Browse files Browse the repository at this point in the history
  • Loading branch information
sparrc committed Nov 5, 2015
1 parent 780df57 commit a8c1c10
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 27 deletions.
13 changes: 11 additions & 2 deletions services/udp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,18 @@ const (
DefaultDatabase = "udp"

// DefaultBatchSize is the default UDP batch size.
DefaultBatchSize = 1000
DefaultBatchSize = 5000

// DefaultBatchPending is the default number of pending UDP batches.
DefaultBatchPending = 5
DefaultBatchPending = 10

// DefaultBatchTimeout is the default UDP batch timeout.
DefaultBatchTimeout = time.Second

// DefaultReadBuffer is the default buffer size for the UDP listener.
// Increasing this increases the number of packets that the listener can handle,
// but also increases the memory usage.
DefaultReadBuffer = 8 * 1024 * 1024
)

type Config struct {
Expand All @@ -28,6 +33,7 @@ type Config struct {
RetentionPolicy string `toml:"retention-policy"`
BatchSize int `toml:"batch-size"`
BatchPending int `toml:"batch-pending"`
ReadBuffer int `toml:"read-buffer"`
BatchTimeout toml.Duration `toml:"batch-timeout"`
}

Expand All @@ -47,5 +53,8 @@ func (c *Config) WithDefaults() *Config {
if d.BatchTimeout == 0 {
d.BatchTimeout = toml.Duration(DefaultBatchTimeout)
}
if d.ReadBuffer == 0 {
d.ReadBuffer = DefaultReadBuffer
}
return &d
}
69 changes: 44 additions & 25 deletions services/udp/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ import (
"github.com/influxdb/influxdb/tsdb"
)

// from https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure
const (
// Maximum UDP packet size
// see https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure
UDPBufferSize = 65536

// Arbitrary, testing indicated that this doesn't typically get over 10
parserChanLen = 1000
)

// statistics gathered by the UDP package.
Expand All @@ -44,8 +48,9 @@ type Service struct {
wg sync.WaitGroup
done chan struct{}

batcher *tsdb.PointBatcher
config Config
parserChan chan []byte
batcher *tsdb.PointBatcher
config Config

PointsWriter interface {
WritePoints(p *cluster.WritePointsRequest) error
Expand All @@ -62,10 +67,11 @@ type Service struct {
func NewService(c Config) *Service {
d := *c.WithDefaults()
return &Service{
config: d,
done: make(chan struct{}),
batcher: tsdb.NewPointBatcher(d.BatchSize, d.BatchPending, time.Duration(d.BatchTimeout)),
Logger: log.New(os.Stderr, "[udp] ", log.LstdFlags),
config: d,
done: make(chan struct{}),
parserChan: make(chan []byte, parserChanLen),
batcher: tsdb.NewPointBatcher(d.BatchSize, d.BatchPending, time.Duration(d.BatchTimeout)),
Logger: log.New(os.Stderr, "[udp] ", log.LstdFlags),
}
}

Expand Down Expand Up @@ -98,11 +104,13 @@ func (s *Service) Open() (err error) {
s.Logger.Printf("Failed to set up UDP listener at address %s: %s", s.addr, err)
return err
}
s.conn.SetReadBuffer(s.config.ReadBuffer)

s.Logger.Printf("Started listening on UDP: %s", s.config.BindAddress)

s.wg.Add(2)
s.wg.Add(3)
go s.serve()
go s.parser()
go s.writePoints()

return nil
Expand Down Expand Up @@ -138,35 +146,46 @@ func (s *Service) serve() {

s.batcher.Start()
for {
buf := make([]byte, UDPBufferSize)

select {
case <-s.done:
// We closed the connection, time to go.
return
default:
// Keep processing.
buf := make([]byte, UDPBufferSize)
n, _, err := s.conn.ReadFromUDP(buf)
if err != nil {
s.statMap.Add(statReadFail, 1)
s.Logger.Printf("Failed to read UDP message: %s", err)
continue
}
s.statMap.Add(statBytesReceived, int64(n))
s.parserChan <- buf[:n]
}
}
}

n, _, err := s.conn.ReadFromUDP(buf)
if err != nil {
s.statMap.Add(statReadFail, 1)
s.Logger.Printf("Failed to read UDP message: %s", err)
continue
}
s.statMap.Add(statBytesReceived, int64(n))
func (s *Service) parser() {
defer s.wg.Done()

points, err := models.ParsePoints(buf[:n])
if err != nil {
s.statMap.Add(statPointsParseFail, 1)
s.Logger.Printf("Failed to parse points: %s", err)
continue
}
for {
select {
case <-s.done:
return
case buf := <-s.parserChan:
points, err := models.ParsePoints(buf)
if err != nil {
s.statMap.Add(statPointsParseFail, 1)
s.Logger.Printf("Failed to parse points: %s", err)
return
}

for _, point := range points {
s.batcher.In() <- point
for _, point := range points {
s.batcher.In() <- point
}
s.statMap.Add(statPointsReceived, int64(len(points)))
}
s.statMap.Add(statPointsReceived, int64(len(points)))
}
}

Expand Down

0 comments on commit a8c1c10

Please sign in to comment.