Skip to content

Commit

Permalink
chore(tmc): sync logs at fixed interval. (#1190)
Browse files Browse the repository at this point in the history
  • Loading branch information
i4k-tm committed Oct 20, 2023
2 parents 9d9bf56 + d48b360 commit c8a60ac
Show file tree
Hide file tree
Showing 6 changed files with 326 additions and 145 deletions.
62 changes: 36 additions & 26 deletions cloud/log_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,15 @@ import (
type (
// LogSyncer is the log syncer controller type.
LogSyncer struct {
pending DeploymentLogs
fds []io.Closer
in chan *DeploymentLog
lastEnqueued time.Time
syncfn Syncer
wg sync.WaitGroup
shutdown chan struct{}
pending DeploymentLogs
fds []io.Closer
in chan *DeploymentLog
syncfn Syncer
wg sync.WaitGroup
shutdown chan struct{}

batchSize int
idleDuration time.Duration
syncInterval time.Duration
}

// Syncer is the actual synchronizer callback.
Expand All @@ -35,27 +34,27 @@ type (
// DefaultLogBatchSize is the default batch size.
const DefaultLogBatchSize = 256

// DefaultLogIdleDuration is the maximum idle duration before a sync could happen.
const DefaultLogIdleDuration = 1 * time.Second
// DefaultLogSyncInterval is the default interval at which pending logs are synced.
const DefaultLogSyncInterval = 1 * time.Second

// NewLogSyncer creates a new log syncer using the default batch size
// and sync interval.
func NewLogSyncer(syncfn Syncer) *LogSyncer {
	return NewLogSyncerWith(syncfn, DefaultLogBatchSize, DefaultLogSyncInterval)
}

// NewLogSyncerWith creates a new customizable syncer.
func NewLogSyncerWith(
syncfn Syncer,
batchSize int,
idleDuration time.Duration,
syncInterval time.Duration,
) *LogSyncer {
l := &LogSyncer{
in: make(chan *DeploymentLog, batchSize),
syncfn: syncfn,
shutdown: make(chan struct{}),

batchSize: batchSize,
idleDuration: idleDuration,
syncInterval: syncInterval,
}
l.start()
return l
Expand Down Expand Up @@ -127,28 +126,39 @@ func (s *LogSyncer) Wait() {

func (s *LogSyncer) start() {
go func() {
s.lastEnqueued = time.Now()
for e := range s.in {
s.enqueue(e)
}
for len(s.pending) > 0 {
rest := min(s.batchSize, len(s.pending))
s.syncfn(s.pending[:rest])
s.pending = s.pending[rest:]
ticker := time.NewTicker(s.syncInterval)
defer ticker.Stop()

outer:
for {
select {
case e, ok := <-s.in:
if !ok {
break outer
}
s.enqueue(e)
case <-ticker.C:
s.syncAll()
}
}
s.syncAll()
s.shutdown <- struct{}{}
}()
}

func (s *LogSyncer) enqueue(l *DeploymentLog) {
s.pending = append(s.pending, l)
if len(s.pending) >= s.batchSize ||
(len(s.pending) > 0 && time.Since(s.lastEnqueued) > s.idleDuration) {
func (s *LogSyncer) syncAll() {
for len(s.pending) > 0 {
rest := min(s.batchSize, len(s.pending))
s.syncfn(s.pending[:rest])
s.pending = s.pending[rest:]
}
s.lastEnqueued = time.Now()
}

// enqueue appends l to the pending batch and flushes immediately once
// the batch reaches the configured size.
func (s *LogSyncer) enqueue(l *DeploymentLog) {
	s.pending = append(s.pending, l)
	if len(s.pending) < s.batchSize {
		return
	}
	s.syncAll()
}

func readLines(r io.Reader, pending []byte) (line [][]byte, rest []byte, err error) {
Expand Down
14 changes: 7 additions & 7 deletions cloud/log_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type (
name string
writes []write
batchSize int
idleDuration time.Duration
syncInterval time.Duration
want want
}
)
Expand Down Expand Up @@ -241,7 +241,7 @@ func TestCloudLogSyncer(t *testing.T) {
{
name: "batch size is respected",
batchSize: 1,
idleDuration: 10 * time.Second, // just to ensure it's not used in slow envs
syncInterval: 10 * time.Second, // just to ensure it's not used in slow envs
writes: []write{
{channel: cloud.StdoutLogChannel, data: []byte("A\n")},
{channel: cloud.StdoutLogChannel, data: []byte("B\nC\n")},
Expand Down Expand Up @@ -307,7 +307,7 @@ func TestCloudLogSyncer(t *testing.T) {
{
name: "if no write happens after configured idle duration then pending data is synced",
batchSize: 6,
idleDuration: 100 * time.Millisecond,
syncInterval: 100 * time.Millisecond,
writes: []write{
{channel: cloud.StdoutLogChannel, data: []byte("first write\n")},
{
Expand All @@ -323,7 +323,7 @@ func TestCloudLogSyncer(t *testing.T) {
cloud.StdoutLogChannel: []byte("first write\nwrite after idle time\nanother\nmultiline\nwrite\nhere"),
},
batches: []cloud.DeploymentLogs{
// first batch is due to idle duration trigger.
// first batch is due to sync interval trigger.
{
{
Line: 1,
Expand Down Expand Up @@ -368,7 +368,7 @@ func TestCloudLogSyncer(t *testing.T) {
var gotBatches []cloud.DeploymentLogs
s := cloud.NewLogSyncerWith(func(logs cloud.DeploymentLogs) {
gotBatches = append(gotBatches, logs)
}, tc.batchSize, tc.idleDuration)
}, tc.batchSize, tc.syncInterval)
var stdoutBuf, stderrBuf bytes.Buffer
stdoutProxy := s.NewBuffer(cloud.StdoutLogChannel, &stdoutBuf)
stderrProxy := s.NewBuffer(cloud.StderrLogChannel, &stderrBuf)
Expand Down Expand Up @@ -424,8 +424,8 @@ func (tc *testcase) validate(t *testing.T) {
if tc.batchSize == 0 {
tc.batchSize = cloud.DefaultLogBatchSize
}
if tc.idleDuration == 0 {
tc.idleDuration = 1 * time.Second
if tc.syncInterval == 0 {
tc.syncInterval = 1 * time.Second
}
}

Expand Down
4 changes: 3 additions & 1 deletion cloud/testserver/cmd/fakecloud/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@
package main

import (
"fmt"
"net/http"

"github.com/terramate-io/terramate/cloud/testserver"
)

func main() {
s := &http.Server{
Addr: "localhost:3001",
Addr: "0.0.0.0:3001",
Handler: testserver.Router(),
}

fmt.Printf("listening at %s\n", s.Addr)
err := s.ListenAndServe()
if err != nil {
panic(err)
Expand Down
111 changes: 3 additions & 108 deletions cloud/testserver/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"fmt"
"io"
"net/http"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -248,101 +246,6 @@ func (dhandler *driftHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
w.WriteHeader(http.StatusNoContent)
}

// ServeHTTP handles the fake stacks endpoint.
//
// GET lists the organization's stacks, optionally filtered by status
// (only the "unhealthy" filter is supported). PUT upserts a stack and
// is not a real TMC endpoint — it is only used by tests to seed state.
func (handler *stackHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	params := httprouter.ParamsFromContext(r.Context())
	orguuid := params.ByName("orguuid")
	filterStatusStr := r.FormValue("status")
	filterStatus := stack.AllFilter

	// Only the "unhealthy" filter (or no filter at all) is supported.
	if filterStatusStr != "" && filterStatusStr != "unhealthy" {
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	if filterStatusStr == "unhealthy" {
		filterStatus = stack.UnhealthyFilter
	}

	w.Header().Add("Content-Type", "application/json")

	switch r.Method {
	case "GET":
		var resp cloud.StacksResponse
		var stacks []cloud.StackResponse
		stacksMap, ok := handler.stacks[orguuid]
		if !ok {
			// Unknown organization: respond with an empty listing.
			w.WriteHeader(http.StatusOK)
			data, _ := json.Marshal(resp)
			_, _ = w.Write(data)
			return
		}
		for _, st := range stacksMap {
			if stack.FilterStatus(st.Status)&filterStatus != 0 {
				stacks = append(stacks, st)
			}
		}

		// Map iteration order is random; sort by ID for deterministic output.
		sort.Slice(stacks, func(i, j int) bool {
			return stacks[i].ID < stacks[j].ID
		})

		resp.Stacks = stacks
		data, err := json.Marshal(resp)
		if err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			writeString(w, "marshaling error")
			// fix: previously fell through and wrote nil data after the
			// error response.
			return
		}
		write(w, data)

	case "PUT":
		stackIDStr := params.ByName("stackid")
		if stackIDStr == "" {
			w.WriteHeader(http.StatusBadRequest)
			return
		}
		stackid, err := strconv.Atoi(stackIDStr)
		if err != nil {
			w.WriteHeader(http.StatusBadRequest)
			writeErr(w, err)
			// fix: missing return let the handler continue with
			// stackid == 0 and attempt a second status write.
			return
		}
		bodyData, err := io.ReadAll(r.Body)
		if err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		justClose(r.Body)

		var st cloud.StackResponse
		err = json.Unmarshal(bodyData, &st)
		if err != nil {
			w.WriteHeader(http.StatusBadRequest)
			writeErr(w, err)
			return
		}

		// The URL id and the payload id must agree.
		if stackid != st.ID {
			w.WriteHeader(http.StatusBadRequest)
			return
		}

		// Lazily initialize the per-organization state maps.
		if _, ok := handler.stacks[orguuid]; !ok {
			handler.stacks[orguuid] = make(map[int]cloud.StackResponse)
		}
		if _, ok := handler.statuses[orguuid]; !ok {
			handler.statuses[orguuid] = make(map[int]stack.Status)
		}

		handler.stacks[orguuid][stackid] = st
		handler.statuses[orguuid][stackid] = st.Status
		w.WriteHeader(http.StatusNoContent)

	default:
		w.WriteHeader(http.StatusMethodNotAllowed)
	}
}

// Router returns the default fake cloud router.
func Router() *httprouter.Router {
return RouterWith(EnableAllConfig())
Expand All @@ -365,6 +268,9 @@ func RouterAdd(router *httprouter.Router, enabled map[string]bool) {
if enabled[cloud.StacksPath] {
stackHandler := newStackEndpoint()
router.Handler("GET", cloud.StacksPath+"/:orguuid", stackHandler)
router.Handler("POST", cloud.StacksPath+"/:orguuid/:stackid/deployments/:deployment_uuid/logs", stackHandler)
router.Handler("GET", cloud.StacksPath+"/:orguuid/:stackid/deployments/:deployment_uuid/logs", stackHandler)
router.Handler("GET", cloud.StacksPath+"/:orguuid/:stackid/deployments/:deployment_uuid/logs/events", stackHandler)

// not a real TMC handler, only used by tests to populate the stacks state.
router.Handler("PUT", cloud.StacksPath+"/:orguuid/:stackid", stackHandler)
Expand Down Expand Up @@ -416,10 +322,6 @@ func EnableAllConfig() map[string]bool {
type (
userHandler struct{}
membershipHandler struct{}
stackHandler struct {
stacks map[string]map[int]cloud.StackResponse
statuses map[string]map[int]stack.Status
}
deploymentHandler struct {
nextStackID int64
// as hacky as it can get:
Expand All @@ -443,13 +345,6 @@ func newUserEndpoint() *userHandler {
return &userHandler{}
}

// newStackEndpoint returns a stackHandler with freshly initialized
// per-organization stack and status maps.
func newStackEndpoint() *stackHandler {
	handler := stackHandler{
		stacks:   map[string]map[int]cloud.StackResponse{},
		statuses: map[string]map[int]stack.Status{},
	}
	return &handler
}

func newDeploymentEndpoint() *deploymentHandler {
return &deploymentHandler{
deployments: make(map[string]map[string]map[int64]cloud.DeploymentStackRequest),
Expand Down

0 comments on commit c8a60ac

Please sign in to comment.