Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.6.6-dev
1.7.0
8 changes: 4 additions & 4 deletions internal/metricdata/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ func HandleSubscriberEvent(subsData *metricinfo.CoreSubscriberData, sourceNf met
func storeSubscriber(sub *metricinfo.CoreSubscriber, sourceNf metricinfo.NfType) error {
metricData.SubLock.Lock()

_, ok := metricData.Subscribers[sub.Imsi]
if !ok {
if _, ok := metricData.Subscribers[sub.Imsi]; !ok {
metricData.Subscribers[sub.Imsi] = sub

promclient.SetSmfSessStats(sub.SmfIp, sub.Slice, sub.Dnn, sub.UpfName, incSMContextActive())
Expand All @@ -69,10 +68,11 @@ func updateSubscriber(sub *metricinfo.CoreSubscriber, sourceNf metricinfo.NfType
if s, ok := metricData.Subscribers[sub.Imsi]; ok {
deletePrometheusCoreSubData(s)

if sourceNf == metricinfo.NfTypeSmf {
switch sourceNf {
case metricinfo.NfTypeSmf:
// SMF specific fields
fillSmfSubsriberData(sub, s)
} else if sourceNf == metricinfo.NfTypeAmf {
case metricinfo.NfTypeAmf:
// AMF specific fields
fillAmfSubsriberData(sub, s)
}
Expand Down
21 changes: 10 additions & 11 deletions internal/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ func StartKafkaReader(cfg *config.Configuration) {
// Start Kafka Event Reader
for _, nfStream := range cfg.NfStreams {
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: makeUrlsFromUriPort(nfStream.Urls),
Topic: nfStream.Topic.TopicName,
GroupID: nfStream.Topic.TopicGroups,
Brokers: makeUrlsFromUriPort(nfStream.Urls),
Topic: nfStream.Topic.TopicName,
MaxBytes: 10e6, // 10MB
})

go reader(r)
Expand All @@ -48,29 +48,28 @@ func getSourceNfType(r *kafka.Reader) metricinfo.NfType {
case "sdcore-data-source-amf":
return metricinfo.NfTypeAmf
default:
logger.AppLog.Fatalf("invalid topic name [%v]", topic)
logger.AppLog.Fatalf("invalid topic name: %s", topic)
return metricinfo.NfTypeEnd
}
}

func reader(r *kafka.Reader) {
logger.AppLog.Infof("kafka reader for topic [%v] initialised", r.Config().Topic)
logger.AppLog.Infof("kafka reader for topic [%s] initialised", r.Config().Topic)
sourceNf := getSourceNfType(r)
for {
// the `ReadMessage` function blocks until we receive the next event
ctxt := context.Background()
msg, err := r.ReadMessage(ctxt)
msg, err := r.ReadMessage(context.Background())
if err != nil {
logger.AppLog.Errorf("Error reading off kafka bus err: %v", err)
logger.AppLog.Errorf("error reading off kafka bus err: %v", err)
time.Sleep(10 * time.Millisecond)
continue
}
logger.AppLog.Debugf("stream [%v] message %s", r.Config().Topic, string(msg.Value))
logger.AppLog.Debugf("stream [%s] message %s", r.Config().Topic, string(msg.Value))

var metricEvent metricinfo.MetricEvent
// Unmarshal the msg
if err := json.Unmarshal(msg.Value, &metricEvent); err != nil {
logger.AppLog.Fatalf("unmarshal smf event error %v", err.Error())
logger.AppLog.Fatalf("unmarshal metric event error %+v", err)
}

switch metricEvent.EventType {
Expand All @@ -81,7 +80,7 @@ func reader(r *kafka.Reader) {
case metricinfo.CNfStatusEvt:
metricdata.HandleNfStatusEvent(&metricEvent.NfStatusData)
default:
logger.AppLog.Fatalf("unknown event type [%v]", metricEvent.EventType)
logger.AppLog.Fatalf("unknown event type: %+v", metricEvent.EventType)
}
}
}
3 changes: 2 additions & 1 deletion metricfunc.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func main() {
}
}

logger.AppLog.Infof("configuration: %v", cfg.Configuration)
cfg.Configuration.ApiServer.Addr = PodIp
cfg.Configuration.PrometheusServer.Addr = PodIp

Expand All @@ -57,6 +56,8 @@ func main() {
logger.SetLogLevel(level)
}

logger.AppLog.Infof("configuration: %+v", cfg.Configuration)

// Start Kafka Event Reader
reader.StartKafkaReader(cfg.Configuration)

Expand Down