Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract daemon statsCollector to its own package #29887

Merged
merged 1 commit into from
Jan 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 8 additions & 5 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,13 @@ import (
"github.com/docker/docker/container"
"github.com/docker/docker/daemon/events"
"github.com/docker/docker/daemon/exec"
"github.com/docker/docker/daemon/initlayer"
"github.com/docker/docker/dockerversion"
"github.com/docker/docker/plugin"
"github.com/docker/libnetwork/cluster"
// register graph drivers
_ "github.com/docker/docker/daemon/graphdriver/register"
"github.com/docker/docker/daemon/initlayer"
"github.com/docker/docker/daemon/stats"
dmetadata "github.com/docker/docker/distribution/metadata"
"github.com/docker/docker/distribution/xfer"
"github.com/docker/docker/dockerversion"
"github.com/docker/docker/image"
"github.com/docker/docker/layer"
"github.com/docker/docker/libcontainerd"
Expand All @@ -46,13 +45,15 @@ import (
"github.com/docker/docker/pkg/sysinfo"
"github.com/docker/docker/pkg/system"
"github.com/docker/docker/pkg/truncindex"
"github.com/docker/docker/plugin"
"github.com/docker/docker/reference"
"github.com/docker/docker/registry"
"github.com/docker/docker/runconfig"
volumedrivers "github.com/docker/docker/volume/drivers"
"github.com/docker/docker/volume/local"
"github.com/docker/docker/volume/store"
"github.com/docker/libnetwork"
"github.com/docker/libnetwork/cluster"
nwconfig "github.com/docker/libnetwork/config"
"github.com/docker/libtrust"
"github.com/pkg/errors"
Expand Down Expand Up @@ -82,7 +83,7 @@ type Daemon struct {
trustKey libtrust.PrivateKey
idIndex *truncindex.TruncIndex
configStore *Config
statsCollector *statsCollector
statsCollector *stats.Collector
defaultLogConfig containertypes.LogConfig
RegistryService registry.Service
EventsService *events.Events
Expand All @@ -106,6 +107,8 @@ type Daemon struct {
clusterProvider cluster.Provider
cluster Cluster

machineMemory uint64

seccompProfile []byte
seccompProfilePath string
}
Expand Down
4 changes: 2 additions & 2 deletions daemon/daemon_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -1125,8 +1125,8 @@ func (daemon *Daemon) stats(c *container.Container) (*types.StatsJSON, error) {
Limit: mem.Limit,
}
// if the container does not set memory limit, use the machineMemory
if mem.Limit > daemon.statsCollector.machineMemory && daemon.statsCollector.machineMemory > 0 {
s.MemoryStats.Limit = daemon.statsCollector.machineMemory
if mem.Limit > daemon.machineMemory && daemon.machineMemory > 0 {
s.MemoryStats.Limit = daemon.machineMemory
}
if cgs.PidsStats != nil {
s.PidsStats = types.PidsStats{
Expand Down
2 changes: 1 addition & 1 deletion daemon/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (daemon *Daemon) cleanupContainer(container *container.Container, forceRemo

// stop collection of stats for the container regardless
// if stats are currently getting collected.
daemon.statsCollector.stopCollection(container)
daemon.statsCollector.StopCollection(container)

if err = daemon.containerStop(container, 3); err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions daemon/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,11 @@ func (daemon *Daemon) ContainerStats(ctx context.Context, prefixOrName string, c
}

func (daemon *Daemon) subscribeToContainerStats(c *container.Container) chan interface{} {
return daemon.statsCollector.collect(c)
return daemon.statsCollector.Collect(c)
}

func (daemon *Daemon) unsubscribeToContainerStats(c *container.Container, ch chan interface{}) {
daemon.statsCollector.unsubscribe(c, ch)
daemon.statsCollector.Unsubscribe(c, ch)
}

// GetContainerStats collects all the stats published by a container
Expand Down
101 changes: 101 additions & 0 deletions daemon/stats/collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// +build !solaris

package stats

import (
"time"

"github.com/Sirupsen/logrus"
"github.com/docker/docker/container"
"github.com/docker/docker/pkg/pubsub"
)

// Collect registers the container with the collector and adds it to
// the event loop for collection on the specified interval returning
// a channel for the subscriber to receive on.
func (s *Collector) Collect(c *container.Container) chan interface{} {
s.m.Lock()
defer s.m.Unlock()
publisher, exists := s.publishers[c]
if !exists {
publisher = pubsub.NewPublisher(100*time.Millisecond, 1024)
s.publishers[c] = publisher
}
return publisher.Subscribe()
}

// StopCollection closes the channels for all subscribers and removes
// the container from metrics collection.
func (s *Collector) StopCollection(c *container.Container) {
s.m.Lock()
if publisher, exists := s.publishers[c]; exists {
publisher.Close()
delete(s.publishers, c)
}
s.m.Unlock()
}

// Unsubscribe removes a specific subscriber from receiving updates for a container's stats.
func (s *Collector) Unsubscribe(c *container.Container, ch chan interface{}) {
s.m.Lock()
publisher := s.publishers[c]
if publisher != nil {
publisher.Evict(ch)
if publisher.Len() == 0 {
delete(s.publishers, c)
}
}
s.m.Unlock()
}

// Run starts the collectors and will indefinitely collect stats from the supervisor
func (s *Collector) Run() {
type publishersPair struct {
container *container.Container
publisher *pubsub.Publisher
}
// we cannot determine the capacity here.
// it will grow enough in first iteration
var pairs []publishersPair

for range time.Tick(s.interval) {
// it does not make sense in the first iteration,
// but saves allocations in further iterations
pairs = pairs[:0]

s.m.Lock()
for container, publisher := range s.publishers {
// copy pointers here to release the lock ASAP
pairs = append(pairs, publishersPair{container, publisher})
}
s.m.Unlock()
if len(pairs) == 0 {
continue
}

systemUsage, err := s.getSystemCPUUsage()
if err != nil {
logrus.Errorf("collecting system cpu usage: %v", err)
continue
}

for _, pair := range pairs {
stats, err := s.supervisor.GetContainerStats(pair.container)
if err != nil {
if _, ok := err.(notRunningErr); !ok {
logrus.Errorf("collecting stats for %s: %v", pair.container.ID, err)
}
continue
}
// FIXME: move to containerd on Linux (not Windows)
stats.CPUStats.SystemUsage = systemUsage

pair.publisher.Publish(*stats)
}
}
}

type notRunningErr interface {
error
ContainerIsRunning() bool
}
29 changes: 29 additions & 0 deletions daemon/stats/collector_solaris.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package stats
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Solaris 😿


import (
"github.com/docker/docker/container"
)

// platformNewStatsCollector performs platform specific initialisation of the
// Collector structure. This is a no-op on Windows.
func platformNewStatsCollector(s *Collector) {
}

// Collect registers the container with the collector and adds it to
// the event loop for collection on the specified interval returning
// a channel for the subscriber to receive on.
// Currently not supported on Solaris
func (s *Collector) Collect(c *container.Container) chan interface{} {
return nil
}

// StopCollection closes the channels for all subscribers and removes
// the container from metrics collection.
// Currently not supported on Solaris
func (s *Collector) StopCollection(c *container.Container) {
}

// Unsubscribe removes a specific subscriber from receiving updates for a container's stats.
// Currently not supported on Solaris
func (s *Collector) Unsubscribe(c *container.Container, ch chan interface{}) {
}
Original file line number Diff line number Diff line change
@@ -1,25 +1,20 @@
// +build !windows,!solaris

package daemon
package stats

import (
"fmt"
"os"
"strconv"
"strings"

sysinfo "github.com/docker/docker/pkg/system"
"github.com/opencontainers/runc/libcontainer/system"
)

// platformNewStatsCollector performs platform specific initialisation of the
// statsCollector structure.
func platformNewStatsCollector(s *statsCollector) {
// Collector structure.
func platformNewStatsCollector(s *Collector) {
s.clockTicksPerSecond = uint64(system.GetClockTicks())
meminfo, err := sysinfo.ReadMemInfo()
if err == nil && meminfo.MemTotal > 0 {
s.machineMemory = uint64(meminfo.MemTotal)
}
}

const nanoSecondsPerSecond = 1e9
Expand All @@ -32,7 +27,7 @@ const nanoSecondsPerSecond = 1e9
// statistics line and then sums up the first seven fields
// provided. See `man 5 proc` for details on specific field
// information.
func (s *statsCollector) getSystemCPUUsage() (uint64, error) {
func (s *Collector) getSystemCPUUsage() (uint64, error) {
var line string
f, err := os.Open("/proc/stat")
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
// +build windows

package daemon
package stats

// platformNewStatsCollector performs platform specific initialisation of the
// statsCollector structure. This is a no-op on Windows.
func platformNewStatsCollector(s *statsCollector) {
// Collector structure. This is a no-op on Windows.
func platformNewStatsCollector(s *Collector) {
}

// getSystemCPUUsage returns the host system's cpu usage in
// nanoseconds. An error is returned if the format of the underlying
// file does not match. This is a no-op on Windows.
func (s *statsCollector) getSystemCPUUsage() (uint64, error) {
func (s *Collector) getSystemCPUUsage() (uint64, error) {
return 0, nil
}
42 changes: 42 additions & 0 deletions daemon/stats/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package stats

import (
"bufio"
"sync"
"time"

"github.com/docker/docker/api/types"
"github.com/docker/docker/container"
"github.com/docker/docker/pkg/pubsub"
)

type supervisor interface {
// GetContainerStats collects all the stats related to a container
GetContainerStats(container *container.Container) (*types.StatsJSON, error)
}

// NewCollector creates a stats collector that will poll the supervisor with the specified interval
func NewCollector(supervisor supervisor, interval time.Duration) *Collector {
s := &Collector{
interval: interval,
supervisor: supervisor,
publishers: make(map[*container.Container]*pubsub.Publisher),
bufReader: bufio.NewReaderSize(nil, 128),
}

platformNewStatsCollector(s)

return s
}

// Collector manages and provides container resource stats
type Collector struct {
m sync.Mutex
supervisor supervisor
interval time.Duration
publishers map[*container.Container]*pubsub.Publisher
bufReader *bufio.Reader

// The following fields are not set on Windows currently.
clockTicksPerSecond uint64
}