
Twitter no longer maintains killdeer

1 parent 0fcf9a8 · commit 33431ca4807e224b2cb928829c9af5cf5d750b27 · @dhelder committed
Showing with 1 addition and 1,143 deletions.
  1. +0 −91 Capfile
  2. +1 −29 README
  3. +0 −49 config/Dots.scala
  4. +0 −23 config/Record.scala
  5. +0 −21 config/Replay.scala
  6. +0 −91 deploy/strategy/build.rb
  7. +0 −9 project/build.properties
  8. +0 −30 project/build/Project.scala
  9. +0 −7 project/plugins/Plugins.scala
  10. +0 −24 src/main/scala/com/twitter/killdeer/Cdf.scala
  11. +0 −15 src/main/scala/com/twitter/killdeer/DebuggingHandler.scala
  12. +0 −79 src/main/scala/com/twitter/killdeer/Deerkill.scala
  13. +0 −27 src/main/scala/com/twitter/killdeer/DotsHandler.scala
  14. +0 −117 src/main/scala/com/twitter/killdeer/FaultInjector.scala
  15. +0 −84 src/main/scala/com/twitter/killdeer/HttpFaultInjector.scala
  16. +0 −20 src/main/scala/com/twitter/killdeer/KeepAliveHandler.scala
  17. +0 −34 src/main/scala/com/twitter/killdeer/Killdeer.scala
  18. +0 −17 src/main/scala/com/twitter/killdeer/LatencyHandler.scala
  19. +0 −12 src/main/scala/com/twitter/killdeer/LoggingHandler.scala
  20. +0 −11 src/main/scala/com/twitter/killdeer/Main.scala
  21. +0 −75 src/main/scala/com/twitter/killdeer/ProxyHandler.scala
  22. +0 −11 src/main/scala/com/twitter/killdeer/RecordedRequest.scala
  23. +0 −49 src/main/scala/com/twitter/killdeer/RecordedResponse.scala
  24. +0 −34 src/main/scala/com/twitter/killdeer/RecordedResponseHandler.scala
  25. +0 −36 src/main/scala/com/twitter/killdeer/ResponseRecorderHandler.scala
  26. +0 −119 src/main/scala/com/twitter/killdeer/ThrottlingHandler.scala
  27. +0 −13 src/scripts/dk
  28. +0 −13 src/scripts/kd
  29. +0 −3 src/test/resources/response-sample.txt
91 Capfile
@@ -1,91 +0,0 @@
-# regular deploys are as easy as "cap deploy"
-# this will do a git clone of the code, build a package, sftp the package to the machine.
-
-# Command line arguments:
-#
-# key=/path/to/ec2.pem (default: ~/.ec2.pem)
-# host=ec2-123-example-amazon.com (no default, must specify)
-
-require 'deploy/strategy/build'
-
-BEGIN {
- KEY = ENV['key'] || "#{ENV['HOME']}/.ec2.pem"
- system "ssh-add '#{KEY}'"
-}
-
-load 'deploy'
-
-set :ssh_options, {:auth_methods => %w{ publickey }, :keys => [KEY] }
-
-set :user, ENV['user'] || 'root'
-set :application, File.basename(`git config --get remote.origin.url`.chomp, '.*')
-
-# Remote paths
-set :deploy_to, "/tmp/#{application}"
-set :currentloc, "#{deploy_to}/current"
-set :logloc, "#{deploy_to}/log"
-set :log, "#{logloc}/#{application}.log"
-
-set :repository, "git@github.com:twitter/#{application}.git"
-set :branch, "master"
-set :scm, :git
-set :strategy, Capistrano::Deploy::Strategy::Build.new(self)
-set :copy_cache, true
-
-set :remote_unzip_dir, release_name
-set :keep_releases, 3
-set :build_task, "sbt clean update package-dist"
-set :deploy_via, :copy
-set :dist_path, "dist"
-
-version = `git rev-parse HEAD`[0..7]
-set :version, version
-
-set :copy_compression, :zip
-set :package_name, "#{application}-#{version}.#{copy_compression}"
-
-hosts = [ENV['host'] || abort("You must specify a host with host=yourhost.com")]
-
-role :app, *hosts
-
-namespace :deploy do
- [:finalize_update, :restart].each do |default_task|
- task default_task do
- # nothing
- end
- end
-
- before 'deploy', 'deploy:setup'
- after 'deploy', 'deploy:restart'
-
- desc "Directory setup of remote machines."
- task :setup, :roles => :app do
- run "mkdir -p #{deploy_to}"
- run "mkdir -p #{deploy_to}/releases"
- run "mkdir -p #{logloc}"
- run "mkdir -p /var/log/#{application}"
- end
-
- desc "Start"
- task :start, :roles => :app do
- run "cd #{currentloc}; nohup bash #{currentloc}/scripts/#{application}.sh -f config/production.conf &"
- end
-
- desc "Stop"
- task :stop, :roles => :app do
- run "pkill -f #{application} &>/dev/null || exit 0"
- end
-
- desc "Restart"
- task :restart do
- stop
- start
- end
-
- task :update_code, :roles => :app do
- on_rollback { run "rm -rf #{release_path}; true" }
- run "mkdir -p #{release_path}"
- strategy.deploy!
- finalize_update
- end
-end
30 README
@@ -1,29 +1 @@
-Killdeer
-========
-
-Killdeer is a simple server for replaying a sample of responses to
-synthetically recreate production response characteristics.
-
-Build
-=====
-$ sbt update
-
-Run
-===
-
-$ sbt
-> run config/Dots.scala
-
-The response-sample.txt represents a response per line. Each request made to
-killdeer replays the responses in the sample file. When it reaches the end it
-loops back to the beginning. The file's format is currently not configurable.
-The columns represent the following:
-
-response-time-in-secs response-size response-status-code
-
-$ ab -n 1000 -c 10 http://localhost:6666/
-
-Authors
-=======
-Marcel Molina
-Steve Jenson
+Twitter no longer maintains Killdeer.
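
As a reading aid for the removed README above, here is a minimal sketch of how one line of the three-column response-sample.txt format (response-time-in-secs response-size response-status-code) could be parsed. The names SampledResponse and parseSampleLine are hypothetical; this code is not part of the original project:

    // Hypothetical sketch, not from the Killdeer sources:
    // map one sample line onto (latency, size, status).
    case class SampledResponse(latencySecs: Double, sizeBytes: Int, statusCode: Int)

    def parseSampleLine(line: String): SampledResponse = {
      val Array(latency, size, status) = line.trim.split("\\s+")
      SampledResponse(latency.toDouble, size.toInt, status.toInt)
    }

    // parseSampleLine(".1 100 100") == SampledResponse(0.1, 100, 100)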
49 config/Dots.scala
@@ -1,49 +0,0 @@
-import com.twitter.killdeer._
-import org.jboss.netty.util.HashedWheelTimer
-import org.jboss.netty.channel.Channels
-import java.util.concurrent.TimeUnit
-import org.jboss.netty.handler.codec.http.{HttpChunkAggregator, HttpRequestDecoder, HttpResponseEncoder}
-import java.net.InetSocketAddress
-import util.Random
-
-new Config {
- val rng = new Random
-
- val contentLengthCdf = Cdf(
- 0.5 -> 500,
- 1.0 -> 1000
- )
- val latencyCdf = Cdf(
- 0.5 -> 1,
- 1.0 -> 1000
- )
- val timer = new HashedWheelTimer(1, TimeUnit.MILLISECONDS)
-
- val bandwidthCdf = Cdf(
- 0.5 -> 10000,
- 0.75 -> 1000,
- 0.95 -> 100,
- 1.0 -> 10
- )
-
- def pipeline = {
- val pipeline = Channels.pipeline()
-
- if (rng.nextFloat < 0.5) // else: unlimited bandwidth
- pipeline.addLast("throttler", new DownstreamThrottlingHandler(bandwidthCdf(), timer))
- pipeline.addLast("faults", new UpstreamFaultInjectorHandler(
- new TimeoutFaultInjector -> 0.005,
- new ConnectionDisconnectFaultInjector -> 0.1
- ))
- pipeline.addLast("decoder", new HttpRequestDecoder)
- pipeline.addLast("encoder", new HttpResponseEncoder)
- pipeline.addLast("http-faults", new HttpFaultInjectorHandler(
- ContentLengthManglingHttpFaultInjector -> 0.1,
- new ChunkDroppingHttpFaultInjector(0.3f) -> 0.1,
- ContentLengthDroppingHttpFaultInjector -> 0.1
- ))
- pipeline.addLast("latency", new LatencyHandler(timer, latencyCdf))
- pipeline.addLast("dots", new DotsHandler(contentLengthCdf))
- pipeline
- }
-}
23 config/Record.scala
@@ -1,23 +0,0 @@
-import com.twitter.killdeer._
-import org.jboss.netty.channel.Channels
-import org.jboss.netty.handler.codec.http.{HttpChunkAggregator, HttpRequestDecoder, HttpResponseEncoder}
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
-import java.net.InetSocketAddress
-import java.util.concurrent.Executors
-
-new Config {
- val executor = Executors.newCachedThreadPool()
- val clientSocketChannelFactory = new NioClientSocketChannelFactory(executor, executor)
-
- def pipeline = {
- val pipeline = Channels.pipeline()
-
- pipeline.addLast("decoder", new HttpRequestDecoder)
- pipeline.addLast("encoder", new HttpResponseEncoder)
-
- pipeline.addLast("log", new LoggingHandler)
- pipeline.addLast("recordReturned", new ResponseRecorderHandler(System.getProperty("user.home") + "/.killdeer/responses/"))
- pipeline.addLast("proxy", new ProxyHandler(new InetSocketAddress("localhost", 80), clientSocketChannelFactory))
- pipeline
- }
-}
21 config/Replay.scala
@@ -1,21 +0,0 @@
-import com.twitter.killdeer._
-import org.jboss.netty.channel.Channels
-import org.jboss.netty.util.HashedWheelTimer
-import java.util.concurrent.TimeUnit
-import org.jboss.netty.handler.codec.http.{HttpChunkAggregator, HttpRequestDecoder, HttpResponseEncoder}
-import java.net.InetSocketAddress
-
-new Config {
- val timer = new HashedWheelTimer(1, TimeUnit.MILLISECONDS)
- val latencyCdf = Cdf("0.5:001,1.0:1000")
-
- def pipeline = {
- val pipeline = Channels.pipeline()
-
- pipeline.addLast("decoder", new HttpRequestDecoder)
- pipeline.addLast("encoder", new HttpResponseEncoder)
- pipeline.addLast("latency", new LatencyHandler(timer, latencyCdf))
- pipeline.addLast("returnRecorded", new RecordedResponseHandler(System.getProperty("user.home") + "/.killdeer/responses/"))
- pipeline
- }
-}
91 deploy/strategy/build.rb
@@ -1,91 +0,0 @@
-require 'capistrano/recipes/deploy/strategy/copy'
-require 'fileutils'
-require 'tempfile'
-
-module Capistrano
- module Deploy
- module Strategy
- class Build < Copy
- def deploy!
- if copy_cache
- if File.exists?(copy_cache)
- logger.debug "refreshing local cache to revision #{revision} at #{copy_cache}"
- system(source.sync(revision, copy_cache))
- else
- logger.debug "preparing local cache at #{copy_cache}"
- system(source.checkout(revision, copy_cache))
- end
-
- logger.debug "copying cache to deployment staging area #{destination}"
- Dir.chdir(copy_cache) do
- FileUtils.mkdir_p(destination)
- queue = Dir.glob("*", File::FNM_DOTMATCH)
- while queue.any?
- item = queue.shift
- name = File.basename(item)
-
- next if name == "." || name == ".."
- next if copy_exclude.any? { |pattern| File.fnmatch(pattern, item) }
-
- if File.symlink?(item)
- FileUtils.ln_s(File.readlink(File.join(copy_cache, item)), File.join(destination, item))
- elsif File.directory?(item)
- queue += Dir.glob("#{item}/*", File::FNM_DOTMATCH)
- FileUtils.mkdir(File.join(destination, item))
- else
- FileUtils.ln(File.join(copy_cache, item), File.join(destination, item))
- end
- end
- end
- else
- logger.debug "getting (via #{copy_strategy}) revision #{revision} to #{destination}"
- system(command)
-
- if copy_exclude.any?
- logger.debug "processing exclusions..."
- copy_exclude.each { |pattern| FileUtils.rm_rf(Dir.glob(File.join(destination, pattern), File::FNM_DOTMATCH)) }
- end
- end
-
- raise unless system("cd #{source_folder} && #{build_task}")
- remote_filename = File.join(releases_path, package)
-
- begin
- upload(filename, remote_filename, :via => :scp)
- run "cd #{release_path} && #{decompress(remote_filename).join(" ")}"
- ensure
- run "rm -f #{remote_filename}"
- end
-
- ensure
- FileUtils.rm filename rescue nil
- FileUtils.rm_rf destination rescue nil
- end
-
- def build_task
- @build_task ||= configuration[:build_task]
- end
-
- def copy_compression
- configuration[:copy_compression]
- end
-
- def dist_path
- configuration[:dist_path]
- end
-
- def filename
- @filename ||= File.join(tmpdir, File.basename(destination), dist_path, package)
- end
-
- def package
- configuration[:package_name] || "#{application}-#{revision[0, 8]}.#{copy_compression}"
- end
-
- def source_folder
- File.join(tmpdir, File.basename(destination))
- end
- end
- end
- end
-end
9 project/build.properties
@@ -1,9 +0,0 @@
-#
-#Tue Aug 03 09:59:07 PDT 2010
-project.name=Killdeer
-project.organization=com.twitter
-project.version=0.5.1
-sbt.version=0.7.4
-def.scala.version=2.7.7
-build.scala.versions=2.8.0
-project.initialize=false
30 project/build/Project.scala
@@ -1,30 +0,0 @@
-import sbt._
-import com.twitter.sbt._
-
-class Project(info: ProjectInfo)
- extends StandardProject(info)
- with SubversionPublisher
- with InlineDependencies
-{
- override def managedStyle = ManagedStyle.Maven
- override def disableCrossPaths = true
- override def subversionRepository = Some("http://svn.local.twitter.com/maven-public")
-
- val codehaus = "codehaus" at "http://repository.codehaus.org/"
- val jboss = "repository.jboss.org" at "http://repository.jboss.org/nexus/content/groups/public/"
-
- val specs = "org.scala-tools.testing" % "specs_2.8.0" % "1.6.5"
-
- // Twitter/inline deps:
- inline("com.twitter" % "util" % "1.2.4")
-
- // Necessary for Eval because of a bug in SBT:
- val scalaTools = "org.scala-lang" % "scala-compiler" % "2.8.0" % "compile"
- override def filterScalaJars = false
-
- val netty = "org.jboss.netty" % "netty" % "3.2.2.Final"
- val jacksonCore = "org.codehaus.jackson" % "jackson-core-asl" % "1.6.0"
- val jacksonMapper = "org.codehaus.jackson" % "jackson-mapper-asl" % "1.6.0"
-
- override def mainClass = Some("com.twitter.killdeer.Main")
-}
7 project/plugins/Plugins.scala
@@ -1,7 +0,0 @@
-import sbt._
-
-class Plugins(info: ProjectInfo) extends PluginDefinition(info) {
- val scalaToolsReleases = "scala-tools.org" at "http://scala-tools.org/repo-releases/"
- val twitterMaven = "twitter.com" at "http://maven.twttr.com/"
- val defaultProject = "com.twitter" % "standard-project" % "0.7.17"
-}
24 src/main/scala/com/twitter/killdeer/Cdf.scala
@@ -1,24 +0,0 @@
-package com.twitter.killdeer
-
-import scala.util.Random
-
-object Cdf {
- def apply(str: String) = new Cdf(
- for {
- pctile <- str.split(",")
- Array(pct, x) = pctile.split(":")
- } yield (pct.toDouble, x.toInt)
- )
-
- def apply[T](spec: Tuple2[Double, T]*) = new Cdf(spec)
-}
-
-class Cdf[T](spec: Seq[Tuple2[Double, T]]) {
- val rng = new Random
-
- def apply() = {
- val pctPick = rng.nextFloat
- val Some((_, value)) = spec.find { case (pct, _) => pctPick < pct }
- value
- }
-}
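
For orientation, a hedged usage sketch of the Cdf class just removed; it is illustrative only and mirrors the specs that appeared in config/Dots.scala and config/Replay.scala rather than documenting an official API:

    // Illustrative only: each apply() call draws one value from the weighted distribution.
    val latencyCdf = Cdf(0.5 -> 1, 1.0 -> 1000)   // ~50% of draws -> 1, the rest -> 1000
    val sameCdf    = Cdf("0.5:001,1.0:1000")      // string form of the same spec (values parsed as Int)
    val latencyMs  = latencyCdf()                 // sample one latency value in milliseconds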
15 src/main/scala/com/twitter/killdeer/DebuggingHandler.scala
@@ -1,15 +0,0 @@
-package com.twitter.killdeer
-
-import org.jboss.netty.channel._
-
-class DebuggingHandler extends SimpleChannelHandler {
- override def handleUpstream(ctx: ChannelHandlerContext, e: ChannelEvent) {
- println("UPSTREAM: %s".format(e))
- super.handleUpstream(ctx, e)
- }
-
- override def handleDownstream(ctx: ChannelHandlerContext, e: ChannelEvent) {
- println("DOWNSTREAM: %s".format(e))
- super.handleDownstream(ctx, e)
- }
-}
79 src/main/scala/com/twitter/killdeer/Deerkill.scala
@@ -1,79 +0,0 @@
-package com.twitter.killdeer
-
-import scala.io.Source
-import scala.collection.mutable.Queue
-import java.util.concurrent.Executors
-import org.jboss.netty.bootstrap.ClientBootstrap
-import org.jboss.netty.channel._
-import org.jboss.netty.util._
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
-import org.jboss.netty.handler.codec.http._
-import java.net.InetSocketAddress
-import java.util.concurrent.TimeUnit
-
-object Deerkill {
- def apply(args: Seq[String]) {
- val host = args(0)
- val port = args(1).toInt
- val address = new InetSocketAddress(host, port)
- val concurrency = args(2).toInt
- val timeBetweenRequestMillis = args(3).toInt
- val timer = new HashedWheelTimer
- val requests = Source.fromInputStream(System.in).getLines map { requestLine =>
- val recordedRequest = RecordedRequest(requestLine)
- val httpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, recordedRequest.uri)
- httpRequest.addHeader("Authorization", "UserId " + recordedRequest.userId)
- httpRequest.addHeader("X-Transaction", recordedRequest.transactionId)
- httpRequest.addHeader("Host", host)
- httpRequest
- }
- val queue = new collection.mutable.SynchronizedQueue[HttpRequest]
- queue ++= requests
-
- val executor = Executors.newCachedThreadPool()
- val socketChannelFactory = new NioClientSocketChannelFactory(executor, executor)
- val bootstrap = new ClientBootstrap(socketChannelFactory)
- bootstrap.setPipelineFactory(new ChannelPipelineFactory {
- def getPipeline = {
- val pipeline = Channels.pipeline()
- pipeline.addLast("decoder", new HttpResponseDecoder)
- pipeline.addLast("encoder", new HttpRequestEncoder)
- pipeline.addLast("outbound", new OutboundHandler(bootstrap, address, timer, queue))
- pipeline
- }
- })
- (0 until concurrency) foreach { _ =>
- bootstrap.connect(address)
- }
- }
-}
-
-class OutboundHandler(bootstrap: ClientBootstrap, address: InetSocketAddress, timer: Timer, requests: Queue[HttpRequest]) extends SimpleChannelUpstreamHandler {
- var channel: Channel = null
-
- override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent) {
- channel = e.getChannel
- start()
- }
-
- override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {}
-
- override def channelClosed(ctx: ChannelHandlerContext, e: ChannelStateEvent) {
- bootstrap.connect(address)
- }
-
- override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
- e.getCause().printStackTrace()
- bootstrap.connect(address)
- }
-
- private def start() {
- timer.newTimeout(new TimerTask {
- def run(timeout: Timeout) {
- val request = requests.dequeue()
- channel.write(request)
- start()
- }
- }, 100, TimeUnit.MILLISECONDS)
- }
-}
27 src/main/scala/com/twitter/killdeer/DotsHandler.scala
@@ -1,27 +0,0 @@
-package com.twitter.killdeer
-
-import org.jboss.netty.channel._
-import org.jboss.netty.buffer.ChannelBuffers
-import org.jboss.netty.handler.codec.http._
-
-class DotsHandler(responseSizeCdf: Cdf[Int]) extends SimpleChannelUpstreamHandler {
- override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
- val channel = e.getChannel
- val length = responseSizeCdf()
- val response = {
- val response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)
- response.setHeader("Content-Length", length)
- response.setContent(ChannelBuffers.wrappedBuffer {
- val content = new Array[Byte](length)
- java.util.Arrays.fill(content, '.'.toByte)
- content
- })
- response
- }
- channel.write(response).addListener(new ChannelFutureListener {
- def operationComplete(future: ChannelFuture) {
- channel.close()
- }
- })
- }
-}
117 src/main/scala/com/twitter/killdeer/FaultInjector.scala
@@ -1,117 +0,0 @@
-// Copyright 2010 Twitter, Inc.
-//
-// Fault injection for netty channels.
-
-package com.twitter.killdeer
-
-import java.util.concurrent.TimeUnit
-
-import util.Random
-
-import org.jboss.netty.channel._
-import org.jboss.netty.util.{HashedWheelTimer, Timeout, TimerTask}
-
-object FaultInjector {
- val timer = new HashedWheelTimer(1, TimeUnit.MILLISECONDS)
-}
-
-trait FaultInjector {
- // TODO: define fire-once vs. not?
- def fault(channel: Channel)
-
- var fired = false
- val fireOnce = false
- def fire(channel: Channel) {
- if (!fired || !fireOnce)
- fault(channel)
- fired = true
- }
-}
-
-object NoFaultInjector extends FaultInjector {
- def fault(channel: Channel) = ()
-}
-
-class TimeoutFaultInjector extends FaultInjector {
- override val fireOnce = true
-
- def fault(channel: Channel) {
- channel.getPipeline.addFirst(
- "timeout", new SimpleChannelDownstreamHandler {
- override def writeRequested(ctx: ChannelHandlerContext, m: MessageEvent) {
- // (do nothing.. don't pass it on)
- }
- })
- }
-}
-
-class ConnectionDisconnectFaultInjector extends FaultInjector {
- def fault(channel: Channel) {
- // We sink the event directly so that nothing in the pipeline sees
-    // our mischievous actions.
- val future = Channels.future(channel)
- val event = new DownstreamChannelStateEvent(channel, future, ChannelState.CONNECTED, null)
- channel.getPipeline.getSink.eventSunk(channel.getPipeline, event)
- }
-}
-
-// Frequencies given are per request-second.
-class UpstreamFaultInjectorHandler(injectors: Tuple2[FaultInjector, Double]*)
-extends SimpleChannelUpstreamHandler with LifeCycleAwareChannelHandler
-{
- @volatile var timeout: Timeout = _
-
- // Construct a CDF of the faults so we can easily pick from our
- // distribution.
- val faultCDF = new Cdf({
- val injectorCDF =
- injectors.foldLeft(Nil: List[Tuple2[Double, FaultInjector]]) {
- case (Nil, (injector, freq)) => (freq / 1000.0, injector) :: Nil
- case (cdf@((freq, _) :: _), (injector, freq_)) => (freq + freq_ / 1000.0, injector) :: cdf
- }
-
- ((1.0, NoFaultInjector) :: injectorCDF).reverse
- })
-
- def start(channel: Channel) {
- timeout = FaultInjector.timer.newTimeout(
- new TimerTask {
- def run(to: Timeout) {
- if (!channel.isOpen || to.isCancelled)
- return
-
- faultCDF().fire(channel)
- timeout = FaultInjector.timer.newTimeout(this, 1, TimeUnit.MILLISECONDS)
- }
- }, 1, TimeUnit.MILLISECONDS)
- }
-
- def stop() {
- if (timeout ne null) {
- timeout.cancel()
- timeout = null
- }
- }
-
- def afterAdd(ctx: ChannelHandlerContext) {}
- def afterRemove(ctx: ChannelHandlerContext) {}
-
- def beforeAdd(ctx: ChannelHandlerContext) {
- if (ctx.getPipeline.isAttached)
- start(ctx.getChannel)
- }
-
- def beforeRemove(ctx: ChannelHandlerContext) {
- stop()
- }
-
- override def channelOpen(ctx: ChannelHandlerContext, e: ChannelStateEvent) {
- start(ctx.getChannel)
- ctx.sendUpstream(e)
- }
-
- override def channelClosed(ctx: ChannelHandlerContext, e: ChannelStateEvent) {
- stop()
- ctx.sendUpstream(e)
- }
-}
84 src/main/scala/com/twitter/killdeer/HttpFaultInjector.scala
@@ -1,84 +0,0 @@
-package com.twitter.killdeer
-
-import scala.util.Random
-
-import org.jboss.netty.handler.codec.http._
-import org.jboss.netty.channel._
-
-// These fault injectors are per-{request,response} rather than based
-// on request time. They must sit after the http message encoder.
-
-trait HttpFaultInjector {
- def faultMessage(httpMessage: HttpMessage) = ()
- def faultChunk(httpChunk: HttpChunk): Option[HttpChunk] = Some(httpChunk)
-}
-
-object NoHttpFaultInjector extends HttpFaultInjector
-
-object ContentLengthManglingHttpFaultInjector extends HttpFaultInjector {
- override def faultMessage(httpMessage: HttpMessage) {
- val newContentLength = HttpHeaders.getContentLength(httpMessage) + 1
- HttpHeaders.setHeader(httpMessage, "Content-Length", newContentLength)
- }
-}
-
-class ChunkDroppingHttpFaultInjector(rate: Float) extends HttpFaultInjector {
- val rng = new Random
-
- override def faultChunk(httpChunk: HttpChunk) = {
- if (rng.nextFloat < rate)
- None
- else
- Some(httpChunk)
- }
-}
-
-object ContentLengthDroppingHttpFaultInjector extends HttpFaultInjector {
- override def faultMessage(httpMessage: HttpMessage) {
- httpMessage.removeHeader("Content-Length")
- httpMessage.removeHeader("Connection")
- }
-}
-
-class HttpFaultInjectorHandler(injectors: Tuple2[HttpFaultInjector, Double]*)
-extends SimpleChannelDownstreamHandler {
- val faultCdf = {
- val cdf = injectors.foldLeft(Nil: List[Tuple2[Double, HttpFaultInjector]]) {
- case (Nil, (injector, freq)) => (freq, injector) :: Nil
- case (cdf@((freq, _) :: _), (injector, freq_)) => (freq + freq_, injector) :: cdf
- }
-
- new Cdf(((1.0 -> NoHttpFaultInjector) :: cdf).reverse)
- }
-
- val faulter = faultCdf()
-
- override def writeRequested(ctx: ChannelHandlerContext, e: MessageEvent) {
- e.getMessage match {
- case httpChunk: HttpChunk =>
- // TODO: what do we do about futures in the case they get
- // filtered out? We don't want the upstream to hang
- // indefinitely.
- for (newChunk <- faulter.faultChunk(httpChunk)) {
- val future = Channels.future(ctx.getChannel)
- future.addListener(new ChannelFutureListener {
- def operationComplete(future: ChannelFuture) {
- if (future.isSuccess)
- e.getFuture.setSuccess()
- else
- e.getFuture.setFailure(future.getCause)
- }
- })
-
- Channels.write(ctx, future, newChunk)
- }
-
- case httpMessage: HttpMessage =>
- faulter.faultMessage(httpMessage)
- ctx.sendDownstream(e)
-
- case _ =>
- ctx.sendDownstream(e)
- }
- }
-}
20 src/main/scala/com/twitter/killdeer/KeepAliveHandler.scala
@@ -1,20 +0,0 @@
-package com.twitter.killdeer
-
-import org.jboss.netty.channel._
-import org.jboss.netty.handler.codec.http._
-import org.jboss.netty.handler.codec.http.HttpHeaders
-import java.io.File
-
-class KeepAliveHandler(sampleDirectory: String) extends SimpleChannelHandler {
- override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
- val request = e.getMessage.asInstanceOf[HttpRequest]
- ctx.setAttachment(HttpHeaders.isKeepAlive(request))
- ctx.sendUpstream(e)
- }
-
- override def channelClosed(ctx: ChannelHandlerContext, e: ChannelStateEvent) {
- if (!ctx.getAttachment.asInstanceOf[Boolean]) {
- ctx.sendDownstream(e)
- }
- }
-}
34 src/main/scala/com/twitter/killdeer/Killdeer.scala
@@ -1,34 +0,0 @@
-package com.twitter.killdeer
-
-import com.twitter.util.StorageUnitConversions._
-import com.twitter.util.{Timer, Eval}
-import org.jboss.netty.bootstrap.ServerBootstrap
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
-import java.io.File
-import java.util.concurrent.Executors
-import java.net.InetSocketAddress
-import org.jboss.netty.channel.{ChannelPipeline, ChannelPipelineFactory}
-import org.jboss.netty.channel.Channels
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory
-import org.jboss.netty.handler.stream.ChunkedWriteHandler
-
-trait Config {
- def port = 6666
- def pipeline: ChannelPipeline
-}
-
-object Killdeer {
- def apply(args: Seq[String]) {
- val configFileName = args(0)
- val config = Eval[Config](new File(configFileName))
- val port = config.port
- val executor = Executors.newCachedThreadPool()
- val bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(executor, executor))
-
- bootstrap.setPipelineFactory(new ChannelPipelineFactory {
- def getPipeline = config.pipeline
- })
- bootstrap.bind(new InetSocketAddress(port))
- println("Now accepting connections on port: " + port)
- }
-}
17 src/main/scala/com/twitter/killdeer/LatencyHandler.scala
@@ -1,17 +0,0 @@
-package com.twitter.killdeer
-
-import org.jboss.netty.channel._
-import org.jboss.netty.buffer.ChannelBuffer
-import org.jboss.netty.util.{Timer, Timeout, TimerTask}
-import java.util.concurrent.TimeUnit
-
-class LatencyHandler(timer: Timer, latencyCdf: Cdf[Int]) extends SimpleChannelDownstreamHandler {
- override def writeRequested(ctx: ChannelHandlerContext, e: MessageEvent) {
- val latency = latencyCdf()
- timer.newTimeout(new TimerTask {
- def run(to: Timeout) {
- ctx.sendDownstream(e)
- }
- }, latency, TimeUnit.MILLISECONDS)
- }
-}
12 src/main/scala/com/twitter/killdeer/LoggingHandler.scala
@@ -1,12 +0,0 @@
-package com.twitter.killdeer
-
-import org.jboss.netty.channel._
-import org.jboss.netty.handler.codec.http._
-
-class LoggingHandler extends SimpleChannelUpstreamHandler {
- override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
- val request = e.getMessage.asInstanceOf[HttpRequest]
- println(request.getUri)
- ctx.sendUpstream(e)
- }
-}
11 src/main/scala/com/twitter/killdeer/Main.scala
@@ -1,11 +0,0 @@
-package com.twitter.killdeer
-
-object Main {
- def main(args: Array[String]) {
- val mode = args(0)
- mode match {
- case "killdeer" => Killdeer(args.drop(1))
- case "deerkill" => Deerkill(args.drop(1))
- }
- }
-}
75 src/main/scala/com/twitter/killdeer/ProxyHandler.scala
@@ -1,75 +0,0 @@
-package com.twitter.killdeer
-
-import java.net.InetSocketAddress
-import org.jboss.netty.channel._
-import org.jboss.netty.buffer.ChannelBuffers
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory
-import org.jboss.netty.bootstrap.ClientBootstrap
-import org.jboss.netty.handler.codec.http._
-
-class ProxyHandler(inetSocketAddress: InetSocketAddress, socketChannelFactory: ClientSocketChannelFactory) extends SimpleChannelUpstreamHandler {
- var inboundChannel: Channel = null
- var outboundChannel: Channel = null
-
- override def channelOpen(ctx: ChannelHandlerContext, e: ChannelStateEvent) {
- def connect() = {
- val bootstrap = new ClientBootstrap(socketChannelFactory)
- val pipeline = bootstrap.getPipeline
- pipeline.addLast("encoder", new HttpRequestEncoder)
- pipeline.addLast("handler", new OutboundHandler(e.getChannel))
- bootstrap.connect(inetSocketAddress)
- }
-
- def suspendTrafficWhileConnecting() {
- inboundChannel.setReadable(false)
- }
-
- inboundChannel = e.getChannel
- suspendTrafficWhileConnecting()
- val f = connect()
- outboundChannel = f.getChannel
- f.addListener(new ChannelFutureListener {
- def operationComplete(future: ChannelFuture) {
- if (future.isSuccess) {
- inboundChannel.setReadable(true)
- } else {
- inboundChannel.close()
- }
- }
- })
- }
-
- override def channelClosed(ctx: ChannelHandlerContext, e: ChannelStateEvent) {
- if (outboundChannel != null) closeOnFlush(outboundChannel)
- }
-
- override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
- // e.getCause().printStackTrace() // usually doesn't matter
- }
-
- override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
- val request = e.getMessage.asInstanceOf[HttpRequest]
- request.setHeader("Host", inetSocketAddress.getHostName)
- outboundChannel.write(request)
- }
-
- private class OutboundHandler(inboundChannel: Channel) extends SimpleChannelUpstreamHandler {
- override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
- inboundChannel.write(e.getMessage)
- }
-
- override def channelClosed(ctx: ChannelHandlerContext, e: ChannelStateEvent) {
- closeOnFlush(inboundChannel)
- }
-
- override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
- val channel = e.getChannel
- e.getCause().printStackTrace()
- closeOnFlush(channel)
- }
- }
-
- def closeOnFlush(ch: Channel) {
- if (ch.isConnected) ch.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE)
- }
-}
11 src/main/scala/com/twitter/killdeer/RecordedRequest.scala
@@ -1,11 +0,0 @@
-package com.twitter.killdeer
-
-object RecordedRequest {
- val FORMAT = """\S+""".r
- def apply(line: String) = {
- val Seq(transactionId, userId, _, _, _, _, method, uri, _, latency) = FORMAT.findAllIn(line).toList
- new RecordedRequest(transactionId, userId.toInt, method, uri)
- }
-}
-
-case class RecordedRequest(transactionId: String, userId: Int, method: String, uri: String)
49 src/main/scala/com/twitter/killdeer/RecordedResponse.scala
@@ -1,49 +0,0 @@
-package com.twitter.killdeer
-
-import java.io._
-import java.net.URLEncoder
-import org.jboss.netty.handler.codec.http.HttpResponse
-import org.jboss.netty.channel._
-import org.jboss.netty.buffer.ChannelBuffer
-
-object RecordedResponse {
- val FORMAT = "[^-]+".r
- val maxFileLength = 32
-
- def load(destination: String, transactionId: String, uri: String) = {
- val file = fileFor(destination, transactionId, uri)
- if (file.canRead) {
- val randomAccessFile = new RandomAccessFile(file, "r")
- Some(new DefaultFileRegion(randomAccessFile.getChannel, 0, randomAccessFile.length))
- } else {
- None
- }
- }
-
- def open(destination: String, transactionId: String, uri: String) =
- new RecordedResponse(RecordedResponse.fileFor(destination, transactionId, uri))
-
-
- private def fileFor(destination: String, transactionId: String, uri: String) = {
- val Seq(timestamp, _, _) = FORMAT.findAllIn(transactionId).toList
- val pathName = destination + timestamp + "/" + transactionId
-
- val filename = URLEncoder.encode(uri)
- val splitFilename = filename.grouped(maxFileLength).toList
-
- val path = new File(pathName + "/" + splitFilename.take(splitFilename.length - 1).mkString("/"))
- path.mkdirs()
- new File(path, splitFilename.last)
- }
-}
-
-class RecordedResponse(file: File) {
- file.delete()
- val fileChannel = new FileOutputStream(file, true).getChannel
-
- def writeMessage(buffer: ChannelBuffer) {
- fileChannel.write(buffer.toByteBuffer)
- }
-
- def close() { fileChannel.close() }
-}
34 src/main/scala/com/twitter/killdeer/RecordedResponseHandler.scala
@@ -1,34 +0,0 @@
-package com.twitter.killdeer
-
-import org.jboss.netty.channel._
-import org.jboss.netty.handler.codec.http._
-import org.jboss.netty.handler.codec.http.HttpHeaders._
-import org.jboss.netty.handler.codec.http.HttpResponseStatus._
-import org.jboss.netty.handler.codec.http.HttpVersion._
-import org.jboss.netty.util.CharsetUtil
-
-class RecordedResponseHandler(responseSampleDirectory: String) extends SimpleChannelHandler {
- import RecordedResponse.load
-
- def txnid(request: HttpRequest) = request.getHeader("X-Transaction") match {
- case null => "-"
- case s => s
- }
-
- override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
- val request = e.getMessage.asInstanceOf[HttpRequest]
- val channel = e.getChannel
- val transactionId = txnid(request)
- load(responseSampleDirectory, transactionId, request.getUri) map { recordedResponse =>
- channel.write(recordedResponse).addListener(new ChannelFutureListener {
- def operationComplete(future: ChannelFuture) {
- channel.close()
- recordedResponse.releaseExternalResources()
- }
- })
- } getOrElse {
- println("Unsupported request: " + request)
- channel.close()
- }
- }
-}
36 src/main/scala/com/twitter/killdeer/ResponseRecorderHandler.scala
@@ -1,36 +0,0 @@
-package com.twitter.killdeer
-
-import org.jboss.netty.channel._
-import org.jboss.netty.handler.codec.http._
-import org.jboss.netty.buffer.ChannelBuffer
-import java.io.File
-
-class ResponseRecorderHandler(sampleDirectory: String) extends SimpleChannelHandler {
- case class Request(transactionId: String, uri: String)
-
- override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
- val request = e.getMessage.asInstanceOf[HttpRequest]
- var recorder = ctx.getAttachment.asInstanceOf[RecordedResponse]
- if (recorder != null) recorder.close()
- val transactionId = request.getHeader("X-Transaction")
-
- if (transactionId != null) {
- recorder = RecordedResponse.open(sampleDirectory, transactionId, request.getUri)
- ctx.setAttachment(recorder)
- }
- ctx.sendUpstream(e)
- }
-
- override def writeRequested(ctx: ChannelHandlerContext, e: MessageEvent) {
- val response = e.getMessage.asInstanceOf[ChannelBuffer]
- val recorder = ctx.getAttachment.asInstanceOf[RecordedResponse]
- if (recorder != null) recorder.writeMessage(response)
- ctx.sendDownstream(e)
- }
-
- override def channelClosed(ctx: ChannelHandlerContext, e: ChannelStateEvent) {
- val recorder = ctx.getAttachment.asInstanceOf[RecordedResponse]
- if (recorder != null) recorder.close()
- ctx.sendDownstream(e)
- }
-}
119 src/main/scala/com/twitter/killdeer/ThrottlingHandler.scala
@@ -1,119 +0,0 @@
-package com.twitter.killdeer
-
-import scala.math
-import scala.collection.mutable.ListBuffer
-import scala.collection.JavaConversions._
-
-import java.util.concurrent.TimeUnit
-
-import org.jboss.netty.channel._
-import org.jboss.netty.buffer.{ChannelBuffers, ChannelBuffer, CompositeChannelBuffer}
-import org.jboss.netty.handler.queue.BufferedWriteHandler
-import org.jboss.netty.util.{Timer, TimerTask, Timeout}
-
-abstract class BufferingThrottler(rateBps: Int, timer: Timer) {
- var events = List[MessageEvent]()
- var ctx: ChannelHandlerContext = _
-
- // Minimum granularity: 10ms.
- val rateBpms = rateBps.toFloat / 1000.0
- val tickMs = math.max(10, 1 / rateBpms).toInt
- val bytesPerTick = math.max(1, math.ceil(rateBpms * tickMs).toInt)
-
- @volatile var timeout: Timeout = _
- val task = new TimerTask {
- def run(to: Timeout) {
- if (to.isCancelled)
- return
-
- doWrite(bytesPerTick)
- timeout = timer.newTimeout(this, tickMs, TimeUnit.MILLISECONDS)
- }
- }
-
- timeout = timer.newTimeout(task, tickMs, TimeUnit.MILLISECONDS)
-
- def sink(ctx: ChannelHandlerContext, e: MessageEvent) {
- assert(e.getMessage.isInstanceOf[ChannelBuffer])
- this.ctx = ctx
-
- synchronized {
- events = events :+ e
- }
- }
-
- def stop() = {
- timeout.cancel()
- doWrite(Int.MaxValue)
- }
-
- def doWrite(howmuch: Int) = {
- val writeBuffers = new ListBuffer[ChannelBuffer]()
- val writeFutures = new ListBuffer[ChannelFuture]()
-
- synchronized {
- var written = 0
-
- while (written < howmuch && !events.isEmpty) {
- var event = events.head
- val buffer = event.getMessage.asInstanceOf[ChannelBuffer]
-
- val toWrite = math.min(howmuch - written, buffer.readableBytes)
- written += toWrite
-
- writeBuffers.append(buffer.readBytes(toWrite))
- if (!buffer.readable) {
- events = events.tail
- writeFutures.append(event.getFuture)
- }
- }
- }
-
- if (!writeBuffers.isEmpty) {
- val composite = ChannelBuffers.wrappedBuffer(writeBuffers.toArray:_*)
-
- val future = Channels.future(ctx.getChannel)
- future.addListener(new ChannelFutureListener() {
- def operationComplete(future: ChannelFuture) {
- if (future.isSuccess)
- writeFutures.foreach(_.setSuccess())
- else
- writeFutures.foreach(_.setFailure(future.getCause))
- }
- })
- write(composite, future)
- }
- }
-
-
- // Implement this!
- def write(out: ChannelBuffer, future: ChannelFuture)
-}
-
-// These need to be inserted very low in the stack-- it needs access
-// to ChannelBuffers directly, and not decoded POJOs.
-class DownstreamThrottlingHandler(rateBps: Int, timer: Timer)
-extends SimpleChannelDownstreamHandler
-{
- var ctx: ChannelHandlerContext = _
-
- val throttler = new BufferingThrottler(rateBps, timer) {
- def write(out: ChannelBuffer, future: ChannelFuture) {
- if (ctx ne null)
- Channels.write(ctx, future, out)
- }
- }
-
- override def writeRequested(ctx: ChannelHandlerContext, e: MessageEvent) = {
- this.ctx = ctx
- throttler.sink(ctx, e)
- }
-
- override def closeRequested(ctx: ChannelHandlerContext, e: ChannelStateEvent) {
- // The upstream handler waits for the write futures to complete
- // before closing, so we really shouldn't have any issues with
- // buffering here.
- throttler.stop()
- ctx.sendUpstream(e)
- }
-}
13 src/scripts/dk
@@ -1,13 +0,0 @@
-#!/bin/sh
-
-APP_NAME=killdeer
-VERSION=0.5
-HEAP_OPTS="-Xmx1024m -Xms1024m -XX:NewSize=512m"
-GC_OPTS="-XX:+PrintGCTimeStamps -XX:+UseConcMarkSweepGC -XX:+UseParNewGC"
-DEBUG_OPTS="-XX:ErrorFile=/var/log/$APP_NAME/java_error%p.log"
-#YK_OPTS="-agentpath:/Applications/YourKit_Java_Profiler_9.0.2.app/bin/mac/libyjpagent.jnilib -jar"
-#YK_OPTS="-verbose:class"
-JAVA_OPTS="-server $YK_OPTS $GC_OPTS $HEAP_OPTS $DEBUG_OPTS"
-
-
-java $JAVA_OPTS -jar $APP_NAME-$VERSION.jar deerkill $@
13 src/scripts/kd
@@ -1,13 +0,0 @@
-#!/bin/sh
-
-APP_NAME=killdeer
-VERSION=0.5
-HEAP_OPTS="-Xmx1024m -Xms1024m -XX:NewSize=512m"
-GC_OPTS="-XX:+PrintGCTimeStamps -XX:+UseConcMarkSweepGC -XX:+UseParNewGC"
-DEBUG_OPTS="-XX:ErrorFile=/var/log/$APP_NAME/java_error%p.log"
-#YK_OPTS="-agentpath:/Applications/YourKit_Java_Profiler_9.0.2.app/bin/mac/libyjpagent.jnilib -jar"
-#YK_OPTS="-verbose:class"
-JAVA_OPTS="-server $YK_OPTS $GC_OPTS $HEAP_OPTS $DEBUG_OPTS"
-
-
-java $JAVA_OPTS -jar $APP_NAME-$VERSION.jar killdeer $@
3 src/test/resources/response-sample.txt
@@ -1,3 +0,0 @@
-.1 100 100
-.2 200 200
-.3 300 300
