Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion AGENTS.md
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
- Use jj instead of git
- Use jj instead of git. Check with `ls .jj/` — if no `.jj` directory exists, fall back to git.

- **Branching:** `jj describe -m "type(scope): message"` on `@`, then `jj bookmark create <name>` and `jj git push -b <name>`.
- **Before push:** `jj git fetch && jj rebase -b @ -d main`. Resolve conflicts locally, verify with tests, then push.
- **Conflict resolution:** `jj resolve --list` to find conflicts, edit files to remove markers (`<<<<<<<` / `>>>>>>>`), then squash resolution with `jj squash` (no `--interactive` flag). No detached HEAD or rebase-in-progress state to manage.
- **Undo:** `jj op undo` reverts any operation. Safe to experiment.
239 changes: 239 additions & 0 deletions app/jobs/taskjob/result_channel_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
package taskjob

import (
"bytes"
"context"
"errors"
"hostlink/app/services/localtaskstore"
"hostlink/app/services/taskreporter"
"hostlink/domain/task"
"io"
"sync"
"testing"
"time"
)

func TestTaskJobStreamsOutputAndFinalOverResultChannel(t *testing.T) {
fetcher := &fakeTaskFetcher{tasks: []task.Task{{
ID: "task-1",
ExecutionAttemptID: "attempt-1",
Command: "printf 'out\\n'; printf 'err\\n' >&2",
Status: "pending",
}}}
reporter := &fakeTaskReporter{}
channel := &fakeResultChannel{}
job := NewJobWithConf(TaskJobConfig{Trigger: runOnceTrigger})

job.processTask(context.Background(), fetcher.tasks[0], reporter, channel)

if len(channel.outputs) != 2 {
t.Fatalf("outputs len = %d, want 2", len(channel.outputs))
}
stdout := outputByStream(channel.outputs, "stdout")
stderr := outputByStream(channel.outputs, "stderr")
if stdout == nil || stdout.Sequence != 1 || stdout.Payload != "out\n" {
t.Fatalf("stdout chunk = %#v", stdout)
}
if stderr == nil || stderr.Sequence != 1 || stderr.Payload != "err\n" {
t.Fatalf("stderr chunk = %#v", stderr)
}
if len(channel.finals) != 1 {
t.Fatalf("finals len = %d, want 1", len(channel.finals))
}
if channel.finals[0].TaskID != "task-1" || channel.finals[0].ExecutionAttemptID != "attempt-1" || channel.finals[0].Status != "completed" {
t.Fatalf("final = %#v", channel.finals[0])
}
if len(reporter.results) != 0 {
t.Fatalf("http reports len = %d, want 0 when result channel succeeds", len(reporter.results))
}
}

func TestTaskJobFallsBackToHTTPReporterWhenResultChannelDisabled(t *testing.T) {
	// A nil result channel means local persistence is unavailable; the job
	// must deliver the result through the HTTP reporter instead.
	fetcher := &fakeTaskFetcher{tasks: []task.Task{{ID: "task-1", Command: "printf 'out'", Status: "pending"}}}
	httpReporter := &fakeTaskReporter{}
	job := NewJobWithConf(TaskJobConfig{Trigger: runOnceTrigger})

	job.processTask(context.Background(), fetcher.tasks[0], httpReporter, nil)

	if got := len(httpReporter.results); got != 1 {
		t.Fatalf("http reports len = %d, want 1", got)
	}
	if out := httpReporter.results[0].Output; out != "out" {
		t.Fatalf("output = %q, want out", out)
	}
}

func TestTaskJobFallsBackToHTTPReporterWhenFinalPersistenceFails(t *testing.T) {
fetcher := &fakeTaskFetcher{tasks: []task.Task{{
ID: "task-1",
ExecutionAttemptID: "attempt-1",
Command: "printf 'out'",
Status: "pending",
}}}
reporter := &fakeTaskReporter{}
channel := &fakeResultChannel{finalErr: errors.New("store down")}
job := NewJobWithConf(TaskJobConfig{Trigger: runOnceTrigger})

job.processTask(context.Background(), fetcher.tasks[0], reporter, channel)

if len(reporter.results) != 1 {
t.Fatalf("http reports len = %d, want 1", len(reporter.results))
}
if len(channel.finals) != 1 {
t.Fatalf("finals len = %d, want 1", len(channel.finals))
}
}

func TestCaptureStreamFlushesOnByteThreshold(t *testing.T) {
	// captureStream must flush a chunk only once the accumulated bytes reach
	// OutputFlushThreshold; the hour-long interval keeps the timer out of play.
	reader, writer := io.Pipe()
	channel := &fakeResultChannel{}
	job := NewJobWithConf(TaskJobConfig{
		OutputFlushInterval:  time.Hour,
		OutputFlushThreshold: 4,
	})
	done := make(chan struct{})

	go func() {
		var sink bytes.Buffer
		job.captureStream(context.Background(), task.Task{ID: "task-1", ExecutionAttemptID: "attempt-1"}, "stdout", reader, &sink, channel)
		close(done)
	}()

	// Write fewer bytes than the threshold and give the capture goroutine a
	// moment to consume them. Read the count under channel.mu: the goroutine
	// may append concurrently, and an unguarded read is a data race under
	// `go test -race` the instant a premature flush happens.
	_, _ = writer.Write([]byte("ab"))
	time.Sleep(20 * time.Millisecond)
	channel.mu.Lock()
	premature := len(channel.outputs)
	channel.mu.Unlock()
	if premature != 0 {
		t.Fatalf("outputs len = %d, want no flush before threshold", premature)
	}
	// Crossing the 4-byte threshold must trigger a flush of the whole buffer.
	_, _ = writer.Write([]byte("cd"))
	waitForOutputs(t, channel, 1)
	_ = writer.Close()
	<-done // captureStream has returned; unguarded reads below are race-free

	if channel.outputs[0].Payload != "abcd" {
		t.Fatalf("payload = %q, want abcd", channel.outputs[0].Payload)
	}
}

