/
pkg.go
103 lines (90 loc) · 2.72 KB
/
pkg.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package researcher
import (
"context"
"log"
"runtime"
"github.com/jecolasurdo/tbtlarchivist/go/internal/accessors/analyst"
"github.com/jecolasurdo/tbtlarchivist/go/internal/accessors/messagebus"
"github.com/jecolasurdo/tbtlarchivist/go/internal/contracts"
"github.com/jecolasurdo/tbtlarchivist/go/internal/utils"
"google.golang.org/protobuf/proto"
)
// A ResearchAgent is responsible for gathering a pending work item from the
// pending work queue, spawning an Analyst sub-process, communicating with that
// process, and reporting completed research results back to the completed work
// queue.
type ResearchAgent struct {
Errors <-chan error
Done <-chan struct{}
}
// StartResearchAgent initializes a research agent. The agent will attempt to
// consume a pending work item from the pending work queue. If there is no work
// found on the queue, the agent will exit. Otherwise, the agent will spawn an
// Analyst process, and assign the work to that process. As the Analyst
// completes its work, it is reported back to the Agent, who then forwards the
// results to the completed work queue.
func StartResearchAgent(ctx context.Context, pendingResearchQueue messagebus.Receiver, completedWorkQueue messagebus.Sender, analyzer analyst.Analyzer) *ResearchAgent {
utils.PanicIfNil(pendingResearchQueue, completedWorkQueue, analyzer)
errorSource := make(chan error)
done := make(chan struct{})
go func() {
defer close(errorSource)
defer close(done)
msg, err := pendingResearchQueue.Receive()
if err != nil {
errorSource <- err
return
}
if msg == nil || len(msg.Body) == 0 {
log.Println("No pending-research to do.")
return
}
pendingResearchItem := new(contracts.PendingResearchItem)
err = proto.Unmarshal(msg.Body, pendingResearchItem)
if err != nil {
errorSource <- err
err := msg.Acknowledger.Nack(true)
if err != nil {
errorSource <- err
}
return
}
err = msg.Acknowledger.Ack()
if err != nil {
errorSource <- err
return
}
analyzer.Run(ctx, pendingResearchItem)
completedWorkSrcOpen, analystErrorSrcOpen := true, true
for completedWorkSrcOpen || analystErrorSrcOpen {
select {
case completedWorkItem, open := <-analyzer.CompletedWorkItems():
if !open {
completedWorkSrcOpen = false
break
}
cwiBytes, err := proto.Marshal(completedWorkItem)
if err != nil {
errorSource <- err
break
}
err = completedWorkQueue.Send(cwiBytes)
if err != nil {
errorSource <- err
}
case analystErr, open := <-analyzer.Errors():
if !open {
analystErrorSrcOpen = false
break
}
errorSource <- analystErr
default:
runtime.Gosched()
}
}
}()
return &ResearchAgent{
Errors: errorSource,
Done: done,
}
}