/
pendingresearcharchivist.go
121 lines (105 loc) · 3.49 KB
/
pendingresearcharchivist.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package archivists
import (
"context"
"fmt"
"log"
"time"
"github.com/google/uuid"
"github.com/jecolasurdo/tbtlarchivist/pkg/accessors/datastore"
"github.com/jecolasurdo/tbtlarchivist/pkg/accessors/messagebus"
"github.com/jecolasurdo/tbtlarchivist/pkg/contracts"
"github.com/jecolasurdo/tbtlarchivist/pkg/utils"
"google.golang.org/protobuf/proto"
)
const (
episodeLeaseDuration = 2 * time.Hour
lowerPacingBound = 2000.0
upperPacingBound = 5000.0
pacingBasis = time.Millisecond
clipLimit = 100
)
// A PendingResearchArchivist determines if any research work should be done,
// and, if so, produces a pending-work-item for a downstream researcher to act
// upon.
type PendingResearchArchivist struct {
Errors <-chan error
Done <-chan struct{}
}
// StartPendingResearchArchivist starts the archivist, which attempts to create
// pending work-items for downstream researchers to consume.
//
// An archivist's host should expect the archivist to exit when the archivist
// has determined that no overhead is available to queue more work. It is the
// host's responsibility to initialize the archivist periodically to check if
// work needs to be queued. This can be done via an automated cron job or
// other scheduler as desired.
func StartPendingResearchArchivist(ctx context.Context, messageBus messagebus.Sender, db datastore.DataStorer) *PendingResearchArchivist {
errorSource := make(chan error)
done := make(chan struct{})
go func() {
defer close(errorSource)
defer close(done)
pace := utils.SetUniformPace(lowerPacingBound, upperPacingBound, pacingBasis)
for {
if utils.ContextIsDone(ctx) {
break
}
pace.Wait()
queueInfo, err := messageBus.Inspect()
if err != nil {
errorSource <- fmt.Errorf("error while inspecting queue: %v", err)
return
}
if !(queueInfo.Consumers == 0 && queueInfo.Messages == 0) {
overhead := queueInfo.Consumers - queueInfo.Messages
if overhead <= 0 {
log.Println("The pending work queue is at capacity. No need to assign anything.")
break
}
}
episode, err := db.GetHighestPriorityEpisode()
if err != nil {
errorSource <- fmt.Errorf("error occured finding highest priority episode, %v", err)
return
}
if episode == nil {
log.Println("No episodes available to assign for research.")
return
}
clips, err := db.GetHighestPriorityClipsForEpisode(episode, clipLimit)
if err != nil {
errorSource <- fmt.Errorf("error retrieving clips for episode: %v\n%v", err, episode)
return
}
if len(clips) == 0 {
log.Println("No clips available to assign for research for this episode.")
return
}
leaseID := uuid.New()
err = db.CreateResearchLease(&leaseID, episode, clips, time.Now().Add(episodeLeaseDuration).UTC())
if err != nil {
errorSource <- fmt.Errorf("error creating lease: %v\n%v", err, episode)
return
}
pendingResearchItem := &contracts.PendingResearchItem{
LeaseId: leaseID.String(),
Episode: episode,
Clips: clips,
}
messageBytes, err := proto.Marshal(pendingResearchItem)
if err != nil {
errorSource <- fmt.Errorf("error marshalling pendingResearchItem to protobuf. %v %v", pendingResearchItem, err)
return
}
err = messageBus.Send(messageBytes)
if err != nil {
errorSource <- fmt.Errorf("error while trying to push a pendingResearchItem to the message bus. %v %v", pendingResearchItem, err)
return
}
}
}()
return &PendingResearchArchivist{
Errors: errorSource,
Done: done,
}
}