Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions cmds/admin_server/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,24 +107,28 @@ func (r *RouteHandler) status(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"status": "live"})
}

// addLog inserts a new log entry inside the database
func (r *RouteHandler) addLog(c *gin.Context) {

var log Log
if err := c.Bind(&log); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"status": "err", "msg": "badly formatted log"})
// addLogs inserts log's batches into the database
func (r *RouteHandler) addLogs(c *gin.Context) {
var logs []Log
if err := c.Bind(&logs); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"status": "err", "msg": "badly formatted logs"})
r.log.Errorf("Err while binding request body %v", err)
return
}

storageLogs := make([]storage.Log, 0, len(logs))
for _, log := range logs {
storageLogs = append(storageLogs, log.ToStorageLog())
}

ctx, cancel := xcontext.WithTimeout(xcontext.Background(), DefaultDBAccessTimeout)
defer cancel()
ctx = ctx.WithLogger(r.log)
err := r.storage.StoreLog(ctx, log.ToStorageLog())
err := r.storage.StoreLogs(ctx, storageLogs)
if err != nil {
switch {
case errors.Is(err, storage.ErrInsert):
c.JSON(http.StatusInternalServerError, gin.H{"status": "err", "msg": "error while storing the log"})
c.JSON(http.StatusInternalServerError, gin.H{"status": "err", "msg": "error while storing the batch"})
case errors.Is(err, storage.ErrReadOnlyStorage):
c.JSON(http.StatusNotImplemented, gin.H{"status": "err", "msg": "not supported action"})
default:
Expand Down Expand Up @@ -168,7 +172,7 @@ func initRouter(ctx xcontext.Context, rh RouteHandler, middlewares []gin.Handler
}

r.GET("/status", rh.status)
r.POST("/log", rh.addLog)
r.POST("/log", rh.addLogs)
r.GET("/log", rh.getLogs)

// serve the frontend app
Expand Down
12 changes: 7 additions & 5 deletions cmds/admin_server/storage/mongo/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,14 @@ func toMongoQuery(query storage.Query) bson.D {
return q
}

func (s *MongoStorage) StoreLog(ctx xcontext.Context, log storage.Log) error {
mongoLog := toMongoLog(&log)
_, err := s.collection.InsertOne(ctx, mongoLog)
func (s *MongoStorage) StoreLogs(ctx xcontext.Context, logs []storage.Log) error {
var mongoLogs []interface{}
for _, log := range logs {
mongoLogs = append(mongoLogs, toMongoLog(&log))
}
_, err := s.collection.InsertMany(ctx, mongoLogs)
if err != nil {
// for better debugging
ctx.Errorf("Error while inserting into the db: %v", err)
ctx.Errorf("Error while inserting a batch of logs: %v", err)
return storage.ErrInsert
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion cmds/admin_server/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var (
)

type Storage interface {
StoreLog(ctx xcontext.Context, entry Log) error
StoreLogs(ctx xcontext.Context, logs []Log) error
GetLogs(ctx xcontext.Context, query Query) (*Result, error)

Close(ctx xcontext.Context) error
Expand Down
88 changes: 77 additions & 11 deletions pkg/loggerhook/httphook.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,59 @@ import (

var (
DefaultBufferSize = 10
MaxBatchSize = 500000 // size in bytes
MaxBatchCount = 100
BatchSendFreq = 1 * time.Second
DefaultLogTimeout = 1 * time.Second
)

// Batch defines a log batch that handles the size in bytes of the logs
type Batch struct {
addr string
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this isnt realy a property of the batch, but i can ignore this; you shouldve had it as a param in send

logs []server.Log
size uint64
}

func NewBatch(addr string) Batch {
return Batch{
addr: addr,
}
}

func (b *Batch) Add(log server.Log) {
b.logs = append(b.logs, log)
b.size += uint64(len(log.LogData))
}

func (b *Batch) Size() uint64 {
return b.size
}

func (b *Batch) Count() int {
return len(b.logs)
}

// PostAndReset makes a post request sending hh.batch and reseting the batch
func (b *Batch) PostAndReset() error {
logJson, err := json.Marshal(b.logs)
if err != nil {
return fmt.Errorf("Marshal Err: %v", err)
}
requestBody := bytes.NewBuffer(logJson)
_, err = http.Post(b.addr, "application/json", requestBody)
if err != nil {
return fmt.Errorf("Http Logger Err: %v", err)
}

b.logs = nil
b.size = 0
return nil
}

type HttpHook struct {
Addr string
batch Batch
batchTicker *time.Ticker

logChan chan server.Log
closeChan chan struct{}
}
Expand All @@ -36,9 +84,10 @@ func NewHttpHook(addr string) (*HttpHook, error) {
url.Path = path.Join(url.Path, "log")

hh := HttpHook{
Addr: url.String(),
logChan: make(chan server.Log, DefaultBufferSize),
closeChan: make(chan struct{}),
batch: NewBatch(url.String()),
batchTicker: time.NewTicker(BatchSendFreq),
logChan: make(chan server.Log, DefaultBufferSize),
closeChan: make(chan struct{}),
}

go hh.logHandler()
Expand Down Expand Up @@ -90,16 +139,32 @@ func (hh *HttpHook) logHandler() {
for {
select {
case log := <-hh.logChan:
logJson, err := json.Marshal(log)
if err != nil {
fmt.Fprintf(os.Stderr, "Marshal Err: %v", err)
hh.batch.Add(log)
if hh.batch.Count() > MaxBatchCount || hh.batch.Size() > uint64(MaxBatchSize) {
err := hh.batch.PostAndReset()
if err != nil {
fmt.Fprintf(os.Stderr, "Send Batch failed: %v", err)
break
Comment thread
mimir-d marked this conversation as resolved.
}
// if the batch is sent
// to avoid ticking on an empty batch
hh.batchTicker.Reset(BatchSendFreq)
}
requestBody := bytes.NewBuffer(logJson)
_, err = http.Post(hh.Addr, "application/json", requestBody)
if err != nil {
fmt.Fprintf(os.Stderr, "Http Logger Err: %v", err)
case <-hh.batchTicker.C:
if hh.batch.Size() > 0 {
err := hh.batch.PostAndReset()
if err != nil {
fmt.Fprintf(os.Stderr, "Send Batch failed: %v", err)
}
Comment thread
mimir-d marked this conversation as resolved.
}
case <-hh.closeChan:
// if there are logs in the buffered batch, send them
if hh.batch.Count() > 0 {
err := hh.batch.PostAndReset()
if err != nil {
fmt.Fprintf(os.Stderr, "Send Batch failed: %v", err)
}
}
fmt.Fprintf(os.Stderr, "Closing http logger")
Comment thread
mimir-d marked this conversation as resolved.
return
}
Expand All @@ -109,6 +174,7 @@ func (hh *HttpHook) logHandler() {
// Close ends the logHandler goroutine
func (hh *HttpHook) Close() {
hh.closeChan <- struct{}{}
hh.batchTicker.Stop()
// to mark further Close as no-op
hh.closeChan = nil
}
37 changes: 21 additions & 16 deletions tests/integ/admin_server/logendpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ func setupCleanDB(uri string) (*mongo.Client, error) {
return client, err
}

func submitLog(addr string, log server.Log) error {
logJson, err := json.Marshal(log)
func submitLogs(addr string, logs []server.Log) error {
batchJson, err := json.Marshal(logs)
if err != nil {
fmt.Fprintf(os.Stderr, "Marshal Err: %v", err)
}
requestBody := bytes.NewBuffer(logJson)
requestBody := bytes.NewBuffer(batchJson)
_, err = http.Post(addr, "application/json", requestBody)

return err
Expand All @@ -77,34 +77,39 @@ func getAllLogs(t *testing.T, db *mongo.Client) []mongoStorage.Log {
return dbLogs
}

func TestLogPush(t *testing.T) {
func TestBatchPush(t *testing.T) {
var (
logData = "test log push"
loglevel = "info"
logDate = time.Now()
date = time.Now()
logsNum = 100
logs []server.Log
)
log := server.Log{
LogData: logData,
LogLevel: loglevel,
Date: logDate,

for i := 0; i < logsNum; i++ {
logs = append(logs, server.Log{
LogData: fmt.Sprintf("%s %d", logData, i),
LogLevel: loglevel,
Date: date,
})
}

db, err := setupCleanDB(*flagMongoEndpoint)
if err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), *flagOperationTimeout)
defer cancel()
defer db.Disconnect(ctx)

err = submitLog(*flagAdminEndpoint, log)
err = submitLogs(*flagAdminEndpoint, logs)
if err != nil {
t.Fatal(err)
}

ctx, cancel := context.WithTimeout(context.Background(), *flagOperationTimeout)
defer cancel()
defer db.Disconnect(ctx)

dbLogs := getAllLogs(t, db)

require.Equal(t, 1, len(dbLogs))
require.Equal(t, logData, dbLogs[0].LogData)
require.Equal(t, loglevel, dbLogs[0].LogLevel)
require.Equal(t, len(logs), len(dbLogs))
assertEqualResults(t, dbLogs, logs)
}