/
PerformanceSpec.scala
229 lines (196 loc) · 7.31 KB
/
PerformanceSpec.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.contrib.datareplication
import scala.concurrent.duration._
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import com.typesafe.config.ConfigFactory
import akka.cluster.Cluster
import akka.remote.transport.ThrottlerTransportAdapter.Direction
import akka.testkit._
import scala.concurrent.forkjoin.ThreadLocalRandom
import akka.actor.Actor
import akka.actor.Props
import scala.concurrent.Await
import akka.actor.ActorRef
/**
 * Shared multi-node configuration for the performance spec.
 *
 * Declares the five roles taking part in the test, installs the common
 * configuration for all of them, and provides a small actor that counts
 * replies so a test can wait until a whole batch has been answered.
 */
object PerformanceSpec extends MultiNodeConfig {
  // Roles of the participating nodes.
  val n1 = role("n1")
  val n2 = role("n2")
  val n3 = role("n3")
  val n4 = role("n4")
  val n5 = role("n5")

  // Settings applied to every node. The literal is intentionally left
  // unindented so that its content stays exactly as it was.
  commonConfig(ConfigFactory.parseString("""
akka.loglevel = ERROR
akka.stdout-loglevel = ERROR
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.log-dead-letters = off
akka.log-dead-letters-during-shutdown = off
akka.remote.log-remote-lifecycle-events = ERROR
akka.remote.log-frame-size-exceeding=1000b
akka.testconductor.barrier-timeout = 60 s
akka.contrib.data-replication.gossip-interval = 1 s
"""))

  /**
   * Counts down `latch` once for every message it receives, whatever the
   * message is, and stops itself as soon as the latch has opened.
   */
  class CountDown(latch: TestLatch) extends Actor {
    def receive = {
      case _ =>
        latch.countDown()
        // Nothing left to count once the latch is open.
        if (latch.isOpen) {
          context stop self
        }
    }
  }

  /** Props for a [[CountDown]] actor bound to the given latch. */
  def countDownProps(latch: TestLatch): Props =
    Props(new CountDown(latch))
}
// One concrete spec class per participating node. All five run the same
// PerformanceSpec body; the "<Spec>MultiJvm<Node>" names are presumably what
// the multi-JVM test runner uses to map classes onto JVMs (TODO confirm
// against the build setup).
class PerformanceSpecMultiJvmNode1 extends PerformanceSpec
class PerformanceSpecMultiJvmNode2 extends PerformanceSpec
class PerformanceSpecMultiJvmNode3 extends PerformanceSpec
class PerformanceSpecMultiJvmNode4 extends PerformanceSpec
class PerformanceSpecMultiJvmNode5 extends PerformanceSpec
/**
 * Multi-node throughput measurements for the data replicator.
 *
 * Every node runs the same test body. Node `n1` (and for the counter test
 * also `n2` and `n3`) issues the requests, while all nodes take part in the
 * barriers and in the checks that the data has been replicated. Timings are
 * printed to stdout as `## <count> <description> took <ms> ms, <tps> TPS`.
 */
class PerformanceSpec extends MultiNodeSpec(PerformanceSpec) with STMultiNodeSpec with ImplicitSender {
  import PerformanceSpec._
  import Replicator._

  override def initialParticipants = roles.size

  // Explicitly typed: implicit members should not rely on type inference.
  implicit val cluster: Cluster = Cluster(system)
  val replicator = DataReplication(system).replicator
  val timeout = 3.seconds.dilated
  // Scales both the number of operations and the time allowed for replies.
  val factor = 1
  val repeatCount = 3 // use at least 10 here for serious tuning

  /**
   * Lets node `from` join the cluster through node `to`, then synchronizes
   * all nodes on a barrier named after `from`.
   */
  def join(from: RoleName, to: RoleName): Unit = {
    runOn(from) {
      cluster join node(to).address
    }
    enterBarrier(from.name + "-joined")
  }

  /**
   * Prints how long `count` operations took, measured from `startTime`
   * (a `System.nanoTime()` reading) until now, and the resulting throughput.
   *
   * Note: if the elapsed time rounds down to 0 ms the floating point division
   * yields infinity, which `toInt` turns into `Int.MaxValue`.
   */
  private def reportThroughput(count: Int, description: String, startTime: Long): Unit = {
    val durationMs = (System.nanoTime() - startTime).nanos.toMillis
    val tps = (count * 1000.0 / durationMs).toInt
    println(s"## $count $description took $durationMs ms, $tps TPS")
  }

  /**
   * Runs `block` `n` times for each key and reports the throughput per key.
   *
   * For every key, node `n1` invokes `block(key, i, replyTo)` for `i` in
   * `0 until n`, where `replyTo` is a reply-counting actor, and waits until
   * `n` replies have arrived. All nodes then optionally wait for the expected
   * data to be replicated, run `afterEachKey` and meet on a barrier.
   *
   * @param description              label used in the printed result line
   * @param keys                     keys to run the measurement for, one round per key
   * @param n                        number of invocations of `block` per key
   * @param expectedAfterReplication when defined, every node waits (between two
   *                                 barriers) until the key holds exactly this
   *                                 data; that wait is part of the measured time
   * @param block                    the operation to measure; must cause one
   *                                 reply to be sent to the given actor
   * @param afterEachKey             hook run on every node after the timing has
   *                                 been reported, not included in the timing
   */
  def repeat(description: String, keys: Iterable[String], n: Int, expectedAfterReplication: Option[Set[Int]] = None)(
    block: (String, Int, ActorRef) => Unit, afterEachKey: String => Unit = _ => ()): Unit = {
    keys foreach { key =>
      val startTime = System.nanoTime()
      runOn(n1) {
        val latch = TestLatch(n)
        val replyTo = system.actorOf(countDownProps(latch))
        for (i <- 0 until n)
          block(key, i, replyTo)
        Await.ready(latch, 5.seconds * factor)
      }
      expectedAfterReplication foreach { expected =>
        enterBarrier("repeat-" + key + "-before-awaitReplicated")
        awaitReplicated(key, expected)
        enterBarrier("repeat-" + key + "-after-awaitReplicated")
      }
      runOn(n1) {
        reportThroughput(n, description, startTime)
      }
      afterEachKey(key)
      enterBarrier("repeat-" + key + "-done")
    }
  }

