-
Notifications
You must be signed in to change notification settings - Fork 4
/
smreader.go
118 lines (97 loc) · 3.17 KB
/
smreader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package datareader
import (
"github.com/insolar/assured-ledger/ledger-core/conveyor/smachine"
"github.com/insolar/assured-ledger/ledger-core/ledger/jet"
"github.com/insolar/assured-ledger/ledger-core/ledger/server/dataextractor"
"github.com/insolar/assured-ledger/ledger-core/ledger/server/readersvc"
"github.com/insolar/assured-ledger/ledger-core/vanilla/injector"
"github.com/insolar/assured-ledger/ledger-core/vanilla/throw"
)
func NewReader(cfg dataextractor.Config) smachine.SubroutineStateMachine {
cfg.Ensure()
return &SMLineReader{cfg: cfg}
}
var _ smachine.SubroutineStateMachine = &SMLineReader{}
type SMLineReader struct {
smachine.StateMachineDeclTemplate
// input
cfg dataextractor.Config
// inject
readAdapter readersvc.Adapter
// runtime - batched read
ready bool
// cabinet readersvc.Cabinet
}
func (p *SMLineReader) GetStateMachineDeclaration() smachine.StateMachineDeclaration {
return p
}
func (p *SMLineReader) InjectDependencies(_ smachine.StateMachine, _ smachine.SlotLink, injector injector.DependencyInjector) {
// injector.MustInject(&p.pulseSlot)
injector.MustInject(&p.readAdapter)
// injector.MustInject(&p.reader)
}
func (p *SMLineReader) GetInitStateFor(smachine.StateMachine) smachine.InitFunc {
return p.stepInit
}
func (p *SMLineReader) GetSubroutineInitState(smachine.SubroutineStartContext) smachine.InitFunc {
return p.stepInit
}
func (p *SMLineReader) stepInit(ctx smachine.InitializationContext) smachine.StateUpdate {
if p.readAdapter.ServiceNeedsBatching() {
p.readAdapter = readersvc.Adapter{}
return ctx.Jump(p.stepBatchRead)
}
return ctx.Jump(p.stepDirectRead)
}
func (p *SMLineReader) stepDirectRead(ctx smachine.ExecutionContext) smachine.StateUpdate {
cfg := p.cfg
return p.readAdapter.PrepareAsync(ctx, func(svc readersvc.Service) smachine.AsyncResultFunc {
cab, err := svc.FindCabinet(cfg.Target)
switch {
case err != nil:
panic(err)
case cab != nil:
if err = cab.Open(); err != nil {
panic(err)
}
}
err = readDataFromCabinet(svc, cab, cfg, 0)
return func(ctx smachine.AsyncResultContext) {
if err != nil {
ctx.Log().Error("read failed", err)
panic(err)
}
p.ready = true
}
}).DelayedStart().Sleep().ThenJump(p.stepDone)
}
func readDataFromCabinet(svc readersvc.Service, cab readersvc.Cabinet, cfg dataextractor.Config, id jet.DropID) (err error) {
defer func() {
switch closeErr := cab.Close(); {
case closeErr == nil:
case err == nil:
err = closeErr
default:
err = throw.WithDetails(closeErr, err)
}
err = throw.RW(recover(), err, "readDataFromCabinet")
}()
if cfg.Selector.Direction.IsToPast() {
return svc.ReadFromCabinet(cab, id, prevReader{ cfg }.ReadData)
}
return svc.ReadFromCabinet(cab, id, nextReader{ cfg }.ReadData)
}
func (p *SMLineReader) stepBatchRead(ctx smachine.ExecutionContext) smachine.StateUpdate {
// ctx.SetFinalizer(func(ctx smachine.FinalizationContext) {
// if p.cabinet != nil {
// if err := p.cabinet.Close(); err != nil {
// ctx.Log().Error("reader close", err)
// }
// }
// })
//
panic(throw.NotImplemented()) // TODO
}
func (p *SMLineReader) stepDone(ctx smachine.ExecutionContext) smachine.StateUpdate {
return ctx.Stop()
}