Skip to content

Commit

Permalink
updates per PR review, non count logging, stricter validation
Browse files Browse the repository at this point in the history
  • Loading branch information
Guy Boertje committed Sep 9, 2019
1 parent e33945d commit ec8600b
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 84 deletions.
39 changes: 30 additions & 9 deletions lib/logstash/inputs/jdbc.rb
Expand Up @@ -203,7 +203,7 @@ module LogStash module Inputs class Jdbc < LogStash::Inputs::Base

config :use_prepared_statements, :validate => :boolean, :default => false

config :prepared_statement_name, :validate => :string, :default => SecureRandom.hex(40)
config :prepared_statement_name, :validate => :string, :default => ""

config :prepared_statement_bind_values, :validate => :array, :default => []

Expand All @@ -223,17 +223,25 @@ def register
end
end

set_value_tracker(LogStash::PluginMixins::Jdbc::ValueTracking.build_last_value_tracker(self))
set_statement_logger(LogStash::PluginMixins::Jdbc::CheckedCountLogger.new(@logger))

@enable_encoding = !@charset.nil? || !@columns_charset.empty?

unless @statement.nil? ^ @statement_filepath.nil?
raise(LogStash::ConfigurationError, "Must set either :statement or :statement_filepath. Only one may be set at a time.")
end

@statement = ::File.read(@statement_filepath) if @statement_filepath

# must validate prepared statement mode after trying to read in from @statement_filepath
if @use_prepared_statements
validation_errors = validate_prepared_statement_mode
unless validation_errors.empty?
raise(LogStash::ConfigurationError, "Prepared Statement Mode validation errors: " + validation_errors.join(", "))
end
end

set_value_tracker(LogStash::PluginMixins::Jdbc::ValueTracking.build_last_value_tracker(self))
set_statement_logger(LogStash::PluginMixins::Jdbc::CheckedCountLogger.new(@logger))

@enable_encoding = !@charset.nil? || !@columns_charset.empty?

if (@jdbc_password_filepath and @jdbc_password)
raise(LogStash::ConfigurationError, "Only one of :jdbc_password, :jdbc_password_filepath may be set at a time.")
end
Expand Down Expand Up @@ -281,10 +289,23 @@ def stop

private

def validate_prepared_statement_mode
error_messages = []
if @prepared_statement_name.empty?
error_messages << "must provide a name for the Prepared Statement, it must be unique for the db session"
end
if @statement.count("?") != @prepared_statement_bind_values.size
# mismatch in number of bind value elements to placeholder characters
error_messages << "there is a mismatch between the number of statement `?` placeholders and :prepared_statement_bind_values array setting elements"
end
if @jdbc_paging_enabled
# Pagination is not supported when using prepared statements
error_messages << "JDBC pagination cannot be used at this time"
end
error_messages
end

def execute_query(queue)
# update default parameters
# @parameters['sql_last_value'] = @value_tracker.value
# execute_statement(@statement, @parameters) do |row|
execute_statement do |row|
if enable_encoding?
## do the necessary conversions to string elements
Expand Down
11 changes: 8 additions & 3 deletions lib/logstash/plugin_mixins/jdbc/checked_count_logger.rb
Expand Up @@ -9,9 +9,14 @@ def initialize(logger)
@in_debug = @logger.debug?
end

def log_statement_parameters(query, statement, parameters)
def disable_count
@needs_check = false
@count_is_supported = false
end

def log_statement_parameters(statement, parameters, query)
return unless @in_debug
check_count_query(query) if @needs_check
check_count_query(query) if @needs_check && query
if @count_is_supported
@logger.debug("Executing JDBC query", :statement => statement, :parameters => parameters, :count => execute_count(query))
else
Expand All @@ -35,4 +40,4 @@ def execute_count(query)
query.count
end
end
end end end
end end end
11 changes: 7 additions & 4 deletions lib/logstash/plugin_mixins/jdbc/statement_handler.rb
Expand Up @@ -49,7 +49,7 @@ def perform_query(db, sql_last_value)
def build_query(db, sql_last_value)
parameters[:sql_last_value] = sql_last_value
query = db[statement, parameters]
statement_logger.log_statement_parameters(query, statement, parameters)
statement_logger.log_statement_parameters(statement, parameters, query)
query
end

Expand Down Expand Up @@ -84,24 +84,27 @@ def perform_query(db, sql_last_value)
private

