
Commit

Merge b1d8451 into 5636a40
gfr10598 committed Feb 8, 2019
2 parents 5636a40 + b1d8451 commit e67a024
Showing 5 changed files with 86 additions and 155 deletions.
6 changes: 3 additions & 3 deletions cache/cache.go
@@ -46,11 +46,11 @@ func (c *Cache) Update(msg *inetdiag.ParsedMessage) *inetdiag.ParsedMessage {
// It returns all messages that did not have corresponding inodes in the most recent
// batch of messages.
func (c *Cache) EndCycle() map[uint64]*inetdiag.ParsedMessage {
metrics.CacheSizeSummary.Observe(float64(len(c.current)))
metrics.CacheSizeHistogram.Observe(float64(len(c.current)))
tmp := c.previous
c.previous = c.current
// Allocate a bit more than previous size, to accommodate new connections.
// This this will grow and shrink with the number of active connections, but
// This will grow and shrink with the number of active connections, but
// minimize reallocation.
c.current = make(map[uint64]*inetdiag.ParsedMessage, len(c.previous)+len(c.previous)/10+10)
c.cycles++
@@ -59,6 +59,6 @@ func (c *Cache) EndCycle() map[uint64]*inetdiag.ParsedMessage {

// CycleCount returns the number of times EndCycle() has been called.
func (c *Cache) CycleCount() int64 {
// TODO also add a prometheus counter for this.
// Don't need a prometheus counter, because we already have the count of CacheSizeHistogram observations.
return c.cycles
}
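
For orientation, a minimal, hypothetical sketch of a polling pass over this cache. Only the Update and EndCycle signatures and the EndCycle comment come from the hunks above; the import paths, the assumption that Update returns the previously cached snapshot, and the loop structure are illustrative, not part of this commit.

// Hypothetical polling pass: only the Update and EndCycle signatures come
// from the hunks above; import paths and loop structure are assumptions.
package sketch

import (
	"github.com/m-lab/tcp-info/cache"    // assumed import path
	"github.com/m-lab/tcp-info/inetdiag" // assumed import path
)

func pollOnce(c *cache.Cache, batch []*inetdiag.ParsedMessage) map[uint64]*inetdiag.ParsedMessage {
	for _, pm := range batch {
		// Update is assumed to store the new snapshot and return the
		// previously cached one (nil for a connection seen the first time).
		if prev := c.Update(pm); prev != nil {
			_ = prev // a caller might compare prev against pm before saving
		}
	}
	// Per the comment above, EndCycle returns every entry that was NOT
	// refreshed by this batch, i.e. connections that have gone away, and
	// it records the cache size in CacheSizeHistogram.
	return c.EndCycle()
}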
10 changes: 5 additions & 5 deletions inetdiag/socket-monitor.go
@@ -56,12 +56,12 @@ func makeReq(inetType uint8) *nl.NetlinkRequest {
func processSingleMessage(m *syscall.NetlinkMessage, seq uint32, pid uint32) (*syscall.NetlinkMessage, bool, error) {
if m.Header.Seq != seq {
log.Printf("Wrong Seq nr %d, expected %d", m.Header.Seq, seq)
metrics.ErrorCount.With(prometheus.Labels{"source": "wrong seq num"}).Inc()
metrics.ErrorCount.With(prometheus.Labels{"type": "wrong seq num"}).Inc()
return nil, false, ErrBadSequence
}
if m.Header.Pid != pid {
log.Printf("Wrong pid %d, expected %d", m.Header.Pid, pid)
metrics.ErrorCount.With(prometheus.Labels{"source": "wrong pid"}).Inc()
metrics.ErrorCount.With(prometheus.Labels{"type": "wrong pid"}).Inc()
return nil, false, ErrBadPid
}
if m.Header.Type == unix.NLMSG_DONE {
@@ -74,7 +74,7 @@ func processSingleMessage(m *syscall.NetlinkMessage, seq uint32, pid uint32) (*s
return nil, false, nil
}
log.Println(syscall.Errno(-error))
metrics.ErrorCount.With(prometheus.Labels{"source": "NLMSG_ERROR"}).Inc()
metrics.ErrorCount.With(prometheus.Labels{"type": "NLMSG_ERROR"}).Inc()
}
if m.Header.Flags&unix.NLM_F_MULTI == 0 {
return m, false, nil
@@ -96,8 +96,8 @@ func OneType(inetType uint8) ([]*syscall.NetlinkMessage, error) {
case syscall.AF_INET6:
af = "ipv6"
}
metrics.FetchTimeMsecSummary.With(prometheus.Labels{"af": af}).Observe(1000 * time.Since(start).Seconds())
metrics.ConnectionCountSummary.With(prometheus.Labels{"af": af}).Observe(float64(len(res)))
metrics.SyscallTimeMsec.With(prometheus.Labels{"af": af}).Observe(1000 * time.Since(start).Seconds())
metrics.ConnectionCountHistogram.With(prometheus.Labels{"af": af}).Observe(float64(len(res)))
}()

req := makeReq(inetType)
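
A hypothetical caller of OneType, whose signature appears in the hunk header above. The import path, function names, and surrounding wiring here are assumptions for illustration only, not part of this commit.

// Hypothetical usage sketch: fetch netlink snapshots for both address
// families. Only OneType's signature comes from the diff above.
package main

import (
	"log"
	"syscall"

	"github.com/m-lab/tcp-info/inetdiag" // assumed import path
)

func collect() []*syscall.NetlinkMessage {
	var all []*syscall.NetlinkMessage
	for _, af := range []uint8{syscall.AF_INET, syscall.AF_INET6} {
		msgs, err := inetdiag.OneType(af)
		if err != nil {
			log.Printf("fetch for af=%d failed: %v", af, err)
			continue
		}
		all = append(all, msgs...)
	}
	return all
}

func main() {
	log.Printf("collected %d messages", len(collect()))
}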
23 changes: 21 additions & 2 deletions main.go
@@ -7,11 +7,12 @@ import (
"flag"
"log"
"os"
"runtime"
"runtime/trace"
"syscall"
"time"

_ "net/http/pprof"
_ "net/http/pprof" // Support profiling

"github.com/golang/protobuf/proto"

@@ -124,6 +125,10 @@ var (
func main() {
// TODO - use flagx.ArgsFromEnv

// Performance instrumentation.
runtime.SetBlockProfileRate(1000000) // 1 sample/msec
runtime.SetMutexProfileFraction(1000)

metrics.SetupPrometheus(*promPort)

p := tcp.TCPDiagnosticsProto{}
@@ -159,13 +164,27 @@ func main() {
svrChan := make(chan []*inetdiag.ParsedMessage, 2)
go svr.MessageSaverLoop(svrChan)

nextTime := time.Now()

for loops = 0; *reps == 0 || loops < *reps; loops++ {
total, remote := CollectDefaultNamespace(svrChan)
totalCount += total
remoteCount += remote
if loops%10000 == 0 {
// print stats roughly once per minute.
if loops%6000 == 0 {
Stats(svr)
}

// For now, cap rate at 100 Hz.
nextTime = nextTime.Add(10 * time.Millisecond)
// If we get more than 5 msec behind, clamp there, to avoid bursts.
if nextTime.Before(time.Now().Add(5 * time.Millisecond)) {
nextTime = time.Now().Add(5 * time.Millisecond)
}
delay := nextTime.Sub(time.Now())
if delay > 0 {
time.Sleep(delay)
}
}

close(svrChan)
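
The new loop body paces collection at roughly 100 Hz: nextTime advances 10 ms per iteration, and if it slips to less than 5 ms ahead of the wall clock it is reset to now+5ms, so one slow iteration does not trigger a burst of catch-up passes. A standalone restatement of that pattern, with illustrative names (pace and the work callback are not part of this commit):

// Illustrative restatement of the pacing logic added to main() above.
package main

import "time"

func pace(work func(), iterations int) {
	next := time.Now()
	for i := 0; i < iterations; i++ {
		work()
		// Target roughly 100 Hz.
		next = next.Add(10 * time.Millisecond)
		// If the target has slipped to less than 5 ms ahead of now,
		// clamp it to now+5ms instead of sleeping zero and bursting.
		if next.Before(time.Now().Add(5 * time.Millisecond)) {
			next = time.Now().Add(5 * time.Millisecond)
		}
		if d := time.Until(next); d > 0 {
			time.Sleep(d)
		}
	}
}

func main() {
	pace(func() { /* one collection pass */ }, 10)
}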
200 changes: 56 additions & 144 deletions metrics/metrics.go
@@ -10,7 +10,6 @@ package metrics
import (
"fmt"
"log"
"math"
"net/http"
"net/http/pprof"

@@ -38,175 +37,88 @@ func SetupPrometheus(promPort int) {
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)

prometheus.MustRegister(FetchTimeMsecSummary)
prometheus.MustRegister(ConnectionCountSummary)
prometheus.MustRegister(CacheSizeSummary)
prometheus.MustRegister(SyscallTimeMsec)

prometheus.MustRegister(EntryFieldCountHistogram)
prometheus.MustRegister(FileSizeHistogram)
prometheus.MustRegister(RowSizeHistogram)
prometheus.MustRegister(ConnectionCountHistogram)
prometheus.MustRegister(CacheSizeHistogram)

// Common metrics
prometheus.MustRegister(FileCount)
prometheus.MustRegister(NewFileCount)
prometheus.MustRegister(ErrorCount)
prometheus.MustRegister(WarningCount)

port := fmt.Sprintf(":%d", promPort)
log.Println("Exporting prometheus metrics on", port)
go http.ListenAndServe(port, mux)
}

var (
// FetchTimeMsecSummary measures the latency (in msec) to fetch tcp-info records from kernel.
// Provides metrics:
// tcpinfo_Fetch_Time_Msec_Summary
// Example usage:
// metrics.FetchTimeMsecSummary.With(prometheus.Labels{"af": "ipv6"}).observe(float64)
FetchTimeMsecSummary = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Name: "tcpinfo_Fetch_Time_Msec_Summary",
Help: "The total time to fetch tcp-info records, in milliseconds.",
}, []string{"af"})
// SyscallTimeMsec tracks the latency in the syscall. It does NOT include
// the time to process the netlink messages.
SyscallTimeMsec = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "tcpinfo_syscall_time_msec",
Help: "netlink syscall latency distribution",
Buckets: []float64{
1.0, 1.25, 1.6, 2.0, 2.5, 3.2, 4.0, 5.0, 6.3, 7.9,
10, 12.5, 16, 20, 25, 32, 40, 50, 63, 79,
100,
},
},
[]string{"af"})

// ConnectionCountSummary the (total) number of TCP connections collected, by type.
// Provides metrics:
// tcpinfo_Connection_Count_Summary
// Example usage:
// metrics.ConnectionCountSummary.With(prometheus.Labels{"af": "ipv6"}).observe(float64)
ConnectionCountSummary = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Name: "tcpinfo_Connection_Count_Summary",
Help: "The (total) number of TCP connections collected, by type.",
}, []string{"af"})
// ConnectionCountHistogram tracks the number of connections returned by
// each syscall. This ??? includes local connections that are NOT recorded
// in the cache or output.
ConnectionCountHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "tcpinfo_connection_count_histogram",
Help: "connection count histogram",
Buckets: []float64{
1, 2, 3, 4, 5, 6, 8,
10, 12.5, 16, 20, 25, 32, 40, 50, 63, 79,
100, 125, 160, 200, 250, 320, 400, 500, 630, 790,
1000, 1250, 1600, 2000, 2500, 3200, 4000, 5000, 6300, 7900,
10000, 12500, 16000, 20000, 25000, 32000, 40000, 50000, 63000, 79000,
10000000,
},
},
[]string{"af"})

// CacheSizeSummary measures the size of the connection cache.
// Provides metrics:
// tcpinfo_Cache_Size_Summary
// Example usage:
// metrics.CacheSizeSummary.observe()
CacheSizeSummary = prometheus.NewSummary(prometheus.SummaryOpts{
Name: "tcpinfo_Connection_Cache_Size_Summary",
Help: "The number of entries in the connection cache.",
})
// CacheSizeHistogram tracks the number of entries in connection cache.
CacheSizeHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Name: "tcpinfo_cache_count_histogram",
Help: "cache connection count histogram",
Buckets: []float64{
1, 2, 3, 4, 5, 6, 8,
10, 12.5, 16, 20, 25, 32, 40, 50, 63, 79,
100, 125, 160, 200, 250, 320, 400, 500, 630, 790,
1000, 1250, 1600, 2000, 2500, 3200, 4000, 5000, 6300, 7900,
10000, 12500, 16000, 20000, 25000, 32000, 40000, 50000, 63000, 79000,
10000000,
},
})

// ErrorCount measures the number of annotation errors
// ErrorCount measures the number of errors
// Provides metrics:
// tcpinfo_Error_Count
// Example usage:
// metrics.ErrorCount.With(prometheus.Labels{"source", "foobar"}).Inc()
// metrics.ErrorCount.With(prometheus.Labels{"type", "foobar"}).Inc()
ErrorCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "tcpinfo_Error_Count",
Name: "tcpinfo_error_count",
Help: "The total number of errors encountered.",
}, []string{"source"})

// WarningCount measures the number of annotation warnings
// Provides metrics:
// tcpinfo_Warning_Count
// Example usage:
// metrics.WarningCount.With(prometheus.Labels{"source", "foobar"}).Inc()
WarningCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "tcpinfo_Warning_Count",
Help: "The total number of Warnings encountered.",
}, []string{"source"})
}, []string{"type"})

// FileCount counts the number of files written.
// NewFileCount counts the number of connection files written.
//
// Provides metrics:
// tcpinfo_New_File_Count
// tcpinfo_new_file_count
// Example usage:
// metrics.FileCount.Inc()
FileCount = prometheus.NewCounter(
NewFileCount = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "tcpinfo_New_File_Count",
Name: "tcpinfo_new_file_count",
Help: "Number of files created.",
},
)

// TODO(dev): bytes/row - generalize this metric for any file type.
//
// RowSizeHistogram provides a histogram of bq row json sizes. It is intended primarily for
// NDT, so the bins are fairly large. NDT average json is around 200K
//
// Provides metrics:
// etl_row_json_size_bucket{table="...", le="..."}
// ...
// etl_row_json_size_sum{table="...", le="..."}
// etl_row_json_size_count{table="...", le="..."}
// Usage example:
// metrics.RowSizeHistogram.WithLabelValues(
// "ndt").Observe(len(json))
RowSizeHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "etl_row_json_size",
Help: "Row json size distributions.",
Buckets: []float64{
0,
100, 130, 180, 240, 320, 420, 560, 750,
1000, 1300, 1800, 2400, 3200, 4200, 5600, 7500,
10000, 13000, 18000, 24000, 32000, 42000, 56000, 75000,
100000, 130000, 180000, 240000, 320000, 420000, 560000, 750000,
1000000, 1300000, 1800000, 2400000, 3200000, 4200000, 5600000, 7500000,
10000000, // 10MiB
math.Inf(+1),
},
},
[]string{"table"},
)

// TODO(dev): rows/test - generalize this metric for any file type.
//
// EntryFieldCountHistogram provides a histogram of (approximate) row field counts. It is intended primarily for
// NDT, so the bins are fairly large. NDT snapshots typically total about 10k
// fields, 99th percentile around 35k fields, and occasionally as many as 50k.
// Smaller field count bins included so that it is possibly useful for other
// parsers.
//
// Provides metrics:
// etl_entry_field_count_bucket{table="...", le="..."}
// ...
// etl_entry_field_count_sum{table="...", le="..."}
// etl_entry_field_count_count{table="...", le="..."}
// Usage example:
// metrics.EntryFieldCountHistogram.WithLabelValues(
// "ndt").Observe(fieldCount)
EntryFieldCountHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "etl_entry_field_count",
Help: "total snapshot field count distributions.",
Buckets: []float64{
0,
10, 13, 18, 24, 32, 42, 56, 75,
100, 130, 180, 240, 320, 420, 560, 750,
1000, 1300, 1800, 2400, 3200, 4200, 5600, 7500,
10000, 13000, 18000, 24000, 32000, 42000, 56000, 75000,
100000, 130000, 180000, 240000, 320000, 420000, 560000, 750000,
1000000, // 1 MiB
math.Inf(+1),
},
},
[]string{"table"},
)

// FileSizeHistogram provides a histogram of source file sizes. The bucket
// sizes should cover a wide range of input file sizes, but should not have too
// many buckets, because there are also three vector dimensions.
//
// Example usage:
// metrics.FileSizeHistogram.WithLabelValues(
// "ndt", "c2s_snaplog", "parsed").Observe(size)
FileSizeHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "etl_test_file_size_bytes",
Help: "Size of individual test files.",
Buckets: []float64{
0,
1000, 2500, 5000, 10000, 25000, 50000,
100000, 250000, 500000, 1000000, 2500000, 5000000,
10000000, 25000000, 50000000, 100000000, 250000000, 500000000,
1000000000, // 1 gb
math.Inf(+1),
},
},
[]string{"table", "kind", "group"},
)
)
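
The hand-written bucket lists above step roughly ten buckets per decade (each boundary about 10^(1/10) ≈ 1.26 times the previous one). Not part of this commit, but for comparison, the same spacing can be generated with prometheus.ExponentialBuckets:

// Illustrative only: generate ~10-buckets-per-decade boundaries (1 msec to
// 100 msec) rather than listing them by hand, as the commit above does.
package main

import (
	"fmt"
	"math"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	factor := math.Pow(10, 0.1) // ≈ 1.2589 between adjacent boundaries
	buckets := prometheus.ExponentialBuckets(1.0, factor, 21)
	fmt.Println(buckets) // ≈ [1 1.26 1.58 2 2.51 ... 100]
}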
2 changes: 1 addition & 1 deletion saver/saver.go
@@ -127,7 +127,7 @@ func (conn *Connection) Rotate(Host string, Pod string, FileAgeLimit time.Durati
if err != nil {
return err
}
metrics.FileCount.Inc()
metrics.NewFileCount.Inc()
conn.Expiration = conn.Expiration.Add(10 * time.Minute)
conn.Sequence++
return nil
