Skip to content

Commit

Permalink
[Kaizen] Add debug logs for PendingTransactionsManager
Browse files Browse the repository at this point in the history
  • Loading branch information
dzajkowski committed May 4, 2021
1 parent c38c2c2 commit acdc618
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 4 deletions.
Expand Up @@ -30,6 +30,10 @@ object ByteStringUtils {
}
}

implicit class ByteStringOps(val bytes: ByteString) extends AnyVal {
def toHex: String = Hex.toHexString(bytes.toArray[Byte])
}

sealed trait ByteStringElement {
def len: Int
def asByteArray: Array[Byte]
Expand Down
@@ -1,6 +1,6 @@
package io.iohk.ethereum.transactions

import akka.actor.{Actor, ActorRef, Props}
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.util.{ByteString, Timeout}
import com.google.common.cache.{Cache, CacheBuilder, RemovalNotification}
import io.iohk.ethereum.domain.{SignedTransaction, SignedTransactionWithSender}
Expand All @@ -10,7 +10,9 @@ import io.iohk.ethereum.network.PeerManagerActor.Peers
import io.iohk.ethereum.network.p2p.messages.CommonMessages.SignedTransactions
import io.iohk.ethereum.network.{EtcPeerManagerActor, Peer, PeerId, PeerManagerActor}
import io.iohk.ethereum.transactions.SignedTransactionsFilterActor.ProperSignedTransactions
import io.iohk.ethereum.utils.ByteStringUtils.ByteStringOps
import io.iohk.ethereum.utils.TxPoolConfig

import scala.jdk.CollectionConverters._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
Expand Down Expand Up @@ -52,7 +54,8 @@ class PendingTransactionsManager(
etcPeerManager: ActorRef,
peerEventBus: ActorRef
) extends Actor
with MetricsContainer {
with MetricsContainer
with ActorLogging {

import PendingTransactionsManager._
import akka.pattern.ask
Expand All @@ -77,16 +80,17 @@ class PendingTransactionsManager(
.maximumSize(txPoolConfig.txPoolSize)
.removalListener((notification: RemovalNotification[ByteString, PendingTransaction]) =>
if (notification.wasEvicted()) {
log.debug("Evicting transaction: {} due to {}", notification.getKey.toHex, notification.getCause)
knownTransactions = knownTransactions.filterNot(_._1 == notification.getKey)
}
)
.build()

implicit val timeout = Timeout(3.seconds)
implicit val timeout: Timeout = Timeout(3.seconds)

peerEventBus ! Subscribe(SubscriptionClassifier.PeerHandshaked)

val transactionFilter = context.actorOf(SignedTransactionsFilterActor.props(context.self, peerEventBus))
val transactionFilter: ActorRef = context.actorOf(SignedTransactionsFilterActor.props(context.self, peerEventBus))

// scalastyle:off method.length
override def receive: Receive = {
Expand All @@ -101,6 +105,7 @@ class PendingTransactionsManager(

case AddTransactions(signedTransactions) =>
pendingTransactions.cleanUp()
log.debug("Added transactions: {}", signedTransactions.map(_.tx.hash.toHex))
val stxs = pendingTransactions.asMap().values().asScala.map(_.stx).toSet
val transactionsToAdd = signedTransactions.diff(stxs)
if (transactionsToAdd.nonEmpty) {
Expand All @@ -115,6 +120,7 @@ class PendingTransactionsManager(

case AddOrOverrideTransaction(newStx) =>
pendingTransactions.cleanUp()
log.debug("Override transactions: {}", newStx.hash.toHex)
// Only validated transactions are added this way, it is safe to call get
val newStxSender = SignedTransaction.getSender(newStx).get
val obsoleteTxs = pendingTransactions
Expand Down Expand Up @@ -153,13 +159,15 @@ class PendingTransactionsManager(

case RemoveTransactions(signedTransactions) =>
pendingTransactions.invalidateAll(signedTransactions.map(_.hash).asJava)
log.debug("Removing transactions: {}", signedTransactions.map(_.hash.toHex))
knownTransactions = knownTransactions -- signedTransactions.map(_.hash)

case ProperSignedTransactions(transactions, peerId) =>
self ! AddTransactions(transactions)
transactions.foreach(stx => setTxKnown(stx.tx, peerId))

case ClearPendingTransactions =>
log.debug("Dropping all cached transactions")
pendingTransactions.invalidateAll()
}

Expand Down

0 comments on commit acdc618

Please sign in to comment.