Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug fixes elastic search indexer #1997

Merged
merged 4 commits into from
Jun 24, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/indexer/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ func serializeTransactions(
serializedData = append(serializedData, "\n"...)

buffLenWithCurrentTx := buff.Len() + len(meta) + len(serializedData)
if buffLenWithCurrentTx > txsBulkSizeThreshold {
if buffLenWithCurrentTx > txsBulkSizeThreshold && buff.Len() != 0 {
buffSlice = append(buffSlice, buff)
buff = bytes.Buffer{}
}
Expand Down
10 changes: 4 additions & 6 deletions core/indexer/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,16 @@ func (ei *elasticIndexer) SaveBlock(
}

txsSizeInBytes := computeSizeOfTxs(ei.marshalizer, txPool)
go ei.database.SaveHeader(headerHandler, signersIndexes, body, notarizedHeadersHashes, txsSizeInBytes)
ei.database.SaveHeader(headerHandler, signersIndexes, body, notarizedHeadersHashes, txsSizeInBytes)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may be leave to go routines - add a go routine throttler ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for the moment i will let as it is because method SaveBlock is called on a goroutine and i want the writing of header, miniblocks and transaction to be serial because this write do a lot of requests to elasticsearch database because have to check if miniblocks and transactions exits in database.


if len(body.MiniBlocks) == 0 {
return
}

go ei.database.SaveMiniblocks(headerHandler, body)
ei.database.SaveMiniblocks(headerHandler, body)

if ei.options.TxIndexingEnabled {
go ei.database.SaveTransactions(body, headerHandler, txPool, headerHandler.GetShardID())
ei.database.SaveTransactions(body, headerHandler, txPool, headerHandler.GetShardID())
}
}

Expand Down Expand Up @@ -149,9 +149,7 @@ func (ei *elasticIndexer) SaveValidatorsRating(indexID string, validatorsRatingI
//SaveValidatorsPubKeys will send all validators public keys to elasticsearch
func (ei *elasticIndexer) SaveValidatorsPubKeys(validatorsPubKeys map[uint32][][]byte, epoch uint32) {
for shardID, shardPubKeys := range validatorsPubKeys {
go func(id, epochNumber uint32, publicKeys [][]byte) {
ei.database.SaveShardValidatorsPubKeys(id, epochNumber, publicKeys)
}(shardID, epoch, shardPubKeys)
ei.database.SaveShardValidatorsPubKeys(shardID, epoch, shardPubKeys)
}
}

Expand Down
13 changes: 12 additions & 1 deletion core/indexer/elasticsearchDatabaseClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"

"github.com/elastic/go-elasticsearch/v7"
Expand Down Expand Up @@ -158,8 +159,18 @@ func (dc *databaseClient) DoMultiGet(obj object, index string) (object, error) {
return nil, fmt.Errorf("do multi get %s", res.String())
}

var responseBody []byte
responseBody, err = ioutil.ReadAll(res.Body)
if err != nil {
log.Warn("indexer: response body",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should delete this Warn, as responeBody should be nil if err != nil. Did you want to log res.Body? If you want to keep this warn you can add it in the next warn as additional info:
log.Warn("indexer:cannot read from response body", "body", string(responseBody), "error", err)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

"body", string(responseBody))
log.Warn("indexer:cannot read from response body", "error", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return nil, err here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

}

var decodedBody object
if err := json.NewDecoder(res.Body).Decode(&decodedBody); err != nil {
if err := json.Unmarshal(responseBody, &decodedBody); err != nil {
log.Warn("indexer: response body",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete this warn and add it in the next Warn as additional info, like suggested above

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

body

"body", string(responseBody))
log.Warn("indexer cannot decode body", "error", err)
return nil, err
}
Expand Down
51 changes: 28 additions & 23 deletions core/indexer/elasticsearchDatabase_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"strconv"
"strings"
"testing"
"time"

logger "github.com/ElrondNetwork/elrond-go-logger"
"github.com/ElrondNetwork/elrond-go/core"
Expand Down Expand Up @@ -601,32 +602,36 @@ func TestDoBulkRequestLimit(t *testing.T) {
esDatabase, _ := newElasticSearchDatabase(args)

//Generate transaction and hashes
numTransactions := 1000
dataSize := 12345
txs, hashes := generateTransactions(numTransactions, dataSize)

header := &dataBlock.Header{}
txsPool := make(map[string]data.TransactionHandler)
for i := 0; i < numTransactions; i++ {
txsPool[hashes[i]] = &txs[i]
}
for i := 0; i < 1000; i++ {
numTransactions := 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move these two lines outside the for

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

dataSize := 900001
txs, hashes := generateTransactions(numTransactions, dataSize)

miniblock := &dataBlock.MiniBlock{
TxHashes: make([][]byte, numTransactions),
Type: dataBlock.TxBlock,
}
for i := 0; i < numTransactions; i++ {
miniblock.TxHashes[i] = []byte(hashes[i])
}
header := &dataBlock.Header{}
txsPool := make(map[string]data.TransactionHandler)
for i := 0; i < numTransactions; i++ {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use j instead i. The outer for uses also i for iteration

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

txsPool[hashes[i]] = &txs[i]
}

body := &dataBlock.Body{
MiniBlocks: []*dataBlock.MiniBlock{
miniblock,
},
miniblock := &dataBlock.MiniBlock{
TxHashes: make([][]byte, numTransactions),
Type: dataBlock.TxBlock,
}
for i := 0; i < numTransactions; i++ {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use j instead i. The outer for uses also i for iteration

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

miniblock.TxHashes[i] = []byte(hashes[i])
}

body := &dataBlock.Body{
MiniBlocks: []*dataBlock.MiniBlock{
miniblock,
},
}
body.MiniBlocks[0].ReceiverShardID = 2
body.MiniBlocks[0].SenderShardID = 1

esDatabase.SaveTransactions(body, header, txsPool, 2)
}
body.MiniBlocks[0].ReceiverShardID = 2
body.MiniBlocks[0].SenderShardID = 1
esDatabase.SaveTransactions(body, header, txsPool, 2)
}

func generateTransactions(numTxs int, datFieldSize int) ([]transaction.Transaction, []string) {
Expand All @@ -647,7 +652,7 @@ func generateTransactions(numTxs int, datFieldSize int) ([]transaction.Transacti
Data: randomByteArray,
Signature: []byte("443e79a8d99ba093262c1db48c58ab3d59bcfeb313ca5cddf2a9d1d06f9894ec"),
}
hashes[i] = fmt.Sprintf("%d", i)
hashes[i] = fmt.Sprintf("%v", time.Now())
}

return txs, hashes
Expand Down