Revert "Handle staking info since kaia fork in downloader." #2182

Merged
merged 1 commit on May 22, 2024
15 changes: 1 addition & 14 deletions datasync/downloader/downloader.go
@@ -396,18 +396,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
return errBusy
}
defer func() {
atomic.StoreInt32(&d.synchronising, 0)
// Staking info for kaia block has been temporarily stored in DB for post-download VerifyHeader.
// Now that the download and sync are complete, remove staking info for kaia block from DB
// to save space. They might stay in the cache though.
for _, blockNum := range d.queue.stakingInfoStoredBlocks {
if d.blockchain.Config().IsKaiaForkEnabled(big.NewInt(int64(blockNum) + 1)) {
reward.DeleteStakingInfoFromDB(blockNum)
}
}
d.queue.stakingInfoStoredBlocks = []uint64{} // Reset stored blocks
}()
defer atomic.StoreInt32(&d.synchronising, 0)

// Post a user notification of the sync (only once per session)
if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
@@ -1899,7 +1888,6 @@ func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *state
logger.Error("Inserting downloaded staking info is failed", "err", err)
return fmt.Errorf("failed to insert the downloaded staking information: %v", err)
} else {
d.queue.stakingInfoStoredBlocks = append(d.queue.stakingInfoStoredBlocks, result.StakingInfo.BlockNum)
logger.Info("Imported new staking information", "number", result.StakingInfo.BlockNum)
}
}
@@ -1919,7 +1907,6 @@ func (d *Downloader) commitPivotBlock(result *fetchResult) error {
logger.Error("Inserting downloaded staking info is failed on pivot block", "err", err, "pivot", block.Number())
return fmt.Errorf("failed to insert the downloaded staking information on pivot block (%v) : %v", block.Number(), err)
} else {
d.queue.stakingInfoStoredBlocks = append(d.queue.stakingInfoStoredBlocks, result.StakingInfo.BlockNum)
logger.Info("Imported new staking information on pivot block", "number", result.StakingInfo.BlockNum, "pivot", block.Number())
}
}
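After this revert, synchronise keeps only the original busy-guard; the deferred closure that deleted post-fork staking info from the DB once the download finished is gone. A minimal sketch of the remaining guard pattern (names simplified for illustration, not the actual Kaia source):

package main

import (
	"errors"
	"sync/atomic"
)

var (
	synchronising int32
	errBusy       = errors.New("busy")
)

// synchronise sketches the post-revert control flow: take the flag with a
// compare-and-swap and simply clear it again when the sync returns. No
// staking-info entries are removed from the DB on the way out anymore.
func synchronise() error {
	if !atomic.CompareAndSwapInt32(&synchronising, 0, 1) {
		return errBusy // another sync is already running
	}
	defer atomic.StoreInt32(&synchronising, 0)

	// ... header/body/receipt download and import would happen here ...
	return nil
}

func main() { _ = synchronise() }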
48 changes: 2 additions & 46 deletions datasync/downloader/downloader_test.go
@@ -58,7 +58,6 @@ var (
lock sync.Mutex
setter *govSetter
testStakingUpdateInterval = uint64(4)
kaiaCompatibleBlock *big.Int
)

// govSetter sets governance items for testing purpose
@@ -359,9 +358,7 @@ func (dl *downloadTester) CurrentBlock() *types.Block {

// Config retrieves the chain configuration of the tester.
func (dl *downloadTester) Config() *params.ChainConfig {
config := params.TestChainConfig
config.KaiaCompatibleBlock = kaiaCompatibleBlock
return config
return params.TestChainConfig
}

// CurrentFastBlock retrieves the current head fast-sync block from the canonical chain.
@@ -775,12 +772,7 @@ func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, leng
receipts += length - common - fsMinFullBlocks
stakingInfos += length - common - fsMinFullBlocks
}
stakingInfos = stakingInfos / int(testStakingUpdateInterval) // assuming that staking information update interval is 4 or 1 (kaia fork)
if testStakingUpdateInterval == 1 {
// Since staking information is a previous block on kaia fork, it needs to be subtracted by 1
// [common = 1, 2, 3, 4, 5] => We need staking information at [1, 2, 3, 4], not [1, 2, 3, 4, 5]
stakingInfos = stakingInfos - 1
}
stakingInfos = stakingInfos / int(testStakingUpdateInterval) // assuming that staking information update interval is 4
switch tester.downloader.getMode() {
case FullSync:
receipts, stakingInfos = 1, 0
@@ -1008,11 +1000,6 @@ func TestBoundedForkedSync64Light(t *testing.T) { testBoundedForkedSync(t, 64, L
func TestBoundedForkedSync65Full(t *testing.T) { testBoundedForkedSync(t, 65, FullSync) }
func TestBoundedForkedSync65Fast(t *testing.T) { testBoundedForkedSync(t, 65, FastSync) }

// The staking info after kaia should be deleted after finishing the sync process.
func TestBoundedForkedSync65FastOnKaia(t *testing.T) {
testBoundedForkedSync65FastOnKaia(t, 65, FastSync)
}

func testBoundedForkedSync(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()

@@ -1038,37 +1025,6 @@ func testBoundedForkedSync(t *testing.T, protocol int, mode SyncMode) {
}
}

func testBoundedForkedSync65FastOnKaia(t *testing.T, protocol int, mode SyncMode) {
originalStakingUpdateInterval := testStakingUpdateInterval
kaiaCompatibleBlock = big.NewInt(0)
testStakingUpdateInterval = 1
tester := newTester()
defer func() {
tester.terminate()
kaiaCompatibleBlock = nil
testStakingUpdateInterval = originalStakingUpdateInterval
}()

// Create a long enough forked chain
common, fork := 13, int(MaxForkAncestry+17)
hashesA, _, headersA, _, blocksA, _, receiptsA, _, stakingInfoA, _ := tester.makeChainFork(common+fork, fork, tester.genesis, nil, true)

tester.newPeer("original", protocol, hashesA, headersA, blocksA, receiptsA, stakingInfoA)

// Synchronise with the peer and make sure all blocks were retrieved
if err := tester.sync("original", nil, mode); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err)
}
assertOwnChain(t, tester, common+fork+1)

for _, stakingInfo := range tester.ownStakingInfo {
has, _ := reward.HasStakingInfoFromDB(stakingInfo.BlockNum)
if has {
t.Fatalf("staking info should be deleted after sync process")
}
}
}

// Tests that chain forks are contained within a certain interval of the current
// chain head for short but heavy forks too. These are a bit special because they
// take different ancestor lookup paths.
20 changes: 7 additions & 13 deletions datasync/downloader/queue.go
@@ -89,7 +89,7 @@ func newFetchResult(header *types.Header, mode SyncMode, proposerPolicy uint64,
if (fastSync || snapSync) && !header.EmptyReceipts() {
item.pending |= (1 << receiptType)
}
if (fastSync || snapSync) && proposerPolicy == uint64(istanbul.WeightedRandom) && (params.IsStakingUpdateInterval(header.Number.Uint64()) || isKaiaFork) {
if (fastSync || snapSync) && proposerPolicy == uint64(istanbul.WeightedRandom) && (params.IsStakingUpdateInterval(header.Number.Uint64()) && !isKaiaFork) {
item.pending |= (1 << stakingInfoType)
}
return item
@@ -155,8 +155,6 @@ type queue struct {
stakingInfoTaskQueue *prque.Prque // [kaia/65] Priority queue of the headers to fetch the staking infos for
stakingInfoPendPool map[string]*fetchRequest // [kaia/65] Currently pending staking info retrieval operations

stakingInfoStoredBlocks []uint64 // Block numbers for which staking info is stored in the DB

resultCache *resultStore // Downloaded but not yet delivered fetch results
resultSize common.StorageSize // Approximate size of a block (exponential moving average)

@@ -211,8 +209,6 @@ func (q *queue) Reset(blockCacheLimit int, thresholdInitialSize int) {
q.stakingInfoTaskQueue.Reset()
q.stakingInfoPendPool = make(map[string]*fetchRequest)

q.stakingInfoStoredBlocks = []uint64{}

q.resultCache = newResultStore(blockCacheLimit)
q.resultCache.SetThrottleThreshold(uint64(thresholdInitialSize))
}
@@ -382,14 +378,12 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
}
}

if (q.mode == FastSync || q.mode == SnapSync) && q.proposerPolicy == uint64(istanbul.WeightedRandom) {
if params.IsStakingUpdateInterval(header.Number.Uint64()) || q.IsKaiaFork(header.Number) {
if _, ok := q.stakingInfoTaskPool[hash]; ok {
logger.Trace("Header already scheduled for staking info fetch", "number", header.Number, "hash", hash)
} else {
q.stakingInfoTaskPool[hash] = header
q.stakingInfoTaskQueue.Push(header, -int64(header.Number.Uint64()))
}
if (q.mode == FastSync || q.mode == SnapSync) && q.proposerPolicy == uint64(istanbul.WeightedRandom) && (params.IsStakingUpdateInterval(header.Number.Uint64()) && !q.IsKaiaFork(header.Number)) {
if _, ok := q.stakingInfoTaskPool[hash]; ok {
logger.Trace("Header already scheduled for staking info fetch", "number", header.Number, "hash", hash)
} else {
q.stakingInfoTaskPool[hash] = header
q.stakingInfoTaskQueue.Push(header, -int64(header.Number.Uint64()))
}
}
inserts = append(inserts, header)
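With this revert, newFetchResult and queue.Schedule share the same narrowed predicate: staking info is only fetched for pre-fork headers that fall on the staking-update interval. A standalone sketch of that condition (the helper name and boolean parameters are illustrative and not part of the PR):

// needsStakingInfoFetch mirrors the post-revert checks in newFetchResult and
// queue.Schedule. Before the revert the last clause was
// (onUpdateInterval || isKaiaFork), so every post-fork header scheduled a
// staking-info fetch; after the revert, post-fork headers are excluded again.
func needsStakingInfoFetch(fastOrSnap, weightedRandom, onUpdateInterval, isKaiaFork bool) bool {
	return fastOrSnap && weightedRandom && onUpdateInterval && !isKaiaFork
}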
72 changes: 17 additions & 55 deletions datasync/downloader/queue_test.go
@@ -80,7 +80,7 @@ func init() {
targetBlocks := 128

var stakingInfos []*reward.StakingInfo
for i := 1; i <= 128; i += 1 {
for i := 4; i <= 128; i += 4 {
stakingInfos = append(stakingInfos, &reward.StakingInfo{BlockNum: uint64(i)})
}

@@ -112,33 +112,16 @@ func dummyPeer(id string) *peerConnection {
}

func TestBasics(t *testing.T) {
testBasics(t, false)
}

func TestBasicsKaia(t *testing.T) {
testBasics(t, true)
}

func testBasics(t *testing.T, kaiaFork bool) {
// set test staking update interval
orig := params.StakingUpdateInterval()
params.SetStakingUpdateInterval(testInterval)
defer params.SetStakingUpdateInterval(orig)

numOfBlocks := len(chain.blocks)
numOfReceipts := len(chain.blocks) / 2
numOfStakingInfos := len(chain.stakingInfos) / 4
effectiveStakingInterval := 4
config := params.TestChainConfig
if kaiaFork {
numOfStakingInfos = len(chain.stakingInfos)
effectiveStakingInterval = 1
config.KaiaCompatibleBlock = big.NewInt(0)
} else {
config.KaiaCompatibleBlock = nil
}
numOfStakingInfos := len(chain.stakingInfos)

q := newQueue(10, 10, uint64(istanbul.WeightedRandom), config)
q := newQueue(10, 10, uint64(istanbul.WeightedRandom), nil)
if !q.Idle() {
t.Errorf("new queue should be idle")
}
@@ -161,7 +144,7 @@ func testBasics(t *testing.T, kaiaFork bool) {
}
// staking info on every 4th block get added to task-queue
if got, exp := q.PendingStakingInfos(), numOfStakingInfos; got != exp {
t.Errorf("wrong pending staking info count, got %d, exp %d", got, exp)
t.Errorf("wrong pending receipt count, got %d, exp %d", got, exp)
}
// Items are now queued for downloading, next step is that we tell the
// queue that a certain peer will deliver them for us
@@ -247,10 +230,10 @@ func testBasics(t *testing.T, kaiaFork bool) {
t.Fatal("should throttle")
}
// But we should still get the first things to fetch
if got, exp := len(fetchReq.Headers), 10/effectiveStakingInterval; got != exp {
if got, exp := len(fetchReq.Headers), 2; got != exp {
t.Fatalf("expected %d requests, got %d", exp, got)
}
if got, exp := fetchReq.Headers[0].Number.Uint64(), uint64(effectiveStakingInterval); got != exp {
if got, exp := fetchReq.Headers[0].Number.Uint64(), uint64(4); got != exp {
t.Fatalf("expected header %d, got %d", exp, got)
}
}
Expand All @@ -260,7 +243,7 @@ func testBasics(t *testing.T, kaiaFork bool) {
if got, exp := q.receiptTaskQueue.Size(), numOfReceipts-5; got != exp {
t.Fatalf("expected receipt task queue size %d, got %d", exp, got)
}
if got, exp := q.stakingInfoTaskQueue.Size(), numOfStakingInfos-(10/effectiveStakingInterval); got != exp {
if got, exp := q.stakingInfoTaskQueue.Size(), numOfStakingInfos-2; got != exp {
t.Fatalf("expected staking info task queue size %d, got %d", exp, got)
}
if got, exp := q.resultCache.countCompleted(), 0; got != exp {
Expand All @@ -277,7 +260,7 @@ func TestScheduleAfterKaia(t *testing.T) {
config := params.TestChainConfig
config.KaiaCompatibleBlock = big.NewInt(21)

numOfStakingInfos := 113 // [4, 8, 12, 16, 20] + [21, 22, ... , 128]
numOfStakingInfos := 5 // [4, 8, 12, 16, 20]; After kaia fork, it won't be scheduled.

q := newQueue(50, 50, uint64(istanbul.WeightedRandom), config)
if !q.Idle() {
@@ -303,53 +286,31 @@
peer := dummyPeer("peer-1")
fetchReq, _, _ := q.ReserveStakingInfos(peer, 50)
// But we should still get the first things to fetch
// 35 = [4, 8, 12, 16, 20] + [21, 22, ... , 50]
if got, exp := len(fetchReq.Headers), 35; got != exp {
if got, exp := len(fetchReq.Headers), 5; got != exp {
t.Fatalf("expected %d requests, got %d", exp, got)
}
if got, exp := fetchReq.Headers[0].Number.Uint64(), uint64(4); got != exp {
t.Fatalf("expected header %d, got %d", exp, got)
}
}
// 113 - 35 = 78
if got, exp := q.stakingInfoTaskQueue.Size(), 78; got != exp {
if got, exp := q.stakingInfoTaskQueue.Size(), 0; got != exp {
t.Fatalf("expected staking info task queue size %d, got %d", exp, got)
}
}

func TestEmptyBlocks(t *testing.T) {
testEmptyBlocks(t, false)
}

// Empty block doesn't need to fetch body and receipt, but still need to fetch staking info
// if it's staking update interval or kaia compatible block.
func TestEmptyBlocksAfterKaia(t *testing.T) {
testEmptyBlocks(t, true)
}

func testEmptyBlocks(t *testing.T, kaiaFork bool) {
// set test staking update interval
orig := params.StakingUpdateInterval()
params.SetStakingUpdateInterval(testInterval)
defer params.SetStakingUpdateInterval(orig)

numOfBlocks := len(emptyChain.blocks)
numOfStakingInfos := len(chain.stakingInfos) / 4
effectiveStakingInterval := testInterval
config := params.TestChainConfig
if kaiaFork {
numOfStakingInfos = len(chain.stakingInfos)
effectiveStakingInterval = 1
config.KaiaCompatibleBlock = big.NewInt(0)
} else {
config.KaiaCompatibleBlock = nil
}
numOfStakingInfos := len(emptyChain.stakingInfos)

q := newQueue(10, 10, uint64(istanbul.WeightedRandom), config)
q := newQueue(10, 10, uint64(istanbul.WeightedRandom), nil)

q.Prepare(1, FastSync)
// Schedule a batch of headers
// Since it's empty chain, it doesn't schedule receipts
q.Schedule(emptyChain.headers(), 1)
if q.Idle() {
t.Errorf("queue should not be idle")
@@ -368,6 +329,7 @@ func testEmptyBlocks(t *testing.T, kaiaFork bool) {
if got, exp := q.resultCache.countCompleted(), 0; got != exp {
t.Errorf("wrong processable count, got %d, exp %d", got, exp)
}

// Items are now queued for downloading, next step is that we tell the
// queue that a certain peer will deliver them for us
// That should trigger all of them to suddenly become 'done'
@@ -418,10 +380,10 @@ func testEmptyBlocks(t *testing.T, kaiaFork bool) {
t.Fatal("should throttle")
}
// But we should still get the first things to fetch
if got, exp := len(fetchReq.Headers), int(10/effectiveStakingInterval); got != exp {
if got, exp := len(fetchReq.Headers), 2; got != exp {
t.Fatalf("expected %d requests, got %d", exp, got)
}
if got, exp := fetchReq.Headers[0].Number.Uint64(), uint64(effectiveStakingInterval); got != exp {
if got, exp := fetchReq.Headers[0].Number.Uint64(), uint64(4); got != exp {
t.Fatalf("expected header %d, got %d", exp, got)
}
}
Expand All @@ -431,10 +393,10 @@ func testEmptyBlocks(t *testing.T, kaiaFork bool) {
if q.receiptTaskQueue.Size() != 0 {
t.Errorf("expected receipt task queue to be %d, got %d", 0, q.receiptTaskQueue.Size())
}
if got, exp := q.stakingInfoTaskQueue.Size(), numOfStakingInfos-int(10/effectiveStakingInterval); got != exp {
if got, exp := q.stakingInfoTaskQueue.Size(), numOfStakingInfos-2; got != exp {
t.Fatalf("expected staking info task queue size %d, got %d", exp, got)
}
if got, exp := q.resultCache.countCompleted(), int(effectiveStakingInterval-1); got != exp {
if got, exp := q.resultCache.countCompleted(), 3; got != exp {
t.Errorf("wrong processable count, got %d, exp %d", got, exp)
}
}
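The new expectations in TestScheduleAfterKaia follow directly from the narrowed predicate: with 128 headers, a staking-update interval of 4 and KaiaCompatibleBlock = 21, only blocks 4, 8, 12, 16 and 20 still schedule a staking-info fetch. A quick, illustrative check of that count:

package main

import "fmt"

func main() {
	const headers, interval, kaiaBlock = 128, 4, 21

	count := 0
	for n := 1; n <= headers; n++ {
		if n < kaiaBlock && n%interval == 0 {
			count++ // 4, 8, 12, 16, 20
		}
	}
	// Prints 5. Before the revert, blocks 21..128 were scheduled as well,
	// which is where the old expectation of 113 came from.
	fmt.Println(count)
}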
4 changes: 4 additions & 0 deletions node/cn/backend.go
@@ -177,6 +177,10 @@ func senderTxHashIndexer(db database.DBManager, chainEvent <-chan blockchain.Cha
}

func checkSyncMode(config *Config) error {
// TODO-Kaia: allow snap sync after resolving the staking info sync issue
if config.SyncMode == downloader.SnapSync {
return errors.New("snap sync is temporarily disabled")
}
if !config.SyncMode.IsValid() {
return fmt.Errorf("invalid sync mode %d", config.SyncMode)
}
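With the new guard in checkSyncMode, a configuration requesting snap sync is rejected before the node starts. A rough sketch of the expected behaviour from inside the node/cn package (the config literal is assumed, and checkSyncMode may apply further checks not shown in this diff):

cfg := &Config{SyncMode: downloader.SnapSync}
if err := checkSyncMode(cfg); err != nil {
	fmt.Println(err) // "snap sync is temporarily disabled"
}

cfg.SyncMode = downloader.FullSync
fmt.Println(checkSyncMode(cfg)) // expected: <nil>, full sync is still accepted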
10 changes: 0 additions & 10 deletions reward/staking_info_db.go
@@ -27,7 +27,6 @@ type stakingInfoDB interface {
HasStakingInfo(blockNum uint64) (bool, error)
ReadStakingInfo(blockNum uint64) ([]byte, error)
WriteStakingInfo(blockNum uint64, stakingInfo []byte) error
DeleteStakingInfo(blockNum uint64)
}

// HasStakingInfoFromDB returns existence of staking information from miscdb.
@@ -72,12 +71,3 @@ func AddStakingInfoToDB(stakingInfo *StakingInfo) error {

return nil
}

func DeleteStakingInfoFromDB(blockNum uint64) error {
if stakingManager.stakingInfoDB == nil {
return ErrStakingDBNotSet
}

stakingManager.stakingInfoDB.DeleteStakingInfo(blockNum)
return nil
}
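With DeleteStakingInfo removed from the interface and DeleteStakingInfoFromDB gone, the remaining DB surface is write/read/has only. A hedged usage sketch of the surviving helpers (error handling trimmed; assumes a staking manager with a miscdb has already been set up):

info := &reward.StakingInfo{BlockNum: 1024}
if err := reward.AddStakingInfoToDB(info); err != nil {
	return err // likely ErrStakingDBNotSet when no DB is registered
}

has, _ := reward.HasStakingInfoFromDB(info.BlockNum)
fmt.Println(has) // true; nothing in the downloader deletes the entry anymore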
12 changes: 1 addition & 11 deletions reward/staking_manager.go
@@ -141,17 +141,7 @@ func GetStakingInfo(blockNum uint64) *StakingInfo {
if blockNum > 0 {
stakingBlockNumber--
}
if stakingInfo = GetStakingInfoForKaiaBlock(stakingBlockNumber); stakingInfo == nil {
// Get staking info from DB if it exists.
// The staking info stored in DB is for downloader to verify the header.
// After downloader verifies all headers, the staking info in DB will be removed.
if stakingInfo, err := getStakingInfoFromDB(stakingBlockNumber); stakingInfo != nil && err == nil {
// Fill in Gini coeff before adding to cache.
if err := fillMissingGiniCoefficient(stakingInfo, stakingBlockNumber); err != nil {
logger.Warn("Cannot fill in gini coefficient", "staking block number", stakingBlockNumber, "err", err)
}
}
}
stakingInfo = GetStakingInfoForKaiaBlock(stakingBlockNumber)
} else {
stakingBlockNumber = params.CalcStakingBlockNumber(blockNum)
stakingInfo = GetStakingInfoOnStakingBlock(stakingBlockNumber)
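After this revert, GetStakingInfo resolves post-fork blocks strictly through GetStakingInfoForKaiaBlock; the temporary DB fallback and its Gini-coefficient backfill are gone. A sketch of which block number the lookup targets (the helper is illustrative and not in the PR):

// stakingBlockNumberFor shows how GetStakingInfo picks its source block.
func stakingBlockNumberFor(blockNum uint64, isKaiaFork bool) uint64 {
	if isKaiaFork {
		// Post-fork: use the previous block's staking info directly,
		// consulting only the in-memory Kaia lookup.
		if blockNum > 0 {
			return blockNum - 1
		}
		return blockNum
	}
	// Pre-fork: fall back to the interval-based staking block.
	return params.CalcStakingBlockNumber(blockNum)
}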