forked from twitter-archive/iago
/
ParrotUdpTransportSpec.scala
191 lines (157 loc) · 6.86 KB
/
ParrotUdpTransportSpec.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
/*
Copyright 2012 Twitter, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.twitter.parrot.server
import com.twitter.conversions.time._
import com.twitter.finagle.RequestTimeoutException
import com.twitter.io.TempFile
import com.twitter.ostrich.stats.Stats
import com.twitter.parrot.config.{ParrotFeederConfig, ParrotServerConfig}
import com.twitter.parrot.feeder.{InMemoryLog, ParrotFeeder}
import com.twitter.parrot.processor.{RecordProcessor, RecordProcessorFactory}
import com.twitter.parrot.thrift.ParrotJob
import com.twitter.parrot.thrift.TargetHost
import com.twitter.util.Try
import com.twitter.util.{RandomSocket, Eval}
import java.net.InetSocketAddress
import java.util.concurrent.Executors
import org.jboss.netty.bootstrap.ConnectionlessBootstrap
import org.jboss.netty.channel._
import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory
import org.jboss.netty.handler.codec.string.{StringDecoder, StringEncoder}
import org.jboss.netty.util.CharsetUtil
import org.specs.SpecificationWithJUnit
class ParrotUdpTransportSpec extends SpecificationWithJUnit {
"Parrot UDP Transport" should {
"work inside a server config" in {
val serverConfig = makeServerConfig()
val server: ParrotServer[ParrotRequest, String] = new ParrotServerImpl(serverConfig)
server mustNotBe null
}
"send requests to an 'echo' service" in {
val victimPort = RandomSocket.nextPort()
val serverConfig = makeServerConfig()
val transport = serverConfig.transport.getOrElse(fail("no transport configured"))
val victim = new UdpEchoServer(victimPort)
victim.start()
val script = List("abc", "is as easy as", "123")
var targetHost = new TargetHost("", "127.0.0.1", victimPort)
script.foreach { request =>
val parrotRequest = new ParrotRequest(targetHost, rawLine = request)
val response: String = transport.sendRequest(parrotRequest).get()
response mustEqual "echo<" + request + ">"
}
transport.shutdown()
victim.stop()
}
"timeout if the service does not respond quickly enough" in {
val victimPort = RandomSocket.nextPort()
val serverConfig = makeServerConfig()
val transport = serverConfig.transport.getOrElse(fail("no transport configured"))
transport.asInstanceOf[ParrotUdpTransport[ParrotRequest, String]].requestTimeout = Some(100.milliseconds)
val victim = new UdpEchoServer(victimPort, true)
victim.start()
var targetHost = new TargetHost("", "localhost", victimPort)
val parrotRequest = new ParrotRequest(targetHost, rawLine = "data")
Stats.getCounter("udp_request_timeout").reset()
val result: Try[String] = transport.sendRequest(parrotRequest).get(1.minute)
result() must throwA[RequestTimeoutException]
Stats.getCounter("udp_request_timeout")() must eventually(be(1L))
transport.shutdown()
victim.stop()
}
"work in the context of feeder and server" in {
val victimPort = RandomSocket.nextPort()
val serverConfig = makeServerConfig()
RecordProcessorFactory.registerProcessor("default", new RecordProcessor {
val service = serverConfig.service.get
def processLines(job: ParrotJob, lines: Seq[String]) {
lines flatMap { line =>
val target = job.victims.get(0)
Some(service(new ParrotRequest(target, None, Nil, null, line)))
}
}
})
val transport = serverConfig.transport.getOrElse(fail("no transport configured"))
val victim = new UdpEchoServer(victimPort)
victim.start()
val requestStrings = List("a", "square peg", "cannot fit into a round hole")
val server: ParrotServer[ParrotRequest, String] = new ParrotServerImpl(serverConfig)
server.start()
val feederConfig = makeFeederConfig(serverConfig, victimPort)
feederConfig.logSource = Some(new InMemoryLog(requestStrings))
val feeder = new ParrotFeeder(feederConfig)
feeder.start()
try {
{ transport.asInstanceOf[ParrotUdpTransport[ParrotRequest, String]].allRequests.get } must
eventually(be(requestStrings.size))
} finally {
feeder.shutdown() // this will implicitly shut down the server as well
}
}
}
def makeServerConfig() = {
val result = new Eval().apply[ParrotServerConfig[ParrotRequest, String]](
TempFile.fromResourcePath("/test-udp.scala")
)
result.parrotPort = RandomSocket().getPort
result.thriftServer = Some(new ThriftServerImpl) // ParrotServer throws otherwise
result
}
def makeFeederConfig(serverConfig: ParrotServerConfig[ParrotRequest, String], victimPort: Int) = {
val result = new Eval().apply[ParrotFeederConfig](TempFile.fromResourcePath("/test-feeder.scala"))
result.parrotPort = serverConfig.parrotPort
result.victimHosts = List("localhost")
result.victimPort = victimPort
result.requestRate = 1000
result
}
}
class UdpEchoHandler(val blackHole: Boolean) extends SimpleChannelUpstreamHandler {
override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
val input = e.getMessage.asInstanceOf[String]
if (!blackHole) {
// NIO: this will fail with NioDatagramChannelFactory (see NIO below)
val writeFuture = e.getChannel.write("echo<" + input + ">", e.getRemoteAddress)
writeFuture.addListener(new ChannelFutureListener {
def operationComplete(f: ChannelFuture) {
if (!f.isSuccess) {
println("response write failed: " + f.getCause.toString)
f.getCause.printStackTrace
}
}
})
}
}
}
class UdpEchoServer(val port: Int, val blackHole: Boolean = false) {
// NIO: Netty 3.4.0.Alpha1 NioDatagramChannelFactory seems broken (see NIO above)
val factory = new OioDatagramChannelFactory(Executors.newCachedThreadPool());
@volatile var channel: Option[Channel] = None
def start() {
val bootstrap = new ConnectionlessBootstrap(factory);
// Configure the pipeline factory.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
override def getPipeline(): ChannelPipeline = {
Channels.pipeline(
new StringEncoder(CharsetUtil.UTF_8),
new StringDecoder(CharsetUtil.UTF_8),
new UdpEchoHandler(blackHole))
}
});
channel = Some(bootstrap.bind(new InetSocketAddress(port)))
}
def stop() {
channel.foreach { _.close() }
factory.releaseExternalResources()
}
}