From 5119b408e433871b8731c99ed0e5676ea7c60e79 Mon Sep 17 00:00:00 2001 From: Mohamed Abokammer Date: Mon, 15 Aug 2022 11:28:16 +0100 Subject: [PATCH] Use batch logic to push logs Signed-off-by: Mohamed Abokammer --- cmds/admin_server/server/server.go | 22 +++-- cmds/admin_server/storage/mongo/mongo.go | 12 +-- cmds/admin_server/storage/storage.go | 2 +- pkg/loggerhook/httphook.go | 88 +++++++++++++++++--- tests/integ/admin_server/logendpoint_test.go | 37 ++++---- 5 files changed, 119 insertions(+), 42 deletions(-) diff --git a/cmds/admin_server/server/server.go b/cmds/admin_server/server/server.go index aa5c4616..533fd9f7 100644 --- a/cmds/admin_server/server/server.go +++ b/cmds/admin_server/server/server.go @@ -107,24 +107,28 @@ func (r *RouteHandler) status(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"status": "live"}) } -// addLog inserts a new log entry inside the database -func (r *RouteHandler) addLog(c *gin.Context) { - - var log Log - if err := c.Bind(&log); err != nil { - c.JSON(http.StatusBadRequest, gin.H{"status": "err", "msg": "badly formatted log"}) +// addLogs inserts log's batches into the database +func (r *RouteHandler) addLogs(c *gin.Context) { + var logs []Log + if err := c.Bind(&logs); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"status": "err", "msg": "badly formatted logs"}) r.log.Errorf("Err while binding request body %v", err) return } + storageLogs := make([]storage.Log, 0, len(logs)) + for _, log := range logs { + storageLogs = append(storageLogs, log.ToStorageLog()) + } + ctx, cancel := xcontext.WithTimeout(xcontext.Background(), DefaultDBAccessTimeout) defer cancel() ctx = ctx.WithLogger(r.log) - err := r.storage.StoreLog(ctx, log.ToStorageLog()) + err := r.storage.StoreLogs(ctx, storageLogs) if err != nil { switch { case errors.Is(err, storage.ErrInsert): - c.JSON(http.StatusInternalServerError, gin.H{"status": "err", "msg": "error while storing the log"}) + c.JSON(http.StatusInternalServerError, gin.H{"status": "err", "msg": "error while storing the batch"}) case errors.Is(err, storage.ErrReadOnlyStorage): c.JSON(http.StatusNotImplemented, gin.H{"status": "err", "msg": "not supported action"}) default: @@ -168,7 +172,7 @@ func initRouter(ctx xcontext.Context, rh RouteHandler, middlewares []gin.Handler } r.GET("/status", rh.status) - r.POST("/log", rh.addLog) + r.POST("/log", rh.addLogs) r.GET("/log", rh.getLogs) // serve the frontend app diff --git a/cmds/admin_server/storage/mongo/mongo.go b/cmds/admin_server/storage/mongo/mongo.go index ae2c4f62..9a99b19f 100644 --- a/cmds/admin_server/storage/mongo/mongo.go +++ b/cmds/admin_server/storage/mongo/mongo.go @@ -106,12 +106,14 @@ func toMongoQuery(query storage.Query) bson.D { return q } -func (s *MongoStorage) StoreLog(ctx xcontext.Context, log storage.Log) error { - mongoLog := toMongoLog(&log) - _, err := s.collection.InsertOne(ctx, mongoLog) +func (s *MongoStorage) StoreLogs(ctx xcontext.Context, logs []storage.Log) error { + var mongoLogs []interface{} + for _, log := range logs { + mongoLogs = append(mongoLogs, toMongoLog(&log)) + } + _, err := s.collection.InsertMany(ctx, mongoLogs) if err != nil { - // for better debugging - ctx.Errorf("Error while inserting into the db: %v", err) + ctx.Errorf("Error while inserting a batch of logs: %v", err) return storage.ErrInsert } return nil diff --git a/cmds/admin_server/storage/storage.go b/cmds/admin_server/storage/storage.go index d15a4435..6740097c 100644 --- a/cmds/admin_server/storage/storage.go +++ b/cmds/admin_server/storage/storage.go @@ -19,7 +19,7 @@ var ( ) type Storage interface { - StoreLog(ctx xcontext.Context, entry Log) error + StoreLogs(ctx xcontext.Context, logs []Log) error GetLogs(ctx xcontext.Context, query Query) (*Result, error) Close(ctx xcontext.Context) error diff --git a/pkg/loggerhook/httphook.go b/pkg/loggerhook/httphook.go index 037cae6c..96511a18 100644 --- a/pkg/loggerhook/httphook.go +++ b/pkg/loggerhook/httphook.go @@ -18,11 +18,59 @@ import ( var ( DefaultBufferSize = 10 + MaxBatchSize = 500000 // size in bytes + MaxBatchCount = 100 + BatchSendFreq = 1 * time.Second DefaultLogTimeout = 1 * time.Second ) +// Batch defines a log batch that handles the size in bytes of the logs +type Batch struct { + addr string + logs []server.Log + size uint64 +} + +func NewBatch(addr string) Batch { + return Batch{ + addr: addr, + } +} + +func (b *Batch) Add(log server.Log) { + b.logs = append(b.logs, log) + b.size += uint64(len(log.LogData)) +} + +func (b *Batch) Size() uint64 { + return b.size +} + +func (b *Batch) Count() int { + return len(b.logs) +} + +// PostAndReset makes a post request sending hh.batch and reseting the batch +func (b *Batch) PostAndReset() error { + logJson, err := json.Marshal(b.logs) + if err != nil { + return fmt.Errorf("Marshal Err: %v", err) + } + requestBody := bytes.NewBuffer(logJson) + _, err = http.Post(b.addr, "application/json", requestBody) + if err != nil { + return fmt.Errorf("Http Logger Err: %v", err) + } + + b.logs = nil + b.size = 0 + return nil +} + type HttpHook struct { - Addr string + batch Batch + batchTicker *time.Ticker + logChan chan server.Log closeChan chan struct{} } @@ -36,9 +84,10 @@ func NewHttpHook(addr string) (*HttpHook, error) { url.Path = path.Join(url.Path, "log") hh := HttpHook{ - Addr: url.String(), - logChan: make(chan server.Log, DefaultBufferSize), - closeChan: make(chan struct{}), + batch: NewBatch(url.String()), + batchTicker: time.NewTicker(BatchSendFreq), + logChan: make(chan server.Log, DefaultBufferSize), + closeChan: make(chan struct{}), } go hh.logHandler() @@ -90,16 +139,32 @@ func (hh *HttpHook) logHandler() { for { select { case log := <-hh.logChan: - logJson, err := json.Marshal(log) - if err != nil { - fmt.Fprintf(os.Stderr, "Marshal Err: %v", err) + hh.batch.Add(log) + if hh.batch.Count() > MaxBatchCount || hh.batch.Size() > uint64(MaxBatchSize) { + err := hh.batch.PostAndReset() + if err != nil { + fmt.Fprintf(os.Stderr, "Send Batch failed: %v", err) + break + } + // if the batch is sent + // to avoid ticking on an empty batch + hh.batchTicker.Reset(BatchSendFreq) } - requestBody := bytes.NewBuffer(logJson) - _, err = http.Post(hh.Addr, "application/json", requestBody) - if err != nil { - fmt.Fprintf(os.Stderr, "Http Logger Err: %v", err) + case <-hh.batchTicker.C: + if hh.batch.Size() > 0 { + err := hh.batch.PostAndReset() + if err != nil { + fmt.Fprintf(os.Stderr, "Send Batch failed: %v", err) + } } case <-hh.closeChan: + // if there are logs in the buffered batch, send them + if hh.batch.Count() > 0 { + err := hh.batch.PostAndReset() + if err != nil { + fmt.Fprintf(os.Stderr, "Send Batch failed: %v", err) + } + } fmt.Fprintf(os.Stderr, "Closing http logger") return } @@ -109,6 +174,7 @@ func (hh *HttpHook) logHandler() { // Close ends the logHandler goroutine func (hh *HttpHook) Close() { hh.closeChan <- struct{}{} + hh.batchTicker.Stop() // to mark further Close as no-op hh.closeChan = nil } diff --git a/tests/integ/admin_server/logendpoint_test.go b/tests/integ/admin_server/logendpoint_test.go index 7cf70c73..dcfeb472 100644 --- a/tests/integ/admin_server/logendpoint_test.go +++ b/tests/integ/admin_server/logendpoint_test.go @@ -46,12 +46,12 @@ func setupCleanDB(uri string) (*mongo.Client, error) { return client, err } -func submitLog(addr string, log server.Log) error { - logJson, err := json.Marshal(log) +func submitLogs(addr string, logs []server.Log) error { + batchJson, err := json.Marshal(logs) if err != nil { fmt.Fprintf(os.Stderr, "Marshal Err: %v", err) } - requestBody := bytes.NewBuffer(logJson) + requestBody := bytes.NewBuffer(batchJson) _, err = http.Post(addr, "application/json", requestBody) return err @@ -77,34 +77,39 @@ func getAllLogs(t *testing.T, db *mongo.Client) []mongoStorage.Log { return dbLogs } -func TestLogPush(t *testing.T) { +func TestBatchPush(t *testing.T) { var ( logData = "test log push" loglevel = "info" - logDate = time.Now() + date = time.Now() + logsNum = 100 + logs []server.Log ) - log := server.Log{ - LogData: logData, - LogLevel: loglevel, - Date: logDate, + + for i := 0; i < logsNum; i++ { + logs = append(logs, server.Log{ + LogData: fmt.Sprintf("%s %d", logData, i), + LogLevel: loglevel, + Date: date, + }) } db, err := setupCleanDB(*flagMongoEndpoint) if err != nil { t.Fatal(err) } - ctx, cancel := context.WithTimeout(context.Background(), *flagOperationTimeout) - defer cancel() - defer db.Disconnect(ctx) - err = submitLog(*flagAdminEndpoint, log) + err = submitLogs(*flagAdminEndpoint, logs) if err != nil { t.Fatal(err) } + ctx, cancel := context.WithTimeout(context.Background(), *flagOperationTimeout) + defer cancel() + defer db.Disconnect(ctx) + dbLogs := getAllLogs(t, db) - require.Equal(t, 1, len(dbLogs)) - require.Equal(t, logData, dbLogs[0].LogData) - require.Equal(t, loglevel, dbLogs[0].LogLevel) + require.Equal(t, len(logs), len(dbLogs)) + assertEqualResults(t, dbLogs, logs) }