Skip to content

Commit

Permalink
fix(worker,cdn): logs order
Browse files Browse the repository at this point in the history
 * read stdout and stderr in the same goroutine to avoid mixing logs
 * store the number of millisecond for each line of logs as score in redis and retrieve it in "Number" field from a log line in the UI

Signed-off-by: francois  samin <francois.samin@corp.ovh.com>
  • Loading branch information
fsamin committed Sep 17, 2021
1 parent 6b133b8 commit 573a711
Show file tree
Hide file tree
Showing 19 changed files with 110 additions and 135 deletions.
4 changes: 2 additions & 2 deletions engine/cdn/cdn_item.go
Expand Up @@ -262,7 +262,7 @@ func (s *Service) getItemLogValue(ctx context.Context, t sdk.CDNItemType, apiRef
return nil, 0, nil, "", err
}

rc, err := s.Units.LogsBuffer().NewAdvancedReader(ctx, *itemUnit, opts.format, opts.from, opts.size, opts.sort)
rc, err := s.Units.LogsBuffer().NewAdvancedReader(ctx, *itemUnit, opts.format, float64(opts.from), float64(opts.size), opts.sort)
if err != nil {
return nil, 0, nil, "", err
}
Expand Down Expand Up @@ -291,7 +291,7 @@ func (s *Service) getItemLogValue(ctx context.Context, t sdk.CDNItemType, apiRef
}

log.Debug(ctx, "getItemLogValue> Getting logs from cache")
return it, int64(linesCount), s.LogCache.NewReader(it.ID, opts.format, opts.from, opts.size, opts.sort), filename, nil
return it, int64(linesCount), s.LogCache.NewReader(it.ID, opts.format, float64(opts.from), float64(opts.size), opts.sort), filename, nil
}

