Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add proxy_user, proxy_password and move IOError (EOFError) to its own rescue clause #59

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 3.0.9
- Add proxy_user and proxy_password. Fixes [#58](https://github.com/logstash-plugins/logstash-input-twitter/issues/58)
- Rescue IOError and EOFError as separate with INFO logging and configuarable retry.
Fixes [#39](https://github.com/logstash-plugins/logstash-input-twitter/issues/39)

## 3.0.8
- Docs: Set the default_codec doc attribute.

Expand Down Expand Up @@ -44,7 +49,7 @@
- Fixes #22 #21 #20 #11 #9

## 2.0.0
- Plugins were updated to follow the new shutdown semantic, this mainly allows Logstash to instruct input plugins to terminate gracefully,
- Plugins were updated to follow the new shutdown semantic, this mainly allows Logstash to instruct input plugins to terminate gracefully,
instead of using Thread.raise on the plugins' threads. Ref: https://github.com/elastic/logstash/pull/3895
- Dependency on logstash-core update to 2.0

Expand Down
46 changes: 33 additions & 13 deletions lib/logstash/inputs/twitter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
require "logstash/json"
require "stud/interval"
require "logstash/inputs/twitter/patches"
require "uri"

# Ingest events from the Twitter Streaming API.
class LogStash::Inputs::Twitter < LogStash::Inputs::Base
Expand Down Expand Up @@ -50,14 +51,14 @@ class LogStash::Inputs::Twitter < LogStash::Inputs::Base
config :oauth_token_secret, :validate => :password, :required => true

# Any keywords to track in the Twitter stream. For multiple keywords, use
# the syntax ["foo", "bar"]. There's a logical OR between each keyword
# the syntax ["foo", "bar"]. There's a logical OR between each keyword
# string listed and a logical AND between words separated by spaces per
# keyword string.
# See https://dev.twitter.com/streaming/overview/request-parameters#track
# See https://dev.twitter.com/streaming/overview/request-parameters#track
# for more details.
#
# The wildcard "*" option is not supported. To ingest a sample stream of
# all tweets, the use_samples option is recommended.
# The wildcard "*" option is not supported. To ingest a sample stream of
# all tweets, the use_samples option is recommended.
config :keywords, :validate => :array

# Record full tweet object as given to us by the Twitter Streaming API.
Expand All @@ -76,13 +77,13 @@ class LogStash::Inputs::Twitter < LogStash::Inputs::Base
config :locations, :validate => :string

# A list of BCP 47 language identifiers corresponding to any of the languages listed
# on Twitter’s advanced search page will only return tweets that have been detected
# on Twitter’s advanced search page will only return tweets that have been detected
# as being written in the specified languages.
config :languages, :validate => :array

# Returns a small random sample of all public statuses. The tweets returned
# by the default access level are the same, so if two different clients connect
# to this endpoint, they will see the same tweets. If set to true, the keywords,
# to this endpoint, they will see the same tweets. If set to true, the keywords,
# follows, locations, and languages options will be ignored. Default => false
config :use_samples, :validate => :boolean, :default => false

Expand All @@ -98,11 +99,24 @@ class LogStash::Inputs::Twitter < LogStash::Inputs::Base
# Port where the proxy is listening, by default 3128 (squid)
config :proxy_port, :validate => :number, :default => 3128

# Username where the proxy is listening, by default 3128 (squid)
config :proxy_port, :validate => :number, :default => 3128
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

duplicate config item here


# Proxy user
config :proxy_user, :validate => :string

# Proxy user password
config :proxy_password, :validate => :password

# Duration in seconds to wait before retrying a connection when twitter responds with a 429 TooManyRequests
# In some cases the 'x-rate-limit-reset' header is not set in the response and <error>.rate_limit.reset_in
# is nil. If this occurs then we use the integer specified here. The default is 5 minutes.
config :rate_limit_reset_in, :validate => :number, :default => 300


# Duration in seconds to wait before retrying when Twitter client socket raises a EOF on network interruption
config :steam_eof_wait_duration, :validate => :number, :default => 30
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we just implement an exponential backoff with no need for more configuration items?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now, if only we had a exponential backoff widget.
I will add the backoff.


def register
require "twitter"

Expand Down Expand Up @@ -145,6 +159,10 @@ def run(queue)
@logger.warn("Twitter too many requests error, sleeping for #{sleep_for}s")
Stud.stoppable_sleep(sleep_for) { stop? }
retry
rescue IOError => e
@logger.info("Twitter error: #{e.message}, retry in #{steam_eof_wait_duration}s")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe @logger.error here?

Suggested change
@logger.info("Twitter error: #{e.message}, retry in #{steam_eof_wait_duration}s")
@logger.error("Socket error, retrying in #{steam_eof_wait_duration}s", :error => e.message)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I chose INFO because it is not an error per se. When the user sees the ERROR log line they panic as if there is something that they need to do/fix. The message can be more INFO and not ERROR in its wording.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe a good compromise with warn? :D Since that the ocurrence of this event means that there's no tweet ingestion I think we should do > info

Stud.stoppable_sleep(steam_eof_wait_duration) { stop? }
retry
rescue => e
# if a lot of these errors begin to occur, the repeated retry will result in TooManyRequests errors trapped above.
@logger.warn("Twitter client error", :message => e.message, :exception => e.class.name, :backtrace => e.backtrace, :options => @filter_options)
Expand Down Expand Up @@ -211,9 +229,6 @@ def from_tweet(tweet)
end
end

# Work around bugs in JrJackson. The standard serializer won't work till we upgrade
# event.set("in-reply-to", nil) if event.get("in-reply-to").is_a?(Twitter::NullObject)

event
end

Expand All @@ -223,10 +238,15 @@ def configure(c)
c.access_token = @oauth_token
c.access_token_secret = @oauth_token_secret.value
if @use_proxy
c.proxy = {
proxy_address: @proxy_address,
proxy_port: @proxy_port,
}
uri = URI.parse('')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems the twitter client docs suggest you could just place the host and port into the hash as we're doing before, what is the need for creating the URI object here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I misread the part in the patch that uses the http gem's Request object. I though it was using the typhoeus gem and that uses a uri.

Still, now that I'm reading the http gem code I'm more confused.

uri.host = @proxy_address
uri.port = @proxy_port
proxy_hash = {:uri => uri}
if @proxy_user && @proxy_password
proxy_hash[:user] = @proxy_user
proxy_hash[:password] = @proxy_password
end
c.proxy = proxy_hash
end
end

Expand Down
4 changes: 2 additions & 2 deletions logstash-input-twitter.gemspec
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
Gem::Specification.new do |s|

s.name = 'logstash-input-twitter'
s.version = '3.0.8'
s.licenses = ['Apache License (2.0)']
s.version = '3.0.9'
s.licenses = ['Apache-2.0']
s.summary = "Reads events from the Twitter Streaming API"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
s.authors = ["Elastic"]
Expand Down
Binary file removed spec/fixtures/small_smile.png
Binary file not shown.
86 changes: 0 additions & 86 deletions spec/integration/twitter_pubsub_spec.rb

This file was deleted.

109 changes: 0 additions & 109 deletions spec/integration/twitter_spec.rb

This file was deleted.