lib/skylight/probes/delayed_job.rb (40 additions & 25 deletions)
@@ -8,21 +8,36 @@ module DelayedJob
begin
require "delayed/plugin"

UNKNOWN = "<Delayed::Job Unknown>"

class Plugin < ::Delayed::Plugin
callbacks do |lifecycle|
lifecycle.around(:perform) { |worker, job, &block| sk_instrument(worker, job, &block) }

lifecycle.after(:error) { |_worker, _job| Skylight.trace&.segment = "error" }
lifecycle.after(:failure) { |_worker, _job| Skylight.trace&.segment = "error" }
end

class << self
include Skylight::Util::Logging

# This is called quite early in Delayed::Worker
#
# Typically, the `:perform` lifecycle hook is called before the
# `payload_object` has been deserialized, so we can't name the
# trace yet.
#
# If we call `payload_object` here, we would move the work of
# loading the object ahead of where it naturally happens, which
# means the database load time won't be instrumented. On the other
# hand, should the deserialization fail, we would have moved the
# timing of the error as well. Crucially, it would have moved it
# outside of the spot where these errors are normally caught and
# reported by the worker.
#
# See https://github.com/skylightio/skylight-ruby/issues/491
def sk_instrument(_worker, job)
endpoint = Skylight::Probes::DelayedJob.handler_name(job)

Skylight.trace(
endpoint,
UNKNOWN,
"app.delayed_job.worker",
"Delayed::Worker#run",
component: :worker,
@@ -41,15 +56,6 @@ def sk_instrument(_worker, job)
$stderr.puts "[SKYLIGHT] The delayed_job probe was requested, but Delayed::Plugin was not defined."
end

UNKNOWN = "<Delayed::Job Unknown>"

def self.handler_name(job)
payload_object =
job.respond_to?(:payload_object_without_sk) ? job.payload_object_without_sk : job.payload_object

payload_object_name(payload_object)
end

def self.payload_object_name(payload_object)
if payload_object.is_a?(::Delayed::PerformableMethod)
payload_object.display_name
@@ -77,18 +83,27 @@ def self.payload_object_source_meta(payload_object)

class InstrumentationProxy < SimpleDelegator
def perform
source_meta = Skylight::Probes::DelayedJob.payload_object_source_meta(__getobj__)

opts = {
category: "app.delayed_job.job",
title: format_source(*source_meta),
meta: {
source_location_hint: source_meta
},
internal: true
}

Skylight.instrument(opts) { __getobj__.perform }
if (trace = Skylight.instrumenter&.current_trace)
if trace.endpoint == UNKNOWN
# At this point, deserialization was, by definition, successful.
# So it'd be safe to set the endpoint name based on the payload
# object here.
trace.endpoint = Skylight::Probes::DelayedJob.payload_object_name(__getobj__)
end

source_meta = Skylight::Probes::DelayedJob.payload_object_source_meta(__getobj__)

opts = {
category: "app.delayed_job.job",
title: format_source(*source_meta),
meta: {
source_location_hint: source_meta
},
internal: true
}

Skylight.instrument(opts) { __getobj__.perform }
end
end

# Used by Delayed::Backend::Base to determine Job#name
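
The probe above builds on delayed_job's plugin/lifecycle API. For readers unfamiliar with that mechanism, here is a minimal, hypothetical plugin using the same around(:perform) hook the probe registers. The class name, timing logic, and log message are illustrative and not part of this change; the callback registration and Delayed::Worker.plugins wiring follow the gem's documented API.

require "delayed_job"

# Hypothetical plugin, for illustration only.
class TimingPlugin < ::Delayed::Plugin
  callbacks do |lifecycle|
    lifecycle.around(:perform) do |worker, job, &block|
      started = Time.now
      begin
        # Calling the block is what actually deserializes and runs the job,
        # which is why the Skylight probe cannot know the job's real name
        # at this point and opens the trace as "<Delayed::Job Unknown>".
        block.call(worker, job)
      ensure
        worker.say("job #{job.id} finished in #{Time.now - started}s")
      end
    end
  end
end

Delayed::Worker.plugins << TimingPlugin
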
spec/integration/delayed_job_spec.rb (82 additions & 0 deletions)
@@ -276,6 +276,88 @@ def enqueue_job(*args)
)
end
end

context "with ActiveRecord model" do
let(:users_migration) do
base = ActiveRecord::Migration
base = base::Current if defined?(base::Current)

Class.new(base) do
def self.up
create_table :users, force: true do |table|
table.string :name, null: false
table.timestamps null: true
end
end

def self.down
drop_table :users
end
end
end

around do |example|
with_sqlite(migration: users_migration) do
example.call
end
end

before do
stub_const("SkDelayedRecord", Class.new(ActiveRecord::Base) do
self.table_name = "users"

def good_method
Skylight.instrument(category: "app.zomg") do
SpecHelper.clock.skip 1
end
end

def self.good_method
new.good_method
end
end)
end

# overrides enqueue_job on the outer context
def enqueue_job(_method_name, *, class_method: false)
if class_method
SkDelayedRecord.delay(queue: "queue-name").good_method
else
SkDelayedRecord.create!(name: "test-record").tap do |record|
record.delay(queue: "queue-name").good_method
end
end
end

specify "instance method" do
enqueue_and_process_job(:good_method)

server.wait resource: "/report"
report = server.reports[0].to_simple_report
expect(report.endpoint.name).to eq("SkDelayedRecord#good_method<sk-segment>queue-name</sk-segment>")
end

specify "class method" do
enqueue_and_process_job(:good_method, class_method: true)

server.wait resource: "/report"
report = server.reports[0].to_simple_report
expect(report.endpoint.name).to eq("SkDelayedRecord.good_method<sk-segment>queue-name</sk-segment>")
end

specify "instance method on a deleted record" do
SkDelayedRecord.create!(name: "test-record").tap do |record|
record.delay(queue: "queue-name").good_method
record.destroy!
end

expect { worker.work_off }.not_to raise_error

server.wait resource: "/report"
report = server.reports[0].to_simple_report
expect(report.endpoint.name).to eq("<Delayed::Job Unknown><sk-segment>error</sk-segment>")
end
end
end

def enqueue_job(method_name, *, class_method: false)
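
The final spec above exercises the failure path that motivated deferring the endpoint name. A condensed sketch of that scenario, assuming the delayed_job gem's standard behaviour (delay, Delayed::Worker#work_off, and Delayed::DeserializationError are its documented API; SkDelayedRecord is the class stubbed in the spec, and the comments describe the interaction with the probe as implemented in this diff):

record = SkDelayedRecord.create!(name: "test-record")
record.delay(queue: "queue-name").good_method # serializes a reference to the record
record.destroy!                               # that reference can no longer be resolved

Delayed::Worker.new.work_off
# Inside the worker:
#   * the probe's around(:perform) hook fires first and opens the trace as
#     "<Delayed::Job Unknown>", since payload_object is not yet deserialized
#   * deserializing payload_object raises Delayed::DeserializationError,
#     which the worker rescues and records as a failed job
#   * the probe's after(:error)/after(:failure) hooks set the trace segment
#     to "error", giving the "<Delayed::Job Unknown><sk-segment>error</sk-segment>"
#     endpoint asserted in the spec
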