This repository has been archived by the owner on Jan 20, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 7
/
shard_block_proposing.go
136 lines (127 loc) · 4.5 KB
/
shard_block_proposing.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package eth2node
import (
"bytes"
"context"
"crypto/sha256"
"encoding/binary"
"fmt"
"github.com/pkg/errors"
"github.com/protolambda/zrnt/eth2/beacon"
"github.com/protolambda/ztyp/codec"
"go.uber.org/zap"
"io"
"math/rand"
"time"
)
// scheduleShardProposalsMaybe computes the shard proposers for the given slot and,
// for every shard whose proposer is one of this node's local validators, starts a
// background goroutine that executes the shard block proposal.
// Proposal failures are logged; nothing is returned to the caller.
func (n *Eth2Node) scheduleShardProposalsMaybe(slot Slot) {
	proposers := n.computeShardProposers(slot)
	for shard, proposer := range proposers {
		if _, ok := n.localValidators[proposer]; !ok {
			continue
		}
		n.log.With("proposer", proposer, "slot", slot, "shard", shard).Info("proposing shard block")
		// Hand the loop variables to the goroutine as arguments: before Go 1.22 the range
		// variables are shared across iterations, so a closure capturing them could observe
		// the shard/proposer of a later iteration by the time it runs.
		go func(shard Shard, proposer ValidatorIndex) {
			if err := n.executeShardBlockProposal(slot, shard, proposer); err != nil {
				n.log.With(zap.Error(err)).Errorf("proposer %d error for slot %d", proposer, slot)
			}
		}(Shard(shard), proposer)
	}
}
// computeShardProposers picks one proposer per shard, for all shards, at the given slot.
// The result is indexed by shard and has n.conf.SHARD_COUNT entries.
//
// This is mock logic: there is no beacon chain to take a seed from, so the seed is
// the SHA-256 digest of the little-endian slot followed by the little-endian shard.
func (n *Eth2Node) computeShardProposers(slot Slot) []ValidatorIndex {
	proposers := make([]ValidatorIndex, n.conf.SHARD_COUNT)
	// Seed preimage layout: bytes 0-7 hold the slot, bytes 8-15 hold the shard.
	var preimage [16]byte
	binary.LittleEndian.PutUint64(preimage[:8], uint64(slot))
	for i := range proposers {
		shard := Shard(i)
		binary.LittleEndian.PutUint64(preimage[8:], uint64(shard))
		digest := sha256.Sum256(preimage[:])
		var seed beacon.Root
		copy(seed[:], digest[:])

		// Look up where index 0 lands in the shuffled committee, which yields an arbitrary
		// committee member. Not the actual proposer logic, but good enough for testing.
		members := n.shard2Vals[shard]
		pick := beacon.PermuteIndex(n.conf.SHUFFLE_ROUND_COUNT, 0, uint64(len(members)), seed)
		proposers[i] = members[pick]
	}
	return proposers
}
// executeShardBlockProposal builds a mock shard block for the given slot, shard and
// proposer, then publishes its header to the global net, the full block to the shard's
// horizontal net, and the block's samples to the vertical nets.
//
// The header and block are published synchronously. Samples are published by background
// goroutines that may still be running when this function returns; their failures are
// only logged.
//
// It returns an error if mock data generation, header/block encoding or publishing,
// or sample creation fails.
func (n *Eth2Node) executeShardBlockProposal(slot Slot, shard Shard, proposer ValidatorIndex) error {
	// create shard block, with mock data
	dataSize := n.conf.MAX_DATA_SIZE
	data := make([]byte, dataSize)
	// The RNG is seeded from (slot, shard), so the mock data is reproducible for a given pair.
	seed := int64(uint64(slot)*n.conf.SHARD_COUNT + uint64(shard))
	rng := rand.New(rand.NewSource(seed))
	if _, err := io.ReadFull(rng, data); err != nil {
		// Report through the error return rather than panicking inside a proposal goroutine.
		return fmt.Errorf("failed to create random mock data: %w", err)
	}
	block := SignedShardBlock{
		Message: ShardBlock{
			ShardParentRoot:  Root{}, // TODO
			BeaconParentRoot: Root{}, // TODO
			Slot:             slot,
			Shard:            shard,
			ProposerIndex:    proposer,
			Body:             ShardBlockData(data),
		},
		Signature: BLSSignature{}, // TODO
	}
	header := SignedShardBlockHeader{
		Message: ShardBlockHeader{
			ShardParentRoot:  Root{}, // TODO
			BeaconParentRoot: Root{}, // TODO
			Slot:             slot,
			Shard:            shard,
			ProposerIndex:    proposer,
			BodyRoot:         Root{}, // TODO
		},
		Signature: BLSSignature{}, // TODO
	}

	// Try publishing the header and block for up to 2/3 of a slot. Give up afterwards.
	slotDuration := time.Second * time.Duration(n.conf.SECONDS_PER_SLOT)
	ctx, cancel := context.WithTimeout(n.subProcesses.ctx, 2*slotDuration/3)
	// Both publishes below complete before this function returns, so releasing the
	// timeout on return is safe.
	// NOTE(review): assumes Publish does not keep using ctx after it returns - confirm.
	defer cancel()

	// Publish header to global net
	{
		var buf bytes.Buffer
		if err := header.Serialize(codec.NewEncodingWriter(&buf)); err != nil {
			return errors.Wrap(err, "proposer failed to encode header")
		}
		if err := n.shardHeaders.Publish(ctx, buf.Bytes()); err != nil {
			return errors.Wrap(err, "proposer failed to publish header")
		}
	}

	// Publish block to horizontal net
	{
		var buf bytes.Buffer
		if err := block.Serialize(codec.NewEncodingWriter(&buf)); err != nil {
			return errors.Wrap(err, "proposer failed to encode block")
		}
		if err := n.horizontalSubnets[shard].Publish(ctx, buf.Bytes()); err != nil {
			return errors.Wrap(err, "proposer failed to publish block to horizontal net")
		}
	}

	// Publish samples to vertical nets
	{
		samples, err := n.conf.MakeSamples(block.Message.Body)
		if err != nil {
			return errors.Wrap(err, "proposer failed to make samples")
		}
		// TODO: how long should the node try to spend on getting a publishing round done before skipping?
		sampleCtx, cancelSamples := context.WithTimeout(n.subProcesses.ctx, 2*slotDuration)
		// Buffered to len(samples) so that publishers never block when signalling completion.
		done := make(chan struct{}, len(samples))
		for i, sample := range samples {
			go func(i int, sample ShardBlockDataChunk) { // TODO: high parallelism here, maybe too much, might need to change it
				defer func() { done <- struct{}{} }()
				var buf bytes.Buffer
				if err := sample.Serialize(codec.NewEncodingWriter(&buf)); err != nil {
					n.log.With(zap.Error(err)).Error("failed to encode sample for vert net")
					return
				}
				if err := n.verticalSubnets[i].Publish(sampleCtx, buf.Bytes()); err != nil {
					n.log.With(zap.Error(err)).Error("failed to publish to vert net")
					return
				}
			}(i, sample)
		}
		// The publishers outlive this function, so the timeout cannot be cancelled with a
		// plain defer. Release it in the background once every publisher has reported in.
		go func() {
			defer cancelSamples()
			for range samples {
				<-done
			}
		}()
	}
	return nil
}