Skip to content

Commit 3d6808f

Browse files
author
Michal Mrozek
committed
EC-544 Improve pending transaction manager performance
1 parent 1298f1a commit 3d6808f

File tree

4 files changed

+65
-61
lines changed

4 files changed

+65
-61
lines changed

src/main/scala/io/iohk/ethereum/blockchain/sync/RegularSync.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ class RegularSync(
361361
private def processBlockHeaders(peer: Peer, headers: Seq[BlockHeader]) = ledger.resolveBranch(headers) match {
362362
case NewBetterBranch(oldBranch) =>
363363
val transactionsToAdd = oldBranch.flatMap(_.body.transactionList)
364-
pendingTransactionsManager ! PendingTransactionsManager.AddTransactions(transactionsToAdd.toList)
364+
pendingTransactionsManager ! PendingTransactionsManager.AddTransactions(transactionsToAdd.toSet)
365365
val hashes = headers.take(blockBodiesPerRequest).map(_.hash)
366366
requestBlockBodies(peer, GetBlockBodies(hashes))
367367

@@ -505,7 +505,7 @@ class RegularSync(
505505

506506
private def updateTxAndOmmerPools(blocksAdded: Seq[Block], blocksRemoved: Seq[Block]): Unit = {
507507
blocksRemoved.headOption.foreach(block => ommersPool ! AddOmmers(block.header))
508-
blocksRemoved.foreach(block => pendingTransactionsManager ! AddTransactions(block.body.transactionList.toList))
508+
blocksRemoved.foreach(block => pendingTransactionsManager ! AddTransactions(block.body.transactionList.toSet))
509509

510510
blocksAdded.foreach { block =>
511511
ommersPool ! RemoveOmmers(block.header :: block.body.uncleNodesList.toList)

src/main/scala/io/iohk/ethereum/transactions/PendingTransactionsManager.scala

Lines changed: 47 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -2,26 +2,27 @@ package io.iohk.ethereum.transactions
22

33
import akka.actor.{Actor, ActorRef, Cancellable, Props}
44
import akka.util.{ByteString, Timeout}
5+
import com.google.common.cache.{Cache, CacheBuilder, RemovalCause, RemovalNotification}
56
import io.iohk.ethereum.domain.SignedTransaction
67
import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.MessageFromPeer
78
import io.iohk.ethereum.network.PeerEventBusActor.SubscriptionClassifier.MessageClassifier
89
import io.iohk.ethereum.network.PeerEventBusActor.{PeerEvent, PeerSelector, Subscribe, SubscriptionClassifier}
910
import io.iohk.ethereum.network.PeerManagerActor.Peers
10-
import io.iohk.ethereum.network.{EtcPeerManagerActor, Peer, PeerId, PeerManagerActor}
1111
import io.iohk.ethereum.network.p2p.messages.CommonMessages.SignedTransactions
12+
import io.iohk.ethereum.network.{EtcPeerManagerActor, Peer, PeerId, PeerManagerActor}
1213
import io.iohk.ethereum.utils.TxPoolConfig
13-
14-
import scala.concurrent.duration._
14+
import scala.collection.JavaConverters._
1515
import scala.concurrent.ExecutionContext.Implicits.global
16+
import scala.concurrent.duration._
1617

1718
object PendingTransactionsManager {
1819
def props(txPoolConfig: TxPoolConfig, peerManager: ActorRef, etcPeerManager: ActorRef, peerMessageBus: ActorRef): Props =
1920
Props(new PendingTransactionsManager(txPoolConfig, peerManager, etcPeerManager, peerMessageBus))
2021

21-
case class AddTransactions(signedTransactions: List[SignedTransaction])
22+
case class AddTransactions(signedTransactions: Set[SignedTransaction])
2223

2324
object AddTransactions{
24-
def apply(txs: SignedTransaction*): AddTransactions = AddTransactions(txs.toList)
25+
def apply(txs: SignedTransaction*): AddTransactions = AddTransactions(txs.toSet)
2526
}
2627

2728
case class AddOrOverrideTransaction(signedTransaction: SignedTransaction)
@@ -39,21 +40,28 @@ object PendingTransactionsManager {
3940
}
4041

4142
class PendingTransactionsManager(txPoolConfig: TxPoolConfig, peerManager: ActorRef,
42-
etcPeerManager: ActorRef, peerEventBus: ActorRef) extends Actor {
43+
etcPeerManager: ActorRef, peerEventBus: ActorRef) extends Actor {
4344

4445
import PendingTransactionsManager._
4546
import akka.pattern.ask
4647

47-
/**
48-
* stores all pending transactions
49-
*/
50-
var pendingTransactions: List[PendingTransaction] = Nil
51-
5248
/**
5349
* stores information which tx hashes are "known" by which peers
5450
*/
5551
var knownTransactions: Map[ByteString, Set[PeerId]] = Map.empty
5652

53+
/**
54+
* stores all pending transactions
55+
*/
56+
val pendingTransactions: Cache[ByteString, PendingTransaction] = CacheBuilder.newBuilder()
57+
.expireAfterWrite(txPoolConfig.transactionTimeout._1, txPoolConfig.transactionTimeout._2)
58+
.maximumSize(txPoolConfig.txPoolSize)
59+
.removalListener(
60+
(notification: RemovalNotification[ByteString, PendingTransaction]) => if (notification.getCause == RemovalCause.EXPIRED) {
61+
knownTransactions = knownTransactions.filterNot(_._1 == notification.getKey)
62+
}
63+
).build()
64+
5765
/**
5866
* stores transactions timeouts by tx hash
5967
*/
@@ -67,68 +75,61 @@ class PendingTransactionsManager(txPoolConfig: TxPoolConfig, peerManager: ActorR
6775
// scalastyle:off method.length
6876
override def receive: Receive = {
6977
case PeerEvent.PeerHandshakeSuccessful(peer, _) =>
70-
self ! NotifyPeer(pendingTransactions.map(_.stx), peer)
78+
pendingTransactions.cleanUp()
79+
val stxs = pendingTransactions.asMap().values().asScala.toSeq.map(_.stx)
80+
self ! NotifyPeer(stxs, peer)
7181

7282
case AddTransactions(signedTransactions) =>
73-
val transactionsToAdd = signedTransactions.filterNot(t => pendingTransactions.map(_.stx).contains(t))
83+
pendingTransactions.cleanUp()
84+
val stxs = pendingTransactions.asMap().values().asScala.map(_.stx).toSet
85+
val transactionsToAdd = signedTransactions.diff(stxs)
7486
if (transactionsToAdd.nonEmpty) {
75-
transactionsToAdd.foreach(setTimeout)
7687
val timestamp = System.currentTimeMillis()
77-
pendingTransactions = (transactionsToAdd.map(PendingTransaction(_, timestamp)) ++ pendingTransactions).take(txPoolConfig.txPoolSize)
88+
transactionsToAdd.foreach(t => pendingTransactions.put(t.hash, PendingTransaction(t, timestamp)))
7889
(peerManager ? PeerManagerActor.GetPeers).mapTo[Peers].foreach { peers =>
79-
peers.handshaked.foreach { peer => self ! NotifyPeer(transactionsToAdd, peer) }
90+
peers.handshaked.foreach { peer => self ! NotifyPeer(transactionsToAdd.toSeq, peer) }
8091
}
8192
}
8293

8394
case AddOrOverrideTransaction(newStx) =>
84-
val (obsoleteTxs, txsWithoutObsoletes) = pendingTransactions.partition(ptx =>
85-
ptx.stx.senderAddress == newStx.senderAddress &&
86-
ptx.stx.tx.nonce == newStx.tx.nonce)
87-
obsoleteTxs.map(_.stx).foreach(clearTimeout)
95+
pendingTransactions.cleanUp()
96+
val obsoleteTxs = pendingTransactions.asMap().asScala.filter(
97+
ptx => ptx._2.stx.senderAddress == newStx.senderAddress && ptx._2.stx.tx.nonce == newStx.tx.nonce
98+
)
99+
pendingTransactions.invalidateAll(obsoleteTxs.keys.asJava)
88100

89101
val timestamp = System.currentTimeMillis()
90-
pendingTransactions = (PendingTransaction(newStx, timestamp) +: txsWithoutObsoletes).take(txPoolConfig.txPoolSize)
91-
setTimeout(newStx)
102+
pendingTransactions.put(newStx.hash, PendingTransaction(newStx, timestamp))
92103

93-
(peerManager ? PeerManagerActor.GetPeers).mapTo[Peers].foreach { peers =>
94-
peers.handshaked.foreach { peer => self ! NotifyPeer(List(newStx), peer) }
104+
(peerManager ? PeerManagerActor.GetPeers).mapTo[Peers].foreach {
105+
peers => peers.handshaked.foreach { peer => self ! NotifyPeer(List(newStx), peer) }
95106
}
96107

97108
case NotifyPeer(signedTransactions, peer) =>
109+
pendingTransactions.cleanUp()
98110
val txsToNotify = signedTransactions
99-
.filter(stx => pendingTransactions.exists(_.stx.hash == stx.hash)) // signed transactions that are still pending
111+
.filter(stx => pendingTransactions.asMap().containsKey(stx.hash)) // signed transactions that are still pending
100112
.filterNot(isTxKnown(_, peer.id)) // and not known by peer
101113

102-
if (txsToNotify.nonEmpty) {
103-
etcPeerManager ! EtcPeerManagerActor.SendMessage(SignedTransactions(txsToNotify), peer.id)
104-
txsToNotify.foreach(setTxKnown(_, peer.id))
105-
}
114+
if (txsToNotify.nonEmpty) {
115+
etcPeerManager ! EtcPeerManagerActor.SendMessage(SignedTransactions(txsToNotify), peer.id)
116+
txsToNotify.foreach(setTxKnown(_, peer.id))
117+
}
106118

107119
case GetPendingTransactions =>
108-
sender() ! PendingTransactionsResponse(pendingTransactions)
120+
pendingTransactions.cleanUp()
121+
sender() ! PendingTransactionsResponse(pendingTransactions.asMap().asScala.values.toSeq)
109122

110123
case RemoveTransactions(signedTransactions) =>
111-
pendingTransactions = pendingTransactions.filterNot(pt => signedTransactions.contains(pt.stx))
112-
knownTransactions = knownTransactions.filterNot(signedTransactions.map(_.hash).contains)
113-
signedTransactions.foreach(clearTimeout)
124+
pendingTransactions.invalidateAll(signedTransactions.map(_.hash).asJava)
125+
knownTransactions = knownTransactions -- signedTransactions.map(_.hash)
114126

115127
case MessageFromPeer(SignedTransactions(signedTransactions), peerId) =>
116-
self ! AddTransactions(signedTransactions.toList)
128+
self ! AddTransactions(signedTransactions.toSet)
117129
signedTransactions.foreach(setTxKnown(_, peerId))
118130

119131
case ClearPendingTransactions =>
120-
pendingTransactions = Nil
121-
}
122-
123-
private def setTimeout(stx: SignedTransaction): Unit = {
124-
timeouts.get(stx.hash).map(_.cancel())
125-
val cancellable = context.system.scheduler.scheduleOnce(txPoolConfig.transactionTimeout, self, RemoveTransactions(Seq(stx)))
126-
timeouts += (stx.hash -> cancellable)
127-
}
128-
129-
private def clearTimeout(stx: SignedTransaction): Unit = {
130-
timeouts.get(stx.hash).map(_.cancel())
131-
timeouts -= stx.hash
132+
pendingTransactions.invalidateAll()
132133
}
133134

134135
private def isTxKnown(signedTransaction: SignedTransaction, peerId: PeerId): Boolean =

src/test/scala/io/iohk/ethereum/blockchain/sync/RegularSyncSpec.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ class RegularSyncSpec extends TestKit(ActorSystem("RegularSync_system")) with Wo
7373
sendNewBlockMsg(newBlock)
7474

7575
ommersPool.expectMsg(AddOmmers(List(oldBlock.header)))
76-
txPool.expectMsg(AddTransactions(oldBlock.body.transactionList.toList))
76+
txPool.expectMsg(AddTransactions(oldBlock.body.transactionList.toSet))
7777

7878
ommersPool.expectMsg(RemoveOmmers(newBlock.header :: newBlock.body.uncleNodesList.toList))
7979
txPool.expectMsg(RemoveTransactions(newBlock.body.transactionList.toList))
@@ -213,7 +213,7 @@ class RegularSyncSpec extends TestKit(ActorSystem("RegularSync_system")) with Wo
213213
sendMinedBlockMsg(newBlock)
214214

215215
ommersPool.expectMsg(AddOmmers(List(oldBlock.header)))
216-
txPool.expectMsg(AddTransactions(oldBlock.body.transactionList.toList))
216+
txPool.expectMsg(AddTransactions(oldBlock.body.transactionList.toSet))
217217

218218
ommersPool.expectMsg(RemoveOmmers(newBlock.header :: newBlock.body.uncleNodesList.toList))
219219
txPool.expectMsg(RemoveTransactions(newBlock.body.transactionList.toList))
@@ -268,7 +268,7 @@ class RegularSyncSpec extends TestKit(ActorSystem("RegularSync_system")) with Wo
268268

269269
etcPeerManager.expectMsg(EtcPeerManagerActor.GetHandshakedPeers)
270270
etcPeerManager.expectMsg(EtcPeerManagerActor.SendMessage(GetBlockBodies(newBlocks.map(_.header.hash)), peer1.id))
271-
txPool.expectMsg(AddTransactions(oldBlocks.flatMap(_.body.transactionList).toList))
271+
txPool.expectMsg(AddTransactions(oldBlocks.flatMap(_.body.transactionList).toSet))
272272
ommersPool.expectMsg(AddOmmers(oldBlocks.head.header))
273273
}
274274

src/test/scala/io/iohk/ethereum/transactions/PendingTransactionsManagerSpec.scala

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package io.iohk.ethereum.transactions
22

33
import java.net.InetSocketAddress
4-
54
import akka.actor.ActorSystem
65
import akka.pattern.ask
76
import akka.testkit.TestProbe
@@ -21,7 +20,6 @@ import io.iohk.ethereum.utils.TxPoolConfig
2120
import org.scalatest.concurrent.ScalaFutures
2221
import org.scalatest.{FlatSpec, Matchers}
2322
import org.bouncycastle.crypto.AsymmetricCipherKeyPair
24-
2523
import scala.concurrent.duration._
2624

2725
class PendingTransactionsManagerSpec extends FlatSpec with Matchers with ScalaFutures with NormalPatience {
@@ -70,28 +68,33 @@ class PendingTransactionsManagerSpec extends FlatSpec with Matchers with ScalaFu
7068
pendingTransactionsManager ! MessageFromPeer(msg1, peer1.id)
7169
peerManager.expectMsg(PeerManagerActor.GetPeers)
7270
peerManager.reply(Peers(Map(peer1 -> Handshaked, peer2 -> Handshaked, peer3 -> Handshaked)))
73-
etcPeerManager.expectMsgAllOf(
74-
EtcPeerManagerActor.SendMessage(SignedTransactions(msg1.txs), peer2.id),
75-
EtcPeerManagerActor.SendMessage(SignedTransactions(msg1.txs), peer3.id)
71+
72+
val resps1 = etcPeerManager.expectMsgAllConformingOf(
73+
classOf[EtcPeerManagerActor.SendMessage], classOf[EtcPeerManagerActor.SendMessage]
7674
)
75+
76+
resps1.map(_.peerId) should contain allOf (peer2.id, peer3.id)
77+
resps1.map(_.message.underlyingMsg).foreach { case SignedTransactions(txs) => txs.toSet shouldEqual msg1.txs.toSet }
7778
etcPeerManager.expectNoMsg()
7879

7980
val msg2 = SignedTransactions(Seq.fill(5)(newStx()))
8081
pendingTransactionsManager ! MessageFromPeer(msg2, peer2.id)
8182
peerManager.expectMsg(PeerManagerActor.GetPeers)
8283
peerManager.reply(Peers(Map(peer1 -> Handshaked, peer2 -> Handshaked, peer3 -> Handshaked)))
83-
etcPeerManager.expectMsgAllOf(
84-
EtcPeerManagerActor.SendMessage(SignedTransactions(msg2.txs), peer1.id),
85-
EtcPeerManagerActor.SendMessage(SignedTransactions(msg2.txs), peer3.id)
84+
85+
val resps2 = etcPeerManager.expectMsgAllConformingOf(
86+
classOf[EtcPeerManagerActor.SendMessage], classOf[EtcPeerManagerActor.SendMessage]
8687
)
88+
resps2.map(_.peerId) should contain allOf (peer1.id, peer3.id)
89+
resps2.map(_.message.underlyingMsg).foreach { case SignedTransactions(txs) => txs.toSet shouldEqual msg2.txs.toSet }
8790
etcPeerManager.expectNoMsg()
8891

8992
pendingTransactionsManager ! RemoveTransactions(msg1.txs.dropRight(4))
9093
pendingTransactionsManager ! RemoveTransactions(msg2.txs.drop(2))
9194

9295
val pendingTxs = (pendingTransactionsManager ? GetPendingTransactions).mapTo[PendingTransactionsResponse].futureValue
9396
pendingTxs.pendingTransactions.size shouldBe 6
94-
pendingTxs.pendingTransactions.map(_.stx) shouldBe msg2.txs.take(2) ++ msg1.txs.takeRight(4)
97+
pendingTxs.pendingTransactions.map(_.stx).toSet shouldBe (msg2.txs.take(2) ++ msg1.txs.takeRight(4)).toSet
9598
}
9699

97100
it should "not add pending transaction again when it was removed while waiting for peers" in new TestSetup {
@@ -132,7 +135,7 @@ class PendingTransactionsManagerSpec extends FlatSpec with Matchers with ScalaFu
132135
val pendingTxs = (pendingTransactionsManager ? GetPendingTransactions).mapTo[PendingTransactionsResponse]
133136
.futureValue.pendingTransactions
134137

135-
pendingTxs.map(_.stx) shouldEqual List(overrideTx, otherTx)
138+
pendingTxs.map(_.stx).toSet shouldEqual Set(overrideTx, otherTx)
136139

137140
// overriden TX will still be broadcast to peers
138141
etcPeerManager.expectMsgAllOf(

0 commit comments

Comments
 (0)