Skip to content

Commit

Permalink
show fd/sig in logs; connection pool should not grow in size; close b…
Browse files Browse the repository at this point in the history
…roken connections and queue queries when disconnected
  • Loading branch information
tmm1 committed Sep 4, 2008
1 parent 36ec963 commit d0d6907
Showing 1 changed file with 22 additions and 13 deletions.
35 changes: 22 additions & 13 deletions lib/em/mysql.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ class << self
class EventedMysql < EM::Connection
def initialize mysql, opts
@mysql = mysql
@fd = mysql.socket
@opts = opts
@queue = []
@pending = []
@processing = false
@connected = true

log 'mysql connected'
end
attr_reader :processing
attr_reader :processing, :connected

def connection_completed
@connected = true
Expand Down Expand Up @@ -54,12 +57,13 @@ def notify_readable
blk.call res if blk
else
log 'readable, but nothing queued?! probably an ERROR state'
return close
end
rescue Mysql::Error => e
log 'mysql error', e.message
if DisconnectErrors.include? e.message
@pending << [response, sql, blk]
close
return close
else
raise e
end
Expand All @@ -71,7 +75,7 @@ def notify_readable

def unbind
log 'mysql disconnect', $!
cp = EventedMysql.instance_variable_get('@connection_pool') and cp.delete(self)
# cp = EventedMysql.instance_variable_get('@connection_pool') and cp.delete(self)
@connected = false

# XXX wait for the next tick, so the FD is removed completely from the reactor
Expand All @@ -81,14 +85,20 @@ def unbind
log 'mysql reconnecting'
@processing = false
@mysql = EventedMysql._connect @opts
@fd = @mysql.socket
# puts;
# p ['oldsig', @signature]
@signature = EM.attach_fd @mysql.socket, EM::AttachInNotifyReadableMode, EM::AttachInWriteMode
# p ['newsig', @signature]
# puts;
log 'mysql connected'
EM.instance_variable_get('@conns')[@signature] = self
end
end

def execute sql, response = nil, &blk
begin
unless @processing
unless @processing or !@connected
@processing = true
log 'mysql sending', sql
@mysql.send_query(sql)
Expand All @@ -100,8 +110,7 @@ def execute sql, response = nil, &blk
log 'mysql error', e.message
if DisconnectErrors.include? e.message
@pending << [response, sql, blk]
close_connection
return
return close
else
raise e
end
Expand All @@ -112,8 +121,8 @@ def execute sql, response = nil, &blk

def close
@connected = false
@mysql.close
close_connection
@mysql.close
end

private
Expand All @@ -127,7 +136,7 @@ def next_query

def log *args
return unless @opts[:logging]
p args
p [@fd, (@signature[-4..-1] if @signature), *args]
end

public
Expand Down Expand Up @@ -218,10 +227,11 @@ def self.all query, type = nil, &blk

def self.connection_pool
@connection_pool ||= (1..settings[:connections]).map{ EventedMysql.connect(settings) }
(1..(settings[:connections]-@connection_pool.size)).each do
@connection_pool << EventedMysql.connect(settings)
end unless settings[:connections] == @connection_pool.size
@connection_pool
# p ['connpool', settings[:connections], @connection_pool.size]
# (1..(settings[:connections]-@connection_pool.size)).each do
# @connection_pool << EventedMysql.connect(settings)
# end unless settings[:connections] == @connection_pool.size
# @connection_pool
end
end

Expand Down Expand Up @@ -256,7 +266,6 @@ def self.connection_pool
}
end

#
# to test, run:
# mysqladmin5 -u root kill `mysqladmin5 -u root processlist | grep "select sleep(5)+1" | cut -d'|' -f2`
#
Expand Down

0 comments on commit d0d6907

Please sign in to comment.