Skip to content

Commit

Permalink
remove farbuffer for bess and reset notify flag fix
Browse files Browse the repository at this point in the history
  • Loading branch information
badhri85 committed Sep 15, 2021
1 parent 164bdfd commit 9e20334
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 35 deletions.
27 changes: 26 additions & 1 deletion pfcpiface/bess.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ const (
SockAddr = "/tmp/notifycp"
// PfcpAddr : Unix Socket path to send end marker packet.
PfcpAddr = "/tmp/pfcpport"
// far-action specific values.
farForwardD = 0x0
farForwardU = 0x1
farDrop = 0x2
farBuffer = 0x3
farNotify = 0x4
)

const (
Expand Down Expand Up @@ -868,14 +874,33 @@ func (b *bess) processFAR(ctx context.Context, any *anypb.Any, method upfMsgType
}
}

func (b *bess) setActionValue(f far) uint8 {
if (f.applyAction & ActionForward) != 0 {
if f.dstIntf == ie.DstInterfaceAccess {
return farForwardD
} else if (f.dstIntf == ie.DstInterfaceCore) || (f.dstIntf == ie.DstInterfaceSGiLANN6LAN) {
return farForwardU
}
} else if (f.applyAction & ActionDrop) != 0 {
return farDrop
} else if (f.applyAction & ActionBuffer) != 0 {
return farNotify
} else if (f.applyAction & ActionNotify) != 0 {
return farNotify
}

// default action
return farDrop
}

func (b *bess) addFAR(ctx context.Context, done chan<- bool, far far) {
go func() {
var (
any *anypb.Any
err error
)

action := far.setActionValue()
action := b.setActionValue(far)
f := &pb.ExactMatchCommandAddArg{
Gate: uint64(far.tunnelType),
Fields: []*pb.FieldData{
Expand Down
36 changes: 28 additions & 8 deletions pfcpiface/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ import (
"github.com/wmnsk/go-pfcp/message"
)

const (
// Default timeout for DDN.
DefaultDDNTimeout = 20
)

func handleHeartbeatRequest(msg message.Message, addr net.Addr, rTime time.Time) []byte {
hbreq, ok := msg.(*message.HeartbeatRequest)
if !ok {
Expand Down Expand Up @@ -570,15 +575,15 @@ func (pc *PFCPConn) handleSessionModificationRequest(upf *upf, msg message.Messa
addQERs = append(addQERs, q)
}

if session.getNotifyFlag() {
session.updateNotifyFlag()
}

cause := upf.sendMsgToUPF(upfMsgTypeMod, addPDRs, addFARs, addQERs)
if cause == ie.CauseRequestRejected {
return sendError(errors.New("write to FastPath failed"))
}

if session.getNotifyFlag() {
session.updateNotifyFlag()
}

if upf.enableEndMarker {
err := upf.sendEndMarkers(&endMarkerList)
if err != nil {
Expand Down Expand Up @@ -831,13 +836,11 @@ func handleDigestReport(fseid uint64,
return
}

/* Number of Outstanding Notifies per session is 1 */
/* Check if notify is already sent in current time interval */
if session.getNotifyFlag() {
return
}

session.setNotifyFlag(true)

seq := pfcpConn.getSeqNum()
serep := message.NewSessionReportRequest(0, /* MO?? <-- what's this */
0, /* FO <-- what's this? */
Expand All @@ -850,20 +853,37 @@ func handleDigestReport(fseid uint64,

var pdrID uint32

var farID uint32

for _, pdr := range session.pdrs {
if pdr.srcIface == core {
pdrID = pdr.pdrID

farID = pdr.farID

break
}
}

log.Println("Pdr iD : ", pdrID)
for _, far := range session.fars {
if far.farID == farID {
if far.applyAction&ActionNotify == 0 {
log.Println("packet recieved for forwarding far. discard")
return
}
}
}

if pdrID == 0 {
log.Println("No Pdr found for downlink")

return
}

go session.runTimerForDDNNotify(DefaultDDNTimeout)

session.setNotifyFlag(true)

serep.DownlinkDataReport = ie.NewDownlinkDataReport(
ie.NewPDRID(uint16(pdrID)))

Expand Down
19 changes: 0 additions & 19 deletions pfcpiface/parse-far.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,25 +66,6 @@ func (f far) String() string {
return b.String()
}

func (f *far) setActionValue() uint8 {
if (f.applyAction & ActionForward) != 0 {
if f.dstIntf == ie.DstInterfaceAccess {
return farForwardD
} else if (f.dstIntf == ie.DstInterfaceCore) || (f.dstIntf == ie.DstInterfaceSGiLANN6LAN) {
return farForwardU
}
} else if (f.applyAction & ActionDrop) != 0 {
return farDrop
} else if (f.applyAction & ActionBuffer) != 0 {
return farBuffer
} else if (f.applyAction & ActionNotify) != 0 {
return farNotify
}

// default action
return farDrop
}

func (f *far) parseFAR(farIE *ie.IE, fseid uint64, upf *upf, op operation) error {
f.fseID = (fseid)

Expand Down
15 changes: 15 additions & 0 deletions pfcpiface/session-far.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package main
import (
"errors"
"net"
"time"

"github.com/google/gopacket"
"github.com/google/gopacket/layers"
Expand Down Expand Up @@ -101,6 +102,20 @@ func (s *PFCPSession) getNotifyFlag() bool {
return s.notificationFlag.flag
}

func (s *PFCPSession) runTimerForDDNNotify(timeout time.Duration) {
endTime := time.After(timeout)

for {
select {
case <-endTime:
log.Println("DDN notify send timeout")
s.setNotifyFlag(false)

return
}
}
}

// UpdateFAR updates existing far in the session.
func (s *PFCPSession) updateNotifyFlag() {
unset := true
Expand Down
7 changes: 0 additions & 7 deletions pfcpiface/upf.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,6 @@ const (
n3 = 0x0
n6 = 0x1
n9 = 0x2

// far-action specific values.
farForwardD = 0x0
farForwardU = 0x1
farDrop = 0x2
farBuffer = 0x3
farNotify = 0x4
)

func (u *upf) sendMsgToUPF(method upfMsgType, pdrs []pdr, fars []far, qers []qer) uint8 {
Expand Down

0 comments on commit 9e20334

Please sign in to comment.