diff --git a/pfcpiface/bess.go b/pfcpiface/bess.go index b21378758..e02e8f834 100644 --- a/pfcpiface/bess.go +++ b/pfcpiface/bess.go @@ -19,19 +19,17 @@ import ( "google.golang.org/protobuf/types/known/anypb" ) -// SockAddr : Unix Socket path to read bess notification from +// SockAddr : Unix Socket path to read bess notification from. const SockAddr = "/tmp/notifycp" -// PfcpAddr : Unix Socket path to send end marker packet +// PfcpAddr : Unix Socket path to send end marker packet. const PfcpAddr = "/tmp/pfcpport" var intEnc = func(u uint64) *pb.FieldData { return &pb.FieldData{Encoding: &pb.FieldData_ValueInt{ValueInt: u}} } -var ( - bessIP = flag.String("bess", "localhost:10514", "BESS IP/port combo") -) +var bessIP = flag.String("bess", "localhost:10514", "BESS IP/port combo") type bess struct { client pb.BESSControlClient @@ -60,9 +58,10 @@ func (b *bess) sendEndMarkers(endMarkerList *[][]byte) error { return nil } -func (b *bess) sendMsgToUPF(method string, pdrs []pdr, fars []far, qers []qer) uint8 { +func (b *bess) sendMsgToUPF(method upfMsgType, pdrs []pdr, fars []far, qers []qer) uint8 { // create context var cause uint8 = ie.CauseRequestAccepted + calls := len(pdrs) + len(fars) + len(qers) if calls == 0 { return cause @@ -70,53 +69,63 @@ func (b *bess) sendMsgToUPF(method string, pdrs []pdr, fars []far, qers []qer) u ctx, cancel := context.WithTimeout(context.Background(), Timeout) defer cancel() + done := make(chan bool) for _, pdr := range pdrs { log.Traceln(pdr) + switch method { - case "add": + case upfMsgTypeAdd: fallthrough - case "mod": + case upfMsgTypeMod: b.addPDR(ctx, done, pdr) - case "del": + case upfMsgTypeDel: b.delPDR(ctx, done, pdr) } } + for _, far := range fars { log.Traceln(far) + switch method { - case "add": + case upfMsgTypeAdd: fallthrough - case "mod": + case upfMsgTypeMod: b.addFAR(ctx, done, far) - case "del": + case upfMsgTypeDel: b.delFAR(ctx, done, far) } } + for _, qer := range qers { log.Traceln(qer) + switch method { - case "add": + case upfMsgTypeAdd: fallthrough - case "mod": + case upfMsgTypeMod: b.addQER(ctx, done, qer) - case "del": + case upfMsgTypeDel: b.delQER(ctx, done, qer) } } + rc := b.GRPCJoin(calls, Timeout, done) if !rc { log.Println("Unable to make GRPC calls") } + return cause } func (b *bess) sendDeleteAllSessionsMsgtoUPF() { ctx, cancel := context.WithTimeout(context.Background(), Timeout) defer cancel() + done := make(chan bool) calls := 5 + b.removeAllPDRs(ctx, done) b.removeAllFARs(ctx, done) b.removeAllCounters(ctx, done, "preQoSCounter") @@ -146,6 +155,7 @@ func (b *bess) measure(ifName string, f *pb.MeasureCommandGetSummaryArg) *pb.Mea } ctx := context.Background() + modRes, err := b.client.ModuleCommand(ctx, &pb.CommandRequest{ Name: modName(), Cmd: "get_summary", @@ -157,6 +167,7 @@ func (b *bess) measure(ifName string, f *pb.MeasureCommandGetSummaryArg) *pb.Mea } var res pb.MeasureCommandGetSummaryResponse + err = modRes.GetData().UnmarshalTo(&res) if err != nil { log.Println("Error unmarshalling the response", modName(), err) @@ -167,15 +178,17 @@ func (b *bess) measure(ifName string, f *pb.MeasureCommandGetSummaryArg) *pb.Mea } func (b *bess) getPortStats(ifname string) *pb.GetPortStatsResponse { + ctx := context.Background() req := &pb.GetPortStatsRequest{ Name: ifname + "Fast", } - ctx := context.Background() + res, err := b.client.GetPortStats(ctx, req) if err != nil || res.GetError() != nil { log.Println("Error calling GetPortStats", ifname, err, res.GetError().Errmsg) return nil } + return res } @@ -222,7 +235,6 @@ func (b *bess) portStats(uc *upfCollector, ch chan<- prometheus.Metric) { dropped(res.Inc.Dropped, "rx") dropped(res.Out.Dropped, "tx") - } portstats("Access", uc.upf.accessIface) @@ -236,6 +248,7 @@ func (b *bess) summaryLatencyJitter(uc *upfCollector, ch chan<- prometheus.Metri LatencyPercentiles: getPctiles(), JitterPercentiles: getPctiles(), } + res := b.measure(ifaceName, req) if res == nil { return @@ -269,7 +282,6 @@ func (b *bess) summaryLatencyJitter(uc *upfCollector, ch chan<- prometheus.Metri } measureIface("Access", uc.upf.accessIface) measureIface("Core", uc.upf.coreIface) - } func (b *bess) endMarkerSendLoop(endMarkerChan chan []byte) { @@ -282,9 +294,9 @@ func (b *bess) endMarkerSendLoop(endMarkerChan chan []byte) { } func (b *bess) notifyListen(reportNotifyChan chan<- uint64) { - for { buf := make([]byte, 512) + _, err := b.notifyBessSocket.Read(buf) if err != nil { return @@ -298,19 +310,23 @@ func (b *bess) notifyListen(reportNotifyChan chan<- uint64) { func (b *bess) setUpfInfo(u *upf, conf *Conf) { log.Println("setUpfInfo bess") - u.simInfo = &conf.SimInfo + u.simInfo = &conf.SimInfo u.ippoolCidr = conf.CPIface.UeIPPool + log.Println("IP pool : ", u.ippoolCidr) + errin := u.ippool.initPool(u.ippoolCidr) if errin != nil { log.Println("ip pool init failed") } + u.accessIP = ParseIP(conf.AccessIface.IfName, "Access") u.coreIP = ParseIP(conf.CoreIface.IfName, "Core") // get bess grpc client log.Println("bessIP ", *bessIP) + b.endMarkerChan = make(chan []byte, 1024) b.conn, errin = grpc.Dial(*bessIP, grpc.WithInsecure()) @@ -319,16 +335,19 @@ func (b *bess) setUpfInfo(u *upf, conf *Conf) { } b.client = pb.NewBESSControlClient(b.conn) + if conf.EnableNotifyBess { notifySockAddr := conf.NotifySockAddr if notifySockAddr == "" { notifySockAddr = SockAddr } + b.notifyBessSocket, errin = net.Dial("unixpacket", notifySockAddr) if errin != nil { log.Println("dial error:", errin) return } + go b.notifyListen(u.reportNotifyChan) } @@ -337,12 +356,15 @@ func (b *bess) setUpfInfo(u *upf, conf *Conf) { if pfcpCommAddr == "" { pfcpCommAddr = PfcpAddr } + b.endMarkerSocket, errin = net.Dial("unixpacket", pfcpCommAddr) if errin != nil { log.Println("dial error:", errin) return } + log.Println("Starting end marker loop") + go b.endMarkerSendLoop(b.endMarkerChan) } } @@ -523,12 +545,13 @@ func (b *bess) sim(u *upf, method string) { } qers := []qer{qerDown, qerN6Up, qerN9Up} + switch method { case "create": - b.sendMsgToUPF("add", pdrs, fars, qers) + b.sendMsgToUPF(upfMsgTypeAdd, pdrs, fars, qers) case "delete": - b.sendMsgToUPF("del", pdrs, fars, qers) + b.sendMsgToUPF(upfMsgTypeDel, pdrs, fars, qers) default: log.Fatalln("Unsupported method", method) @@ -537,15 +560,17 @@ func (b *bess) sim(u *upf, method string) { log.Println("Sessions/s:", float64(u.maxSessions)/time.Since(start).Seconds()) } -func (b *bess) processPDR(ctx context.Context, any *anypb.Any, method string) { - if method != "add" && method != "delete" && method != "clear" { +func (b *bess) processPDR(ctx context.Context, any *anypb.Any, method upfMsgType) { + if method != upfMsgTypeAdd && method != upfMsgTypeDel && method != upfMsgTypeClear { log.Println("Invalid method name: ", method) return } + methods := [...]string{"add", "add", "delete", "clear"} + _, err := b.client.ModuleCommand(ctx, &pb.CommandRequest{ Name: "pdrLookup", - Cmd: method, + Cmd: methods[method], Arg: any, }) if err != nil { @@ -555,8 +580,10 @@ func (b *bess) processPDR(ctx context.Context, any *anypb.Any, method string) { func (b *bess) addPDR(ctx context.Context, done chan<- bool, p pdr) { go func() { - var any *anypb.Any - var err error + var ( + any *anypb.Any + err error + ) f := &pb.WildcardMatchCommandAddArg{ Gate: uint64(p.needDecap), @@ -583,27 +610,30 @@ func (b *bess) addPDR(ctx context.Context, done chan<- bool, p pdr) { }, Valuesv: []*pb.FieldData{ intEnc(uint64(p.pdrID)), /* pdr-id */ - intEnc(uint64(p.fseID)), /* fseid */ + intEnc(p.fseID), /* fseid */ intEnc(uint64(p.ctrID)), /* ctr_id */ intEnc(uint64(p.qerID)), /* qer_id */ intEnc(uint64(p.farID)), /* far_id */ }, } + any, err = anypb.New(f) if err != nil { log.Println("Error marshalling the rule", f, err) return } - b.processPDR(ctx, any, "add") + b.processPDR(ctx, any, upfMsgTypeAdd) done <- true }() } func (b *bess) delPDR(ctx context.Context, done chan<- bool, p pdr) { go func() { - var any *anypb.Any - var err error + var ( + any *anypb.Any + err error + ) f := &pb.WildcardMatchCommandDeleteArg{ Values: []*pb.FieldData{ @@ -627,26 +657,29 @@ func (b *bess) delPDR(ctx context.Context, done chan<- bool, p pdr) { intEnc(uint64(p.protoMask)), /* proto id-mask */ }, } + any, err = anypb.New(f) if err != nil { log.Println("Error marshalling the rule", f, err) return } - b.processPDR(ctx, any, "delete") + b.processPDR(ctx, any, upfMsgTypeDel) done <- true }() } -func (b *bess) processQER(ctx context.Context, any *anypb.Any, method string) { - if method != "add" && method != "delete" && method != "clear" { +func (b *bess) processQER(ctx context.Context, any *anypb.Any, method upfMsgType) { + if method != upfMsgTypeAdd && method != upfMsgTypeDel && method != upfMsgTypeClear { log.Println("Invalid method name: ", method) return } + methods := [...]string{"add", "add", "delete", "clear"} + _, err := b.client.ModuleCommand(ctx, &pb.CommandRequest{ Name: "qerLookup", - Cmd: method, + Cmd: methods[method], Arg: any, }) if err != nil { @@ -656,64 +689,75 @@ func (b *bess) processQER(ctx context.Context, any *anypb.Any, method string) { func (b *bess) addQER(ctx context.Context, done chan<- bool, qer qer) { go func() { - var any *anypb.Any - var err error + var ( + any *anypb.Any + err error + ) + q := &pb.ExactMatchCommandAddArg{ Gate: uint64(0), Fields: []*pb.FieldData{ intEnc(uint64(qer.qerID)), /* far_id */ - intEnc(uint64(qer.fseID)), /* fseid */ + intEnc(qer.fseID), /* fseid */ }, Values: []*pb.FieldData{ intEnc(uint64(qer.qfi)), /* action */ intEnc(uint64(qer.ulStatus)), /* QFI */ intEnc(uint64(qer.dlStatus)), /* tunnel_out_type */ - intEnc(uint64(qer.ulMbr)), /* access-ip */ - intEnc(uint64(qer.dlMbr)), /* enb ip */ - intEnc(uint64(qer.ulGbr)), /* enb teid */ - intEnc(uint64(qer.dlGbr)), /* udp gtpu port */ + intEnc(qer.ulMbr), /* access-ip */ + intEnc(qer.dlMbr), /* enb ip */ + intEnc(qer.ulGbr), /* enb teid */ + intEnc(qer.dlGbr), /* udp gtpu port */ }, } + any, err = anypb.New(q) if err != nil { log.Println("Error marshalling the rule", q, err) return } - b.processQER(ctx, any, "add") + + b.processQER(ctx, any, upfMsgTypeAdd) done <- true }() } func (b *bess) delQER(ctx context.Context, done chan<- bool, qer qer) { go func() { - var any *anypb.Any - var err error + var ( + any *anypb.Any + err error + ) q := &pb.ExactMatchCommandDeleteArg{ Fields: []*pb.FieldData{ intEnc(uint64(qer.qerID)), /* qer_id */ - intEnc(uint64(qer.fseID)), /* fseid */ + intEnc(qer.fseID), /* fseid */ }, } + any, err = anypb.New(q) if err != nil { log.Println("Error marshalling the rule", q, err) return } - b.processQER(ctx, any, "delete") + + b.processQER(ctx, any, upfMsgTypeDel) done <- true }() } -func (b *bess) processFAR(ctx context.Context, any *anypb.Any, method string) { - if method != "add" && method != "delete" && method != "clear" { +func (b *bess) processFAR(ctx context.Context, any *anypb.Any, method upfMsgType) { + if method != upfMsgTypeAdd && method != upfMsgTypeDel && method != upfMsgTypeClear { log.Println("Invalid method name: ", method) return } + methods := [...]string{"add", "add", "delete", "clear"} + _, err := b.client.ModuleCommand(ctx, &pb.CommandRequest{ Name: "farLookup", - Cmd: method, + Cmd: methods[method], Arg: any, }) if err != nil { @@ -723,14 +767,17 @@ func (b *bess) processFAR(ctx context.Context, any *anypb.Any, method string) { func (b *bess) addFAR(ctx context.Context, done chan<- bool, far far) { go func() { - var any *anypb.Any - var err error + var ( + any *anypb.Any + err error + ) + action := far.setActionValue() f := &pb.ExactMatchCommandAddArg{ Gate: uint64(far.tunnelType), Fields: []*pb.FieldData{ intEnc(uint64(far.farID)), /* far_id */ - intEnc(uint64(far.fseID)), /* fseid */ + intEnc(far.fseID), /* fseid */ }, Values: []*pb.FieldData{ intEnc(uint64(action)), /* action */ @@ -741,41 +788,49 @@ func (b *bess) addFAR(ctx context.Context, done chan<- bool, far far) { intEnc(uint64(far.tunnelPort)), /* udp gtpu port */ }, } + any, err = anypb.New(f) if err != nil { log.Println("Error marshalling the rule", f, err) return } - b.processFAR(ctx, any, "add") + + b.processFAR(ctx, any, upfMsgTypeAdd) done <- true }() } func (b *bess) delFAR(ctx context.Context, done chan<- bool, far far) { go func() { - var any *anypb.Any - var err error + var ( + any *anypb.Any + err error + ) f := &pb.ExactMatchCommandDeleteArg{ Fields: []*pb.FieldData{ intEnc(uint64(far.farID)), /* far_id */ - intEnc(uint64(far.fseID)), /* fseid */ + intEnc(far.fseID), /* fseid */ }, } + any, err = anypb.New(f) if err != nil { log.Println("Error marshalling the rule", f, err) return } - b.processFAR(ctx, any, "delete") + + b.processFAR(ctx, any, upfMsgTypeDel) done <- true }() } -func (b *bess) processCounters(ctx context.Context, any *anypb.Any, method string, counterName string) { +func (b *bess) processCounters(ctx context.Context, any *anypb.Any, method upfMsgType, counterName string) { + methods := [...]string{"add", "add", "remove", "removeAll"} + _, err := b.client.ModuleCommand(ctx, &pb.CommandRequest{ Name: counterName, - Cmd: method, + Cmd: methods[method], Arg: any, }) if err != nil { @@ -785,8 +840,10 @@ func (b *bess) processCounters(ctx context.Context, any *anypb.Any, method strin func (b *bess) addCounter(ctx context.Context, done chan<- bool, ctrID uint32, counterName string) { go func() { - var any *anypb.Any - var err error + var ( + any *anypb.Any + err error + ) f := &pb.CounterAddArg{ CtrId: ctrID, @@ -797,15 +854,18 @@ func (b *bess) addCounter(ctx context.Context, done chan<- bool, ctrID uint32, c log.Println("Error marshalling the rule", f, err) return } - b.processCounters(ctx, any, "add", counterName) + + b.processCounters(ctx, any, upfMsgTypeAdd, counterName) done <- true }() } func (b *bess) delCounter(ctx context.Context, done chan<- bool, ctrID uint32, counterName string) { go func() { - var any *anypb.Any - var err error + var ( + any *anypb.Any + err error + ) f := &pb.CounterRemoveArg{ CtrId: ctrID, @@ -816,59 +876,68 @@ func (b *bess) delCounter(ctx context.Context, done chan<- bool, ctrID uint32, c log.Println("Error marshalling the rule", f, err) return } - b.processCounters(ctx, any, "remove", counterName) + + b.processCounters(ctx, any, upfMsgTypeDel, counterName) done <- true }() } func (b *bess) removeAllPDRs(ctx context.Context, done chan<- bool) { go func() { - var any *anypb.Any - var err error + var ( + any *anypb.Any + err error + ) f := &pb.EmptyArg{} + any, err = anypb.New(f) if err != nil { log.Println("Error marshalling the rule", f, err) return } - b.processPDR(ctx, any, "clear") + b.processPDR(ctx, any, upfMsgTypeClear) done <- true }() } func (b *bess) removeAllFARs(ctx context.Context, done chan<- bool) { go func() { - var any *anypb.Any - var err error + var ( + any *anypb.Any + err error + ) f := &pb.EmptyArg{} + any, err = anypb.New(f) if err != nil { log.Println("Error marshalling the rule", f, err) return } - b.processFAR(ctx, any, "clear") + b.processFAR(ctx, any, upfMsgTypeClear) done <- true }() } func (b *bess) removeAllCounters(ctx context.Context, done chan<- bool, name string) { go func() { - var any *anypb.Any - var err error + var ( + any *anypb.Any + err error + ) f := &pb.EmptyArg{} + any, err = anypb.New(f) if err != nil { log.Println("Error marshalling the rule", f, err) return } - b.processCounters(ctx, any, "removeAll", name) - + b.processCounters(ctx, any, upfMsgTypeClear, name) done <- true }() } @@ -883,6 +952,7 @@ func (b *bess) GRPCJoin(calls int, timeout time.Duration, done chan bool) bool { log.Println("Error making GRPC calls") return false } + calls-- if calls == 0 { return true diff --git a/pfcpiface/conn.go b/pfcpiface/conn.go index 2469123e8..258599214 100644 --- a/pfcpiface/conn.go +++ b/pfcpiface/conn.go @@ -13,7 +13,7 @@ import ( "github.com/wmnsk/go-pfcp/message" ) -// PktBufSz : buffer size for incoming pkt +// PktBufSz : buffer size for incoming pkt. const ( PktBufSz = 1500 PFCPPort = "8805" @@ -21,10 +21,10 @@ const ( readTimeout = 25 * time.Second ) -//Timeout : connection timeout +// Timeout : connection timeout. var Timeout = 1000 * time.Millisecond -// PFCPConn represents a PFCP connection +// PFCPConn represents a PFCP connection. type PFCPConn struct { seqNum sequenceNumber mgr *PFCPSessionMgr @@ -39,16 +39,19 @@ func (c *PFCPConn) getSeqNum() uint32 { c.seqNum.mux.Lock() defer c.seqNum.mux.Unlock() c.seqNum.seq++ + return c.seqNum.seq } func pfcpifaceMainLoop(upf *upf, accessIP, coreIP, sourceIP, smfName string) { var pconn PFCPConn pconn.mgr = NewPFCPSessionMgr(100) + rTimeout := readTimeout if upf.readTimeout != 0 { - rTimeout = time.Duration(upf.readTimeout) + rTimeout = upf.readTimeout } + if upf.connTimeout != 0 { Timeout = upf.connTimeout } @@ -82,14 +85,18 @@ func pfcpifaceMainLoop(upf *upf, accessIP, coreIP, sourceIP, smfName string) { if upf.simInfo != nil { return } + sendDeleteAllSessionsMsgtoUPF(upf) + cpConnected = false } // initiate connection if smf address available log.Println("calling manageSmfConnection smf service name ", smfName) + manageConnection := false if smfName != "" { manageConnection = true + go pconn.manageSmfConnection(upf.nodeIP.String(), accessIP, smfName, conn, cpConnectionStatus, upf.recoveryTime) } @@ -107,13 +114,18 @@ func pfcpifaceMainLoop(upf *upf, accessIP, coreIP, sourceIP, smfName string) { if err, ok := err.(net.Error); ok && err.Timeout() { // do nothing for the time being log.Println(err) + cpConnected = false + if manageConnection { cpConnectionStatus <- cpConnected } + cleanupSessions() + continue } + log.Fatalln("Read error:", err) } @@ -125,14 +137,14 @@ func pfcpifaceMainLoop(upf *upf, accessIP, coreIP, sourceIP, smfName string) { } // if sourceIP is not set, fetch it from the msg header - if sourceIP == "0.0.0.0" { + if sourceIP == net.IPv4zero.String() { addrString := strings.Split(addr.String(), ":") sourceIP = getLocalIP(addrString[0]).String() log.Println("Source IP address is now: ", sourceIP) } // if nodeIP is not set, fetch it from the msg header - if upf.nodeIP.String() == "0.0.0.0" { + if upf.nodeIP.String() == net.IPv4zero.String() { addrString := strings.Split(addr.String(), ":") upf.nodeIP = getLocalIP(addrString[0]) log.Println("Node IP address is now: ", upf.nodeIP.String()) @@ -142,14 +154,19 @@ func pfcpifaceMainLoop(upf *upf, accessIP, coreIP, sourceIP, smfName string) { // handle message var outgoingMessage []byte + switch msg.MessageType() { case message.MsgTypeAssociationSetupRequest: cleanupSessions() + go readReportNotification(upf.reportNotifyChan, &pconn, conn, addr) + upf.setInfo(conn, addr, &pconn) + outgoingMessage = pconn.handleAssociationSetupRequest(upf, msg, addr, sourceIP, accessIP, coreIP) if outgoingMessage != nil { cpConnected = true + if manageConnection { // if we initiated connection, inform go routine cpConnectionStatus <- cpConnected @@ -157,6 +174,7 @@ func pfcpifaceMainLoop(upf *upf, accessIP, coreIP, sourceIP, smfName string) { } case message.MsgTypeAssociationSetupResponse: cpConnected = handleAssociationSetupResponse(msg, addr, sourceIP, accessIP) + if manageConnection { // pass on information to go routine that result of association response cpConnectionStatus <- cpConnected @@ -173,6 +191,7 @@ func pfcpifaceMainLoop(upf *upf, accessIP, coreIP, sourceIP, smfName string) { outgoingMessage = pconn.handleSessionDeletionRequest(upf, msg, addr, sourceIP) case message.MsgTypeAssociationReleaseRequest: outgoingMessage = handleAssociationReleaseRequest(upf, msg, addr, sourceIP, accessIP, upf.recoveryTime) + cleanupSessions() case message.MsgTypeSessionReportResponse: pconn.handleSessionReportResponse(upf, msg, addr) @@ -187,6 +206,5 @@ func pfcpifaceMainLoop(upf *upf, accessIP, coreIP, sourceIP, smfName string) { log.Fatalln("Unable to transmit association setup response", err) } } - } } diff --git a/pfcpiface/fastpath.go b/pfcpiface/fastpath.go index 42a0c62c8..b5e331afd 100644 --- a/pfcpiface/fastpath.go +++ b/pfcpiface/fastpath.go @@ -4,8 +4,18 @@ package main import ( - "github.com/prometheus/client_golang/prometheus" "net" + + "github.com/prometheus/client_golang/prometheus" +) + +type upfMsgType int + +const ( + upfMsgTypeAdd upfMsgType = iota + upfMsgTypeMod + upfMsgTypeDel + upfMsgTypeClear ) type fastPath interface { @@ -20,7 +30,7 @@ type fastPath interface { /* write endMarker to fastpath */ sendEndMarkers(endMarkerList *[][]byte) error /* write pdr/far/qer to fastpath */ - sendMsgToUPF(method string, pdrs []pdr, fars []far, qers []qer) uint8 + sendMsgToUPF(method upfMsgType, pdrs []pdr, fars []far, qers []qer) uint8 /* delete all pdrs/fars/qers/ installed in fastpath tabled */ sendDeleteAllSessionsMsgtoUPF() /* check of communication channel to fastpath is setup */ diff --git a/pfcpiface/ip_pool.go b/pfcpiface/ip_pool.go index 8bfd2dae7..3d4c7ab30 100644 --- a/pfcpiface/ip_pool.go +++ b/pfcpiface/ip_pool.go @@ -24,10 +24,13 @@ func (ipp *ipPool) allocIPV4() (net.IP, error) { err := errors.New("ip pool empty") return nil, err } + element := ipp.freePool[0] // The first element is the one to be dequeued. log.Println("Dequeued:", element) + ipp.freePool = ipp.freePool[1:] // Slice off the element once it is dequeued. ipVal := net.ParseIP(element).To4() + return ipVal, nil } @@ -40,8 +43,9 @@ func (ipp *ipPool) initPool(cidr string) error { for ip := ip.Mask(ipnet.Mask); ipnet.Contains(ip); inc(ip) { ipp.freePool = append(ipp.freePool, ip.String()) } - // remove network address and broadcast address + // remove network address and broadcast address ipp.freePool = ipp.freePool[1 : len(ipp.freePool)-1] + return nil } diff --git a/pfcpiface/main.go b/pfcpiface/main.go index fb12b7717..479b98aeb 100644 --- a/pfcpiface/main.go +++ b/pfcpiface/main.go @@ -23,7 +23,7 @@ var ( pfcpsim = flag.Bool("pfcpsim", false, "simulate PFCP") ) -// Conf : Json conf struct +// Conf : Json conf struct. type Conf struct { Mode string `json:"mode"` MaxSessions uint32 `json:"max_sessions"` @@ -42,7 +42,7 @@ type Conf struct { LogLevel string `json:"log_level"` } -// SimModeInfo : Sim mode attributes +// SimModeInfo : Sim mode attributes. type SimModeInfo struct { StartUEIP net.IP `json:"start_ue_ip"` StartENBIP net.IP `json:"start_enb_ip"` @@ -53,7 +53,7 @@ type SimModeInfo struct { StartN9TEID string `json:"start_n9_teid"` } -// CPIfaceInfo : CPIface interface settings +// CPIfaceInfo : CPIface interface settings. type CPIfaceInfo struct { DestIP string `json:"nb_dst_ip"` SrcIP string `json:"nb_src_ip"` @@ -64,16 +64,15 @@ type CPIfaceInfo struct { Dnn string `json:"dnn"` } -// IfaceType : Gateway interface struct +// IfaceType : Gateway interface struct. type IfaceType struct { IfName string `json:"ifname"` } -// ParseJSON : parse json file and populate corresponding struct +// ParseJSON : parse json file and populate corresponding struct. func ParseJSON(filepath *string, conf *Conf) { /* Open up file */ jsonFile, err := os.Open(*filepath) - if err != nil { log.Fatalln("Error opening file: ", err) } @@ -96,20 +95,21 @@ func ParseJSON(filepath *string, conf *Conf) { } } -// ParseStrIP : parse IP address from config +// ParseStrIP : parse IP address from config. func ParseStrIP(n3name string) (net.IP, net.IPMask) { ip, ipNet, err := net.ParseCIDR(n3name) if err != nil { log.Fatalln("Unable to parse IP: ", err) } + log.Println("IP: ", ip) + return ip, (ipNet).Mask } -// ParseIP : parse IP address from the interface name +// ParseIP : parse IP address from the interface name. func ParseIP(name string, iface string) net.IP { byNameInterface, err := net.InterfaceByName(name) - if err != nil { log.Fatalln("Unable to get info on interface name:", name, err) } @@ -123,15 +123,21 @@ func ParseIP(name string, iface string) net.IP { if err != nil { log.Fatalln("Unable to parse", iface, " IP: ", err) } + log.Println(iface, " IP: ", ip) + return ip } func main() { // cmdline args flag.Parse() - var conf Conf - var intf fastPath + + var ( + conf Conf + intf fastPath + ) + // read and parse json startup file ParseJSON(configPath, &conf) @@ -140,11 +146,13 @@ func main() { log.SetFormatter(&log.TextFormatter{ FullTimestamp: true, }) + if level, err := log.ParseLevel(conf.LogLevel); err != nil { log.Fatalln(err) } else { log.SetLevel(level) } + log.Infoln(conf) if conf.EnableP4rt { @@ -187,16 +195,20 @@ func main() { log.Println(*simulate, "sessions:", conf.MaxSessions) upf.sim(*simulate) + return } + log.Println("N4 local IP: ", upf.n4SrcIP.String()) log.Println("Access IP: ", upf.accessIP.String()) log.Println("Core IP: ", upf.coreIP.String()) + if conf.CPIface.PromPort != "" { *httpAddr = string("0.0.0.0:") + conf.CPIface.PromPort } log.Println("httpAddr: ", httpAddr) + go pfcpifaceMainLoop( upf, upf.accessIP.String(), upf.coreIP.String(), upf.n4SrcIP.String(), diff --git a/pfcpiface/messages.go b/pfcpiface/messages.go index 1e2eb6f32..84d7ad11d 100644 --- a/pfcpiface/messages.go +++ b/pfcpiface/messages.go @@ -49,30 +49,37 @@ func (pc *PFCPConn) handleAssociationSetupRequest(upf *upf, msg message.Message, log.Println("Got an association setup request with invalid NodeID: ", err, " from: ", addr) return nil } + ts, err := asreq.RecoveryTimeStamp.RecoveryTimeStamp() if err != nil { log.Println("Got an association setup request with invalid TS: ", err, " from: ", addr) return nil } + log.Println("Got an association setup request with TS: ", ts, " from: ", addr) - globalPfcpStats.messages.WithLabelValues(string(nodeID), "Association_Setup_Request", "Incoming", "Success").Inc() + + globalPfcpStats.messages.WithLabelValues(nodeID, "Association_Setup_Request", "Incoming", "Success").Inc() cause := ie.CauseRequestAccepted if !upf.isConnected() { cause = ie.CauseRequestRejected - globalPfcpStats.messages.WithLabelValues(string(nodeID), "Association_Setup_Response", "Outgoing", "Failure").Inc() + + globalPfcpStats.messages.WithLabelValues(nodeID, "Association_Setup_Response", "Outgoing", "Failure").Inc() } else { - globalPfcpStats.messages.WithLabelValues(string(nodeID), "Association_Setup_Response", "Outgoing", "Success").Inc() + globalPfcpStats.messages.WithLabelValues(nodeID, "Association_Setup_Response", "Outgoing", "Success").Inc() } // Build response message // Timestamp shouldn't be the time message is sent in the real deployment but anyway :D - flags := uint8(0x41) log.Println("Dnn info : ", upf.dnn) + + flags := uint8(0x41) + if len(upf.dnn) != 0 { - //add ASSONI flag to set network instance. + // add ASSONI flag to set network instance. flags = uint8(0x61) } + asresmsg := message.NewAssociationSetupResponse(asreq.SequenceNumber, ie.NewRecoveryTimeStamp(upf.recoveryTime), ie.NewNodeID(upf.nodeIP.String(), "", ""), /* node id (IPv4) */ @@ -85,7 +92,9 @@ func (pc *PFCPConn) handleAssociationSetupRequest(upf *upf, msg message.Message, pc.mgr.nodeID = nodeID log.Println("Association setup NodeID : ", pc.mgr.nodeID) + features := make([]uint8, 4) + if upf.enableUeIPAlloc { setUeipFeature(features...) } @@ -94,8 +103,8 @@ func (pc *PFCPConn) handleAssociationSetupRequest(upf *upf, msg message.Message, setEndMarkerFeature(features...) } - asresmsg.UPFunctionFeatures = - ie.NewUPFunctionFeatures(features...) + asresmsg.UPFunctionFeatures = ie.NewUPFunctionFeatures(features...) + asres, err := asresmsg.Marshal() if err != nil { log.Fatalln("Unable to create association setup response", err) @@ -118,6 +127,7 @@ func handleAssociationSetupResponse(msg message.Message, addr net.Addr, sourceIP log.Println("Got an association setup response with invalid TS: ", err, " from: ", addr) return false } + log.Println("Received a PFCP association setup response with TS: ", ts, " from: ", addr) cause, err := asres.Cause.Cause() @@ -127,6 +137,7 @@ func handleAssociationSetupResponse(msg message.Message, addr net.Addr, sourceIP } log.Println("PFCP Association formed with Control Plane - ", addr) + return true } @@ -174,9 +185,9 @@ func (pc *PFCPConn) handlePFDMgmtRequest(upf *upf, msg message.Message, addr net pc.mgr.ResetAppPFDs() sendError := func(err error, offendingIE *ie.IE) []byte { + log.Println(err) // Revert the map to original contents pc.mgr.appPFDs = currentAppPFDs - log.Println(err) // Build response message pfdres, err := message.NewPFDManagementResponse(pfdmreq.SequenceNumber, ie.NewCause(ie.CauseRequestRejected), @@ -187,6 +198,7 @@ func (pc *PFCPConn) handlePFDMgmtRequest(upf *upf, msg message.Message, addr net } log.Println("Sending PFD management error response to: ", addr) + return pfdres } @@ -211,11 +223,14 @@ func (pc *PFCPConn) handlePFDMgmtRequest(upf *upf, msg message.Message, addr net pc.mgr.RemoveAppPFD(id) return sendError(err, appIDPFD) } + if fields.FlowDescription == "" { - return sendError(errors.New("Flow Description not found"), appIDPFD) + return sendError(errors.New("flow description not found"), appIDPFD) } + appPFD.flowDescs = append(appPFD.flowDescs, fields.FlowDescription) } + pc.mgr.appPFDs[id] = appPFD log.Println("Flow descriptions for AppID", id, ":", appPFD.flowDescs) } @@ -230,11 +245,13 @@ func (pc *PFCPConn) handlePFDMgmtRequest(upf *upf, msg message.Message, addr net } log.Println("Sending PFD management response to: ", addr) + return pfdres } func (pc *PFCPConn) handleSessionReportResponse(upf *upf, msg message.Message, addr net.Addr) { log.Println("Got session report response from: ", addr) + srres, ok := msg.(*message.SessionReportResponse) if !ok { log.Println("Got an unexpected message: ", msg.MessageTypeName(), " from: ", addr) @@ -245,15 +262,19 @@ func (pc *PFCPConn) handleSessionReportResponse(upf *upf, msg message.Message, a if cause != ie.CauseRequestAccepted { seid := srres.SEID() log.Println("session req not accepted seq : ", srres.SequenceNumber) + if cause == ie.CauseSessionContextNotFound { sessItem, ok := pc.mgr.sessions[seid] if !ok { log.Println("context not found locally or remote. SEID : ", seid) return } + log.Println("context not found. Delete session locally") + pc.mgr.RemoveSession(srres.SEID()) - cause := upf.sendMsgToUPF("del", sessItem.pdrs, sessItem.fars, sessItem.qers) + + cause := upf.sendMsgToUPF(upfMsgTypeDel, sessItem.pdrs, sessItem.fars, sessItem.qers) if cause == ie.CauseRequestRejected { log.Println("Write to FastPath failed") } @@ -279,7 +300,7 @@ func (pc *PFCPConn) handleSessionEstablishmentRequest(upf *upf, msg message.Mess } log.Println("Got a session establishment request from: ", addr) - globalPfcpStats.messages.WithLabelValues(string(nodeID), "Pfcp_Establishment_Request", "Incoming", "Success").Inc() + globalPfcpStats.messages.WithLabelValues(nodeID, "Pfcp_Establishment_Request", "Incoming", "Success").Inc() /* Read fseid from the IE */ fseid, err := sereq.CPFSEID.FSEID() @@ -287,11 +308,12 @@ func (pc *PFCPConn) handleSessionEstablishmentRequest(upf *upf, msg message.Mess log.Println("Failed to parse FSEID from session establishment request") return nil } + remoteSEID := fseid.SEID fseidIP := ip2int(fseid.IPv4Address) sendError := func(err error, cause uint8) []byte { - globalPfcpStats.messages.WithLabelValues(string(nodeID), "Pfcp_Establishment_Response", "Outgoing", "Failure").Inc() + globalPfcpStats.messages.WithLabelValues(nodeID, "Pfcp_Establishment_Response", "Outgoing", "Failure").Inc() log.Println(err) // Build response message seres, err := message.NewSessionEstablishmentResponse(0, /* MO?? <-- what's this */ @@ -307,27 +329,31 @@ func (pc *PFCPConn) handleSessionEstablishmentRequest(upf *upf, msg message.Mess } log.Println("Sending session establishment response to: ", addr) + return seres } if strings.Compare(nodeID, pc.mgr.nodeID) != 0 { log.Println("Association not found for Establishment request, nodeID: ", nodeID, ", Association NodeID: ", pc.mgr.nodeID) - return sendError(errors.New("No Association found for NodeID"), + return sendError(errors.New("no association found for NodeID"), ie.CauseNoEstablishedPFCPAssociation) } /* Read CreatePDRs and CreateFARs from payload */ localSEID := pc.mgr.NewPFCPSession(remoteSEID) if localSEID == 0 { - sendError(errors.New("Unable to allocate new PFCP session"), + sendError(errors.New("unable to allocate new PFCP session"), ie.CauseNoResourcesAvailable) } + session := pc.mgr.sessions[localSEID] + for _, cPDR := range sereq.CreatePDR { var p pdr if err := p.parsePDR(cPDR, session.localSEID, pc.mgr.appPFDs, upf); err != nil { return sendError(err, ie.CauseRequestRejected) } + p.fseidIP = fseidIP session.CreatePDR(p) } @@ -337,6 +363,7 @@ func (pc *PFCPConn) handleSessionEstablishmentRequest(upf *upf, msg message.Mess if err := f.parseFAR(cFAR, session.localSEID, upf, create); err != nil { return sendError(err, ie.CauseRequestRejected) } + f.fseidIP = fseidIP session.CreateFAR(f) } @@ -346,14 +373,15 @@ func (pc *PFCPConn) handleSessionEstablishmentRequest(upf *upf, msg message.Mess if err := q.parseQER(cQER, session.localSEID, upf); err != nil { return sendError(err, ie.CauseRequestRejected) } + q.fseidIP = fseidIP session.CreateQER(q) } - cause := upf.sendMsgToUPF("add", session.pdrs, session.fars, session.qers) + cause := upf.sendMsgToUPF(upfMsgTypeAdd, session.pdrs, session.fars, session.qers) if cause == ie.CauseRequestRejected { pc.mgr.RemoveSession(session.localSEID) - return sendError(errors.New("Write to FastPath failed"), + return sendError(errors.New("write to FastPath failed"), ie.CauseRequestRejected) } @@ -369,13 +397,16 @@ func (pc *PFCPConn) handleSessionEstablishmentRequest(upf *upf, msg message.Mess ) addPdrInfo(seresMsg, session) + seres, err := seresMsg.Marshal() if err != nil { log.Fatalln("Unable to create session establishment response", err) } log.Println("Sending session establishment response to: ", addr) - globalPfcpStats.messages.WithLabelValues(string(nodeID), "Pfcp_Establishment_Response", "Outgoing", "Success").Inc() + + globalPfcpStats.messages.WithLabelValues(nodeID, "Pfcp_Establishment_Response", "Outgoing", "Success").Inc() + return seres } @@ -389,12 +420,14 @@ func (pc *PFCPConn) handleSessionModificationRequest(upf *upf, msg message.Messa nodeID := pc.mgr.nodeID log.Println("Got a session modification request from: ", addr) - globalPfcpStats.messages.WithLabelValues(string(nodeID), "Pfcp_Modification_Request", "Incoming", "Success").Inc() + globalPfcpStats.messages.WithLabelValues(nodeID, "Pfcp_Modification_Request", "Incoming", "Success").Inc() var remoteSEID uint64 + sendError := func(err error) []byte { - globalPfcpStats.messages.WithLabelValues(string(nodeID), "Pfcp_Modification_Response", "Outgoing", "Failure").Inc() + globalPfcpStats.messages.WithLabelValues(nodeID, "Pfcp_Modification_Response", "Outgoing", "Failure").Inc() log.Println(err) + smres, err := message.NewSessionModificationResponse(0, /* MO?? <-- what's this */ 0, /* FO <-- what's this? */ remoteSEID, /* seid */ @@ -407,36 +440,44 @@ func (pc *PFCPConn) handleSessionModificationRequest(upf *upf, msg message.Messa } log.Println("Sending session modification response to: ", addr) + return smres } localSEID := smreq.SEID() + session, ok := pc.mgr.sessions[localSEID] if !ok { - return sendError(fmt.Errorf("Session not found: %v", localSEID)) + return sendError(fmt.Errorf("session not found: %v", localSEID)) } var fseidIP uint32 + if smreq.CPFSEID != nil { fseid, err := smreq.CPFSEID.FSEID() if err == nil { session.remoteSEID = fseid.SEID fseidIP = ip2int(fseid.IPv4Address) + log.Println("Updated FSEID from session modification request") } } + remoteSEID = session.remoteSEID addPDRs := make([]pdr, 0, MaxItems) addFARs := make([]far, 0, MaxItems) addQERs := make([]qer, 0, MaxItems) endMarkerList := make([][]byte, 0, MaxItems) + for _, cPDR := range smreq.CreatePDR { var p pdr if err := p.parsePDR(cPDR, localSEID, pc.mgr.appPFDs, upf); err != nil { return sendError(err) } + p.fseidIP = fseidIP + session.CreatePDR(p) addPDRs = append(addPDRs, p) } @@ -446,7 +487,9 @@ func (pc *PFCPConn) handleSessionModificationRequest(upf *upf, msg message.Messa if err := f.parseFAR(cFAR, localSEID, upf, create); err != nil { return sendError(err) } + f.fseidIP = fseidIP + session.CreateFAR(f) addFARs = append(addFARs, f) } @@ -456,53 +499,73 @@ func (pc *PFCPConn) handleSessionModificationRequest(upf *upf, msg message.Messa if err := q.parseQER(cQER, localSEID, upf); err != nil { return sendError(err) } + q.fseidIP = fseidIP + session.CreateQER(q) addQERs = append(addQERs, q) } for _, uPDR := range smreq.UpdatePDR { - var p pdr - var err error + var ( + p pdr + err error + ) + if err = p.parsePDR(uPDR, localSEID, pc.mgr.appPFDs, upf); err != nil { return sendError(err) } + p.fseidIP = fseidIP + err = session.UpdatePDR(p) if err != nil { log.Println("session PDR update failed ", err) continue } + addPDRs = append(addPDRs, p) } for _, uFAR := range smreq.UpdateFAR { - var f far - var err error + var ( + f far + err error + ) + if err = f.parseFAR(uFAR, localSEID, upf, update); err != nil { return sendError(err) } + f.fseidIP = fseidIP + err = session.UpdateFAR(&f, &endMarkerList) if err != nil { log.Println("session PDR update failed ", err) continue } + addFARs = append(addFARs, f) } for _, uQER := range smreq.UpdateQER { - var q qer - var err error + var ( + q qer + err error + ) + if err = q.parseQER(uQER, localSEID, upf); err != nil { return sendError(err) } + q.fseidIP = fseidIP + err = session.UpdateQER(q) if err != nil { log.Println("session QER update failed ", err) continue } + addQERs = append(addQERs, q) } @@ -510,9 +573,9 @@ func (pc *PFCPConn) handleSessionModificationRequest(upf *upf, msg message.Messa session.updateNotifyFlag() } - cause := upf.sendMsgToUPF("mod", addPDRs, addFARs, addQERs) + cause := upf.sendMsgToUPF(upfMsgTypeMod, addPDRs, addFARs, addQERs) if cause == ie.CauseRequestRejected { - return sendError(errors.New("Write to FastPath failed")) + return sendError(errors.New("write to FastPath failed")) } if upf.enableEndMarker { @@ -536,6 +599,7 @@ func (pc *PFCPConn) handleSessionModificationRequest(upf *upf, msg message.Messa if err != nil { return sendError(err) } + delPDRs = append(delPDRs, *p) } @@ -549,6 +613,7 @@ func (pc *PFCPConn) handleSessionModificationRequest(upf *upf, msg message.Messa if err != nil { return sendError(err) } + delFARs = append(delFARs, *f) } @@ -562,12 +627,13 @@ func (pc *PFCPConn) handleSessionModificationRequest(upf *upf, msg message.Messa if err != nil { return sendError(err) } + delQERs = append(delQERs, *q) } - cause = upf.sendMsgToUPF("del", delPDRs, delFARs, delQERs) + cause = upf.sendMsgToUPF(upfMsgTypeDel, delPDRs, delFARs, delQERs) if cause == ie.CauseRequestRejected { - return sendError(errors.New("Write to FastPath failed")) + return sendError(errors.New("write to FastPath failed")) } // Build response message @@ -582,8 +648,9 @@ func (pc *PFCPConn) handleSessionModificationRequest(upf *upf, msg message.Messa log.Fatalln("Unable to create session modification response", err) } - globalPfcpStats.messages.WithLabelValues(string(nodeID), "Pfcp_Modification_Response", "Outgoing", "Success").Inc() + globalPfcpStats.messages.WithLabelValues(nodeID, "Pfcp_Modification_Response", "Outgoing", "Success").Inc() log.Println("Sent session modification response to: ", addr) + return smres } @@ -595,12 +662,14 @@ func (pc *PFCPConn) handleSessionDeletionRequest(upf *upf, msg message.Message, } nodeID := pc.mgr.nodeID + globalPfcpStats.messages.WithLabelValues(nodeID, "Pfcp_Deletion_Request", "Incoming", "Success").Inc() + log.Println("Got a session deletion request from: ", addr) - globalPfcpStats.messages.WithLabelValues(string(nodeID), "Pfcp_Deletion_Request", "Incoming", "Success").Inc() sendError := func(err error) []byte { log.Println(err) - globalPfcpStats.messages.WithLabelValues(string(nodeID), "Pfcp_Deletion_Response", "Outgoing", "Failure").Inc() + globalPfcpStats.messages.WithLabelValues(nodeID, "Pfcp_Deletion_Response", "Outgoing", "Failure").Inc() + smres, err := message.NewSessionDeletionResponse(0, /* MO?? <-- what's this */ 0, /* FO <-- what's this? */ 0, /* seid */ @@ -613,19 +682,21 @@ func (pc *PFCPConn) handleSessionDeletionRequest(upf *upf, msg message.Message, } log.Println("Sending session deletion response to: ", addr) + return smres } /* retrieve sessionRecord */ localSEID := sdreq.SEID() + session, ok := pc.mgr.sessions[localSEID] if !ok { - return sendError(fmt.Errorf("Session not found: %v", localSEID)) + return sendError(fmt.Errorf("session not found: %v", localSEID)) } - cause := upf.sendMsgToUPF("del", session.pdrs, session.fars, session.qers) + cause := upf.sendMsgToUPF(upfMsgTypeDel, session.pdrs, session.fars, session.qers) if cause == ie.CauseRequestRejected { - return sendError(errors.New("Write to FastPath failed")) + return sendError(errors.New("write to FastPath failed")) } releaseAllocatedIPs(upf, session) @@ -645,7 +716,7 @@ func (pc *PFCPConn) handleSessionDeletionRequest(upf *upf, msg message.Message, } log.Println("Sent session deletion response to: ", addr) - globalPfcpStats.messages.WithLabelValues(string(nodeID), "Pfcp_Deletion_Response", "Outgoing", "Success").Inc() + globalPfcpStats.messages.WithLabelValues(nodeID, "Pfcp_Deletion_Response", "Outgoing", "Success").Inc() return smres } @@ -658,19 +729,21 @@ func (pc *PFCPConn) manageSmfConnection(n4LocalIP string, n3ip string, n4Dst str n4DstIP := getRemoteIP(n4Dst) log.Println("SPGWC/SMF address IP inside manageSmfConnection ", n4DstIP.String()) // initiate request if we have control plane address available - if n4DstIP.String() != "0.0.0.0" { + if n4DstIP.String() != net.IPv4zero.String() { pc.generateAssociationRequest(n4LocalIP, n3ip, n4DstIP.String(), conn, rTime) } - // no worry. Looks like control plane is still not up } + updateSmfStatus := func(msg bool) { log.Println("cpConnected : ", cpConnected, "msg ", msg) // events from main Loop if cpConnected && !msg { log.Println("CP disconnected ") + cpConnected = false } else if !cpConnected && msg { log.Println("CP Connected ") + cpConnected = true } else { log.Println("cpConnected ", cpConnected, "msg - ", msg) @@ -681,11 +754,13 @@ func (pc *PFCPConn) manageSmfConnection(n4LocalIP string, n3ip string, n4Dst str connHelathTicker := time.NewTicker(5000 * time.Millisecond) pfcpResponseTicker := time.NewTicker(2000 * time.Millisecond) + for { select { case msg := <-cpConnectionStatus: // events from main Loop updateSmfStatus(msg) + if cpConnected { pfcpResponseTicker.Stop() } @@ -695,16 +770,17 @@ func (pc *PFCPConn) manageSmfConnection(n4LocalIP string, n3ip string, n4Dst str initiatePfcpConnection() } case <-pfcpResponseTicker.C: + // we will attempt new connection after next recheck log.Println("PFCP session setup timeout ") pfcpResponseTicker.Stop() - // we will attempt new connection after next recheck } } } func (pc *PFCPConn) generateAssociationRequest(n4LocalIP string, n3ip string, n4DstIP string, conn *net.UDPConn, rTime time.Time) { - seq := pc.getSeqNum() log.Println("n4DstIp ", n4DstIP) + + seq := pc.getSeqNum() // Build request message asreq, err := message.NewAssociationSetupRequest(seq, ie.NewRecoveryTimeStamp(rTime), ie.NewNodeID(n4LocalIP, "", ""), /* node id (IPv4) */ @@ -732,6 +808,7 @@ func (pc *PFCPConn) generateAssociationRequest(n4LocalIP string, n3ip string, n4 func readReportNotification(rn <-chan uint64, pfcpConn *PFCPConn, udpConn *net.UDPConn, udpAddr net.Addr) { log.Println("read report notification start") + for { select { case fseid := <-rn: @@ -759,6 +836,7 @@ func handleDigestReport(fseid uint64, } session.setNotifyFlag(true) + seq := pfcpConn.getSeqNum() serep := message.NewSessionReportRequest(0, /* MO?? <-- what's this */ 0, /* FO <-- what's this? */ @@ -768,7 +846,9 @@ func handleDigestReport(fseid uint64, ie.NewReportType(0, 0, 0, 1), /*upir, erir, usar, dldr int*/ ) serep.Header.SEID = session.remoteSEID + var pdrID uint32 + for _, pdr := range session.pdrs { if pdr.srcIface == core { pdrID = pdr.pdrID @@ -777,6 +857,7 @@ func handleDigestReport(fseid uint64, } log.Println("Pdr iD : ", pdrID) + if pdrID == 0 { log.Println("No Pdr found for downlink") return diff --git a/pfcpiface/p4rt.go b/pfcpiface/p4rt.go index b6d990094..b7ba89c1c 100644 --- a/pfcpiface/p4rt.go +++ b/pfcpiface/p4rt.go @@ -20,7 +20,7 @@ var ( p4RtcServerPort = flag.String("p4RtcServerPort", "", "P4 Server port") ) -// P4rtcInfo : P4 runtime interface settings +// P4rtcInfo : P4 runtime interface settings. type P4rtcInfo struct { AccessIP string `json:"access_ip"` P4rtcServer string `json:"p4rtc_server"` @@ -28,17 +28,17 @@ type P4rtcInfo struct { UEIP string `json:"ue_ip_pool"` } -//ctrType +// TODO: convert uint8 to enum. const ( - preQosPdrCounter uint8 = 0 //Pre qos pdr ctr - postQosPdrCounter uint8 = 1 //Post qos pdr ctr + preQosPdrCounter uint8 = 0 // Pre qos pdr ctr + postQosPdrCounter uint8 = 1 // Post qos pdr ctr ) type counter struct { maxSize uint64 counterID uint64 allocated map[uint64]uint64 - //free map[uint64]uint64 + // free map[uint64]uint64 } type p4rtc struct { @@ -65,6 +65,7 @@ func (p *p4rtc) portStats(uc *upfCollector, ch chan<- prometheus.Metric) { func setSwitchInfo(p4rtClient *P4rtClient) (net.IP, net.IPMask, error) { log.Println("Set Switch Info") log.Println("device id ", (*p4rtClient).DeviceID) + p4InfoPath := "/bin/p4info.txt" deviceConfigPath := "/bin/bmv2.json" @@ -108,17 +109,20 @@ func setCounterSize(p *p4rtc, counterID uint8, name string) error { log.Println("ctr ID : ", ctr.Preamble.Id) p.counters[counterID].maxSize = uint64(ctr.Size) p.counters[counterID].counterID = uint64(ctr.Preamble.Id) + return nil } } } - errin := fmt.Errorf("Countername not found %s", name) + errin := fmt.Errorf("countername not found %s", name) + return errin } func (p *p4rtc) setInfo(conn *net.UDPConn, addr net.Addr, pconn *PFCPConn) { log.Println("setUDP Conn ", conn) + p.pfcpConn = pconn } @@ -136,20 +140,23 @@ func getCounterVal(p *p4rtc, counterID uint8, pdrID uint32) (uint64, error) { if present continue if loop reaches max break and fail. */ - ctr := &p.counters[counterID] var val uint64 + + ctr := &p.counters[counterID] for i := 0; i < int(ctr.maxSize); i++ { rand.Seed(time.Now().UnixNano()) + val = uint64(rand.Intn(int(ctr.maxSize)-1) + 1) if _, ok := ctr.allocated[val]; !ok { log.Println("key not in allocated map ", val) + ctr.allocated[val] = 1 + return val, nil } } - errin := fmt.Errorf("key alloc fail %v", val) - return 0, errin + return 0, fmt.Errorf("key alloc fail %v", val) } func (p *p4rtc) exit() { @@ -158,23 +165,23 @@ func (p *p4rtc) exit() { func (p *p4rtc) channelSetup() (*P4rtClient, error) { log.Println("Channel Setup.") - localclient, errin := CreateChannel(p.host, - p.deviceID, p.timeout, p.reportNotifyChan) + + localclient, errin := CreateChannel(p.host, p.deviceID, p.timeout, p.reportNotifyChan) if errin != nil { log.Println("create channel failed : ", errin) return nil, errin } + if localclient != nil { log.Println("device id ", (*localclient).DeviceID) - p.accessIP, p.accessIPMask, errin = - setSwitchInfo(localclient) + + p.accessIP, p.accessIPMask, errin = setSwitchInfo(localclient) if errin != nil { log.Println("Switch set info failed ", errin) return nil, errin } log.Println("accessIP, Mask ", p.accessIP, p.accessIPMask) - } else { log.Println("p4runtime client is null.") return nil, errin @@ -185,23 +192,26 @@ func (p *p4rtc) channelSetup() (*P4rtClient, error) { func initCounter(p *p4rtc) error { log.Println("Initialize counters for p4client.") + var errin error + if p.p4client == nil { errin = fmt.Errorf("can't initialize counter. P4client null") return errin } p.counters = make([]counter, 2) - errin = setCounterSize(p, preQosPdrCounter, - "PreQosPipe.pre_qos_pdr_counter") + + errin = setCounterSize(p, preQosPdrCounter, "PreQosPipe.pre_qos_pdr_counter") if errin != nil { log.Println("preQosPdrCounter counter not found : ", errin) } - errin = setCounterSize(p, postQosPdrCounter, - "PostQosPipe.post_qos_pdr_counter") + + errin = setCounterSize(p, postQosPdrCounter, "PostQosPipe.post_qos_pdr_counter") if errin != nil { log.Println("postQosPdrCounter counter not found : ", errin) } + for i := range p.counters { log.Println("init maps for counters.") p.counters[i].init() @@ -222,10 +232,12 @@ func (p *p4rtc) isConnected(accessIP *net.IP) bool { if accessIP != nil { *accessIP = p.accessIP } + errin = p.p4client.ClearPdrTable() if errin != nil { log.Println("clear PDR table failed : ", errin) } + errin = p.p4client.ClearFarTable() if errin != nil { log.Println("clear FAR table failed : ", errin) @@ -243,9 +255,10 @@ func (p *p4rtc) isConnected(accessIP *net.IP) bool { func (p *p4rtc) sendDeleteAllSessionsMsgtoUPF() { log.Println("Loop through sessions and delete all entries p4") + if (p.pfcpConn != nil) && (p.pfcpConn.mgr != nil) { for seidKey, value := range p.pfcpConn.mgr.sessions { - p.sendMsgToUPF("del", value.pdrs, value.fars, nil) + p.sendMsgToUPF(upfMsgTypeDel, value.pdrs, value.fars, nil) p.pfcpConn.mgr.RemoveSession(seidKey) } } @@ -257,10 +270,12 @@ func (p *p4rtc) sim(u *upf, method string) { func (p *p4rtc) setUpfInfo(u *upf, conf *Conf) { log.Println("setUpfInfo p4rtc") + var errin error + u.accessIP, p.accessIPMask = ParseStrIP(conf.P4rtcIface.AccessIP) - log.Println("AccessIP: ", u.accessIP, - ", AccessIPMask: ", p.accessIPMask) + log.Println("AccessIP: ", u.accessIP, ", AccessIPMask: ", p.accessIPMask) + p.p4rtcServer = conf.P4rtcIface.P4rtcServer log.Println("p4rtc server ip/name", p.p4rtcServer) p.p4rtcPort = conf.P4rtcIface.P4rtcPort @@ -274,7 +289,8 @@ func (p *p4rtc) setUpfInfo(u *upf, conf *Conf) { p.p4rtcPort = *p4RtcServerPort } - u.coreIP = net.ParseIP("0.0.0.0") + u.coreIP = net.ParseIP(net.IPv4zero.String()) + log.Println("onos server ip ", p.p4rtcServer) log.Println("onos server port ", p.p4rtcPort) log.Println("n4 ip ", u.n4SrcIP.String()) @@ -285,13 +301,15 @@ func (p *p4rtc) setUpfInfo(u *upf, conf *Conf) { p.timeout = 30 p.p4client, errin = p.channelSetup() u.accessIP = p.accessIP + if errin != nil { - fmt.Printf("create channel failed : %v\n", errin) + log.Println("create channel failed : ", errin) } else { errin = p.p4client.ClearPdrTable() if errin != nil { log.Println("clear PDR table failed : ", errin) } + errin = p.p4client.ClearFarTable() if errin != nil { log.Println("clear FAR table failed : ", errin) @@ -302,8 +320,10 @@ func (p *p4rtc) setUpfInfo(u *upf, conf *Conf) { if errin != nil { log.Println("Counter Init failed. : ", errin) } + if conf.EnableEndMarker { log.Println("Starting end marker loop") + p.endMarkerChan = make(chan []byte, 1024) go p.endMarkerSendLoop(p.endMarkerChan) } @@ -313,6 +333,7 @@ func (p *p4rtc) sendEndMarkers(endMarkerList *[][]byte) error { for _, eMarker := range *endMarkerList { p.endMarkerChan <- eMarker } + return nil } @@ -324,20 +345,24 @@ func (p *p4rtc) endMarkerSendLoop(endMarkerChan chan []byte) { } } } -func (p *p4rtc) sendMsgToUPF(method string, pdrs []pdr, - fars []far, qers []qer) uint8 { + +func (p *p4rtc) sendMsgToUPF(method upfMsgType, pdrs []pdr, fars []far, qers []qer) uint8 { log.Println("sendMsgToUPF p4") - var funcType uint8 - var err error - var val uint64 - var cause uint8 = ie.CauseRequestRejected + + var ( + funcType uint8 + err error + val uint64 + cause uint8 = ie.CauseRequestRejected + ) + if !p.isConnected(nil) { log.Println("p4rtc server not connected") return cause } switch method { - case "add": + case upfMsgTypeAdd: { funcType = FunctionTypeInsert for i := range pdrs { @@ -350,7 +375,7 @@ func (p *p4rtc) sendMsgToUPF(method string, pdrs []pdr, pdrs[i].ctrID = uint32(val) } } - case "del": + case upfMsgTypeDel: { funcType = FunctionTypeDelete for i := range pdrs { @@ -358,7 +383,7 @@ func (p *p4rtc) sendMsgToUPF(method string, pdrs []pdr, uint64(pdrs[i].ctrID)) } } - case "mod": + case upfMsgTypeMod: { funcType = FunctionTypeUpdate } @@ -372,10 +397,12 @@ func (p *p4rtc) sendMsgToUPF(method string, pdrs []pdr, for _, pdr := range pdrs { log.Traceln(pdr) log.Traceln("write pdr funcType : ", funcType) + errin := p.p4client.WritePdrTable(pdr, funcType) if errin != nil { resetCounterVal(p, preQosPdrCounter, uint64(pdr.ctrID)) log.Println("pdr entry function failed ", errin) + return cause } } @@ -383,6 +410,7 @@ func (p *p4rtc) sendMsgToUPF(method string, pdrs []pdr, for _, far := range fars { log.Traceln(far) log.Traceln("write far funcType : ", funcType) + errin := p.p4client.WriteFarTable(far, funcType) if errin != nil { log.Println("far entry function failed ", errin) @@ -391,5 +419,6 @@ func (p *p4rtc) sendMsgToUPF(method string, pdrs []pdr, } cause = ie.CauseRequestAccepted + return cause } diff --git a/pfcpiface/p4rtc.go b/pfcpiface/p4rtc.go index 940101ca5..70c59f630 100644 --- a/pfcpiface/p4rtc.go +++ b/pfcpiface/p4rtc.go @@ -22,19 +22,19 @@ import ( "google.golang.org/grpc" ) -//P4DeviceConfig ... Device config +// P4DeviceConfig ... Device config. type P4DeviceConfig []byte const invalidID = 0 -//Table Entry Function Type +// Table Entry Function Type. const ( - FunctionTypeInsert uint8 = 1 //Insert table Entry Function - FunctionTypeUpdate uint8 = 2 //Update table Entry Function - FunctionTypeDelete uint8 = 3 //Delete table Entry Function + FunctionTypeInsert uint8 = 1 // Insert table Entry Function + FunctionTypeUpdate uint8 = 2 // Update table Entry Function + FunctionTypeDelete uint8 = 3 // Delete table Entry Function ) -// IntfTableEntry ... Interface Table Entry API +// IntfTableEntry ... Interface Table Entry API. type IntfTableEntry struct { IP []byte PrefixLen int @@ -42,14 +42,14 @@ type IntfTableEntry struct { Direction string } -// ActionParam ... Action Param API +// ActionParam ... Action Param API. type ActionParam struct { Len uint32 Name string Value []byte } -// MatchField .. Match Field API +// MatchField .. Match Field API. type MatchField struct { Len uint32 PrefixLen uint32 @@ -58,7 +58,7 @@ type MatchField struct { Mask []byte } -//IntfCounterEntry .. Counter entry function API +// IntfCounterEntry .. Counter entry function API. type IntfCounterEntry struct { CounterID uint64 Index uint64 @@ -66,7 +66,7 @@ type IntfCounterEntry struct { PktCount []uint64 } -//AppTableEntry .. Table entry function API +// AppTableEntry .. Table entry function API. type AppTableEntry struct { FieldSize uint32 ParamSize uint32 @@ -76,7 +76,7 @@ type AppTableEntry struct { Params []ActionParam } -// P4rtClient ... P4 Runtime client object +// P4rtClient ... P4 Runtime client object. type P4rtClient struct { Client p4.P4RuntimeClient Conn *grpc.ClientConn @@ -92,6 +92,7 @@ func (c *P4rtClient) tableID(name string) uint32 { return table.Preamble.Id } } + return invalidID } @@ -104,12 +105,14 @@ func (c *P4rtClient) tableID(name string) uint32 { return invalidID } */ + func (c *P4rtClient) actionID(name string) uint32 { for _, action := range c.P4Info.Actions { if action.Preamble.Name == name { return action.Preamble.Id } } + return invalidID } @@ -117,7 +120,7 @@ func (c *P4rtClient) getEnumVal(enumName string, valName string) ([]byte, error) { enumVal, ok := c.P4Info.TypeInfo.SerializableEnums[enumName] if !ok { - err := fmt.Errorf("Enum Not found with name %s", enumName) + err := fmt.Errorf("enum not found with name %s", enumName) return nil, err } @@ -127,16 +130,15 @@ func (c *P4rtClient) getEnumVal(enumName string, } } - err := fmt.Errorf("EnumVal not found") - return nil, err + return nil, fmt.Errorf("EnumVal not found") } -// CheckStatus ... Check client connection status +// CheckStatus ... Check client connection status. func (c *P4rtClient) CheckStatus() (state int) { return int(c.Conn.GetState()) } -// SetMastership .. API +// SetMastership .. API. func (c *P4rtClient) SetMastership(electionID p4.Uint128) (err error) { c.ElectionID = electionID mastershipReq := &p4.StreamMessageRequest{ @@ -148,6 +150,7 @@ func (c *P4rtClient) SetMastership(electionID p4.Uint128) (err error) { }, } err = c.Stream.Send(mastershipReq) + return } @@ -161,15 +164,16 @@ func (c *P4rtClient) SendPacketOut(packet []byte) (err error) { }, } err = c.Stream.Send(pktOutReq) + return err } -// Init .. Initialize Client +// Init .. Initialize Client. func (c *P4rtClient) Init(timeout uint32, reportNotifyChan chan<- uint64) (err error) { // Initialize stream for mastership and packet I/O - //ctx, cancel := context.WithTimeout(context.Background(), + // ctx, cancel := context.WithTimeout(context.Background(), // time.Duration(timeout) * time.Second) - //defer cancel() + // defer cancel() c.Stream, err = c.Client.StreamChannel( context.Background(), grpcRetry.WithMax(3), @@ -178,6 +182,7 @@ func (c *P4rtClient) Init(timeout uint32, reportNotifyChan chan<- uint64) (err e log.Println("stream channel error: ", err) return } + go func() { for { res, err := c.Stream.Recv() @@ -209,25 +214,25 @@ func (c *P4rtClient) Init(timeout uint32, reportNotifyChan chan<- uint64) (err e } else { log.Println("stream recv: ", res) } - } }() /* - select { - case <-ctx.Done(): - log.Println(ctx.Err()) // prints "context deadline exceeded" - }*/ + select { + case <-ctx.Done(): + log.Println(ctx.Err()) // prints "context deadline exceeded" + } + */ log.Println("exited from recv thread.") + return } -// WriteFarTable .. Write far table entry API -func (c *P4rtClient) WriteFarTable( - farEntry far, funcType uint8) error { - +// WriteFarTable .. Write far table entry API. +func (c *P4rtClient) WriteFarTable(farEntry far, funcType uint8) error { log.Println("WriteFarTable.") + te := AppTableEntry{ TableName: "PreQosPipe.load_far_attributes", } @@ -237,25 +242,30 @@ func (c *P4rtClient) WriteFarTable( te.Fields[0].Name = "far_id" te.Fields[0].Value = make([]byte, 4) - binary.BigEndian.PutUint32(te.Fields[0].Value, uint32(farEntry.farID)) + binary.BigEndian.PutUint32(te.Fields[0].Value, farEntry.farID) te.Fields[1].Name = "session_id" + te.Fields[1].Value = make([]byte, 12) + fseidVal := make([]byte, 12) binary.BigEndian.PutUint32(fseidVal[:4], farEntry.fseidIP) binary.BigEndian.PutUint64(fseidVal[4:], farEntry.fseID) - te.Fields[1].Value = make([]byte, 12) + copy(te.Fields[1].Value, fseidVal) var prio int32 + if funcType == FunctionTypeDelete { te.ActionName = "NoAction" te.ParamSize = 0 + go func() { ret := c.InsertTableEntry(te, funcType, prio) if ret != nil { log.Println("Insert Table entry error : ", ret) } }() + return nil } else if funcType == FunctionTypeInsert { te.ActionName = "PreQosPipe.load_normal_far_attributes" @@ -263,18 +273,17 @@ func (c *P4rtClient) WriteFarTable( te.Params = make([]ActionParam, te.ParamSize) te.Params[0].Name = "needs_dropping" te.Params[0].Value = make([]byte, 1) - te.Params[0].Value[0] = byte(farEntry.applyAction & 0x01) + te.Params[0].Value[0] = farEntry.applyAction & 0x01 te.Params[1].Name = "notify_cp" te.Params[1].Value = make([]byte, 1) - te.Params[1].Value[0] = byte(farEntry.applyAction & 0x08) + te.Params[1].Value[0] = farEntry.applyAction & 0x08 } else if funcType == FunctionTypeUpdate { - te.ActionName = "PreQosPipe.load_tunnel_far_attributes" te.ParamSize = 8 te.Params = make([]ActionParam, te.ParamSize) te.Params[0].Name = "needs_dropping" te.Params[0].Value = make([]byte, 1) - te.Params[0].Value[0] = byte(farEntry.applyAction & 0x01) + te.Params[0].Value[0] = farEntry.applyAction & 0x01 te.Params[1].Name = "notify_cp" te.Params[1].Value = make([]byte, 1) if (farEntry.applyAction & 0x08) != 0 { @@ -320,11 +329,10 @@ func (c *P4rtClient) WriteFarTable( return c.InsertTableEntry(te, funcType, prio) } -// WritePdrTable .. Write pdr table entry API -func (c *P4rtClient) WritePdrTable( - pdrEntry pdr, funcType uint8) error { - +// WritePdrTable .. Write pdr table entry API. +func (c *P4rtClient) WritePdrTable(pdrEntry pdr, funcType uint8) error { log.Println("WritePdrTable.") + te := AppTableEntry{ TableName: "PreQosPipe.pdrs", ActionName: "PreQosPipe.set_pdr_attributes", @@ -335,8 +343,12 @@ func (c *P4rtClient) WritePdrTable( te.FieldSize = 2 te.Fields[0].Name = "src_iface" enumName := "InterfaceType" - var srcIntfStr string - var decapVal uint8 + + var ( + srcIntfStr string + decapVal uint8 + ) + if pdrEntry.srcIface == access { srcIntfStr = "ACCESS" decapVal = 1 @@ -354,7 +366,7 @@ func (c *P4rtClient) WritePdrTable( binary.BigEndian.PutUint32(te.Fields[1].Value, pdrEntry.tunnelTEID) te.Fields[1].Mask = make([]byte, 4) binary.BigEndian.PutUint32(te.Fields[1].Mask, pdrEntry.tunnelTEIDMask) - //te.Fields[2].Mask = b + // te.Fields[2].Mask = b te.Fields[2].Name = "tunnel_ipv4_dst" te.Fields[2].Value = make([]byte, 4) @@ -370,18 +382,20 @@ func (c *P4rtClient) WritePdrTable( } var prio int32 = 2 + if funcType == FunctionTypeDelete { te.ActionName = "NoAction" te.ParamSize = 0 + go func() { ret := c.InsertTableEntry(te, funcType, prio) if ret != nil { log.Println("Insert Table entry error : ", ret) } }() + return nil } else if funcType == FunctionTypeInsert { - te.ParamSize = 5 te.Params = make([]ActionParam, te.ParamSize) te.Params[0].Name = "id" @@ -401,22 +415,20 @@ func (c *P4rtClient) WritePdrTable( te.Params[3].Name = "far_id" te.Params[3].Value = make([]byte, 4) - binary.BigEndian.PutUint32(te.Params[3].Value, uint32(pdrEntry.farID)) + binary.BigEndian.PutUint32(te.Params[3].Value, pdrEntry.farID) te.Params[4].Name = "needs_gtpu_decap" te.Params[4].Value = make([]byte, 1) - te.Params[4].Value[0] = byte(decapVal) + te.Params[4].Value[0] = decapVal } return c.InsertTableEntry(te, funcType, prio) } -//WriteInterfaceTable ... Write Interface table Entry -func (c *P4rtClient) WriteInterfaceTable( - intfEntry IntfTableEntry, - funcType uint8) error { - +// WriteInterfaceTable ... Write Interface table Entry. +func (c *P4rtClient) WriteInterfaceTable(intfEntry IntfTableEntry, funcType uint8) error { log.Println("WriteInterfaceTable.") + te := AppTableEntry{ TableName: "PreQosPipe.source_iface_lookup", ActionName: "PreQosPipe.set_source_iface", @@ -432,23 +444,27 @@ func (c *P4rtClient) WriteInterfaceTable( te.Params = make([]ActionParam, 2) te.Params[0].Name = "src_iface" enumName := "InterfaceType" + val, err := c.getEnumVal(enumName, intfEntry.SrcIntf) if err != nil { log.Println("Could not find enum val ", err) return err } - te.Params[0].Value = val + te.Params[0].Value = val te.Params[1].Name = "direction" enumName = "Direction" + val, err = c.getEnumVal(enumName, intfEntry.Direction) if err != nil { log.Println("Could not find enum val ", err) return nil } + te.Params[1].Value = val var prio int32 + return c.InsertTableEntry(te, funcType, prio) } @@ -461,33 +477,40 @@ func (c *P4rtClient) getCounterValue(entity *p4.Entity, ce.ByteCount[index] = byteCount ce.PktCount[index] = pktCount log.Traceln("index , bytecount, pktcount ", index, byteCount, pktCount) + return nil } -func (c *P4rtClient) getFieldValue(entity *p4.Entity, - te AppTableEntry) (*MatchField, error) { +func (c *P4rtClient) getFieldValue(entity *p4.Entity, te AppTableEntry) (*MatchField, error) { log.Println("get Field Value") + entry := entity.GetTableEntry() tableID := c.tableID(te.TableName) actionID := c.actionID(te.ActionName) inputField := te.Fields[0] inputParam := te.Params[0] + if (entry.TableId != tableID) || (entry.Action.GetAction().ActionId != actionID) { - err := fmt.Errorf("Invalid tableID / ActionID.") + err := fmt.Errorf("invalid tableID / ActionID") return nil, err } - var matchType p4ConfigV1.MatchField_MatchType - var fieldID uint32 - var paramID uint32 + var ( + matchType p4ConfigV1.MatchField_MatchType + fieldID uint32 + paramID uint32 + ) + for _, tables := range c.P4Info.Tables { if tables.Preamble.Id == tableID { for _, fields := range tables.MatchFields { if fields.Name == inputField.Name { log.Println("field name match found.") + matchType = fields.GetMatchType() fieldID = fields.Id + break } } @@ -503,7 +526,9 @@ func (c *P4rtClient) getFieldValue(entity *p4.Entity, for _, params := range actions.Params { if params.Name == inputParam.Name { log.Println("field name match found.") + paramID = params.Id + break } } @@ -515,16 +540,20 @@ func (c *P4rtClient) getFieldValue(entity *p4.Entity, } log.Println("ParamId FieldID ", paramID, fieldID) + for _, params := range entry.Action.GetAction().Params { log.Println("ParamId recvd ", params.ParamId) log.Println("Param value ", params.Value) log.Println("inputParam value ", inputParam.Value) + if params.ParamId == paramID && (bytes.Equal(params.Value, inputParam.Value)) { log.Println("Param matched") + for _, fields := range entry.Match { if fields.FieldId == fieldID { log.Println("field name match found ", inputField.Name) + switch matchType { case p4ConfigV1.MatchField_EXACT: { @@ -553,24 +582,23 @@ func (c *P4rtClient) getFieldValue(entity *p4.Entity, } default: log.Println("Unknown MatchType.") - err := fmt.Errorf("Unknown MatchType for FieldMatch") - return nil, err + return nil, fmt.Errorf("unknown MatchType for FieldMatch") } log.Println("Field value found.") + return &inputField, nil } } } } - err := fmt.Errorf("getField Value failed") - return nil, err + return nil, fmt.Errorf("getField Value failed") } -func (c *P4rtClient) addFieldValue(entry *p4.TableEntry, field MatchField, - tableID uint32) error { +func (c *P4rtClient) addFieldValue(entry *p4.TableEntry, field MatchField, tableID uint32) error { log.Traceln("add Match field") + fieldVal := &p4.FieldMatch{ FieldId: 0, } @@ -580,7 +608,9 @@ func (c *P4rtClient) addFieldValue(entry *p4.TableEntry, field MatchField, for _, fields := range tables.MatchFields { if fields.Name == field.Name { log.Traceln("field name match found.") + fieldVal.FieldId = fields.Id + switch fields.GetMatchType() { case p4ConfigV1.MatchField_EXACT: { @@ -617,19 +647,18 @@ func (c *P4rtClient) addFieldValue(entry *p4.TableEntry, field MatchField, } default: log.Println("Unknown MatchType.") - err := fmt.Errorf("Unknown MatchType for FieldMatch") - return err + return fmt.Errorf("unknown MatchType for FieldMatch") } entry.Match = append(entry.Match, fieldVal) + return nil } } } } - err := fmt.Errorf("addField Value failed") - return err + return fmt.Errorf("addField Value failed") } func (c *P4rtClient) addActionValue(action *p4.Action, param ActionParam, @@ -645,19 +674,18 @@ func (c *P4rtClient) addActionValue(action *p4.Action, param ActionParam, Value: param.Value, } action.Params = append(action.Params, paramVal) + return nil } } } } - err := fmt.Errorf("addAction Value failed") - return err + return fmt.Errorf("addAction Value failed") } -//ReadCounter ... Read Counter entry +// ReadCounter ... Read Counter entry. func (c *P4rtClient) ReadCounter(ce *IntfCounterEntry) error { - log.Println("ReadCounter ID : ", ce.CounterID) readRes, err := c.ReadCounterEntry(ce) @@ -667,6 +695,7 @@ func (c *P4rtClient) ReadCounter(ce *IntfCounterEntry) error { } log.Traceln(proto.MarshalTextString(readRes)) + for _, ent := range readRes.GetEntities() { err := c.getCounterValue(ent, ce) if err != nil { @@ -678,18 +707,20 @@ func (c *P4rtClient) ReadCounter(ce *IntfCounterEntry) error { return nil } -//ReadCounterEntry .. Read counter Entry +// ReadCounterEntry .. Read counter Entry. func (c *P4rtClient) ReadCounterEntry(ce *IntfCounterEntry) (*p4.ReadResponse, error) { - log.Traceln("Read Counter Entry") - var index p4.Index + var ( + index p4.Index + entry p4.CounterEntry + entity p4.Entity + ctrEntry p4.Entity_CounterEntry + ) + index.Index = int64(ce.Index) - var entry p4.CounterEntry entry.CounterId = uint32(ce.CounterID) - //entry.Index = &index - var entity p4.Entity - var ctrEntry p4.Entity_CounterEntry + // entry.Index = &index ctrEntry.CounterEntry = &entry entity.Entity = &ctrEntry /* @@ -705,18 +736,20 @@ func (c *P4rtClient) ReadCounterEntry(ce *IntfCounterEntry) (*p4.ReadResponse, e Entity: &p4.Entity_CounterEntry{CounterEntry: entry}, }*/ log.Traceln(proto.MarshalTextString(&entity)) + return c.ReadReq(&entity) } -//ClearFarTable ... Clear FAR Table +// ClearFarTable ... Clear FAR Table. func (c *P4rtClient) ClearFarTable() error { - log.Println("ClearFarTable.") + te := AppTableEntry{ TableName: "PreQosPipe.load_far_attributes", } var prio int32 + readRes, err := c.ReadTableEntry(te, prio) if err != nil { log.Println("Read FAR table failed ", err) @@ -724,6 +757,7 @@ func (c *P4rtClient) ClearFarTable() error { } updates := make([]*p4.Update, len(readRes.GetEntities())) + for _, ent := range readRes.GetEntities() { updateType := p4.Update_DELETE update := &p4.Update{ @@ -744,15 +778,16 @@ func (c *P4rtClient) ClearFarTable() error { return nil } -//ClearPdrTable ... Clear PDR Table +// ClearPdrTable ... Clear PDR Table. func (c *P4rtClient) ClearPdrTable() error { + log.Println("ClearPdrTable") - log.Println("ClearPdrTable.") te := AppTableEntry{ TableName: "PreQosPipe.pdrs", } var prio int32 + readRes, err := c.ReadTableEntry(te, prio) if err != nil { log.Println("Read Pdr table failed ", err) @@ -760,6 +795,7 @@ func (c *P4rtClient) ClearPdrTable() error { } updates := make([]*p4.Update, len(readRes.GetEntities())) + for _, ent := range readRes.GetEntities() { updateType := p4.Update_DELETE update := &p4.Update{ @@ -780,11 +816,10 @@ func (c *P4rtClient) ClearPdrTable() error { return nil } -//ReadInterfaceTable ... Read Interface table Entry -func (c *P4rtClient) ReadInterfaceTable( - intfEntry *IntfTableEntry) error { - +// ReadInterfaceTable ... Read Interface table Entry. +func (c *P4rtClient) ReadInterfaceTable(intfEntry *IntfTableEntry) error { log.Println("ReadInterfaceTable.") + te := AppTableEntry{ TableName: "PreQosPipe.source_iface_lookup", ActionName: "PreQosPipe.set_source_iface", @@ -798,23 +833,27 @@ func (c *P4rtClient) ReadInterfaceTable( te.Params = make([]ActionParam, 2) te.Params[0].Name = "src_iface" enumName := "InterfaceType" + val, err := c.getEnumVal(enumName, intfEntry.SrcIntf) if err != nil { log.Println("Could not find enum val ", err) return err } - te.Params[0].Value = val + te.Params[0].Value = val te.Params[1].Name = "direction" enumName = "Direction" + val, err = c.getEnumVal(enumName, intfEntry.Direction) if err != nil { log.Println("Could not find enum val ", err) return err } + te.Params[1].Value = val var prio int32 + readRes, err := c.ReadTableEntry(te, prio) if err != nil { log.Println("Read Interface table failed ", err) @@ -832,17 +871,15 @@ func (c *P4rtClient) ReadInterfaceTable( copy(intfEntry.IP, field.Value) log.Println("ip , fieldval ", intfEntry.IP, field.Value) intfEntry.PrefixLen = int(field.PrefixLen) + return nil } - err = fmt.Errorf("ReadInterfaceTable failed") - return err + return fmt.Errorf("ReadInterfaceTable failed") } -//ReadTableEntry ... Read table Entry -func (c *P4rtClient) ReadTableEntry( - tableEntry AppTableEntry, prio int32) (*p4.ReadResponse, error) { - +// ReadTableEntry ... Read table Entry. +func (c *P4rtClient) ReadTableEntry(tableEntry AppTableEntry, prio int32) (*p4.ReadResponse, error) { log.Println("Read Table Entry for Table ", tableEntry.TableName) tableID := c.tableID(tableEntry.TableName) @@ -855,16 +892,18 @@ func (c *P4rtClient) ReadTableEntry( Entity: &p4.Entity_TableEntry{TableEntry: entry}, } log.Traceln(proto.MarshalTextString(entity)) + return c.ReadReq(entity) } -//ReadReqEntities ... Read request Entity +// ReadReqEntities ... Read request Entity. func (c *P4rtClient) ReadReqEntities(entities []*p4.Entity) (*p4.ReadResponse, error) { req := &p4.ReadRequest{ DeviceId: c.DeviceID, Entities: entities, } log.Traceln(proto.MarshalTextString(req)) + readClient, err := c.Client.Read(context.Background(), req) if err == nil { readRes, err := readClient.Recv() @@ -873,10 +912,11 @@ func (c *P4rtClient) ReadReqEntities(entities []*p4.Entity) (*p4.ReadResponse, e return readRes, nil } } + return nil, err } -// ReadReq ... Read Request +// ReadReq ... Read Request. func (c *P4rtClient) ReadReq(entity *p4.Entity) (*p4.ReadResponse, error) { var req p4.ReadRequest req.DeviceId = c.DeviceID @@ -887,6 +927,7 @@ func (c *P4rtClient) ReadReq(entity *p4.Entity) (*p4.ReadResponse, error) { defer cancel() log.Traceln(proto.MarshalTextString(&req)) + readClient, err := c.Client.Read(ctx, &req) if err == nil { readRes, err := readClient.Recv() @@ -895,14 +936,12 @@ func (c *P4rtClient) ReadReq(entity *p4.Entity) (*p4.ReadResponse, error) { return readRes, nil } } + return nil, err } -//InsertTableEntry .. Insert table Entry -func (c *P4rtClient) InsertTableEntry( - tableEntry AppTableEntry, - funcType uint8, prio int32) error { - +// InsertTableEntry .. Insert table Entry. +func (c *P4rtClient) InsertTableEntry(tableEntry AppTableEntry, funcType uint8, prio int32) error { log.Println("Insert Table Entry for Table ", tableEntry.TableName) tableID := c.tableID(tableEntry.TableName) actionID := c.actionID(tableEntry.ActionName) @@ -911,6 +950,7 @@ func (c *P4rtClient) InsertTableEntry( } log.Println("adding action params.") + for _, p := range tableEntry.Params { err := c.addActionValue(directAction, p, actionID) if err != nil { @@ -933,6 +973,7 @@ func (c *P4rtClient) InsertTableEntry( if uint32(count) >= tableEntry.FieldSize { break } + err := c.addFieldValue(entry, mf, tableID) if err != nil { return err @@ -956,10 +997,11 @@ func (c *P4rtClient) InsertTableEntry( } log.Traceln(proto.MarshalTextString(update)) + return c.WriteReq(update) } -// WriteReq ... Write Request +// WriteReq ... Write Request. func (c *P4rtClient) WriteReq(update *p4.Update) error { req := &p4.WriteRequest{ DeviceId: c.DeviceID, @@ -967,10 +1009,11 @@ func (c *P4rtClient) WriteReq(update *p4.Update) error { Updates: []*p4.Update{update}, } _, err := c.Client.Write(context.Background(), req) + return err } -// WriteBatchReq ... Write batch Request to up4 +// WriteBatchReq ... Write batch Request to up4. func (c *P4rtClient) WriteBatchReq(updates []*p4.Update) error { req := &p4.WriteRequest{ DeviceId: c.DeviceID, @@ -981,12 +1024,14 @@ func (c *P4rtClient) WriteBatchReq(updates []*p4.Update) error { log.Traceln(proto.MarshalTextString(req)) _, err := c.Client.Write(context.Background(), req) + return err } -// GetForwardingPipelineConfig ... Get Pipeline config from switch +// GetForwardingPipelineConfig ... Get Pipeline config from switch. func (c *P4rtClient) GetForwardingPipelineConfig() (err error) { log.Println("GetForwardingPipelineConfig") + pipeline, err := GetPipelineConfig(c.Client, c.DeviceID) if err != nil { log.Println("set pipeline config error ", err) @@ -994,10 +1039,11 @@ func (c *P4rtClient) GetForwardingPipelineConfig() (err error) { } c.P4Info = *pipeline.Config.P4Info + return } -// GetPipelineConfig ... Set pipeline config +// GetPipelineConfig ... Set pipeline config. func GetPipelineConfig(client p4.P4RuntimeClient, deviceID uint64) (*p4.GetForwardingPipelineConfigResponse, error) { req := &p4.GetForwardingPipelineConfigRequest{ DeviceId: deviceID, @@ -1009,10 +1055,11 @@ func GetPipelineConfig(client p4.P4RuntimeClient, deviceID uint64) (*p4.GetForwa log.Println("get forwarding pipeline returned error ", err) return nil, err } + return configRes, nil } -//SetForwardingPipelineConfig .. +// SetForwardingPipelineConfig .. func (c *P4rtClient) SetForwardingPipelineConfig(p4InfoPath, deviceConfigPath string) (err error) { log.Println("P4 Info: ", p4InfoPath) @@ -1023,6 +1070,7 @@ func (c *P4rtClient) SetForwardingPipelineConfig(p4InfoPath, deviceConfigPath st } var p4info p4ConfigV1.P4Info + err = proto.UnmarshalText(string(p4infoBytes), &p4info) if err != nil { log.Println("Unmarshal test failed for p4info ", err) @@ -1030,6 +1078,7 @@ func (c *P4rtClient) SetForwardingPipelineConfig(p4InfoPath, deviceConfigPath st } c.P4Info = p4info + deviceConfig, err := LoadDeviceConfig(deviceConfigPath) if err != nil { log.Println("bmv2 json read failed ", err) @@ -1045,10 +1094,11 @@ func (c *P4rtClient) SetForwardingPipelineConfig(p4InfoPath, deviceConfigPath st log.Println("set pipeline config error ", err) return } + return } -// SetPipelineConfig ... Set pipeline config +// SetPipelineConfig ... Set pipeline config. func SetPipelineConfig(client p4.P4RuntimeClient, deviceID uint64, electionID *p4.Uint128, config *p4.ForwardingPipelineConfig) error { req := &p4.SetForwardingPipelineConfigRequest{ DeviceId: deviceID, @@ -1057,26 +1107,30 @@ func SetPipelineConfig(client p4.P4RuntimeClient, deviceID uint64, electionID *p Action: p4.SetForwardingPipelineConfigRequest_VERIFY_AND_COMMIT, Config: config, } + _, err := client.SetForwardingPipelineConfig(context.Background(), req) if err != nil { log.Println("set forwarding pipeline returned error ", err) } + return err } -//GetConnection ... Get Grpc connection +// GetConnection ... Get Grpc connection. func GetConnection(host string) (conn *grpc.ClientConn, err error) { /* get connection */ log.Println("Get connection.") + conn, err = grpc.Dial(host, grpc.WithInsecure()) if err != nil { log.Println("grpc dial err: ", err) return nil, err } + return } -// LoadDeviceConfig : Load Device config +// LoadDeviceConfig : Load Device config. func LoadDeviceConfig(deviceConfigPath string) (P4DeviceConfig, error) { log.Println("BMv2 JSON: ", deviceConfigPath) @@ -1085,6 +1139,7 @@ func LoadDeviceConfig(deviceConfigPath string) (P4DeviceConfig, error) { return nil, fmt.Errorf("open %s: %v", deviceConfigPath, err) } defer deviceConfig.Close() + bmv2Info, err := deviceConfig.Stat() if err != nil { return nil, fmt.Errorf("stat %s: %v", deviceConfigPath, err) @@ -1100,7 +1155,7 @@ func LoadDeviceConfig(deviceConfigPath string) (P4DeviceConfig, error) { return bin, nil } -//CreateChannel ... Create p4runtime client channel +// CreateChannel ... Create p4runtime client channel. func CreateChannel(host string, deviceID uint64, timeout uint32, diff --git a/pfcpiface/parse-far.go b/pfcpiface/parse-far.go index 79efde756..f25f34cf9 100644 --- a/pfcpiface/parse-far.go +++ b/pfcpiface/parse-far.go @@ -15,23 +15,16 @@ import ( type operation int const ( - //FwdIEOuterHeaderCreation ... FwdIEOuterHeaderCreation Bits = 1 << iota - //FwdIEDestinationIntf ... FwdIEDestinationIntf - //FwdIEPfcpSMReqFlags ... FwdIEPfcpSMReqFlags ) const ( - //ActionForward ... ActionForward = 0x2 - //ActionDrop ... - ActionDrop = 0x1 - //ActionBuffer ... - ActionBuffer = 0x4 - //ActionNotify ... - ActionNotify = 0x8 + ActionDrop = 0x1 + ActionBuffer = 0x4 + ActionNotify = 0x8 ) const ( @@ -54,9 +47,9 @@ type far struct { tunnelPort uint16 } -// Satisfies the fmt.Stringer interface +// Satisfies the fmt.Stringer interface. func (f far) String() string { - var b = strings.Builder{} + b := strings.Builder{} fmt.Fprintf(&b, "\n") fmt.Fprintf(&b, "farID: %v\n", f.farID) fmt.Fprintf(&b, "fseID: %x\n", f.fseID) @@ -69,6 +62,7 @@ func (f far) String() string { fmt.Fprintf(&b, "tunnelTEID: %x\n", f.tunnelTEID) fmt.Fprintf(&b, "tunnelPort: %v\n", f.tunnelPort) fmt.Fprintf(&b, "sendEndMarker: %v\n", f.sendEndMarker) + return b.String() } @@ -87,7 +81,7 @@ func (f *far) setActionValue() uint8 { return farNotify } - //default action + // default action return farDrop } @@ -98,6 +92,7 @@ func (f *far) parseFAR(farIE *ie.IE, fseid uint64, upf *upf, op operation) error if err != nil { return err } + f.farID = farID action, err := farIE.ApplyAction() @@ -106,6 +101,7 @@ func (f *far) parseFAR(farIE *ie.IE, fseid uint64, upf *upf, op operation) error } f.applyAction = action + var fwdIEs []*ie.IE switch op { @@ -116,7 +112,7 @@ func (f *far) parseFAR(farIE *ie.IE, fseid uint64, upf *upf, op operation) error case update: fwdIEs, err = farIE.UpdateForwardingParameters() default: - return errors.New("Invalid op specified") + return errors.New("invalid op specified") } if err != nil { @@ -124,11 +120,14 @@ func (f *far) parseFAR(farIE *ie.IE, fseid uint64, upf *upf, op operation) error } f.sendEndMarker = false + var fields Bits + for _, fwdIE := range fwdIEs { switch fwdIE.Type { case ie.OuterHeaderCreation: fields = Set(fields, FwdIEOuterHeaderCreation) + ohcFields, err := fwdIE.OuterHeaderCreation() if err != nil { log.Println("Unable to parse OuterHeaderCreationFields!") @@ -141,11 +140,13 @@ func (f *far) parseFAR(farIE *ie.IE, fseid uint64, upf *upf, op operation) error f.tunnelPort = tunnelGTPUPort case ie.DestinationInterface: fields = Set(fields, FwdIEDestinationIntf) + f.dstIntf, err = fwdIE.DestinationInterface() if err != nil { log.Println("Unable to parse DestinationInterface field") continue } + if f.dstIntf == ie.DstInterfaceAccess { f.tunnelIP4Src = ip2int(upf.accessIP) } else if f.dstIntf == ie.DstInterfaceCore { @@ -153,11 +154,13 @@ func (f *far) parseFAR(farIE *ie.IE, fseid uint64, upf *upf, op operation) error } case ie.PFCPSMReqFlags: fields = Set(fields, FwdIEPfcpSMReqFlags) + smReqFlags, err := fwdIE.PFCPSMReqFlags() if err != nil { log.Println("Unable to parse PFCPSMReqFlags!") continue } + if has2ndBit(smReqFlags) { f.sendEndMarker = true } diff --git a/pfcpiface/parse-pdr.go b/pfcpiface/parse-pdr.go index 66730417b..951d23d20 100644 --- a/pfcpiface/parse-pdr.go +++ b/pfcpiface/parse-pdr.go @@ -46,12 +46,13 @@ func needAllocIP(ueIPaddr *ie.UEIPAddressFields) bool { if has2ndBit(ueIPaddr.Flags) && !has5thBit(ueIPaddr.Flags) { return false } + return true } -// Satisfies the fmt.Stringer interface +// Satisfies the fmt.Stringer interface. func (p pdr) String() string { - var b = strings.Builder{} + b := strings.Builder{} fmt.Fprintf(&b, "\n") fmt.Fprintf(&b, "srcIface: %v\n", p.srcIface) fmt.Fprintf(&b, "tunnelIP4Dst: %v\n", int2ip(p.tunnelIP4Dst)) @@ -77,6 +78,7 @@ func (p pdr) String() string { fmt.Fprintf(&b, "qerID: %v\n", p.qerID) fmt.Fprintf(&b, "needDecap: %v\n", p.needDecap) fmt.Fprintf(&b, "allocIPFlag: %v\n", p.allocIPFlag) + return b.String() } @@ -95,13 +97,16 @@ func (p *pdr) parsePDI(pdiIEs []*ie.IE, appPFDs map[string]appPFD, upf *upf) err if needAllocIP(ueIPaddr) { /* alloc IPV6 if CHV6 is enabled : TBD */ log.Println("UPF should alloc UE IP. CHV4 flag set") + ueIP4, err = upf.ippool.allocIPV4() if err != nil { log.Println("failed to allocate UE IP") return err } - p.allocIPFlag = true + log.Println("ueipv4 : ", ueIP4.String()) + + p.allocIPFlag = true } else { ueIP4 = ueIPaddr.IPv4Address } @@ -127,6 +132,7 @@ func (p *pdr) parsePDI(pdiIEs []*ie.IE, appPFDs map[string]appPFD, upf *upf) err log.Println("Failed to parse FTEID IE") continue } + teid := fteid.TEID tunnelIPv4Address := fteid.IPv4Address @@ -135,10 +141,12 @@ func (p *pdr) parsePDI(pdiIEs []*ie.IE, appPFDs map[string]appPFD, upf *upf) err p.tunnelTEIDMask = 0xFFFFFFFF p.tunnelIP4Dst = ip2int(tunnelIPv4Address) p.tunnelIP4DstMask = 0xFFFFFFFF + log.Println("TunnelIPv4Address:", tunnelIPv4Address) } case ie.QFI: // Do nothing for the time being + continue } } @@ -156,7 +164,6 @@ func (p *pdr) parsePDI(pdiIEs []*ie.IE, appPFDs map[string]appPFD, upf *upf) err for _, ie2 := range pdiIEs { switch ie2.Type { case ie.ApplicationID: - appID, err := ie2.ApplicationID() if err != nil { log.Println("Unable to parse Application ID", err) @@ -168,13 +175,18 @@ func (p *pdr) parsePDI(pdiIEs []*ie.IE, appPFDs map[string]appPFD, upf *upf) err log.Println("Unable to find Application ID", err) continue } + if appID != apfd.appID { log.Fatalln("Mismatch in App ID", appID, apfd.appID) } + log.Println("inside application id", apfd.appID, apfd.flowDescs) + for _, flowDesc := range apfd.flowDescs { log.Println("flow desc", flowDesc) + var ipf ipFilterRule + err = ipf.parseFlowDesc(flowDesc, ueIP4.String()) if err != nil { return errBadFilterDesc @@ -182,6 +194,7 @@ func (p *pdr) parsePDI(pdiIEs []*ie.IE, appPFDs map[string]appPFD, upf *upf) err if (p.srcIface == access && ipf.direction == "out") || (p.srcIface == core && ipf.direction == "in") { log.Println("Found a match", p.srcIface, flowDesc) + if ipf.proto != reservedProto { p.proto = ipf.proto p.protoMask = reservedProto @@ -209,9 +222,11 @@ func (p *pdr) parsePDI(pdiIEs []*ie.IE, appPFDs map[string]appPFD, upf *upf) err // TODO: Implement referencing SDF ID continue } + log.Println("Flow Description is:", flowDesc) var ipf ipFilterRule + err = ipf.parseFlowDesc(flowDesc, ueIP4.String()) if err != nil { return errBadFilterDesc diff --git a/pfcpiface/parse-qer.go b/pfcpiface/parse-qer.go index 7b663ad0c..f6ffe9212 100644 --- a/pfcpiface/parse-qer.go +++ b/pfcpiface/parse-qer.go @@ -24,9 +24,9 @@ type qer struct { fseidIP uint32 } -// Satisfies the fmt.Stringer interface +// Satisfies the fmt.Stringer interface. func (q qer) String() string { - var b = strings.Builder{} + b := strings.Builder{} fmt.Fprintf(&b, "\n") fmt.Fprintf(&b, "qerID: %v\n", q.qerID) fmt.Fprintf(&b, "fseID: %x\n", q.fseID) @@ -38,11 +38,11 @@ func (q qer) String() string { fmt.Fprintf(&b, "downlinkMBR: %v\n", q.dlMbr) fmt.Fprintf(&b, "uplinkGBR: %v\n", q.ulGbr) fmt.Fprintf(&b, "downlinkGBR: %v\n", q.dlGbr) + return b.String() } func (q *qer) parseQER(ie1 *ie.IE, seid uint64, upf *upf) error { - qerID, err := ie1.QERID() if err != nil { log.Println("Could not read QER ID!") @@ -84,15 +84,15 @@ func (q *qer) parseQER(ie1 *ie.IE, seid uint64, upf *upf) error { log.Println("Could not read GBRDL!") } - q.qerID = uint32(qerID) - q.qfi = uint8(qfi) - q.ulStatus = uint8(gsUL) - q.dlStatus = uint8(gsDL) + q.qerID = qerID + q.qfi = qfi + q.ulStatus = gsUL + q.dlStatus = gsDL q.ulMbr = uint64(mbrUL) q.dlMbr = uint64(mbrDL) q.ulGbr = uint64(gbrUL) q.dlGbr = uint64(gbrDL) - q.fseID = (seid) // fseID currently being truncated to uint32 <--- FIXIT/TODO/XXX + q.fseID = seid return nil } diff --git a/pfcpiface/parse-sdf.go b/pfcpiface/parse-sdf.go index 5516c63a7..cb1fb699e 100644 --- a/pfcpiface/parse-sdf.go +++ b/pfcpiface/parse-sdf.go @@ -5,7 +5,6 @@ package main import ( "errors" - "fmt" "net" "strconv" "strings" @@ -17,9 +16,7 @@ const ( reservedProto = uint8(0xff) ) -var ( - errBadFilterDesc = errors.New("Unsupported Filter Description format") -) +var errBadFilterDesc = errors.New("unsupported Filter Description format") type endpoint struct { IPNet *net.IPNet @@ -29,19 +26,22 @@ type endpoint struct { func (ep *endpoint) parseNet(ipnet string) error { ipNetFields := strings.Split(ipnet, "/") log.Println(ipNetFields) + switch len(ipNetFields) { case 1: ipnet = ipNetFields[0] + "/32" case 2: default: - return errors.New("Incorrect network string") + return errors.New("incorrect network string") } var err error + _, ep.IPNet, err = net.ParseCIDR(ipnet) if err != nil { - return errors.New("Unable to ParseCIDR") + return errors.New("unable to ParseCIDR") } + return nil } @@ -50,7 +50,9 @@ func (ep *endpoint) parsePort(port string) error { if err != nil { return err } + ep.Port = uint16(p) + return nil } @@ -75,18 +77,20 @@ func (ipf *ipFilterRule) parseFlowDesc(flowDesc, ueIP string) error { if err := parseAction(fields[0]); err != nil { return err } + ipf.action = fields[0] if err := parseDirection(fields[1]); err != nil { return err } - ipf.direction = fields[1] + ipf.direction = fields[1] ipf.proto = parseProto(fields[2]) // bring to common intermediate representation xform := func(i int) { log.Println(fields) + switch fields[i] { case "any": fields[i] = "0.0.0.0/0" @@ -97,15 +101,18 @@ func (ipf *ipFilterRule) parseFlowDesc(flowDesc, ueIP string) error { fields[i] = "0.0.0.0/0" } } + log.Println(fields) } for i := 3; i < len(fields); i++ { log.Println(fields[i]) + switch fields[i] { case "from": i++ xform(i) + err := ipf.src.parseNet(fields[i]) if err != nil { log.Println(err) @@ -113,6 +120,7 @@ func (ipf *ipFilterRule) parseFlowDesc(flowDesc, ueIP string) error { if fields[i+1] != "to" { i++ + err = ipf.src.parsePort(fields[i]) if err != nil { log.Println("src port parse failed ", err) @@ -121,6 +129,7 @@ func (ipf *ipFilterRule) parseFlowDesc(flowDesc, ueIP string) error { case "to": i++ xform(i) + err := ipf.dst.parseNet(fields[i]) if err != nil { log.Println(err) @@ -128,6 +137,7 @@ func (ipf *ipFilterRule) parseFlowDesc(flowDesc, ueIP string) error { if i < len(fields)-1 { i++ + err = ipf.dst.parsePort(fields[i]) if err != nil { log.Println("dst port parse failed ", err) @@ -136,7 +146,8 @@ func (ipf *ipFilterRule) parseFlowDesc(flowDesc, ueIP string) error { } } - fmt.Println(ipf) + log.Println(ipf) + return nil } @@ -147,6 +158,7 @@ func parseAction(action string) error { default: return errBadFilterDesc } + return nil } @@ -157,6 +169,7 @@ func parseDirection(dir string) error { default: return errBadFilterDesc } + return nil } @@ -165,6 +178,7 @@ func parseProto(proto string) uint8 { if err == nil { return uint8(p) } + switch proto { case "udp": return 17 diff --git a/pfcpiface/pfcpsim.go b/pfcpiface/pfcpsim.go index 880bc50e4..f99659c75 100644 --- a/pfcpiface/pfcpsim.go +++ b/pfcpiface/pfcpsim.go @@ -127,7 +127,7 @@ func createPFCP(conn *net.UDPConn, raddr *net.UDPAddr) uint64 { ie.NewSourceInterface(ie.SrcInterfaceAccess), ie.NewFTEID(0x30000000, net.ParseIP("198.18.0.1"), nil, nil), ie.NewUEIPAddress(0x2, "16.0.0.1", "", 0, 0), - //ie.NewSDFFilter("permit out ip from 6.6.6.6/32 to assigned", "", "", "", 2), + // ie.NewSDFFilter("permit out ip from 6.6.6.6/32 to assigned", "", "", "", 2), ie.NewApplicationID("1000"), ), ie.NewOuterHeaderRemoval(0, 0), @@ -178,11 +178,11 @@ func createPFCP(conn *net.UDPConn, raddr *net.UDPAddr) uint64 { ie.NewCreateFAR( ie.NewFARID(3), ie.NewApplyAction(0x0c), - //ie.NewApplyAction(0x02), - //ie.NewForwardingParameters( - // ie.NewDestinationInterface(ie.DstInterfaceAccess), - // ie.NewOuterHeaderCreation(0x100, 0x00000001, "11.1.1.129", "", 0, 0, 0), - //), + // ie.NewApplyAction(0x02), + // ie.NewForwardingParameters( + // ie.NewDestinationInterface(ie.DstInterfaceAccess), + // ie.NewOuterHeaderCreation(0x100, 0x00000001, "11.1.1.129", "", 0, 0, 0), + // ), ), // Uplink N9 ie.NewCreateQER( @@ -248,7 +248,7 @@ func modifyPFCP(conn *net.UDPConn, raddr *net.UDPAddr, seid uint64) { seid, seq, 0, - //ie.NewFSEID(0x0000000000000001, net.ParseIP("127.0.0.1"), nil, nil), + // ie.NewFSEID(0x0000000000000001, net.ParseIP("127.0.0.1"), nil, nil), ie.NewPDNType(ie.PDNTypeIPv4), // Downlink N9 ie.NewUpdatePDR( @@ -310,6 +310,7 @@ func modifyPFCP(conn *net.UDPConn, raddr *net.UDPAddr, seid uint64) { func deletePFCP(conn *net.UDPConn, raddr *net.UDPAddr, seid uint64) { var seq uint32 = 5 + sdreq, err := message.NewSessionDeletionRequest( 0, 0, @@ -324,9 +325,11 @@ func deletePFCP(conn *net.UDPConn, raddr *net.UDPAddr, seid uint64) { if _, err := conn.Write(sdreq); err != nil { log.Fatal(err) } + log.Printf("sent session deletion request to: %s", raddr) buf := make([]byte, 1500) + _, _, err = conn.ReadFrom(buf) if err != nil { log.Fatal(err) @@ -345,11 +348,10 @@ func pfcpSim() { } seid := createPFCP(conn, raddr) - time.Sleep(10 * time.Second) - modifyPFCP(conn, raddr, seid) time.Sleep(10 * time.Second) + modifyPFCP(conn, raddr, seid) + time.Sleep(10 * time.Second) deletePFCP(conn, raddr, seid) - } diff --git a/pfcpiface/session-far.go b/pfcpiface/session-far.go index 1e4f5d4e8..dd159dcd5 100644 --- a/pfcpiface/session-far.go +++ b/pfcpiface/session-far.go @@ -12,7 +12,7 @@ import ( log "github.com/sirupsen/logrus" ) -// CreateFAR appends far to existing list of FARs in the session +// CreateFAR appends far to existing list of FARs in the session. func (s *PFCPSession) CreateFAR(f far) { s.fars = append(s.fars, f) } @@ -20,6 +20,7 @@ func (s *PFCPSession) CreateFAR(f far) { func addEndMarker(farItem far, endMarkerList *[][]byte) { // This time lets fill out some information log.Println("Adding end Marker for farID : ", farItem.farID) + options := gopacket.SerializeOptions{ ComputeChecksums: true, FixLengths: true, @@ -70,17 +71,20 @@ func addEndMarker(farItem far, endMarkerList *[][]byte) { } } -// UpdateFAR updates existing far in the session +// UpdateFAR updates existing far in the session. func (s *PFCPSession) UpdateFAR(f *far, endMarkerList *[][]byte) error { for idx, v := range s.fars { if v.farID == f.farID { if f.sendEndMarker { addEndMarker(v, endMarkerList) } + s.fars[idx] = *f + return nil } } + return errors.New("FAR not found") } @@ -93,12 +97,14 @@ func (s *PFCPSession) setNotifyFlag(flag bool) { func (s *PFCPSession) getNotifyFlag() bool { s.notificationFlag.mux.Lock() defer s.notificationFlag.mux.Unlock() + return s.notificationFlag.flag } -// UpdateFAR updates existing far in the session +// UpdateFAR updates existing far in the session. func (s *PFCPSession) updateNotifyFlag() { unset := true + for _, v := range s.fars { if v.applyAction&ActionNotify != 0 { unset = false @@ -110,7 +116,7 @@ func (s *PFCPSession) updateNotifyFlag() { } } -// RemoveFAR removes far from existing list of FARs in the session +// RemoveFAR removes far from existing list of FARs in the session. func (s *PFCPSession) RemoveFAR(id uint32) (*far, error) { for idx, v := range s.fars { if v.farID == id { @@ -118,5 +124,6 @@ func (s *PFCPSession) RemoveFAR(id uint32) (*far, error) { return &v, nil } } + return nil, errors.New("FAR not found") } diff --git a/pfcpiface/session-pdr.go b/pfcpiface/session-pdr.go index 8bf47515b..9ac4b991c 100644 --- a/pfcpiface/session-pdr.go +++ b/pfcpiface/session-pdr.go @@ -12,13 +12,16 @@ import ( "github.com/wmnsk/go-pfcp/message" ) -// Release allocated IPs +// Release allocated IPs. func releaseAllocatedIPs(upf *upf, session *PFCPSession) { log.Println("release allocated IPs") + for _, pdr := range session.pdrs { if (pdr.allocIPFlag) && (pdr.srcIface == core) { var ueIP net.IP = int2ip(pdr.dstIP) + log.Println("pdrID : ", pdr.pdrID, ", ueIP : ", ueIP.String()) + upf.ippool.deallocIPV4(ueIP) } } @@ -27,11 +30,16 @@ func releaseAllocatedIPs(upf *upf, session *PFCPSession) { func addPdrInfo(msg *message.SessionEstablishmentResponse, session *PFCPSession) { log.Println("Add PDRs with UPF alloc IPs to Establishment response") + for _, pdr := range session.pdrs { if (pdr.allocIPFlag) && (pdr.srcIface == core) { log.Println("pdrID : ", pdr.pdrID) - var flags uint8 = 0x02 - var ueIP net.IP = int2ip(pdr.dstIP) + + var ( + flags uint8 = 0x02 + ueIP net.IP = int2ip(pdr.dstIP) + ) + log.Println("ueIP : ", ueIP.String()) msg.CreatedPDR = append(msg.CreatedPDR, ie.NewCreatedPDR( @@ -42,12 +50,12 @@ func addPdrInfo(msg *message.SessionEstablishmentResponse, } } -// CreatePDR appends pdr to existing list of PDRs in the session +// CreatePDR appends pdr to existing list of PDRs in the session. func (s *PFCPSession) CreatePDR(p pdr) { s.pdrs = append(s.pdrs, p) } -// UpdatePDR updates existing pdr in the session +// UpdatePDR updates existing pdr in the session. func (s *PFCPSession) UpdatePDR(p pdr) error { for idx, v := range s.pdrs { if v.pdrID == p.pdrID { @@ -55,10 +63,11 @@ func (s *PFCPSession) UpdatePDR(p pdr) error { return nil } } + return errors.New("PDR not found") } -// RemovePDR removes pdr from existing list of PDRs in the session +// RemovePDR removes pdr from existing list of PDRs in the session. func (s *PFCPSession) RemovePDR(id uint32) (*pdr, error) { for idx, v := range s.pdrs { if v.pdrID == id { @@ -66,5 +75,6 @@ func (s *PFCPSession) RemovePDR(id uint32) (*pdr, error) { return &v, nil } } + return nil, errors.New("PDR not found") } diff --git a/pfcpiface/session-qer.go b/pfcpiface/session-qer.go index 19defdcfd..3af0d9c93 100644 --- a/pfcpiface/session-qer.go +++ b/pfcpiface/session-qer.go @@ -7,12 +7,12 @@ import ( "errors" ) -// CreateQER appends qer to existing list of QERs in the session +// CreateQER appends qer to existing list of QERs in the session. func (s *PFCPSession) CreateQER(q qer) { s.qers = append(s.qers, q) } -// UpdateQER updates existing qer in the session +// UpdateQER updates existing qer in the session. func (s *PFCPSession) UpdateQER(q qer) error { for idx, v := range s.qers { if v.qerID == q.qerID { @@ -20,10 +20,11 @@ func (s *PFCPSession) UpdateQER(q qer) error { return nil } } + return errors.New("QER not found") } -// RemoveQER removes qer from existing list of QERs in the session +// RemoveQER removes qer from existing list of QERs in the session. func (s *PFCPSession) RemoveQER(id uint32) (*qer, error) { for idx, v := range s.qers { if v.qerID == id { @@ -31,5 +32,6 @@ func (s *PFCPSession) RemoveQER(id uint32) (*qer, error) { return &v, nil } } + return nil, errors.New("QER not found") } diff --git a/pfcpiface/sessions.go b/pfcpiface/sessions.go index 174125356..a6abd6db9 100644 --- a/pfcpiface/sessions.go +++ b/pfcpiface/sessions.go @@ -9,7 +9,7 @@ import ( "time" ) -// PFCPSessionMgr manages PFCP sessions +// PFCPSessionMgr manages PFCP sessions. type PFCPSessionMgr struct { rng *rand.Rand nodeID string @@ -18,13 +18,13 @@ type PFCPSessionMgr struct { sessions map[uint64]*PFCPSession } -// PFD holds the switch level application IDs +// PFD holds the switch level application IDs. type appPFD struct { appID string flowDescs []string } -// NewPFCPSessionMgr initializes a manager struct with RNG and map of id/sessions +// NewPFCPSessionMgr initializes a manager struct with RNG and map of id/sessions. func NewPFCPSessionMgr(maxRetries int) *PFCPSessionMgr { return &PFCPSessionMgr{ rng: rand.New(rand.NewSource(time.Now().UnixNano())), @@ -33,7 +33,7 @@ func NewPFCPSessionMgr(maxRetries int) *PFCPSessionMgr { } } -// RemoveSession removes session using id +// RemoveSession removes session using id. func (mgr *PFCPSessionMgr) RemoveSession(id uint64) { delete(mgr.sessions, id) globalPfcpStats.sessions.WithLabelValues(mgr.nodeID).Set(float64(len(mgr.sessions))) @@ -44,7 +44,7 @@ type notifyFlag struct { mux sync.Mutex } -// PFCPSession implements one PFCP session +// PFCPSession implements one PFCP session. type PFCPSession struct { localSEID uint64 remoteSEID uint64 @@ -54,7 +54,7 @@ type PFCPSession struct { qers []qer } -// NewPFCPSession allocates an session with ID +// NewPFCPSession allocates an session with ID. func (mgr *PFCPSessionMgr) NewPFCPSession(rseid uint64) uint64 { for i := 0; i < mgr.maxRetries; i++ { lseid := mgr.rng.Uint64() @@ -72,17 +72,19 @@ func (mgr *PFCPSessionMgr) NewPFCPSession(rseid uint64) uint64 { } mgr.sessions[lseid] = &s globalPfcpStats.sessions.WithLabelValues(mgr.nodeID).Set(float64(len(mgr.sessions))) + return lseid } + return 0 } -// ResetAppPFDs resets the map of application PFDs +// ResetAppPFDs resets the map of application PFDs. func (mgr *PFCPSessionMgr) ResetAppPFDs() { mgr.appPFDs = make(map[string]appPFD) } -// NewAppPFD stores app PFD in session mgr +// NewAppPFD stores app PFD in session mgr. func (mgr *PFCPSessionMgr) NewAppPFD(appID string) { mgr.appPFDs[appID] = appPFD{ appID: appID, @@ -90,7 +92,7 @@ func (mgr *PFCPSessionMgr) NewAppPFD(appID string) { } } -// RemoveAppPFD removes appPFD using appID +// RemoveAppPFD removes appPFD using appID. func (mgr *PFCPSessionMgr) RemoveAppPFD(appID string) { delete(mgr.appPFDs, appID) } diff --git a/pfcpiface/telemetry.go b/pfcpiface/telemetry.go index 1ac4916fb..649c239aa 100644 --- a/pfcpiface/telemetry.go +++ b/pfcpiface/telemetry.go @@ -11,7 +11,7 @@ import ( log "github.com/sirupsen/logrus" ) -// PfcpStats ... Prometheus metrics +// PfcpStats ... Prometheus metrics. type pfcpStats struct { messages *prometheus.CounterVec sessions *prometheus.GaugeVec @@ -37,6 +37,7 @@ func (ps *pfcpStats) register() error { if err := prometheus.Register(ps.messages); err != nil { return err } + if err := prometheus.Register(ps.sessions); err != nil { return err } @@ -53,10 +54,11 @@ func makeBuckets(values []uint64) map[float64]float64 { for idx, pctile := range getPctiles() { buckets[pctile] = float64(values[idx]) } + return buckets } -//upfCollector provides all UPF metrics +// upfCollector provides all UPF metrics. type upfCollector struct { packets *prometheus.Desc bytes *prometheus.Desc @@ -94,9 +96,8 @@ func newUpfCollector(upf *upf) *upfCollector { } } -//Describe writes all descriptors to the prometheus desc channel. +// Describe writes all descriptors to the prometheus desc channel. func (uc *upfCollector) Describe(ch chan<- *prometheus.Desc) { - ch <- uc.packets ch <- uc.bytes ch <- uc.dropped @@ -105,7 +106,7 @@ func (uc *upfCollector) Describe(ch chan<- *prometheus.Desc) { ch <- uc.jitter } -//Collect writes all metrics to prometheus metric channel +// Collect writes all metrics to prometheus metric channel. func (uc *upfCollector) Collect(ch chan<- prometheus.Metric) { uc.summaryLatencyJitter(ch) uc.portStats(ch) @@ -123,9 +124,11 @@ func (uc *upfCollector) summaryLatencyJitter(ch chan<- prometheus.Metric) { func setupProm(upf *upf) { uc := newUpfCollector(upf) prometheus.MustRegister(uc) + globalPfcpStats = newPFCPStats() if err := globalPfcpStats.register(); err != nil { log.Panicln("Pfcp Stats register failed") } + http.Handle("/metrics", promhttp.Handler()) } diff --git a/pfcpiface/upf.go b/pfcpiface/upf.go index 3f4a6b743..be6e8ba7d 100644 --- a/pfcpiface/upf.go +++ b/pfcpiface/upf.go @@ -34,20 +34,20 @@ type upf struct { // to be replaced with go-pfcp structs -// Don't change these values +// Don't change these values. const ( tunnelGTPUPort = 2152 - // src-iface consts + // src-iface consts. core = 0x2 access = 0x1 - // far-id specific directions + // far-id specific directions. n3 = 0x0 n6 = 0x1 n9 = 0x2 - // far-action specific values + // far-action specific values. farForwardD = 0x0 farForwardU = 0x1 farDrop = 0x2 @@ -55,7 +55,7 @@ const ( farNotify = 0x4 ) -func (u *upf) sendMsgToUPF(method string, pdrs []pdr, fars []far, qers []qer) uint8 { +func (u *upf) sendMsgToUPF(method upfMsgType, pdrs []pdr, fars []far, qers []qer) uint8 { return u.intf.sendMsgToUPF(method, pdrs, fars, qers) } @@ -81,8 +81,8 @@ func (u *upf) sim(method string) { func (u *upf) setUpfInfo(conf *Conf) { u.reportNotifyChan = make(chan uint64, 1024) - u.n4SrcIP = net.ParseIP("0.0.0.0") - u.nodeIP = net.ParseIP("0.0.0.0") + u.n4SrcIP = net.ParseIP(net.IPv4zero.String()) + u.nodeIP = net.ParseIP(net.IPv4zero.String()) if conf.CPIface.SrcIP == "" { if conf.CPIface.DestIP != "" { @@ -103,8 +103,10 @@ func (u *upf) setUpfInfo(conf *Conf) { u.nodeIP = net.ParseIP(ips[0]) } } + log.Println("UPF Node IP : ", u.nodeIP.String()) log.Println("UPF Local IP : ", u.n4SrcIP.String()) + u.intf.setUpfInfo(u, conf) } diff --git a/pfcpiface/utils.go b/pfcpiface/utils.go index 0cb6953b1..e3491ccb7 100644 --- a/pfcpiface/utils.go +++ b/pfcpiface/utils.go @@ -12,20 +12,21 @@ import ( log "github.com/sirupsen/logrus" ) -//grpc channel state +// grpc channel state. const ( - Ready = 2 //grpc channel state Ready + Ready = 2 ) -// Bits type +// Bits type. type Bits uint8 -// Set Bits +// Set Bits. func Set(b, flag Bits) Bits { return b | flag } -//func Clear(b, flag Bits) Bits { return b &^ flag } -//func Toggle(b, flag Bits) Bits { return b ^ flag } -//func Has(b, flag Bits) bool { return b&flag != 0 } +// func Clear(b, flag Bits) Bits { return b &^ flag } +// func Toggle(b, flag Bits) Bits { return b ^ flag } +// func Has(b, flag Bits) bool { return b&flag != 0 } + func setUeipFeature(features ...uint8) { if len(features) >= 3 { features[2] = features[2] | 0x04 @@ -59,6 +60,7 @@ func ip2int(ip net.IP) uint32 { if len(ip) == 16 { return binary.BigEndian.Uint32(ip[12:16]) } + return binary.BigEndian.Uint32(ip) } @@ -66,6 +68,7 @@ func ipMask2int(ip net.IPMask) uint32 { if len(ip) == 16 { return binary.BigEndian.Uint32(ip[12:16]) } + return binary.BigEndian.Uint32(ip) } @@ -75,19 +78,21 @@ func hex2int(hexStr string) uint32 { // base 16 for hexadecimal result, _ := strconv.ParseUint(cleaned, 16, 32) + return uint32(result) } func int2ip(nn uint32) net.IP { ip := make(net.IP, 4) binary.BigEndian.PutUint32(ip, nn) + return ip } func getRemoteIP(dstIP string) net.IP { conn, err := net.Dial("udp", dstIP+":"+PFCPPort) if err != nil { - ip := "0.0.0.0" + ip := net.IPv4zero.String() return net.ParseIP(ip) } defer conn.Close() @@ -103,5 +108,6 @@ func getLocalIP(dstIP string) net.IP { } defer conn.Close() localAddr := conn.LocalAddr().(*net.UDPAddr) + return localAddr.IP }