func TestCaptureStreamFlushesOnInterval(t *testing.T) {
	// With the byte threshold far above the written data, only the periodic
	// interval can cause a flush.
	reader, writer := io.Pipe()
	channel := &fakeResultChannel{}
	job := NewJobWithConf(TaskJobConfig{
		OutputFlushInterval:  10 * time.Millisecond,
		OutputFlushThreshold: 1024,
	})
	finished := make(chan struct{})

	go func() {
		defer close(finished)
		var sink bytes.Buffer
		job.captureStream(context.Background(), task.Task{ID: "task-1", ExecutionAttemptID: "attempt-1"}, "stdout", reader, &sink, channel)
	}()

	_, _ = writer.Write([]byte("slow"))
	waitForOutputs(t, channel, 1)
	_ = writer.Close()
	<-finished

	if got := channel.outputs[0].Payload; got != "slow" {
		t.Fatalf("payload = %q, want slow", got)
	}
}

func TestCaptureStreamRetainsChunkWhenPersistFails(t *testing.T) {
	// The first SendOutput is scripted to fail; the chunk must be re-sent
	// unchanged, keeping its original sequence number, on a later flush.
	reader, writer := io.Pipe()
	channel := &fakeResultChannel{outputErrs: []error{errors.New("store down"), nil}}
	job := NewJobWithConf(TaskJobConfig{
		OutputFlushInterval:  10 * time.Millisecond,
		OutputFlushThreshold: 1024,
	})
	finished := make(chan struct{})

	go func() {
		defer close(finished)
		var sink bytes.Buffer
		job.captureStream(context.Background(), task.Task{ID: "task-1", ExecutionAttemptID: "attempt-1"}, "stdout", reader, &sink, channel)
	}()

	_, _ = writer.Write([]byte("retry"))
	waitForOutputs(t, channel, 2)
	_ = writer.Close()
	<-finished

	retried := channel.outputs[1]
	if retried.Payload != "retry" || retried.Sequence != 1 {
		t.Fatalf("retry output = %#v", retried)
	}
}

func runOnceTrigger(ctx context.Context, fn func() error) {
_ = fn()
}

// fakeTaskFetcher is a test double for the job's task source; it hands back
// a canned slice of tasks.
type fakeTaskFetcher struct {
	tasks []task.Task
}

// Fetch returns the canned task slice (not a copy) and never fails.
func (f *fakeTaskFetcher) Fetch() ([]task.Task, error) {
	return f.tasks, nil
}

// fakeTaskReporter records every result handed to Report so tests can
// assert on what was sent via the HTTP path.
type fakeTaskReporter struct {
	mu      sync.Mutex // guards results
	results []*taskreporter.TaskResult
}

// Report appends the result under the lock and always succeeds.
func (f *fakeTaskReporter) Report(taskID string, result *taskreporter.TaskResult) error {
	f.mu.Lock()
	f.results = append(f.results, result)
	f.mu.Unlock()
	return nil
}

// fakeResultChannel records output chunks and final results, optionally
// failing calls with scripted errors.
type fakeResultChannel struct {
	mu         sync.Mutex // guards all fields below
	outputs    []localtaskstore.OutputChunk
	finals     []localtaskstore.FinalResult
	outputErrs []error // consumed one per SendOutput call; nil entry = success
	finalErr   error   // returned by every SendFinal call
}

// SendOutput records the chunk, then pops and returns the next scripted
// error if one remains.
func (f *fakeResultChannel) SendOutput(ctx context.Context, chunk localtaskstore.OutputChunk) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.outputs = append(f.outputs, chunk)
	if len(f.outputErrs) == 0 {
		return nil
	}
	next := f.outputErrs[0]
	f.outputErrs = f.outputErrs[1:]
	return next
}

// SendFinal records the final result and returns the configured error.
func (f *fakeResultChannel) SendFinal(ctx context.Context, result localtaskstore.FinalResult) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.finals = append(f.finals, result)
	return f.finalErr
}

// waitForOutputs polls until the channel has recorded at least count output
// chunks, failing the test after a one-second deadline.
func waitForOutputs(t *testing.T, channel *fakeResultChannel, count int) {
	t.Helper()
	deadline := time.Now().Add(time.Second)
	for {
		if !time.Now().Before(deadline) {
			break
		}
		channel.mu.Lock()
		observed := len(channel.outputs)
		channel.mu.Unlock()
		if observed >= count {
			return
		}
		time.Sleep(time.Millisecond)
	}
	t.Fatalf("timed out waiting for %d outputs", count)
}

// outputByStream returns a pointer to the first recorded chunk whose Stream
// matches, or nil when no chunk does.
func outputByStream(outputs []localtaskstore.OutputChunk, stream string) *localtaskstore.OutputChunk {
	for i, chunk := range outputs {
		if chunk.Stream == stream {
			return &outputs[i]
		}
	}
	return nil
}
Loading
Loading