/
file_workload.go
86 lines (69 loc) · 1.78 KB
/
file_workload.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package consim
import (
"bufio"
"encoding/json"
"fmt"
"math/rand"
"os"
flag "github.com/spf13/pflag"
"github.com/spf13/viper"
genesis "github.com/oasisprotocol/oasis-core/go/genesis/api"
)
const (
	// fileWorkloadName is the identifier string of the file-backed workload.
	fileWorkloadName = "file"

	// cfgFileTxs is the configuration key holding the path of the
	// transactions document that the file workload reads.
	cfgFileTxs = "consim.workload.file.txs"
)
var (
	// fileTxsFlag is the flag set carrying the file workload's command line
	// options.  It is created unnamed and with ContinueOnError so that parse
	// failures are returned to the caller instead of terminating the process.
	fileTxsFlag = flag.NewFlagSet("", flag.ContinueOnError)
)
// fileWorkload is a workload that replays transaction batches read from a
// JSON document on disk.
type fileWorkload struct {
	// f is the open transactions document; it is closed by Cleanup.
	f *os.File
	// dec is the streaming JSON decoder reading from f.
	dec *json.Decoder
	// ch is the channel on which Start delivers decoded batches.
	ch chan []BlockTx
}
// Init is a no-op for the file workload: the genesis document is ignored and
// nil is always returned.
func (w *fileWorkload) Init(_ *genesis.Document) error {
	return nil
}
// Start begins streaming transaction batches decoded from the backing JSON
// document.
//
// The document must be a JSON array whose elements each decode into a
// []BlockTx.  The opening '[' is consumed synchronously, so a missing or
// malformed opening delimiter is reported through the returned error.  A
// goroutine then decodes one element at a time and delivers it on the
// returned channel.
//
// The returned channel is closed when the goroutine exits, which happens when
// the array is exhausted, when an element fails to decode (the error is sent
// on errCh), or when cancelCh becomes readable.  The doc argument is unused.
func (w *fileWorkload) Start(doc *genesis.Document, cancelCh <-chan struct{}, errCh chan<- error) (<-chan []BlockTx, error) {
	tok, err := w.dec.Token()
	if err != nil {
		return nil, fmt.Errorf("consim/workload/file: failed to find opening delimiter: %w", err)
	}
	// Token succeeds for any well-formed token (scalars, '{', ...), so the
	// array opener has to be checked explicitly.
	if delim, ok := tok.(json.Delim); !ok || delim != '[' {
		return nil, fmt.Errorf("consim/workload/file: failed to find opening delimiter: unexpected token %v", tok)
	}

	ch := make(chan []BlockTx)
	w.ch = ch

	go func() {
		defer close(ch)

		for w.dec.More() {
			var txVec []BlockTx
			if err := w.dec.Decode(&txVec); err != nil {
				// Honor cancellation while reporting the failure so the
				// goroutine cannot block forever on an undrained errCh.
				select {
				case errCh <- fmt.Errorf("consim/workload/file: failed to decode block tx: %w", err):
				case <-cancelCh:
				}
				return
			}

			select {
			case <-cancelCh:
				return
			case ch <- txVec:
			}
		}
	}()

	return ch, nil
}
// Finalize consumes the next token from the JSON decoder and returns a wrapped
// error if one cannot be read.  The token's value is not inspected; it is
// presumably the closing delimiter of the transactions array.
func (w *fileWorkload) Finalize(*genesis.Document) error {
	_, err := w.dec.Token()
	if err == nil {
		return nil
	}
	return fmt.Errorf("consim/workload/file: failed to find closing delimiter: %w", err)
}
// Cleanup closes the underlying transactions file.
func (w *fileWorkload) Cleanup() {
	// Best effort: the file is only read, and Cleanup has no way to report
	// a failure, so the Close error is intentionally discarded.
	_ = w.f.Close()
}
// newFileWorkload opens the transactions document named by the cfgFileTxs
// configuration key and returns a file workload that decodes from it through
// a buffered reader.  The rng argument is not used.
//
// An error is returned if the document cannot be opened.
func newFileWorkload(rng *rand.Rand) (Workload, error) {
	path := viper.GetString(cfgFileTxs)

	file, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("consim/workload/file: failed to open transaction file: %w", err)
	}

	workload := &fileWorkload{f: file}
	workload.dec = json.NewDecoder(bufio.NewReader(file))
	return workload, nil
}
// init registers the transactions-path flag on fileTxsFlag and binds the flag
// set to viper.
func init() {
	const (
		defaultTxsPath = "transactions.json"
		txsUsage       = "path to transactions document"
	)

	fileTxsFlag.String(cfgFileTxs, defaultTxsPath, txsUsage)

	// The bind error is deliberately ignored, as there is nothing useful to
	// do with it during package initialization.
	_ = viper.BindPFlags(fileTxsFlag)
}