func (s *Service) pushItemLogIntoCache(ctx context.Context, it sdk.CDNItem, unitName string) error {
Expand Down
30 changes: 15 additions & 15 deletions engine/cdn/cdn_item_test.go
Expand Up @@ -242,7 +242,7 @@ func TestGetItemValue_ThousandLines(t *testing.T) {
UnitID: s.Units.LogsBuffer().ID(),
}
for i := 0; i < 1000; i++ {
require.NoError(t, s.Units.LogsBuffer().Add(iu, uint(i), fmt.Sprintf("Line %d\n", i)))
require.NoError(t, s.Units.LogsBuffer().Add(iu, float64(i), fmt.Sprintf("Line %d\n", i)))
}

require.NoError(t, s.completeItem(context.TODO(), db, iu))
Expand All @@ -267,9 +267,9 @@ func TestGetItemValue_ThousandLines(t *testing.T) {
var lines []redis.Line
require.NoError(t, json.Unmarshal(buf.Bytes(), &lines), "given json should be valid")
require.Len(t, lines, 200)
require.Equal(t, int64(773), lines[0].Number)
require.Equal(t, float64(773), lines[0].Number)
require.Equal(t, "Line 773\n", lines[0].Value)
require.Equal(t, int64(972), lines[199].Number)
require.Equal(t, float64(972), lines[199].Number)
require.Equal(t, "Line 972\n", lines[199].Value)

_, _, rc, _, err = s.getItemLogValue(context.Background(), sdk.CDNTypeItemStepLog, apiRefhash, getItemLogOptions{
Expand All @@ -284,9 +284,9 @@ func TestGetItemValue_ThousandLines(t *testing.T) {

require.NoError(t, json.Unmarshal(buf.Bytes(), &lines), "given json should be valid")
require.Len(t, lines, 227)
require.Equal(t, int64(773), lines[0].Number)
require.Equal(t, float64(773), lines[0].Number)
require.Equal(t, "Line 773\n", lines[0].Value)
require.Equal(t, int64(999), lines[226].Number)
require.Equal(t, float64(999), lines[226].Number)
require.Equal(t, "Line 999\n", lines[226].Value)
}

Expand Down Expand Up @@ -349,7 +349,7 @@ func TestGetItemValue_Reverse(t *testing.T) {
UnitID: s.Units.LogsBuffer().ID(),
}
for i := 0; i < 5; i++ {
require.NoError(t, s.Units.LogsBuffer().Add(iu, uint(i), fmt.Sprintf("Line %d\n", i)))
require.NoError(t, s.Units.LogsBuffer().Add(iu, float64(i), fmt.Sprintf("Line %d\n", i)))
}

require.NoError(t, s.completeItem(context.TODO(), db, iu))
Expand All @@ -373,9 +373,9 @@ func TestGetItemValue_Reverse(t *testing.T) {
var lines []redis.Line
require.NoError(t, json.Unmarshal(buf.Bytes(), &lines), "given json should be valid")
require.Len(t, lines, 5)
require.Equal(t, int64(4), lines[0].Number)
require.Equal(t, float64(4), lines[0].Number)
require.Equal(t, "Line 4\n", lines[0].Value)
require.Equal(t, int64(0), lines[4].Number)
require.Equal(t, float64(0), lines[4].Number)
require.Equal(t, "Line 0\n", lines[4].Value)

// Get From Buffer
Expand All @@ -393,9 +393,9 @@ func TestGetItemValue_Reverse(t *testing.T) {

require.NoError(t, json.Unmarshal(buf.Bytes(), &lines), "given json should be valid")
require.Len(t, lines, 2)
require.Equal(t, int64(2), lines[0].Number)
require.Equal(t, float64(2), lines[0].Number)
require.Equal(t, "Line 2\n", lines[0].Value)
require.Equal(t, int64(1), lines[1].Number)
require.Equal(t, float64(1), lines[1].Number)
require.Equal(t, "Line 1\n", lines[1].Value)
}

Expand Down Expand Up @@ -459,7 +459,7 @@ func TestGetItemValue_ThousandLinesReverse(t *testing.T) {
UnitID: s.Units.LogsBuffer().ID(),
}
for i := 0; i < 1000; i++ {
require.NoError(t, s.Units.LogsBuffer().Add(iu, uint(i), fmt.Sprintf("Line %d\n", i)))
require.NoError(t, s.Units.LogsBuffer().Add(iu, float64(i), fmt.Sprintf("Line %d\n", i)))
}

require.NoError(t, s.completeItem(context.TODO(), db, iu))
Expand All @@ -486,9 +486,9 @@ func TestGetItemValue_ThousandLinesReverse(t *testing.T) {
var lines []redis.Line
require.NoError(t, json.Unmarshal(buf.Bytes(), &lines), "given json should be valid")
require.Len(t, lines, 200)
require.Equal(t, int64(226), lines[0].Number)
require.Equal(t, float64(226), lines[0].Number)
require.Equal(t, "Line 226\n", lines[0].Value)
require.Equal(t, int64(27), lines[199].Number)
require.Equal(t, float64(27), lines[199].Number)
require.Equal(t, "Line 27\n", lines[199].Value)

_, _, rc, _, err = s.getItemLogValue(context.Background(), sdk.CDNTypeItemStepLog, apiRefhash, getItemLogOptions{
Expand All @@ -504,8 +504,8 @@ func TestGetItemValue_ThousandLinesReverse(t *testing.T) {

require.NoError(t, json.Unmarshal(buf.Bytes(), &lines), "given json should be valid")
require.Equal(t, len(lines), 227)
require.Equal(t, int64(226), lines[0].Number)
require.Equal(t, float64(226), lines[0].Number)
require.Equal(t, "Line 226\n", lines[0].Value)
require.Equal(t, int64(0), lines[226].Number)
require.Equal(t, float64(0), lines[226].Number)
require.Equal(t, "Line 0\n", lines[226].Value)
}
19 changes: 18 additions & 1 deletion engine/cdn/cdn_log_store.go
Expand Up @@ -2,6 +2,8 @@ package cdn

import (
"context"
"fmt"
"strconv"
"time"

"github.com/rockbears/log"
Expand Down Expand Up @@ -49,6 +51,9 @@ func (s *Service) storeLogs(ctx context.Context, itemType sdk.CDNItemType, signa
return err
}

var t0 = it.Created.UnixNano() / 1000000 // convert to ms
var t1 = signature.Timestamp / 1000000

ctx = context.WithValue(ctx, storage.FieldAPIRef, it.APIRefHash)

iu, err := s.loadOrCreateItemUnitBuffer(ctx, it.ID, itemType)
Expand All @@ -67,7 +72,18 @@ func (s *Service) storeLogs(ctx context.Context, itemType sdk.CDNItemType, signa
if err != nil {
return err
}
if err := bufferUnit.Add(*iu, uint(countLine), content); err != nil {

// Add the number of millisecond since creation
ms := t1 - t0
if ms < 0 {
ms = 0
}
score, err := strconv.ParseFloat(fmt.Sprintf("%d.%d", countLine, ms), 64)
if err != nil {
log.ErrorWithStackTrace(ctx, err)
}

if err := bufferUnit.Add(*iu, score, content); err != nil {
return err
}

Expand Down Expand Up @@ -119,6 +135,7 @@ func (s *Service) loadOrCreateItem(ctx context.Context, itemType sdk.CDNItemType
Type: itemType,
APIRefHash: hashRef,
Status: sdk.CDNStatusItemIncoming,
Created: time.Unix(0, signature.Timestamp),
}

tx, err := s.mustDBWithCtx(ctx).Begin()
Expand Down
4 changes: 2 additions & 2 deletions engine/cdn/cdn_log_store_test.go
Expand Up @@ -103,7 +103,7 @@ func TestStoreNewStepLog(t *testing.T) {
require.NoError(t, err)

require.NoError(t, err)
require.Equal(t, "[EMERGENCY] this is a message\n", buf.String())
require.Equal(t, "this is a message\n", buf.String())
}

func TestStoreLastStepLog(t *testing.T) {
Expand Down Expand Up @@ -269,5 +269,5 @@ func TestStoreNewServiceLog(t *testing.T) {
require.NoError(t, err)

require.NoError(t, err)
require.Equal(t, "[EMERGENCY] this is a message\n", buf.String())
require.Equal(t, "this is a message\n", buf.String())
}
26 changes: 1 addition & 25 deletions engine/cdn/cdn_log_tcp.go
Expand Up @@ -220,33 +220,9 @@ func buildMessage(hm handledMessage) string {
if !strings.HasSuffix(val, "\n") {
val += "\n"
}
return fmt.Sprintf("[%s] %s", getLevelString(hm.Msg.Level), val)
return fmt.Sprintf("%s", val)
}

func getLevelString(level int32) string {
var lvl string
switch level {
case int32(hook.LOG_DEBUG):
lvl = "DEBUG"
case int32(hook.LOG_INFO):
lvl = "INFO"
case int32(hook.LOG_NOTICE):
lvl = "NOTICE"
case int32(hook.LOG_WARNING):
lvl = "WARN"
case int32(hook.LOG_ERR):
lvl = "ERROR"
case int32(hook.LOG_CRIT):
lvl = "CRITICAL"
case int32(hook.LOG_ALERT):
lvl = "ALERT"
case int32(hook.LOG_EMERG):
lvl = "EMERGENCY"
}
return lvl
}

//
func (s *Service) handleServiceLog(ctx context.Context, hatcheryID int64, hatcheryName string, workerName string, sig interface{}, m hook.Message) error {
var signature cdn.Signature
var pk *rsa.PublicKey
Expand Down
9 changes: 5 additions & 4 deletions engine/cdn/cdn_log_tcp_test.go
Expand Up @@ -3,15 +3,16 @@ package cdn
import (
"bytes"
"context"
"io"
"strconv"
"testing"

"github.com/mitchellh/hashstructure"
"github.com/ovh/cds/engine/cdn/item"
"github.com/ovh/cds/engine/cdn/storage"
cdntest "github.com/ovh/cds/engine/cdn/test"
"github.com/ovh/cds/sdk/cdn"
"github.com/ovh/cds/sdk/log/hook"
"io"
"strconv"
"testing"

"github.com/ovh/cds/engine/gorpmapper"
"github.com/ovh/cds/engine/test"
Expand Down Expand Up @@ -128,6 +129,6 @@ func TestStoreTruncatedLogs(t *testing.T) {
_, err = io.Copy(buf, rc)
require.NoError(t, err)

require.Equal(t, "[EMERGENCY] Bim bam boum\n...truncated\n", buf.String())
require.Equal(t, "Bim bam boum\n...truncated\n", buf.String())
require.Equal(t, int64(2), lineCount)
}
4 changes: 3 additions & 1 deletion engine/cdn/item/dao.go
Expand Up @@ -115,7 +115,9 @@ func LoadAndLockByID(ctx context.Context, m *gorpmapper.Mapper, db gorpmapper.Sq
// Insert in database.
func Insert(ctx context.Context, m *gorpmapper.Mapper, db gorpmapper.SqlExecutorWithTx, i *sdk.CDNItem) error {
i.ID = sdk.UUID()
i.Created = time.Now()
if i.Created.IsZero() {
i.Created = time.Now()
}
i.LastModified = time.Now()

cdnItem := toItemDB(*i)
Expand Down
2 changes: 1 addition & 1 deletion engine/cdn/item_handler_test.go
Expand Up @@ -185,7 +185,7 @@ func TestGetItemLogsDownloadHandler(t *testing.T) {
s.Router.Mux.ServeHTTP(rec, req)
require.Equal(t, 200, rec.Code)

assert.Equal(t, "[EMERGENCY] this is a message\n", string(rec.Body.Bytes()))
assert.Equal(t, "this is a message\n", string(rec.Body.Bytes()))

uri = s.Router.GetRoute("GET", s.getItemHandler, map[string]string{
"type": string(sdk.CDNTypeItemStepLog),
Expand Down
3 changes: 2 additions & 1 deletion engine/cdn/item_logs_handler.go
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"math"
"net/http"
"time"

Expand Down Expand Up @@ -154,7 +155,7 @@ func (s *Service) sendLogsToWSClient(ctx context.Context, wsClient websocket.Cli
}

// If all the lines were sent, we can trigger another update, if only one line was send do not trigger an update wait for next event from broker
if len(lines) > 1 && (oldNextLineToSend > 0 || wsClientData.scoreNextLineToSend-oldNextLineToSend == int64(len(lines))) {
if len(lines) > 1 && (oldNextLineToSend > 0 || int(math.Round(wsClientData.scoreNextLineToSend-oldNextLineToSend)) == len(lines)) {
wsClientData.TriggerUpdate()
}

Expand Down
24 changes: 12 additions & 12 deletions engine/cdn/item_logs_handler_test.go
Expand Up @@ -280,8 +280,8 @@ func TestGetItemLogsLinesHandler(t *testing.T) {
var lines []redis.Line
require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &lines))
require.Len(t, lines, 1)
require.Equal(t, int64(0), lines[0].Number)
require.Equal(t, "[EMERGENCY] this is a message\n", lines[0].Value)
require.Equal(t, float64(0), lines[0].Number)
require.Equal(t, "this is a message\n", lines[0].Value)

}

Expand Down Expand Up @@ -409,10 +409,10 @@ func TestGetItemLogsStreamHandler(t *testing.T) {
}

require.Len(t, lines, 10)
require.Equal(t, "[EMERGENCY] message 0\n", lines[0].Value)
require.Equal(t, int64(0), lines[0].Number)
require.Equal(t, "[EMERGENCY] message 9\n", lines[9].Value)
require.Equal(t, int64(9), lines[9].Number)
require.Equal(t, "message 0\n", lines[0].Value)
require.Equal(t, float64(0), lines[0].Number)
require.Equal(t, "message 9\n", lines[9].Value)
require.Equal(t, float64(9), lines[9].Number)

// Send some messages
for i := 0; i < 10; i++ {
Expand All @@ -434,8 +434,8 @@ func TestGetItemLogsStreamHandler(t *testing.T) {
}

require.Len(t, lines, 20)
require.Equal(t, "[EMERGENCY] message 19\n", lines[19].Value)
require.Equal(t, int64(19), lines[19].Number)
require.Equal(t, "message 19\n", lines[19].Value)
require.Equal(t, float64(19), lines[19].Number)

// Try another connection with offset
ctx, cancel = context.WithTimeout(context.TODO(), time.Second*10)
Expand Down Expand Up @@ -467,8 +467,8 @@ func TestGetItemLogsStreamHandler(t *testing.T) {
}

require.Len(t, lines, 5)
require.Equal(t, "[EMERGENCY] message 15\n", lines[0].Value)
require.Equal(t, int64(15), lines[0].Number)
require.Equal(t, "[EMERGENCY] message 19\n", lines[4].Value)
require.Equal(t, int64(19), lines[4].Number)
require.Equal(t, "message 15\n", lines[0].Value)
require.Equal(t, float64(15), lines[0].Number)
require.Equal(t, "message 19\n", lines[4].Value)
require.Equal(t, float64(19), lines[4].Number)
}
2 changes: 1 addition & 1 deletion engine/cdn/lru/redis.go
Expand Up @@ -132,7 +132,7 @@ func (r *Redis) NewWriter(itemID string) io.WriteCloser {
}

// NewReader instanciates a new reader
func (r *Redis) NewReader(itemID string, format sdk.CDNReaderFormat, from int64, size uint, sort int64) io.ReadCloser {
func (r *Redis) NewReader(itemID string, format sdk.CDNReaderFormat, from float64, size float64, sort int64) io.ReadCloser {
return &redis.Reader{
Store: r.store,
ItemID: itemID,
Expand Down
4 changes: 2 additions & 2 deletions engine/cdn/redis/line.go
Expand Up @@ -8,8 +8,8 @@ import (
)

type Line struct {
Number int64 `json:"number"`
Value string `json:"value"`
Number float64 `json:"number"`
Value string `json:"value"`
}

func (l Line) Format(f sdk.CDNReaderFormat) ([]byte, error) {
Expand Down

0 comments on commit 573a711

Please sign in to comment.