diff --git a/CHANGELOG.md b/CHANGELOG.md index 955e0475..d35af9a2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,9 +3,12 @@ ## Issues Fixed and Dependency Updates * github.com/openziti/sdk-golang: [v1.1.2 -> v1.1.3](https://github.com/openziti/sdk-golang/compare/v1.1.2...v1.1.3) + * [Issue #763](https://github.com/openziti/sdk-golang/issues/763) - Use a go-routine pool for payload ingest + * [Issue #761](https://github.com/openziti/sdk-golang/issues/761) - Use cmap.ConcurrentMap for message multiplexer * [Issue #754](https://github.com/openziti/sdk-golang/issues/754) - panic: unaligned 64-bit atomic operation when running on 32-bit raspberry pi * [Issue #757](https://github.com/openziti/sdk-golang/issues/757) - Not authenticated check fails on session create when using OIDC +* github.com/golang-jwt/jwt/v5: v5.2.2 -> v5.2.3 * github.com/openziti/channel/v4: [v4.2.0 -> v4.2.15](https://github.com/openziti/channel/compare/v4.2.0...v4.2.15) * [Issue #194](https://github.com/openziti/channel/issues/194) - Add GetUnderlays and GetUnderlayCountsByType to Channel @@ -27,7 +30,6 @@ * golang.org/x/term: v0.32.0 -> v0.33.0 * golang.org/x/text: v0.25.0 -> v0.27.0 - # Release notes 1.1.2 ## Issues Fixed and Dependency Updates diff --git a/edge-apis/pool.go b/edge-apis/pool.go index 247b7d46..b5be99d1 100644 --- a/edge-apis/pool.go +++ b/edge-apis/pool.go @@ -24,6 +24,7 @@ import ( "math/rand/v2" "net" "net/url" + "slices" "sync/atomic" "time" ) @@ -35,7 +36,7 @@ type ApiClientTransport struct { // ClientTransportPool abstracts the concept of multiple `runtime.ClientTransport` (openapi interface) representing one // target OpenZiti network. In situations where controllers are running in HA mode (multiple controllers) this -// interface can attempt to try different controller during outages or partitioning. +// interface can attempt to try a different controller during outages or partitioning. type ClientTransportPool interface { runtime.ClientTransport @@ -265,6 +266,5 @@ func selectAndRemoveRandom[T any](slice []T, zero T) (selected T, modifiedSlice rng := rand.New(rand.NewPCG(seed, seed)) index := rng.IntN(len(slice)) selected = slice[index] - modifiedSlice = append(slice[:index], slice[index+1:]...) - return selected, modifiedSlice + return selected, slices.Delete(slice, index, index+1) } diff --git a/example/go.mod b/example/go.mod index 175446de..d227c37f 100644 --- a/example/go.mod +++ b/example/go.mod @@ -56,9 +56,9 @@ require ( github.com/go-openapi/swag v0.23.0 // indirect github.com/go-openapi/validate v0.24.0 // indirect github.com/go-resty/resty/v2 v2.16.5 // indirect - github.com/golang-jwt/jwt/v5 v5.2.2 // indirect + github.com/golang-jwt/jwt/v5 v5.2.3 // indirect github.com/golang/protobuf v1.5.4 // indirect - github.com/gomarkdown/markdown v0.0.0-20230922112808-5421fefb8386 // indirect + github.com/gomarkdown/markdown v0.0.0-20250311123330-531bef5e742b // indirect github.com/gorilla/schema v1.4.1 // indirect github.com/gorilla/securecookie v1.1.2 // indirect github.com/gorilla/websocket v1.5.3 // indirect diff --git a/example/go.sum b/example/go.sum index fa819ce2..ca62b30c 100644 --- a/example/go.sum +++ b/example/go.sum @@ -155,8 +155,8 @@ github.com/go-resty/resty/v2 v2.16.5 h1:hBKqmWrr7uRc3euHVqmh1HTHcKn99Smr7o5spptd github.com/go-resty/resty/v2 v2.16.5/go.mod h1:hkJtXbA2iKHzJheXYvQ8snQES5ZLGKMwQ07xAwp/fiA= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/golang-jwt/jwt/v5 v5.2.2 h1:Rl4B7itRWVtYIHFrSNd7vhTiz9UpLdi6gZhZ3wEeDy8= -github.com/golang-jwt/jwt/v5 v5.2.2/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/golang-jwt/jwt/v5 v5.2.3 h1:kkGXqQOBSDDWRhWNXTFpqGSCMyh/PLnqUvMGJPDJDs0= +github.com/golang-jwt/jwt/v5 v5.2.3/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -191,8 +191,8 @@ github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/gomarkdown/markdown v0.0.0-20191123064959-2c17d62f5098/go.mod h1:aii0r/K0ZnHv7G0KF7xy1v0A7s2Ljrb5byB7MO5p6TU= -github.com/gomarkdown/markdown v0.0.0-20230922112808-5421fefb8386 h1:EcQR3gusLHN46TAD+G+EbaaqJArt5vHhNpXAa12PQf4= -github.com/gomarkdown/markdown v0.0.0-20230922112808-5421fefb8386/go.mod h1:JDGcbDT52eL4fju3sZ4TeHGsQwhG9nbDV21aMyhwPoA= +github.com/gomarkdown/markdown v0.0.0-20250311123330-531bef5e742b h1:EY/KpStFl60qA17CptGXhwfZ+k1sFNJIUNR8DdbcuUk= +github.com/gomarkdown/markdown v0.0.0-20250311123330-531bef5e742b/go.mod h1:JDGcbDT52eL4fju3sZ4TeHGsQwhG9nbDV21aMyhwPoA= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= diff --git a/example/influxdb-client-go/go.mod b/example/influxdb-client-go/go.mod index 8ee4c110..fff10945 100644 --- a/example/influxdb-client-go/go.mod +++ b/example/influxdb-client-go/go.mod @@ -54,9 +54,9 @@ require ( github.com/go-playground/validator/v10 v10.15.4 // indirect github.com/go-resty/resty/v2 v2.16.5 // indirect github.com/goccy/go-json v0.10.2 // indirect - github.com/golang-jwt/jwt/v5 v5.2.2 // indirect + github.com/golang-jwt/jwt/v5 v5.2.3 // indirect github.com/golang/snappy v0.0.4 // indirect - github.com/gomarkdown/markdown v0.0.0-20230922112808-5421fefb8386 // indirect + github.com/gomarkdown/markdown v0.0.0-20250311123330-531bef5e742b // indirect github.com/google/uuid v1.6.0 // indirect github.com/gorilla/css v1.0.0 // indirect github.com/gorilla/mux v1.8.1 // indirect diff --git a/example/influxdb-client-go/go.sum b/example/influxdb-client-go/go.sum index 87e31c85..fff87902 100644 --- a/example/influxdb-client-go/go.sum +++ b/example/influxdb-client-go/go.sum @@ -180,8 +180,8 @@ github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/golang-jwt/jwt/v5 v5.2.2 h1:Rl4B7itRWVtYIHFrSNd7vhTiz9UpLdi6gZhZ3wEeDy8= -github.com/golang-jwt/jwt/v5 v5.2.2/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/golang-jwt/jwt/v5 v5.2.3 h1:kkGXqQOBSDDWRhWNXTFpqGSCMyh/PLnqUvMGJPDJDs0= +github.com/golang-jwt/jwt/v5 v5.2.3/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -213,8 +213,8 @@ github.com/golang/protobuf v1.5.1/go.mod h1:DopwsBzvsk0Fs44TXzsVbJyPhcCPeIwnvohx github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/gomarkdown/markdown v0.0.0-20230922112808-5421fefb8386 h1:EcQR3gusLHN46TAD+G+EbaaqJArt5vHhNpXAa12PQf4= -github.com/gomarkdown/markdown v0.0.0-20230922112808-5421fefb8386/go.mod h1:JDGcbDT52eL4fju3sZ4TeHGsQwhG9nbDV21aMyhwPoA= +github.com/gomarkdown/markdown v0.0.0-20250311123330-531bef5e742b h1:EY/KpStFl60qA17CptGXhwfZ+k1sFNJIUNR8DdbcuUk= +github.com/gomarkdown/markdown v0.0.0-20250311123330-531bef5e742b/go.mod h1:JDGcbDT52eL4fju3sZ4TeHGsQwhG9nbDV21aMyhwPoA= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= diff --git a/go.mod b/go.mod index 58260aec..ae9a836c 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/go-openapi/runtime v0.28.0 github.com/go-openapi/strfmt v0.23.0 github.com/go-resty/resty/v2 v2.16.5 - github.com/golang-jwt/jwt/v5 v5.2.2 + github.com/golang-jwt/jwt/v5 v5.2.3 github.com/google/uuid v1.6.0 github.com/kataras/go-events v0.0.3 github.com/michaelquigley/pfxlog v0.6.10 diff --git a/go.sum b/go.sum index b5e69cb2..f23a33aa 100644 --- a/go.sum +++ b/go.sum @@ -122,8 +122,8 @@ github.com/go-resty/resty/v2 v2.16.5 h1:hBKqmWrr7uRc3euHVqmh1HTHcKn99Smr7o5spptd github.com/go-resty/resty/v2 v2.16.5/go.mod h1:hkJtXbA2iKHzJheXYvQ8snQES5ZLGKMwQ07xAwp/fiA= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/golang-jwt/jwt/v5 v5.2.2 h1:Rl4B7itRWVtYIHFrSNd7vhTiz9UpLdi6gZhZ3wEeDy8= -github.com/golang-jwt/jwt/v5 v5.2.2/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/golang-jwt/jwt/v5 v5.2.3 h1:kkGXqQOBSDDWRhWNXTFpqGSCMyh/PLnqUvMGJPDJDs0= +github.com/golang-jwt/jwt/v5 v5.2.3/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= diff --git a/xgress/circuit_inspections.go b/xgress/circuit_inspections.go index c01a9e45..628a83f5 100644 --- a/xgress/circuit_inspections.go +++ b/xgress/circuit_inspections.go @@ -62,10 +62,12 @@ type InspectDetail struct { Goroutines []string `json:"goroutines"` Sequence uint64 `json:"sequence"` Flags string `json:"flags"` + LastSizeSent uint32 `json:"lastSizeSent"` } type SendBufferDetail struct { WindowSize uint32 `json:"windowSize"` + QueuedPayloadCount int `json:"queuedPayloadCount"` LinkSendBufferSize uint32 `json:"linkSendBufferSize"` LinkRecvBufferSize uint32 `json:"linkRecvBufferSize"` Accumulator uint32 `json:"accumulator"` @@ -85,7 +87,6 @@ type SendBufferDetail struct { type RecvBufferDetail struct { Size uint32 `json:"size"` PayloadCount uint32 `json:"payloadCount"` - LastSizeSent uint32 `json:"lastSizeSent"` Sequence int32 `json:"sequence"` MaxSequence int32 `json:"maxSequence"` NextPayload string `json:"nextPayload"` diff --git a/xgress/link_receive_buffer.go b/xgress/link_receive_buffer.go index 5a010ae6..f0ca1957 100644 --- a/xgress/link_receive_buffer.go +++ b/xgress/link_receive_buffer.go @@ -21,22 +21,24 @@ import ( "github.com/emirpasic/gods/trees/btree" "github.com/emirpasic/gods/utils" "github.com/michaelquigley/pfxlog" + "sync" "sync/atomic" - "time" ) type LinkReceiveBuffer struct { - tree *btree.Tree - sequence int32 - maxSequence int32 - size uint32 - lastBufferSizeSent uint32 + sync.Mutex + tree *btree.Tree + sequence int32 + maxSequence int32 + size uint32 + txQueue chan *Payload } -func NewLinkReceiveBuffer() *LinkReceiveBuffer { +func NewLinkReceiveBuffer(txQueueSize int32) *LinkReceiveBuffer { return &LinkReceiveBuffer{ tree: btree.NewWith(10240, utils.Int32Comparator), sequence: -1, + txQueue: make(chan *Payload, txQueueSize), } } @@ -45,6 +47,9 @@ func (buffer *LinkReceiveBuffer) Size() uint32 { } func (buffer *LinkReceiveBuffer) ReceiveUnordered(x *Xgress, payload *Payload, maxSize uint32) bool { + buffer.Lock() + defer buffer.Unlock() + if payload.GetSequence() <= buffer.sequence { x.dataPlane.GetMetrics().MarkDuplicatePayload() return true @@ -67,47 +72,56 @@ func (buffer *LinkReceiveBuffer) ReceiveUnordered(x *Xgress, payload *Payload, m } else { x.dataPlane.GetMetrics().MarkDuplicatePayload() } + + buffer.queueNext() + return true } -func (buffer *LinkReceiveBuffer) PeekHead() *Payload { +func (buffer *LinkReceiveBuffer) queueNext() { if val := buffer.tree.LeftValue(); val != nil { payload := val.(*Payload) if payload.Sequence == buffer.sequence+1 { - return payload + select { + case buffer.txQueue <- payload: + buffer.tree.Remove(payload.Sequence) + buffer.sequence = payload.Sequence + default: + } } } - return nil } -func (buffer *LinkReceiveBuffer) Remove(payload *Payload) { - buffer.tree.Remove(payload.Sequence) - buffer.sequence = payload.Sequence -} +func (buffer *LinkReceiveBuffer) NextPayload(closeNotify <-chan struct{}) *Payload { + select { + case payload := <-buffer.txQueue: + return payload + default: + } -func (buffer *LinkReceiveBuffer) getLastBufferSizeSent() uint32 { - return atomic.LoadUint32(&buffer.lastBufferSizeSent) -} + buffer.Lock() + buffer.queueNext() + buffer.Unlock() -func (buffer *LinkReceiveBuffer) Inspect(x *Xgress) *RecvBufferDetail { - timeout := time.After(100 * time.Millisecond) - inspectEvent := &receiveBufferInspectEvent{ - buffer: buffer, - notifyComplete: make(chan *RecvBufferDetail, 1), + select { + case payload := <-buffer.txQueue: + return payload + case <-closeNotify: } - if x.dataPlane.GetPayloadIngester().inspect(inspectEvent, timeout) { - select { - case result := <-inspectEvent.notifyComplete: - return result - case <-timeout: - } + // closed, check if there's anything pending in the queue + select { + case payload := <-buffer.txQueue: + return payload + default: + return nil } - - return buffer.inspectIncomplete() } -func (buffer *LinkReceiveBuffer) inspectComplete() *RecvBufferDetail { +func (buffer *LinkReceiveBuffer) Inspect() *RecvBufferDetail { + buffer.Lock() + defer buffer.Unlock() + nextPayload := "none" if head := buffer.tree.LeftValue(); head != nil { payload := head.(*Payload) @@ -117,31 +131,9 @@ func (buffer *LinkReceiveBuffer) inspectComplete() *RecvBufferDetail { return &RecvBufferDetail{ Size: buffer.Size(), PayloadCount: uint32(buffer.tree.Size()), - LastSizeSent: buffer.getLastBufferSizeSent(), Sequence: buffer.sequence, MaxSequence: buffer.maxSequence, NextPayload: nextPayload, AcquiredSafely: true, } } - -func (buffer *LinkReceiveBuffer) inspectIncomplete() *RecvBufferDetail { - return &RecvBufferDetail{ - Size: buffer.Size(), - LastSizeSent: buffer.getLastBufferSizeSent(), - Sequence: buffer.sequence, - MaxSequence: buffer.maxSequence, - NextPayload: "unsafe to check", - AcquiredSafely: false, - } -} - -type receiveBufferInspectEvent struct { - buffer *LinkReceiveBuffer - notifyComplete chan *RecvBufferDetail -} - -func (self *receiveBufferInspectEvent) handle() { - result := self.buffer.inspectComplete() - self.notifyComplete <- result -} diff --git a/xgress/payload_ingester.go b/xgress/payload_ingester.go index 78361d8d..dc14092d 100644 --- a/xgress/payload_ingester.go +++ b/xgress/payload_ingester.go @@ -1,60 +1,56 @@ package xgress -import "time" - -type payloadEntry struct { - payload *Payload - x *Xgress -} +import ( + "fmt" + "github.com/michaelquigley/pfxlog" + "github.com/openziti/foundation/v2/goroutines" + "github.com/sirupsen/logrus" + "runtime/debug" + "time" +) type PayloadIngester struct { - payloadIngest chan *payloadEntry - payloadSendReq chan *Xgress - receiveBufferInspects chan *receiveBufferInspectEvent - closeNotify <-chan struct{} + pool goroutines.Pool } func NewPayloadIngester(closeNotify <-chan struct{}) *PayloadIngester { - pi := &PayloadIngester{ - payloadIngest: make(chan *payloadEntry, 16), - payloadSendReq: make(chan *Xgress, 16), - receiveBufferInspects: make(chan *receiveBufferInspectEvent, 4), - closeNotify: closeNotify, + return NewPayloadIngesterWithConfig(1, closeNotify) +} + +func NewPayloadIngesterWithConfig(maxWorkers uint32, closeNotify <-chan struct{}) *PayloadIngester { + if maxWorkers < 1 { + maxWorkers = 1 + } + poolConfig := goroutines.PoolConfig{ + QueueSize: uint32(64), + MinWorkers: 1, + MaxWorkers: maxWorkers, + IdleTime: 30 * time.Second, + CloseNotify: closeNotify, + PanicHandler: func(err interface{}) { + pfxlog.Logger().WithField(logrus.ErrorKey, err).WithField("backtrace", string(debug.Stack())).Error("panic during payload ingest") + }, + WorkerFunction: payloadIngesterWorker, } - go pi.run() + pool, err := goroutines.NewPool(poolConfig) + if err != nil { + panic(fmt.Errorf("error creating payload ingester handler pool (%w)", err)) + } + + pi := &PayloadIngester{ + pool: pool, + } return pi } -func (self *PayloadIngester) inspect(evt *receiveBufferInspectEvent, timeout <-chan time.Time) bool { - select { - case self.receiveBufferInspects <- evt: - return true - case <-self.closeNotify: - case <-timeout: - } - return false +func payloadIngesterWorker(_ uint32, f func()) { + f() } func (self *PayloadIngester) ingest(payload *Payload, x *Xgress) { - self.payloadIngest <- &payloadEntry{ - payload: payload, - x: x, - } -} - -func (self *PayloadIngester) run() { - for { - select { - case payloadEntry := <-self.payloadIngest: - payloadEntry.x.acceptPayload(payloadEntry.payload) - case x := <-self.payloadSendReq: - x.queueSends() - case evt := <-self.receiveBufferInspects: - evt.handle() - case <-self.closeNotify: - return - } - } + _ = self.pool.Queue(func() { + x.acceptPayload(payload) + }) } diff --git a/xgress/xgress.go b/xgress/xgress.go index 95d34038..f5379fac 100644 --- a/xgress/xgress.go +++ b/xgress/xgress.go @@ -126,7 +126,6 @@ type Xgress struct { peer Connection originator Originator Options *Options - txQueue chan *Payload closeNotify chan struct{} rxSequence uint64 rxSequenceLock sync.Mutex @@ -136,6 +135,7 @@ type Xgress struct { peekHandlers []PeekHandler flags concurrenz.AtomicBitSet tags map[string]string + lastBufferSizeSent uint32 } func (self *Xgress) GetDestinationType() string { @@ -158,10 +158,9 @@ func NewXgress(circuitId string, ctrlId string, address Address, peer Connection peer: peer, originator: originator, Options: options, - txQueue: make(chan *Payload, options.TxQueueSize), closeNotify: make(chan struct{}), rxSequence: 0, - linkRxBuffer: NewLinkReceiveBuffer(), + linkRxBuffer: NewLinkReceiveBuffer(options.TxQueueSize), timeOfLastRxFromLink: time.Now().UnixMilli(), tags: tags, } @@ -379,45 +378,6 @@ func (self *Xgress) acceptPayload(payload *Payload) { if !self.Options.RandomDrops || rand.Int31n(self.Options.Drop1InN) != 1 { self.PayloadReceived(payload) } - self.queueSends() -} - -func (self *Xgress) queueSends() { - payload := self.linkRxBuffer.PeekHead() - for payload != nil { - select { - case self.txQueue <- payload: - self.linkRxBuffer.Remove(payload) - payload = self.linkRxBuffer.PeekHead() - default: - payload = nil - } - } -} - -func (self *Xgress) nextPayload() *Payload { - select { - case payload := <-self.txQueue: - return payload - default: - } - - // nothing was available in the txQueue, request more, then wait on txQueue - self.dataPlane.GetPayloadIngester().payloadSendReq <- self - - select { - case payload := <-self.txQueue: - return payload - case <-self.closeNotify: - } - - // closed, check if there's anything pending in the queue - select { - case payload := <-self.txQueue: - return payload - default: - return nil - } } func (self *Xgress) tx() { @@ -440,7 +400,7 @@ func (self *Xgress) tx() { payloadLogger := log.WithFields(payload.GetLoggerFields()) payloadLogger.Debugf("payload %v of size %v removed from rx buffer, new size: %v", payload.Sequence, payloadSize, size) - lastBufferSizeSent := self.linkRxBuffer.getLastBufferSizeSent() + lastBufferSizeSent := self.getLastBufferSizeSent() if lastBufferSizeSent > 10000 && (lastBufferSizeSent>>1) > size { self.SendEmptyAck() } @@ -485,7 +445,7 @@ func (self *Xgress) tx() { var payloadWriteOffset int for { - payloadChunk = self.nextPayload() + payloadChunk = self.linkRxBuffer.NextPayload(self.closeNotify) if payloadChunk == nil { log.Debug("nil payload received, exiting") @@ -829,7 +789,7 @@ func (self *Xgress) PayloadReceived(payload *Payload) { ack.Sequence = append(ack.Sequence, payload.Sequence) ack.RTT = payload.RTT - atomic.StoreUint32(&self.linkRxBuffer.lastBufferSizeSent, ack.RecvBufferSize) + atomic.StoreUint32(&self.lastBufferSizeSent, ack.RecvBufferSize) self.dataPlane.ForwardAcknowledgement(ack, self.address) } else { log.Debug("dropped") @@ -840,7 +800,7 @@ func (self *Xgress) SendEmptyAck() { pfxlog.ContextLogger(self.Label()).WithField("circuit", self.circuitId).Debug("sending empty ack") ack := NewAcknowledgement(self.circuitId, self.originator) ack.RecvBufferSize = self.linkRxBuffer.Size() - atomic.StoreUint32(&self.linkRxBuffer.lastBufferSizeSent, ack.RecvBufferSize) + atomic.StoreUint32(&self.lastBufferSizeSent, ack.RecvBufferSize) self.dataPlane.ForwardAcknowledgement(ack, self.address) } @@ -850,6 +810,10 @@ func (self *Xgress) GetSequence() uint64 { return uint64(self.rxSequence) } +func (self *Xgress) getLastBufferSizeSent() uint32 { + return atomic.LoadUint32(&self.lastBufferSizeSent) +} + func (self *Xgress) InspectCircuit(detail *CircuitInspectDetail) { detail.AddXgressDetail(self.GetInspectDetail(detail.includeGoroutines)) } @@ -861,11 +825,12 @@ func (self *Xgress) GetInspectDetail(includeGoroutines bool) *InspectDetail { Originator: self.originator.String(), TimeSinceLastLinkRx: timeSinceLastRxFromLink.String(), SendBufferDetail: self.payloadBuffer.Inspect(), - RecvBufferDetail: self.linkRxBuffer.Inspect(self), + RecvBufferDetail: self.linkRxBuffer.Inspect(), XgressPointer: fmt.Sprintf("%p", self), LinkSendBufferPointer: fmt.Sprintf("%p", self.payloadBuffer), Sequence: self.GetSequence(), Flags: strconv.FormatUint(uint64(self.flags.Load()), 2), + LastSizeSent: self.getLastBufferSizeSent(), } if includeGoroutines { diff --git a/ziti/edge/conn.go b/ziti/edge/conn.go index 2b2ea137..b9864de7 100644 --- a/ziti/edge/conn.go +++ b/ziti/edge/conn.go @@ -103,6 +103,7 @@ type Conn interface { ServiceConn Identifiable GetRouterId() string + GetState() string CompleteAcceptSuccess() error CompleteAcceptFailed(err error) } diff --git a/ziti/edge/network/conn.go b/ziti/edge/network/conn.go index a88e63ff..1817049b 100644 --- a/ziti/edge/network/conn.go +++ b/ziti/edge/network/conn.go @@ -118,6 +118,15 @@ func (conn *edgeConn) CloseWrite() error { } func (conn *edgeConn) Inspect() string { + state := conn.getBaseState() + jsonOutput, err := json.Marshal(state) + if err != nil { + pfxlog.Logger().WithError(err).Error("unable to marshal inspect result") + } + return string(jsonOutput) +} + +func (conn *edgeConn) getBaseState() map[string]any { result := map[string]interface{}{} result["id"] = conn.Id() result["serviceName"] = conn.serviceName @@ -126,8 +135,17 @@ func (conn *edgeConn) Inspect() string { result["encrypted"] = conn.rxKey != nil || conn.receiver != nil result["readFIN"] = conn.readFIN.Load() result["sentFIN"] = conn.sentFIN.Load() + result["marker"] = conn.marker + result["circuitId"] = conn.circuitId + return result +} - jsonOutput, err := json.Marshal(result) +func (conn *edgeConn) GetState() string { + state := conn.getBaseState() + if conn.xgCircuit != nil && conn.xgCircuit.xg != nil { + state["xg"] = conn.xgCircuit.xg.GetInspectDetail(true) + } + jsonOutput, err := json.Marshal(state) if err != nil { pfxlog.Logger().WithError(err).Error("unable to marshal inspect result") } diff --git a/ziti/edge/network/hosting_conn.go b/ziti/edge/network/hosting_conn.go index fc83765e..90ff1302 100644 --- a/ziti/edge/network/hosting_conn.go +++ b/ziti/edge/network/hosting_conn.go @@ -153,7 +153,7 @@ func (conn *edgeHostConn) newChildConnection(message *channel.Message) { WithField("connId", id). WithField("parentConnId", conn.Id()). WithField("token", token). - WithField("circuitId", token) + WithField("circuitId", circuitId) err := conn.msgMux.AddMsgSink(edgeCh) // duplicate errors only happen on the server side, since client controls ids if err != nil { diff --git a/ziti/xg_env.go b/ziti/xg_env.go index dbe0c0d0..41a9c066 100644 --- a/ziti/xg_env.go +++ b/ziti/xg_env.go @@ -14,7 +14,7 @@ type xgEnv struct { func NewXgressEnv(closeNotify <-chan struct{}, registry metrics.Registry) xgress.Env { return &xgEnv{ retransmitter: xgress.NewRetransmitter(dummyRetransmitterFaultReporter{}, registry, closeNotify), - payloadIngester: xgress.NewPayloadIngester(closeNotify), + payloadIngester: xgress.NewPayloadIngesterWithConfig(5, closeNotify), metrics: xgress.NewMetrics(registry), } }