-
Notifications
You must be signed in to change notification settings - Fork 322
/
sync_indexers.go
125 lines (113 loc) · 3.6 KB
/
sync_indexers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
// Copyright (c) 2023 IoTeX Foundation
// This source code is provided 'as is' and no warranties are given as to title or non-infringement, merchantability
// or fitness for purpose and, to the extent permitted by law, all liability for your use of the code is disclaimed.
// This source code is governed by Apache License 2.0 that can be found in the LICENSE file.
package blockindex
import (
"context"
"github.com/iotexproject/iotex-core/blockchain/block"
"github.com/iotexproject/iotex-core/blockchain/blockdao"
)
// SyncIndexers is a composite indexer that wraps several block indexers
// and drives them together, so that they stay in sync when blocks are added.
type SyncIndexers struct {
// indexers are the wrapped indexers; the methods of SyncIndexers iterate over them in slice order
indexers []blockdao.BlockIndexer
startHeights []uint64 // start height of each indexer, index-aligned with indexers; populated by initStartHeight, which Start calls
minStartHeight uint64 // minimum of startHeights; the value returned by StartHeight
}
// NewSyncIndexers builds a SyncIndexers group around the given indexers.
// The order of the arguments is kept: PutBlock offers a block to the
// indexers one by one in that order.
// The start heights of the group are not set here; Start computes them.
func NewSyncIndexers(indexers ...blockdao.BlockIndexer) *SyncIndexers {
	group := new(SyncIndexers)
	group.indexers = indexers
	return group
}
// Start starts every indexer of the group in slice order and then
// initializes the start heights of the group.
// If an indexer fails to start, its error is returned immediately: the
// remaining indexers are not started and the start heights are not
// initialized.
func (ig *SyncIndexers) Start(ctx context.Context) error {
	for _, member := range ig.indexers {
		err := member.Start(ctx)
		if err != nil {
			return err
		}
	}
	// every indexer started successfully; derive the start heights from them
	return ig.initStartHeight()
}
// Stop stops every indexer of the group in slice order.
//
// All indexers are asked to stop even when an earlier one reports an error,
// so that a single failure does not leave the remaining indexers running.
// The first error encountered is returned; errors from later indexers are
// dropped. Stop returns nil when every indexer stops cleanly or the group
// is empty.
func (ig *SyncIndexers) Stop(ctx context.Context) error {
	var firstErr error
	for _, indexer := range ig.indexers {
		// keep going after a failure: the remaining indexers still need to be stopped
		if err := indexer.Stop(ctx); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
// PutBlock offers blk to each indexer of the group, in slice order.
// An indexer does not receive the block when
//   - blk is below the start height recorded for that indexer, or
//   - blk is not above the height the indexer currently reports.
//
// The first error returned by an indexer's Height or PutBlock aborts the
// loop and is returned as is. The per-indexer start heights are read from
// startHeights, which initStartHeight fills in during Start.
func (ig *SyncIndexers) PutBlock(ctx context.Context, blk *block.Block) error {
	for i, indexer := range ig.indexers {
		// the block lies before the range this indexer is responsible for
		if ig.startHeights[i] > blk.Height() {
			continue
		}
		tip, err := indexer.Height()
		if err != nil {
			return err
		}
		// only a block strictly above the indexer's height is handed over
		if tip < blk.Height() {
			if err := indexer.PutBlock(ctx, blk); err != nil {
				return err
			}
		}
	}
	return nil
}
// DeleteTipBlock asks every indexer of the group, in slice order, to
// delete blk as its tip block.
// It returns the first error reported by an indexer; the indexers after
// the failing one are not called.
func (ig *SyncIndexers) DeleteTipBlock(ctx context.Context, blk *block.Block) error {
	for _, member := range ig.indexers {
		err := member.DeleteTipBlock(ctx, blk)
		if err != nil {
			return err
		}
	}
	return nil
}
// StartHeight reports the smallest start height among the indexers of the
// group, as stored in minStartHeight.
// The value is zero until initStartHeight has assigned it.
func (ig *SyncIndexers) StartHeight() uint64 {
	lowest := ig.minStartHeight
	return lowest
}
// Height reports the smallest height among the indexers of the group.
// Each indexer is queried in slice order; the first error is returned
// together with a zero height. A group without indexers yields 0, nil.
func (ig *SyncIndexers) Height() (uint64, error) {
	if len(ig.indexers) == 0 {
		return 0, nil
	}
	// seed the minimum with the first indexer, then fold in the rest
	lowest, err := ig.indexers[0].Height()
	if err != nil {
		return 0, err
	}
	for _, member := range ig.indexers[1:] {
		tip, err := member.Height()
		if err != nil {
			return 0, err
		}
		if tip < lowest {
			lowest = tip
		}
	}
	return lowest, nil
}
// initStartHeight computes the start height of every indexer in the group
// and the minimum over all of them.
// The start height of an indexer is its current height plus one; when the
// indexer also implements blockdao.BlockIndexerWithStart, the larger of
// that value and its StartHeight() is used.
// startHeights and minStartHeight are written in place, so an error from
// an indexer's Height leaves them only partially updated.
func (ig *SyncIndexers) initStartHeight() error {
	heights := make([]uint64, len(ig.indexers))
	ig.minStartHeight = 0
	ig.startHeights = heights
	for i, member := range ig.indexers {
		tip, err := member.Height()
		if err != nil {
			return err
		}
		// default: the block right after the indexer's current height
		start := tip + 1
		if withStart, ok := member.(blockdao.BlockIndexerWithStart); ok {
			if declared := withStart.StartHeight(); declared > start {
				start = declared
			}
		}
		heights[i] = start
		switch {
		case i == 0, start < ig.minStartHeight:
			// the first indexer seeds the minimum, later ones can only lower it
			ig.minStartHeight = start
		}
	}
	return nil
}