Skip to content

Commit

Permalink
Integrated logging from pkg retryrpc in imgr & iclient
Browse files Browse the repository at this point in the history
  • Loading branch information
edmc-ss committed Mar 1, 2022
1 parent ebf3481 commit 76f9cf3
Show file tree
Hide file tree
Showing 18 changed files with 83 additions and 32 deletions.
1 change: 1 addition & 0 deletions iclient/dev.conf
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,6 @@ LogFilePath:
LogToConsole: true
TraceEnabled: false
FUSELogEnabled: false
RetryRPCLogEnabled: false
HTTPServerIPAddr: dev
HTTPServerPort: 15347
1 change: 1 addition & 0 deletions iclient/iclient.conf
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,6 @@ LogFilePath:
LogToConsole: true
TraceEnabled: false
FUSELogEnabled: false
RetryRPCLogEnabled: false
HTTPServerIPAddr: iclient
HTTPServerPort: 15347
1 change: 1 addition & 0 deletions iclient/iclientpkg/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
// LogToConsole: true
// TraceEnabled: false
// FUSELogEnabled: false
// RetryRPCLogEnabled: false
// HTTPServerIPAddr: # Defaults to 0.0.0.0 (i.e. all interfaces)
// HTTPServerPort: # Defaults to disabling the embedded HTTP Server
//
Expand Down
2 changes: 1 addition & 1 deletion iclient/iclientpkg/fission.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func performMountFUSE() (err error) {
fuseDefaultPermissions,
globals.config.FUSEAllowOther,
&globals,
newLogger(),
logLoggerNew("FISSION", globals.config.FUSELogEnabled),
globals.fissionErrChan,
)

