From e4c375b7eed85f453cc673c81af3d3263f6d3181 Mon Sep 17 00:00:00 2001 From: Dominik Zajkowski Date: Tue, 4 May 2021 16:22:29 +0200 Subject: [PATCH] [Kaizen] Add debug logs for PendingTransactionsManager --- .../iohk/ethereum/utils/ByteStringUtils.scala | 4 ++++ .../ethereum/ledger/BlockImporterItSpec.scala | 10 +++++---- .../ethereum/consensus/pow/MockedMiner.scala | 4 +++- .../io/iohk/ethereum/domain/BlockBody.scala | 3 ++- .../ethereum/domain/SignedTransaction.scala | 1 - .../ethereum/jsonrpc/PersonalService.scala | 7 ++++--- .../ethereum/ledger/BlockPreparator.scala | 9 ++++---- .../network/p2p/messages/CommonMessages.scala | 3 ++- .../PendingTransactionsManager.scala | 21 +++++++++++++++---- .../sync/regular/RegularSyncSpec.scala | 5 ++++- 10 files changed, 47 insertions(+), 20 deletions(-) diff --git a/bytes/src/main/scala/io/iohk/ethereum/utils/ByteStringUtils.scala b/bytes/src/main/scala/io/iohk/ethereum/utils/ByteStringUtils.scala index 1101741dad..321987bcaa 100644 --- a/bytes/src/main/scala/io/iohk/ethereum/utils/ByteStringUtils.scala +++ b/bytes/src/main/scala/io/iohk/ethereum/utils/ByteStringUtils.scala @@ -30,6 +30,10 @@ object ByteStringUtils { } } + implicit class ByteStringOps(val bytes: ByteString) extends AnyVal { + def toHex: String = Hex.toHexString(bytes.toArray[Byte]) + } + sealed trait ByteStringElement { def len: Int def asByteArray: Array[Byte] diff --git a/src/it/scala/io/iohk/ethereum/ledger/BlockImporterItSpec.scala b/src/it/scala/io/iohk/ethereum/ledger/BlockImporterItSpec.scala index e805d4083c..040e8c69ae 100644 --- a/src/it/scala/io/iohk/ethereum/ledger/BlockImporterItSpec.scala +++ b/src/it/scala/io/iohk/ethereum/ledger/BlockImporterItSpec.scala @@ -265,10 +265,12 @@ class BlockImporterItSpec eventually { - val msg = fetcherProbe.fishForMessage(Timeouts.longTimeout) { - case BlockFetcher.FetchStateNode(_) => true - case _ => false - }.asInstanceOf[BlockFetcher.FetchStateNode] + val msg = fetcherProbe + .fishForMessage(Timeouts.longTimeout) { + case BlockFetcher.FetchStateNode(_) => true + case _ => false + } + .asInstanceOf[BlockFetcher.FetchStateNode] msg.hash.length should be > 0 } diff --git a/src/main/scala/io/iohk/ethereum/consensus/pow/MockedMiner.scala b/src/main/scala/io/iohk/ethereum/consensus/pow/MockedMiner.scala index be6ed4523c..b2a34be3ca 100644 --- a/src/main/scala/io/iohk/ethereum/consensus/pow/MockedMiner.scala +++ b/src/main/scala/io/iohk/ethereum/consensus/pow/MockedMiner.scala @@ -13,7 +13,9 @@ import io.iohk.ethereum.domain.{Block, Blockchain} import io.iohk.ethereum.ledger.InMemoryWorldStateProxy import io.iohk.ethereum.nodebuilder.Node import io.iohk.ethereum.utils.ByteStringUtils +import io.iohk.ethereum.utils.ByteStringUtils.ByteStringOps import monix.execution.Scheduler + import scala.concurrent.duration._ class MockedMiner( @@ -79,7 +81,7 @@ class MockedMiner( log.info( s"Mining mocked block {} successful. Included transactions: {}", minedBlock.idTag, - minedBlock.body.transactionList.map(_.hashAsHexString) + minedBlock.body.transactionList.map(_.hash.toHex) ) syncEventListener ! SyncProtocol.MinedBlock(minedBlock) // because of using seconds to calculate block timestamp, we can't mine blocks faster than one block per second diff --git a/src/main/scala/io/iohk/ethereum/domain/BlockBody.scala b/src/main/scala/io/iohk/ethereum/domain/BlockBody.scala index d1e2be7084..fbafe00c82 100644 --- a/src/main/scala/io/iohk/ethereum/domain/BlockBody.scala +++ b/src/main/scala/io/iohk/ethereum/domain/BlockBody.scala @@ -2,13 +2,14 @@ package io.iohk.ethereum.domain import io.iohk.ethereum.domain.BlockHeaderImplicits._ import io.iohk.ethereum.rlp.{RLPEncodeable, RLPList, RLPSerializable, rawDecode} +import io.iohk.ethereum.utils.ByteStringUtils.ByteStringOps case class BlockBody(transactionList: Seq[SignedTransaction], uncleNodesList: Seq[BlockHeader]) { override def toString: String = s"BlockBody{ transactionList: $transactionList, uncleNodesList: $uncleNodesList }" def toShortString: String = - s"BlockBody { transactionsList: ${transactionList.map(_.hashAsHexString)}, uncleNodesList: ${uncleNodesList.map(_.hashAsHexString)} }" + s"BlockBody { transactionsList: ${transactionList.map(_.hash.toHex)}, uncleNodesList: ${uncleNodesList.map(_.hashAsHexString)} }" lazy val numberOfTxs: Int = transactionList.size diff --git a/src/main/scala/io/iohk/ethereum/domain/SignedTransaction.scala b/src/main/scala/io/iohk/ethereum/domain/SignedTransaction.scala index a135f76133..eca2539e82 100644 --- a/src/main/scala/io/iohk/ethereum/domain/SignedTransaction.scala +++ b/src/main/scala/io/iohk/ethereum/domain/SignedTransaction.scala @@ -170,7 +170,6 @@ case class SignedTransaction(tx: Transaction, signature: ECDSASignature) { signature.v != ECDSASignature.negativePointSign && signature.v != ECDSASignature.positivePointSign lazy val hash: ByteString = ByteString(kec256(this.toBytes: Array[Byte])) - lazy val hashAsHexString: String = Hex.toHexString(hash.toArray[Byte]) } case class SignedTransactionWithSender(tx: SignedTransaction, senderAddress: Address) diff --git a/src/main/scala/io/iohk/ethereum/jsonrpc/PersonalService.scala b/src/main/scala/io/iohk/ethereum/jsonrpc/PersonalService.scala index 07ed3fd5ba..a318c44341 100644 --- a/src/main/scala/io/iohk/ethereum/jsonrpc/PersonalService.scala +++ b/src/main/scala/io/iohk/ethereum/jsonrpc/PersonalService.scala @@ -1,7 +1,6 @@ package io.iohk.ethereum.jsonrpc import java.time.Duration - import akka.actor.ActorRef import akka.util.{ByteString, Timeout} import io.iohk.ethereum.crypto @@ -15,10 +14,11 @@ import io.iohk.ethereum.keystore.{KeyStore, Wallet} import io.iohk.ethereum.rlp.RLPList import io.iohk.ethereum.transactions.PendingTransactionsManager import io.iohk.ethereum.transactions.PendingTransactionsManager.{AddOrOverrideTransaction, PendingTransactionsResponse} -import io.iohk.ethereum.utils.{BlockchainConfig, TxPoolConfig} +import io.iohk.ethereum.utils.{BlockchainConfig, Logger, TxPoolConfig} import io.iohk.ethereum.rlp import io.iohk.ethereum.rlp.RLPImplicits._ import io.iohk.ethereum.rlp.RLPImplicitConversions._ +import io.iohk.ethereum.utils.ByteStringUtils.ByteStringOps import monix.eval.Task import scala.util.Try @@ -72,7 +72,7 @@ class PersonalService( appStateStorage: AppStateStorage, blockchainConfig: BlockchainConfig, txPoolConfig: TxPoolConfig -) { +) extends Logger { private val unlockedWallets: ExpiringMap[Address, Wallet] = ExpiringMap.empty(Duration.ofSeconds(defaultUnlockTime)) @@ -212,6 +212,7 @@ class PersonalService( } else { wallet.signTx(tx, None) } + log.debug("Trying to add personal transaction: {}", stx.tx.hash.toHex) txPool ! AddOrOverrideTransaction(stx.tx) diff --git a/src/main/scala/io/iohk/ethereum/ledger/BlockPreparator.scala b/src/main/scala/io/iohk/ethereum/ledger/BlockPreparator.scala index d5b57559f8..b01148a419 100644 --- a/src/main/scala/io/iohk/ethereum/ledger/BlockPreparator.scala +++ b/src/main/scala/io/iohk/ethereum/ledger/BlockPreparator.scala @@ -7,6 +7,7 @@ import io.iohk.ethereum.domain._ import io.iohk.ethereum.ledger.BlockExecutionError.{StateBeforeFailure, TxsExecutionError} import io.iohk.ethereum.ledger.Ledger._ import io.iohk.ethereum.ledger.BlockPreparator._ +import io.iohk.ethereum.utils.ByteStringUtils.ByteStringOps import io.iohk.ethereum.utils.{BlockchainConfig, Config, Logger} import io.iohk.ethereum.vm.{PC => _, _} @@ -237,7 +238,7 @@ class BlockPreparator( blockHeader: BlockHeader, world: InMemoryWorldStateProxy ): TxResult = { - log.debug(s"Transaction ${stx.hashAsHexString} execution start") + log.debug(s"Transaction ${stx.hash.toHex} execution start") val gasPrice = UInt256(stx.tx.gasPrice) val gasLimit = stx.tx.gasLimit @@ -266,7 +267,7 @@ class BlockPreparator( val world2 = (deleteAccountsFn andThen deleteTouchedAccountsFn andThen persistStateFn)(worldAfterPayments) - log.debug(s"""Transaction ${stx.hashAsHexString} execution end. Summary: + log.debug(s"""Transaction ${stx.hash.toHex} execution end. Summary: | - Error: ${result.error}. | - Total Gas to Refund: $totalGasToRefund | - Execution gas paid to miner: $executionGasToPayToMiner""".stripMargin) @@ -339,7 +340,7 @@ class BlockPreparator( logs = logs ) - log.debug(s"Receipt generated for tx ${stx.hashAsHexString}, $receipt") + log.debug(s"Receipt generated for tx ${stx.hash.toHex}, $receipt") executeTransactions(otherStxs, newWorld, blockHeader, receipt.cumulativeGasUsed, acumReceipts :+ receipt) case Left(error) => @@ -361,7 +362,7 @@ class BlockPreparator( result match { case Left(TxsExecutionError(stx, StateBeforeFailure(worldState, gas, receipts), reason)) => - log.debug(s"failure while preparing block because of $reason in transaction with hash ${stx.hashAsHexString}") + log.debug(s"failure while preparing block because of $reason in transaction with hash ${stx.hash.toHex}") val txIndex = signedTransactions.indexWhere(tx => tx.hash == stx.hash) executePreparedTransactions( signedTransactions.drop(txIndex + 1), diff --git a/src/main/scala/io/iohk/ethereum/network/p2p/messages/CommonMessages.scala b/src/main/scala/io/iohk/ethereum/network/p2p/messages/CommonMessages.scala index 1622b57b06..6b312f21f5 100644 --- a/src/main/scala/io/iohk/ethereum/network/p2p/messages/CommonMessages.scala +++ b/src/main/scala/io/iohk/ethereum/network/p2p/messages/CommonMessages.scala @@ -7,6 +7,7 @@ import io.iohk.ethereum.network.p2p.{Message, MessageSerializableImplicit} import io.iohk.ethereum.rlp.RLPImplicitConversions._ import io.iohk.ethereum.rlp.RLPImplicits._ import io.iohk.ethereum.rlp._ +import io.iohk.ethereum.utils.ByteStringUtils.ByteStringOps import io.iohk.ethereum.utils.Config import org.bouncycastle.util.encoders.Hex @@ -204,6 +205,6 @@ object CommonMessages { case class SignedTransactions(txs: Seq[SignedTransaction]) extends Message { override def code: Int = Codes.SignedTransactionsCode override def toShortString: String = - s"SignedTransactions { txs: ${txs.map(_.hashAsHexString)} }" + s"SignedTransactions { txs: ${txs.map(_.hash.toHex)} }" } } diff --git a/src/main/scala/io/iohk/ethereum/transactions/PendingTransactionsManager.scala b/src/main/scala/io/iohk/ethereum/transactions/PendingTransactionsManager.scala index 0f9587eccf..fb920b20d2 100644 --- a/src/main/scala/io/iohk/ethereum/transactions/PendingTransactionsManager.scala +++ b/src/main/scala/io/iohk/ethereum/transactions/PendingTransactionsManager.scala @@ -1,6 +1,6 @@ package io.iohk.ethereum.transactions -import akka.actor.{Actor, ActorRef, Props} +import akka.actor.{Actor, ActorLogging, ActorRef, Props} import akka.util.{ByteString, Timeout} import com.google.common.cache.{Cache, CacheBuilder, RemovalNotification} import io.iohk.ethereum.domain.{SignedTransaction, SignedTransactionWithSender} @@ -10,7 +10,9 @@ import io.iohk.ethereum.network.PeerManagerActor.Peers import io.iohk.ethereum.network.p2p.messages.CommonMessages.SignedTransactions import io.iohk.ethereum.network.{EtcPeerManagerActor, Peer, PeerId, PeerManagerActor} import io.iohk.ethereum.transactions.SignedTransactionsFilterActor.ProperSignedTransactions +import io.iohk.ethereum.utils.ByteStringUtils.ByteStringOps import io.iohk.ethereum.utils.TxPoolConfig + import scala.jdk.CollectionConverters._ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ @@ -52,7 +54,8 @@ class PendingTransactionsManager( etcPeerManager: ActorRef, peerEventBus: ActorRef ) extends Actor - with MetricsContainer { + with MetricsContainer + with ActorLogging { import PendingTransactionsManager._ import akka.pattern.ask @@ -77,16 +80,17 @@ class PendingTransactionsManager( .maximumSize(txPoolConfig.txPoolSize) .removalListener((notification: RemovalNotification[ByteString, PendingTransaction]) => if (notification.wasEvicted()) { + log.debug("Evicting transaction: {} due to {}", notification.getKey.toHex, notification.getCause) knownTransactions = knownTransactions.filterNot(_._1 == notification.getKey) } ) .build() - implicit val timeout = Timeout(3.seconds) + implicit val timeout: Timeout = Timeout(3.seconds) peerEventBus ! Subscribe(SubscriptionClassifier.PeerHandshaked) - val transactionFilter = context.actorOf(SignedTransactionsFilterActor.props(context.self, peerEventBus)) + val transactionFilter: ActorRef = context.actorOf(SignedTransactionsFilterActor.props(context.self, peerEventBus)) // scalastyle:off method.length override def receive: Receive = { @@ -101,6 +105,7 @@ class PendingTransactionsManager( case AddTransactions(signedTransactions) => pendingTransactions.cleanUp() + log.debug("Adding transactions: {}", signedTransactions.map(_.tx.hash.toHex)) val stxs = pendingTransactions.asMap().values().asScala.map(_.stx).toSet val transactionsToAdd = signedTransactions.diff(stxs) if (transactionsToAdd.nonEmpty) { @@ -115,6 +120,7 @@ class PendingTransactionsManager( case AddOrOverrideTransaction(newStx) => pendingTransactions.cleanUp() + log.debug("Overriding transaction: {}", newStx.hash.toHex) // Only validated transactions are added this way, it is safe to call get val newStxSender = SignedTransaction.getSender(newStx).get val obsoleteTxs = pendingTransactions @@ -135,6 +141,11 @@ class PendingTransactionsManager( case NotifyPeers(signedTransactions, peers) => pendingTransactions.cleanUp() + log.debug( + "Notifying peers {} about transactions {}", + peers.map(_.nodeId.map(_.toHex)), + signedTransactions.map(_.tx.hash.toHex) + ) val pendingTxMap = pendingTransactions.asMap() val stillPending = signedTransactions .filter(stx => pendingTxMap.containsKey(stx.tx.hash)) // signed transactions that are still pending @@ -153,6 +164,7 @@ class PendingTransactionsManager( case RemoveTransactions(signedTransactions) => pendingTransactions.invalidateAll(signedTransactions.map(_.hash).asJava) + log.debug("Removing transactions: {}", signedTransactions.map(_.hash.toHex)) knownTransactions = knownTransactions -- signedTransactions.map(_.hash) case ProperSignedTransactions(transactions, peerId) => @@ -160,6 +172,7 @@ class PendingTransactionsManager( transactions.foreach(stx => setTxKnown(stx.tx, peerId)) case ClearPendingTransactions => + log.debug("Dropping all cached transactions") pendingTransactions.invalidateAll() } diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSyncSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSyncSpec.scala index bd42dcf9e0..383126f518 100644 --- a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSyncSpec.scala +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSyncSpec.scala @@ -295,7 +295,10 @@ class RegularSyncSpec "fetching state node" should { abstract class MissingStateNodeFixture(system: ActorSystem) extends Fixture(system) { val failingBlock: Block = testBlocksChunked.head.head - ledger.setImportResult(failingBlock, Task.now(BlockImportFailedDueToMissingNode(new MissingNodeException(failingBlock.hash)))) + ledger.setImportResult( + failingBlock, + Task.now(BlockImportFailedDueToMissingNode(new MissingNodeException(failingBlock.hash))) + ) } "blacklist peer which returns empty response" in sync(new MissingStateNodeFixture(testSystem) {