From 9e2033469b21dd1a40adca3cfd475ebdf13deb2b Mon Sep 17 00:00:00 2001 From: badhri85 Date: Tue, 7 Sep 2021 21:13:31 -0500 Subject: [PATCH] remove farbuffer for bess and reset notify flag fix --- pfcpiface/bess.go | 27 ++++++++++++++++++++++++++- pfcpiface/messages.go | 36 ++++++++++++++++++++++++++++-------- pfcpiface/parse-far.go | 19 ------------------- pfcpiface/session-far.go | 15 +++++++++++++++ pfcpiface/upf.go | 7 ------- 5 files changed, 69 insertions(+), 35 deletions(-) diff --git a/pfcpiface/bess.go b/pfcpiface/bess.go index 29ac07fd2..bb4270486 100644 --- a/pfcpiface/bess.go +++ b/pfcpiface/bess.go @@ -26,6 +26,12 @@ const ( SockAddr = "/tmp/notifycp" // PfcpAddr : Unix Socket path to send end marker packet. PfcpAddr = "/tmp/pfcpport" + // far-action specific values. + farForwardD = 0x0 + farForwardU = 0x1 + farDrop = 0x2 + farBuffer = 0x3 + farNotify = 0x4 ) const ( @@ -868,6 +874,25 @@ func (b *bess) processFAR(ctx context.Context, any *anypb.Any, method upfMsgType } } +func (b *bess) setActionValue(f far) uint8 { + if (f.applyAction & ActionForward) != 0 { + if f.dstIntf == ie.DstInterfaceAccess { + return farForwardD + } else if (f.dstIntf == ie.DstInterfaceCore) || (f.dstIntf == ie.DstInterfaceSGiLANN6LAN) { + return farForwardU + } + } else if (f.applyAction & ActionDrop) != 0 { + return farDrop + } else if (f.applyAction & ActionBuffer) != 0 { + return farNotify + } else if (f.applyAction & ActionNotify) != 0 { + return farNotify + } + + // default action + return farDrop +} + func (b *bess) addFAR(ctx context.Context, done chan<- bool, far far) { go func() { var ( @@ -875,7 +900,7 @@ func (b *bess) addFAR(ctx context.Context, done chan<- bool, far far) { err error ) - action := far.setActionValue() + action := b.setActionValue(far) f := &pb.ExactMatchCommandAddArg{ Gate: uint64(far.tunnelType), Fields: []*pb.FieldData{ diff --git a/pfcpiface/messages.go b/pfcpiface/messages.go index f2b5ae8d1..784a0d7de 100644 --- a/pfcpiface/messages.go +++ b/pfcpiface/messages.go @@ -15,6 +15,11 @@ import ( "github.com/wmnsk/go-pfcp/message" ) +const ( + // Default timeout for DDN. + DefaultDDNTimeout = 20 +) + func handleHeartbeatRequest(msg message.Message, addr net.Addr, rTime time.Time) []byte { hbreq, ok := msg.(*message.HeartbeatRequest) if !ok { @@ -570,15 +575,15 @@ func (pc *PFCPConn) handleSessionModificationRequest(upf *upf, msg message.Messa addQERs = append(addQERs, q) } - if session.getNotifyFlag() { - session.updateNotifyFlag() - } - cause := upf.sendMsgToUPF(upfMsgTypeMod, addPDRs, addFARs, addQERs) if cause == ie.CauseRequestRejected { return sendError(errors.New("write to FastPath failed")) } + if session.getNotifyFlag() { + session.updateNotifyFlag() + } + if upf.enableEndMarker { err := upf.sendEndMarkers(&endMarkerList) if err != nil { @@ -831,13 +836,11 @@ func handleDigestReport(fseid uint64, return } - /* Number of Outstanding Notifies per session is 1 */ + /* Check if notify is already sent in current time interval */ if session.getNotifyFlag() { return } - session.setNotifyFlag(true) - seq := pfcpConn.getSeqNum() serep := message.NewSessionReportRequest(0, /* MO?? <-- what's this */ 0, /* FO <-- what's this? */ @@ -850,20 +853,37 @@ func handleDigestReport(fseid uint64, var pdrID uint32 + var farID uint32 + for _, pdr := range session.pdrs { if pdr.srcIface == core { pdrID = pdr.pdrID + + farID = pdr.farID + break } } - log.Println("Pdr iD : ", pdrID) + for _, far := range session.fars { + if far.farID == farID { + if far.applyAction&ActionNotify == 0 { + log.Println("packet recieved for forwarding far. discard") + return + } + } + } if pdrID == 0 { log.Println("No Pdr found for downlink") + return } + go session.runTimerForDDNNotify(DefaultDDNTimeout) + + session.setNotifyFlag(true) + serep.DownlinkDataReport = ie.NewDownlinkDataReport( ie.NewPDRID(uint16(pdrID))) diff --git a/pfcpiface/parse-far.go b/pfcpiface/parse-far.go index f25f34cf9..a8cacdfb5 100644 --- a/pfcpiface/parse-far.go +++ b/pfcpiface/parse-far.go @@ -66,25 +66,6 @@ func (f far) String() string { return b.String() } -func (f *far) setActionValue() uint8 { - if (f.applyAction & ActionForward) != 0 { - if f.dstIntf == ie.DstInterfaceAccess { - return farForwardD - } else if (f.dstIntf == ie.DstInterfaceCore) || (f.dstIntf == ie.DstInterfaceSGiLANN6LAN) { - return farForwardU - } - } else if (f.applyAction & ActionDrop) != 0 { - return farDrop - } else if (f.applyAction & ActionBuffer) != 0 { - return farBuffer - } else if (f.applyAction & ActionNotify) != 0 { - return farNotify - } - - // default action - return farDrop -} - func (f *far) parseFAR(farIE *ie.IE, fseid uint64, upf *upf, op operation) error { f.fseID = (fseid) diff --git a/pfcpiface/session-far.go b/pfcpiface/session-far.go index dd159dcd5..805dedf95 100644 --- a/pfcpiface/session-far.go +++ b/pfcpiface/session-far.go @@ -6,6 +6,7 @@ package main import ( "errors" "net" + "time" "github.com/google/gopacket" "github.com/google/gopacket/layers" @@ -101,6 +102,20 @@ func (s *PFCPSession) getNotifyFlag() bool { return s.notificationFlag.flag } +func (s *PFCPSession) runTimerForDDNNotify(timeout time.Duration) { + endTime := time.After(timeout) + + for { + select { + case <-endTime: + log.Println("DDN notify send timeout") + s.setNotifyFlag(false) + + return + } + } +} + // UpdateFAR updates existing far in the session. func (s *PFCPSession) updateNotifyFlag() { unset := true diff --git a/pfcpiface/upf.go b/pfcpiface/upf.go index 8f491334c..5af325f7a 100644 --- a/pfcpiface/upf.go +++ b/pfcpiface/upf.go @@ -54,13 +54,6 @@ const ( n3 = 0x0 n6 = 0x1 n9 = 0x2 - - // far-action specific values. - farForwardD = 0x0 - farForwardU = 0x1 - farDrop = 0x2 - farBuffer = 0x3 - farNotify = 0x4 ) func (u *upf) sendMsgToUPF(method upfMsgType, pdrs []pdr, fars []far, qers []qer) uint8 {