Skip to content

Commit

Permalink
chore(tmc): remove LogSyncer line length limit. (#1187)
Browse files Browse the repository at this point in the history
## Reasons for the changes

The first implementation of the LogSyncer had a line length limit
which amounted to the size of the scanner buffer.

### Description of changes

A custom implementation of `readLines()` was implemented without any
line length limit (other than available memory).

Signed-off-by: Tiago Natel <t.nateldemoura@gmail.com>
  • Loading branch information
i4k-tm committed Oct 20, 2023
2 parents 7104be0 + 7d7462a commit 9d9bf56
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 70 deletions.
134 changes: 79 additions & 55 deletions cloud/log_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package cloud

import (
"bufio"
"bytes"
"io"
"sync"
Expand All @@ -25,7 +24,6 @@ type (
wg sync.WaitGroup
shutdown chan struct{}

maxLineSize int
batchSize int
idleDuration time.Duration
}
Expand All @@ -34,10 +32,6 @@ type (
Syncer func(l DeploymentLogs)
)

// DefaultLogMaxLineSize is the default maximum line.
// TODO(i4k): to be removed.
const DefaultLogMaxLineSize = 4096

// DefaultLogBatchSize is the default batch size.
const DefaultLogBatchSize = 256

Expand All @@ -46,25 +40,20 @@ const DefaultLogIdleDuration = 1 * time.Second

// NewLogSyncer creates a new log syncer.
func NewLogSyncer(syncfn Syncer) *LogSyncer {
return NewLogSyncerWith(syncfn, DefaultLogMaxLineSize, DefaultLogBatchSize, DefaultLogIdleDuration)
return NewLogSyncerWith(syncfn, DefaultLogBatchSize, DefaultLogIdleDuration)
}

// NewLogSyncerWith creates a new customizable syncer.
func NewLogSyncerWith(
syncfn Syncer,
maxLineSize int,
batchSize int,
idleDuration time.Duration,
) *LogSyncer {
if maxLineSize == 0 {
panic("max line size must be set")
}
l := &LogSyncer{
in: make(chan *DeploymentLog, batchSize),
syncfn: syncfn,
shutdown: make(chan struct{}),

maxLineSize: maxLineSize,
batchSize: batchSize,
idleDuration: idleDuration,
}
Expand All @@ -79,34 +68,38 @@ func (s *LogSyncer) NewBuffer(channel LogChannel, out io.Writer) io.Writer {
s.wg.Add(1)
go func() {
defer s.wg.Done()
line := int64(1)
scanner := bufio.NewScanner(r)
maxLineSize := s.maxLineSize
buf := make([]byte, maxLineSize)
scanner.Buffer(buf, maxLineSize)
// no internal allocation
scanner.Split(scanLines)
linenum := int64(1)

var pending []byte
errs := errors.L()
for scanner.Scan() {
message := scanner.Text()
_, err := out.Write([]byte(message))
if err != nil {
errs.Append(errors.E(err, "writing to terminal"))
continue
for {
lines, rest, readErr := readLines(r, pending)
if readErr != nil && readErr != io.EOF {
errs.Append(readErr)
break
}

t := time.Now().UTC()
s.in <- &DeploymentLog{
Channel: channel,
Line: line,
Message: string(dropCRLN([]byte(message))),
Timestamp: &t,
if readErr == io.EOF && len(rest) > 0 {
lines = [][]byte{rest}
}
line++
}
if err := scanner.Err(); err != nil {
errs.Append(errors.E(err, "scanning output lines"))
for _, line := range lines {
_, err := out.Write(line)
if err != nil {
errs.Append(errors.E(err, "writing to terminal"))
}

t := time.Now().UTC()
s.in <- &DeploymentLog{
Channel: channel,
Line: linenum,
Message: string(dropCRLN([]byte(line))),
Timestamp: &t,
}
linenum++
}
if readErr == io.EOF {
break
}
pending = rest
}

errs.Append(r.Close())
Expand Down Expand Up @@ -158,27 +151,58 @@ func (s *LogSyncer) enqueue(l *DeploymentLog) {
s.lastEnqueued = time.Now()
}

// scanLines is a split function for a [bufio.Scanner] that returns each line of
// text. It's similar to [bufio.ScanLines] but do not remove the trailing newline
// marker and optional carriege return. The returned line may be empty.
// The end-of-line marker is one optional carriage return followed
// by one mandatory newline. In regular expression notation, it is `\r?\n`.
// The last non-empty line of input will be returned even if it has no
// newline.
func scanLines(data []byte, atEOF bool) (advance int, token []byte, err error) {
if atEOF && len(data) == 0 {
return 0, nil, nil
}
if i := bytes.IndexByte(data, '\n'); i >= 0 {
// We have a full newline-terminated line.
return i + 1, data[0 : i+1], nil
func readLines(r io.Reader, pending []byte) (line [][]byte, rest []byte, err error) {
const readSize = 1024

var buf [readSize]byte
rest = pending
for {
n, err := r.Read(buf[:])
if n > 0 {
rest = append(rest, buf[:n]...)
var lines [][]byte

var nlpos int
for nlpos != -1 {
nlpos = bytes.IndexByte(rest, '\n')
if nlpos >= 0 {
lines = append(lines, rest[:nlpos+1]) // line includes ln
rest = rest[nlpos+1:]
}
}
if len(lines) > 0 {
return lines, rest, err
}
} else if err == nil {
// misbehaving reader
return nil, nil, io.EOF
}
if err != nil {
return nil, rest, err
}

// line ending not found, continue reading.
}
// If we're at EOF, we have a final, non-terminated line. Return it.
if atEOF {
return len(data), data, nil
}

func readLine(r io.Reader) (line []byte, err error) {
var buf [1]byte
for {
n, err := r.Read(buf[:])
if n > 0 {
b := buf[0]
line = append(line, b)
if b == '\n' {
return line, err
}
} else if err == nil {
// misbehaving reader
return nil, io.EOF
}
if err != nil {
return line, err
}
}
// Request more data.
return 0, nil, nil
}

// dropCRLN drops a terminating \n and \r from the data.
Expand Down
52 changes: 52 additions & 0 deletions cloud/log_syncer_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2023 Terramate GmbH
// SPDX-License-Identifier: MPL-2.0

package cloud

import (
"bytes"
"strings"
"testing"
)

const nlines = 10000

func BenchmarkCloudReadLines(b *testing.B) {
b.ReportAllocs()
b.StopTimer()
var bufData string
for i := 0; i < nlines; i++ {
bufData += strings.Repeat("A", 80) + "\n"
}
b.StartTimer()
for i := 0; i < b.N; i++ {
buf := bytes.NewBufferString(bufData)
var pending []byte
for {
_, rest, err := readLines(buf, pending[:])
if err != nil {
break
}
pending = rest
}
}
}

func BenchmarkCloudReadLine(b *testing.B) {
b.ReportAllocs()
b.StopTimer()
var bufData string
for i := 0; i < nlines; i++ {
bufData += strings.Repeat("A", 80) + "\n"
}
b.StartTimer()
for i := 0; i < b.N; i++ {
buf := bytes.NewBufferString(bufData)
for {
_, err := readLine(buf)
if err != nil {
break
}
}
}
}
35 changes: 21 additions & 14 deletions cloud/log_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,32 +28,42 @@ type (
testcase struct {
name string
writes []write
maxLineSize int
batchSize int
idleDuration time.Duration
want want
}
)

func TestCloudLogSyncer(t *testing.T) {
hugeLine := bytes.Repeat([]byte{'A'}, 1*1024*1024) // 1 mib line, no line ending

for _, tc := range []testcase{
{
name: "no output",
},
/*
TODO(i4k): improve scanner to not have a line size limit
{
name: "unlimited line length",
writes: []write{
{
name: "log line bigger than maximum",
maxLineSize: 1,
writes: []write{
channel: cloud.StdoutLogChannel,
data: hugeLine,
},
},
want: want{
output: map[cloud.LogChannel][]byte{
cloud.StdoutLogChannel: hugeLine,
},
batches: []cloud.DeploymentLogs{
{
{
channel: cloud.StdoutLogChannel,
data: []byte("AA"),
Line: 1,
Channel: cloud.StdoutLogChannel,
Message: string(hugeLine),
},
},
want: want{},
},
*/
},
},
{
name: "multiple writes with no newline",
writes: []write{
Expand Down Expand Up @@ -358,7 +368,7 @@ func TestCloudLogSyncer(t *testing.T) {
var gotBatches []cloud.DeploymentLogs
s := cloud.NewLogSyncerWith(func(logs cloud.DeploymentLogs) {
gotBatches = append(gotBatches, logs)
}, tc.maxLineSize, tc.batchSize, tc.idleDuration)
}, tc.batchSize, tc.idleDuration)
var stdoutBuf, stderrBuf bytes.Buffer
stdoutProxy := s.NewBuffer(cloud.StdoutLogChannel, &stdoutBuf)
stderrProxy := s.NewBuffer(cloud.StderrLogChannel, &stderrBuf)
Expand Down Expand Up @@ -411,9 +421,6 @@ func (tc *testcase) validate(t *testing.T) {
if tc.name == "" {
t.Fatalf("testcase without name: %+v", tc)
}
if tc.maxLineSize == 0 {
tc.maxLineSize = cloud.DefaultLogMaxLineSize
}
if tc.batchSize == 0 {
tc.batchSize = cloud.DefaultLogBatchSize
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/terramate/cli/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func (c *cli) RunAll(runStacks []ExecContext, isSuccessCode func(exitCode int) b

func (c *cli) syncLogs(logger *zerolog.Logger, runContext ExecContext, logs cloud.DeploymentLogs) {
data, _ := json.Marshal(logs)
logger.Trace().RawJSON("logs", data).Msg("synchronizing logs")
logger.Debug().RawJSON("logs", data).Msg("synchronizing logs")
ctx, cancel := context.WithTimeout(context.Background(), defaultCloudTimeout)
defer cancel()
stackID := c.cloud.run.meta2id[runContext.Stack.ID]
Expand Down

0 comments on commit 9d9bf56

Please sign in to comment.