-
Notifications
You must be signed in to change notification settings - Fork 346
/
absorb.go
127 lines (107 loc) · 3.1 KB
/
absorb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package diag
import (
"io"
"net/http"
"time"
"github.com/zalando/skipper/filters"
"github.com/zalando/skipper/filters/flowid"
"github.com/zalando/skipper/logging"
)
// AbsorbName contains the name of the absorb filter.
//
// Deprecated: use filters.AbsorbName instead.
const AbsorbName = filters.AbsorbName
// AbsorbSilentName contains the name of the absorbSilent filter.
//
// Deprecated: use filters.AbsorbSilentName instead.
const AbsorbSilentName = filters.AbsorbSilentName
// loggingInterval is the period used to schedule the "consumed bytes"
// progress log lines while a request body is being absorbed.
const loggingInterval = time.Second
// absorb implements both the absorb and the absorbSilent filters. The same
// value acts as the filter spec and as the filter instance (CreateFilter
// returns the receiver).
type absorb struct {
	// logger receives all log output produced by the filter.
	logger logging.Logger
	// id generates a request ID when the incoming request carries no flow
	// ID header. It may be nil if the generator could not be created.
	id flowid.Generator
	// silent selects the absorbSilent variant, which skips the verbose
	// per-request logging.
	silent bool
}
// withLogger builds the spec shared by the absorb and absorbSilent filters.
//
// silent selects the absorbSilent variant. l is the logger used by the
// filter; when it is nil, a logging.DefaultLog is used instead. A failure to
// create the flow ID generator is logged and otherwise tolerated: the spec
// is still returned, carrying whatever generator value was obtained.
func withLogger(silent bool, l logging.Logger) filters.Spec {
	if l == nil {
		l = &logging.DefaultLog{}
	}

	spec := &absorb{logger: l, silent: silent}

	generator, err := flowid.NewStandardGenerator(flowid.MinLength)
	if err != nil {
		l.Errorf("failed to create ID generator: %v", err)
	}
	spec.id = generator

	return spec
}
// NewAbsorb creates the filter spec of the absorb filter.
//
// The absorb filter reads the payload of the incoming request and throws it
// away. Using INFO level and an ID identifying the request, it logs:
//   - that the request was received
//   - intermediate and final byte counts of the consumed payload
//   - that the request was finished
//   - read errors other than EOF
func NewAbsorb() filters.Spec {
	const silent = false
	return withLogger(silent, nil)
}
// NewAbsorbSilent creates the filter spec of the absorbSilent filter, the
// quiet sibling of the absorb filter.
//
// Like absorb, it reads the payload of the incoming request and throws it
// away, but it emits none of the informational per-request log lines; only
// read errors other than EOF are meant to be logged.
func NewAbsorbSilent() filters.Spec {
	const silent = true
	return withLogger(silent, nil)
}
// Name returns the name of the filter variant represented by this spec:
// filters.AbsorbSilentName for the silent variant, filters.AbsorbName
// otherwise.
func (a *absorb) Name() string {
	if a.silent {
		return filters.AbsorbSilentName
	}
	return filters.AbsorbName
}
// CreateFilter returns the spec itself as the filter instance. The args are
// ignored and the returned error is always nil.
func (a *absorb) CreateFilter(args []interface{}) (filters.Filter, error) {
	return a, nil
}
// Response does nothing; the filter does all of its work in Request.
func (a *absorb) Response(filters.FilterContext) {
	// Intentionally empty.
}
// Request reads and discards the body of the incoming request and then
// serves a response with status 200 OK.
//
// The request is identified in the logs by the value of the flow ID header.
// When the header is missing, an ID is generated; if no generator is
// available or generation fails, the placeholder "-" is used.
//
// In the verbose (non-silent) variant the start of the request, periodic
// progress, the total number of consumed bytes and the end of the request
// are logged. Read errors are logged by both variants, as documented for
// absorbSilent.
func (a *absorb) Request(ctx filters.FilterContext) {
	req := ctx.Request()

	id := req.Header.Get(flowid.HeaderName)
	if id == "" {
		// Fall back to the placeholder unless a usable ID can be generated.
		id = "-"
		if a.id != nil {
			generated, err := a.id.Generate()
			if err != nil {
				a.logger.Error(err)
			} else {
				id = generated
			}
		}
	}

	sink := io.Discard
	if !a.silent {
		a.logger.Infof("received request to be absorbed: %s", id)
		sink = &loggingSink{id: id, logger: a.logger, next: time.Now().Add(loggingInterval)}
	}

	count, err := io.Copy(sink, req.Body)

	// io.Copy does not report EOF as an error, so any error here is a real
	// read failure. It is logged regardless of the silent flag: read errors
	// are the one thing the silent variant is documented to log.
	if err != nil {
		a.logger.Infof("request %s, error while consuming request: %v", id, err)
	}

	if !a.silent {
		a.logger.Infof("request %s, consumed bytes: %d", id, count)
		a.logger.Infof("request finished: %s", id)
	}

	ctx.Serve(&http.Response{StatusCode: http.StatusOK})
}
// loggingSink is the write target used when absorbing a request body with
// verbose logging. Its Write method keeps none of the data; it only counts
// the bytes and periodically logs the running total.
type loggingSink struct {
	// id is the request ID included in every progress log line.
	id string
	// logger receives the progress log lines.
	logger logging.Logger
	// next is the time after which the next Write logs a progress line.
	next time.Time
	// count is the total number of bytes written to the sink so far.
	count int64
}
// Write discards p, adds its length to the running byte count and, once the
// current logging deadline has passed, logs the total number of bytes
// consumed so far. It always reports len(p) bytes written and a nil error.
func (s *loggingSink) Write(p []byte) (int, error) {
	n := len(p)
	s.count += int64(n)

	// Schedule the next progress line relative to the current time rather
	// than to the previous deadline. Otherwise, after a stall longer than
	// one interval the deadline stays in the past and every following Write
	// logs until it has caught up.
	if now := time.Now(); now.After(s.next) {
		s.logger.Infof("request %s, consumed bytes: %d", s.id, s.count)
		s.next = now.Add(loggingInterval)
	}

	return n, nil
}