/
navigation-sync.go
120 lines (98 loc) 路 2.85 KB
/
navigation-sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package nav
import (
"context"
"fmt"
"reflect"
"time"
"github.com/snivilised/lorax/boost"
)
type NavigationSync interface {
Run(callback sessionCallback, nc syncable, args ...any) (*TraverseResult, error)
}
type baseSync struct {
}
func (s *baseSync) extract(args ...any) (bool, context.Context, context.CancelFunc) {
// NB: extracting a context does not necessarily mean we want to run accelerated. Down the line,
// it may be required that even for inline scenario's, we allow the client to pass in
// a context for cancellation reasons.
//
var (
ctx context.Context
cancel context.CancelFunc
extracted = len(args) > 0
)
for _, a := range args {
switch argument := a.(type) {
case context.Context:
ctx = argument
case context.CancelFunc:
cancel = argument
default:
// TODO: convert to i18n error
//
panic(
fmt.Errorf("extract found invalid type found in 'Run' arguments (val: '%v', type: '%v')",
a,
reflect.TypeOf(a).String(),
),
)
}
}
return extracted, ctx, cancel
}
type inlineSync struct {
}
func (s *inlineSync) Run(callback sessionCallback, _ syncable, _ ...any) (*TraverseResult, error) {
return callback()
}
type acceleratedSync struct {
baseSync
ai *AsyncInfo
noWorkers int
outputChOut boost.JobOutputStream[TraverseOutput]
outputChTimeout time.Duration
pool *boost.WorkerPool[TraverseItemInput, TraverseOutput]
}
func (s *acceleratedSync) Run(callback sessionCallback, nc syncable, args ...any) (*TraverseResult, error) {
defer s.finish(s.ai)
extracted, ctx, cancel := s.extract(args...)
if !extracted {
// TODO: convert to i18n error
//
panic("failed to extract context")
}
nc.ensync(ctx, cancel, s.ai)
s.start(ctx, cancel)
return callback()
}
func (s *acceleratedSync) start(ctx context.Context, cancel context.CancelFunc) {
s.pool = boost.NewWorkerPool[TraverseItemInput, TraverseOutput](
&boost.NewWorkerPoolParams[TraverseItemInput, TraverseOutput]{
NoWorkers: s.noWorkers,
OutputChTimeout: s.outputChTimeout,
Exec: workerExecutive,
JobsCh: s.ai.JobsChanOut,
WaitAQ: s.ai.WaitAQ,
})
// We are handing over ownership of this channel (ai.OutputsChIn) to the pool as
// its go routine will write to it, knows when no more data is available
// and thus knows when to close it.
//
s.ai.WaitAQ.Add(1, s.pool.RoutineName)
go s.pool.Start(ctx, cancel, s.outputChOut)
}
func (s *acceleratedSync) finish(
ai *AsyncInfo,
) {
close(ai.JobsChanOut) // 鈿狅笍 fastward: intermittent panic on close
ai.WaitAQ.Done(ai.NavigatorRoutineName)
}
func workerExecutive(job boost.Job[TraverseItemInput]) (boost.JobOutput[TraverseOutput], error) {
err := job.Input.Fn(job.Input.Item)
return boost.JobOutput[TraverseOutput]{
Payload: TraverseOutput{
Item: job.Input.Item,
Error: err,
},
}, err
}