Skip to content

Commit

Permalink
polygon/heimdall: optimise fetch checkpoints (#9897)
Browse files Browse the repository at this point in the history
Brings down time to fetch all checkpoints from 4 hours to 42 seconds in
combination with maticnetwork/heimdall#1154

```
DBUG[04-09|21:12:39.754] [bor.heimdall] fetching all checkpoints 
DBUG[04-09|21:13:10.355] [bor.heimdall] fetching all checkpoints progress page=5 len=40000
DBUG[04-09|21:13:22.061] [bor.heimdall] fetching all checkpoints done len=60474 duration=42.307967083s
```
  • Loading branch information
taratorio committed Apr 11, 2024
1 parent 63c7118 commit e7d5a84
Show file tree
Hide file tree
Showing 9 changed files with 231 additions and 15 deletions.
23 changes: 23 additions & 0 deletions cmd/devnet/services/polygon/heimdall.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ func (h *Heimdall) FetchCheckpointCount(ctx context.Context) (int64, error) {
return 0, fmt.Errorf("TODO")
}

func (h *Heimdall) FetchCheckpoints(ctx context.Context, page uint64, limit uint64) (heimdall.Checkpoints, error) {
return nil, fmt.Errorf("TODO")
}

func (h *Heimdall) FetchMilestone(ctx context.Context, number int64) (*heimdall.Milestone, error) {
return nil, fmt.Errorf("TODO")
}
Expand Down Expand Up @@ -475,6 +479,25 @@ func makeHeimdallRouter(ctx context.Context, client heimdall.HeimdallClient) *ch
writeResponse(w, wrapResult(result), err)
})

router.Get("/checkpoints/list", func(w http.ResponseWriter, r *http.Request) {
pageStr := r.URL.Query().Get("page")
page, err := strconv.ParseUint(pageStr, 10, 64)
if err != nil {
http.Error(w, http.StatusText(400), 400)
return
}

limitStr := r.URL.Query().Get("limit")
limit, err := strconv.ParseUint(limitStr, 10, 64)
if err != nil {
http.Error(w, http.StatusText(400), 400)
return
}

result, err := client.FetchCheckpoints(ctx, page, limit)
writeResponse(w, wrapResult(result), err)
})

