Skip to content
This repository has been archived by the owner on Jul 10, 2024. It is now read-only.

Experimental spike recognzing X-Forwarded-for header #249

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions learn/parse_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,14 @@ func ParseHTTP(elem akinet.ParsedNetworkContent) (*PartialWitness, error) {

var streamID uuid.UUID
var seq int
xForwardedFor := optionals.None[string]()

switch t := elem.(type) {
case akinet.HTTPRequest:
streamID = t.StreamID
seq = t.Seq

isRequest = true
methodMeta, datas = parseRequest(&t)
methodMeta, datas, xForwardedFor = parseRequest(&t)
rawBody = t.Body
bodyDecompressed = t.BodyDecompressed
headers = t.Header
Expand Down Expand Up @@ -187,8 +187,9 @@ func ParseHTTP(elem akinet.ParsedNetworkContent) (*PartialWitness, error) {
}

return &PartialWitness{
Witness: &pb.Witness{Method: method},
PairKey: toWitnessID(streamID, seq),
Witness: &pb.Witness{Method: method},
PairKey: toWitnessID(streamID, seq),
XForwardedFor: xForwardedFor,
}, nil
}

Expand Down Expand Up @@ -487,14 +488,14 @@ func parseMultipartBody(multipartType string, boundary string, bodyStream io.Rea
}, nil
}

func parseRequest(req *akinet.HTTPRequest) (*pb.MethodMeta, []*pb.Data) {
func parseRequest(req *akinet.HTTPRequest) (*pb.MethodMeta, []*pb.Data, optionals.Optional[string]) {
datas := []*pb.Data{}
noStatusCode := optionals.None[int]()
datas = append(datas, parseQuery(req.URL)...)
datas = append(datas, parseHeader(req.Header, noStatusCode)...)
datas = append(datas, parseCookies(req.Cookies, noStatusCode)...)

return parseMethodMeta(req), datas
return parseMethodMeta(req), datas, parseLoadBalancer(req.Header)
}

func parseResponse(resp *akinet.HTTPResponse) []*pb.Data {
Expand Down Expand Up @@ -523,6 +524,19 @@ func parseCookies(cs []*http.Cookie, responseCodeOpt optionals.Optional[int]) []
return datas
}

// Extract the X-Forwarded-For header, if present
func parseLoadBalancer(header http.Header) optionals.Optional[string] {
for k, vs := range header {
switch strings.ToLower(k) {
case "x-forwarded-for":
if len(vs) > 0 {
return optionals.Some(vs[0])
}
}
}
return optionals.None[string]()
}

// Translate headers to data objects. optionals.None indicates that the header
// is in a request.
func parseHeader(header http.Header, responseCodeOpt optionals.Optional[int]) []*pb.Data {
Expand Down Expand Up @@ -551,11 +565,14 @@ func parseHeader(header http.Header, responseCodeOpt optionals.Optional[int]) []

switch strings.ToLower(k) {
case "cookie", "set-cookie":
// Cookies are parsed by parseHeader.
// Cookies are parsed by parseCookies.
continue
case "content-type":
// Handled by parseBody.
continue
case "x-forwarded-for":
// Handled by parseLoadBalancer
continue
case "authorization":
// If the authorization header is in the request, create an
// HTTPAuth object. Treat authorization headers in the response
Expand Down
6 changes: 5 additions & 1 deletion learn/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import (

"github.com/google/uuid"

"github.com/akitasoftware/akita-libs/akid"
pb "github.com/akitasoftware/akita-ir/go/api_spec"
"github.com/akitasoftware/akita-libs/akid"
"github.com/akitasoftware/go-utils/optionals"
)

var (
Expand All @@ -22,6 +23,9 @@ type PartialWitness struct {

// Key used to pair this PartialWitness up with its counterpart.
PairKey akid.WitnessID

// Request header's X-forwarded-for header, the real client address
XForwardedFor optionals.Optional[string]
}

// Generates a v5 UUID as witness ID based on stream ID and seq.
Expand Down
14 changes: 13 additions & 1 deletion trace/backend_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package trace
import (
"encoding/base64"
"net"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -51,7 +52,8 @@ type witnessWithInfo struct {
requestEnd time.Time
responseStart time.Time

witness *pb.Witness
witness *pb.Witness
xForwardedFor optionals.Optional[string]
}

func (r witnessWithInfo) toReport() (*kgxapi.WitnessReport, error) {
Expand All @@ -64,6 +66,13 @@ func (r witnessWithInfo) toReport() (*kgxapi.WitnessReport, error) {
return nil, errors.Wrap(err, "failed to marshal witness proto")
}

if header, ok := r.xForwardedFor.Get(); ok {
printer.Infof("Real source IP %v, X-Forwarded-For header %v\n", r.srcIP, header)
addrs := strings.Split(header, ",")
// Replace with the leftmost IP, which is the closest to the original (???)
r.srcIP = net.ParseIP(addrs[0])
}

return &kgxapi.WitnessReport{
Direction: kgxapi.Inbound,
OriginAddr: r.srcIP,
Expand Down Expand Up @@ -207,6 +216,8 @@ func (c *BackendCollector) Process(t akinet.ParsedNetworkTraffic) error {
if isRequest {
pair.srcIP, pair.dstIP = pair.dstIP, pair.srcIP
pair.srcPort, pair.dstPort = pair.dstPort, pair.srcPort

pair.xForwardedFor = partial.XForwardedFor
}

c.queueUpload(pair)
Expand All @@ -225,6 +236,7 @@ func (c *BackendCollector) Process(t akinet.ParsedNetworkTraffic) error {
witness: partial.Witness,
observationTime: t.ObservationTime,
id: partial.PairKey,
xForwardedFor: partial.XForwardedFor,
}
// Store whichever timestamp brackets the processing interval.
w.recordTimestamp(isRequest, t)
Expand Down