Skip to content

Commit

Permalink
fix(worker,cdn): logs order (#5939)
Browse files Browse the repository at this point in the history
  • Loading branch information
fsamin committed Sep 23, 2021
1 parent ccf9ff9 commit 947426a
Show file tree
Hide file tree
Showing 35 changed files with 151 additions and 103 deletions.
1 change: 1 addition & 0 deletions contrib/grpcplugins/action/plugin-archive/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAm
github.com/sguiheux/go-coverage v0.0.0-20190710153556-287b082a7197 h1:qu90yDtRE5WEfRT5mn9v0Xz9RaopLguhbPwZKx4dHq8=
github.com/sguiheux/go-coverage v0.0.0-20190710153556-287b082a7197/go.mod h1:0hhKrsUsoT7yvxwNGKa+TSYNA26DNWMqReeZEQq/9FI=
github.com/shirou/gopsutil v0.0.0-20170406131756-e49a95f3d5f8/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM=
Expand Down
1 change: 1 addition & 0 deletions contrib/grpcplugins/action/plugin-clair/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAm
github.com/sguiheux/go-coverage v0.0.0-20190710153556-287b082a7197 h1:qu90yDtRE5WEfRT5mn9v0Xz9RaopLguhbPwZKx4dHq8=
github.com/sguiheux/go-coverage v0.0.0-20190710153556-287b082a7197/go.mod h1:0hhKrsUsoT7yvxwNGKa+TSYNA26DNWMqReeZEQq/9FI=
github.com/shirou/gopsutil v0.0.0-20170406131756-e49a95f3d5f8/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v0.11.5/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
github.com/sirupsen/logrus v1.0.4-0.20170822132746-89742aefa4b2/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
Expand Down
1 change: 1 addition & 0 deletions contrib/grpcplugins/action/plugin-download/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAm
github.com/sguiheux/go-coverage v0.0.0-20190710153556-287b082a7197 h1:qu90yDtRE5WEfRT5mn9v0Xz9RaopLguhbPwZKx4dHq8=
github.com/sguiheux/go-coverage v0.0.0-20190710153556-287b082a7197/go.mod h1:0hhKrsUsoT7yvxwNGKa+TSYNA26DNWMqReeZEQq/9FI=
github.com/shirou/gopsutil v0.0.0-20170406131756-e49a95f3d5f8/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM=
Expand Down
1 change: 1 addition & 0 deletions contrib/grpcplugins/action/plugin-group-tmpl/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAm
github.com/sguiheux/go-coverage v0.0.0-20190710153556-287b082a7197 h1:qu90yDtRE5WEfRT5mn9v0Xz9RaopLguhbPwZKx4dHq8=
github.com/sguiheux/go-coverage v0.0.0-20190710153556-287b082a7197/go.mod h1:0hhKrsUsoT7yvxwNGKa+TSYNA26DNWMqReeZEQq/9FI=
github.com/shirou/gopsutil v0.0.0-20170406131756-e49a95f3d5f8/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM=
Expand Down
1 change: 1 addition & 0 deletions contrib/grpcplugins/action/plugin-kafka-publish/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAm
github.com/sguiheux/go-coverage v0.0.0-20190710153556-287b082a7197 h1:qu90yDtRE5WEfRT5mn9v0Xz9RaopLguhbPwZKx4dHq8=
github.com/sguiheux/go-coverage v0.0.0-20190710153556-287b082a7197/go.mod h1:0hhKrsUsoT7yvxwNGKa+TSYNA26DNWMqReeZEQq/9FI=
github.com/shirou/gopsutil v0.0.0-20170406131756-e49a95f3d5f8/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM=
Expand Down
1 change: 1 addition & 0 deletions contrib/grpcplugins/action/plugin-marathon/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAm
github.com/sguiheux/go-coverage v0.0.0-20190710153556-287b082a7197 h1:qu90yDtRE5WEfRT5mn9v0Xz9RaopLguhbPwZKx4dHq8=
github.com/sguiheux/go-coverage v0.0.0-20190710153556-287b082a7197/go.mod h1:0hhKrsUsoT7yvxwNGKa+TSYNA26DNWMqReeZEQq/9FI=
github.com/shirou/gopsutil v0.0.0-20170406131756-e49a95f3d5f8/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM=
Expand Down
1 change: 1 addition & 0 deletions contrib/grpcplugins/action/plugin-npm-audit-parser/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAm
github.com/sguiheux/go-coverage v0.0.0-20190710153556-287b082a7197 h1:qu90yDtRE5WEfRT5mn9v0Xz9RaopLguhbPwZKx4dHq8=
github.com/sguiheux/go-coverage v0.0.0-20190710153556-287b082a7197/go.mod h1:0hhKrsUsoT7yvxwNGKa+TSYNA26DNWMqReeZEQq/9FI=
github.com/shirou/gopsutil v0.0.0-20170406131756-e49a95f3d5f8/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM=
Expand Down
1 change: 1 addition & 0 deletions contrib/grpcplugins/action/plugin-ssh-cmd/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAm
github.com/sguiheux/go-coverage v0.0.0-20190710153556-287b082a7197 h1:qu90yDtRE5WEfRT5mn9v0Xz9RaopLguhbPwZKx4dHq8=
github.com/sguiheux/go-coverage v0.0.0-20190710153556-287b082a7197/go.mod h1:0hhKrsUsoT7yvxwNGKa+TSYNA26DNWMqReeZEQq/9FI=
github.com/shirou/gopsutil v0.0.0-20170406131756-e49a95f3d5f8/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM=
Expand Down
1 change: 1 addition & 0 deletions contrib/grpcplugins/action/plugin-tmpl/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAm
github.com/sguiheux/go-coverage v0.0.0-20190710153556-287b082a7197 h1:qu90yDtRE5WEfRT5mn9v0Xz9RaopLguhbPwZKx4dHq8=
github.com/sguiheux/go-coverage v0.0.0-20190710153556-287b082a7197/go.mod h1:0hhKrsUsoT7yvxwNGKa+TSYNA26DNWMqReeZEQq/9FI=
github.com/shirou/gopsutil v0.0.0-20170406131756-e49a95f3d5f8/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM=
Expand Down
1 change: 1 addition & 0 deletions contrib/grpcplugins/action/plugin-venom/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAm
github.com/sguiheux/go-coverage v0.0.0-20190710153556-287b082a7197 h1:qu90yDtRE5WEfRT5mn9v0Xz9RaopLguhbPwZKx4dHq8=
github.com/sguiheux/go-coverage v0.0.0-20190710153556-287b082a7197/go.mod h1:0hhKrsUsoT7yvxwNGKa+TSYNA26DNWMqReeZEQq/9FI=
github.com/shirou/gopsutil v0.0.0-20170406131756-e49a95f3d5f8/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM=
Expand Down
6 changes: 3 additions & 3 deletions engine/cdn/cdn_item_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func TestGetItemValue_ThousandLines(t *testing.T) {
UnitID: s.Units.LogsBuffer().ID(),
}
for i := 0; i < 1000; i++ {
require.NoError(t, s.Units.LogsBuffer().Add(iu, uint(i), fmt.Sprintf("Line %d\n", i)))
require.NoError(t, s.Units.LogsBuffer().Add(iu, float64(i), fmt.Sprintf("Line %d\n", i)))
}

require.NoError(t, s.completeItem(context.TODO(), db, iu))
Expand Down Expand Up @@ -349,7 +349,7 @@ func TestGetItemValue_Reverse(t *testing.T) {
UnitID: s.Units.LogsBuffer().ID(),
}
for i := 0; i < 5; i++ {
require.NoError(t, s.Units.LogsBuffer().Add(iu, uint(i), fmt.Sprintf("Line %d\n", i)))
require.NoError(t, s.Units.LogsBuffer().Add(iu, float64(i), fmt.Sprintf("Line %d\n", i)))
}

require.NoError(t, s.completeItem(context.TODO(), db, iu))
Expand Down Expand Up @@ -459,7 +459,7 @@ func TestGetItemValue_ThousandLinesReverse(t *testing.T) {
UnitID: s.Units.LogsBuffer().ID(),
}
for i := 0; i < 1000; i++ {
require.NoError(t, s.Units.LogsBuffer().Add(iu, uint(i), fmt.Sprintf("Line %d\n", i)))
require.NoError(t, s.Units.LogsBuffer().Add(iu, float64(i), fmt.Sprintf("Line %d\n", i)))
}

require.NoError(t, s.completeItem(context.TODO(), db, iu))
Expand Down
20 changes: 19 additions & 1 deletion engine/cdn/cdn_log_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/rockbears/log"
"github.com/shopspring/decimal"

"github.com/ovh/cds/engine/cdn/item"
"github.com/ovh/cds/engine/cdn/storage"
Expand Down Expand Up @@ -49,6 +50,9 @@ func (s *Service) storeLogs(ctx context.Context, itemType sdk.CDNItemType, signa
return err
}

var t0 = it.Created.UnixNano() / 1000000 // convert to ms
var t1 = signature.Timestamp / 1000000

ctx = context.WithValue(ctx, storage.FieldAPIRef, it.APIRefHash)

iu, err := s.loadOrCreateItemUnitBuffer(ctx, it.ID, itemType)
Expand All @@ -67,7 +71,20 @@ func (s *Service) storeLogs(ctx context.Context, itemType sdk.CDNItemType, signa
if err != nil {
return err
}
if err := bufferUnit.Add(*iu, uint(countLine), content); err != nil {

// Add the number of millisecond since creation
ms := t1 - t0
if ms < 0 {
ms = 0
}

// Build the score from the "countLine" as the interger part and "ms" as floating part
scoreD := decimal.NewFromInt(ms)
scoreD = scoreD.Shift(-sdk.DigitsCount(ms)) // bit shift to make 1234 become 0.1234
scoreD = scoreD.Add(decimal.NewFromInt(int64(countLine)))
score, _ := scoreD.Float64()

if err := bufferUnit.Add(*iu, score, content); err != nil {
return err
}

Expand Down Expand Up @@ -119,6 +136,7 @@ func (s *Service) loadOrCreateItem(ctx context.Context, itemType sdk.CDNItemType
Type: itemType,
APIRefHash: hashRef,
Status: sdk.CDNStatusItemIncoming,
Created: time.Unix(0, signature.Timestamp),
}

tx, err := s.mustDBWithCtx(ctx).Begin()
Expand Down
4 changes: 2 additions & 2 deletions engine/cdn/cdn_log_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestStoreNewStepLog(t *testing.T) {
require.NoError(t, err)

require.NoError(t, err)
require.Equal(t, "[EMERGENCY] this is a message\n", buf.String())
require.Equal(t, "this is a message\n", buf.String())
}

func TestStoreLastStepLog(t *testing.T) {
Expand Down Expand Up @@ -269,5 +269,5 @@ func TestStoreNewServiceLog(t *testing.T) {
require.NoError(t, err)

require.NoError(t, err)
require.Equal(t, "[EMERGENCY] this is a message\n", buf.String())
require.Equal(t, "this is a message\n", buf.String())
}
26 changes: 1 addition & 25 deletions engine/cdn/cdn_log_tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,33 +220,9 @@ func buildMessage(hm handledMessage) string {
if !strings.HasSuffix(val, "\n") {
val += "\n"
}
return fmt.Sprintf("[%s] %s", getLevelString(hm.Msg.Level), val)
return fmt.Sprintf("%s", val)
}

func getLevelString(level int32) string {
var lvl string
switch level {
case int32(hook.LOG_DEBUG):
lvl = "DEBUG"
case int32(hook.LOG_INFO):
lvl = "INFO"
case int32(hook.LOG_NOTICE):
lvl = "NOTICE"
case int32(hook.LOG_WARNING):
lvl = "WARN"
case int32(hook.LOG_ERR):
lvl = "ERROR"
case int32(hook.LOG_CRIT):
lvl = "CRITICAL"
case int32(hook.LOG_ALERT):
lvl = "ALERT"
case int32(hook.LOG_EMERG):
lvl = "EMERGENCY"
}
return lvl
}

//
func (s *Service) handleServiceLog(ctx context.Context, hatcheryID int64, hatcheryName string, workerName string, sig interface{}, m hook.Message) error {
var signature cdn.Signature
var pk *rsa.PublicKey
Expand Down
9 changes: 5 additions & 4 deletions engine/cdn/cdn_log_tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ package cdn
import (
"bytes"
"context"
"io"
"strconv"
"testing"

"github.com/mitchellh/hashstructure"
"github.com/ovh/cds/engine/cdn/item"
"github.com/ovh/cds/engine/cdn/storage"
cdntest "github.com/ovh/cds/engine/cdn/test"
"github.com/ovh/cds/sdk/cdn"
"github.com/ovh/cds/sdk/log/hook"
"io"
"strconv"
"testing"

"github.com/ovh/cds/engine/gorpmapper"
"github.com/ovh/cds/engine/test"
Expand Down Expand Up @@ -128,6 +129,6 @@ func TestStoreTruncatedLogs(t *testing.T) {
_, err = io.Copy(buf, rc)
require.NoError(t, err)

require.Equal(t, "[EMERGENCY] Bim bam boum\n...truncated\n", buf.String())
require.Equal(t, "Bim bam boum\n...truncated\n", buf.String())
require.Equal(t, int64(2), lineCount)
}
4 changes: 3 additions & 1 deletion engine/cdn/item/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ func LoadAndLockByID(ctx context.Context, m *gorpmapper.Mapper, db gorpmapper.Sq
// Insert in database.
func Insert(ctx context.Context, m *gorpmapper.Mapper, db gorpmapper.SqlExecutorWithTx, i *sdk.CDNItem) error {
i.ID = sdk.UUID()
i.Created = time.Now()
if i.Created.IsZero() {
i.Created = time.Now()
}
i.LastModified = time.Now()

cdnItem := toItemDB(*i)
Expand Down
2 changes: 1 addition & 1 deletion engine/cdn/item_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func TestGetItemLogsDownloadHandler(t *testing.T) {
s.Router.Mux.ServeHTTP(rec, req)
require.Equal(t, 200, rec.Code)

assert.Equal(t, "[EMERGENCY] this is a message\n", string(rec.Body.Bytes()))
assert.Equal(t, "this is a message\n", string(rec.Body.Bytes()))

uri = s.Router.GetRoute("GET", s.getItemHandler, map[string]string{
"type": string(sdk.CDNTypeItemStepLog),
Expand Down
4 changes: 2 additions & 2 deletions engine/cdn/item_logs_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (s *Service) sendLogsToWSClient(ctx context.Context, wsClient websocket.Cli
}
var lines []redis.Line
if err := sdk.JSONUnmarshal(buf.Bytes(), &lines); err != nil {
return sdk.WrapError(err, "cannot unmarshal lines from buffer %v", string(buf.Bytes()))
return sdk.WrapError(err, "cannot unmarshal lines from buffer %v", buf.String())
}

log.Debug(ctx, "getItemLogsStreamHandler> iterate over %d lines to send for client %s", len(lines), wsClient.UUID())
Expand All @@ -154,7 +154,7 @@ func (s *Service) sendLogsToWSClient(ctx context.Context, wsClient websocket.Cli
}

// If all the lines were sent, we can trigger another update, if only one line was send do not trigger an update wait for next event from broker
if len(lines) > 1 && (oldNextLineToSend > 0 || wsClientData.scoreNextLineToSend-oldNextLineToSend == int64(len(lines))) {
if len(lines) > 1 && (oldNextLineToSend > 0 || int(wsClientData.scoreNextLineToSend-oldNextLineToSend) == len(lines)) {
wsClientData.TriggerUpdate()
}

Expand Down
13 changes: 6 additions & 7 deletions engine/cdn/item_logs_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,7 @@ func TestGetItemLogsLinesHandler(t *testing.T) {
require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &lines))
require.Len(t, lines, 1)
require.Equal(t, int64(0), lines[0].Number)
require.Equal(t, "[EMERGENCY] this is a message\n", lines[0].Value)

require.Equal(t, "this is a message\n", lines[0].Value)
}

func TestGetItemLogsStreamHandler(t *testing.T) {
Expand Down Expand Up @@ -409,9 +408,9 @@ func TestGetItemLogsStreamHandler(t *testing.T) {
}

require.Len(t, lines, 10)
require.Equal(t, "[EMERGENCY] message 0\n", lines[0].Value)
require.Equal(t, "message 0\n", lines[0].Value)
require.Equal(t, int64(0), lines[0].Number)
require.Equal(t, "[EMERGENCY] message 9\n", lines[9].Value)
require.Equal(t, "message 9\n", lines[9].Value)
require.Equal(t, int64(9), lines[9].Number)

// Send some messages
Expand All @@ -434,7 +433,7 @@ func TestGetItemLogsStreamHandler(t *testing.T) {
}

require.Len(t, lines, 20)
require.Equal(t, "[EMERGENCY] message 19\n", lines[19].Value)
require.Equal(t, "message 19\n", lines[19].Value)
require.Equal(t, int64(19), lines[19].Number)

// Try another connection with offset
Expand Down Expand Up @@ -467,8 +466,8 @@ func TestGetItemLogsStreamHandler(t *testing.T) {
}

require.Len(t, lines, 5)
require.Equal(t, "[EMERGENCY] message 15\n", lines[0].Value)
require.Equal(t, "message 15\n", lines[0].Value)
require.Equal(t, int64(15), lines[0].Number)
require.Equal(t, "[EMERGENCY] message 19\n", lines[4].Value)
require.Equal(t, "message 19\n", lines[4].Value)
require.Equal(t, int64(19), lines[4].Number)
}
1 change: 1 addition & 0 deletions engine/cdn/redis/line.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
type Line struct {
Number int64 `json:"number"`
Value string `json:"value"`
Since int64 `json:"since,omitempty"`
}

func (l Line) Format(f sdk.CDNReaderFormat) ([]byte, error) {
Expand Down
22 changes: 14 additions & 8 deletions engine/cdn/redis/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"strings"
"unicode"

"github.com/shopspring/decimal"

"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/sdk"
)
Expand All @@ -33,7 +35,11 @@ func (r *Reader) get(from uint, to uint) ([]Line, error) {
}
ls := make([]Line, len(res))
for i := range res {
ls[i].Number = int64(res[i].Score)
scoreD := decimal.NewFromFloat(res[i].Score)
ls[i].Number = scoreD.IntPart()
floatD := scoreD.Sub(decimal.NewFromInt(ls[i].Number))
ls[i].Since = floatD.Coefficient().Int64()

var value string
if err := sdk.JSONUnmarshal(res[i].Value, &value); err != nil {
return nil, sdk.WrapError(err, "cannot unmarshal line value from store")
Expand Down Expand Up @@ -71,7 +77,7 @@ func (r *Reader) loadMoreLines() error {
}
}

maxLinesToRead := uint(lineCount) - uint(r.From)
maxLinesToRead := lineCount - uint(r.From)
if r.Size == 0 || r.Size > maxLinesToRead {
r.Size = maxLinesToRead
}
Expand Down Expand Up @@ -100,7 +106,7 @@ func (r *Reader) loadMoreLines() error {

// Read 100 lines if possible or only the missing lines if less than 100
alreadyReadLinesLength := r.nextIndex - uint(r.From)
linesLeftToRead := uint(r.Size) - alreadyReadLinesLength
linesLeftToRead := r.Size - alreadyReadLinesLength
if linesLeftToRead == 0 {
if !r.readEOF {
r.readEOF = true
Expand All @@ -121,15 +127,15 @@ func (r *Reader) loadMoreLines() error {
from = r.nextIndex
to = newNextIndex - 1
} else {
if uint(lineCount) < newNextIndex {
if lineCount < newNextIndex {
from = 0
} else {
from = uint(lineCount) - newNextIndex
from = lineCount - newNextIndex
}
if uint(lineCount) < r.nextIndex {
to = uint(lineCount) - 1
if lineCount < r.nextIndex {
to = lineCount - 1
} else {
to = uint(lineCount) - (r.nextIndex + 1)
to = lineCount - (r.nextIndex + 1)
}
}
lines, err := r.get(from, to)
Expand Down
Loading

0 comments on commit 947426a

Please sign in to comment.