Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SDFAB-1150] Detect failure & restore P4rt channel in PFCP Agent #599

Merged
merged 13 commits into from
Apr 1, 2022
4 changes: 3 additions & 1 deletion conf/upf.json
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@
"p4rtc_server": "onos",
"p4rtc_port": "51001",
"": "Default TC is ELASTIC",
"default_tc": 3
"default_tc": 3,
"": "Whether to wipe out PFCP state from UP4 datapath on UP4 restart. Default: false",
"clear_state_on_restart": false
}
}
3 changes: 2 additions & 1 deletion docs/configuration_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,5 @@ These are configurations commonly shared between P4-UPF and BESS-UPF.
| `p4rtciface.access_ip` | - | Yes | N3/S1u address for 5G/4G |
| `p4rtciface.p4rtc_server` | - | Yes | IP address of the P4Runtime server exposed by UP4 |
| `p4rtciface.p4rtc_port` | - | Yes | TCP port of the P4Runtime server exposed by UP4 |
| `p4rtciface.default_tc` | 3 | No | Default Traffic Class (default value is ELASTIC - TC=3) |
| `p4rtciface.default_tc` | 3 | No | Default Traffic Class (default value is ELASTIC - TC=3) |
| `p4rtciface.clear_state_on_restart` | false | No | Whether to wipe out PFCP state from UP4 datapath on UP4 restart. |
13 changes: 7 additions & 6 deletions pfcpiface/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,13 @@ type IfaceType struct {

// P4rtcInfo holds the P4 runtime interface settings. Each field maps to the
// JSON key named in its struct tag.
type P4rtcInfo struct {
SliceID uint8 `json:"slice_id"`
AccessIP string `json:"access_ip"`
P4rtcServer string `json:"p4rtc_server"`
P4rtcPort string `json:"p4rtc_port"`
QFIToTC map[uint8]uint8 `json:"qfi_tc_mapping"`
DefaultTC uint8 `json:"default_tc"`
SliceID uint8 `json:"slice_id"`
AccessIP string `json:"access_ip"`
P4rtcServer string `json:"p4rtc_server"`
P4rtcPort string `json:"p4rtc_port"`
QFIToTC map[uint8]uint8 `json:"qfi_tc_mapping"`
DefaultTC uint8 `json:"default_tc"`
// ClearStateOnRestart makes UP4.initialize clear the datapath state on every
// (re-)initialization instead of only on the first run.
ClearStateOnRestart bool `json:"clear_state_on_restart"`
}

// validateConf checks that the given config reaches a baseline of correctness.
Expand Down
18 changes: 3 additions & 15 deletions pfcpiface/p4rtc.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,6 @@ func (c *P4rtClient) SendPacketOut(packet []byte) (err error) {

// Init .. Initialize Client.
func (c *P4rtClient) Init() (err error) {
// Initialize stream for mastership and packet I/O
// ctx, cancel := context.WithTimeout(context.Background(),
// time.Duration(timeout) * time.Second)
// defer cancel()
c.digests = make(chan *p4.DigestList, 1024)

c.stream, err = c.client.StreamChannel(
context.Background(),
grpcRetry.WithMax(3),
Expand Down Expand Up @@ -189,13 +183,6 @@ func (c *P4rtClient) Init() (err error) {
}
}()

/*
select {
case <-ctx.Done():
log.Println(ctx.Err()) // prints "context deadline exceeded"
}
*/

log.Println("exited from recv thread.")

return
Expand Down Expand Up @@ -581,14 +568,15 @@ func LoadDeviceConfig(deviceConfigPath string) (P4DeviceConfig, error) {
// CreateChannel ... Create p4runtime client channel.
func CreateChannel(host string, deviceID uint64) (*P4rtClient, error) {
log.Println("create channel")
// Second, check to see if we can reuse the gRPC connection for a new P4RT client

conn, err := GetConnection(host)
if err != nil {
log.Println("grpc connection failed")
return nil, err
}

client := &P4rtClient{
digests: make(chan *p4.DigestList, 1024),
client: p4.NewP4RuntimeClient(conn),
conn: conn,
deviceID: deviceID,
Expand All @@ -602,7 +590,7 @@ func CreateChannel(host string, deviceID uint64) (*P4rtClient, error) {

err = client.SetMastership(TimeBasedElectionId())
if err != nil {
log.Println("Set Mastership error: ", err)
log.Error("Set Mastership error: ", err)
return nil, err
}

Expand Down
27 changes: 21 additions & 6 deletions pfcpiface/up4.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,8 @@ func (up4 *UP4) setupChannel() error {
return err
}

up4.p4RtTranslator = newP4RtTranslator(up4.p4client.P4Info)

setupLog.Debug("P4Rt channel created")

return nil
Expand Down Expand Up @@ -464,13 +466,16 @@ func (up4 *UP4) tryConnect() error {
return nil
}

// p4client is nil only if it's a first attempt to connect to UP4.
firstRun := up4.p4client == nil

err := up4.setupChannel()
if err != nil {
log.Errorf("Failed to setup UP4 channel: %v", err)
return err
}

err = up4.initialize()
err = up4.initialize(firstRun)
if err != nil {
log.Errorf("Failed to initialize UP4: %v", err)
return err
Expand Down Expand Up @@ -560,11 +565,7 @@ func (up4 *UP4) listenToDDNs() {
}
}

// initialize configures the UP4-related objects.
// A caller should ensure that P4Client is not nil and the P4Runtime channel is open.
func (up4 *UP4) initialize() error {
up4.p4RtTranslator = newP4RtTranslator(up4.p4client.P4Info)

func (up4 *UP4) clearDatapathState() error {
err := up4.clearTables()
if err != nil {
log.Warningf("failed to clear tables: %v", err)
Expand All @@ -579,6 +580,20 @@ func (up4 *UP4) initialize() error {
return ErrOperationFailedWithReason("Interfaces initialization", err.Error())
}

return nil
}

// initialize configures the UP4-related objects.
// A caller should ensure that P4Client is not nil and the P4Runtime channel is open.
func (up4 *UP4) initialize(firstRun bool) error {
// always clear datapath state at startup or
// on UP4 datapath restart if ClearStateOnRestart is enabled.
if firstRun || up4.conf.ClearStateOnRestart {
if err := up4.clearDatapathState(); err != nil {
return err
}
}

up4.initOnce.Do(func() {
go up4.listenToDDNs()

Expand Down
66 changes: 66 additions & 0 deletions test/integration/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
package integration

import (
"github.com/omec-project/upf-epc/internal/p4constants"
"github.com/omec-project/upf-epc/pfcpiface"
"github.com/wmnsk/go-pfcp/message"
"net"
"os"
"testing"
"time"

Expand Down Expand Up @@ -118,6 +120,70 @@ func TestUPFBasedUeIPAllocation(t *testing.T) {
verifyNoEntries(t, testcase.expected)
}

// TestDetectUP4Restart restarts the mock UP4 datapath while pfcpiface is running
// and checks the content of the interfaces table afterwards, once with the
// default configuration and once with the wipe-out-on-restart configuration.
// The test is skipped when the datapath under test is not UP4.
func TestDetectUP4Restart(t *testing.T) {
	if !isDatapathUP4() {
		t.Skipf("Skipping UP4-specific test for datapath: %s", os.Getenv(EnvDatapath))
	}

	// Expected number of entries in the interfaces table after the restart.
	const (
		// State is not cleared, so interfaces are not re-installed. The assumption
		// is that the ONOS cluster preserves them, but BMv2 doesn't.
		interfacesNotReinstalled = 0
		// State is cleared, so interface entries are re-installed.
		interfacesReinstalled = 2
	)

	restartAndEstablish := func(t *testing.T) {
		// Restarting UP4 closes the P4Runtime channel between pfcpiface and mock-up4.
		MustStopMockUP4()
		MustStartMockUP4()

		uplinkPDR := session.NewPDRBuilder().MarkAsUplink().
			WithMethod(session.Create).
			WithID(1).
			WithTEID(15).
			WithN3Address(upfN3Address).
			WithFARID(1).
			AddQERID(1).BuildPDR()
		downlinkPDR := session.NewPDRBuilder().MarkAsDownlink().
			WithMethod(session.Create).
			WithID(2).
			WithUEAddress(ueAddress).
			WithFARID(2).
			AddQERID(1).BuildPDR()
		pdrs := []*ie.IE{uplinkPDR, downlinkPDR}

		uplinkFAR := session.NewFARBuilder().
			WithMethod(session.Create).WithID(1).WithDstInterface(ie.DstInterfaceCore).
			WithAction(ActionForward).BuildFAR()
		downlinkFAR := session.NewFARBuilder().
			WithMethod(session.Create).WithID(2).
			WithDstInterface(ie.DstInterfaceAccess).
			WithAction(ActionDrop).WithTEID(16).
			WithDownlinkIP(nodeBAddress).BuildFAR()
		fars := []*ie.IE{uplinkFAR, downlinkFAR}

		qers := []*ie.IE{
			session.NewQERBuilder().WithMethod(session.Create).WithID(1).
				WithUplinkMBR(500000).
				WithDownlinkMBR(500000).
				WithUplinkGBR(0).
				WithDownlinkGBR(0).Build(),
		}

		// Establishing a session forces pfcpiface to re-connect to UP4 right away;
		// otherwise the re-connect would take about 2 minutes.
		pfcpClient.EstablishSession(pdrs, fars, qers)
	}

	t.Run("Do not clear on UP4 restart", func(t *testing.T) {
		setup(t, ConfigDefault)
		defer teardown(t)

		restartAndEstablish(t)
		verifyNumberOfEntries(t, p4constants.TablePreQosPipeInterfaces, interfacesNotReinstalled)
	})

	t.Run("Clear on UP4 restart", func(t *testing.T) {
		setup(t, ConfigWipeOutOnUP4Restart)
		defer teardown(t)

		restartAndEstablish(t)
		verifyNumberOfEntries(t, p4constants.TablePreQosPipeInterfaces, interfacesReinstalled)
	})
}

func TestPFCPHeartbeats(t *testing.T) {
setup(t, ConfigDefault)
defer teardown(t)
Expand Down
10 changes: 10 additions & 0 deletions test/integration/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
// Configuration type identifiers; GetConfig switches on these to pick the
// configuration returned for a given datapath.
const (
// ConfigDefault is the first identifier in the group (value 0).
// NOTE(review): presumably selects the default configuration — confirm in GetConfig.
ConfigDefault = iota
// ConfigUPFBasedIPAllocation selects UP4ConfigUPFBasedIPAllocation for the UP4 datapath.
ConfigUPFBasedIPAllocation
// ConfigWipeOutOnUP4Restart selects UP4ConfigWipeOutOnUP4Restart for the UP4 datapath.
ConfigWipeOutOnUP4Restart
)

const (
Expand Down Expand Up @@ -98,6 +99,13 @@ func UP4ConfigUPFBasedIPAllocation() pfcpiface.Conf {
return config
}

// UP4ConfigWipeOutOnUP4Restart returns the default UP4 configuration with
// P4rtcIface.ClearStateOnRestart switched on.
func UP4ConfigWipeOutOnUP4Restart() pfcpiface.Conf {
	conf := UP4ConfigDefault()

	// The only difference from the default configuration.
	conf.P4rtcIface.ClearStateOnRestart = true

	return conf
}

func GetConfig(datapath string, configType uint32) pfcpiface.Conf {
switch datapath {
case DatapathUP4:
Expand All @@ -106,6 +114,8 @@ func GetConfig(datapath string, configType uint32) pfcpiface.Conf {
return UP4ConfigDefault()
case ConfigUPFBasedIPAllocation:
return UP4ConfigUPFBasedIPAllocation()
case ConfigWipeOutOnUP4Restart:
return UP4ConfigWipeOutOnUP4Restart()
}
case DatapathBESS:
switch configType {
Expand Down
11 changes: 11 additions & 0 deletions test/integration/verify_up4.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,17 @@ func verifyP4RuntimeEntries(t *testing.T, testdata *pfcpSessionData, expectedVal
fmt.Sprintf("app meter should have %d cells configured", expectedNrOfConfiguredMeters))
}

// verifyNumberOfEntries connects to the P4Runtime server at 127.0.0.1:50001,
// reads all entries of the table identified by tableID and requires that
// exactly expectedNoOfEntries entries are present. Any connection or read
// error fails the test immediately.
func verifyNumberOfEntries(t *testing.T, tableID uint32, expectedNoOfEntries int) {
	client, connErr := providers.ConnectP4rt("127.0.0.1:50001", false)
	require.NoErrorf(t, connErr, "failed to connect to P4Runtime server")

	defer providers.DisconnectP4rt()

	// Wildcard reads are keyed by table name, so translate the numeric ID first.
	tableName := p4constants.GetTableIDToNameMap()[tableID]

	tableEntries, readErr := client.ReadTableEntryWildcard(tableName)
	require.NoError(t, readErr)

	require.Len(t, tableEntries, expectedNoOfEntries)
}

func verifyNoP4RuntimeEntries(t *testing.T, expectedValues p4RtValues) {
p4rtClient, err := providers.ConnectP4rt("127.0.0.1:50001", false)
require.NoErrorf(t, err, "failed to connect to P4Runtime server")
Expand Down