Skip to content

Commit

Permalink
change logkit conf
Browse files Browse the repository at this point in the history
  • Loading branch information
redHJ committed Jan 22, 2019
1 parent badcba5 commit abfaf3b
Show file tree
Hide file tree
Showing 16 changed files with 260 additions and 315 deletions.
1 change: 0 additions & 1 deletion .gitignore
Expand Up @@ -60,7 +60,6 @@ logkit
stats
.logkitconfs/
public/.project
parser/grok/patterns.go

# logkitWeb ignore
# See https://help.github.com/ignore-files/ for more about ignoring files.
Expand Down
9 changes: 8 additions & 1 deletion logkit.conf
Expand Up @@ -6,5 +6,12 @@
"profile_host":":3002",
"disable_web":false,
"static_root_path":"./public",
"confs_path": ["confs*"]
"confs_path": ["confs*"],
"collect_log_enable": false,
"pandora_name": "",
"pandora_ak": "",
"pandora_sk": "",
"pandora_pipeline": "",
"pandora_logdb": "",
"pandora_region": ""
}
25 changes: 18 additions & 7 deletions logkit.go
Expand Up @@ -263,17 +263,21 @@ func main() {
runtime.GOMAXPROCS(conf.MaxProcs)
log.SetOutputLevel(conf.DebugLevel)

stopRotate := make(chan struct{}, 0)
var (
stopRotate = make(chan struct{}, 0)
logdir, logpattern string
err error
)
defer close(stopRotate)
if conf.LogPath != "" {
logdir, logpattern, err := LogDirAndPattern(conf.LogPath)
logdir, logpattern, err = LogDirAndPattern(conf.LogPath)
if err != nil {
log.Fatal(err)
}
go loopRotateLogs(filepath.Join(logdir, logpattern), defaultRotateSize, 10*time.Second, stopRotate)
conf.CleanSelfPattern = logpattern + "-*"
conf.CleanSelfDir = logdir
conf.ManagerConfig.SelfLogSet.LogPath = conf.LogPath
conf.ManagerConfig.CollectLogPath = filepath.Join(logdir, logpattern+"-*")
}

log.Infof("Welcome to use Logkit, Version: %v \n\nConfig: %#v", NextVersion, conf)
Expand All @@ -283,6 +287,11 @@ func main() {
}
m.Version = NextVersion

if m.CollectLogRunner != nil {
go m.CollectLogRunner.Run()
time.Sleep(time.Second) // 等待1秒让收集器启动
}

paths := getValidPath(conf.ConfsPath)
if len(paths) <= 0 {
log.Warnf("Cannot read or create any ConfsPath %v", conf.ConfsPath)
Expand All @@ -292,13 +301,15 @@ func main() {
}
m.RestoreWebDir()

if m.SelfLogRunner != nil {
go m.SelfLogRunner.Run()
}

stopClean := make(chan struct{}, 0)
defer close(stopClean)
if conf.CleanSelfLog {
if conf.CleanSelfDir == "" && logdir != "" {
conf.CleanSelfDir = logdir
}
if conf.CleanSelfPattern == "" && logpattern != "" {
conf.CleanSelfPattern = logpattern + "-*"
}
go loopCleanLogkitLog(conf.CleanSelfDir, conf.CleanSelfPattern, conf.CleanSelfLogCnt, conf.CleanSelfDuration, stopClean)
}
if len(conf.BindHost) > 0 {
Expand Down
2 changes: 1 addition & 1 deletion mgr/cluster_design.md
Expand Up @@ -46,7 +46,7 @@ logkit 的 cluster 功能配置非常简单,只需要在 logkit 的主配置
"bind_host":"127.0.0.1:4000", # 选填,默认自己找一个4000以上的可用端口开启
"profile_host":"localhost:6060", # 选填,默认为空,不开启
"clean_self_log":true, # 选填,默认false
"clean_self_dir":"./run", # 选填,clean_self_log 为true时候生效,默认 "./run"
"clean_self_dir":"./", # 选填,clean_self_log 为true时候生效,默认 "./"
"clean_self_pattern":"*.log-*", # 选填,clean_self_log 为true时候生效,默认 "*.log-*"
"clean_self_cnt":5, # 选填,clean_self_log 为true时候生效,默认 5
"rest_dir":"./.logkitconfs", # 选填,通过web页面存放的logkit配置文件夹,默认为logkit程序运行目录的子目录`.logkitconfs`下
Expand Down
1 change: 1 addition & 0 deletions mgr/dataflow.go
Expand Up @@ -98,6 +98,7 @@ func RawData(readerConfig conf.MapConf) ([]string, error) {

var rawData []string
timeout := time.NewTimer(time.Minute)
defer timeout.Stop()
select {
case de := <-readChan:
rawData, err = de.data, de.lastErr
Expand Down
3 changes: 2 additions & 1 deletion mgr/metric_runner.go
Expand Up @@ -316,7 +316,7 @@ func (r *MetricRunner) Run() {
continue
}
if len(tags) > 0 {
datas = addTagsToData(tags, datas, r.Name())
datas = AddTagsToData(tags, datas, r.Name())
}
r.rsMutex.Lock()
r.rs.ReadDataCount += int64(dataCnt)
Expand Down Expand Up @@ -391,6 +391,7 @@ func (mr *MetricRunner) Stop() {

log.Warnf("wait for MetricRunner " + mr.Name() + " stopped")
timer := time.NewTimer(time.Second * 10)
defer timer.Stop()
select {
case <-mr.exitChan:
log.Warnf("MetricRunner " + mr.Name() + " has been stopped ")
Expand Down
64 changes: 34 additions & 30 deletions mgr/mgr.go
Expand Up @@ -44,12 +44,15 @@ type ManagerConfig struct {
ServerBackup bool `json:"-"`
AuditDir string `json:"audit_dir"`

SelfLog
CollectLog
}

type SelfLog struct {
SelfLogSet
SelfLogEnable bool `json:"self_log_enable"`
type CollectLog struct {
CollectLogPath string `json:"collect_log_path"`
CollectLogEnable bool `json:"collect_log_enable"`
ReadFrom string `json:"read_from"`
EnvTag string `json:"-"`
Pandora
}

type cleanQueue struct {
Expand Down Expand Up @@ -86,7 +89,7 @@ type Manager struct {
Version string
SystemInfo string

SelfLogRunner *self.LogRunner
CollectLogRunner *self.LogRunner
}

func NewManager(conf ManagerConfig) (*Manager, error) {
Expand Down Expand Up @@ -121,33 +124,34 @@ func NewCustomManager(conf ManagerConfig, rr *reader.Registry, pr *parser.Regist
if err != nil {
return nil, err
}
var selfLogRunner *self.LogRunner
if conf.SelfLogEnable {
rdConf := self.SetReaderConfig(self.GetReaderConfig(), conf.LogPath, "", conf.ReadFrom)
var collectLogRunner *self.LogRunner
if conf.CollectLogEnable {
rdConf := self.SetReaderConfig(self.GetReaderConfig(), conf.CollectLogPath, "", conf.ReadFrom)
sdConf := self.SetSenderConfig(self.GetSenderConfig(), conf.Pandora)
selfLogRunner, err = self.NewLogRunner(rdConf, self.GetParserConfig(), self.GetTransformerConfig(), sdConf)
collectLogRunner, err = self.NewLogRunner(rdConf, self.GetParserConfig(), sdConf, conf.EnvTag)
if err != nil {
return nil, err
log.Errorf("new collect log runner failed: %v", err)
err = nil
}
}

m := &Manager{
ManagerConfig: conf,
cleanLock: new(sync.RWMutex),
watcherMux: new(sync.RWMutex),
cleanChan: make(chan cleaner.CleanSignal),
cleanQueues: make(map[string]*cleanQueue),
runners: make(map[string]Runner),
runnerConfigs: make(map[string]RunnerConfig),
runnerPaths: make(map[string]string),
watchers: make(map[string]*fsnotify.Watcher),
rregistry: rr,
pregistry: pr,
sregistry: sr,
SystemInfo: utilsos.GetOSInfo().String(),
audit: audt,
auditChan: make(chan audit.Message, 100),
SelfLogRunner: selfLogRunner,
ManagerConfig: conf,
cleanLock: new(sync.RWMutex),
watcherMux: new(sync.RWMutex),
cleanChan: make(chan cleaner.CleanSignal),
cleanQueues: make(map[string]*cleanQueue),
runners: make(map[string]Runner),
runnerConfigs: make(map[string]RunnerConfig),
runnerPaths: make(map[string]string),
watchers: make(map[string]*fsnotify.Watcher),
rregistry: rr,
pregistry: pr,
sregistry: sr,
SystemInfo: utilsos.GetOSInfo().String(),
audit: audt,
auditChan: make(chan audit.Message, 100),
CollectLogRunner: collectLogRunner,
}
return m, nil
}
Expand All @@ -174,8 +178,8 @@ func (m *Manager) Stop() error {
//在所有runner close以后,就保证了不会有audit message发送到Channel里
close(m.auditChan)

if m.SelfLogRunner != nil {
m.SelfLogRunner.Stop()
if m.CollectLogRunner != nil {
m.CollectLogRunner.Stop()
}
return nil
}
Expand Down Expand Up @@ -823,8 +827,8 @@ func (m *Manager) UpdateToken(tokens []AuthTokens) (err error) {
errMsg := make([]string, 0)
for _, token := range tokens {
runnerPath := token.RunnerName
if strings.HasPrefix(runnerPath, DefaultInternalPrefix) && m.SelfLogRunner != nil {
m.SelfLogRunner.TokenRefresh(token)
if strings.HasPrefix(runnerPath, DefaultInternalPrefix) && m.CollectLogRunner != nil {
m.CollectLogRunner.TokenRefresh(token)
continue
}

Expand Down
43 changes: 2 additions & 41 deletions mgr/runner.go
Expand Up @@ -743,7 +743,7 @@ func (r *LogExportRunner) readLines(dataSourceTag string) []Data {
tags = MergeExtraInfoTags(r.meta, tags)
tags["lst"] = curTimeStr
if len(tags) > 0 {
datas = addTagsToData(tags, datas, r.Name())
datas = AddTagsToData(tags, datas, r.Name())
}

// 把 source 加到 data 里,前提是认为 []line 变成 []data 以后是一一对应的,一旦错位就不加
Expand Down Expand Up @@ -1009,20 +1009,6 @@ func addSourceToData(sourceFroms []string, se *StatsError, datas []Data, datasou
return datas
}

func addTagsToData(tags map[string]interface{}, datas []Data, runnername string) []Data {
for j, data := range datas {
for k, v := range tags {
if dt, ok := data[k]; ok {
log.Debugf("Runner[%v] datasource tag already has data %v, ignore %v", runnername, dt, v)
} else {
data[k] = v
}
}
datas[j] = data
}
return datas
}

// Stop 清理所有使用到的资源, 等待10秒尝试读取完毕
// 先停Reader,不再读取,然后停Run函数,让读取的都转到发送,最后停Sender结束整个过程。
// Parser 无状态,无需stop。
Expand All @@ -1044,6 +1030,7 @@ func (r *LogExportRunner) Stop() {

log.Infof("Runner[%v] waiting for Run() stopped signal", r.Name())
timer := time.NewTimer(time.Second * 10)
defer timer.Stop()
select {
case <-r.exitChan:
log.Warnf("runner %v has been stopped", r.Name())
Expand Down Expand Up @@ -1478,32 +1465,6 @@ func (r *LogExportRunner) StatusBackup() {
}
}

// MergeEnvTags 获取环境变量里的内容
func MergeEnvTags(name string, tags map[string]interface{}) map[string]interface{} {
if name == "" {
return tags
}

envTags := make(map[string]interface{})
if value, exist := os.LookupEnv(name); exist {
err := jsoniter.Unmarshal([]byte(value), &envTags)
if err != nil {
log.Warnf("get env tags unmarshal error: %v", err)
return tags
}
} else {
log.Warnf("env[%s] not exist", name)
}

if tags == nil {
tags = make(map[string]interface{})
}
for k, v := range envTags {
tags[k] = v
}
return tags
}

func MergeExtraInfoTags(meta *reader.Meta, tags map[string]interface{}) map[string]interface{} {
if tags == nil {
tags = make(map[string]interface{})
Expand Down
2 changes: 1 addition & 1 deletion reader/config/config.go
Expand Up @@ -129,7 +129,7 @@ var (
Description: "读取速度限制(readio_limit)",
CheckRegex: "\\d+",
Advance: true,
ToolTip: "读取文件的磁盘限速,填写正整数,单位为MB/s, 默认限速20MB/s",
ToolTip: "读取文件的磁盘限速,填写正整数,单位为MB/s, 默认不限速",
}
OptionHeadPattern = Option{
KeyName: KeyHeadPattern,
Expand Down
12 changes: 8 additions & 4 deletions reader/tailx/tailx.go
Expand Up @@ -183,7 +183,11 @@ func (ar *ActiveReader) Stop() error {
cnt++
//超过3个1s,即3s,就强行退出
if cnt > 3 {
log.Errorf("Runner[%v] ActiveReader %s was not closed after 3s, force closing it", ar.runnerName, ar.originpath)
if !IsSelfRunner(ar.runnerName) {
log.Errorf("Runner[%v] ActiveReader %s was not closed after 3s, force closing it", ar.runnerName, ar.originpath)
} else {
log.Debugf("Runner[%v] ActiveReader %s was not closed after 3s, force closing it", ar.runnerName, ar.originpath)
}
break
}
time.Sleep(1 * time.Second)
Expand Down Expand Up @@ -250,7 +254,7 @@ func (ar *ActiveReader) Run() {
continue
}
}
log.Debugf("Runner[%v] %v >>>>>>readcache <%v> linecache <%v>", ar.runnerName, ar.originpath, ar.readcache, string(ar.br.FormMutiLine()))
log.Debugf("Runner[%v] %v >>>>>>readcache <%v> linecache <%v>", ar.runnerName, ar.originpath, strings.TrimSpace(ar.readcache), string(ar.br.FormMutiLine()))
repeat := 0
for {
if ar.readcache == "" {
Expand All @@ -259,9 +263,9 @@ func (ar *ActiveReader) Run() {
repeat++
if repeat%3000 == 0 {
if !IsSelfRunner(ar.runnerName) {
log.Errorf("Runner[%v] %v ActiveReader has timeout 3000 times with readcache %v", ar.runnerName, ar.originpath, ar.readcache)
log.Errorf("Runner[%v] %v ActiveReader has timeout 3000 times with readcache %v", ar.runnerName, ar.originpath, strings.TrimSpace(ar.readcache))
} else {
log.Debugf("Runner[%v] %v ActiveReader has timeout 3000 times with readcache %v", ar.runnerName, ar.originpath, ar.readcache)
log.Debugf("Runner[%v] %v ActiveReader has timeout 3000 times with readcache %v", ar.runnerName, ar.originpath, strings.TrimSpace(ar.readcache))
}
}

Expand Down

0 comments on commit abfaf3b

Please sign in to comment.