Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
tree: 3c5dd38037
Fetching contributors…

Octocat-spinner-32-eaf2f5

Cannot retrieve contributors at this time

file 199 lines (160 sloc) 3.907 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199
require 'rubygems'
require 'mysqlplus'

class MysqlTest
  
  class NotImplemented < StandardError
  end
  
  attr_accessor :queries,
                :context,
                :connections,
                :connection_signature,
                :start,
                :done,
                :query_with,
                :per_query_overhead,
                :timeout
  
  def initialize( queries, context = '' )
    @queries = queries
    @context = context
    @done = []
    @query_with = :async_query
    @per_query_overhead = 3
    @timeout = 20
    yield self if block_given?
  end
  
  def setup( &block )
    @start = Time.now
    @connection_signature = block
  end
  
  def run!
    c_or_native_ruby_async_query do
      present_context if context?
      prepare
      yield
    end
  end
  
  def per_query_overhead=( overhead )
    @per_query_overhead = ( overhead == :random ) ? rand() : overhead
  end
  
  protected

  def prepare
    raise NotImplemented
  end
  
  def teardown
    raise NotImplemented
  end
  
  def log( message, prefix = '' )
    puts "[#{timestamp}] #{prefix} #{message}"
  end
  
  def with_logging( message )
    log( message, 'Start' )
    yield
    log( message, 'End' )
  end
  
  def timestamp
    Time.now - @start
  end

  def context?
    @context != ''
  end
  
  def present_context
    log "#############################################"
    log "# #{@context}"
    log "#############################################"
  end
  
  def c_or_native_ruby_async_query
    if @query_with == :c_async_query
      log "** using C based async_query"
    else
      log "** using native Ruby async_query"
    end
    yield
  end
  
  def dispatch_query( connection, sql, timeout = nil )
    connection.send( @query_with, sql, timeout )
  end
  
end

class EventedMysqlTest < MysqlTest
  
  attr_accessor :sockets
  
  def initialize( queries, context = '' )
    @sockets = []
    @connections = {}
    super( queries, context )
  end
  
  def setup( &block )
    super( &block )
    with_logging 'Setup connection pool' do
      @queries.times do
        connection = @connection_signature.call
        @connections[ IO.new(connection.socket) ] = connection
        @sockets = @connections.keys
      end
    end
  end
  
  def run!
    super do
      catch :END_EVENT_LOOP do
        loop do
          result = select( @sockets,nil,nil,nil )
          if result
            result.first.each do |conn|
              @connections[conn].get_result.each{|res| log( "Result for socket #{conn.fileno} : #{res}" ) }
              @done << nil
              if done?
                teardown
              end
            end
          end
        end
      end
    end
  end
  
  protected
  
  def prepare
    @connections.each_value do |conn|
      conn.send_query( "select sleep(#{@per_query_overhead})" )
    end
  end
  
  def teardown
    log "done"
    throw :END_EVENT_LOOP
  end
  
  def done?
    @done.size == @queries
  end
  
end

class ThreadedMysqlTest < MysqlTest
  
  attr_accessor :threads
  
  def initialize( queries, context = '' )
    @connections = []
    @threads = []
    super( queries, context )
  end
  
  def setup( &block )
    super( &block )
    with_logging "Setup connection pool" do
      @queries.times do
        @connections << @connection_signature.call
      end
    end
  end
  
  def run!
    super do
      with_logging "waiting on threads" do
        @threads.each{|t| t.join }
      end
    end
  end
  
  protected
  
  def prepare
    with_logging "prepare" do
      @queries.times do |conn|
        @threads << Thread.new do

          log "sending query on connection #{conn}"

          dispatch_query( @connections[conn], "select sleep(#{@per_query_overhead})", @timeout ).each do |result|
            log "connection #{conn} done"
          end
        
        end
      end
    end
  end
  
end
Something went wrong with that request. Please try again.