Expand Down
6 changes: 6 additions & 0 deletions iclient/iclientpkg/globals.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type configStruct struct {
LogToConsole bool
TraceEnabled bool
FUSELogEnabled bool
RetryRPCLogEnabled bool
HTTPServerIPAddr string
HTTPServerPort uint16 // To be served on HTTPServerIPAddr via TCP
}
Expand Down Expand Up @@ -499,6 +500,10 @@ func initializeGlobals(confMap conf.ConfMap, fissionErrChan chan error) (err err
if nil != err {
logFatal(err)
}
globals.config.RetryRPCLogEnabled, err = confMap.FetchOptionValueBool("ICLIENT", "RetryRPCLogEnabled")
if nil != err {
logFatal(err)
}
globals.config.HTTPServerIPAddr, err = confMap.FetchOptionValueString("ICLIENT", "HTTPServerIPAddr")
if nil != err {
globals.config.HTTPServerIPAddr = "0.0.0.0"
Expand Down Expand Up @@ -589,6 +594,7 @@ func uninitializeGlobals() (err error) {
globals.config.LogToConsole = false
globals.config.TraceEnabled = false
globals.config.FUSELogEnabled = false
globals.config.RetryRPCLogEnabled = false
globals.config.HTTPServerIPAddr = ""
globals.config.HTTPServerPort = 0

Expand Down
30 changes: 19 additions & 11 deletions iclient/iclientpkg/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,24 @@ import (
"time"
)

type logLoggerIOWriterStruct struct {
tag string
enabled bool
}

func logLoggerNew(tag string, enabled bool) (logLogger *log.Logger) {
logLogger = log.New(&logLoggerIOWriterStruct{tag: tag, enabled: enabled}, "", log.Lshortfile)
return
}

func (logLoggerIOWriter *logLoggerIOWriterStruct) Write(p []byte) (n int, err error) {
if logLoggerIOWriter.enabled {
logf(logLoggerIOWriter.tag, "%s", string(p[:]))
}

return len(p), nil
}

func logFatal(err error) {
logf("FATAL", "%v", err)
logStack()
Expand Down Expand Up @@ -102,6 +120,7 @@ func logf(level string, format string, args ...interface{}) {
} else {
globals.logFile.WriteString(logMsg + "\n")
}

if globals.config.LogToConsole {
fmt.Println(logMsg)
}
Expand All @@ -113,14 +132,3 @@ func logSIGHUP() {
globals.logFile = nil
}
}

func newLogger() *log.Logger {
return log.New(&globals, "", 0)
}

func (dummy *globalsStruct) Write(p []byte) (n int, err error) {
if globals.config.FUSELogEnabled {
logf("FISSION", "%s", string(p[:]))
}
return 0, nil
}
1 change: 1 addition & 0 deletions iclient/iclientpkg/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func startRPCHandler() (err error) {
Callbacks: &globals,
DeadlineIO: globals.config.RetryRPCDeadlineIO,
KeepAlivePeriod: globals.config.RetryRPCKeepAlivePeriod,
Logger: logLoggerNew("RETRYRPC", globals.config.RetryRPCLogEnabled),
}

globals.retryRPCClient, err = retryrpc.NewClient(globals.retryRPCClientConfig)
Expand Down
2 changes: 2 additions & 0 deletions iclient/iclientpkg/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ func testSetup(t *testing.T) {
"ICLIENT.LogToConsole=true",
"ICLIENT.TraceEnabled=false",
"ICLIENT.FUSELogEnabled=false",
"ICLIENT.RetryRPCLogEnabled=false",
"ICLIENT.HTTPServerIPAddr=" + testIPAddr,
"ICLIENT.HTTPServerPort=" + fmt.Sprintf("%d", testClientHTTPServerPort),

Expand Down Expand Up @@ -217,6 +218,7 @@ func testSetup(t *testing.T) {
"IMGR.LogFilePath=",
"IMGR.LogToConsole=true",
"IMGR.TraceEnabled=false",
"IMGR.RetryRPCLogEnabled=false",

"ISWIFT.SwiftProxyIPAddr=" + testIPAddr,
"ISWIFT.SwiftProxyTCPPort=" + fmt.Sprintf("%d", testSwiftProxyTCPPort),
Expand Down
1 change: 1 addition & 0 deletions imgr/dev.conf
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,4 @@ RootDirMaxDirEntriesPerBPlusTreePage: 1024
LogFilePath:
LogToConsole: true
TraceEnabled: false
RetryRPCLogEnabled: false
1 change: 1 addition & 0 deletions imgr/imgr.conf
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,4 @@ RootDirMaxDirEntriesPerBPlusTreePage: 1024
LogFilePath:
LogToConsole: true
TraceEnabled: false
RetryRPCLogEnabled: false
1 change: 1 addition & 0 deletions imgr/imgrpkg/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
// LogFilePath: # imgr.log
// LogToConsole: true # false
// TraceEnabled: false
// RetryRPCLogEnabled: false
//
// Most of the config keys are required and must have values. One exception
// is LogFilePath that will default to "" and, hence, cause logging to not
Expand Down
12 changes: 9 additions & 3 deletions imgr/imgrpkg/globals.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,10 @@ type configStruct struct {
InodeTableMaxInodesPerBPlusTreePage uint64
RootDirMaxDirEntriesPerBPlusTreePage uint64

LogFilePath string // Unless starting with '/', relative to $CWD; == "" means disabled
LogToConsole bool
TraceEnabled bool
LogFilePath string // Unless starting with '/', relative to $CWD; == "" means disabled
LogToConsole bool
TraceEnabled bool
RetryRPCLogEnabled bool
}

type statsStruct struct {
Expand Down Expand Up @@ -637,6 +638,10 @@ func initializeGlobals(confMap conf.ConfMap) (err error) {
if nil != err {
logFatal(err)
}
globals.config.RetryRPCLogEnabled, err = confMap.FetchOptionValueBool("IMGR", "RetryRPCLogEnabled")
if nil != err {
logFatal(err)
}

configJSONified = utils.JSONify(globals.config, true)

Expand Down Expand Up @@ -703,6 +708,7 @@ func uninitializeGlobals() (err error) {
globals.config.LogFilePath = ""
globals.config.LogToConsole = false
globals.config.TraceEnabled = false
globals.config.RetryRPCLogEnabled = false

bucketstats.UnRegister("IMGR", "")

Expand Down
20 changes: 20 additions & 0 deletions imgr/imgrpkg/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,30 @@ package imgrpkg

import (
"fmt"
"log"
"os"
"runtime"
"time"
)

type logLoggerIOWriterStruct struct {
tag string
enabled bool
}

func logLoggerNew(tag string, enabled bool) (logLogger *log.Logger) {
logLogger = log.New(&logLoggerIOWriterStruct{tag: tag, enabled: enabled}, "", log.Lshortfile)
return
}

func (logLoggerIOWriter *logLoggerIOWriterStruct) Write(p []byte) (n int, err error) {
if logLoggerIOWriter.enabled {
logf(logLoggerIOWriter.tag, "%s", string(p[:]))
}

return len(p), nil
}

func logFatal(err error) {
logf("FATAL", "%v", err)
logStack()
Expand Down Expand Up @@ -101,6 +120,7 @@ func logf(level string, format string, args ...interface{}) {
} else {
globals.logFile.WriteString(logMsg + "\n")
}

if globals.config.LogToConsole {
fmt.Println(logMsg)
}
Expand Down
1 change: 1 addition & 0 deletions imgr/imgrpkg/retry-rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func startRetryRPCServer() (err error) {
DeadlineIO: globals.config.RetryRPCDeadlineIO,
KeepAlivePeriod: globals.config.RetryRPCKeepAlivePeriod,
TLSCertificate: tlsCertificate,
Logger: logLoggerNew("RETRYRPC", globals.config.RetryRPCLogEnabled),
}

globals.retryrpcServer = retryrpc.NewServer(retryrpcServerConfig)
Expand Down
1 change: 1 addition & 0 deletions imgr/imgrpkg/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func testSetup(t *testing.T, retryrpcCallbacks interface{}) {
"IMGR.LogFilePath=",
"IMGR.LogToConsole=true",
"IMGR.TraceEnabled=false",
"IMGR.RetryRPCLogEnabled=false",

"ISWIFT.SwiftProxyIPAddr=" + testIPAddr,
"ISWIFT.SwiftProxyTCPPort=" + fmt.Sprintf("%d", testSwiftProxyTCPPort),
Expand Down
6 changes: 3 additions & 3 deletions retryrpc/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (server *Server) SendCallback(clientID uint64, msg []byte) {
server.Lock()
lci, ok := server.perClientInfo[clientID]
if !ok {
server.logger.Printf("SERVER: SendCallback() - unable to find client UniqueID: %v\n", clientID)
server.logger.Printf("SERVER: SendCallback() - unable to find client UniqueID: %v", clientID)
server.Unlock()
return
}
Expand All @@ -211,12 +211,12 @@ func (server *Server) Close() {
if len(server.tlsCertificate.Certificate) == 0 {
err := server.netListener.Close()
if err != nil {
server.logger.Printf("server.netListener.Close() returned err: %v\n", err)
server.logger.Printf("server.netListener.Close() returned err: %v", err)
}
} else {
err := server.tlsListener.Close()
if err != nil {
server.logger.Printf("server.tlsListener.Close() returned err: %v\n", err)
server.logger.Printf("server.tlsListener.Close() returned err: %v", err)
}
}

Expand Down
8 changes: 4 additions & 4 deletions retryrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (client *Client) send(method string, rpcRequest interface{}, rpcReply inter
if connectionRetryCount > ConnectionRetryLimit {
client.logger.Fatalf("In send(), ConnectionRetryLimit (%v) on calling dial() exceeded", ConnectionRetryLimit)
}
client.logger.Printf("initialDial() failed; retrying: %v\n", err)
client.logger.Printf("initialDial() failed; retrying: %v", err)
time.Sleep(connectionRetryDelay)
connectionRetryDelay *= ConnectionRetryDelayMultiplier
client.Lock()
Expand Down Expand Up @@ -221,7 +221,7 @@ func (client *Client) notifyReply(buf []byte, genNum uint64, recvResponse time.T
// Don't have ctx to reply. Assume read garbage on socket and
// reconnect.

client.logger.Printf("notifyReply failed to unmarshal buf: %+v err: %v\n", string(buf), err)
client.logger.Printf("notifyReply failed to unmarshal buf: %+v err: %v", string(buf), err)
client.retransmit(genNum)
return
}
Expand Down Expand Up @@ -255,7 +255,7 @@ func (client *Client) notifyReply(buf []byte, genNum uint64, recvResponse time.T
m := svrResponse{Result: ctx.rpcReply}
unmarshalErr := json.Unmarshal(buf, &m)
if unmarshalErr != nil {
client.logger.Printf("notifyReply failed to unmarshal buf: %v err: %v ctx: %v\n", string(buf), unmarshalErr, ctx)
client.logger.Printf("notifyReply failed to unmarshal buf: %v err: %v ctx: %v", string(buf), unmarshalErr, ctx)

// Assume read garbage on socket - close the socket and reconnect
// Drop client lock since retransmit() will acquire it.
Expand Down Expand Up @@ -369,7 +369,7 @@ func (client *Client) readReplies(nC net.Conn, callingGenNum uint64) {
client.stats.UpcallCalled.Add(1)

default:
client.logger.Printf("CLIENT - invalid msgType: %v\n", msgType)
client.logger.Printf("CLIENT - invalid msgType: %v", msgType)
}
}
}
Expand Down
20 changes: 10 additions & 10 deletions retryrpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (server *Server) run() {
}
if err != nil {
if !server.halting {
server.logger.Printf("net.Accept failed for Retry RPC listener - err: %v\n", err)
server.logger.Printf("net.Accept failed for Retry RPC listener - err: %v", err)
}
server.listenersWG.Done()
return
Expand All @@ -72,7 +72,7 @@ func (server *Server) run() {
ci, err := server.getClientIDAndWait(cCtx)
if err != nil {
// Socket already had an error - just loop back
server.logger.Printf("getClientIDAndWait() from client addr: %v returned err: %v\n", conn.RemoteAddr(), err)
server.logger.Printf("getClientIDAndWait() from client addr: %v returned err: %v", conn.RemoteAddr(), err)

// Sleep to block over active clients from pounding on us
time.Sleep(1 * time.Second)
Expand All @@ -85,10 +85,10 @@ func (server *Server) run() {
go func(myCi *clientInfo, myCCtx *connCtx, myConn net.Conn, myElm *list.Element) {
defer server.goroutineWG.Done()

server.logger.Printf("Servicing client: %v address: %v\n", myCi.myUniqueID, myConn.RemoteAddr())
server.logger.Printf("Servicing client: %v address: %v", myCi.myUniqueID, myConn.RemoteAddr())
server.serviceClient(myCi, myCCtx)

server.logger.Printf("Closing client: %v address: %v\n", myCi.myUniqueID, myConn.RemoteAddr())
server.logger.Printf("Closing client: %v address: %v", myCi.myUniqueID, myConn.RemoteAddr())
server.closeClient(myConn, myElm)

// The clientInfo for this client will first be trimmed and then later
Expand All @@ -109,7 +109,7 @@ func (server *Server) processRequest(ci *clientInfo, myConnCtx *connCtx, buf []b
jReq := jsonRequest{}
unmarErr := json.Unmarshal(buf, &jReq)
if unmarErr != nil {
server.logger.Printf("Unmarshal of buf failed with err: %v\n", unmarErr)
server.logger.Printf("Unmarshal of buf failed with err: %v", unmarErr)
return
}

Expand Down Expand Up @@ -216,7 +216,7 @@ func (server *Server) getClientIDAndWait(cCtx *connCtx) (ci *clientInfo, err err
}

if (msgType != PassID) && (msgType != AskMyUniqueID) {
server.logger.Fatalf("Server expecting msgType PassID or AskMyUniqueID and received: %v\n", msgType)
server.logger.Fatalf("Server expecting msgType PassID or AskMyUniqueID and received: %v", msgType)
return
}

Expand Down Expand Up @@ -443,7 +443,7 @@ func (server *Server) returnResults(ior *ioReply, cCtx *connCtx) {
cCtx.conn.SetDeadline(time.Now().Add(server.deadlineIO))
cnt, e := cCtx.conn.Write(ior.JResult)
if e != nil {
server.logger.Printf("returnResults() returned err: %v cnt: %v length of JResult: %v\n", e, cnt, len(ior.JResult))
server.logger.Printf("returnResults() returned err: %v cnt: %v length of JResult: %v", e, cnt, len(ior.JResult))
}
cCtx.Unlock()
}
Expand Down Expand Up @@ -493,12 +493,12 @@ func (server *Server) trimCompleted(t time.Time, long bool) {
if ci.isEmpty() && ci.cCtx.serviceClientExited {
ci.unregsiterMethodStats(server)
delete(server.perClientInfo, key)
server.logger.Printf("Trim - DELETE inactive clientInfo with ID: %v\n", ci.myUniqueID)
server.logger.Printf("Trim - DELETE inactive clientInfo with ID: %v", ci.myUniqueID)
}
ci.cCtx.Unlock()
ci.Unlock()
}
server.logger.Printf("Trimmed completed RetryRpcs - Total: %v\n", totalItems)
server.logger.Printf("Trimmed completed RetryRpcs - Total: %v", totalItems)
} else {
for k, ci := range server.perClientInfo {
n := server.trimAClientBasedACK(k, ci)
Expand Down Expand Up @@ -556,7 +556,7 @@ func (server *Server) trimTLLBased(ci *clientInfo, t time.Time) (numItems int) {
}
}
s := ci.stats
server.logger.Printf("ID: %v largestReplySize: %v largestReplySizeMethod: %v longest RPC: %v longest RPC Method: %v\n",
server.logger.Printf("ID: %v largestReplySize: %v largestReplySizeMethod: %v longest RPC: %v longest RPC Method: %v",
ci.myUniqueID, s.largestReplySize, s.largestReplySizeMethod, s.longestRPC, s.longestRPCMethod)

ci.Unlock()
Expand Down

0 comments on commit 76f9cf3

Please sign in to comment.