Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion src/core/metrics/metrics_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ import (
"sync"
"time"

"github.com/gogo/protobuf/types"
"github.com/nginx/agent/sdk/v2/proto"
"github.com/nginx/agent/v2/src/core/config"

"github.com/gogo/protobuf/types"
)

type Collector interface {
Expand Down Expand Up @@ -129,6 +130,16 @@ func GetCalculationMap() map[string]string {
"nginx.http.v1_0": "sum",
"nginx.http.v1_1": "sum",
"nginx.http.v2": "sum",
"nginx.upstream.connect.time": "avg",
"nginx.upstream.connect.time.count": "sum",
"nginx.upstream.connect.time.max": "avg",
"nginx.upstream.connect.time.median": "avg",
"nginx.upstream.connect.time.pctl95": "avg",
"nginx.upstream.header.time": "avg",
"nginx.upstream.header.time.count": "sum",
"nginx.upstream.header.time.max": "avg",
"nginx.upstream.header.time.median": "avg",
"nginx.upstream.header.time.pctl95": "avg",
"nginx.http.conn.handled": "sum",
"nginx.http.conn.reading": "avg",
"nginx.http.conn.writing": "avg",
Expand Down
1 change: 1 addition & 0 deletions src/core/metrics/sources/cpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/nginx/agent/v2/src/core"
"github.com/nginx/agent/v2/src/core/metrics"
cgroup "github.com/nginx/agent/v2/src/core/metrics/sources/cgroup"

"github.com/shirou/gopsutil/v3/cpu"
log "github.com/sirupsen/logrus"
)
Expand Down
1 change: 1 addition & 0 deletions src/core/metrics/sources/cpu_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/nginx/agent/sdk/v2/proto"
cgroup "github.com/nginx/agent/v2/src/core/metrics/sources/cgroup"
tutils "github.com/nginx/agent/v2/test/utils"

"github.com/shirou/gopsutil/v3/cpu"
"github.com/stretchr/testify/assert"
)
Expand Down
187 changes: 128 additions & 59 deletions src/core/metrics/sources/nginx_access_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@ import (
"time"

"github.com/nginx/agent/sdk/v2/proto"
log "github.com/sirupsen/logrus"

"github.com/nginx/agent/v2/src/core"
"github.com/nginx/agent/v2/src/core/metrics"
"github.com/nginx/agent/v2/src/core/metrics/sources/tailer"

log "github.com/sirupsen/logrus"
)

const (
spaceDelim = " "
)

// This metrics source is used to tail the NGINX access logs to retrieve http metrics.
// This metrics source is used to tail the NGINX access logs to retrieve metrics.

