Skip to content

Commit

Permalink
exp/ingest/io: Skip storing live entries seen in the oldest bucket (#…
Browse files Browse the repository at this point in the history
…2618)

This commit updates `SingleLedgerStateReader` to skip updating a temp
ledger key store when processing the oldest bucket. This lowered memory
usage of `SingleLedgerStateReader` from 550MB to 325MB when processing
recent pubnet ledgers.
  • Loading branch information
bartekn committed May 27, 2020
1 parent 08ec13c commit 4b6180a
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 13 deletions.
36 changes: 23 additions & 13 deletions exp/ingest/io/single_ledger_state_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,23 +160,25 @@ func (msr *SingleLedgerStateReader) streamBuckets() {
close(msr.readChan)
}()

var buckets []string
var buckets []historyarchive.Hash
for i := 0; i < len(msr.has.CurrentBuckets); i++ {
b := msr.has.CurrentBuckets[i]
buckets = append(buckets, b.Curr, b.Snap)
}
for _, hashString := range []string{b.Curr, b.Snap} {
hash, err := historyarchive.DecodeHash(hashString)
if err != nil {
msr.readChan <- msr.error(errors.Wrap(err, "Error decoding bucket hash"))
return
}

for _, hashString := range buckets {
hash, err := historyarchive.DecodeHash(hashString)
if err != nil {
msr.readChan <- msr.error(errors.Wrap(err, "Error decoding bucket hash"))
return
}
if hash.IsZero() {
continue
}

if hash.IsZero() {
continue
buckets = append(buckets, hash)
}
}

for i, hash := range buckets {
exists, err := msr.bucketExists(hash)
if err != nil {
msr.readChan <- msr.error(
Expand All @@ -192,7 +194,8 @@ func (msr *SingleLedgerStateReader) streamBuckets() {
return
}

if shouldContinue := msr.streamBucketContents(hash); !shouldContinue {
oldestBucket := i == len(buckets)-1
if shouldContinue := msr.streamBucketContents(hash, oldestBucket); !shouldContinue {
break
}
}
Expand Down Expand Up @@ -261,7 +264,7 @@ func (msr *SingleLedgerStateReader) newXDRStream(hash historyarchive.Hash) (
}

// streamBucketContents pushes values onto the read channel, returning false when the channel needs to be closed and true otherwise.
func (msr *SingleLedgerStateReader) streamBucketContents(hash historyarchive.Hash) bool {
func (msr *SingleLedgerStateReader) streamBucketContents(hash historyarchive.Hash, oldestBucket bool) bool {
rdr, e := msr.newXDRStream(hash)
if e != nil {
msr.readChan <- msr.error(
Expand Down Expand Up @@ -428,6 +431,13 @@ LoopBucketEntry:
// > that the (chronologically) preceding entry with the same ledger
// > key was DEADENTRY.
if entry.Type == xdr.BucketEntryTypeLiveentry {
// We skip adding entries from the last bucket to tempStore because:
// 1. Ledger keys are unique within a single bucket.
// 2. This is the last bucket we process, so there's no need to track
// the entries seen in this bucket.
if oldestBucket {
continue
}
err := msr.tempStore.Add(h)
if err != nil {
msr.readChan <- msr.error(errors.Wrap(err, "Error updating to tempStore"))
Expand Down
1 change: 1 addition & 0 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).
* Drop support for MuxedAccounts strkeys (spec'ed in [SEP23](https://github.com/stellar/stellar-protocol/blob/master/ecosystem/sep-0023.md)).
SEP23 is still a draft and we don't want to encourage storing strkeys which may not be definite.
* Replace `SequenceProvider` implementation with one which queries the Horizon DB for sequence numbers instead of the Stellar Core DB.
* Decreased the memory usage of the initial state ingestion stage and the state verifier ([#2618](https://github.com/stellar/go/pull/2618)).

## v1.3.0

Expand Down

0 comments on commit 4b6180a

Please sign in to comment.