Permalink
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
497 lines (439 sloc) 11.4 KB
// Logger implementation for WCG.
//
// Copyright (C) 2014 Yohei Sasaki
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package wcg
import (
"fmt"
"runtime"
"strconv"
"strings"
"sync"
"time"
)
type LogRecord struct {
Level *LogLevel `json:"level"`
Timestamp time.Time `json:"timestamp"`
SourceFile string `json:"source_file"`
SourceLine int `json:"source_line"`
SourceStack []string `json:"source_stack"`
Goroutine int `json:"goroutine"`
RequestId string `json:"request_id,omitempty"`
SessionId string `json:"session_id,omitempty"`
UserId string `json:"user_id,omitempty"`
Data interface{} `json:"data,omitempty"`
Text string `json:"text,omitempty"`
Request *Request
}
type Logger interface {
Trace(string, ...interface{})
TraceData(interface{})
Debug(string, ...interface{})
DebugData(interface{})
Info(string, ...interface{})
InfoData(interface{})
Warn(string, ...interface{})
WarnData(interface{})
Error(string, ...interface{})
ErrorData(interface{})
Fatal(string, ...interface{})
FatalData(interface{})
}
type LogSink interface {
Open() error
Write(*LogRecord) error
Close() error
AsyncSupport() bool
}
type LogRecordFormatter struct {
*CompiledFormatter
}
var logrecordRepl = func(varname string, v ...interface{}) []byte {
r := v[0].(*LogRecord)
var s string
switch varname {
case "$TIMESTAMP":
s = r.Timestamp.Format("2006/01/02 15:04:05 -0700")
case "$URL":
u := r.Request.URL()
if u.RawQuery != "" {
s = fmt.Sprintf("%s?%s", u.Path, u.RawQuery)
} else {
s = r.Request.URL().Path
}
case "$LOGLEVEL":
s = r.Level.Str
case "$SOURCE_FILE":
s = r.SourceFile
case "$SOURCE_LINE":
s = strconv.Itoa(r.SourceLine)
case "$GOROUTINE":
s = strconv.Itoa(r.Goroutine)
case "$USER_ID":
s = r.UserId
case "$SESSION_ID":
s = r.SessionId
case "$REQUEST_ID":
s = r.RequestId
case "$TEXT":
s = r.Text
default:
s = ""
}
return []byte(s)
}
// LogSinkManager is an struct that manage concurrency
// for forwarding logs to LogSink.
type LogSinkManager struct {
level *LogLevel
sink LogSink
chSize int
forwardCh chan *LogRecord
wg *sync.WaitGroup
stopCh chan (chan bool)
stopping bool
stopped bool
}
func NewLogSinkManager(l *LogLevel, s LogSink, chSize int) *LogSinkManager {
return &LogSinkManager{
level: l,
sink: s,
chSize: chSize,
}
}
// Staring the sink
func (lsm *LogSinkManager) Start() {
if lsm.stopping {
// TODO: we may want to have the capability of reusing the sink.
panic("You are trying to reuse the sink!")
}
openErr := lsm.sink.Open()
lsm.stopCh = make(chan (chan bool))
lsm.forwardCh = make(chan *LogRecord, lsm.chSize)
lsm.wg = new(sync.WaitGroup)
go func() {
var ack (chan bool)
if openErr != nil {
for !lsm.stopping {
select {
case <-lsm.forwardCh: // discard all records since sink is unavailable.
case ack = <-lsm.stopCh:
lsm.stopping = true
}
}
} else {
for !lsm.stopping {
select {
case r := <-lsm.forwardCh:
lsm.sink.Write(r)
case ack = <-lsm.stopCh:
lsm.stopping = true
}
}
}
lsm.wg.Wait()
// now all forwarding thread do not write forwardCh.
// so we can start to shutdown.
go func() {
// waiting for all forwarding thread to be completed.
// final message would be nil.
// This channel write might be blocked if channel size is 0 (sync mode), so that
// we spawn another channel.
lsm.forwardCh <- nil
}()
// read all remaing records buffered in forwardCh.
for r := range lsm.forwardCh {
if r == nil {
break
}
lsm.sink.Write(r)
}
// send ack to stop caller.
ack <- true
}()
}
// Forward logs to a goroutine that write logs to sink via channel.
// the log records forwarded after Stop() would be lost
func (lsm *LogSinkManager) Forward(r *LogRecord) {
if lsm.sink.AsyncSupport() {
if !lsm.stopping {
lsm.wg.Add(1)
lsm.forwardCh <- r
lsm.wg.Done()
}
} else {
lsm.sink.Write(r)
}
}
// Stop the log sink forwarding.
func (lsm *LogSinkManager) Stop() {
ack := make(chan bool)
lsm.stopCh <- ack
<-ack
lsm.stopped = true
lsm.sink.Close()
close(lsm.stopCh)
close(lsm.forwardCh)
}
// LogLevel
type LogLevel struct {
Level int `json:"level"`
Str string `json:"Str"`
}
var LogLevelTrace = &LogLevel{Level: 0, Str: "TRACE"}
var LogLevelDebug = &LogLevel{Level: 1, Str: "DEBUG"}
var LogLevelInfo = &LogLevel{Level: 2, Str: "INFO"}
var LogLevelWarn = &LogLevel{Level: 3, Str: "WARN"}
var LogLevelError = &LogLevel{Level: 4, Str: "ERROR"}
var LogLevelFatal = &LogLevel{Level: 5, Str: "FATAL"}
type trace struct {
request *Request
cfg *logConfig
}
func (t *trace) Trace(str string, v ...interface{}) {
t.log(t.text(LogLevelTrace, str, v...))
}
func (t *trace) TraceData(d interface{}) {
t.log(t.data(LogLevelTrace, d))
}
func (t *trace) Debug(str string, v ...interface{}) {
t.log(t.text(LogLevelDebug, str, v...))
}
func (t *trace) DebugData(d interface{}) {
t.log(t.data(LogLevelDebug, d))
}
func (t *trace) Info(str string, v ...interface{}) {
t.log(t.text(LogLevelInfo, str, v...))
}
func (t *trace) InfoData(d interface{}) {
t.log(t.data(LogLevelInfo, d))
}
func (t *trace) Warn(str string, v ...interface{}) {
t.log(t.text(LogLevelWarn, str, v...))
}
func (t *trace) WarnData(d interface{}) {
t.log(t.data(LogLevelWarn, d))
}
func (t *trace) Error(str string, v ...interface{}) {
t.log(t.text(LogLevelError, str, v...))
}
func (t *trace) ErrorData(d interface{}) {
t.log(t.data(LogLevelError, d))
}
func (t *trace) Fatal(str string, v ...interface{}) {
t.log(t.text(LogLevelFatal, str, v...))
}
func (t *trace) FatalData(d interface{}) {
t.log(t.data(LogLevelFatal, d))
}
func (t *trace) text(l *LogLevel, str string, v ...interface{}) *LogRecord {
return &LogRecord{
Level: l, Text: fmt.Sprintf(str, v...),
}
}
func (t *trace) data(l *LogLevel, d interface{}) *LogRecord {
return &LogRecord{
Level: l, Data: d,
}
}
// configure LogRecord attributes and pass it to channels of sinks.
func (t *trace) log(r *LogRecord) {
r.Request = t.request
r.Timestamp = time.Now()
// Set sourcefile and sourceline info.
// This function would be called by public function like Trace(), Debug(),...
// so that 2 levels upper caller would be actual file and line.
_, file, line, ok := runtime.Caller(2)
if ok {
r.SourceFile = file
r.SourceLine = line
} else {
// OK for testing not covered here
r.SourceFile = "unknown"
r.SourceLine = 0
}
r.Goroutine = runtime.NumGoroutine()
// TODO: stack
if r.Level.Level >= LogConfig.CapctureStack.Level {
// TODO: Capture all stacks?
buff := make([]byte, 512*10)
runtime.Stack(buff, false)
r.SourceStack = strings.Split(string(buff), "\n")
}
if t.request != nil {
r.RequestId = t.request.RequestId
if t.request.Session != nil {
r.SessionId = t.request.Session.Id
}
if t.request.User != nil {
r.UserId = t.request.User.Id()
}
}
for _, lsm := range t.cfg.lsms {
lsm.Forward(r)
}
}
type debug struct {
trace
}
func (*debug) Trace(string, ...interface{}) {}
func (*debug) TraceData(interface{}) {}
type info struct {
debug
}
func (*info) Debug(string, ...interface{}) {}
func (*info) DebugData(interface{}) {}
type warn struct {
info
}
func (*warn) Info(string, ...interface{}) {}
func (*warn) InfoData(interface{}) {}
type error_ struct {
warn
}
func (*error_) Warn(string, ...interface{}) {}
func (*error_) WarnData(interface{}) {}
type fatal struct {
error_
}
func (*fatal) Error(string, ...interface{}) {}
func (*fatal) ErrorData(interface{}) {}
type logConfig struct {
Level *LogLevel `ini:"level" default:"info"`
CapctureStack *LogLevel `ini:"capture_stack" default:"error"`
ChSize int `ini:"channel_size" default:"65536"`
WaitForSinkClosing time.Duration `ini:"close_wait" default:"500ms"`
lsms []*LogSinkManager
sync.Mutex
}
// Add a new log sink and start logging on the sink.
func (cfg *logConfig) AddSink(s LogSink, level *LogLevel) {
// OK for testing not covered here
lsm := NewLogSinkManager(level, s, cfg.ChSize)
lsm.Start()
cfg.Lock()
cfg.lsms = append(cfg.lsms, lsm)
cfg.Unlock()
}
func (cfg *logConfig) NumSinks() int {
return len(cfg.lsms)
}
// Configure the new list of LogSinks and stop logging on old sinks.
func (cfg *logConfig) ConfigureSinks(sinks ...LogSink) {
// prepare new sinks.
newlsms := make([]*LogSinkManager, 0)
for _, s := range sinks {
lsm := NewLogSinkManager(cfg.Level, s, cfg.ChSize)
lsm.Start()
newlsms = append(newlsms, lsm)
}
cfg.Lock()
oldlsms := cfg.lsms
// add
cfg.lsms = newlsms
cfg.Unlock()
// now all logger reference the new sinks, so we can stop old sinks.
if oldlsms != nil {
go func() {
time.Sleep(cfg.WaitForSinkClosing)
for _, sm := range oldlsms {
sm.Stop()
}
}()
}
}
var loggerfactories = map[*LogLevel](func(*Request, *logConfig) Logger){
LogLevelTrace: func(r *Request, cfg *logConfig) Logger {
l := new(trace)
l.request = r
l.cfg = cfg
return l
},
LogLevelDebug: func(r *Request, cfg *logConfig) Logger {
l := new(debug)
l.request = r
l.cfg = cfg
return l
},
LogLevelInfo: func(r *Request, cfg *logConfig) Logger {
l := new(info)
l.request = r
l.cfg = cfg
return l
},
LogLevelWarn: func(r *Request, cfg *logConfig) Logger {
l := new(warn)
l.request = r
l.cfg = cfg
return l
},
LogLevelError: func(r *Request, cfg *logConfig) Logger {
l := new(error_)
l.request = r
l.cfg = cfg
return l
},
LogLevelFatal: func(r *Request, cfg *logConfig) Logger {
l := new(fatal)
l.request = r
l.cfg = cfg
return l
},
}
// Log Configuration in the process.
var LogConfig = &logConfig{}
func NewLogger(r *Request) Logger {
// returns one of loggers
return loggerfactories[LogConfig.Level](r, LogConfig)
}
// Waiting all logs to be flushed before exiting the process
func WaitLogs() {
for _, lsm := range LogConfig.lsms {
lsm.Stop()
}
}
func ParseLogLevel(levelstr string) (*LogLevel, error) {
switch strings.ToUpper(levelstr) {
case LogLevelTrace.Str:
return LogLevelTrace, nil
case LogLevelDebug.Str:
return LogLevelDebug, nil
case LogLevelInfo.Str:
return LogLevelInfo, nil
case LogLevelWarn.Str:
return LogLevelWarn, nil
case LogLevelError.Str:
return LogLevelError, nil
case LogLevelFatal.Str:
return LogLevelFatal, nil
default:
return nil, fmt.Errorf("No level definition of %q", levelstr)
}
}
func init() {
AddConfigParser(LogLevelError, func(levelstr string) (interface{}, error) {
return ParseLogLevel(levelstr)
})
RegisterProcessConfig(LogConfig, "wcg.logging", nil)
}
const defaultLogFormat = "$TIMESTAMP [$LOGLEVEL] [$GOROUTINE|$USER_ID|$SESSION_ID|$REQUEST_ID] $TEXT ($SOURCE_FILE#$SOURCE_LINE)"
func NewLogRecordFormatter(format string) *LogRecordFormatter {
if format == "" {
format = defaultLogFormat
}
return &LogRecordFormatter{
CompiledFormatter: NewCompiledFormatter(format, logrecordRepl),
}
}