Skip to content
Permalink
Browse files

Upgrade from upstream 497b522583131b5a1ed0c2471663df9d7e24d71d

Signed-off-by: kuba-- <kuba@sourced.tech>
  • Loading branch information...
kuba-- committed Jun 2, 2019
1 parent de3c641 commit c2869138cd7fa7cc1e39c20b68d56355dcea6128
@@ -683,10 +683,8 @@ func (l *Listener) parseClientHandshakePacket(c *Conn, firstTime bool, data []by

// Decode connection attributes send by the client
if clientFlags&CapabilityClientConnAttr != 0 {
var err error
_, _, err = parseConnAttrs(data, pos)
if err != nil {
return "", "", nil, err
if _, _, err := parseConnAttrs(data, pos); err != nil {
log.Warningf("Decode connection attributes send by the client: %v", err)
}
}

@@ -25,6 +25,7 @@ import (
"net/url"
"os"
"os/signal"
"strings"
"sync"
"syscall"

@@ -40,6 +41,9 @@ var (
// QueryLogFormat controls the format of the query log (either text or json)
QueryLogFormat = flag.String("querylog-format", "text", "format for query logs (\"text\" or \"json\")")

// QueryLogFilterTag contains an optional string that must be present in the query for it to be logged
QueryLogFilterTag = flag.String("querylog-filter-tag", "", "string that must be present in the query for it to be logged")

sendCount = stats.NewCountersWithSingleLabel("StreamlogSend", "stream log send count", "logger_names")
deliveredCount = stats.NewCountersWithMultiLabels(
"StreamlogDelivered",
@@ -201,3 +205,12 @@ func GetFormatter(logger *StreamLogger) LogFormatter {
return fmter.Logf(w, params)
}
}

// ShouldEmitLog returns whether the log with the given SQL query
// should be emitted or filtered
func ShouldEmitLog(sql string) bool {
if *QueryLogFilterTag == "" {
return true
}
return strings.Contains(sql, *QueryLogFilterTag)
}
@@ -94,7 +94,7 @@ func registerBaseFlags() {
flag.StringVar(&baseConfig.UnixSocket, "db_socket", "", "The unix socket to connect on. If this is specifed, host and port will not be used.")
flag.StringVar(&baseConfig.Host, "db_host", "", "The host name for the tcp connection.")
flag.IntVar(&baseConfig.Port, "db_port", 0, "tcp port")
flag.StringVar(&baseConfig.Charset, "db_charset", "utf8", "Character set. Only utf8 or latin1 based character sets are supported.")
flag.StringVar(&baseConfig.Charset, "db_charset", "", "Character set. Only utf8 or latin1 based character sets are supported.")
flag.Uint64Var(&baseConfig.Flags, "db_flags", 0, "Flag values as defined by MySQL.")
flag.StringVar(&baseConfig.SslCa, "db_ssl_ca", "", "connection ssl ca")
flag.StringVar(&baseConfig.SslCaPath, "db_ssl_ca_path", "", "connection ssl ca path")
@@ -32,7 +32,7 @@ func TestParsing(t *testing.T) {
"/tmp/something.foo/zkocc.goedel.szopa.test.log.ERROR.20130806-151006.10530"}

for _, filepath := range path {
ts, err := parseTimestamp(filepath)
ts, err := parseCreatedTimestamp(filepath)
if err != nil {
t.Fatalf("parse: %v", err)
}
@@ -43,7 +43,7 @@ func TestParsing(t *testing.T) {
}
}

func TestPurge(t *testing.T) {
func TestPurgeByCtime(t *testing.T) {
logDir := path.Join(os.TempDir(), fmt.Sprintf("%v-%v", os.Args[0], os.Getpid()))
if err := os.MkdirAll(logDir, 0777); err != nil {
t.Fatalf("os.MkdirAll: %v", err)
@@ -67,18 +67,69 @@ func TestPurge(t *testing.T) {
t.Fatalf("os.Symlink: %v", err)
}

purgeLogsOnce(now, logDir, "zkocc", 30*time.Minute)
purgeLogsOnce(now, logDir, "zkocc", 30*time.Minute, 0)

left, err := filepath.Glob(path.Join(logDir, "zkocc.*"))
if err != nil {
t.Fatalf("filepath.Glob: %v", err)
}

if len(left) != 2 {
// 151006 is still good, 131006 is the "current" log
// (symlinked to zkocc.INFO), the rest should be
// removed.
t.Errorf("wrong number of files remain: want %v, got %v", 2, len(left))
if len(left) != 3 {
// 131006 is current
// 151006 is within 30 min
// symlink remains
// the rest should be removed.
t.Errorf("wrong number of files remain: want %v, got %v", 3, len(left))
}
}

func TestPurgeByMtime(t *testing.T) {
logDir := path.Join(os.TempDir(), fmt.Sprintf("%v-%v", os.Args[0], os.Getpid()))
if err := os.MkdirAll(logDir, 0777); err != nil {
t.Fatalf("os.MkdirAll: %v", err)
}
defer os.RemoveAll(logDir)
createFileWithMtime := func(filename, mtimeStr string) {
var err error
var mtime time.Time
filepath := path.Join(logDir, filename)
if mtime, err = time.Parse(time.RFC3339, mtimeStr); err != nil {
t.Fatalf("time.Parse: %v", err)
}
if _, err = os.Create(filepath); err != nil {
t.Fatalf("os.Create: %v", err)
}
if err = os.Chtimes(filepath, mtime, mtime); err != nil {
t.Fatalf("os.Chtimes: %v", err)
}
}
now := time.Date(2020, 1, 1, 12, 15, 0, 0, time.UTC)
filenameMtimeMap := map[string]string{
"vtadam.localhost.vitess.log.INFO.20200101-120000.00000": "2020-01-01T12:00:00.000Z",
"vtadam.localhost.vitess.log.INFO.20200101-113000.00000": "2020-01-01T11:30:00.000Z",
"vtadam.localhost.vitess.log.INFO.20200101-100000.00000": "2020-01-01T10:00:00.000Z",
"vtadam.localhost.vitess.log.INFO.20200101-090000.00000": "2020-01-01T09:00:00.000Z",
"vtadam.localhost.vitess.log.INFO.20200101-080000.00000": "2020-01-01T08:00:00.000Z",
}
for filename, mtimeStr := range filenameMtimeMap {
createFileWithMtime(filename, mtimeStr)
}
if err := os.Symlink("vtadam.localhost.vitess.log.INFO.20200101-120000.00000", path.Join(logDir, "vtadam.INFO")); err != nil {
t.Fatalf("os.Symlink: %v", err)
}

purgeLogsOnce(now, logDir, "vtadam", 0, 1*time.Hour)

left, err := filepath.Glob(path.Join(logDir, "vtadam.*"))
if err != nil {
t.Fatalf("filepath.Glob: %v", err)
}

if len(left) != 3 {
// 20200101-120000 is current
// 20200101-113000 is within 1 hour
// symlink remains
// rest are removed
t.Errorf("wrong number of files remain: want %v, got %v", 3, len(left))
}
}
@@ -27,13 +27,14 @@ import (
)

var (
keepLogs = flag.Duration("keep_logs", 0*time.Second, "keep logs for this long (zero to keep forever)")
keepLogsByCtime = flag.Duration("keep_logs", 0, "keep logs for this long (using ctime) (zero to keep forever)")
keepLogsByMtime = flag.Duration("keep_logs_by_mtime", 0, "keep logs for this long (using mtime) (zero to keep forever)")
purgeLogsInterval = flag.Duration("purge_logs_interval", 1*time.Hour, "how often try to remove old logs")
)

// parse parses a file name (as used by glog) and returns its process
// name and timestamp.
func parseTimestamp(filename string) (timestamp time.Time, err error) {
func parseCreatedTimestamp(filename string) (timestamp time.Time, err error) {
parts := strings.Split(filepath.Base(filename), ".")
if len(parts) < 6 {
return time.Time{}, fmt.Errorf("malformed logfile name: %v", filename)
@@ -42,14 +43,22 @@ func parseTimestamp(filename string) (timestamp time.Time, err error) {

}

func getModifiedTimestamp(filename string) (timestamp time.Time, err error) {
fileInfo, err := os.Stat(filename)
if err != nil {
return time.Time{}, err
}
return fileInfo.ModTime(), nil
}

var levels = []string{"INFO", "ERROR", "WARNING", "FATAL"}

// purgeLogsOnce removes logfiles for program for dir, if their age
// relative to now is greater than keep.
func purgeLogsOnce(now time.Time, dir, program string, keep time.Duration) {
// relative to now is greater than [cm]timeDelta
func purgeLogsOnce(now time.Time, dir, program string, ctimeDelta time.Duration, mtimeDelta time.Duration) {
current := make(map[string]bool)
for _, level := range levels {
c, err := os.Readlink(path.Join(dir, fmt.Sprintf("%s.%s", program, level)))
c, err := filepath.EvalSymlinks(path.Join(dir, fmt.Sprintf("%s.%s", program, level)))
if err != nil {
continue
}
@@ -64,11 +73,22 @@ func purgeLogsOnce(now time.Time, dir, program string, keep time.Duration) {
if current[file] {
continue
}
created, err := parseTimestamp(file)
if err != nil {
continue
purgeFile := false
if ctimeDelta != 0 {
createdTs, err := parseCreatedTimestamp(file)
if err != nil {
continue
}
purgeFile = purgeFile || now.Sub(createdTs) > ctimeDelta
}
if mtimeDelta != 0 {
modifiedTs, err := getModifiedTimestamp(file)
if err != nil {
continue
}
purgeFile = purgeFile || now.Sub(modifiedTs) > mtimeDelta
}
if now.Sub(created) > keep {
if purgeFile {
os.Remove(file)
}
}
@@ -81,15 +101,13 @@ func PurgeLogs() {
if f == nil {
panic("the logging module doesn't specify a log_dir flag")
}

if *keepLogs == 0*time.Second {
if *keepLogsByCtime == 0 && *keepLogsByMtime == 0 {
return
}
logDir := f.Value.String()
program := filepath.Base(os.Args[0])

timer := time.NewTimer(*purgeLogsInterval)
for range timer.C {
purgeLogsOnce(time.Now(), logDir, program, *keepLogs)
purgeLogsOnce(time.Now(), logDir, program, *keepLogsByCtime, *keepLogsByMtime)
}
}
@@ -27,6 +27,7 @@ import (
"time"

"gopkg.in/src-d/go-vitess.v1/mysql"
"gopkg.in/src-d/go-vitess.v1/vt/log"

"golang.org/x/net/context"
)
@@ -84,6 +85,7 @@ func (mysqld *Mysqld) WaitForReparentJournal(ctx context.Context, timeCreatedNS
t := time.After(100 * time.Millisecond)
select {
case <-ctx.Done():
log.Warning("WaitForReparentJournal failed to see row before timeout.")
return ctx.Err()
case <-t:
}
@@ -83,7 +83,7 @@ import (
var LogErrStacks bool

func init() {
flag.BoolVar(&LogErrStacks, "LogErrStacks", false, "log stack traces in errors")
flag.BoolVar(&LogErrStacks, "log_err_stacks", false, "log stack traces for errors")
}

// New returns an error with the supplied message.
@@ -121,6 +121,10 @@ func (stats *LogStats) RemoteAddrUsername() (string, string) {
// Logf formats the log record to the given writer, either as
// tab-separated list of logged fields or as JSON.
func (stats *LogStats) Logf(w io.Writer, params url.Values) error {
if !streamlog.ShouldEmitLog(stats.SQL) {
return nil
}

formattedBindVars := "\"[REDACTED]\""
if !*streamlog.RedactDebugUIQueries {
_, fullBindParams := params["full"]
@@ -126,6 +126,35 @@ func TestLogStatsFormat(t *testing.T) {
*streamlog.QueryLogFormat = "text"
}

func TestLogStatsFilter(t *testing.T) {
defer func() { *streamlog.QueryLogFilterTag = "" }()

logStats := NewLogStats(context.Background(), "test", "sql1 /* LOG_THIS_QUERY */", map[string]*querypb.BindVariable{"intVal": sqltypes.Int64BindVariable(1)})
logStats.StartTime = time.Date(2017, time.January, 1, 1, 2, 3, 0, time.UTC)
logStats.EndTime = time.Date(2017, time.January, 1, 1, 2, 4, 1234, time.UTC)
params := map[string][]string{"full": {}}

got := testFormat(logStats, url.Values(params))
want := "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1 /* LOG_THIS_QUERY */\"\tmap[intVal:type:INT64 value:\"1\" ]\t0\t0\t\"\"\t\n"
if got != want {
t.Errorf("logstats format: got:\n%q\nwant:\n%q\n", got, want)
}

*streamlog.QueryLogFilterTag = "LOG_THIS_QUERY"
got = testFormat(logStats, url.Values(params))
want = "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1 /* LOG_THIS_QUERY */\"\tmap[intVal:type:INT64 value:\"1\" ]\t0\t0\t\"\"\t\n"
if got != want {
t.Errorf("logstats format: got:\n%q\nwant:\n%q\n", got, want)
}

*streamlog.QueryLogFilterTag = "NOT_THIS_QUERY"
got = testFormat(logStats, url.Values(params))
want = ""
if got != want {
t.Errorf("logstats format: got:\n%q\nwant:\n%q\n", got, want)
}
}

func TestLogStatsContextHTML(t *testing.T) {
html := "HtmlContext"
callInfo := &fakecallinfo.FakeCallInfo{

0 comments on commit c286913

Please sign in to comment.
You can’t perform that action at this time.