  /** Waits, key by key, until each of `keys` holds exactly `expectedData`. */
  def awaitReplicated(keys: Iterable[String], expectedData: Set[Int]): Unit =
    keys foreach { key => awaitReplicated(key, expectedData) }

  /**
   * Repeatedly asks the local replicator for `key` until it answers with an
   * `ORSet` whose value equals `expectedData`; fails if that does not happen
   * within 20 seconds.
   */
  def awaitReplicated(key: String, expectedData: Set[Int]): Unit = {
    within(20.seconds) {
      awaitAssert {
        // A fresh probe per attempt, so replies of earlier attempts cannot interfere.
        val readProbe = TestProbe()
        replicator.tell(Get(key), readProbe.ref)
        // Backquotes make this a comparison against the requested key; a plain
        // `key` here would bind a new variable and accept a reply for any key.
        val result = readProbe.expectMsgPF() { case GetSuccess(`key`, set: ORSet, _) => set }
        result.value should be(expectedData)
      }
    }
  }

  "Performance" must {

    "setup cluster" in {
      roles foreach { join(_, n1) }

      // Wait until the replicator reports all nodes.
      within(10.seconds) {
        awaitAssert {
          replicator ! Internal.GetNodeCount
          expectMsg(Internal.NodeCount(roles.size))
        }
      }

      enterBarrier("after-setup")
    }

    "be great for ORSet Update WriteOne" in {
      val keys = (1 to repeatCount).map("A" + _)
      val n = 500 * factor
      val expectedData = (0 until n).toSet
      // Replication is awaited in the afterEachKey hook, i.e. outside the measured time.
      repeat("ORSet Update WriteOne", keys, n)({ (key, i, replyTo) =>
        replicator.tell(Update(key, ORSet())(_ + i), replyTo)
      }, key => awaitReplicated(key, expectedData))
      enterBarrier("after-1")
    }

    "be blazingly fast for ORSet Get ReadOne" in {
      // Reads the "A" keys written by the previous test.
      val keys = (1 to repeatCount).map("A" + _)
      repeat("Get ReadOne", keys, 1000000 * factor) { (key, i, replyTo) =>
        replicator.tell(Get(key), replyTo)
      }
      enterBarrier("after-2")
    }

    "be good for ORSet Update WriteOne and gossip replication" in {
      val keys = (1 to repeatCount).map("B" + _)
      val n = 500 * factor
      val expected = Some((0 until n).toSet)
      // Passing `expected` makes the replication wait part of the measured time.
      repeat("ORSet Update WriteOne + gossip", keys, n, expected) { (key, i, replyTo) =>
        replicator.tell(Update(key, ORSet())(_ + i), replyTo)
      }
      enterBarrier("after-3")
    }

    "be good for ORSet Update WriteOne and gossip of existing keys" in {
      // Same "B" keys as the previous test; the negated values are added on
      // top of the elements that are already there.
      val keys = (1 to repeatCount).map("B" + _)
      val n = 500 * factor
      val expected = Some((0 until n).toSet ++ (0 until n).map(-_).toSet)
      repeat("ORSet Update WriteOne existing + gossip", keys, n, expected) { (key, i, replyTo) =>
        replicator.tell(Update(key, ORSet())(_ + (-i)), replyTo)
      }
      enterBarrier("after-4")
    }

    "be good for ORSet Update WriteTwo and gossip replication" in {
      val keys = (1 to repeatCount).map("C" + _)
      val n = 500 * factor
      val expected = Some((0 until n).toSet)
      repeat("ORSet Update WriteTwo + gossip", keys, n, expected) { (key, i, replyTo) =>
        replicator.tell(Update(key, ORSet(), WriteTwo, timeout)(_ + i), replyTo)
      }
      enterBarrier("after-5")
    }

    "be awesome for GCounter Update WriteOne" in {
      val startTime = System.nanoTime()
      val n = 100000 * factor
      val key = "D"

      // Three nodes increment the same counter n times each.
      runOn(n1, n2, n3) {
        val latch = TestLatch(n)
        val replyTo = system.actorOf(countDownProps(latch))
        for (_ <- 0 until n)
          replicator.tell(Update(key, GCounter())(_ + 1), replyTo)
        Await.ready(latch, 5.seconds * factor)
        enterBarrier("update-done-6")
        runOn(n1) {
          reportThroughput(3 * n, "GCounter Update", startTime)
        }
      }
      // The remaining nodes only take part in the barrier.
      runOn(n4, n5) {
        enterBarrier("update-done-6")
      }

      // Every node waits until it sees the sum of all increments.
      within(20.seconds) {
        awaitAssert {
          val readProbe = TestProbe()
          replicator.tell(Get(key), readProbe.ref)
          // Backquoted so that the reply is checked to be for the requested key.
          val result = readProbe.expectMsgPF() { case GetSuccess(`key`, c: GCounter, _) => c }
          result.value should be(3 * n)
        }
      }
      enterBarrier("replication-done-6")

      runOn(n1) {
        // Reported with n (the count of a single writer), not the 3 * n total.
        reportThroughput(n, "GCounter Update + gossip", startTime)
      }

      enterBarrier("after-6")
    }

  }

}