-
Notifications
You must be signed in to change notification settings - Fork 0
/
navigator-abstract.go
126 lines (106 loc) 路 2.56 KB
/
navigator-abstract.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package nav
import (
"context"
"fmt"
"io/fs"
"log/slog"
"strings"
"github.com/google/uuid"
)
type navigator struct {
o *TraverseOptions
agent *navigationAgent
samplingActive bool
filteringActive bool
samplingFilterActive bool
samplingCtrl *samplingController
}
func (n *navigator) options() *TraverseOptions {
return n.o
}
func (n *navigator) init(ns *NavigationState) {
if n.samplingActive {
adapters := createSamplingAdapters()
n.samplingCtrl = &samplingController{
o: n.o,
fn: getSamplerControllerFunc(n.o),
adapters: adapters,
}
samplingType := n.o.Store.Sampling.SampleType
if (samplingType == SampleTypeFilterEn) || (samplingType == SampleTypeCustomEn) {
n.samplingCtrl.init(ns)
}
}
}
func (n *navigator) ensync(
ctx context.Context,
_ context.CancelFunc, // we don't need this here; only the worker pool needs it!
frame *navigationFrame,
ai *AsyncInfo,
) {
decorated := frame.client
decorator := &LabelledTraverseCallback{
Label: "boost decorator",
Fn: func(item *TraverseItem) error {
defer func() {
if pe := recover(); pe != nil {
if err, ok := pe.(error); ok || strings.Contains(err.Error(),
"send on closed channel") {
n.logger().Error("鈽狅笍鈽狅笍鈽狅笍 send on closed channel",
slog.String("item-path", item.Path),
)
} else {
panic(pe)
}
}
}()
var err error
select {
case <-ctx.Done():
err = fs.SkipDir
default:
job := TraverseItemJob{
ID: fmt.Sprintf("JOB-ID:%v", uuid.NewString()),
Input: TraverseItemInput{
Item: item,
Fn: decorated.Fn,
Label: decorated.Label,
},
SequenceNo: -999,
}
select {
case <-ctx.Done():
err = fs.SkipDir
case ai.JobsChanOut <- job:
//
// intermittent panic: send on closed channel, in fastward resume scenarios
// 'gr:observable-navigator'
}
}
return err
},
}
frame.decorate("boost decorator", decorator)
}
func (n *navigator) logger() *slog.Logger {
return n.o.Monitor.Log
}
func (n *navigator) descend(navi *NavigationInfo) bool {
if !navi.frame.periscope.descend(n.o.Store.Behaviours.Cascade.Depth) {
return false
}
navi.frame.notifiers.descend.invoke(navi.Item)
return true
}
func (n *navigator) ascend(navi *NavigationInfo, permit bool) {
if permit {
navi.frame.periscope.ascend()
navi.frame.notifiers.ascend.invoke(navi.Item)
}
}
func (n *navigator) finish() error {
return nil
}
func (n *navigator) keep(stash *inspection) {
n.agent.keep(stash)
}