Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
## Issues Fixed and Dependency Updates

* github.com/openziti/sdk-golang: [v1.1.2 -> v1.1.3](https://github.com/openziti/sdk-golang/compare/v1.1.2...v1.1.3)
* [Issue #763](https://github.com/openziti/sdk-golang/issues/763) - Use a go-routine pool for payload ingest
* [Issue #761](https://github.com/openziti/sdk-golang/issues/761) - Use cmap.ConcurrentMap for message multiplexer
* [Issue #754](https://github.com/openziti/sdk-golang/issues/754) - panic: unaligned 64-bit atomic operation when running on 32-bit raspberry pi
* [Issue #757](https://github.com/openziti/sdk-golang/issues/757) - Not authenticated check fails on session create when using OIDC

* github.com/golang-jwt/jwt/v5: v5.2.2 -> v5.2.3
* github.com/openziti/channel/v4: [v4.2.0 -> v4.2.15](https://github.com/openziti/channel/compare/v4.2.0...v4.2.15)
* [Issue #194](https://github.com/openziti/channel/issues/194) - Add GetUnderlays and GetUnderlayCountsByType to Channel

Expand All @@ -27,7 +30,6 @@
* golang.org/x/term: v0.32.0 -> v0.33.0
* golang.org/x/text: v0.25.0 -> v0.27.0


# Release notes 1.1.2

## Issues Fixed and Dependency Updates
Expand Down
6 changes: 3 additions & 3 deletions edge-apis/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"math/rand/v2"
"net"
"net/url"
"slices"
"sync/atomic"
"time"
)
Expand All @@ -35,7 +36,7 @@ type ApiClientTransport struct {

// ClientTransportPool abstracts the concept of multiple `runtime.ClientTransport` (openapi interface) representing one
// target OpenZiti network. In situations where controllers are running in HA mode (multiple controllers) this
// interface can attempt to try different controller during outages or partitioning.
// interface can attempt to try a different controller during outages or partitioning.
type ClientTransportPool interface {
runtime.ClientTransport

Expand Down Expand Up @@ -265,6 +266,5 @@ func selectAndRemoveRandom[T any](slice []T, zero T) (selected T, modifiedSlice
rng := rand.New(rand.NewPCG(seed, seed))
index := rng.IntN(len(slice))
selected = slice[index]
modifiedSlice = append(slice[:index], slice[index+1:]...)
return selected, modifiedSlice
return selected, slices.Delete(slice, index, index+1)
}
4 changes: 2 additions & 2 deletions example/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ require (
github.com/go-openapi/swag v0.23.0 // indirect
github.com/go-openapi/validate v0.24.0 // indirect
github.com/go-resty/resty/v2 v2.16.5 // indirect
github.com/golang-jwt/jwt/v5 v5.2.2 // indirect
github.com/golang-jwt/jwt/v5 v5.2.3 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/gomarkdown/markdown v0.0.0-20230922112808-5421fefb8386 // indirect
github.com/gomarkdown/markdown v0.0.0-20250311123330-531bef5e742b // indirect
github.com/gorilla/schema v1.4.1 // indirect
github.com/gorilla/securecookie v1.1.2 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
Expand Down
8 changes: 4 additions & 4 deletions example/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ github.com/go-resty/resty/v2 v2.16.5 h1:hBKqmWrr7uRc3euHVqmh1HTHcKn99Smr7o5spptd
github.com/go-resty/resty/v2 v2.16.5/go.mod h1:hkJtXbA2iKHzJheXYvQ8snQES5ZLGKMwQ07xAwp/fiA=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-jwt/jwt/v5 v5.2.2 h1:Rl4B7itRWVtYIHFrSNd7vhTiz9UpLdi6gZhZ3wEeDy8=
github.com/golang-jwt/jwt/v5 v5.2.2/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
github.com/golang-jwt/jwt/v5 v5.2.3 h1:kkGXqQOBSDDWRhWNXTFpqGSCMyh/PLnqUvMGJPDJDs0=
github.com/golang-jwt/jwt/v5 v5.2.3/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
Expand Down Expand Up @@ -191,8 +191,8 @@ github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/gomarkdown/markdown v0.0.0-20191123064959-2c17d62f5098/go.mod h1:aii0r/K0ZnHv7G0KF7xy1v0A7s2Ljrb5byB7MO5p6TU=
github.com/gomarkdown/markdown v0.0.0-20230922112808-5421fefb8386 h1:EcQR3gusLHN46TAD+G+EbaaqJArt5vHhNpXAa12PQf4=
github.com/gomarkdown/markdown v0.0.0-20230922112808-5421fefb8386/go.mod h1:JDGcbDT52eL4fju3sZ4TeHGsQwhG9nbDV21aMyhwPoA=
github.com/gomarkdown/markdown v0.0.0-20250311123330-531bef5e742b h1:EY/KpStFl60qA17CptGXhwfZ+k1sFNJIUNR8DdbcuUk=
github.com/gomarkdown/markdown v0.0.0-20250311123330-531bef5e742b/go.mod h1:JDGcbDT52eL4fju3sZ4TeHGsQwhG9nbDV21aMyhwPoA=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
Expand Down
4 changes: 2 additions & 2 deletions example/influxdb-client-go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ require (
github.com/go-playground/validator/v10 v10.15.4 // indirect
github.com/go-resty/resty/v2 v2.16.5 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/golang-jwt/jwt/v5 v5.2.2 // indirect
github.com/golang-jwt/jwt/v5 v5.2.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/gomarkdown/markdown v0.0.0-20230922112808-5421fefb8386 // indirect
github.com/gomarkdown/markdown v0.0.0-20250311123330-531bef5e742b // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/css v1.0.0 // indirect
github.com/gorilla/mux v1.8.1 // indirect
Expand Down
8 changes: 4 additions & 4 deletions example/influxdb-client-go/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,8 @@ github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-jwt/jwt/v5 v5.2.2 h1:Rl4B7itRWVtYIHFrSNd7vhTiz9UpLdi6gZhZ3wEeDy8=
github.com/golang-jwt/jwt/v5 v5.2.2/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
github.com/golang-jwt/jwt/v5 v5.2.3 h1:kkGXqQOBSDDWRhWNXTFpqGSCMyh/PLnqUvMGJPDJDs0=
github.com/golang-jwt/jwt/v5 v5.2.3/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
Expand Down Expand Up @@ -213,8 +213,8 @@ github.com/golang/protobuf v1.5.1/go.mod h1:DopwsBzvsk0Fs44TXzsVbJyPhcCPeIwnvohx
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gomarkdown/markdown v0.0.0-20230922112808-5421fefb8386 h1:EcQR3gusLHN46TAD+G+EbaaqJArt5vHhNpXAa12PQf4=
github.com/gomarkdown/markdown v0.0.0-20230922112808-5421fefb8386/go.mod h1:JDGcbDT52eL4fju3sZ4TeHGsQwhG9nbDV21aMyhwPoA=
github.com/gomarkdown/markdown v0.0.0-20250311123330-531bef5e742b h1:EY/KpStFl60qA17CptGXhwfZ+k1sFNJIUNR8DdbcuUk=
github.com/gomarkdown/markdown v0.0.0-20250311123330-531bef5e742b/go.mod h1:JDGcbDT52eL4fju3sZ4TeHGsQwhG9nbDV21aMyhwPoA=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/go-openapi/runtime v0.28.0
github.com/go-openapi/strfmt v0.23.0
github.com/go-resty/resty/v2 v2.16.5
github.com/golang-jwt/jwt/v5 v5.2.2
github.com/golang-jwt/jwt/v5 v5.2.3
github.com/google/uuid v1.6.0
github.com/kataras/go-events v0.0.3
github.com/michaelquigley/pfxlog v0.6.10
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ github.com/go-resty/resty/v2 v2.16.5 h1:hBKqmWrr7uRc3euHVqmh1HTHcKn99Smr7o5spptd
github.com/go-resty/resty/v2 v2.16.5/go.mod h1:hkJtXbA2iKHzJheXYvQ8snQES5ZLGKMwQ07xAwp/fiA=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-jwt/jwt/v5 v5.2.2 h1:Rl4B7itRWVtYIHFrSNd7vhTiz9UpLdi6gZhZ3wEeDy8=
github.com/golang-jwt/jwt/v5 v5.2.2/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
github.com/golang-jwt/jwt/v5 v5.2.3 h1:kkGXqQOBSDDWRhWNXTFpqGSCMyh/PLnqUvMGJPDJDs0=
github.com/golang-jwt/jwt/v5 v5.2.3/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
Expand Down
3 changes: 2 additions & 1 deletion xgress/circuit_inspections.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,12 @@ type InspectDetail struct {
Goroutines []string `json:"goroutines"`
Sequence uint64 `json:"sequence"`
Flags string `json:"flags"`
LastSizeSent uint32 `json:"lastSizeSent"`
}

type SendBufferDetail struct {
WindowSize uint32 `json:"windowSize"`
QueuedPayloadCount int `json:"queuedPayloadCount"`
LinkSendBufferSize uint32 `json:"linkSendBufferSize"`
LinkRecvBufferSize uint32 `json:"linkRecvBufferSize"`
Accumulator uint32 `json:"accumulator"`
Expand All @@ -85,7 +87,6 @@ type SendBufferDetail struct {
type RecvBufferDetail struct {
Size uint32 `json:"size"`
PayloadCount uint32 `json:"payloadCount"`
LastSizeSent uint32 `json:"lastSizeSent"`
Sequence int32 `json:"sequence"`
MaxSequence int32 `json:"maxSequence"`
NextPayload string `json:"nextPayload"`
Expand Down
98 changes: 45 additions & 53 deletions xgress/link_receive_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,24 @@ import (
"github.com/emirpasic/gods/trees/btree"
"github.com/emirpasic/gods/utils"
"github.com/michaelquigley/pfxlog"
"sync"
"sync/atomic"
"time"
)

type LinkReceiveBuffer struct {
tree *btree.Tree
sequence int32
maxSequence int32
size uint32
lastBufferSizeSent uint32
sync.Mutex
tree *btree.Tree
sequence int32
maxSequence int32
size uint32
txQueue chan *Payload
}

func NewLinkReceiveBuffer() *LinkReceiveBuffer {
func NewLinkReceiveBuffer(txQueueSize int32) *LinkReceiveBuffer {
return &LinkReceiveBuffer{
tree: btree.NewWith(10240, utils.Int32Comparator),
sequence: -1,
txQueue: make(chan *Payload, txQueueSize),
}
}

Expand All @@ -45,6 +47,9 @@ func (buffer *LinkReceiveBuffer) Size() uint32 {
}

func (buffer *LinkReceiveBuffer) ReceiveUnordered(x *Xgress, payload *Payload, maxSize uint32) bool {
buffer.Lock()
defer buffer.Unlock()

if payload.GetSequence() <= buffer.sequence {
x.dataPlane.GetMetrics().MarkDuplicatePayload()
return true
Expand All @@ -67,47 +72,56 @@ func (buffer *LinkReceiveBuffer) ReceiveUnordered(x *Xgress, payload *Payload, m
} else {
x.dataPlane.GetMetrics().MarkDuplicatePayload()
}

buffer.queueNext()

return true
}

func (buffer *LinkReceiveBuffer) PeekHead() *Payload {
func (buffer *LinkReceiveBuffer) queueNext() {
if val := buffer.tree.LeftValue(); val != nil {
payload := val.(*Payload)
if payload.Sequence == buffer.sequence+1 {
return payload
select {
case buffer.txQueue <- payload:
buffer.tree.Remove(payload.Sequence)
buffer.sequence = payload.Sequence
default:
}
}
}
return nil
}

func (buffer *LinkReceiveBuffer) Remove(payload *Payload) {
buffer.tree.Remove(payload.Sequence)
buffer.sequence = payload.Sequence
}
func (buffer *LinkReceiveBuffer) NextPayload(closeNotify <-chan struct{}) *Payload {
select {
case payload := <-buffer.txQueue:
return payload
default:
}

func (buffer *LinkReceiveBuffer) getLastBufferSizeSent() uint32 {
return atomic.LoadUint32(&buffer.lastBufferSizeSent)
}
buffer.Lock()
buffer.queueNext()
buffer.Unlock()

func (buffer *LinkReceiveBuffer) Inspect(x *Xgress) *RecvBufferDetail {
timeout := time.After(100 * time.Millisecond)
inspectEvent := &receiveBufferInspectEvent{
buffer: buffer,
notifyComplete: make(chan *RecvBufferDetail, 1),
select {
case payload := <-buffer.txQueue:
return payload
case <-closeNotify:
}

if x.dataPlane.GetPayloadIngester().inspect(inspectEvent, timeout) {
select {
case result := <-inspectEvent.notifyComplete:
return result
case <-timeout:
}
// closed, check if there's anything pending in the queue
select {
case payload := <-buffer.txQueue:
return payload
default:
return nil
}

return buffer.inspectIncomplete()
}

func (buffer *LinkReceiveBuffer) inspectComplete() *RecvBufferDetail {
func (buffer *LinkReceiveBuffer) Inspect() *RecvBufferDetail {
buffer.Lock()
defer buffer.Unlock()

nextPayload := "none"
if head := buffer.tree.LeftValue(); head != nil {
payload := head.(*Payload)
Expand All @@ -117,31 +131,9 @@ func (buffer *LinkReceiveBuffer) inspectComplete() *RecvBufferDetail {
return &RecvBufferDetail{
Size: buffer.Size(),
PayloadCount: uint32(buffer.tree.Size()),
LastSizeSent: buffer.getLastBufferSizeSent(),
Sequence: buffer.sequence,
MaxSequence: buffer.maxSequence,
NextPayload: nextPayload,
AcquiredSafely: true,
}
}

func (buffer *LinkReceiveBuffer) inspectIncomplete() *RecvBufferDetail {
return &RecvBufferDetail{
Size: buffer.Size(),
LastSizeSent: buffer.getLastBufferSizeSent(),
Sequence: buffer.sequence,
MaxSequence: buffer.maxSequence,
NextPayload: "unsafe to check",
AcquiredSafely: false,
}
}

type receiveBufferInspectEvent struct {
buffer *LinkReceiveBuffer
notifyComplete chan *RecvBufferDetail
}

func (self *receiveBufferInspectEvent) handle() {
result := self.buffer.inspectComplete()
self.notifyComplete <- result
}
Loading
Loading