/
file.go
157 lines (131 loc) · 3.39 KB
/
file.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package replaylog
import (
"fmt"
"io"
"log"
"os"
pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
"github.com/streamingfast/dbin"
tea "github.com/charmbracelet/bubbletea"
"google.golang.org/protobuf/proto"
anypb "google.golang.org/protobuf/types/known/anypb"
"github.com/streamingfast/substreams/tui2/stream"
)
// File is a handle on a replay log. It is created with New and configured
// through Option values.
type File struct {
// writer is nil until OpenForWriting succeeds in creating it; IsWriting,
// Push and Close all treat a nil writer as "not recording".
writer *dbin.Writer
// path is the replay log location; New defaults it to ReplayFilename and
// WithPath overrides it.
path string
}
const (
	// ReplayContentType is the content type written to the replay log header
	// by OpenForWriting and required by ReadReplay when loading a log.
	ReplayContentType = "rpl-v1"

	// ReplayFilename is the default replay log path assigned by New.
	ReplayFilename = "replay.log"
)
// Option mutates a File while it is being built by New.
type Option func(*File)
// WithPath returns an Option that stores path on the File as its replay log
// location, replacing the default assigned by New.
func WithPath(path string) Option {
	setPath := func(target *File) {
		target.path = path
	}
	return setPath
}
// New builds a File whose path starts as ReplayFilename and then applies each
// option in the order given, so later options win over earlier ones.
func New(opts ...Option) *File {
	file := new(File)
	file.path = ReplayFilename

	for i := range opts {
		opts[i](file)
	}

	return file
}
// IsWriting reports whether a writer has been attached to the File, i.e.
// whether Push will actually record messages.
func (f *File) IsWriting() bool {
	hasWriter := f.writer != nil
	return hasWriter
}
// OpenForWriting creates (or truncates) the replay log at the File's
// configured path and writes the ReplayContentType header to it. After a
// successful call, IsWriting reports true and Push records messages.
//
// The path configured through WithPath is honored; a File with an empty path
// (e.g. a zero-value File not built by New) falls back to ReplayFilename.
//
// It returns an error if the file cannot be opened or the header cannot be
// written. On failure no writer is attached and the file handle is released.
func (f *File) OpenForWriting() error {
	path := f.path
	if path == "" {
		path = ReplayFilename
	}

	// O_TRUNC: a new recording must not keep trailing records from a
	// previous, longer log at the same path.
	fl, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0640)
	if err != nil {
		return fmt.Errorf("open replay file for writing: %w", err)
	}

	writer := dbin.NewWriter(fl)
	if err := writer.WriteHeader(ReplayContentType); err != nil {
		// Best-effort cleanup; the header error is the one worth reporting.
		_ = fl.Close()
		return fmt.Errorf("write replay header: %w", err)
	}

	// Only expose the writer once the log is in a usable state.
	f.writer = writer
	return nil
}
// ReadReplay loads every message recorded in the replay log at the File's
// configured path and returns them in file order, followed by
// stream.ReplayedMsg as a terminating marker.
//
// The path configured through WithPath is honored; a File with an empty path
// (e.g. a zero-value File not built by New) falls back to ReplayFilename.
//
// A log that ends with io.ErrUnexpectedEOF is tolerated: the messages read so
// far are returned and the condition is only logged.
//
// It returns an error if the file cannot be opened, the header is unreadable
// or does not carry ReplayContentType, or a record cannot be decoded. It
// panics (via mapTypeToUpdateMsg) if a record decodes to a message type that
// is not part of the replay set.
func (f *File) ReadReplay() (out stream.ReplayBundle, err error) {
	path := f.path
	if path == "" {
		path = ReplayFilename
	}

	fl, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("read replay file: %w", err)
	}
	defer fl.Close()

	reader := dbin.NewReader(fl)
	header, err := reader.ReadHeader()
	if err != nil {
		return nil, fmt.Errorf("reading replay log header: %w", err)
	}
	if header.ContentType != ReplayContentType {
		return nil, fmt.Errorf("invalid replay file content type %q", header.ContentType)
	}

	for {
		anyBytes, err := reader.ReadMessage()
		if err == io.EOF {
			break
		}
		if err == io.ErrUnexpectedEOF {
			log.Println("Unexpected EOF reading replay file, ignoring it.")
			break
		}
		if err != nil {
			return nil, fmt.Errorf("reading replay file: %w", err)
		}

		// Each record is a serialized anypb.Any wrapping the real message.
		newAny := &anypb.Any{}
		if err := proto.Unmarshal(anyBytes, newAny); err != nil {
			return nil, fmt.Errorf("reading any from replay file: %w", err)
		}

		newVal, err := anypb.UnmarshalNew(newAny, proto.UnmarshalOptions{})
		if err != nil {
			return nil, fmt.Errorf("unmarshal any from replay file: %w", err)
		}

		out = append(out, mapTypeToUpdateMsg(newVal))
	}

	out = append(out, stream.ReplayedMsg)
	return out, nil
}
// mapTypeToUpdateMsg returns in unchanged when it is one of the message types
// the replay log records, and panics with "unsupported payload" for anything
// else.
func mapTypeToUpdateMsg(in any) any {
	switch in.(type) {
	case *pbsubstreamsrpc.Request,
		*pbsubstreamsrpc.BlockScopedData,
		*pbsubstreamsrpc.ModulesProgress,
		*pbsubstreamsrpc.InitialSnapshotData,
		*pbsubstreamsrpc.InitialSnapshotComplete,
		*pbsubstreamsrpc.SessionInit:
		return in
	default:
		panic("unsupported payload")
	}
}
// Push appends msg to the replay log when it is one of the recorded message
// types. It is a no-op returning nil when no writer is attached or when msg
// is of any other type.
//
// Recorded messages are wrapped in an anypb.Any, serialized, and written as
// one record. An error is returned if wrapping, serializing, or writing
// fails.
func (f *File) Push(msg tea.Msg) error {
	if f.writer == nil {
		return nil
	}

	switch msg.(type) {
	case *pbsubstreamsrpc.Request,
		*pbsubstreamsrpc.BlockScopedData,
		*pbsubstreamsrpc.ModulesProgress,
		*pbsubstreamsrpc.InitialSnapshotData,
		*pbsubstreamsrpc.InitialSnapshotComplete,
		*pbsubstreamsrpc.SessionInit:
		// Recorded type: fall out of the switch and encode it below.
	default:
		return nil
	}

	wrapped, err := anypb.New(msg.(proto.Message))
	if err != nil {
		return fmt.Errorf("encoding any: %w", err)
	}

	payload, err := proto.Marshal(wrapped)
	if err != nil {
		return fmt.Errorf("proto marshal replay msg: %w", err)
	}

	if err := f.writer.WriteMessage(payload); err != nil {
		return fmt.Errorf("write replay message: %w", err)
	}
	return nil
}
// Close closes the attached writer and returns its error. It returns nil
// when no writer is attached.
func (f *File) Close() error {
	if f.writer == nil {
		return nil
	}
	return f.writer.Close()
}