-
Notifications
You must be signed in to change notification settings - Fork 211
/
runner.go
124 lines (111 loc) · 3.58 KB
/
runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package checkpoint
import (
"context"
"encoding/json"
"fmt"
"path/filepath"
pb "github.com/spacemeshos/api/release/go/spacemesh/v1"
"github.com/spf13/afero"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/accounts"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
)
const (
// SchemaVersion is the schema identifier stored in the Version field of
// every checkpoint built by checkpointDB.
SchemaVersion = "https://spacemesh.io/checkpoint.schema.json.1.0"
// CommandString is the format string for the grpcurl command recorded in a
// checkpoint's Command field; its %s verb receives the JSON-encoded
// CheckpointStreamRequest.
CommandString = "grpcurl -plaintext -d '%s' 0.0.0.0:9093 spacemesh.v1.AdminService.CheckpointStream"
// checkpointDir is the directory name that SelfCheckpointFilename joins
// onto the data dir to locate snapshot files.
checkpointDir = "checkpoint"
// schemaFile is not referenced in the code shown here; presumably the file
// name of the checkpoint JSON schema -- TODO confirm against its users.
schemaFile = "schema.json"
// dirPerm is not referenced in the code shown here; presumably the
// permission bits used when creating checkpoint directories -- TODO confirm.
dirPerm = 0o700
)
// checkpointDB builds a checkpoint from the contents of db.
//
// The returned checkpoint carries:
//   - Command: the grpcurl invocation (see CommandString) with the JSON-encoded
//     CheckpointStreamRequest for snapshot/numAtxs substituted in,
//   - Version: SchemaVersion,
//   - Data.CheckpointId: "snapshot-<snapshot>",
//   - Data.Atxs: one entry per ATX returned by atxs.LatestN(tx, numAtxs), each
//     completed with the smesher's commitment ATX and VRF nonce,
//   - Data.Accounts: one entry per account returned by
//     accounts.Snapshot(tx, snapshot).
//
// All queries run against a single transaction obtained from db.Tx(ctx), which
// is released before returning. On any failure the function returns a nil
// checkpoint and an error wrapping the underlying cause.
func checkpointDB(ctx context.Context, db *sql.Database, snapshot types.LayerID, numAtxs int) (*types.Checkpoint, error) {
	request, err := json.Marshal(&pb.CheckpointStreamRequest{
		SnapshotLayer: uint32(snapshot),
		NumAtxs:       uint32(numAtxs),
	})
	if err != nil {
		return nil, fmt.Errorf("marshal request: %w", err)
	}
	checkpoint := &types.Checkpoint{
		Command: fmt.Sprintf(CommandString, request),
		Version: SchemaVersion,
		Data: types.InnerData{
			CheckpointId: fmt.Sprintf("snapshot-%d", snapshot),
		},
	}

	tx, err := db.Tx(ctx)
	if err != nil {
		// Wrap with %w (not %s) so callers can still match the cause with
		// errors.Is / errors.As, like every other error path in this function.
		return nil, fmt.Errorf("create db tx: %w", err)
	}
	defer tx.Release()

	atxSnapshot, err := atxs.LatestN(tx, numAtxs)
	if err != nil {
		return nil, fmt.Errorf("atxs snapshot: %w", err)
	}
	// Complete each ATX with its smesher's commitment ATX and VRF nonce, then
	// emit it straight away; a failure on any ATX aborts the whole checkpoint.
	for i, catx := range atxSnapshot {
		commitmentAtx, err := atxs.CommitmentATX(tx, catx.SmesherID)
		if err != nil {
			return nil, fmt.Errorf("atxs snapshot commitment: %w", err)
		}
		// The nonce is looked up for the epoch following the ATX's own epoch.
		vrfNonce, err := atxs.VRFNonce(tx, catx.SmesherID, catx.Epoch+1)
		if err != nil {
			return nil, fmt.Errorf("atxs snapshot nonce: %w", err)
		}
		copy(atxSnapshot[i].CommitmentATX[:], commitmentAtx[:])
		atxSnapshot[i].VRFNonce = vrfNonce

		// catx is the range copy taken before the two fields above were
		// filled in, so read those two from the slice element instead.
		checkpoint.Data.Atxs = append(checkpoint.Data.Atxs, types.AtxSnapshot{
			ID:             catx.ID.Bytes(),
			Epoch:          catx.Epoch.Uint32(),
			CommitmentAtx:  atxSnapshot[i].CommitmentATX.Bytes(),
			VrfNonce:       uint64(atxSnapshot[i].VRFNonce),
			NumUnits:       catx.NumUnits,
			BaseTickHeight: catx.BaseTickHeight,
			TickCount:      catx.TickCount,
			PublicKey:      catx.SmesherID.Bytes(),
			Sequence:       catx.Sequence,
			Coinbase:       catx.Coinbase.Bytes(),
		})
	}

	acctSnapshot, err := accounts.Snapshot(tx, snapshot)
	if err != nil {
		return nil, fmt.Errorf("accounts snapshot: %w", err)
	}
	for _, acct := range acctSnapshot {
		a := types.AccountSnapshot{
			Address: acct.Address.Bytes(),
			Balance: acct.Balance,
			Nonce:   acct.NextNonce,
		}
		// Template and State are only set when the account has them.
		if acct.TemplateAddress != nil {
			a.Template = acct.TemplateAddress.Bytes()
		}
		if acct.State != nil {
			a.State = acct.State
		}
		checkpoint.Data.Accounts = append(checkpoint.Data.Accounts, a)
	}
	return checkpoint, nil
}
// Generate builds a checkpoint for the given snapshot layer and numAtxs via
// checkpointDB, JSON-encodes it into a recovery file created at
// SelfCheckpointFilename(dataDir, snapshot) on fs, and then saves that file.
//
// It returns the checkpointDB error unchanged, wraps failures to create the
// recovery file or to encode the checkpoint, and otherwise returns whatever
// the recovery file's Save reports.
func Generate(ctx context.Context, fs afero.Fs, db *sql.Database, dataDir string, snapshot types.LayerID, numAtxs int) error {
	data, err := checkpointDB(ctx, db, snapshot, numAtxs)
	if err != nil {
		return err
	}

	target := SelfCheckpointFilename(dataDir, snapshot)
	recovery, err := NewRecoveryFile(fs, target)
	if err != nil {
		return fmt.Errorf("new recovery file: %w", err)
	}

	// Stream the checkpoint as JSON into the recovery file's writer.
	encoder := json.NewEncoder(recovery.fwriter)
	if encodeErr := encoder.Encode(data); encodeErr != nil {
		return fmt.Errorf("marshal checkpoint json: %w", encodeErr)
	}
	return recovery.Save(fs)
}
// SelfCheckpointFilename returns the path of the checkpoint file for the given
// snapshot layer: <dataDir>/<checkpointDir>/snapshot-<snapshot>.
func SelfCheckpointFilename(dataDir string, snapshot types.LayerID) string {
	dir := filepath.Join(dataDir, checkpointDir)
	name := fmt.Sprintf("snapshot-%d", snapshot)
	return filepath.Join(dir, name)
}