Skip to content

Commit

Permalink
Feat: name scheduler threads + redirect error logging (#102)
Browse files Browse the repository at this point in the history
* use a custom Scheduler sub-class
  - redirecting potential logs to LS logger
  - setting thread name for scheduler threads
  - any worker threads started are also named

* Refactor: include cause when warning from exception

* force testing against rufus-scheduler 3.0.9
  • Loading branch information
kares committed Jan 19, 2022
1 parent 0bcbc47 commit abec497
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 7 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
## 5.2.2
- Feat: name scheduler threads + redirect error logging [#102](https://github.com/logstash-plugins/logstash-integration-jdbc/pull/102)

## 5.2.1
- Refactor: isolate paginated normal statement algorithm in a separate handler [#101](https://github.com/logstash-plugins/logstash-integration-jdbc/pull/101)

Expand Down
8 changes: 6 additions & 2 deletions lib/logstash/inputs/jdbc.rb
Expand Up @@ -3,6 +3,7 @@
require "logstash/namespace"
require "logstash/plugin_mixins/jdbc/common"
require "logstash/plugin_mixins/jdbc/jdbc"
require "logstash/plugin_mixins/jdbc/scheduler"
require "logstash/plugin_mixins/ecs_compatibility_support"
require "logstash/plugin_mixins/ecs_compatibility_support/target_check"
require "logstash/plugin_mixins/validator_support/field_reference_validation_adapter"
Expand Down Expand Up @@ -293,8 +294,11 @@ def set_value_tracker(instance)
def run(queue)
load_driver
if @schedule
@scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
@scheduler.cron @schedule do
# input thread (Java) name example "[my-oracle]<jdbc"
@scheduler = LogStash::PluginMixins::Jdbc::Scheduler.new(
:max_work_threads => 1, :thread_name => "[#{id}]<jdbc__scheduler"
)
@scheduler.schedule_cron @schedule do
execute_query(queue)
end

Expand Down
3 changes: 2 additions & 1 deletion lib/logstash/plugin_mixins/jdbc/jdbc.rb
Expand Up @@ -220,7 +220,8 @@ def execute_statement
end
success = true
rescue Sequel::DatabaseConnectionError, Sequel::DatabaseError, Java::JavaSql::SQLException => e
details = { :exception => e.message }
details = { exception: e.class, message: e.message }
details[:cause] = e.cause.inspect if e.cause
details[:backtrace] = e.backtrace if @logger.debug?
@logger.warn("Exception when executing JDBC query", details)
else
Expand Down
111 changes: 111 additions & 0 deletions lib/logstash/plugin_mixins/jdbc/scheduler.rb
@@ -0,0 +1,111 @@
require 'rufus/scheduler'

require 'logstash/util/loggable'

module LogStash module PluginMixins module Jdbc
class Scheduler < Rufus::Scheduler

include LogStash::Util::Loggable

# Rufus::Scheduler >= 3.4 moved the Time impl into a gem EoTime = ::EtOrbi::EoTime`
# Rufus::Scheduler 3.1 - 3.3 using it's own Time impl `Rufus::Scheduler::ZoTime`
TimeImpl = defined?(Rufus::Scheduler::EoTime) ? Rufus::Scheduler::EoTime :
(defined?(Rufus::Scheduler::ZoTime) ? Rufus::Scheduler::ZoTime : ::Time)

# @overload
def on_error(job, err)
details = { exception: err.class, message: err.message, backtrace: err.backtrace }
details[:cause] = err.cause if err.cause

details[:now] = debug_format_time(TimeImpl.now)
details[:last_time] = (debug_format_time(job.last_time) rescue nil)
details[:next_time] = (debug_format_time(job.next_time) rescue nil)
details[:job] = job

details[:opts] = @opts
details[:started_at] = started_at
details[:thread] = thread.inspect
details[:jobs_size] = @jobs.size
details[:work_threads_size] = work_threads.size
details[:work_queue_size] = work_queue.size

logger.error("Scheduler intercepted an error:", details)

rescue => e
logger.error("Scheduler failed in #on_error #{e.inspect}")
end

def debug_format_time(time)
# EtOrbi::EoTime used by (newer) Rufus::Scheduler has to_debug_s https://git.io/JyiPj
time.respond_to?(:to_debug_s) ? time.to_debug_s : time.strftime("%Y-%m-%dT%H:%M:%S.%L")
end
private :debug_format_time

# @private helper used by JobDecorator
def work_thread_name_prefix
( @opts[:thread_name] || "#{@thread_key}_scheduler" ) + '_worker-'
end

protected

# @overload
def start
ret = super() # @thread[:name] = @opts[:thread_name] || "#{@thread_key}_scheduler"

# at least set thread.name for easier thread dump analysis
if @thread.is_a?(Thread) && @thread.respond_to?(:name=)
@thread.name = @thread[:name] if @thread[:name]
end

ret
end

# @overload
def do_schedule(job_type, t, callable, opts, return_job_instance, block)
job_or_id = super

job_or_id.extend JobDecorator if return_job_instance

job_or_id
end

module JobDecorator

def start_work_thread
prev_thread_count = @scheduler.work_threads.size

ret = super() # does not return Thread instance in 3.0

work_threads = @scheduler.work_threads
while prev_thread_count == work_threads.size # very unlikely
Thread.pass
work_threads = @scheduler.work_threads
end

work_thread_name_prefix = @scheduler.work_thread_name_prefix

work_threads.sort! do |t1, t2|
if t1[:name].nil?
t2[:name].nil? ? 0 : +1 # nils at the end
elsif t2[:name].nil?
t1[:name].nil? ? 0 : -1
else
t1[:name] <=> t2[:name]
end
end

work_threads.each_with_index do |thread, i|
unless thread[:name]
thread[:name] = "#{work_thread_name_prefix}#{sprintf('%02i', i)}"
thread.name = thread[:name] if thread.respond_to?(:name=)
# e.g. "[oracle]<jdbc_scheduler_worker-00"
end
end

ret
end

end

end
end end end
5 changes: 2 additions & 3 deletions logstash-integration-jdbc.gemspec
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-integration-jdbc'
s.version = '5.2.1'
s.version = '5.2.2'
s.licenses = ['Apache License (2.0)']
s.summary = "Integration with JDBC - input and filter plugins"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down Expand Up @@ -34,8 +34,7 @@ Gem::Specification.new do |s|

s.add_runtime_dependency 'tzinfo'
s.add_runtime_dependency 'tzinfo-data'
# 3.5 limitation is required for jdbc-static loading schedule
s.add_runtime_dependency 'rufus-scheduler', '< 3.5'
s.add_runtime_dependency 'rufus-scheduler', '~> 3.0.9'
s.add_runtime_dependency 'logstash-mixin-ecs_compatibility_support', '~>1.3'
s.add_runtime_dependency "logstash-mixin-validator_support", '~> 1.0'
s.add_runtime_dependency "logstash-mixin-event_support", '~> 1.0'
Expand Down
2 changes: 1 addition & 1 deletion spec/inputs/integration/integ_spec.rb
Expand Up @@ -70,7 +70,7 @@
plugin.register
expect( plugin ).to receive(:log_java_exception)
expect(plugin.logger).to receive(:warn).once.with("Exception when executing JDBC query",
hash_including(:exception => instance_of(String)))
hash_including(:message => instance_of(String)))
q = Queue.new
expect{ plugin.run(q) }.not_to raise_error
end
Expand Down
52 changes: 52 additions & 0 deletions spec/plugin_mixins/jdbc/scheduler_spec.rb
@@ -0,0 +1,52 @@
# encoding: utf-8
require "logstash/devutils/rspec/spec_helper"
require "logstash/plugin_mixins/jdbc/scheduler"

describe LogStash::PluginMixins::Jdbc::Scheduler do

let(:thread_name) { '[test]<jdbc_scheduler' }

let(:opts) do
{ :max_work_threads => 2, :thread_name => thread_name }
end

subject(:scheduler) { LogStash::PluginMixins::Jdbc::Scheduler.new(opts) }

after { scheduler.stop(:wait) }

it "sets scheduler thread name" do
expect( scheduler.thread.name ).to include thread_name
end

context 'cron schedule' do

before do
scheduler.schedule_cron('* * * * * *') { sleep 1.25 } # every second
end

it "sets worker thread names" do
sleep 3.0
threads = scheduler.work_threads
threads.sort! { |t1, t2| (t1.name || '') <=> (t2.name || '') }

expect( threads.size ).to eql 2
expect( threads.first.name ).to eql "#{thread_name}_worker-00"
expect( threads.last.name ).to eql "#{thread_name}_worker-01"
end

end

context 'every 1s' do

before do
scheduler.schedule_in('1s') { raise 'TEST' } # every second
end

it "logs errors handled" do
expect( scheduler.logger ).to receive(:error).with /Scheduler intercepted an error/, hash_including(:message => 'TEST')
sleep 1.5
end

end

end

0 comments on commit abec497

Please sign in to comment.