def build_query(db, sql_last_value)
# don't log statement count when using prepared statements for now...
# needs enhancement to allow user to supply a bindable count prepared statement in settings.
@parameters = create_bind_values_hash
if statement_prepared.false?
prepended = parameters.keys.map{|v| v.to_s.prepend("$").to_sym}
@prepared = db[statement, *prepended].prepare(:select, name)
statement_prepared.make_true
end
# under the scheduler the database is recreated each time
# under the scheduler the Sequel database instance is recreated each time
# so the previous prepared statements are lost, add back
if db.prepared_statement(name).nil?
db.set_prepared_statement(name, prepared)
end
bind_value_sql_last_value(sql_last_value)
statement_logger.log_statement_parameters(statement, parameters, nil)
db.call(name, parameters)
end

def post_init(plugin)
# don't log statement count when using prepared statements for now...
# needs enhancement to allow user to supply a bindable count prepared statement in settings.
@statement_logger.disable_count

@name = plugin.prepared_statement_name.to_sym
@bind_values_array = plugin.prepared_statement_bind_values
@parameters = plugin.parameters
Expand Down
183 changes: 115 additions & 68 deletions spec/inputs/jdbc_spec.rb
Expand Up @@ -11,10 +11,6 @@
require "time"
require "date"

# ENV["LOG_AT"].tap do |level|
# LogStash::Logging::Logger::configure_logging(level) unless level.nil?
# end

# We do not need to set TZ env var anymore because we can have 'Sequel.application_timezone' set to utc by default now.

describe LogStash::Inputs::Jdbc do
Expand Down Expand Up @@ -1316,88 +1312,139 @@
end

context "when using prepared statements" do
before do
::File.write(settings["last_run_metadata_path"], YAML.dump(last_run_value))
num_rows.times do |n|
db[:test_table].insert(:num => n.succ, :string => SecureRandom.hex(8), :custom_time => Time.now.utc, :created_at => Time.now.utc)
end
plugin.register
end

after do
plugin.stop
end
let(:last_run_value) { 250 }
let(:expected_queue_size) { 100 }
let(:num_rows) { 1000 }

context "without bind paramters" do
let(:settings) do
{
"statement" => "SELECT * FROM test_table ORDER BY num FETCH NEXT 100 ROWS ONLY",
"prepared_statement_bind_values" => [],
"prepared_statement_name" => "pstmt_test_without",
"use_prepared_statements" => true,
"tracking_column_type" => "numeric",
"tracking_column" => "num",
"use_column_value" => true,
"last_run_metadata_path" => Stud::Temporary.pathname
}
context "check validation" do
context "with an empty name setting" do
let(:settings) do
{
"statement" => "SELECT * FROM test_table ORDER BY num FETCH NEXT ? ROWS ONLY",
"prepared_statement_bind_values" => [100],
"use_prepared_statements" => true,
}
end

it "should fail to register" do
expect{ plugin.register }.to raise_error(LogStash::ConfigurationError)
end
end

it "should fetch some rows" do
plugin.run(queue)
context "with an mismatched placeholder vs bind values" do
let(:settings) do
{
"statement" => "SELECT * FROM test_table ORDER BY num FETCH NEXT ? ROWS ONLY",
"prepared_statement_bind_values" => [],
"use_prepared_statements" => true,
}
end

expect(queue.size).to eq(expected_queue_size)
expect(YAML.load(File.read(settings["last_run_metadata_path"]))).to eq(expected_queue_size)
it "should fail to register" do
expect{ plugin.register }.to raise_error(LogStash::ConfigurationError)
end
end

context "with jdbc paging enabled" do
let(:settings) do
{
"statement" => "SELECT * FROM test_table ORDER BY num FETCH NEXT 100 ROWS ONLY",
"prepared_statement_bind_values" => [],
"prepared_statement_name" => "pstmt_test_without",
"use_prepared_statements" => true,
"jdbc_paging_enabled" => true,
"jdbc_page_size" => 20,
"jdbc_fetch_size" => 10
}
end

it "should fail to register" do
expect{ plugin.register }.to raise_error(LogStash::ConfigurationError)
end
end

end

