Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(orc8r): add reindexing of gateway subscriber state to subscriberdb indexer #13041

Merged
merged 2 commits into from
Jun 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion lte/cloud/configs/service_registry.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ services:
orc8r.io/state_indexer: "true"
orc8r.io/swagger_spec: "true"
annotations:
orc8r.io/state_indexer_types: "mobilityd_ipdesc_record"
orc8r.io/state_indexer_types: "mobilityd_ipdesc_record,gateway_subscriber_state"
orc8r.io/state_indexer_version: "1"
orc8r.io/obsidian_handlers_path_prefixes: >
/magma/v1/lte/:network_id/msisdns,
Expand Down
15 changes: 8 additions & 7 deletions lte/cloud/go/lte/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,14 @@ const (
SubscriberStreamName = "subscriberdb"

// EnodebStateType etc. denote types of state replicated from AGWs.
EnodebStateType = "single_enodeb"
ICMPStateType = "icmp_monitoring"
MMEStateType = "MME"
MobilitydStateType = "mobilityd_ipdesc_record"
S1APStateType = "S1AP"
SPGWStateType = "SPGW"
SubscriberStateType = "subscriber_state"
EnodebStateType = "single_enodeb"
ICMPStateType = "icmp_monitoring"
MMEStateType = "MME"
MobilitydStateType = "mobilityd_ipdesc_record"
S1APStateType = "S1AP"
SPGWStateType = "SPGW"
SubscriberStateType = "subscriber_state"
GatewaySubscriberStateType = "gateway_subscriber_state"

// MSISDNBlobstoreType etc. denote blob types stored in blobstore tables.
MSISDNBlobstoreType = "msisdn"
Expand Down
2 changes: 2 additions & 0 deletions lte/cloud/go/serdes/serdes.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
nprobe_models "magma/lte/cloud/go/services/nprobe/obsidian/models"
policydb_models "magma/lte/cloud/go/services/policydb/obsidian/models"
subscriberdb_models "magma/lte/cloud/go/services/subscriberdb/obsidian/models"
subscriberdb_storage "magma/lte/cloud/go/services/subscriberdb/storage"
"magma/orc8r/cloud/go/serde"
"magma/orc8r/cloud/go/serdes"
"magma/orc8r/cloud/go/services/state"
Expand Down Expand Up @@ -49,6 +50,7 @@ var (
state.NewStateSerde(lte.S1APStateType, &state.ArbitraryJSON{}),
state.NewStateSerde(lte.SPGWStateType, &state.ArbitraryJSON{}),
state.NewStateSerde(lte.SubscriberStateType, &state.ArbitraryJSON{}),
state.NewStateSerde(lte.GatewaySubscriberStateType, &subscriberdb_storage.GatewaySubscriberState{}),
))
// Device contains the full set of device serdes used in the LTE module
Device = serdes.Device
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"

"github.com/golang/glog"
"github.com/hashicorp/go-multierror"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

