revert server for benchmark
Signed-off-by: Patrick <patrickjiang0530@gmail.com>
Patrick0308 committed Jul 27, 2022
1 parent 01373aa commit 5a3fb73
138 changes: 39 additions & 99 deletions pilot/cmd/pilot-agent/status/server.go
@@ -75,12 +75,6 @@ const (
 )
 
 var (
-	bufPool = &sync.Pool{
-		New: func() interface{} {
-			buf := make([]byte, 32*1024)
-			return &buf
-		},
-	}
 	UpstreamLocalAddressIPv4 = &net.TCPAddr{IP: net.ParseIP("127.0.0.6")}
 	UpstreamLocalAddressIPv6 = &net.TCPAddr{IP: net.ParseIP("::6")}
 )
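For context on what is being removed above: bufPool is the standard sync.Pool buffer-reuse idiom. A minimal standalone sketch of that pattern follows (copyPooled and the demo are illustrative, not Istio code). The pool holds *[]byte because storing a bare slice in the interface{} that Get and Put traffic in would allocate a fresh slice header on every Put (the staticcheck SA6002 advice).

package main

import (
	"io"
	"os"
	"strings"
	"sync"
)

// bufPool reuses 32 KiB copy buffers, the same size as the deleted pool.
var bufPool = &sync.Pool{
	New: func() interface{} {
		buf := make([]byte, 32*1024)
		return &buf
	},
}

// copyPooled copies src to dst through a pooled buffer instead of
// allocating a new one per call.
func copyPooled(dst io.Writer, src io.Reader) (int64, error) {
	buf := bufPool.Get().(*[]byte)
	defer bufPool.Put(buf)
	return io.CopyBuffer(dst, src, *buf)
}

func main() {
	if _, err := copyPooled(os.Stdout, strings.NewReader("pooled copy\n")); err != nil {
		panic(err)
	}
}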
@@ -487,38 +481,24 @@ type PrometheusScrapeConfiguration struct {
 // but we still want Envoy metrics. Instead, errors are tracked in the failed scrape metrics/logs.
 func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) {
 	metrics.ScrapeTotals.Increment()
+	var envoy, application, agent []byte
 	var err error
-	var envoy, application io.ReadCloser
-	var envoyCancel, appCancel context.CancelFunc
-	defer func() {
-		if envoy != nil {
-			envoy.Close()
-		}
-		if application != nil {
-			application.Close()
-		}
-		if envoyCancel != nil {
-			envoyCancel()
-		}
-		if appCancel != nil {
-			appCancel()
-		}
-	}()
 
 	// Gather all the metrics we will merge
 	if !s.config.NoEnvoy {
-		if envoy, envoyCancel, _, err = s.scrape(fmt.Sprintf("http://localhost:%d/stats/prometheus", s.envoyStatsPort), r.Header); err != nil {
+		if envoy, _, err = s.scrape(fmt.Sprintf("http://localhost:%d/stats/prometheus", s.envoyStatsPort), r.Header); err != nil {
 			log.Errorf("failed scraping envoy metrics: %v", err)
 			metrics.EnvoyScrapeErrors.Increment()
 		}
+		// Process envoy's metrics to make them compatible with FmtOpenMetrics
+		envoy = processMetrics(envoy)
 	}
 
 	// Scrape app metrics if defined and capture their format
 	var format expfmt.Format
 	if s.prometheus != nil {
 		var contentType string
 		url := fmt.Sprintf("http://localhost:%s%s", s.prometheus.Port, s.prometheus.Path)
-		if application, appCancel, contentType, err = s.scrape(url, r.Header); err != nil {
+		if application, contentType, err = s.scrape(url, r.Header); err != nil {
 			log.Errorf("failed scraping application metrics: %v", err)
 			metrics.AppScrapeErrors.Increment()
 		}
@@ -528,38 +508,29 @@ func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) {
 		format = expfmt.FmtText
 	}
 
+	if agent, err = scrapeAgentMetrics(); err != nil {
+		log.Errorf("failed scraping agent metrics: %v", err)
+		metrics.AgentScrapeErrors.Increment()
+	}
+
 	w.Header().Set("Content-Type", string(format))
 
 	// Write out the metrics
-	if err = scrapeAndWriteAgentMetrics(io.Writer(w)); err != nil {
-		log.Errorf("failed scraping and writing agent metrics: %v", err)
+	if _, err := w.Write(agent); err != nil {
+		log.Errorf("failed to write agent metrics: %v", err)
 		metrics.AgentScrapeErrors.Increment()
 	}
 
-	buf := bufPool.Get().(*[]byte)
-	defer bufPool.Put(buf)
-
 	if envoy != nil {
-		var eerr error
-		if format == expfmt.FmtOpenMetrics {
-			_, eerr = copyAndProcessMetrics(w, envoy, *buf)
-		} else {
-			_, eerr = io.CopyBuffer(w, envoy, *buf)
-		}
-		if eerr != nil {
-			log.Errorf("failed to scraping and writing envoy metrics: %v", eerr)
+		if _, err := w.Write(envoy); err != nil {
+			log.Errorf("failed to write envoy metrics: %v", err)
 			metrics.EnvoyScrapeErrors.Increment()
 		}
 	}
 
 	// App metrics must go last because if they are FmtOpenMetrics,
 	// they will have a trailing "# EOF" which terminates the full exposition
-	if application != nil {
-		_, err = io.CopyBuffer(w, application, *buf)
-		if err != nil {
-			log.Errorf("failed to scraping and writing application metrics: %v", err)
-			metrics.AppScrapeErrors.Increment()
-		}
+	if _, err := w.Write(application); err != nil {
+		log.Errorf("failed to write application metrics: %v", err)
+		metrics.AppScrapeErrors.Increment()
 	}
 }
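The comment in the hunk above ("App metrics must go last...") is the one subtle ordering constraint in this handler: an OpenMetrics exposition is terminated by "# EOF", and a parser stops reading there, so the body that may carry that marker has to be written last. A toy merge makes it concrete; the metric names and values are made up for illustration.

package main

import (
	"fmt"
	"strings"
)

func main() {
	agent := "istio_agent_scrapes_total 1\n"
	envoy := "envoy_server_uptime 42\n"
	app := "http_requests_total 7\n# EOF\n" // OpenMetrics terminator

	var exposition strings.Builder
	exposition.WriteString(agent)
	exposition.WriteString(envoy)
	exposition.WriteString(app) // must come last: "# EOF" ends the exposition
	fmt.Print(exposition.String())
}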
@@ -571,62 +542,24 @@ func negotiateMetricsFormat(contentType string) expfmt.Format {
 	return expfmt.FmtText
 }
 
-func copyAndProcessMetrics(dst io.Writer, src io.Reader, buf []byte) (written int64, err error) {
-	var sideBreak bool
-	for {
-		nr, er := src.Read(buf)
-		if nr > 0 {
-			rbuf := bytes.ReplaceAll(buf[0:nr], []byte("\n\n"), []byte("\n"))
-			if rbuf[0] == '\n' && sideBreak {
-				rbuf = rbuf[1:]
-			}
-			lr := len(rbuf)
-			if rbuf[lr-1] == '\n' {
-				sideBreak = true
-			}
-			nw, ew := dst.Write(rbuf[0:lr])
-			if nw < 0 || lr < nw {
-				nw = 0
-				if ew == nil {
-					ew = errors.New("invalid write result")
-				}
-			}
-			written += int64(nw)
-			if ew != nil {
-				err = ew
-				break
-			}
-			if lr != nw {
-				err = io.ErrShortWrite
-				break
-			}
-		}
-		if er != nil {
-			if er != io.EOF {
-				err = er
-			}
-			break
-		}
-		if nr <= 0 {
-			break
-		}
-	}
-	return written, err
+func processMetrics(metrics []byte) []byte {
+	return bytes.ReplaceAll(metrics, []byte("\n\n"), []byte("\n"))
 }
 
-func scrapeAndWriteAgentMetrics(w io.Writer) error {
+func scrapeAgentMetrics() ([]byte, error) {
+	buf := &bytes.Buffer{}
 	mfs, err := promRegistry.Gather()
-	enc := expfmt.NewEncoder(w, expfmt.FmtText)
+	enc := expfmt.NewEncoder(buf, expfmt.FmtText)
 	if err != nil {
-		return err
+		return nil, err
 	}
 	var errs error
 	for _, mf := range mfs {
 		if err := enc.Encode(mf); err != nil {
 			errs = multierror.Append(errs, err)
 		}
 	}
-	return errs
+	return buf.Bytes(), errs
 }
 
 func applyHeaders(into http.Header, from http.Header, keys ...string) {
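The restored processMetrics is a single bytes.ReplaceAll. A small self-contained demo of what it does to Envoy-style output, collapsing the blank line between stat blocks that an OpenMetrics parser would reject; the input sample is illustrative.

package main

import (
	"bytes"
	"fmt"
)

// processMetrics as restored by the revert: collapse blank lines in the
// scraped text so the merged exposition stays OpenMetrics-compatible.
func processMetrics(metrics []byte) []byte {
	return bytes.ReplaceAll(metrics, []byte("\n\n"), []byte("\n"))
}

func main() {
	in := []byte("envoy_server_uptime 42\n\nenvoy_cluster_membership 3\n")
	fmt.Printf("%q\n", processMetrics(in))
	// Output: "envoy_server_uptime 42\nenvoy_cluster_membership 3\n"
}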
@@ -651,21 +584,22 @@ func getHeaderTimeout(timeout string) (time.Duration, error) {
 // scrape will send a request to the provided url to scrape metrics from
 // This will attempt to mimic some of Prometheus functionality by passing some of the headers through
 // such as accept, timeout, and user agent
-// Returns the scraped metrics reader as well as the response's "Content-Type" header to determine the metrics format
-func (s *Server) scrape(url string, header http.Header) (io.ReadCloser, context.CancelFunc, string, error) {
-	var cancel context.CancelFunc
+// Returns the scraped metrics as well as the response's "Content-Type" header to determine the metrics format
+func (s *Server) scrape(url string, header http.Header) ([]byte, string, error) {
 	ctx := context.Background()
 	if timeoutString := header.Get("X-Prometheus-Scrape-Timeout-Seconds"); timeoutString != "" {
 		timeout, err := getHeaderTimeout(timeoutString)
 		if err != nil {
 			log.Warnf("Failed to parse timeout header %v: %v", timeoutString, err)
 		} else {
-			ctx, cancel = context.WithTimeout(ctx, timeout)
+			c, cancel := context.WithTimeout(ctx, timeout)
+			ctx = c
+			defer cancel()
		}
 	}
 	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
 	if err != nil {
-		return nil, cancel, "", err
+		return nil, "", err
 	}
 	applyHeaders(req.Header, header, "Accept",
 		"User-Agent",
@@ -674,13 +608,19 @@ func (s *Server) scrape(url string, header http.Header) (io.ReadCloser, context.CancelFunc, string, error) {
 
 	resp, err := http.DefaultClient.Do(req)
 	if err != nil {
-		return nil, cancel, "", fmt.Errorf("error scraping %s: %v", url, err)
+		return nil, "", fmt.Errorf("error scraping %s: %v", url, err)
 	}
+	defer resp.Body.Close()
 	if resp.StatusCode != http.StatusOK {
-		return nil, cancel, "", fmt.Errorf("error scraping %s, status code: %v", url, resp.StatusCode)
+		return nil, "", fmt.Errorf("error scraping %s, status code: %v", url, resp.StatusCode)
 	}
+	metrics, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return nil, "", fmt.Errorf("error reading %s: %v", url, err)
+	}
+
 	format := resp.Header.Get("Content-Type")
-	return resp.Body, cancel, format, nil
+	return metrics, format, nil
 }
 
 func (s *Server) handleQuit(w http.ResponseWriter, r *http.Request) {
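Finally, the Gather-then-encode shape that scrapeAgentMetrics returns to is plain Prometheus client-library machinery. A runnable sketch, with a private registry and demo counter standing in for the agent's promRegistry and its real metrics:

package main

import (
	"bytes"
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/expfmt"
)

func main() {
	// A private registry standing in for promRegistry.
	reg := prometheus.NewRegistry()
	scrapes := prometheus.NewCounter(prometheus.CounterOpts{
		Name: "scrapes_total",
		Help: "Demo counter.",
	})
	reg.MustRegister(scrapes)
	scrapes.Inc()

	// Gather every MetricFamily, then encode each in Prometheus text format.
	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	buf := &bytes.Buffer{}
	enc := expfmt.NewEncoder(buf, expfmt.FmtText)
	for _, mf := range mfs {
		if err := enc.Encode(mf); err != nil {
			panic(err)
		}
	}
	fmt.Print(buf.String())
}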
