From abfaf3b21cde49a66cdfe4b48a979684385fcbd6 Mon Sep 17 00:00:00 2001 From: redHJ <489534124@qq.com> Date: Wed, 26 Dec 2018 11:50:52 +0800 Subject: [PATCH] change logkit conf --- .gitignore | 1 - logkit.conf | 9 +- logkit.go | 25 ++-- mgr/cluster_design.md | 2 +- mgr/dataflow.go | 1 + mgr/metric_runner.go | 3 +- mgr/mgr.go | 64 +++++----- mgr/runner.go | 43 +------ reader/config/config.go | 2 +- reader/tailx/tailx.go | 12 +- self/logrunner.go | 238 +++++++++++++------------------------ self/logrunner_test.go | 84 +++++-------- utils/models/models.go | 30 ++--- utils/models/utils.go | 40 +++++++ utils/models/utils_test.go | 13 ++ utils/utils.go | 8 +- 16 files changed, 260 insertions(+), 315 deletions(-) diff --git a/.gitignore b/.gitignore index 7804c95a2..5bc6cee51 100644 --- a/.gitignore +++ b/.gitignore @@ -60,7 +60,6 @@ logkit stats .logkitconfs/ public/.project -parser/grok/patterns.go # logkitWeb ignore # See https://help.github.com/ignore-files/ for more about ignoring files. diff --git a/logkit.conf b/logkit.conf index f87d7d683..cdad0672a 100644 --- a/logkit.conf +++ b/logkit.conf @@ -6,5 +6,12 @@ "profile_host":":3002", "disable_web":false, "static_root_path":"./public", - "confs_path": ["confs*"] + "confs_path": ["confs*"], + "collect_log_enable": false, + "pandora_name": "", + "pandora_ak": "", + "pandora_sk": "", + "pandora_pipeline": "", + "pandora_logdb": "", + "pandora_region": "" } diff --git a/logkit.go b/logkit.go index 57288d0b1..b35ec9740 100644 --- a/logkit.go +++ b/logkit.go @@ -263,17 +263,21 @@ func main() { runtime.GOMAXPROCS(conf.MaxProcs) log.SetOutputLevel(conf.DebugLevel) - stopRotate := make(chan struct{}, 0) + var ( + stopRotate = make(chan struct{}, 0) + logdir, logpattern string + err error + ) defer close(stopRotate) if conf.LogPath != "" { - logdir, logpattern, err := LogDirAndPattern(conf.LogPath) + logdir, logpattern, err = LogDirAndPattern(conf.LogPath) if err != nil { log.Fatal(err) } go loopRotateLogs(filepath.Join(logdir, logpattern), defaultRotateSize, 10*time.Second, stopRotate) conf.CleanSelfPattern = logpattern + "-*" conf.CleanSelfDir = logdir - conf.ManagerConfig.SelfLogSet.LogPath = conf.LogPath + conf.ManagerConfig.CollectLogPath = filepath.Join(logdir, logpattern+"-*") } log.Infof("Welcome to use Logkit, Version: %v \n\nConfig: %#v", NextVersion, conf) @@ -283,6 +287,11 @@ func main() { } m.Version = NextVersion + if m.CollectLogRunner != nil { + go m.CollectLogRunner.Run() + time.Sleep(time.Second) // 等待1秒让收集器启动 + } + paths := getValidPath(conf.ConfsPath) if len(paths) <= 0 { log.Warnf("Cannot read or create any ConfsPath %v", conf.ConfsPath) @@ -292,13 +301,15 @@ func main() { } m.RestoreWebDir() - if m.SelfLogRunner != nil { - go m.SelfLogRunner.Run() - } - stopClean := make(chan struct{}, 0) defer close(stopClean) if conf.CleanSelfLog { + if conf.CleanSelfDir == "" && logdir != "" { + conf.CleanSelfDir = logdir + } + if conf.CleanSelfPattern == "" && logpattern != "" { + conf.CleanSelfPattern = logpattern + "-*" + } go loopCleanLogkitLog(conf.CleanSelfDir, conf.CleanSelfPattern, conf.CleanSelfLogCnt, conf.CleanSelfDuration, stopClean) } if len(conf.BindHost) > 0 { diff --git a/mgr/cluster_design.md b/mgr/cluster_design.md index abadacd5f..d21fd4a5c 100644 --- a/mgr/cluster_design.md +++ b/mgr/cluster_design.md @@ -46,7 +46,7 @@ logkit 的 cluster 功能配置非常简单,只需要在 logkit 的主配置 "bind_host":"127.0.0.1:4000", # 选填,默认自己找一个4000以上的可用端口开启 "profile_host":"localhost:6060", # 选填,默认为空,不开启 "clean_self_log":true, # 选填,默认false - "clean_self_dir":"./run", # 选填,clean_self_log 为true时候生效,默认 "./run" + "clean_self_dir":"./", # 选填,clean_self_log 为true时候生效,默认 "./" "clean_self_pattern":"*.log-*", # 选填,clean_self_log 为true时候生效,默认 "*.log-*" "clean_self_cnt":5, # 选填,clean_self_log 为true时候生效,默认 5 "rest_dir":"./.logkitconfs", # 选填,通过web页面存放的logkit配置文件夹,默认为logkit程序运行目录的子目录`.logkitconfs`下 diff --git a/mgr/dataflow.go b/mgr/dataflow.go index 1a359211a..43c2e28c9 100644 --- a/mgr/dataflow.go +++ b/mgr/dataflow.go @@ -98,6 +98,7 @@ func RawData(readerConfig conf.MapConf) ([]string, error) { var rawData []string timeout := time.NewTimer(time.Minute) + defer timeout.Stop() select { case de := <-readChan: rawData, err = de.data, de.lastErr diff --git a/mgr/metric_runner.go b/mgr/metric_runner.go index e94931ecf..d70949865 100644 --- a/mgr/metric_runner.go +++ b/mgr/metric_runner.go @@ -316,7 +316,7 @@ func (r *MetricRunner) Run() { continue } if len(tags) > 0 { - datas = addTagsToData(tags, datas, r.Name()) + datas = AddTagsToData(tags, datas, r.Name()) } r.rsMutex.Lock() r.rs.ReadDataCount += int64(dataCnt) @@ -391,6 +391,7 @@ func (mr *MetricRunner) Stop() { log.Warnf("wait for MetricRunner " + mr.Name() + " stopped") timer := time.NewTimer(time.Second * 10) + defer timer.Stop() select { case <-mr.exitChan: log.Warnf("MetricRunner " + mr.Name() + " has been stopped ") diff --git a/mgr/mgr.go b/mgr/mgr.go index 068e2f231..22ea97dc4 100644 --- a/mgr/mgr.go +++ b/mgr/mgr.go @@ -44,12 +44,15 @@ type ManagerConfig struct { ServerBackup bool `json:"-"` AuditDir string `json:"audit_dir"` - SelfLog + CollectLog } -type SelfLog struct { - SelfLogSet - SelfLogEnable bool `json:"self_log_enable"` +type CollectLog struct { + CollectLogPath string `json:"collect_log_path"` + CollectLogEnable bool `json:"collect_log_enable"` + ReadFrom string `json:"read_from"` + EnvTag string `json:"-"` + Pandora } type cleanQueue struct { @@ -86,7 +89,7 @@ type Manager struct { Version string SystemInfo string - SelfLogRunner *self.LogRunner + CollectLogRunner *self.LogRunner } func NewManager(conf ManagerConfig) (*Manager, error) { @@ -121,33 +124,34 @@ func NewCustomManager(conf ManagerConfig, rr *reader.Registry, pr *parser.Regist if err != nil { return nil, err } - var selfLogRunner *self.LogRunner - if conf.SelfLogEnable { - rdConf := self.SetReaderConfig(self.GetReaderConfig(), conf.LogPath, "", conf.ReadFrom) + var collectLogRunner *self.LogRunner + if conf.CollectLogEnable { + rdConf := self.SetReaderConfig(self.GetReaderConfig(), conf.CollectLogPath, "", conf.ReadFrom) sdConf := self.SetSenderConfig(self.GetSenderConfig(), conf.Pandora) - selfLogRunner, err = self.NewLogRunner(rdConf, self.GetParserConfig(), self.GetTransformerConfig(), sdConf) + collectLogRunner, err = self.NewLogRunner(rdConf, self.GetParserConfig(), sdConf, conf.EnvTag) if err != nil { - return nil, err + log.Errorf("new collect log runner failed: %v", err) + err = nil } } m := &Manager{ - ManagerConfig: conf, - cleanLock: new(sync.RWMutex), - watcherMux: new(sync.RWMutex), - cleanChan: make(chan cleaner.CleanSignal), - cleanQueues: make(map[string]*cleanQueue), - runners: make(map[string]Runner), - runnerConfigs: make(map[string]RunnerConfig), - runnerPaths: make(map[string]string), - watchers: make(map[string]*fsnotify.Watcher), - rregistry: rr, - pregistry: pr, - sregistry: sr, - SystemInfo: utilsos.GetOSInfo().String(), - audit: audt, - auditChan: make(chan audit.Message, 100), - SelfLogRunner: selfLogRunner, + ManagerConfig: conf, + cleanLock: new(sync.RWMutex), + watcherMux: new(sync.RWMutex), + cleanChan: make(chan cleaner.CleanSignal), + cleanQueues: make(map[string]*cleanQueue), + runners: make(map[string]Runner), + runnerConfigs: make(map[string]RunnerConfig), + runnerPaths: make(map[string]string), + watchers: make(map[string]*fsnotify.Watcher), + rregistry: rr, + pregistry: pr, + sregistry: sr, + SystemInfo: utilsos.GetOSInfo().String(), + audit: audt, + auditChan: make(chan audit.Message, 100), + CollectLogRunner: collectLogRunner, } return m, nil } @@ -174,8 +178,8 @@ func (m *Manager) Stop() error { //在所有runner close以后,就保证了不会有audit message发送到Channel里 close(m.auditChan) - if m.SelfLogRunner != nil { - m.SelfLogRunner.Stop() + if m.CollectLogRunner != nil { + m.CollectLogRunner.Stop() } return nil } @@ -823,8 +827,8 @@ func (m *Manager) UpdateToken(tokens []AuthTokens) (err error) { errMsg := make([]string, 0) for _, token := range tokens { runnerPath := token.RunnerName - if strings.HasPrefix(runnerPath, DefaultInternalPrefix) && m.SelfLogRunner != nil { - m.SelfLogRunner.TokenRefresh(token) + if strings.HasPrefix(runnerPath, DefaultInternalPrefix) && m.CollectLogRunner != nil { + m.CollectLogRunner.TokenRefresh(token) continue } diff --git a/mgr/runner.go b/mgr/runner.go index ac61ab562..cc546aa71 100644 --- a/mgr/runner.go +++ b/mgr/runner.go @@ -743,7 +743,7 @@ func (r *LogExportRunner) readLines(dataSourceTag string) []Data { tags = MergeExtraInfoTags(r.meta, tags) tags["lst"] = curTimeStr if len(tags) > 0 { - datas = addTagsToData(tags, datas, r.Name()) + datas = AddTagsToData(tags, datas, r.Name()) } // 把 source 加到 data 里,前提是认为 []line 变成 []data 以后是一一对应的,一旦错位就不加 @@ -1009,20 +1009,6 @@ func addSourceToData(sourceFroms []string, se *StatsError, datas []Data, datasou return datas } -func addTagsToData(tags map[string]interface{}, datas []Data, runnername string) []Data { - for j, data := range datas { - for k, v := range tags { - if dt, ok := data[k]; ok { - log.Debugf("Runner[%v] datasource tag already has data %v, ignore %v", runnername, dt, v) - } else { - data[k] = v - } - } - datas[j] = data - } - return datas -} - // Stop 清理所有使用到的资源, 等待10秒尝试读取完毕 // 先停Reader,不再读取,然后停Run函数,让读取的都转到发送,最后停Sender结束整个过程。 // Parser 无状态,无需stop。 @@ -1044,6 +1030,7 @@ func (r *LogExportRunner) Stop() { log.Infof("Runner[%v] waiting for Run() stopped signal", r.Name()) timer := time.NewTimer(time.Second * 10) + defer timer.Stop() select { case <-r.exitChan: log.Warnf("runner %v has been stopped", r.Name()) @@ -1478,32 +1465,6 @@ func (r *LogExportRunner) StatusBackup() { } } -// MergeEnvTags 获取环境变量里的内容 -func MergeEnvTags(name string, tags map[string]interface{}) map[string]interface{} { - if name == "" { - return tags - } - - envTags := make(map[string]interface{}) - if value, exist := os.LookupEnv(name); exist { - err := jsoniter.Unmarshal([]byte(value), &envTags) - if err != nil { - log.Warnf("get env tags unmarshal error: %v", err) - return tags - } - } else { - log.Warnf("env[%s] not exist", name) - } - - if tags == nil { - tags = make(map[string]interface{}) - } - for k, v := range envTags { - tags[k] = v - } - return tags -} - func MergeExtraInfoTags(meta *reader.Meta, tags map[string]interface{}) map[string]interface{} { if tags == nil { tags = make(map[string]interface{}) diff --git a/reader/config/config.go b/reader/config/config.go index ec80ecc17..3a99fe1d6 100644 --- a/reader/config/config.go +++ b/reader/config/config.go @@ -129,7 +129,7 @@ var ( Description: "读取速度限制(readio_limit)", CheckRegex: "\\d+", Advance: true, - ToolTip: "读取文件的磁盘限速,填写正整数,单位为MB/s, 默认限速20MB/s", + ToolTip: "读取文件的磁盘限速,填写正整数,单位为MB/s, 默认不限速", } OptionHeadPattern = Option{ KeyName: KeyHeadPattern, diff --git a/reader/tailx/tailx.go b/reader/tailx/tailx.go index 59912805e..32ad4513a 100644 --- a/reader/tailx/tailx.go +++ b/reader/tailx/tailx.go @@ -183,7 +183,11 @@ func (ar *ActiveReader) Stop() error { cnt++ //超过3个1s,即3s,就强行退出 if cnt > 3 { - log.Errorf("Runner[%v] ActiveReader %s was not closed after 3s, force closing it", ar.runnerName, ar.originpath) + if !IsSelfRunner(ar.runnerName) { + log.Errorf("Runner[%v] ActiveReader %s was not closed after 3s, force closing it", ar.runnerName, ar.originpath) + } else { + log.Debugf("Runner[%v] ActiveReader %s was not closed after 3s, force closing it", ar.runnerName, ar.originpath) + } break } time.Sleep(1 * time.Second) @@ -250,7 +254,7 @@ func (ar *ActiveReader) Run() { continue } } - log.Debugf("Runner[%v] %v >>>>>>readcache <%v> linecache <%v>", ar.runnerName, ar.originpath, ar.readcache, string(ar.br.FormMutiLine())) + log.Debugf("Runner[%v] %v >>>>>>readcache <%v> linecache <%v>", ar.runnerName, ar.originpath, strings.TrimSpace(ar.readcache), string(ar.br.FormMutiLine())) repeat := 0 for { if ar.readcache == "" { @@ -259,9 +263,9 @@ func (ar *ActiveReader) Run() { repeat++ if repeat%3000 == 0 { if !IsSelfRunner(ar.runnerName) { - log.Errorf("Runner[%v] %v ActiveReader has timeout 3000 times with readcache %v", ar.runnerName, ar.originpath, ar.readcache) + log.Errorf("Runner[%v] %v ActiveReader has timeout 3000 times with readcache %v", ar.runnerName, ar.originpath, strings.TrimSpace(ar.readcache)) } else { - log.Debugf("Runner[%v] %v ActiveReader has timeout 3000 times with readcache %v", ar.runnerName, ar.originpath, ar.readcache) + log.Debugf("Runner[%v] %v ActiveReader has timeout 3000 times with readcache %v", ar.runnerName, ar.originpath, strings.TrimSpace(ar.readcache)) } } diff --git a/self/logrunner.go b/self/logrunner.go index 65407a8ab..9e455ce49 100644 --- a/self/logrunner.go +++ b/self/logrunner.go @@ -6,16 +6,14 @@ import ( "io" "os" "path/filepath" + "regexp" "runtime/debug" "strings" "sync/atomic" "time" - "github.com/json-iterator/go" - "github.com/qiniu/log" "github.com/qiniu/pandora-go-sdk/base/config" - "github.com/qiniu/pandora-go-sdk/base/reqerr" "github.com/qiniu/logkit/conf" "github.com/qiniu/logkit/parser" @@ -26,8 +24,6 @@ import ( "github.com/qiniu/logkit/reader/tailx" "github.com/qiniu/logkit/sender" "github.com/qiniu/logkit/sender/pandora" - "github.com/qiniu/logkit/transforms" - "github.com/qiniu/logkit/transforms/mutate" "github.com/qiniu/logkit/utils" . "github.com/qiniu/logkit/utils/models" ) @@ -35,55 +31,50 @@ import ( const ( DefaultSendTime = 3 DefaultSelfLogRepoName = "logkit_self_log" + DebugPattern = `^\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2} \[DEBUG\]` ) var ( + debugRegex = regexp.MustCompile(DebugPattern) readerConfig = conf.MapConf{ "runner_name": DefaultSelfRunnerName, "name": DefaultSelfRunnerName, "mode": "tailx", "log_path": "", - "read_from": "oldest", + "read_from": WhenceNewest, "encoding": "UTF-8", "datasource_tag": "datasource", "expire": "0s", "submeta_expire": "0s", + "head_pattern": `^(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2} \[(WARN)|(INFO)|(ERROR)]|(DEBUG)\])`, } parserConfig = conf.MapConf{ "type": "raw", "name": "parser", "disable_record_errdata": "true", } - transformerConfig = conf.MapConf{ - "key": "raw", - "remove_pattern": ".*\\[DEBUG\\]\\[.*", - "type": "filter", - } senderConfig = conf.MapConf{ - "sender_type": "pandora", - "pandora_workflow_name": DefaultSelfLogRepoName, - "pandora_repo_name": DefaultSelfLogRepoName, - "logkit_send_time": "true", - "pandora_region": "nb", - "pandora_host": config.DefaultPipelineEndpoint, - "pandora_schema_free": "true", - "pandora_enable_logdb": "true", - "pandora_logdb_host": config.DefaultLogDBEndpoint, - "pandora_enable_tsdb": "false", - "pandora_tsdb_host": config.DefaultTSDBEndpoint, - "pandora_enable_kodo": "false", - "pandora_kodo_low_frequency_file": "false", - "pandora_gzip": "true", - "pandora_uuid": "false", - "pandora_withip": "false", - "force_microsecond": "false", - "pandora_force_convert": "false", - "number_use_float": "true", - "ignore_invalid_field": "true", - "pandora_auto_convert_date": "false", - "pandora_unescape": "true", - "insecure_server": "false", - "runner_name": DefaultSelfRunnerName, + "sender_type": "pandora", + "pandora_workflow_name": DefaultSelfLogRepoName, + "pandora_repo_name": DefaultSelfLogRepoName, + "logkit_send_time": "true", + "pandora_region": "nb", + "pandora_host": config.DefaultPipelineEndpoint, + "pandora_schema_free": "true", + "pandora_enable_logdb": "true", + "pandora_logdb_host": config.DefaultLogDBEndpoint, + "pandora_gzip": "true", + "pandora_uuid": "false", + "pandora_withip": "false", + "force_microsecond": "false", + "pandora_force_convert": "false", + "number_use_float": "true", + "ignore_invalid_field": "true", + "pandora_auto_convert_date": "false", + "pandora_unescape": "true", + "insecure_server": "false", + "runner_name": DefaultSelfRunnerName, + "pandora_description": SelfLogAutoCreateDescription, } ) @@ -93,30 +84,27 @@ type LogRunner struct { transformerConfig conf.MapConf senderConfig conf.MapConf - reader reader.Reader - parser parser.Parser - transform transforms.Transformer - sender sender.Sender - meta *reader.Meta + reader reader.Reader + parser parser.Parser + sender sender.Sender + meta *reader.Meta batchLen int64 batchSize int64 lastSend time.Time + envTag string stopped int32 exitChan chan struct{} } -func NewLogRunner(rdConf, psConf, tsConf, sdConf conf.MapConf) (*LogRunner, error) { +func NewLogRunner(rdConf, psConf, sdConf conf.MapConf, envTag string) (*LogRunner, error) { if rdConf == nil { rdConf = conf.DeepCopy(readerConfig) } if psConf == nil { psConf = conf.DeepCopy(parserConfig) } - if tsConf == nil { - tsConf = conf.DeepCopy(transformerConfig) - } if sdConf == nil { sdConf = conf.DeepCopy(senderConfig) } @@ -132,13 +120,12 @@ func NewLogRunner(rdConf, psConf, tsConf, sdConf conf.MapConf) (*LogRunner, erro if err != nil { return nil, fmt.Errorf("get system current workdir error %v", err) } - rdConf["log_path"] = path + "*" + rdConf["log_path"] = path } var ( rd reader.Reader ps parser.Parser - ts transforms.Transformer sd sender.Sender err error ) @@ -158,44 +145,31 @@ func NewLogRunner(rdConf, psConf, tsConf, sdConf conf.MapConf) (*LogRunner, erro if ps, err = raw.NewParser(psConf); err != nil { return nil, err } - ts = &mutate.Filter{} - bts, err := jsoniter.Marshal(tsConf) - if err != nil { - return nil, errors.New("type filter of transformer marshal config error " + err.Error()) - } - err = jsoniter.Unmarshal(bts, ts) - if err != nil { - return nil, errors.New("type filter of transformer unmarshal config error " + err.Error()) - } - //transformer初始化 - if trans, ok := ts.(transforms.Initializer); ok { - err = trans.Init() - if err != nil { - return nil, errors.New("type filter of transformer init error " + err.Error()) - } - } if sd, err = pandora.NewSender(sdConf); err != nil { return nil, err } + // ft sender + if sd, err = sender.NewFtSender(sd, sdConf, meta.FtSaveLogPath()); err != nil { + return nil, err + } - return NewLogRunnerWithService(rdConf, psConf, tsConf, psConf, meta, rd, ps, ts, sd), nil + return NewLogRunnerWithService(rdConf, psConf, sdConf, meta, rd, ps, sd, envTag), nil } -func NewLogRunnerWithService(rdConf, psConf, tsConf, sdConf conf.MapConf, - meta *reader.Meta, rd reader.Reader, ps parser.Parser, ts transforms.Transformer, sd sender.Sender) *LogRunner { +func NewLogRunnerWithService(rdConf, psConf, sdConf conf.MapConf, + meta *reader.Meta, rd reader.Reader, ps parser.Parser, sd sender.Sender, envTag string) *LogRunner { return &LogRunner{ - readerConfig: rdConf, - parserConfig: psConf, - transformerConfig: tsConf, - senderConfig: sdConf, - - reader: rd, - parser: ps, - transform: ts, - sender: sd, - meta: meta, - exitChan: make(chan struct{}), + readerConfig: rdConf, + parserConfig: psConf, + senderConfig: sdConf, + envTag: envTag, + + reader: rd, + parser: ps, + sender: sd, + meta: meta, + exitChan: make(chan struct{}), } } @@ -217,8 +191,6 @@ func (lr *LogRunner) Run() { } }() - var err error - for { if atomic.LoadInt32(&lr.stopped) > 0 { log.Debugf("Runner[%s] exited from run", lr.Name()) @@ -236,20 +208,11 @@ func (lr *LogRunner) Run() { continue } - datas, err = lr.transform.Transform(datas) - if err != nil { - log.Debugf("Runner[%s] transform failed: %v", lr.Name(), err) - } - // send data log.Debugf("Runner[%s] reader %s start to send at: %v", lr.Name(), lr.reader.Name(), time.Now().Format(time.RFC3339)) - success := true - if !lr.trySend(lr.sender, datas) { - success = false - log.Errorf("Runner[%s] failed to send data finally", lr.Name()) - break - } - if success { + if err := lr.sender.Send(datas); err != nil { + log.Debugf("Runner[%s] failed to send data finally, error: %v", lr.Name(), err) + } else { lr.reader.SyncMeta() } log.Debugf("Runner[%s] send %s finish to send at: %v", lr.Name(), lr.reader.Name(), time.Now().Format(time.RFC3339)) @@ -257,8 +220,8 @@ func (lr *LogRunner) Run() { return } -func (lr *LogRunner) Stop() { - log.Infof("Runner[%s] wait for reader %v to stop", lr.Name(), lr.reader.Name()) +func (lr *LogRunner) Stop() error { + log.Debugf("Runner[%s] wait for reader %v to stop", lr.Name(), lr.reader.Name()) var errJoin string err := lr.reader.Close() if err != nil { @@ -267,8 +230,9 @@ func (lr *LogRunner) Stop() { } atomic.StoreInt32(&lr.stopped, 1) - log.Infof("Runner[%s] waiting for Run() stopped signal", lr.Name()) + log.Debugf("Runner[%s] waiting for Run() stopped signal", lr.Name()) timer := time.NewTimer(time.Second * 10) + defer timer.Stop() select { case <-lr.exitChan: log.Debugf("runner %s has been stopped", lr.Name()) @@ -277,13 +241,23 @@ func (lr *LogRunner) Stop() { atomic.StoreInt32(&lr.stopped, 1) } - log.Infof("Runner[%s] wait for sender %v to stop", lr.Name(), lr.reader.Name()) + log.Debugf("Runner[%s] wait for sender %v to stop", lr.Name(), lr.reader.Name()) if err := lr.sender.Close(); err != nil { log.Debugf("Runner[%v] cannot close sender name: %s, err: %v", lr.Name(), lr.sender.Name(), err) + errJoin += "\n" + err.Error() + } + + if err = lr.meta.Delete(); err != nil { + log.Debugf("Runner[%v] cannot delete meta, err: %v", lr.Name(), err) + errJoin += "\n" + err.Error() } + if errJoin != "" { + return errors.New(errJoin) + } log.Infof("Runner[%s] stopped successfully", lr.Name()) + return nil } func (lr *LogRunner) Name() string { @@ -300,7 +274,7 @@ func (lr *LogRunner) readLines() []Data { 0, DefaultMaxBatchSize, DefaultSendIntervalSeconds) { line, err = lr.reader.ReadLine() if os.IsNotExist(err) { - log.Errorf("Runner[%s] reader %s - error: %v, sleep 3 second...", lr.Name(), lr.reader.Name(), err) + log.Debugf("Runner[%s] reader %s - error: %v, sleep 3 second...", lr.Name(), lr.reader.Name(), err) time.Sleep(3 * time.Second) break } @@ -315,6 +289,12 @@ func (lr *LogRunner) readLines() []Data { continue } + if debugRegex.MatchString(line) { + continue + } + if strings.Contains(line, DefaultSelfRunnerName) { + continue + } lines = append(lines, line) lr.batchLen++ @@ -336,49 +316,14 @@ func (lr *LogRunner) readLines() []Data { if err != nil { log.Debugf("Runner[%s] parser %s error: %v ", lr.Name(), lr.parser.Name(), err) } - return datas -} - -// trySend 尝试发送数据,如果此时runner退出返回false,其他情况无论是达到最大重试次数还是发送成功,都返回true -func (lr *LogRunner) trySend(s sender.Sender, datas []Data) bool { - if len(datas) <= 0 { - return true - } - - var ( - err error - cnt = 1 - ) - - for { - // 至少尝试一次。如果任务已经停止,那么只尝试一次 - if cnt > 1 && atomic.LoadInt32(&lr.stopped) > 0 { - return false - } - - err = s.Send(datas) - if err == nil { - break - } - if err == ErrQueueClosed { - log.Debugf("Runner[%s] send to closed queue, discard datas, send error %v, failed datas (length %d): %v", lr.Name(), err, cnt, datas) - break - } - log.Debug(TruncateStrSize(err.Error(), DefaultTruncateMaxSize)) - if sendError, ok := err.(*reqerr.SendError); ok { - datas = sender.ConvertDatas(sendError.GetFailDatas()) + if lr.envTag != "" { + tags := MergeEnvTags(lr.envTag, nil) + if len(tags) > 0 { + datas = AddTagsToData(tags, datas, lr.Name()) } - - if cnt < DefaultSendTime { - cnt++ - continue - } - log.Debugf("Runner[%s] retry send %v times, but still error %v, discard datas %v ... total %d lines", lr.Name(), cnt, err, datas, len(datas)) - break } - - return true + return datas } func (lr *LogRunner) TokenRefresh(token AuthTokens) error { @@ -411,13 +356,7 @@ func SetReaderConfig(readConf conf.MapConf, logpath, metapath, from string) conf rdConf := conf.DeepCopy(readConf) logpath = strings.TrimSpace(logpath) if logpath != "" { - path, err := filepath.Abs(logpath) - if err != nil { - log.Debugf("got logpath[%s] absolute filepath failed: %v", logpath, err) - rdConf["log_path"] = logpath + "*" - } else { - rdConf["log_path"] = path + "*" - } + rdConf["log_path"] = logpath } if metapath != "" { path, err := filepath.Abs(metapath) @@ -440,7 +379,7 @@ func SetReaderConfig(readConf conf.MapConf, logpath, metapath, from string) conf rdConf["read_from"] = WhenceNewest default: log.Debugf("reader from %s unsupported", from) - rdConf["read_from"] = WhenceOldest + rdConf["read_from"] = WhenceNewest } return rdConf @@ -448,12 +387,12 @@ func SetReaderConfig(readConf conf.MapConf, logpath, metapath, from string) conf func SetSenderConfig(sendConf conf.MapConf, pandora Pandora) conf.MapConf { sdConf := conf.DeepCopy(sendConf) - logDBHost := strings.TrimSpace(pandora.Logdb) + logDBHost := strings.TrimSpace(pandora.LogDB) if logDBHost != "" { sdConf["pandora_logdb_host"] = logDBHost } - pandoraHost := strings.TrimSpace(pandora.Pipline) + pandoraHost := strings.TrimSpace(pandora.Pipeline) if pandoraHost != "" { sdConf["pandora_host"] = pandoraHost } @@ -463,11 +402,6 @@ func SetSenderConfig(sendConf conf.MapConf, pandora Pandora) conf.MapConf { sdConf["pandora_region"] = pandoraRegion } - tsdbHost := strings.TrimSpace(pandora.Tsdb) - if tsdbHost != "" { - sdConf["pandora_tsdb_host"] = tsdbHost - } - name := strings.TrimSpace(pandora.Name) if name != "" { sdConf["pandora_workflow_name"] = name @@ -494,10 +428,6 @@ func GetParserConfig() conf.MapConf { return parserConfig } -func GetTransformerConfig() conf.MapConf { - return transformerConfig -} - func GetSenderConfig() conf.MapConf { return senderConfig } diff --git a/self/logrunner_test.go b/self/logrunner_test.go index 755637a39..568c38c8c 100644 --- a/self/logrunner_test.go +++ b/self/logrunner_test.go @@ -5,6 +5,7 @@ import ( "os" "path" "path/filepath" + "strings" "testing" "time" @@ -22,31 +23,27 @@ import ( "github.com/qiniu/logkit/sender" "github.com/qiniu/logkit/sender/file" "github.com/qiniu/logkit/sender/mock" - "github.com/qiniu/logkit/transforms" - "github.com/qiniu/logkit/transforms/mutate" . "github.com/qiniu/logkit/utils/models" ) func TestLogRunner_Name(t *testing.T) { t.Parallel() - rdConf, psConf, tsConf, sdConf, meta, rd, ps, ts, sd := getInfo(t, "", "") - logRunner := NewLogRunnerWithService(rdConf, psConf, tsConf, sdConf, meta, rd, ps, ts, sd) + rdConf, psConf, sdConf, meta, rd, ps, sd := getInfo(t, "", "") + logRunner := NewLogRunnerWithService(rdConf, psConf, sdConf, meta, rd, ps, sd, "") assert.EqualValues(t, DefaultSelfRunnerName, logRunner.Name()) } func TestNewLogRunner(t *testing.T) { t.Parallel() - rdConf, psConf, tsConf, sdConf, meta, rd, ps, ts, sd := getInfo(t, "", "") - logRunner := NewLogRunnerWithService(rdConf, psConf, tsConf, sdConf, meta, rd, ps, ts, sd) + rdConf, psConf, sdConf, meta, rd, ps, sd := getInfo(t, "", "") + logRunner := NewLogRunnerWithService(rdConf, psConf, sdConf, meta, rd, ps, sd, "") assert.NotNil(t, logRunner) assert.NotNil(t, logRunner.meta) assert.NotNil(t, logRunner.reader) assert.NotNil(t, logRunner.parser) - assert.NotNil(t, logRunner.transform) assert.NotNil(t, logRunner.sender) assert.EqualValues(t, rdConf, logRunner.readerConfig) assert.EqualValues(t, parserConfig, logRunner.parserConfig) - assert.EqualValues(t, transformerConfig, logRunner.transformerConfig) assert.EqualValues(t, sdConf, logRunner.senderConfig) } @@ -64,7 +61,7 @@ func TestLogRunner_Run(t *testing.T) { defer os.RemoveAll(dir) readerConfig["stat_interval"] = "1s" - rdConf, psConf, tsConf, _, meta, rd, ps, ts, _ := getInfo(t, logpath, path.Join(dir, "meta")) + rdConf, psConf, _, meta, rd, ps, _ := getInfo(t, logpath, path.Join(dir, "meta")) readerConfig["stat_interval"] = "" sdConf := conf.MapConf{ "name": "TestLogRunner_Run", @@ -75,7 +72,7 @@ func TestLogRunner_Run(t *testing.T) { assert.Nil(t, err) expect1 := "2018/11/12 13:31:33 [ERROR][qiniu.com/logkit-enterprise/vendor/github.com/qiniu/logkit/mgr] metric_runner.go:350: SendError: Cannot send data to pandora\n" - expect2 := "2018/11/13 09:38:51 [WARN][github.com/qiniu/logkit/reader] singlefile.go:85: Runner[] /Users/qiniu/gopath/src/github.com/qiniu/logkit/meta/LogkitInternalSelfLogRunner_962838855/qiniu_logkit_logkit.log-1113093840/file.meta\n" + expect2 := "2018/11/13 09:38:51 [WARN][github.com/qiniu/logkit/reader] singlefile.go:85: Runner[] /Users/qiniu/gopath/src/github.com/qiniu/logkit/meta/a/qiniu_logkit_logkit.log-1113093840/file.meta\n" expect3 := "2018/11/13 09:38:51 [INFO][github.com/qiniu/logkit/reader/tailx] tailx.go:570: Runner[UndefinedRunnerName] statLogPath find new logpath" err = ioutil.WriteFile(logpath, []byte(expect1+ "2018/11/12 13:31:33 [DEBUG][qiniu.com/logkit-enterprise/vendor/github.com/qiniu/logkit/mgr] metric_runner.go:350: SendError: Cannot send data to pandora\n"+ @@ -84,7 +81,7 @@ func TestLogRunner_Run(t *testing.T) { expect3), 0755) assert.Nil(t, err) - logRunner := NewLogRunnerWithService(rdConf, psConf, tsConf, sdConf, meta, rd, ps, ts, sd) + logRunner := NewLogRunnerWithService(rdConf, psConf, sdConf, meta, rd, ps, sd, "") go logRunner.Run() time.Sleep(5 * time.Second) logRunner.Stop() @@ -106,56 +103,49 @@ func TestLogRunner_Run(t *testing.T) { func TestLogRunner_GetReaderConfig(t *testing.T) { t.Parallel() - rdConf, psConf, tsConf, sdConf, meta, rd, ps, ts, sd := getInfo(t, "", "") - logRunner := NewLogRunnerWithService(rdConf, psConf, tsConf, sdConf, meta, rd, ps, ts, sd) + rdConf, psConf, sdConf, meta, rd, ps, sd := getInfo(t, "", "") + logRunner := NewLogRunnerWithService(rdConf, psConf, sdConf, meta, rd, ps, sd, "") assert.EqualValues(t, rdConf, logRunner.GetReaderConfig()) } func TestLogRunner_GetParserConfig(t *testing.T) { t.Parallel() - rdConf, psConf, tsConf, sdConf, meta, rd, ps, ts, sd := getInfo(t, "", "") - logRunner := NewLogRunnerWithService(rdConf, psConf, tsConf, sdConf, meta, rd, ps, ts, sd) + rdConf, psConf, sdConf, meta, rd, ps, sd := getInfo(t, "", "") + logRunner := NewLogRunnerWithService(rdConf, psConf, sdConf, meta, rd, ps, sd, "") assert.EqualValues(t, psConf, logRunner.GetParserConfig()) } -func TestLogRunner_GetTransformConfig(t *testing.T) { - t.Parallel() - rdConf, psConf, tsConf, sdConf, meta, rd, ps, ts, sd := getInfo(t, "", "") - logRunner := NewLogRunnerWithService(rdConf, psConf, tsConf, sdConf, meta, rd, ps, ts, sd) - assert.EqualValues(t, tsConf, logRunner.GetTransformerConfig()) -} - func TestLogRunner_GetSenderConfig(t *testing.T) { t.Parallel() - rdConf, psConf, tsConf, sdConf, meta, rd, ps, ts, sd := getInfo(t, "", "") - logRunner := NewLogRunnerWithService(rdConf, psConf, tsConf, sdConf, meta, rd, ps, ts, sd) + rdConf, psConf, sdConf, meta, rd, ps, sd := getInfo(t, "", "") + logRunner := NewLogRunnerWithService(rdConf, psConf, sdConf, meta, rd, ps, sd, "") assert.EqualValues(t, sdConf, logRunner.GetSenderConfig()) } func TestSetReaderConfig(t *testing.T) { t.Parallel() - path := "TestSetReaderConfig/logkit.log" + path := "TestSetReaderConfig/logkit.log*" rdConf := SetReaderConfig(readerConfig, path, "", "") - logpath, err := filepath.Abs(path) - assert.Nil(t, err) - assert.EqualValues(t, logpath+"*", rdConf["log_path"]) - assert.EqualValues(t, WhenceOldest, rdConf["read_from"]) + if !strings.HasSuffix(path, rdConf["log_path"]) { + t.Fatalf("expect has suffix %v, but got %v", rdConf["log_path"], path) + } + assert.EqualValues(t, WhenceNewest, rdConf["read_from"]) } func TestSetSenderConfig(t *testing.T) { t.Parallel() pandora := Pandora{ - Name: "TestSetSenderConfig_name", - Region: "TestSetSenderConfig_region", - Pipline: "TestSetSenderConfig_pipline", - AK: "TestSetSenderConfig_ak", - SK: "TestSetSenderConfig_sk", + Name: "TestSetSenderConfig_name", + Region: "TestSetSenderConfig_region", + Pipeline: "TestSetSenderConfig_pipline", + AK: "TestSetSenderConfig_ak", + SK: "TestSetSenderConfig_sk", } sdConf := SetSenderConfig(senderConfig, pandora) assert.NotNil(t, sdConf) assert.EqualValues(t, pandora.Name, sdConf["pandora_repo_name"]) assert.EqualValues(t, pandora.Region, sdConf["pandora_region"]) - assert.EqualValues(t, pandora.Pipline, sdConf["pandora_host"]) + assert.EqualValues(t, pandora.Pipeline, sdConf["pandora_host"]) assert.EqualValues(t, pandora.AK, sdConf["pandora_ak"]) assert.EqualValues(t, pandora.SK, sdConf["pandora_sk"]) assert.EqualValues(t, config.DefaultLogDBEndpoint, sdConf["pandora_logdb_host"]) @@ -171,22 +161,16 @@ func TestGetParserConfig(t *testing.T) { assert.EqualValues(t, parserConfig, GetParserConfig()) } -func TestGetTransformerConfig(t *testing.T) { - t.Parallel() - assert.EqualValues(t, transformerConfig, GetTransformerConfig()) -} - func TestGetSenderConfig(t *testing.T) { t.Parallel() assert.EqualValues(t, senderConfig, GetSenderConfig()) } -func getInfo(t *testing.T, logpath, metapath string) (conf.MapConf, conf.MapConf, conf.MapConf, conf.MapConf, - *reader.Meta, reader.Reader, parser.Parser, transforms.Transformer, sender.Sender) { +func getInfo(t *testing.T, logpath, metapath string) (conf.MapConf, conf.MapConf, conf.MapConf, + *reader.Meta, reader.Reader, parser.Parser, sender.Sender) { var ( rd reader.Reader ps parser.Parser - ts transforms.Transformer sd sender.Sender err error ) @@ -194,7 +178,7 @@ func getInfo(t *testing.T, logpath, metapath string) (conf.MapConf, conf.MapConf if logpath == "" { logpath = "TestNewLogRunner/logkit.log" } - rdConf := SetReaderConfig(readerConfig, logpath, metapath, "") + rdConf := SetReaderConfig(readerConfig, logpath, metapath, "oldest") meta, err := reader.NewMetaWithConf(rdConf) assert.Nil(t, err) defer func() { @@ -209,21 +193,11 @@ func getInfo(t *testing.T, logpath, metapath string) (conf.MapConf, conf.MapConf ps, err = raw.NewParser(nil) assert.Nil(t, err) - ts = &mutate.Filter{} - bts, err := jsoniter.Marshal(transformerConfig) - assert.Nil(t, err) - err = jsoniter.Unmarshal(bts, ts) - assert.Nil(t, err) - if trans, ok := ts.(transforms.Initializer); ok { - err = trans.Init() - assert.Nil(t, err) - } - sdConf := conf.MapConf{ "name": "mock_sender", "sender_type": "mock", } sd, err = mock.NewSender(sdConf) assert.Nil(t, err) - return rdConf, parserConfig, transformerConfig, sdConf, meta, rd, ps, ts, sd + return rdConf, parserConfig, sdConf, meta, rd, ps, sd } diff --git a/utils/models/models.go b/utils/models/models.go index ed9117bf8..81e7e097c 100644 --- a/utils/models/models.go +++ b/utils/models/models.go @@ -72,15 +72,16 @@ const ( DATE = "date" DROP = "drop" - DefaultSelfRunnerName = DefaultInternalPrefix + "SelfLogRunner" + DefaultSelfRunnerName = DefaultInternalPrefix + "CollectLogRunner" DefaultInternalPrefix = "LogkitInternal" ) var ( - MaxProcs = 1 - NumCPU = runtime.NumCPU() - LogkitAutoCreateDescription = "由logkit日志收集自动创建" - MetricAutoCreateDescription = "由logkit监控收集自动创建" + MaxProcs = 1 + NumCPU = runtime.NumCPU() + LogkitAutoCreateDescription = "由logkit日志收集自动创建" + MetricAutoCreateDescription = "由logkit监控收集自动创建" + SelfLogAutoCreateDescription = "由logkit收集自身日志创建" // matches named captures that contain a modifier. // ie, @@ -142,20 +143,13 @@ type AuthTokens struct { SenderTokens conf.MapConf } -type SelfLogSet struct { - LogPath string `json:"log_path"` - ReadFrom string `json:"read_from"` - Pandora -} - type Pandora struct { - Name string `json:"pandora_name"` - Region string `json:"pandora_region"` - Pipline string `json:"pandora_pipeline"` - Logdb string `json:"pandora_logdb"` - Tsdb string `json:"pandora_tsdb"` - AK string `json:"pandora_ak"` - SK string `json:"pandora_sk"` + Name string `json:"pandora_name"` + Region string `json:"pandora_region"` + Pipeline string `json:"pandora_pipeline"` + LogDB string `json:"pandora_logdb"` + AK string `json:"pandora_ak"` + SK string `json:"pandora_sk"` } type LagInfo struct { diff --git a/utils/models/utils.go b/utils/models/utils.go index 865a54475..fbc2e4d37 100644 --- a/utils/models/utils.go +++ b/utils/models/utils.go @@ -986,3 +986,43 @@ func IsFileModified(path string, interval time.Duration, compare time.Time) bool func IsSelfRunner(runnerName string) bool { return strings.HasPrefix(runnerName, DefaultSelfRunnerName) } + +// MergeEnvTags 获取环境变量里的内容 +func MergeEnvTags(name string, tags map[string]interface{}) map[string]interface{} { + if name == "" { + return tags + } + + envTags := make(map[string]interface{}) + if value, exist := os.LookupEnv(name); exist { + err := jsoniter.Unmarshal([]byte(value), &envTags) + if err != nil { + log.Warnf("get env tags unmarshal error: %v", err) + return tags + } + } else { + log.Warnf("env[%s] not exist", name) + } + + if tags == nil { + tags = make(map[string]interface{}) + } + for k, v := range envTags { + tags[k] = v + } + return tags +} + +func AddTagsToData(tags map[string]interface{}, datas []Data, runnername string) []Data { + for j, data := range datas { + for k, v := range tags { + if dt, ok := data[k]; ok { + log.Debugf("Runner[%v] datasource tag already has data %v, ignore %v", runnername, dt, v) + } else { + data[k] = v + } + } + datas[j] = data + } + return datas +} diff --git a/utils/models/utils_test.go b/utils/models/utils_test.go index 9d0487b92..cc4628126 100644 --- a/utils/models/utils_test.go +++ b/utils/models/utils_test.go @@ -1300,3 +1300,16 @@ func TestParseTimeZoneOffset(t *testing.T) { assert.Equal(t, ti.exp, got) } } + +func TestMergeEnvTags(t *testing.T) { + key := "TestMergeEnvTags" + os.Setenv(key, `{"a":"hello"}`) + defer os.Unsetenv(key) + tags := MergeEnvTags(key, nil) + assert.Equal(t, map[string]interface{}{"a": "hello"}, tags) + + os.Setenv(key, `{"b":"123","c":"nihao"}`) + tags = MergeEnvTags(key, tags) + assert.Equal(t, map[string]interface{}{"a": "hello", "b": "123", "c": "nihao"}, tags) + +} diff --git a/utils/utils.go b/utils/utils.go index 9d06b1501..e93cf3f38 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -8,6 +8,8 @@ import ( "github.com/json-iterator/go" "github.com/qiniu/log" + + "github.com/qiniu/logkit/utils/models" ) // IsExist checks whether a file or directory exists. @@ -63,7 +65,11 @@ func BatchFullOrTimeout(runnerName string, stopped *int32, batchLen, batchSize i } // 如果任务已经停止 if atomic.LoadInt32(stopped) > 0 { - log.Warnf("Runner[%v] meet the stopped signal", runnerName) + if !models.IsSelfRunner(runnerName) { + log.Warnf("Runner[%v] meet the stopped signal", runnerName) + } else { + log.Debugf("Runner[%v] meet the stopped signal", runnerName) + } return true } return false