context "with jdbc paging enabled" do
let(:settings) do
{
"statement" => "SELECT * FROM test_table ORDER BY num FETCH NEXT 100 ROWS ONLY",
"prepared_statement_bind_values" => [],
"prepared_statement_name" => "pstmt_test_without",
"use_prepared_statements" => true,
"tracking_column_type" => "numeric",
"tracking_column" => "num",
"use_column_value" => true,
"last_run_metadata_path" => Stud::Temporary.pathname,
"jdbc_paging_enabled" => true,
"jdbc_page_size" => 20,
"jdbc_fetch_size" => 10
}
context "and no validation failures" do
before do
::File.write(settings["last_run_metadata_path"], YAML.dump(last_run_value))
num_rows.times do |n|
db[:test_table].insert(:num => n.succ, :string => SecureRandom.hex(8), :custom_time => Time.now.utc, :created_at => Time.now.utc)
end
end

it "should fetch some rows" do
plugin.run(queue)
after do
plugin.stop
end

expect(queue.size).to eq(expected_queue_size)
expect(YAML.load(File.read(settings["last_run_metadata_path"]))).to eq(expected_queue_size)
context "with jdbc paging enabled" do
let(:settings) do
{
"statement" => "SELECT * FROM test_table ORDER BY num FETCH NEXT 100 ROWS ONLY",
"prepared_statement_bind_values" => [],
"prepared_statement_name" => "pstmt_test_without",
"use_prepared_statements" => true,
"tracking_column_type" => "numeric",
"tracking_column" => "num",
"use_column_value" => true,
"last_run_metadata_path" => Stud::Temporary.pathname,
"jdbc_paging_enabled" => true,
"jdbc_page_size" => 20,
"jdbc_fetch_size" => 10
}
end

it "should fail to register" do
expect{ plugin.register }.to raise_error(LogStash::ConfigurationError)
end
end
end

context "with bind parameters" do
let(:settings) do
{
# Sadly, postgres does but derby doesn't - It is not allowed for both operands of '+' to be ? parameters.: PREPARE pstmt_test: SELECT * FROM test_table WHERE (num > ?) AND (num <= ? + ?) ORDER BY num
"statement" => "SELECT * FROM test_table WHERE (num > ?) ORDER BY num FETCH NEXT ? ROWS ONLY",
"prepared_statement_bind_values" => [":sql_last_value", expected_queue_size],
"prepared_statement_name" => "pstmt_test_with",
"use_prepared_statements" => true,
"tracking_column_type" => "numeric",
"tracking_column" => "num",
"use_column_value" => true,
"last_run_metadata_path" => Stud::Temporary.pathname
}
context "without placeholder and bind parameters" do
let(:settings) do
{
"statement" => "SELECT * FROM test_table ORDER BY num FETCH NEXT 100 ROWS ONLY",
"prepared_statement_bind_values" => [],
"prepared_statement_name" => "pstmt_test_without",
"use_prepared_statements" => true,
"tracking_column_type" => "numeric",
"tracking_column" => "num",
"use_column_value" => true,
"last_run_metadata_path" => Stud::Temporary.pathname
}
end

it "should fetch some rows" do
plugin.register
plugin.run(queue)

expect(queue.size).to eq(expected_queue_size)
expect(YAML.load(File.read(settings["last_run_metadata_path"]))).to eq(expected_queue_size)
end
end

it "should fetch some rows" do
plugin.run(queue)

expect(queue.size).to eq(expected_queue_size)
expect(YAML.load(File.read(settings["last_run_metadata_path"]))).to eq(last_run_value + expected_queue_size)
context "with bind parameters" do
let(:settings) do
{
# Sadly, postgres does but derby doesn't - It is not allowed for both operands of '+' to be ? parameters.: PREPARE pstmt_test: SELECT * FROM test_table WHERE (num > ?) AND (num <= ? + ?) ORDER BY num
"statement" => "SELECT * FROM test_table WHERE (num > ?) ORDER BY num FETCH NEXT ? ROWS ONLY",
"prepared_statement_bind_values" => [":sql_last_value", expected_queue_size],
"prepared_statement_name" => "pstmt_test_with",
"use_prepared_statements" => true,
"tracking_column_type" => "numeric",
"tracking_column" => "num",
"use_column_value" => true,
"last_run_metadata_path" => Stud::Temporary.pathname
}
end

it "should fetch some rows" do
plugin.register
plugin.run(queue)

expect(queue.size).to eq(expected_queue_size)
expect(YAML.load(File.read(settings["last_run_metadata_path"]))).to eq(last_run_value + expected_queue_size)
end
end
end
end
Expand Down

0 comments on commit ec8600b

Please sign in to comment.