Skip to content
This repository has been archived by the owner on Mar 29, 2024. It is now read-only.

Commit

Permalink
Add basic metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
dhaavi committed Dec 22, 2021
1 parent 10a574d commit b9a5607
Show file tree
Hide file tree
Showing 7 changed files with 362 additions and 7 deletions.
81 changes: 81 additions & 0 deletions crew/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package crew

import (
"sync/atomic"

"github.com/safing/portbase/api"
"github.com/safing/portbase/metrics"
"github.com/tevino/abool"
)

var (
connectOpDurationHistogram *metrics.Histogram
connectOpDownloadDataHistogram *metrics.Histogram
connectOpUploadDataHistogram *metrics.Histogram

metricsRegistered = abool.New()
)

func registerMetrics() (err error) {
// Only register metrics once.
if !metricsRegistered.SetToIf(false, true) {
return nil
}

// Connect Op Stats.

_, err = metrics.NewGauge(
"spn/op/connect/active/total",
nil,
getActiveConnectOpsStat,
&metrics.Options{
Name: "SPN Active Connect Operations",
Permission: api.PermitUser,
},
)
if err != nil {
return err
}

connectOpDurationHistogram, err = metrics.NewHistogram(
"spn/op/connect/duration/seconds",
nil,
&metrics.Options{
Name: "SPN Connect Operation Duration",
Permission: api.PermitUser,
},
)
if err != nil {
return err
}

connectOpDownloadDataHistogram, err = metrics.NewHistogram(
"spn/op/connect/download/bytes",
nil,
&metrics.Options{
Name: "SPN Connect Operation Downloaded Data",
Permission: api.PermitUser,
},
)
if err != nil {
return err
}

connectOpUploadDataHistogram, err = metrics.NewHistogram(
"spn/op/connect/upload/bytes",
nil,
&metrics.Options{
Name: "SPN Connect Operation Uploaded Data",
Permission: api.PermitUser,
},
)
if err != nil {
return err
}

return err
}

func getActiveConnectOpsStat() float64 {
return float64(atomic.LoadInt64(activeConnectOps))
}
6 changes: 5 additions & 1 deletion crew/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,9 @@ import (
var module *modules.Module

func init() {
module = modules.Register("crew", nil, nil, nil, "navigator", "intel", "cabin")
module = modules.Register("crew", nil, start, nil, "navigator", "intel", "cabin")
}

func start() error {
return registerMetrics()
}
34 changes: 34 additions & 0 deletions crew/op_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"net"
"strconv"
"sync/atomic"
"time"

"github.com/safing/portbase/container"
Expand All @@ -20,6 +21,10 @@ import (

const ConnectOpType string = "connect"

var (
activeConnectOps = new(int64)
)

type ConnectOp struct {
terminal.OpBase
*terminal.DuplexFlowQueue
Expand All @@ -29,6 +34,9 @@ type ConnectOp struct {
// cancelCtx cancels ctx.
cancelCtx context.CancelFunc

downloaded *uint64
uploaded *uint64

t terminal.OpTerminal
conn net.Conn
entry bool
Expand Down Expand Up @@ -160,6 +168,10 @@ func runConnectOp(t terminal.OpTerminal, opID uint32, data *container.Container)
op.ctx, op.cancelCtx = context.WithCancel(context.Background())
op.DuplexFlowQueue = terminal.NewDuplexFlowQueue(op, request.QueueSize, op.submitUpstream)

// Setup metrics.
op.downloaded = new(uint64)
op.uploaded = new(uint64)

// Start worker.
module.StartWorker("connect op conn reader", op.connReader)
module.StartWorker("connect op conn writer", op.connWriter)
Expand All @@ -177,6 +189,18 @@ func (op *ConnectOp) submitUpstream(c *container.Container) {
}

func (op *ConnectOp) connReader(_ context.Context) error {
// Metrics setup and submitting.
if !op.entry {
atomic.AddInt64(activeConnectOps, 1)
started := time.Now()
defer func() {
atomic.AddInt64(activeConnectOps, -1)
connectOpDurationHistogram.UpdateDuration(started)
connectOpDownloadDataHistogram.Update(float64(atomic.LoadUint64(op.downloaded)))
connectOpUploadDataHistogram.Update(float64(atomic.LoadUint64(op.uploaded)))
}()
}

for {
buf := make([]byte, 1500)
n, err := op.conn.Read(buf)
Expand All @@ -193,6 +217,11 @@ func (op *ConnectOp) connReader(_ context.Context) error {
continue
}

// Count downloaded data for metrics on the Hub.
if !op.entry {
atomic.AddUint64(op.downloaded, uint64(n))
}

tErr := op.DuplexFlowQueue.Send(container.New(buf[:n]))
if tErr != nil {
op.t.OpEnd(op, tErr.Wrap("failed to send data (dfq) read from %s", op.connectedType()))
Expand Down Expand Up @@ -228,6 +257,11 @@ writing:
continue writing
}

// Count uploaded data for metrics on the Hub.
if !op.entry {
atomic.AddUint64(op.uploaded, uint64(len(data)))
}

// Send all given data.
for {
n, err := op.conn.Write(data)
Expand Down
13 changes: 11 additions & 2 deletions docks/crane.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,9 @@ func (crane *Crane) unloadUntilFull(buf []byte) error {

// Return if buffer has been fully filled.
if bytesRead == len(buf) {
// Submit metrics.
totalIncomingTraffic.Add(bytesRead)

return nil
}
}
Expand Down Expand Up @@ -654,9 +657,15 @@ func (crane *Crane) load(c *container.Container) error {
return fmt.Errorf("failed to encrypt: %w", err)
}

// Load onto ship.
// Finalize data.
c.PrependLength()
err = crane.ship.Load(c.CompileData())
readyToSend := c.CompileData()

// Submit metrics.
totalOutgoingTraffic.Add(len(readyToSend))

// Load onto ship.
err = crane.ship.Load(readyToSend)
if err != nil {
return fmt.Errorf("failed to load ship: %w", err)
}
Expand Down
Loading

0 comments on commit b9a5607

Please sign in to comment.