Expand All @@ -26,6 +27,7 @@ import (
"magma/lte/cloud/go/services/subscriberdb"
subscriberdb_protos "magma/lte/cloud/go/services/subscriberdb/protos"
subscriberdb_state "magma/lte/cloud/go/services/subscriberdb/state"
"magma/lte/cloud/go/services/subscriberdb/storage"
"magma/orc8r/cloud/go/services/state"
"magma/orc8r/cloud/go/services/state/indexer"
"magma/orc8r/cloud/go/services/state/protos"
Expand All @@ -37,10 +39,12 @@ const (
)

var (
indexerTypes = []string{lte.MobilitydStateType}
indexerTypes = []string{lte.MobilitydStateType, lte.GatewaySubscriberStateType}
)

type indexerServicer struct{}
type indexerServicer struct {
subscriberStore storage.SubscriberStorage
}

// NewIndexerServicer returns the state indexer for subscriberdb.
//
Expand All @@ -59,16 +63,22 @@ type indexerServicer struct{}
// - an {IP -> IMSI} mapping may be missing even though the IMSI is assigned
// that IP
// - an {IP -> IMSI} mapping may be stale (caller should check for staleness)
func NewIndexerServicer() protos.IndexerServer {
return &indexerServicer{}
//
// Gateway Subscriber State is reported as a map IMSI to arbitrary JSON (reported
// state for that IMSI).
// The indexer updates gateway subscriber state in SubscriberStorage.
// It deletes all entries for the gateway ID and writes the current
// IMSI,state pairs into the SubscriberStorage.
func NewIndexerServicer(ss storage.SubscriberStorage) protos.IndexerServer {
return &indexerServicer{subscriberStore: ss}
}

func (i *indexerServicer) Index(ctx context.Context, req *protos.IndexRequest) (*protos.IndexResponse, error) {
states, err := state_types.MakeStatesByID(req.States, serdes.State)
if err != nil {
return nil, err
}
stErrs, err := indexImpl(ctx, req.NetworkId, states)
stErrs, err := i.indexImpl(ctx, req.NetworkId, states)
if err != nil {
return nil, err
}
Expand All @@ -91,8 +101,31 @@ func (i *indexerServicer) CompleteReindex(ctx context.Context, req *protos.Compl
return nil, status.Errorf(codes.InvalidArgument, "unsupported from/to for CompleteReindex: %v to %v", req.FromVersion, req.ToVersion)
}

func indexImpl(ctx context.Context, networkID string, states state_types.StatesByID) (state_types.StateErrors, error) {
return setIPMappings(ctx, networkID, states)
func (i *indexerServicer) indexImpl(ctx context.Context, networkID string, states state_types.StatesByID) (state_types.StateErrors, error) {
statesMobilityD := state_types.StatesByID{}
statesSubscribers := state_types.StatesByID{}
for id, st := range states {
switch id.Type {
case lte.MobilitydStateType:
statesMobilityD[id] = st
case lte.GatewaySubscriberStateType:
statesSubscribers[id] = st
default:
glog.Errorf("Unsupported state type %s", id.Type)
}
}
var stErrs state_types.StateErrors
errs := &multierror.Error{}
if len(statesMobilityD) > 0 {
stErrsMobilityD, err := setIPMappings(ctx, networkID, statesMobilityD)
stErrs = stErrsMobilityD
errs = multierror.Append(errs, err)
}
if len(statesSubscribers) > 0 {
err := i.setGatewaySubscriberStates(networkID, statesSubscribers)
errs = multierror.Append(errs, err)
}
return stErrs, errs.ErrorOrNil()
}

// setIPMappings maps {IP -> IMSI}.
Expand Down Expand Up @@ -129,3 +162,15 @@ func setIPMappings(ctx context.Context, networkID string, states state_types.Sta

return stateErrors, nil
}

func (i *indexerServicer) setGatewaySubscriberStates(networkID string, states state_types.StatesByID) *multierror.Error {
errs := &multierror.Error{}
for id, st := range states {
err := i.subscriberStore.SetAllSubscribersForGateway(networkID, id.DeviceID, st.ReportedState.(*storage.GatewaySubscriberState))
if err != nil {
glog.Errorf("Error setting subscriber state for gateway %s: %s", id.DeviceID, err)
errs = multierror.Append(errs, err)
}
}
return errs
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ import (
"net"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"

"magma/lte/cloud/go/lte"
"magma/lte/cloud/go/serdes"
"magma/lte/cloud/go/services/subscriberdb"
"magma/lte/cloud/go/services/subscriberdb/storage"
subscriberdb_test_init "magma/lte/cloud/go/services/subscriberdb/test_init"
"magma/orc8r/cloud/go/serde"
"magma/orc8r/cloud/go/services/state"
Expand Down Expand Up @@ -56,10 +58,10 @@ func TestIndexerIP(t *testing.T) {
// "ip": {"address": "wKiArg=="}
// }
states := state_types.SerializedStatesByID{
id00: {SerializedReportedState: serialize(t, &state.ArbitraryJSON{"ip": state.ArbitraryJSON{"address": encodeIP("127.0.0.1")}})},
id01: {SerializedReportedState: serialize(t, &state.ArbitraryJSON{"ip": state.ArbitraryJSON{"address": encodeIP("127.0.0.1")}})},
id10: {SerializedReportedState: serialize(t, &state.ArbitraryJSON{"ip": state.ArbitraryJSON{"address": encodeIP("127.0.0.1")}})},
id11: {SerializedReportedState: serialize(t, &state.ArbitraryJSON{"ip": state.ArbitraryJSON{"address": encodeIP("127.0.0.2")}})},
id00: {SerializedReportedState: serialize(t, &state.ArbitraryJSON{"ip": state.ArbitraryJSON{"address": encodeIP("127.0.0.1")}}, lte.MobilitydStateType)},
id01: {SerializedReportedState: serialize(t, &state.ArbitraryJSON{"ip": state.ArbitraryJSON{"address": encodeIP("127.0.0.1")}}, lte.MobilitydStateType)},
id10: {SerializedReportedState: serialize(t, &state.ArbitraryJSON{"ip": state.ArbitraryJSON{"address": encodeIP("127.0.0.1")}}, lte.MobilitydStateType)},
id11: {SerializedReportedState: serialize(t, &state.ArbitraryJSON{"ip": state.ArbitraryJSON{"address": encodeIP("127.0.0.2")}}, lte.MobilitydStateType)},
}

// Index the imsi0->sid0 state, result is sid0->imsi0 reverse mapping
Expand All @@ -75,8 +77,8 @@ func TestIndexerIP(t *testing.T) {

// Correctly handle per-state errs
states = state_types.SerializedStatesByID{
id00: {SerializedReportedState: serialize(t, &state.ArbitraryJSON{"ip": state.ArbitraryJSON{"address": encodeIP("127.0.0.3")}})},
id2: {SerializedReportedState: serialize(t, &state.ArbitraryJSON{"ip": state.ArbitraryJSON{"address": "deadbeef"}})},
id00: {SerializedReportedState: serialize(t, &state.ArbitraryJSON{"ip": state.ArbitraryJSON{"address": encodeIP("127.0.0.3")}}, lte.MobilitydStateType)},
id2: {SerializedReportedState: serialize(t, &state.ArbitraryJSON{"ip": state.ArbitraryJSON{"address": "deadbeef"}}, lte.MobilitydStateType)},
}
errs, err = idx.Index("nid0", states)
assert.NoError(t, err)
Expand All @@ -86,8 +88,8 @@ func TestIndexerIP(t *testing.T) {
assert.Equal(t, []string{"IMSI0"}, gotC)
}

func serialize(t *testing.T, mobilitydState *state.ArbitraryJSON) []byte {
bytes, err := serde.Serialize(mobilitydState, lte.MobilitydStateType, serdes.State)
func serialize(t *testing.T, state serde.ValidateableBinaryConvertible, stateType string) []byte {
bytes, err := serde.Serialize(state, stateType, serdes.State)
assert.NoError(t, err)
return bytes
}
Expand All @@ -96,3 +98,143 @@ func encodeIP(ip string) string {
ipBytes := net.ParseIP(ip)[12:16] // get just the IPv4 bytes
return base64.StdEncoding.EncodeToString(ipBytes)
}

const gwid1 = "snowflake_ID_1"
const gwid2 = "snowflake_ID_2"

func TestIndexerSubscriber(t *testing.T) {
const (
version indexer.Version = 1 // copied from indexer_servicer.go
)

var (
types = []string{lte.GatewaySubscriberStateType}
)

subscriberStorage := subscriberdb_test_init.StartTestService(t)
idx := indexer.NewRemoteIndexer(subscriberdb.ServiceName, version, types...)

// define IDs
id0 := state_types.ID{Type: lte.GatewaySubscriberStateType, DeviceID: gwid1}
id1 := state_types.ID{Type: lte.GatewaySubscriberStateType, DeviceID: gwid2}
// define gateway states
gatewayState0 := storage.CreateTestGatewaySubscriberState("IMSI001010000000123", "IMSI001010000000456", "IMSI001010000000789", "IMSI001010000001011")
gatewayState1 := storage.CreateTestGatewaySubscriberState("IMSI002020000000123", "IMSI002020000000456", "IMSI002020000000789")

// empty gateway state
emptyState := storage.GatewaySubscriberState{Subscribers: map[string]state.ArbitraryJSON{}}
emptySerializedState := state_types.SerializedStatesByID{
id0: {SerializedReportedState: serialize(t, &emptyState, lte.GatewaySubscriberStateType)},
}
errs, err := idx.Index("nid0", emptySerializedState)
assert.NoError(t, err)
assert.Empty(t, errs)
foundStates, err := subscriberStorage.GetSubscribersForGateway("nid0", gwid1)
assert.NoError(t, err)
assert.True(t, cmp.Equal(*foundStates, emptyState))

// one gateway
serializedGatewayState := state_types.SerializedStatesByID{
id0: {SerializedReportedState: serialize(t, &gatewayState0, lte.GatewaySubscriberStateType)},
}
errs, err = idx.Index("nid0", serializedGatewayState)
assert.NoError(t, err)
assert.Empty(t, errs)
foundStates, err = subscriberStorage.GetSubscribersForGateway("nid0", gwid1)
assert.NoError(t, err)
assert.True(t, cmp.Equal(*foundStates, gatewayState0))

// two gatewayIDs
serializedGatewayState = state_types.SerializedStatesByID{
id0: {SerializedReportedState: serialize(t, &gatewayState1, lte.GatewaySubscriberStateType)},
id1: {SerializedReportedState: serialize(t, &gatewayState0, lte.GatewaySubscriberStateType)},
}
errs, err = idx.Index("nid0", serializedGatewayState)
assert.NoError(t, err)
assert.Empty(t, errs)
foundStates, err = subscriberStorage.GetSubscribersForGateway("nid0", gwid1)
assert.NoError(t, err)
assert.True(t, cmp.Equal(*foundStates, gatewayState1))
foundStates, err = subscriberStorage.GetSubscribersForGateway("nid0", gwid2)
assert.NoError(t, err)
assert.True(t, cmp.Equal(*foundStates, gatewayState0))

// empty gateways
emptySerializedState = state_types.SerializedStatesByID{
id0: {SerializedReportedState: serialize(t, &emptyState, lte.GatewaySubscriberStateType)},
id1: {SerializedReportedState: serialize(t, &emptyState, lte.GatewaySubscriberStateType)},
}
errs, err = idx.Index("nid0", emptySerializedState)
assert.NoError(t, err)
assert.Empty(t, errs)
foundStates, err = subscriberStorage.GetSubscribersForGateway("nid0", gwid1)
assert.NoError(t, err)
assert.True(t, cmp.Equal(*foundStates, emptyState))
foundStates, err = subscriberStorage.GetSubscribersForGateway("nid0", gwid2)
assert.NoError(t, err)
assert.True(t, cmp.Equal(*foundStates, emptyState))
}

func TestIndexerSubscriberAndIP(t *testing.T) {
const (
version indexer.Version = 1 // copied from indexer_servicer.go
)

var (
types = []string{lte.GatewaySubscriberStateType, lte.MobilitydStateType}
)

subscriberStorage := subscriberdb_test_init.StartTestService(t)
idx := indexer.NewRemoteIndexer(subscriberdb.ServiceName, version, types...)

// define IDs
id0 := state_types.ID{Type: lte.GatewaySubscriberStateType, DeviceID: gwid1}
id1 := state_types.ID{Type: lte.GatewaySubscriberStateType, DeviceID: gwid2}
// define gateway states
gatewayState0 := storage.CreateTestGatewaySubscriberState("IMSI001010000000123", "IMSI001010000000456", "IMSI001010000000789", "IMSI001010000001011")
gatewayState1 := storage.CreateTestGatewaySubscriberState("IMSI002020000000123", "IMSI002020000000456", "IMSI002020000000789")

// define mobilityd state ids
idm0 := state_types.ID{Type: lte.MobilitydStateType, DeviceID: "IMSI0.apn0"}
idm1 := state_types.ID{Type: lte.MobilitydStateType, DeviceID: "IMSI0.apn1"}

// mixed state
states := state_types.SerializedStatesByID{
id0: {SerializedReportedState: serialize(t, &gatewayState0, lte.GatewaySubscriberStateType)},
idm0: {SerializedReportedState: serialize(t, &state.ArbitraryJSON{"ip": state.ArbitraryJSON{"address": encodeIP("127.0.0.1")}}, lte.MobilitydStateType)},
idm1: {SerializedReportedState: serialize(t, &state.ArbitraryJSON{"ip": state.ArbitraryJSON{"address": encodeIP("127.0.0.1")}}, lte.MobilitydStateType)},
id1: {SerializedReportedState: serialize(t, &gatewayState1, lte.GatewaySubscriberStateType)},
}
errs, err := idx.Index("nid0", states)
assert.NoError(t, err)
assert.Empty(t, errs)
foundStates, err := subscriberStorage.GetSubscribersForGateway("nid0", gwid1)
assert.NoError(t, err)
assert.True(t, cmp.Equal(*foundStates, gatewayState0))
foundStates, err = subscriberStorage.GetSubscribersForGateway("nid0", gwid2)
assert.NoError(t, err)
assert.True(t, cmp.Equal(*foundStates, gatewayState1))
foundIPs, err := subscriberdb.GetIMSIsForIP(context.Background(), "nid0", "127.0.0.1")
assert.NoError(t, err)
assert.Equal(t, []string{"IMSI0"}, foundIPs)

// create empty gateway states
emptyState := storage.GatewaySubscriberState{Subscribers: map[string]state.ArbitraryJSON{}}
emptySerializedState := state_types.SerializedStatesByID{
id0: {SerializedReportedState: serialize(t, &emptyState, lte.GatewaySubscriberStateType)},
id1: {SerializedReportedState: serialize(t, &emptyState, lte.GatewaySubscriberStateType)},
}
// empty gateway states
errs, err = idx.Index("nid0", emptySerializedState)
assert.NoError(t, err)
assert.Empty(t, errs)
foundStates, err = subscriberStorage.GetSubscribersForGateway("nid0", gwid1)
assert.NoError(t, err)
assert.True(t, cmp.Equal(*foundStates, emptyState))
foundStates, err = subscriberStorage.GetSubscribersForGateway("nid0", gwid2)
assert.NoError(t, err)
assert.True(t, cmp.Equal(*foundStates, emptyState))
foundIPs, err = subscriberdb.GetIMSIsForIP(context.Background(), "nid0", "127.0.0.1")
assert.NoError(t, err)
assert.Equal(t, []string{"IMSI0"}, foundIPs)
}