
Log data.

Provide a mechanism for QC users to handle log events.
See the instruments gist for example usage: https://gist.github.com/2317855.
This change was also an opportunity to adopt structured data as the
log format.
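For context, here is a minimal usage sketch (not part of the commit) showing how a QC user might handle the new log events; the file name, job class, and arguments are placeholders, and the linked gist covers the fuller instruments-style setup.

require "queue_classic"   # assumes QC_DATABASE_URL or DATABASE_URL is exported

# Scrolls::Log.start runs against STDOUT when queue_classic is required;
# calling it again with an IO redirects the structured log stream.
log_file = File.open("qc.log", "a")
Scrolls::Log.start(log_file)

# QC.log merges :lib => :queue_classic into the event and Scrolls writes it
# as space-separated key=value pairs, e.g.:
#   lib=queue_classic level=info action=boot
QC.log(:level => :info, :action => "boot")

# QC.log_yield wraps a block, logging :elapsed (milliseconds) on success or an
# :error event with the exception class and message before re-raising.
QC.log_yield(:action => "enqueue_welcome_email") do
  QC.enqueue("Emailer.welcome", 42)   # placeholder job
end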
commit 6057420feefec4c0a8efb369d59c016a0e0cad19 (1 parent: f38682e). Ryan Smith (ace hacker) committed Apr 8, 2012.
Showing with 207 additions and 68 deletions.
  1. +4 −2 Gemfile
  2. +20 −11 lib/queue_classic.rb
  3. +10 −12 lib/queue_classic/conn.rb
  4. +5 −3 lib/queue_classic/queries.rb
  5. +127 −0 lib/queue_classic/scrolls.rb
  6. +29 −38 lib/queue_classic/worker.rb
  7. +12 −0 readme.md
  8. +0 −2 test/helper.rb
Gemfile
@@ -1,9 +1,11 @@
source :rubygems
-gem 'rake'
+gem "rake"
gemspec
group :test do
- gem 'ruby-debug19'
+ gem "ruby-debug19"
+ gem "turn"
+ gem "minitest"
end
lib/queue_classic.rb
@@ -1,17 +1,18 @@
require "pg"
-
-require "logger"
require "uri"
$: << File.expand_path(__FILE__, "lib")
+require "queue_classic/scrolls"
require "queue_classic/okjson"
require "queue_classic/conn"
require "queue_classic/queries"
require "queue_classic/queue"
require "queue_classic/worker"
module QC
+ # ENV["QC_LOG_LEVEL"] is used in Scrolls
+ Scrolls::Log.start
Root = File.expand_path("..", File.dirname(__FILE__))
SqlFunctions = File.join(QC::Root, "/sql/ddl.sql")
@@ -22,9 +23,6 @@ module QC
ENV["DATABASE_URL"] ||
raise(ArgumentError, "missing QC_DATABASE_URL or DATABASE_URL")
- # export QC_LOG_LEVEL=`ruby -r "logger" -e "puts Logger::ERROR"`
- LOG_LEVEL = (ENV["QC_LOG_LEVEL"] || Logger::DEBUG).to_i
-
# You can use the APP_NAME to query for
# postgres related process information in the
# pg_stat_activity table. Don't set this unless
@@ -66,12 +64,6 @@ module QC
# as the max exponent.
MAX_LOCK_ATTEMPTS = (ENV["QC_MAX_LOCK_ATTEMPTS"] || 5).to_i
-
- # Setup the logger
- Log = Logger.new($stdout)
- Log.level = LOG_LEVEL
- Log.info("program=queue_classic log=true")
-
# Defer method calls on the QC module to the
# default queue. This facilitates QC.enqueue()
def self.method_missing(sym, *args, &block)
@@ -84,4 +76,21 @@ def self.default_queue
end
end
+ def self.log_yield(data)
+ begin
+ t0 = Time.now
+ yield
+ rescue => e
+ log({:level => :error, :error => e.class, :message => e.message.strip}.merge(data))
+ raise
+ ensure
+ t = Integer((Time.now - t0)*1000)
+ log(data.merge(:elapsed => t)) unless e
+ end
+ end
+
+ def self.log(data)
+ Scrolls.log({:lib => :queue_classic}.merge(data))
+ end
+
end
lib/queue_classic/conn.rb
@@ -3,46 +3,44 @@ module Conn
extend self
def execute(stmt, *params)
- log("executing #{stmt.inspect}, #{params.inspect}")
+ log(:level => :debug, :action => "exec_sql", :sql => stmt.inspect)
begin
params = nil if params.empty?
r = connection.exec(stmt, params)
result = []
r.each {|t| result << t}
result.length > 1 ? result : result.pop
rescue PGError => e
- log("execute exception=#{e.inspect}")
+ log(:error => e.inspect)
raise
end
end
def notify(chan)
- log("NOTIFY")
+ log(:level => :debug, :action => "NOTIFY")
execute('NOTIFY "' + chan + '"') #quotes matter
end
def listen(chan)
- log("LISTEN")
+ log(:level => :debug, :action => "LISTEN")
execute('LISTEN "' + chan + '"') #quotes matter
end
def unlisten(chan)
- log("UNLISTEN")
+ log(:level => :debug, :action => "UNLISTEN")
execute('UNLISTEN "' + chan + '"') #quotes matter
end
def drain_notify
until connection.notifies.nil?
- log("draining notifications")
+ log(:level => :debug, :action => "drain_notifications")
end
end
def wait_for_notify(t)
- log("waiting for notify timeout=#{t}")
connection.wait_for_notify(t) do |event, pid, msg|
- log("received notification #{event}")
+ log(:level => :debug, :action => "received_notification")
end
- log("done waiting for notify")
end
def transaction
@@ -70,7 +68,7 @@ def disconnect
end
def connect
- log("establishing connection")
+ log(:level => :debug, :action => "establish_conn")
conn = PGconn.connect(
db_url.host,
db_url.port || 5432,
@@ -80,7 +78,7 @@ def connect
db_url.password
)
if conn.status != PGconn::CONNECTION_OK
- log("connection error=#{conn.error}")
+ log(:level => :error, :message => conn.error)
end
conn
end
@@ -90,7 +88,7 @@ def db_url
end
def log(msg)
- Log.info(msg)
+ QC.log(msg)
end
end
lib/queue_classic/queries.rb
@@ -3,9 +3,11 @@ module Queries
extend self
def insert(q_name, method, args, chan=nil)
- s = "INSERT INTO #{TABLE_NAME} (q_name, method, args) VALUES ($1, $2, $3)"
- res = Conn.execute(s, q_name, method, OkJson.encode(args))
- Conn.notify(chan) if chan
+ QC.log_yield(:action => "insert_job") do
+ s = "INSERT INTO #{TABLE_NAME} (q_name, method, args) VALUES ($1, $2, $3)"
+ res = Conn.execute(s, q_name, method, OkJson.encode(args))
+ Conn.notify(chan) if chan
+ end
end
def lock_head(q_name, top_bound)
lib/queue_classic/scrolls.rb
@@ -0,0 +1,127 @@
+require "thread"
+
+module Scrolls
+ extend self
+
+ def log(data, &blk)
+ Log.log(data, &blk)
+ end
+
+ def log_exception(data, e)
+ Log.log_exception(data, e)
+ end
+
+ module Log
+ extend self
+
+ LOG_LEVEL = (ENV["QC_LOG_LEVEL"] || 3).to_i
+ LOG_LEVEL_MAP = {
+ "fatal" => 0,
+ "error" => 1,
+ "warn" => 2,
+ "info" => 3,
+ "debug" => 4
+ }
+
+ attr_accessor :stream
+
+ def start(out = nil)
+ # This allows log_exceptions below to pick up the defined output,
+ # otherwise stream out to STDERR
+ @defined = out.nil? ? false : true
+ sync_stream(out)
+ end
+
+ def sync_stream(out = nil)
+ out = STDOUT if out.nil?
+ @stream = out
+ @stream.sync = true
+ end
+
+ def mtx
+ @mtx ||= Mutex.new
+ end
+
+ def write(data)
+ if log_level_ok?(data[:level])
+ msg = unparse(data)
+ mtx.synchronize do
+ @stream.puts(msg)
+ end
+ end
+ end
+
+ def unparse(data)
+ data.map do |(k, v)|
+ if (v == true)
+ k.to_s
+ elsif v.is_a?(Float)
+ "#{k}=#{format("%.3f", v)}"
+ elsif v.nil?
+ nil
+ else
+ v_str = v.to_s
+ if (v_str =~ /^[a-zA-Z0-9\-\_\.]+$/)
+ "#{k}=#{v_str}"
+ else
+ "#{k}=\"#{v_str.sub(/".*/, "...")}\""
+ end
+ end
+ end.compact.join(" ")
+ end
+
+ def log(data, &blk)
+ unless blk
+ write(data)
+ else
+ start = Time.now
+ res = nil
+ log(data.merge(:at => :start))
+ begin
+ res = yield
+ rescue StandardError, Timeout::Error => e
+ log(data.merge(
+ :at => :exception,
+ :reraise => true,
+ :class => e.class,
+ :message => e.message,
+ :exception_id => e.object_id.abs,
+ :elapsed => Time.now - start
+ ))
+ raise(e)
+ end
+ log(data.merge(:at => :finish, :elapsed => Time.now - start))
+ res
+ end
+ end
+
+ def log_exception(data, e)
+ sync_stream(STDERR) unless @defined
+ log(data.merge(
+ :exception => true,
+ :class => e.class,
+ :message => e.message,
+ :exception_id => e.object_id.abs
+ ))
+ if e.backtrace
+ bt = e.backtrace.reverse
+ bt[0, bt.size-6].each do |line|
+ log(data.merge(
+ :exception => true,
+ :exception_id => e.object_id.abs,
+ :site => line.gsub(/[`'"]/, "")
+ ))
+ end
+ end
+ end
+
+ def log_level_ok?(level)
+ if level
+ LOG_LEVEL_MAP[level.to_s] <= LOG_LEVEL
+ else
+ true
+ end
+ end
+
+ end
+end