Skip to content
This repository has been archived by the owner on Aug 31, 2022. It is now read-only.

Add dataset "device syslog" to non-db client (POC) #83

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Binary file added doc/img/syslogNonDbFlowChart.PNG
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added doc/img/syslogPipeline.PNG
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
45 changes: 45 additions & 0 deletions doc/syslogPipeline.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# SONiC Telemetry Syslog Pipeline
* [Overview](#overview)
* [Purpose](#purpose)
* [Syslog Extraction Method](#syslog-extraction-method)
* [Pipeline Design](#pipeline-design)
* [Design Implementation](#design-implementation)

# Overview
## Purpose:
To replace the original syslog pipeline to allow a secure transfer of syslog information from the SONiC device using the streaming telemetry servive to the client.


## Syslog Extraction Method
There were two methods considered for extracting the syslogs from the device to be available to the streaming telemetry service:

* Method 1: Reading the information directly from the log files (/var/log/syslog*)
* Method 2: Streaming the syslog updates from the device's syslog server

Method 2 was chosen to proceed with due it ability to bypass the log rotate issue and lessened CPU time.

# Pipeline Design

![SONiC TELEMETRY SYSLOG PIPELINE](img/syslogPipeline.PNG)

The syslog messages are sent to the telemetry service through a newly created syslog server port. The messages are taken to the gRPC server where it is forwarded to the collector service.

**The gMNI server will receive the syslog information utilizing streaming mode through the OTHERS database.**


# Design Implementation
In order to create this new functionality, A function and data set path was added to the [non-db client](/sonic-data-client/non_db_client.go).

This new function was created in order to retrieve and read messages from the syslog server as demonstrated through the following flowchart:

![SYSLOG NON-DB CLIENT FUNCTION](img/syslogNonDbFlowChart.PNG)










100 changes: 99 additions & 1 deletion sonic_data_client/non_db_client.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package client

import (
"container/list"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"sync"
"time"

Expand All @@ -28,6 +30,25 @@ const (

type dataGetFunc func() ([]byte, error)

// connects the port for the syslog messages
func openPort() {
hostName := "localhost"
portNum := "5150"
service := hostName + ":" + portNum
RemoteAddr, err := net.ResolveUDPAddr("udp4", service)
portConn, err := net.ListenUDP("udp4", RemoteAddr)
if err != nil {
log.V(2).Infof("%v", err)
return
}
syslogPort = portConn
}

// closes the port for the syslog messages
func closePort() {
syslogPort.Close()
}

type path2DataFunc struct {
path []string
getFunc dataGetFunc
Expand Down Expand Up @@ -61,6 +82,9 @@ var (
clientTrie *Trie
statsR statsRing

// the connection port for the syslog data set
syslogPort *net.UDPConn

versionFileStash sonicVersionYmlStash

// ImplIoutilReadFile points to the implementation of ioutil.ReadFile. Should be overridden by UTs only.
Expand Down Expand Up @@ -101,6 +125,10 @@ var (
path: []string{"OTHERS", "osversion", "build"},
getFunc: dataGetFunc(getBuildVersion),
},
{ // Get device syslog
path: []string{"OTHERS", "device", "syslog"},
getFunc: dataGetFunc(getDeviceSyslog),
},
}
)

Expand Down Expand Up @@ -229,6 +257,23 @@ func getCpuUtil() ([]byte, error) {
return b, nil
}

func getDeviceSyslog() ([]byte, error) {
bufferSize := 1024
buffer := make([]byte, bufferSize)
payloadSize, _, err := syslogPort.ReadFromUDP(buffer)
if err != nil {
log.V(2).Infof("%v", err)
return buffer, err
}
if bufferSize < payloadSize {
log.V(2).Infof("Payload was larger than buffer", err)
return nil, nil
}

log.V(4).Infof("getDeviceSyslog, output %v", string(buffer[:payloadSize]))
return buffer[:payloadSize], nil
}

func getProcMeminfo() ([]byte, error) {
memInfo, _ := linuxproc.ReadMemInfo("/proc/meminfo")
b, err := json.Marshal(memInfo)
Expand Down Expand Up @@ -428,6 +473,10 @@ func (c *NonDbClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *s
for _, sub := range subscribe.GetSubscription() {
subMode := sub.GetMode()
if subMode != gnmipb.SubscriptionMode_SAMPLE {
if subMode == gnmipb.SubscriptionMode_ON_CHANGE {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you need to change the comment of the StreamRun as well: "It supports SAMPLE mode only."

streamOnChange(c, stop, sub)
return
}
putFatalMsg(c.q, fmt.Sprintf("Unsupported subscription mode: %v.", subMode))
return
}
Expand Down Expand Up @@ -494,6 +543,55 @@ func streamSample(c *NonDbClient, stop chan struct{}, sub *gnmipb.Subscription,
}
}

func streamOnChange(c *NonDbClient, stop chan struct{}, sub *gnmipb.Subscription) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does each subscriber have their owner streamOnChange? If how will you handle the case two client subscribe to syslog as the same time? have you tested it?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you plan to enable onchange mode for other dataset? If not you should disable them in the call flow to avoid expected issue in the server.

storageBuffer := list.New()
gnmiPath := sub.GetPath()
openPort()
getter, _ := c.path2Getter[gnmiPath]

c.q.Put(Value{
&spb.Value{
Timestamp: time.Now().UnixNano(),
SyncResponse: true,
},
})

for {
elem, _ := getter()
// adds new elem to storage space
storageBuffer.PushBack(elem)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you get elem as nil here and how do you handle it?

select {
case <-stop:
log.V(1).Infof("Stopping NonDbClient.streamOnChange routine for sub '%s'", sub)
// closes the port once the connection has been made
closePort()
return

default:
// prints value to the gnmi client
spbv := &spb.Value{
Prefix: c.prefix,
Path: gnmiPath,
Timestamp: time.Now().UnixNano(),
SyncResponse: false,
Val: &gnmipb.TypedValue{
Value: &gnmipb.TypedValue_JsonIetfVal{
JsonIetfVal: storageBuffer.Back().Value.([]byte),
}},
}
err := c.q.Put(Value{spbv})
if err != nil {
log.V(3).Infof("Failed to put for %v, %v", gnmiPath, err)
} else {
log.V(6).Infof("Added spbv #%v", spbv)
}
// removes the message once it has been sent to the client
storageBuffer.Remove(storageBuffer.Front())
}

}
}

// runGetterAndSend runs a given getter method and puts the result to client queue.
func runGetterAndSend(c *NonDbClient, gnmiPath *gnmipb.Path, getter dataGetFunc) error {
v, err := getter()
Expand Down Expand Up @@ -581,7 +679,7 @@ func (c *NonDbClient) Close() error {
return nil
}

func (c *NonDbClient) Set(delete []*gnmipb.Path, replace []*gnmipb.Update, update []*gnmipb.Update) error {
func (c *NonDbClient) Set(delete []*gnmipb.Path, replace []*gnmipb.Update, update []*gnmipb.Update) error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what change did you make here?

return nil
}
func (c *NonDbClient) Capabilities() []gnmipb.ModelData {
Expand Down