Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SnowplowTracker::Emitter: Make the logger configurable via a :logger option #109

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
32 changes: 18 additions & 14 deletions lib/snowplow-tracker/emitters.rb
Expand Up @@ -34,7 +34,8 @@ class Emitter
:buffer_size => Maybe[Num],
:on_success => Maybe[Func[Num => Any]],
:on_failure => Maybe[Func[Num, Hash => Any]],
:thread_count => Maybe[Num]
:thread_count => Maybe[Num],
:logger => Maybe[Logger]
})

@@StrictConfigHash = And[@@ConfigHash, lambda { |x|
Expand All @@ -46,6 +47,8 @@ class Emitter
:method => 'get'
}

attr_reader :logger

Contract String, @@StrictConfigHash => lambda { |x| x.is_a? Emitter }
def initialize(endpoint, config={})
config = @@DefaultConfig.merge(config)
Expand All @@ -62,7 +65,8 @@ def initialize(endpoint, config={})
@method = config[:method]
@on_success = config[:on_success]
@on_failure = config[:on_failure]
LOGGER.info("#{self.class} initialized with endpoint #{@collector_uri}")
@logger = config[:logger] || LOGGER
logger.info("#{self.class} initialized with endpoint #{@collector_uri}")

self
end
Expand Down Expand Up @@ -108,10 +112,10 @@ def flush(async=true)
Contract ArrayOf[Hash] => nil
def send_requests(evts)
if evts.size < 1
LOGGER.info("Skipping sending events since buffer is empty")
logger.info("Skipping sending events since buffer is empty")
return
end
LOGGER.info("Attempting to send #{evts.size} request#{evts.size == 1 ? '' : 's'}")
logger.info("Attempting to send #{evts.size} request#{evts.size == 1 ? '' : 's'}")

evts.each do |event|
event['stm'] = (Time.now.to_f * 1000).to_i.to_s # add the sent timestamp, overwrite if already exists
Expand All @@ -126,7 +130,7 @@ def send_requests(evts)
).to_json)
post_succeeded = is_good_status_code(request.code)
rescue StandardError => se
LOGGER.warn(se)
logger.warn(se)
end
if post_succeeded
unless @on_success.nil?
Expand All @@ -147,7 +151,7 @@ def send_requests(evts)
request = http_get(evt)
get_succeeded = is_good_status_code(request.code)
rescue StandardError => se
LOGGER.warn(se)
logger.warn(se)
end
if get_succeeded
success_count += 1
Expand All @@ -174,15 +178,15 @@ def send_requests(evts)
Contract Hash => lambda { |x| x.is_a? Net::HTTPResponse }
def http_get(payload)
destination = URI(@collector_uri + '?' + URI.encode_www_form(payload))
LOGGER.info("Sending GET request to #{@collector_uri}...")
LOGGER.debug("Payload: #{payload}")
logger.info("Sending GET request to #{@collector_uri}...")
logger.debug("Payload: #{payload}")
http = Net::HTTP.new(destination.host, destination.port)
request = Net::HTTP::Get.new(destination.request_uri)
if destination.scheme == 'https'
http.use_ssl = true
end
response = http.request(request)
LOGGER.add(is_good_status_code(response.code) ? Logger::INFO : Logger::WARN) {
logger.add(is_good_status_code(response.code) ? Logger::INFO : Logger::WARN) {
"GET request to #{@collector_uri} finished with status code #{response.code}"
}

Expand All @@ -193,8 +197,8 @@ def http_get(payload)
#
Contract Hash => lambda { |x| x.is_a? Net::HTTPResponse }
def http_post(payload)
LOGGER.info("Sending POST request to #{@collector_uri}...")
LOGGER.debug("Payload: #{payload}")
logger.info("Sending POST request to #{@collector_uri}...")
logger.debug("Payload: #{payload}")
destination = URI(@collector_uri)
http = Net::HTTP.new(destination.host, destination.port)
request = Net::HTTP::Post.new(destination.request_uri)
Expand All @@ -204,7 +208,7 @@ def http_post(payload)
request.body = payload.to_json
request.set_content_type('application/json; charset=utf-8')
response = http.request(request)
LOGGER.add(is_good_status_code(response.code) ? Logger::INFO : Logger::WARN) {
logger.add(is_good_status_code(response.code) ? Logger::INFO : Logger::WARN) {
"POST request to #{@collector_uri} finished with status code #{response.code}"
}

Expand Down Expand Up @@ -266,10 +270,10 @@ def flush(async=true)
@buffer = []
end
if not async
LOGGER.info('Starting synchronous flush')
logger.info('Starting synchronous flush')
@queue.synchronize do
@all_processed_condition.wait_while { @results_unprocessed > 0 }
LOGGER.info('Finished synchronous flush')
logger.info('Finished synchronous flush')
end
end
break if @buffer.size < 1
Expand Down