Permalink
Browse files

simplify the log receiving app

  • Loading branch information...
svenfuchs committed Jan 29, 2013
1 parent b0a6a9a commit d96b8e500066feef7866462491118cf1c77ce474
View
@@ -15,7 +15,7 @@ GIT
GIT
remote: git://github.com/travis-ci/travis-core.git
- revision: 987188a5f96f5d7e4f6a6e314ab402f3fbad0963
+ revision: b8d0d63f433c8e63472b97b3c287b99d31027ecb
specs:
travis-core (0.0.1)
actionmailer (~> 3.2.11)
View
@@ -1,6 +1,6 @@
#! /usr/bin/env ruby
-$: << File.expand_path('../../lib', __FILE__)
+$: << 'lib'
require 'bundler/setup'
require 'travis/logs/aggregate'
View
@@ -1,10 +1,15 @@
#! /usr/bin/env ruby
-$: << File.expand_path('../../lib', __FILE__)
+$: << 'lib'
require 'bundler/setup'
-require 'travis/logs/app'
+require 'travis/logs/receive'
+require 'core_ext/module/load_constants'
-app = Travis::Logs::App.new
-app.start
+[Travis::Logs::Receive, Travis].each do |target|
+ target.load_constants!(skip: [/::AssociationCollection$/])
+end
+app = Travis::Logs::Receive.new
+app.setup
+app.run
View
@@ -1,116 +0,0 @@
-require 'multi_json'
-
-require 'travis'
-require 'travis/support'
-require 'core_ext/module/load_constants'
-require 'timeout'
-require 'sidekiq'
-
-$stdout.sync = true
-
-require 'travis/task'
-
-module Travis
- module Logs
- autoload :Handler, 'travis/logs/handler'
-
- def self.queue_number
- ENV['LOGS_QUEUE']
- end
-
- def self.queue_name(shard = nil)
- number = queue_number
-
- name = 'logs'
- name = "#{name}#{number}" if number
- name = "#{name}.#{shard}" if shard
- name
- end
-
- class App
- extend Exceptions::Handling
- include Logging
-
- def start
- preload_constants
- setup
- subscribe
- end
-
- private
-
- def preload_constants
- [Travis::Logs, Travis].each do |target|
- target.load_constants!(skip: [/::AssociationCollection$/])
- end
- end
-
- def setup
- Travis::Async.enabled = true
- Travis::Amqp.config = Travis.config.amqp
- Travis::Addons::Pusher::Task.run_local = true # don't pipe log updates through travis_tasks
-
- Travis::Features.start
- Travis::Database.connect
- Travis::Exceptions::Reporter.start
- Travis::Notification.setup
- Travis::Addons.register
-
- Travis::LogSubscriber::ActiveRecordMetrics.attach
-
- Travis::Memory.new(:logs).report_periodically if Travis.env == 'production'
-
- NewRelic.start if File.exists?('config/newrelic.yml')
- end
-
- def subscribe
- info 'Subscribing to amqp ...'
- info "Subscribing to reporting.jobs.#{queue_name}"
-
- Travis::Amqp::Consumer.jobs(queue_name).subscribe(ack: true, declare: true) do |msg, payload|
- receive(:route, msg, payload)
- end
-
- 0.upto(Travis.config.logs.shards - 1).each do |shard|
- name = queue_name(shard)
- info "Subscribing to reporting.jobs.#{name}"
- Travis::Amqp::Consumer.jobs(name).subscribe(ack: true, declare: true) do |msg, payload|
- receive(:log, msg, payload)
- end
- end
- end
-
- def receive(type, message, payload)
- return unless payload = decode(payload)
- Travis.uuid = payload['uuid']
- handle(type, payload)
- rescue Exception => e
- puts "!!!FAILSAFE!!! #{e.message}", e.backtrace
- ensure
- message.ack
- end
-
- def handle(type, payload)
- timeout do
- Travis::Logs::Handler.handle(type, payload)
- end
- end
- rescues :handle, from: Exception unless Travis.env == 'test'
-
- def timeout(&block)
- Timeout::timeout(60, &block)
- end
-
- def decode(payload)
- MultiJson.decode(payload)
- rescue StandardError => e
- error "[#{Thread.current.object_id}] [decode error] payload could not be decoded with engine #{MultiJson.engine.to_s} (#{e.message}): #{payload.inspect}"
- nil
- end
-
- def queue_name(shard = nil)
- Travis::Logs.queue_name(shard)
- end
- end
- end
-end
View
@@ -1,29 +0,0 @@
-module Travis
- module Logs
- class Handler
- autoload :Log, 'travis/logs/handler/log'
- autoload :Route, 'travis/logs/handler/route'
-
- class << self
- def handle(type, payload)
- payload = { 'data' => payload } unless payload.key?('data') # TODO happens when message comes directly from the worker
- handler(type).new(payload).handle
- end
-
- def handler(type)
- const_get(type.to_s.camelize)
- end
- end
-
- include Logging
- extend Instrumentation, NewRelic
-
- attr_accessor :payload
-
- def initialize(payload)
- @payload = payload
- end
- end
- end
-end
-
@@ -1,23 +0,0 @@
-require 'travis/logs/services/append'
-
-module Travis
- module Logs
- class Handler
- class Log < Handler
- def handle
- # info "#{Thread.current.object_id} handling log update for job #{data['id']}" unless Travis.env == 'production'
- Travis.run_service(:logs_append, data: data)
- # info "#{Thread.current.object_id} done handling log update for job #{data['id']}: #{data['log'].to_s.bytesize} bytes" # unless Travis.env == 'production'
- end
- instrument :handle
- new_relic :handle
-
- def data
- payload['data']
- end
-
- # Travis::Logs::Instrument::Handler::Log.attach_to(self)
- end
- end
- end
-end
@@ -1,30 +0,0 @@
-require 'string_cleaner'
-
-module Travis
- module Logs
- class Handler
- class Route < Handler
- def handle
- publisher.publish(payload)
- end
- instrument :handle
- new_relic :handle
-
- def publisher
- # info "routing job-#{job_id} to: reporting.jobs.logs.#{shard}" unless Travis.env == 'production'
- Travis::Amqp::Publisher.jobs(Travis::Logs.queue_name(shard))
- end
-
- def shard
- job_id % Travis.config.logs.shards
- end
-
- def job_id
- payload['data']['id'].to_i
- end
-
- # Travis::Logs::Instrument::Handler::Log.attach_to(self)
- end
- end
- end
-end
@@ -1,16 +0,0 @@
-module Travis
- module Logs
- class Instrument
- class Handler < Travis::Notification::Instrument
- def log_completed
- # publish(
- # msg: %(#{target.class.name}#log for #<Job id="#{target.payload['id']}">),
- # event: target.event,
- # payload: target.payload
- # )
- end
- end
- end
- end
-end
-
View
@@ -0,0 +1,65 @@
+require 'multi_json'
+
+require 'travis'
+require 'core_ext/module/load_constants'
+require 'timeout'
+require 'sidekiq'
+
+$stdout.sync = true
+
+module Travis
+ module Logs
+ class Receive
+ autoload :Queue, 'travis/logs/receive/queue'
+
+ def setup
+ Travis::Async.enabled = true
+ Travis::Amqp.config = Travis.config.amqp
+ Travis::Addons::Pusher::Task.run_local = true # don't pipe log updates through travis_tasks
+
+ Travis::Database.connect
+ Travis::Exceptions::Reporter.start
+ Travis::Notification.setup
+ Travis::Addons.register
+
+ Travis::LogSubscriber::ActiveRecordMetrics.attach
+ Travis::Memory.new(:logs).report_periodically if Travis.env == 'production'
+ end
+
+ def run
+ Queue.subscribe(queue_name, &method(:route))
+ 0.upto(shards).each do |shard|
+ Queue.subscribe(queue_name(shard), &method(:receive))
+ end
+ end
+
+ private
+
+ def route(payload)
+ shard = payload['id'].to_i % shards
+ queue = queue_name(shard)
+ payload.update(uuid: Travis.uuid)
+ Travis::Amqp::Publisher.jobs(queue).publish(payload)
+ end
+
+ def receive(payload)
+ Travis.run_service(:logs_append, data: payload)
+ end
+
+ def queue_name(shard = nil)
+ name = ['logs']
+ name << number if number
+ name << ".#{shard}" if shard
+ name.join
+ end
+
+ def shards
+ Travis.config.logs.shards - 1
+ end
+
+ def number
+ ENV['LOGS_QUEUE']
+ end
+ end
+ end
+end
@@ -0,0 +1,57 @@
+require 'coder'
+
+module Travis
+ module Logs
+ class Receive
+ class Queue
+ include Logging
+
+ def self.subscribe(name, &handler)
+ new(name, &handler).subscribe
+ end
+
+ attr_reader :name, :handler
+
+ def initialize(name, &handler)
+ @name = name
+ @handler = handler
+ end
+
+ def subscribe
+ Travis::Amqp::Consumer.jobs(name).subscribe(:ack => true, declare: true, &method(:receive))
+ end
+
+ private
+
+ def receive(message, payload)
+ failsafe(message, payload) do
+ payload = decode(payload) || raise("no payload #{message.inspect}")
+ Travis.uuid = payload.delete('uuid')
+ handler.call(payload)
+ end
+ end
+
+ def failsafe(message, payload, options = {}, &block)
+ Timeout::timeout(options[:timeout] || 60, &block)
+ rescue Exception => e
+ begin
+ puts e.message, e.backtrace
+ Travis::Exceptions.handle(Hub::Error.new(message.properties.type, payload, e))
+ rescue Exception => e
+ puts "!!!FAILSAFE!!! #{e.message}", e.backtrace
+ end
+ ensure
+ message.ack
+ end
+
+ def decode(payload)
+ cleaned = Coder.clean(payload)
+ MultiJson.decode(cleaned)
+ rescue StandardError => e
+ error "[decode error] payload could not be decoded with engine #{MultiJson.engine.to_s}: #{e.inspect} #{payload.inspect}"
+ nil
+ end
+ end
+ end
+ end
+end
@@ -1,16 +1,13 @@
+# Do not use. This was used to archive the backlog of logs.
+# For day to day archiving travis-tasks should be used.
+
require 'travis'
require 'sidekiq'
require 'core_ext/hash/deep_symbolize_keys'
-# require 'core_ext/module/load_constants'
Travis::Database.connect
ActiveRecord::Base.logger.level = Logger::ERROR
Travis::Notification.setup
-# Travis::Exceptions::Reporter.start
-
-# Sidekiq::Logging.logger.formatter = ->(level, _, _, msg) do
-# "TID-#{Thread.current.object_id.to_s(36)} #{level}: #{msg}\n"
-# end
Sidekiq.configure_server do |c|
c.redis = { url: Travis.config.redis.url }
Oops, something went wrong.

0 comments on commit d96b8e5

Please sign in to comment.