Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mqtt_proxy: Execute retry_connect in main thread to prevent updating @_retrying race condition #22

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions lib/fluent/plugin/mqtt_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,17 @@ def increment_retry_interval
@retry_interval = @retry_interval * @retry_inc_ratio
end

def retry_connect(e, message)
def handle_retrying_connection_with_a_lag(e, message)
log.error "#{message},#{e.class},#{e.message}\n#{e.backtrace.join("\n")}"
log.error "Retry in #{@retry_interval} sec"
timer_execute("#{current_plugin_name}_connect".to_sym, @retry_interval,
repeat: false) do
retry_connect
end
end

def retry_connect
if !@_retrying
log.error "#{message},#{e.class},#{e.message}"
log.error "Retry in #{@retry_interval} sec"
timer_execute("#{current_plugin_name}_connect".to_sym, @retry_interval, repeat: false, &method(:connect))
@_retrying = true
increment_retry_interval
Expand Down Expand Up @@ -119,16 +126,16 @@ def rescue_disconnection
# cannot catch MQTT::ProtocolException raised from @read_thread
# in ruby-mqtt. So, the current version uses plugin local thread
# @connect_thread to catch it.
retry_connect(e, "Protocol error occurs in #{current_plugin_name}.")
handle_retrying_connection_with_a_lag(e, "Protocol error occurs in #{current_plugin_name}.")
raise MqttError, "Protocol error occurs."
rescue Timeout::Error => e
retry_connect(e, "Timeout error occurs in #{current_plugin_name}.")
handle_retrying_connection_with_a_lag(e, "Timeout error occurs in #{current_plugin_name}.")
raise Timeout::Error, "Timeout error occurs."
rescue SystemCallError => e
retry_connect(e, "System call error occurs in #{current_plugin_name}.")
handle_retrying_connection_with_a_lag(e, "System call error occurs in #{current_plugin_name}.")
raise SystemCallError, "System call error occurs."
rescue StandardError=> e
retry_connect(e, "The other error occurs in #{current_plugin_name}.")
handle_retrying_connection_with_a_lag(e, "The other error occurs in #{current_plugin_name}.")
raise StandardError, "The other error occurs."
rescue MQTT::NotConnectedException=> e
# Since MQTT::NotConnectedException is raised only on publish,
Expand Down