type NginxAccessLog struct {
baseDimensions *metrics.CommonDim
Expand All @@ -49,7 +49,7 @@ func NewNginxAccessLog(
binary core.NginxBinary,
nginxType string,
collectionInterval time.Duration) *NginxAccessLog {
log.Trace("Creating NewNginxAccessLog")
log.Trace("Creating NginxAccessLog")

nginxAccessLog := &NginxAccessLog{
baseDimensions,
Expand Down Expand Up @@ -78,7 +78,6 @@ func NewNginxAccessLog(

func (c *NginxAccessLog) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *proto.StatsEntity) {
defer wg.Done()

c.collectLogStats(ctx, m)
}

Expand Down Expand Up @@ -153,15 +152,22 @@ func (c *NginxAccessLog) collectLogStats(ctx context.Context, m chan<- *proto.St
c.buf = []*proto.StatsEntity{}
}

var httpRequestMetrics = []string{
"request.time",
"request.time.count",
"request.time.max",
"request.time.median",
"request.time.pctl95",
}

func (c *NginxAccessLog) logStats(ctx context.Context, logFile, logFormat string) {
logPattern := convertLogFormat(logFormat)
log.Debugf("Collecting from: %s using format: %s", logFile, logFormat)
log.Debugf("Pattern used for tailing logs: %s", logPattern)

counters := getDefaultCounters()
gzipRatios := []float64{}
requestLengths := []float64{}
requestTimes := []float64{}
httpCounters, connCounters, headerCounters := getDefaultCounters()
gzipRatios, requestLengths, requestTimes, connectTimes, headerTimes := []float64{}, []float64{}, []float64{}, []float64{}, []float64{}

mu := sync.Mutex{}

t, err := tailer.NewPatternTailer(logFile, map[string]string{"DEFAULT": logPattern})
Expand All @@ -187,34 +193,34 @@ func (c *NginxAccessLog) logStats(ctx context.Context, logFile, logFormat string
mu.Lock()
if v, err := strconv.Atoi(access.BodyBytesSent); err == nil {
n := "request.body_bytes_sent"
counters[n] = float64(v) + counters[n]
httpCounters[n] = float64(v) + httpCounters[n]
} else {
log.Debugf("Error getting body_bytes_sent value from access logs, %v", err)
log.Debugf("Error getting body_bytes_sent value from access logs: %v", err)
}

if v, err := strconv.Atoi(access.BytesSent); err == nil {
n := "request.bytes_sent"
counters[n] = float64(v) + counters[n]
httpCounters[n] = float64(v) + httpCounters[n]
} else {
log.Debugf("Error getting bytes_sent value from access logs, %v", err)
log.Debugf("Error getting bytes_sent value from access logs: %v", err)
}

if v, err := strconv.Atoi(access.GzipRatio); err == nil {
gzipRatios = append(gzipRatios, float64(v))
} else {
log.Debugf("Error getting gzip_ratio value from access logs, %v", err)
log.Debugf("Error getting gzip_ratio value from access logs: %v", err)
}

if v, err := strconv.Atoi(access.RequestLength); err == nil {
requestLengths = append(requestLengths, float64(v))
} else {
log.Debugf("Error getting request_length value from access logs, %v", err)
log.Debugf("Error getting request_length value from access logs: %v", err)
}

if v, err := strconv.ParseFloat(access.RequestTime, 64); err == nil {
requestTimes = append(requestTimes, v)
} else {
log.Debugf("Error getting request_time value from access logs, %v", err)
log.Debugf("Error getting request_time value from access logs: %v", err)
}

if access.Request != "" {
Expand All @@ -223,43 +229,65 @@ func (c *NginxAccessLog) logStats(ctx context.Context, logFile, logFormat string
if isOtherMethod(n) {
n = "method.others"
}
counters[n] = counters[n] + 1
httpCounters[n] = httpCounters[n] + 1

if access.ServerProtocol == "" {
if strings.Count(protocol, "/") == 1 {
httpProtocolVersion := strings.Split(protocol, "/")[1]
httpProtocolVersion = strings.ReplaceAll(httpProtocolVersion, ".", "_")
n = fmt.Sprintf("v%s", httpProtocolVersion)
counters[n] = counters[n] + 1
httpCounters[n] = httpCounters[n] + 1
}
}
}

for _, cTime := range strings.Split(access.UpstreamConnectTime, ", ") {
// nginx uses '-' to represent TCP connection failures
cTime = strings.ReplaceAll(cTime, "-", "0")

if v, err := strconv.ParseFloat(cTime, 64); err == nil {
connectTimes = append(connectTimes, v)
} else {
log.Debugf("Error getting upstream_connect_time value from access logs, %v", err)
}
}

for _, hTime := range strings.Split(access.UpstreamHeaderTime, ", ") {
// nginx uses '-' to represent TCP connection failures
hTime = strings.ReplaceAll(hTime, "-", "0")

if v, err := strconv.ParseFloat(hTime, 64); err == nil {
headerTimes = append(headerTimes, v)
} else {
log.Debugf("Error getting upstream_header_time value from access logs: %v", err)
}
}

if access.ServerProtocol != "" {
if strings.Count(access.ServerProtocol, "/") == 1 {
httpProtocolVersion := strings.Split(access.ServerProtocol, "/")[1]
httpProtocolVersion = strings.ReplaceAll(httpProtocolVersion, ".", "_")
n := fmt.Sprintf("v%s", httpProtocolVersion)
counters[n] = counters[n] + 1
httpCounters[n] = httpCounters[n] + 1
}
}

// don't need the http status for NGINX Plus
if c.nginxType == OSSNginxType {
if v, err := strconv.Atoi(access.Status); err == nil {
n := fmt.Sprintf("status.%dxx", v/100)
counters[n] = counters[n] + 1
httpCounters[n] = httpCounters[n] + 1
if v == 403 || v == 404 || v == 500 || v == 502 || v == 503 || v == 504 {
n := fmt.Sprintf("status.%d", v)
counters[n] = counters[n] + 1
httpCounters[n] = httpCounters[n] + 1
}
if v == 499 {
n := "status.discarded"
counters[n] = counters[n] + 1
httpCounters[n] = httpCounters[n] + 1
}
if v == 400 {
n := "request.malformed"
counters[n] = counters[n] + 1
httpCounters[n] = httpCounters[n] + 1
}
} else {
log.Debugf("Error getting status value from access logs, %v", err)
Expand All @@ -270,29 +298,41 @@ func (c *NginxAccessLog) logStats(ctx context.Context, logFile, logFormat string
case <-tick.C:
c.baseDimensions.NginxType = c.nginxType
c.baseDimensions.PublishedAPI = logFile
c.group = "http"

mu.Lock()

if len(requestLengths) > 0 {
counters["request.length"] = getRequestLengthMetricValue(requestLengths)
httpCounters["request.length"] = getRequestLengthMetricValue(requestLengths)
}

if len(gzipRatios) > 0 {
counters["gzip.ratio"] = getGzipRatioMetricValue(gzipRatios)
httpCounters["gzip.ratio"] = getGzipRatioMetricValue(gzipRatios)
}

for key, value := range getRequestTimeMetrics(requestTimes) {
counters[key] = value
for _, metricName := range httpRequestMetrics {
httpCounters[metricName] = getTimeMetrics(metricName, requestTimes)
}

simpleMetrics := c.convertSamplesToSimpleMetrics(counters)
for metricName := range connCounters {
connCounters[metricName] = getTimeMetrics(metricName, connectTimes)
}

for metricName := range headerCounters {
headerCounters[metricName] = getTimeMetrics(metricName, headerTimes)
}

c.group = "http"
simpleMetrics := c.convertSamplesToSimpleMetrics(httpCounters)

c.group = ""
simpleMetrics = append(simpleMetrics, c.convertSamplesToSimpleMetrics(connCounters)...)
simpleMetrics = append(simpleMetrics, c.convertSamplesToSimpleMetrics(headerCounters)...)

log.Tracef("Access log metrics collected: %v", simpleMetrics)

// reset the counters
counters = getDefaultCounters()
gzipRatios = []float64{}
requestLengths = []float64{}
requestTimes = []float64{}
httpCounters, connCounters, headerCounters = getDefaultCounters()
gzipRatios, requestLengths, requestTimes, connectTimes, headerTimes = []float64{}, []float64{}, []float64{}, []float64{}, []float64{}

c.buf = append(c.buf, metrics.NewStatsEntity(c.baseDimensions.ToDimensions(), simpleMetrics))

Expand Down Expand Up @@ -368,43 +408,52 @@ func getGzipRatioMetricValue(gzipRatios []float64) float64 {
return value
}

func getRequestTimeMetrics(requestTimes []float64) map[string]float64 {
counters := make(map[string]float64)
func getTimeMetrics(metricName string, times []float64) float64 {
if len(times) == 0 {
return 0
}

metricType := metricName[strings.LastIndex(metricName, ".")+1:]

if len(requestTimes) > 0 {
// Calculate request time average
sort.Float64s(requestTimes)
requestTimesSum := 0.0
for _, requestTime := range requestTimes {
requestTimesSum += requestTime
switch metricType {
case "time":
// Calculate average
sum := 0.0
for _, t := range times {
sum += t
}
return sum / float64(len(times))

counters["request.time"] = requestTimesSum / float64(len(requestTimes))
case "count":
return float64(len(times))

// Calculate request time count
sort.Float64s(requestTimes)
counters["request.time.count"] = float64(len(requestTimes))
case "max":
sort.Float64s(times)
return times[len(times)-1]

// Calculate request time max
sort.Float64s(requestTimes)
counters["request.time.max"] = requestTimes[len(requestTimes)-1]
case "median":
sort.Float64s(times)

// Calculate request time median
mNumber := len(requestTimes) / 2
if len(requestTimes)%2 != 0 {
counters["request.time.median"] = requestTimes[mNumber]
mNumber := len(times) / 2
if len(times)%2 != 0 {
return times[mNumber]
} else {
counters["request.time.median"] = (requestTimes[mNumber-1] + requestTimes[mNumber]) / 2
return (times[mNumber-1] + times[mNumber]) / 2
}

// Calculate request time 95 percentile
index := int(math.RoundToEven(float64(0.95)*float64(len(requestTimes)))) - 1
counters["request.time.pctl95"] = requestTimes[index]
case "pctl95":
sort.Float64s(times)

index := int(math.RoundToEven(float64(0.95)*float64(len(times)))) - 1
return times[index]
}

return counters
log.Debugf("Could not get time metrics for %s: invalid metric type", metricName)

return 0
}

// convertLogFormat converts log format into a pattern that can be parsed by the tailer
func convertLogFormat(logFormat string) string {
newLogFormat := strings.ReplaceAll(logFormat, "$remote_addr", "%{IPORHOST:remote_addr}")
newLogFormat = strings.ReplaceAll(newLogFormat, "$remote_user", "%{USERNAME:remote_user}")
Expand All @@ -421,6 +470,8 @@ func convertLogFormat(logFormat string) string {
newLogFormat = strings.ReplaceAll(newLogFormat, "$request_time", "%{DATA:request_time}")
newLogFormat = strings.ReplaceAll(newLogFormat, "\"$request\"", "\"%{DATA:request}\"")
newLogFormat = strings.ReplaceAll(newLogFormat, "$request ", "%{DATA:request} ")
newLogFormat = strings.ReplaceAll(newLogFormat, "$upstream_connect_time", "%{DATA:upstream_connect_time}")
newLogFormat = strings.ReplaceAll(newLogFormat, "$upstream_header_time", "%{DATA:upstream_header_time}")
newLogFormat = strings.ReplaceAll(newLogFormat, "[", "\\[")
newLogFormat = strings.ReplaceAll(newLogFormat, "]", "\\]")
return newLogFormat
Expand All @@ -435,8 +486,8 @@ func isOtherMethod(method string) bool {
method != "method.options"
}

func getDefaultCounters() map[string]float64 {
return map[string]float64{
func getDefaultCounters() (map[string]float64, map[string]float64, map[string]float64) {
httpCounters := map[string]float64{
"gzip.ratio": 0,
"method.delete": 0,
"method.get": 0,
Expand Down Expand Up @@ -471,4 +522,22 @@ func getDefaultCounters() map[string]float64 {
"v1_1": 0,
"v2": 0,
}

upstreamConnectCounters := map[string]float64{
"upstream.connect.time": 0,
"upstream.connect.time.count": 0,
"upstream.connect.time.max": 0,
"upstream.connect.time.median": 0,
"upstream.connect.time.pctl95": 0,
}

upstreamHeaderCounters := map[string]float64{
"upstream.header.time": 0,
"upstream.header.time.count": 0,
"upstream.header.time.max": 0,
"upstream.header.time.median": 0,
"upstream.header.time.pctl95": 0,
}

return httpCounters, upstreamConnectCounters, upstreamHeaderCounters
}
Loading