Skip to content

Commit

Permalink
Merge pull request #94 from Random-Liu/add-multiple-log-monitor-support
Browse files Browse the repository at this point in the history
Add multiple system log monitor support
  • Loading branch information
dchen1107 committed Feb 17, 2017
2 parents 92e67b8 + 889d9ef commit 0f5db9e
Show file tree
Hide file tree
Showing 10 changed files with 160 additions and 15 deletions.
2 changes: 1 addition & 1 deletion Dockerfile.in
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ RUN test -h /etc/localtime && rm -f /etc/localtime && cp /usr/share/zoneinfo/UTC

ADD ./bin/node-problem-detector /node-problem-detector
ADD config /config
ENTRYPOINT ["/node-problem-detector", "--system-log-monitor=/config/kernel-monitor.json"]
ENTRYPOINT ["/node-problem-detector", "--system-log-monitors=/config/kernel-monitor.json"]
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ List of supported problem daemons:
# Usage
## Flags
* `--version`: Print current version of node-problem-detector.
* `--system-log-monitor`: The configuration used by the system log monitor, e.g.
* `--system-log-monitors`: List of paths to system log monitor configuration files, comma separated, e.g.
[config/kernel-monitor.json](https://github.com/kubernetes/node-problem-detector/blob/master/config/kernel-monitor.json).
Node problem detector will start a separate log monitor for each configuration. You can
use different log monitors to monitor different system log.
* `--apiserver-override`: A URI parameter used to customize how node-problem-detector
connects the apiserver. The format is same as the
[`source`](https://github.com/kubernetes/heapster/blob/master/docs/source-configuration.md#kubernetes)
Expand Down
12 changes: 10 additions & 2 deletions cmd/node_problem_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,17 @@ func main() {
os.Exit(0)
}

l := systemlogmonitor.NewLogMonitorOrDie(npdo.SystemLogMonitorConfigPath)
monitors := make(map[string]systemlogmonitor.LogMonitor)
for _, config := range npdo.SystemLogMonitorConfigPaths {
if _, ok := monitors[config]; ok {
// Skip the config if it's duplictaed.
glog.Warningf("Duplicated log monitor configuration %q", config)
continue
}
monitors[config] = systemlogmonitor.NewLogMonitorOrDie(config)
}
c := problemclient.NewClientOrDie(npdo)
p := problemdetector.NewProblemDetector(l, c)
p := problemdetector.NewProblemDetector(monitors, c)

// Start http server.
if npdo.ServerPort > 0 {
Expand Down
9 changes: 5 additions & 4 deletions pkg/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ import (
type NodeProblemDetectorOptions struct {
// command line options

// SystemLogMonitorConfigPath specifies the path to system log monitor configuration file.
SystemLogMonitorConfigPath string
// SystemLogMonitorConfigPaths specifies the list of paths to system log monitor configuration
// files.
SystemLogMonitorConfigPaths []string
// ApiServerOverride is the custom URI used to connect to Kubernetes ApiServer.
ApiServerOverride string
// PrintVersion is the flag determining whether version information is printed.
Expand All @@ -55,8 +56,8 @@ func NewNodeProblemDetectorOptions() *NodeProblemDetectorOptions {

// AddFlags adds node problem detector command line options to pflag.
func (npdo *NodeProblemDetectorOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&npdo.SystemLogMonitorConfigPath, "system-log-monitor",
"/config/kernel-monitor.json", "The path to the system log monitor config file")
fs.StringSliceVar(&npdo.SystemLogMonitorConfigPaths, "system-log-monitors",
[]string{}, "List of paths to system log monitor config files, comma separated.")
fs.StringVar(&npdo.ApiServerOverride, "apiserver-override",
"", "Custom URI used to connect to Kubernetes ApiServer")
fs.BoolVar(&npdo.PrintVersion, "version", false, "Print version information and quit")
Expand Down
39 changes: 32 additions & 7 deletions pkg/problemdetector/problem_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package problemdetector

import (
"fmt"
"net/http"

"github.com/golang/glog"
Expand All @@ -26,6 +27,7 @@ import (
"k8s.io/node-problem-detector/pkg/condition"
"k8s.io/node-problem-detector/pkg/problemclient"
"k8s.io/node-problem-detector/pkg/systemlogmonitor"
"k8s.io/node-problem-detector/pkg/types"
"k8s.io/node-problem-detector/pkg/util"
)

Expand All @@ -38,28 +40,39 @@ type ProblemDetector interface {
type problemDetector struct {
client problemclient.Client
conditionManager condition.ConditionManager
// TODO(random-liu): Use slices of problem daemons if multiple monitors are needed in the future
monitor systemlogmonitor.LogMonitor
monitors map[string]systemlogmonitor.LogMonitor
}

// NewProblemDetector creates the problem detector. Currently we just directly passed in the problem daemons, but
// in the future we may want to let the problem daemons register themselves.
func NewProblemDetector(monitor systemlogmonitor.LogMonitor, client problemclient.Client) ProblemDetector {
func NewProblemDetector(monitors map[string]systemlogmonitor.LogMonitor, client problemclient.Client) ProblemDetector {
return &problemDetector{
client: client,
conditionManager: condition.NewConditionManager(client, clock.RealClock{}),
monitor: monitor,
monitors: monitors,
}
}

// Run starts the problem detector.
func (p *problemDetector) Run() error {
p.conditionManager.Start()
ch, err := p.monitor.Start()
if err != nil {
return err
// Start the log monitors one by one.
var chans []<-chan *types.Status
for cfg, m := range p.monitors {
ch, err := m.Start()
if err != nil {
// Do not return error and keep on trying the following config files.
glog.Errorf("Failed to start log monitor %q: %v", cfg, err)
continue
}
chans = append(chans, ch)
}
if len(chans) == 0 {
return fmt.Errorf("no log montior is successfully setup")
}
ch := groupChannel(chans)
glog.Info("Problem detector started")

for {
select {
case status := <-ch:
Expand All @@ -80,3 +93,15 @@ func (p *problemDetector) RegisterHTTPHandlers() {
util.ReturnHTTPJson(w, p.conditionManager.GetConditions())
})
}

func groupChannel(chans []<-chan *types.Status) <-chan *types.Status {
statuses := make(chan *types.Status)
for _, ch := range chans {
go func(c <-chan *types.Status) {
for status := range c {
statuses <- status
}
}(ch)
}
return statuses
}
15 changes: 15 additions & 0 deletions pkg/systemlogmonitor/log_monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@ limitations under the License.
package systemlogmonitor

import (
"fmt"
"reflect"
"runtime"
"testing"
"time"

"github.com/stretchr/testify/assert"

watchertest "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/testing"
logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types"
"k8s.io/node-problem-detector/pkg/types"
)
Expand Down Expand Up @@ -131,3 +136,13 @@ func TestGenerateStatus(t *testing.T) {
}
}
}

func TestGoroutineLeak(t *testing.T) {
orignal := runtime.NumGoroutine()
f := watchertest.NewFakeLogWatcher(10)
f.InjectError(fmt.Errorf("unexpected error"))
l := &logMonitor{watcher: f}
_, err := l.Start()
assert.Error(t, err)
assert.Equal(t, orignal, runtime.NumGoroutine())
}
14 changes: 14 additions & 0 deletions pkg/systemlogmonitor/logwatchers/filelog/log_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package filelog
import (
"io/ioutil"
"os"
"runtime"
"testing"
"time"

Expand Down Expand Up @@ -170,3 +171,16 @@ Jan 2 03:04:05 kernel: [2.000000] 3
}
}
}

func TestGoroutineLeak(t *testing.T) {
orignal := runtime.NumGoroutine()
w := NewSyslogWatcherOrDie(types.WatcherConfig{
Plugin: "filelog",
PluginConfig: getTestPluginConfig(),
LogPath: "/not/exist/path",
Lookback: "10m",
})
_, err := w.Watch()
assert.Error(t, err)
assert.Equal(t, orignal, runtime.NumGoroutine())
}
6 changes: 6 additions & 0 deletions pkg/systemlogmonitor/logwatchers/journald/log_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package journald

import (
"fmt"
"os"
"strings"
"time"

Expand Down Expand Up @@ -134,6 +135,11 @@ func getJournal(cfg types.WatcherConfig) (*sdjournal.Journal, error) {
if err != nil {
return nil, fmt.Errorf("failed to parse lookback duration %q: %v", cfg.Lookback, err)
}
// If the path doesn't present, NewJournalFromDir will create it instead of
// returning error. So check the path existence ourselves.
if _, err := os.Stat(path); err != nil {
return nil, fmt.Errorf("failed to stat the log path %q: %v", path, err)
}
// Get journal client from the log path.
journal, err := sdjournal.NewJournalFromDir(path)
if err != nil {
Expand Down
15 changes: 15 additions & 0 deletions pkg/systemlogmonitor/logwatchers/journald/log_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ limitations under the License.
package journald

import (
"runtime"
"testing"
"time"

"github.com/coreos/go-systemd/sdjournal"
"github.com/stretchr/testify/assert"

"k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types"
logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types"
)

Expand Down Expand Up @@ -62,3 +64,16 @@ func TestTranslate(t *testing.T) {
assert.Equal(t, test.log, translate(test.entry))
}
}

func TestGoroutineLeak(t *testing.T) {
orignal := runtime.NumGoroutine()
w := NewJournaldWatcher(types.WatcherConfig{
Plugin: "journald",
PluginConfig: map[string]string{"source": "not-exist-service"},
LogPath: "/not/exist/path",
Lookback: "10m",
})
_, err := w.Watch()
assert.Error(t, err)
assert.Equal(t, orignal, runtime.NumGoroutine())
}
59 changes: 59 additions & 0 deletions pkg/systemlogmonitor/logwatchers/testing/fake_log_watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
Copyright 2017 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package testing

import (
"sync"

"k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types"
logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types"
)

// FakeLogWatcher is a fake mock of log watcher.
type FakeLogWatcher struct {
sync.Mutex
buf chan *logtypes.Log
err error
}

var _ types.LogWatcher = &FakeLogWatcher{}

func NewFakeLogWatcher(bufferSize int) *FakeLogWatcher {
return &FakeLogWatcher{buf: make(chan *logtypes.Log, bufferSize)}
}

// InjectLog injects a fake log into the watch channel
func (f *FakeLogWatcher) InjectLog(log *logtypes.Log) {
f.buf <- log
}

// InjectError injects an error of Watch function.
func (f *FakeLogWatcher) InjectError(err error) {
f.Lock()
defer f.Unlock()
f.err = err
}

// Watch is the fake watch function.
func (f *FakeLogWatcher) Watch() (<-chan *logtypes.Log, error) {
return f.buf, f.err
}

// Stop is the fake stop function.
func (f *FakeLogWatcher) Stop() {
close(f.buf)
}

0 comments on commit 0f5db9e

Please sign in to comment.