router.Get("/milestone/{number}", func(w http.ResponseWriter, r *http.Request) {
numberStr := chi.URLParam(r, "number")
number, err := strconv.ParseInt(numberStr, 10, 64)
Expand Down
4 changes: 4 additions & 0 deletions polygon/bor/bor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ func (h test_heimdall) FetchCheckpointCount(ctx context.Context) (int64, error)
return 0, fmt.Errorf("TODO")
}

func (h *test_heimdall) FetchCheckpoints(ctx context.Context, page uint64, limit uint64) (heimdall.Checkpoints, error) {
return nil, fmt.Errorf("TODO")
}

func (h test_heimdall) FetchMilestone(ctx context.Context, number int64) (*heimdall.Milestone, error) {
return nil, fmt.Errorf("TODO")
}
Expand Down
21 changes: 21 additions & 0 deletions polygon/heimdall/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,22 @@ func (c *Checkpoint) UnmarshalJSON(b []byte) error {
return json.Unmarshal(b, &c.Fields)
}

type Checkpoints []*Checkpoint

func (cs *Checkpoints) Len() int {
return len(*cs)
}

func (cs *Checkpoints) Less(i, j int) bool {
v := *cs
return v[i].StartBlock().Uint64() < v[j].StartBlock().Uint64()
}

func (cs *Checkpoints) Swap(i, j int) {
v := *cs
v[i], v[j] = v[j], v[i]
}

type CheckpointResponse struct {
Height string `json:"height"`
Result Checkpoint `json:"result"`
Expand All @@ -74,3 +90,8 @@ type CheckpointCountResponse struct {
Height string `json:"height"`
Result CheckpointCount `json:"result"`
}

type CheckpointListResponse struct {
Height string `json:"height"`
Result Checkpoints `json:"result"`
}
27 changes: 25 additions & 2 deletions polygon/heimdall/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type HeimdallClient interface {

FetchCheckpoint(ctx context.Context, number int64) (*Checkpoint, error)
FetchCheckpointCount(ctx context.Context) (int64, error)
FetchCheckpoints(ctx context.Context, page uint64, limit uint64) (Checkpoints, error)
FetchMilestone(ctx context.Context, number int64) (*Milestone, error)
FetchMilestoneCount(ctx context.Context) (int64, error)

Expand Down Expand Up @@ -106,8 +107,10 @@ const (
fetchStateSyncEventsFormat = "from-id=%d&to-time=%d&limit=%d"
fetchStateSyncEventsPath = "clerk/event-record/list"

fetchCheckpoint = "/checkpoints/%s"
fetchCheckpointCount = "/checkpoints/count"
fetchCheckpoint = "/checkpoints/%s"
fetchCheckpointCount = "/checkpoints/count"
fetchCheckpointList = "/checkpoints/list"
fetchCheckpointListQueryFormat = "page=%d&limit=%d"

fetchMilestoneAt = "/milestone/%d"
fetchMilestoneLatest = "/milestone/latest"
Expand Down Expand Up @@ -219,6 +222,22 @@ func (c *Client) FetchCheckpoint(ctx context.Context, number int64) (*Checkpoint
return &response.Result, nil
}

func (c *Client) FetchCheckpoints(ctx context.Context, page uint64, limit uint64) (Checkpoints, error) {
url, err := checkpointListURL(c.urlString, page, limit)
if err != nil {
return nil, err
}

ctx = withRequestType(ctx, checkpointListRequest)

response, err := FetchWithRetry[CheckpointListResponse](ctx, c, url, c.logger)
if err != nil {
return nil, err
}

return response.Result, nil
}

func isInvalidMilestoneIndexError(err error) bool {
return errors.Is(err, ErrNotSuccessfulResponse) &&
strings.Contains(err.Error(), "Invalid milestone index")
Expand Down Expand Up @@ -459,6 +478,10 @@ func checkpointCountURL(urlString string) (*url.URL, error) {
return makeURL(urlString, fetchCheckpointCount, "")
}

func checkpointListURL(urlString string, page uint64, limit uint64) (*url.URL, error) {
return makeURL(urlString, fetchCheckpointList, fmt.Sprintf(fetchCheckpointListQueryFormat, page, limit))
}

func milestoneURL(urlString string, number int64) (*url.URL, error) {
if number == -1 {
return makeURL(urlString, fetchMilestoneLatest, "")
Expand Down
15 changes: 15 additions & 0 deletions polygon/heimdall/client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

111 changes: 102 additions & 9 deletions polygon/heimdall/heimdall.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package heimdall
import (
"context"
"errors"
"sort"
"time"

"github.com/ledgerwatch/log/v3"
Expand Down Expand Up @@ -37,6 +38,8 @@ var ErrIncompleteMilestoneRange = errors.New("milestone range doesn't contain th
var ErrIncompleteCheckpointRange = errors.New("checkpoint range doesn't contain the start block")
var ErrIncompleteSpanRange = errors.New("span range doesn't contain the start block")

const checkpointsBatchFetchThreshold = 100

type heimdall struct {
client HeimdallClient
pollDelay time.Duration
Expand Down Expand Up @@ -65,27 +68,56 @@ func (h *heimdall) LastCheckpointId(ctx context.Context, _ CheckpointStore) (Che
}

func (h *heimdall) FetchCheckpointsFromBlock(ctx context.Context, store CheckpointStore, startBlock uint64) (Waypoints, error) {
h.logger.Trace(heimdallLogPrefix("fetching checkpoints from block"), "start", startBlock)
h.logger.Debug(heimdallLogPrefix("fetching checkpoints from block"), "start", startBlock)
startFetchTime := time.Now()

count, _, err := h.LastCheckpointId(ctx, store)
lastStoredCheckpointId, _, err := store.LastCheckpointId(ctx)
if err != nil {
return nil, err
}

count, err := h.client.FetchCheckpointCount(ctx)
if err != nil {
return nil, err
}

latestCheckpointId := CheckpointId(count)
checkpointsToFetch := count - int64(lastStoredCheckpointId)
if checkpointsToFetch >= checkpointsBatchFetchThreshold {
checkpoints, err := h.batchFetchCheckpoints(ctx, store, lastStoredCheckpointId, latestCheckpointId)
if err != nil {
return nil, err
}

startCheckpointIdx, found := sort.Find(len(checkpoints), func(i int) int {
return checkpoints[i].CmpRange(startBlock)
})
if !found {
return nil, ErrIncompleteCheckpointRange
}

checkpoints = checkpoints[startCheckpointIdx:]
waypoints := make(Waypoints, len(checkpoints))
for i, checkpoint := range checkpoints {
waypoints[i] = checkpoint
}

return waypoints, nil
}

progressLogTicker := time.NewTicker(30 * time.Second)
defer progressLogTicker.Stop()

var checkpoints []Waypoint
var endBlock uint64
for i := count; i >= 1; i-- {
for i := latestCheckpointId; i >= 1; i-- {
select {
case <-progressLogTicker.C:
h.logger.Info(
heimdallLogPrefix("fetch checkpoints from block progress (backwards)"),
"checkpoint number", i,
"start", startBlock,
"last", count,
"latestCheckpointId", latestCheckpointId,
"currentCheckpointId", i,
"startBlock", startBlock,
)
default:
// carry on
Expand Down Expand Up @@ -119,7 +151,7 @@ func (h *heimdall) FetchCheckpointsFromBlock(ctx context.Context, store Checkpoi

common.SliceReverse(checkpoints)

h.logger.Trace(
h.logger.Debug(
heimdallLogPrefix("finished fetching checkpoints from block"),
"count", len(checkpoints),
"start", startBlock,
Expand Down Expand Up @@ -189,7 +221,7 @@ func (h *heimdall) LastMilestoneId(ctx context.Context, _ MilestoneStore) (Miles
}

func (h *heimdall) FetchMilestonesFromBlock(ctx context.Context, store MilestoneStore, startBlock uint64) (Waypoints, error) {
h.logger.Trace(heimdallLogPrefix("fetching milestones from block"), "start", startBlock)
h.logger.Debug(heimdallLogPrefix("fetching milestones from block"), "start", startBlock)
startFetchTime := time.Now()

last, _, err := h.LastMilestoneId(ctx, store)
Expand Down Expand Up @@ -243,7 +275,7 @@ func (h *heimdall) FetchMilestonesFromBlock(ctx context.Context, store Milestone

common.SliceReverse(milestones)

h.logger.Trace(
h.logger.Debug(
heimdallLogPrefix("finished fetching milestones from block"),
"count", len(milestones),
"start", startBlock,
Expand Down Expand Up @@ -559,6 +591,66 @@ func (h *heimdall) pollMilestones(ctx context.Context, store MilestoneStore, tip
}
}

func (h *heimdall) batchFetchCheckpoints(
ctx context.Context,
store CheckpointStore,
lastStored CheckpointId,
latest CheckpointId,
) (Checkpoints, error) {
// TODO: once heimdall API is fixed to return sorted items in pages we can only fetch
// the new pages after lastStoredCheckpointId using the checkpoints/list paging API
// (for now we have to fetch all of them)
// and also remove sorting we do after fetching

h.logger.Debug(heimdallLogPrefix("batch fetching checkpoints"))

fetchStartTime := time.Now()
progressLogTicker := time.NewTicker(30 * time.Second)
defer progressLogTicker.Stop()

page := uint64(1)
count := int64(latest)
checkpoints := make(Checkpoints, 0, count)
for count > 0 {
checkpointsBatch, err := h.client.FetchCheckpoints(ctx, page, 10_000)
if err != nil {
return nil, err
}

select {
case <-progressLogTicker.C:
h.logger.Debug(
heimdallLogPrefix("batch fetch checkpoints progress"),
"page", page,
"len", len(checkpoints),
)
default:
// carry-on
}

checkpoints = append(checkpoints, checkpointsBatch...)
count = count - int64(len(checkpointsBatch))
page++
}

sort.Sort(&checkpoints)

for i, checkpoint := range checkpoints[lastStored:] {
err := store.PutCheckpoint(ctx, CheckpointId(i+1), checkpoint)
if err != nil {
return nil, err
}
}

h.logger.Debug(
heimdallLogPrefix("batch fetch checkpoints done"),
"len", len(checkpoints),
"duration", time.Since(fetchStartTime),
)

return checkpoints, nil
}

func (h *heimdall) waitPollingDelay(ctx context.Context) {
pollDelayTimer := time.NewTimer(h.pollDelay)
defer pollDelayTimer.Stop()
Expand All @@ -567,5 +659,6 @@ func (h *heimdall) waitPollingDelay(ctx context.Context) {
case <-ctx.Done():
return
case <-pollDelayTimer.C:
return
}
}
Loading

0 comments on commit e7d5a84

Please sign in to comment.