Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 94 additions & 0 deletions xgress/circuit_inspections.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
Copyright NetFoundry Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package xgress

type CircuitInspectDetail struct {
CircuitId string `json:"circuitId"`
Forwards map[string]string `json:"forwards"`
XgressDetails map[string]*InspectDetail `json:"xgressDetails"`
LinkDetails map[string]*LinkInspectDetail `json:"linkDetails"`
includeGoroutines bool
}

func (self *CircuitInspectDetail) SetIncludeGoroutines(includeGoroutines bool) {
self.includeGoroutines = includeGoroutines
}

func (self *CircuitInspectDetail) IncludeGoroutines() bool {
return self.includeGoroutines
}

func (self *CircuitInspectDetail) AddXgressDetail(xgressDetail *InspectDetail) {
self.XgressDetails[xgressDetail.Address] = xgressDetail
}

func (self *CircuitInspectDetail) AddLinkDetail(linkDetail *LinkInspectDetail) {
self.LinkDetails[linkDetail.Id] = linkDetail
}

type InspectDetail struct {
Address string `json:"address"`
Originator string `json:"originator"`
TimeSinceLastLinkRx string `json:"timeSinceLastLinkRx"`
SendBufferDetail *SendBufferDetail `json:"sendBufferDetail"`
RecvBufferDetail *RecvBufferDetail `json:"recvBufferDetail"`
XgressPointer string `json:"xgressPointer"`
LinkSendBufferPointer string `json:"linkSendBufferPointer"`
Goroutines []string `json:"goroutines"`
Sequence uint64 `json:"sequence"`
Flags string `json:"flags"`
}

type SendBufferDetail struct {
WindowSize uint32 `json:"windowSize"`
LinkSendBufferSize uint32 `json:"linkSendBufferSize"`
LinkRecvBufferSize uint32 `json:"linkRecvBufferSize"`
Accumulator uint32 `json:"accumulator"`
SuccessfulAcks uint32 `json:"successfulAcks"`
DuplicateAcks uint32 `json:"duplicateAcks"`
Retransmits uint32 `json:"retransmits"`
Closed bool `json:"closed"`
BlockedByLocalWindow bool `json:"blockedByLocalWindow"`
BlockedByRemoteWindow bool `json:"blockedByRemoteWindow"`
RetxScale float64 `json:"retxScale"`
RetxThreshold uint32 `json:"retxThreshold"`
TimeSinceLastRetx string `json:"timeSinceLastRetx"`
CloseWhenEmpty bool `json:"closeWhenEmpty"`
AcquiredSafely bool `json:"acquiredSafely"`
}

type RecvBufferDetail struct {
Size uint32 `json:"size"`
PayloadCount uint32 `json:"payloadCount"`
LastSizeSent uint32 `json:"lastSizeSent"`
Sequence int32 `json:"sequence"`
MaxSequence int32 `json:"maxSequence"`
NextPayload string `json:"nextPayload"`
AcquiredSafely bool `json:"acquiredSafely"`
}

type LinkInspectDetail struct {
Id string `json:"id"`
Iteration uint32 `json:"iteration"`
Key string `json:"key"`
Split bool `json:"split"`
Protocol string `json:"protocol"`
DialAddress string `json:"dialAddress"`
Dest string `json:"dest"`
DestVersion string `json:"destVersion"`
Dialed bool `json:"dialed"`
}
121 changes: 121 additions & 0 deletions xgress/decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
Copyright NetFoundry Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package xgress

import (
"fmt"
"github.com/michaelquigley/pfxlog"
"github.com/openziti/channel/v4"
)

type Decoder struct{}

const DECODER = "data"

func (d Decoder) Decode(msg *channel.Message) ([]byte, bool) {
switch msg.ContentType {
case int32(ContentTypePayloadType):
if payload, err := UnmarshallPayload(msg); err == nil {
return DecodePayload(payload)
} else {
pfxlog.Logger().WithError(err).Error("unexpected error unmarshalling payload msg")
}

case int32(ContentTypeAcknowledgementType):
if ack, err := UnmarshallAcknowledgement(msg); err == nil {
meta := channel.NewTraceMessageDecode(DECODER, "Acknowledgement")
meta["circuitId"] = ack.CircuitId
meta["sequence"] = fmt.Sprintf("len(%d)", len(ack.Sequence))
switch ack.GetOriginator() {
case Initiator:
meta["originator"] = "i"
case Terminator:
meta["originator"] = "e"
}

data, err := meta.MarshalTraceMessageDecode()
if err != nil {
return nil, true
}

return data, true

} else {
pfxlog.Logger().WithError(err).Error("unexpected error unmarshalling ack msg")
}
case int32(ContentTypeControlType):
if control, err := UnmarshallControl(msg); err == nil {
meta := channel.NewTraceMessageDecode(DECODER, "Control")
meta["circuitId"] = control.CircuitId
meta["type"] = control.Type.String()
if control.Type == ControlTypeTraceRoute || control.Type == ControlTypeTraceRouteResponse {
if ts, found := msg.GetUint64Header(ControlTimestamp); found {
meta["ts"] = ts
}
if hop, found := msg.GetUint32Header(ControlHopCount); found {
meta["hopCount"] = hop
}
if hopType, found := msg.GetStringHeader(ControlHopType); found {
meta["hopType"] = hopType
}
if hopId, found := msg.GetStringHeader(ControlHopId); found {
meta["hopId"] = hopId
}
if userVal, found := msg.GetUint32Header(ControlUserVal); found {
meta["uv"] = userVal
}
if hopErr, found := msg.GetUint32Header(ControlError); found {
meta["err"] = hopErr
}
}
data, err := meta.MarshalTraceMessageDecode()
if err != nil {
return nil, true
}

return data, true

} else {
pfxlog.Logger().WithError(err).Error("unexpected error unmarshalling control msg")
}
}

return nil, false
}

func DecodePayload(payload *Payload) ([]byte, bool) {
meta := channel.NewTraceMessageDecode(DECODER, "Payload")
meta["circuitId"] = payload.CircuitId
meta["sequence"] = payload.Sequence
switch payload.GetOriginator() {
case Initiator:
meta["originator"] = "i"
case Terminator:
meta["originator"] = "e"
}
if payload.Flags != 0 {
meta["flags"] = payload.Flags
}
meta["length"] = len(payload.Data)

data, err := meta.MarshalTraceMessageDecode()
if err != nil {
return nil, true
}

return data, true
}
38 changes: 38 additions & 0 deletions xgress/heartbeat_transformer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
Copyright NetFoundry Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package xgress

import (
"encoding/binary"
"github.com/openziti/channel/v4"
"time"
)

type PayloadTransformer struct {
}

func (self PayloadTransformer) Rx(*channel.Message, channel.Channel) {}

func (self PayloadTransformer) Tx(m *channel.Message, ch channel.Channel) {
if m.ContentType == channel.ContentTypeRaw && len(m.Body) > 1 {
if m.Body[0]&HeartbeatFlagMask != 0 && len(m.Body) > 12 {
now := time.Now().UnixNano()
m.PutUint64Header(channel.HeartbeatHeader, uint64(now))
binary.BigEndian.PutUint64(m.Body[len(m.Body)-8:], uint64(now))
}
}
}
Loading
Loading