Skip to content

Commit

Permalink
Move collection code into bess
Browse files Browse the repository at this point in the history
  • Loading branch information
pudelkoM committed Jan 13, 2022
1 parent 94b008e commit ee04d04
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 102 deletions.
93 changes: 76 additions & 17 deletions pfcpiface/bess.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/binary"
"flag"
"net"
"strconv"
"time"

"google.golang.org/grpc/credentials/insecure"
Expand Down Expand Up @@ -415,11 +416,7 @@ func (b *bess) readFlowMeasurement(
return
}

func (b *bess) sessionStats(uc *upfCollector, ch chan<- prometheus.Metric) error {
return nil
}

func (b *bess) sessionStats2() (s []sessionInfo, err error) {
func (b *bess) sessionStats(pc *PfcpNodeCollector, ch chan<- prometheus.Metric) (err error) {
// Clearing table data with large tables is slow, let's wait for a little longer since this is
// non-blocking for the dataplane anyway.
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down Expand Up @@ -452,6 +449,13 @@ func (b *bess) sessionStats2() (s []sessionInfo, err error) {
return
}

// TODO: pick first connection for now
var con *PFCPConn
for _, c := range pc.node.pConns {
con = c
break
}

// Prepare session stats.
createStats := func(preResp, postResp *pb.FlowMeasureReadResponse) {
for i := 0; i < len(postResp.Statistics); i++ {
Expand All @@ -471,26 +475,81 @@ func (b *bess) sessionStats2() (s []sessionInfo, err error) {
continue
}

si := sessionInfo{
Fseid: post.Fseid,
Pdr: post.Pdr,
TxPackets: post.TotalPackets,
RxPackets: pre.TotalPackets,
TxBytes: post.TotalBytes,
RxBytes: pre.TotalBytes,
Latency: map[float64]float64{
fseidString := strconv.FormatUint(pre.Fseid, 10)
pdrString := strconv.FormatUint(pre.Pdr, 10)
ueIpString := "unknown"

if con != nil {
session, ok := con.sessions[pre.Fseid]
if !ok {
log.Errorln("Invalid or unknown FSEID", pre.Fseid)
continue
}
for _, p := range session.pdrs {
if uint64(p.pdrID) != pre.Pdr {
continue
}
// Only downlink PDRs contain the UE address.
if p.srcIP > 0 {
ueIpString = int2ip(p.srcIP).String()
log.Warnln(p.fseID, " -> ", ueIpString)
break
}
}
} else {
log.Warnln("No active PFCP connection, IP lookup disabled")
}

ch <- prometheus.MustNewConstMetric(
pc.sessionTxPackets,
prometheus.GaugeValue,
float64(post.TotalPackets), // check if uint possible
fseidString,
pdrString,
ueIpString,
)
ch <- prometheus.MustNewConstMetric(
pc.sessionRxPackets,
prometheus.GaugeValue,
float64(pre.TotalPackets),
fseidString,
pdrString,
ueIpString,
)
ch <- prometheus.MustNewConstMetric(
pc.sessionTxBytes,
prometheus.GaugeValue,
float64(post.TotalBytes),
fseidString,
pdrString,
ueIpString,
)
ch <- prometheus.MustNewConstSummary(
pc.sessionLatency,
post.TotalPackets,
0,
map[float64]float64{
q[0]: float64(post.Latency.PercentileValuesNs[0]),
q[1]: float64(post.Latency.PercentileValuesNs[1]),
q[2]: float64(post.Latency.PercentileValuesNs[2]),
},
Jitter: map[float64]float64{
fseidString,
pdrString,
ueIpString,
)
ch <- prometheus.MustNewConstSummary(
pc.sessionJitter,
post.TotalPackets,
0,
map[float64]float64{
q[0]: float64(post.Jitter.PercentileValuesNs[0]),
q[1]: float64(post.Jitter.PercentileValuesNs[1]),
q[2]: float64(post.Jitter.PercentileValuesNs[2]),
},
}

s = append(s, si)
fseidString,
pdrString,
ueIpString,
)
}
}

Expand Down
3 changes: 1 addition & 2 deletions pfcpiface/fastpath.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,5 @@ type fastPath interface {
isConnected(accessIP *net.IP) bool
summaryLatencyJitter(uc *upfCollector, ch chan<- prometheus.Metric)
portStats(uc *upfCollector, ch chan<- prometheus.Metric)
sessionStats(uc *upfCollector, ch chan<- prometheus.Metric) error
sessionStats2() ([]sessionInfo, error)
sessionStats(pc *PfcpNodeCollector, ch chan<- prometheus.Metric) error
}
79 changes: 1 addition & 78 deletions pfcpiface/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"github.com/prometheus/client_golang/prometheus"
"net"
"strconv"

reuse "github.com/libp2p/go-reuseport"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -185,85 +184,9 @@ func (col PfcpNodeCollector) Describe(ch chan<- *prometheus.Desc) {
}

func (col PfcpNodeCollector) Collect(ch chan<- prometheus.Metric) {
stats, err := col.node.upf.sessionStats2()
err := col.node.upf.sessionStats(&col, ch)
if err != nil {
log.Errorln(err)
return
}
// TODO: pick first connection for now
var con *PFCPConn
for _, c := range col.node.pConns {
con = c
break
}

for _, s := range stats {
fseidString := strconv.FormatUint(s.Fseid, 10)
pdrString := strconv.FormatUint(s.Pdr, 10)
ueIpString := "unknown"

if con != nil {
session, ok := con.sessions[s.Fseid]
if !ok {
log.Errorln("Invalid or unknown FSEID in session info", s)
continue
}
for _, p := range session.pdrs {
if uint64(p.pdrID) != s.Pdr {
continue
}
// Only downlink PDRs contain the UE address.
if p.srcIP > 0 {
ueIpString = int2ip(p.srcIP).String()
log.Warnln(p.fseID, " -> ", ueIpString)
break
}
}
} else {
log.Warnln("No active PFCP connection, IP lookup disabled")
}

ch <- prometheus.MustNewConstMetric(
col.sessionTxPackets,
prometheus.GaugeValue,
float64(s.TxPackets), // check if uint possible
fseidString,
pdrString,
ueIpString,
)
ch <- prometheus.MustNewConstMetric(
col.sessionRxPackets,
prometheus.GaugeValue,
float64(s.RxPackets),
fseidString,
pdrString,
ueIpString,
)
ch <- prometheus.MustNewConstMetric(
col.sessionTxBytes,
prometheus.GaugeValue,
float64(s.TxBytes),
fseidString,
pdrString,
ueIpString,
)
ch <- prometheus.MustNewConstSummary(
col.sessionLatency,
s.TxPackets,
0,
s.Latency,
fseidString,
pdrString,
ueIpString,
)
ch <- prometheus.MustNewConstSummary(
col.sessionJitter,
s.TxPackets,
0,
s.Jitter,
fseidString,
pdrString,
ueIpString,
)
}
}
6 changes: 1 addition & 5 deletions pfcpiface/up4.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,10 @@ func (up4 *UP4) addSliceInfo(sliceInfo *SliceInfo) error {
func (up4 *UP4) summaryLatencyJitter(uc *upfCollector, ch chan<- prometheus.Metric) {
}

func (up4 *UP4) sessionStats(uc *upfCollector, ch chan<- prometheus.Metric) error {
func (up4 *UP4) sessionStats(*PfcpNodeCollector, chan<- prometheus.Metric) error {
return nil
}

func (up4 *UP4) sessionStats2() (s []sessionInfo, err error) {
return
}

func (up4 *UP4) portStats(uc *upfCollector, ch chan<- prometheus.Metric) {
}

Expand Down

0 comments on commit ee04d04

Please sign in to comment.