diff --git a/.gitignore b/.gitignore index 60ce02e15..c97e90092 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,3 @@ log/* *.class .rbenv-version .rbenv-gemsets -bin/ diff --git a/Gemfile b/Gemfile index e3b03cc57..02d32aa1b 100644 --- a/Gemfile +++ b/Gemfile @@ -21,6 +21,7 @@ gem 'sidekiq' gem 'hot_bunnies', '~> 1.4.0.pre4' gem 'jruby-openssl', '~> 0.7.7' +# see http://www.ruby-forum.com/topic/4409725 gem 'activerecord-jdbcpostgresql-adapter', '= 1.2.2.1' gem 'coder', github: 'rkh/coder' diff --git a/Gemfile.lock b/Gemfile.lock index 4b575ff86..61816885b 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -42,7 +42,7 @@ GIT GIT remote: git://github.com/travis-ci/travis-core.git - revision: 871129e3b8002b8c6299ad0dd04e74a7632fc148 + revision: e428d0b42fad2834ad187c9b3860bb77ec7d13c9 specs: travis-core (0.0.1) actionmailer (~> 3.2.11) @@ -71,7 +71,7 @@ GIT GIT remote: git://github.com/travis-ci/travis-support.git - revision: cf916e10949db43ce6f2b6f86082b367f04acfcd + revision: ff712aca1083a588974f835a84c574e6976aeb29 specs: travis-support (0.0.1) diff --git a/Procfile b/Procfile index 17f45bc8e..37fad621c 100644 --- a/Procfile +++ b/Procfile @@ -1 +1 @@ -hub: bin/thor travis:hub:start +hub: ./bin/hub diff --git a/Thorfile b/Thorfile deleted file mode 100644 index a099ce012..000000000 --- a/Thorfile +++ /dev/null @@ -1,6 +0,0 @@ -$: << File.expand_path('../lib', __FILE__) -$stdout.sync = true - -require 'bundler/setup' -require 'travis/hub/cli' - diff --git a/bin/hub b/bin/hub new file mode 100755 index 000000000..367f3ee05 --- /dev/null +++ b/bin/hub @@ -0,0 +1,18 @@ +#! /usr/bin/env ruby + +$stdout.sync = true + +$: << 'lib' + +require 'bundler/setup' +require 'travis/hub' +require 'core_ext/module/load_constants' + +[Travis::Hub, Travis].each do |target| + target.load_constants!(:skip => [/::AssociationCollection$/], debug: false) +end + +app = Travis::Hub.new +app.setup +app.run + diff --git a/lib/travis/hub.rb b/lib/travis/hub.rb index 8224fe701..27ff65907 100644 --- a/lib/travis/hub.rb +++ b/lib/travis/hub.rb @@ -1,68 +1,60 @@ require 'multi_json' -require 'benchmark' -require 'active_support/core_ext/float/rounding' -require 'core_ext/kernel/run_periodically' -require 'core_ext/hash/compact' require 'travis' -require 'travis/support' +require 'core_ext/kernel/run_periodically' $stdout.sync = true module Travis class Hub - autoload :Handler, 'travis/hub/handler' - autoload :Instrument, 'travis/hub/instrument' - autoload :Error, 'travis/hub/error' - autoload :Queues, 'travis/hub/queues' - - include Logging + autoload :Error, 'travis/hub/error' + autoload :Queue, 'travis/hub/queue' - class << self - def start - setup - prune_workers - enqueue_jobs + extend Instrumentation - Travis::Hub::Queues.subscribe - end - - protected + def setup + Travis::Async.enabled = true + Travis::Amqp.config = Travis.config.amqp - def setup - Travis::Features.start + Travis::Database.connect + Travis::Async::Sidekiq.setup(Travis.config.redis.url, Travis.config.sidekiq) - Travis::Async.enabled = true - Travis::Amqp.config = Travis.config.amqp - GH::DefaultStack.options[:ssl] = Travis.config.ssl + Travis::Exceptions::Reporter.start + Travis::Notification.setup + Travis::Addons.register - Travis.config.update_periodically - Travis::Memory.new(:hub).report_periodically if Travis.env == 'production' + Travis::Memory.new(:hub).report_periodically if Travis.env == 'production' + NewRelic.start if File.exists?('config/newrelic.yml') - Travis::Exceptions::Reporter.start - Travis::Notification.setup - Travis::Addons.register + # do we still need these in hub? + # Travis::Mailer.setup + # GH::DefaultStack.options[:ssl] = Travis.config.ssl + end - Travis::Database.connect - Travis::Mailer.setup - Travis::Async::Sidekiq.setup(Travis.config.redis.url, Travis.config.sidekiq) + def run + enqueue_jobs + Queue.subscribe(&method(:handle)) + end - NewRelic.start if File.exists?('config/newrelic.yml') - end + private - def prune_workers - run_periodically(Travis.config.workers.prune.interval, &::Worker.method(:prune)) + def handle(event, payload) + ActiveRecord::Base.cache do + Travis.run_service(:update_job, event: event.to_s.split(':').last, data: payload) end + end - def enqueue_jobs - run_periodically(Travis.config.queue.interval) do - Travis.run_service(:enqueue_jobs) unless Travis::Features.feature_active?(:travis_enqueue) - end + def enqueue_jobs + run_periodically(Travis.config.queue.interval) do + Travis.run_service(:enqueue_jobs) unless Travis::Features.feature_active?(:travis_enqueue) end + end - # def cleanup_jobs - # run_periodically(Travis.config.jobs.retry.interval, &::Job.method(:cleanup)) - # end - end + # class Instrument < Travis::Notification::Instrument + # def update_completed + # publish(msg: %(for #), event: target.event, payload: target.payload) + # end + # end + # Instrument.attach_to(self) end end diff --git a/lib/travis/hub/cli.rb b/lib/travis/hub/cli.rb deleted file mode 100644 index 95d316e73..000000000 --- a/lib/travis/hub/cli.rb +++ /dev/null @@ -1,30 +0,0 @@ -require 'bundler/setup' -require 'travis/hub' - -$stdout.sync = true - -module Travis - module Cli - class Thor < ::Thor - namespace 'travis:hub' - - desc 'start', 'Consume AMQP messages from the worker' - def start - ENV['ENV'] || 'development' - preload_constants! - Travis::Hub.start - end - - protected - - def preload_constants! - require 'core_ext/module/load_constants' - require 'travis' - - [Travis::Hub, Travis].each do |target| - target.load_constants!(:skip => [/::AssociationCollection$/], :debug => true) - end - end - end - end -end diff --git a/lib/travis/hub/handler.rb b/lib/travis/hub/handler.rb deleted file mode 100644 index e71a8bceb..000000000 --- a/lib/travis/hub/handler.rb +++ /dev/null @@ -1,52 +0,0 @@ -module Travis - class Hub - class Handler - autoload :Job, 'travis/hub/handler/job' - - include Logging - extend Instrumentation, NewRelic - - class << self - def handle(event, payload) - self.for(event, payload).handle - end - - def for(event, payload) - case event_type(event, payload) - when /^request/ - Handler::Request.new(event, payload) - when /^job|state/ - Handler::Job.new(event, payload) - when /^worker/ - Handler::Worker.new(event, payload) - when /^sync/ - Handler::Sync.new(event, payload) - else - raise "Unknown message type: #{event.inspect}" - end - end - - def event_type(event, payload) - (event || extract_event(payload)).to_s - end - - def extract_event(payload) - warn "Had to extract event from payload: #{payload.inspect}" - case payload['type'] - when 'pull_request', 'push' - 'request' - else - payload['type'] - end - end - end - - attr_accessor :event, :payload - - def initialize(event, payload) - @event = event - @payload = payload - end - end - end -end diff --git a/lib/travis/hub/handler/job.rb b/lib/travis/hub/handler/job.rb deleted file mode 100644 index 400310120..000000000 --- a/lib/travis/hub/handler/job.rb +++ /dev/null @@ -1,46 +0,0 @@ -require 'travis/logs/services/append' - -module Travis - class Hub - class Handler - # Handles updates from test jobs running on the worker, i.e. events - # like job:test:started, job:test:log and job:test:finished - class Job < Handler - def handle - case event - when 'job:test:log' - log - else - update - end - end - - protected - - def job - @job ||= ::Job.find(payload['id']) - end - - def update - Travis.run_service(:update_job, event: event.to_s.split(':').last, data: payload) - end - instrument :update - new_relic :update - - def log - if Travis::Features.feature_active?(:travis_logs) - # TODO hot compat, remove once workers publish to "reporting.jobs.logs" directly - publisher = Travis::Amqp::Publisher.jobs('logs') - publisher.publish(:data => payload, :uuid => Travis.uuid) - else - Travis.run_service(:logs_append, data: payload) - end - end - instrument :log - new_relic :log - - Travis::Hub::Instrument::Handler::Job.attach_to(self) - end - end - end -end diff --git a/lib/travis/hub/instrument.rb b/lib/travis/hub/instrument.rb deleted file mode 100644 index bcc8a1242..000000000 --- a/lib/travis/hub/instrument.rb +++ /dev/null @@ -1,25 +0,0 @@ -module Travis - class Hub - class Instrument - module Handler - class Job < Travis::Notification::Instrument - def update_completed - publish( - :msg => %(for #), - :event => target.event, - :payload => target.payload - ) - end - - def log_completed - # publish( - # :msg => %(for #), - # :event => target.event, - # :payload => target.payload - # ) - end - end - end - end - end -end diff --git a/lib/travis/hub/metrics.rb b/lib/travis/hub/metrics.rb deleted file mode 100644 index 0d1326dd2..000000000 --- a/lib/travis/hub/metrics.rb +++ /dev/null @@ -1,22 +0,0 @@ -require 'active_support/notifications' -require 'metriks' - -module Travis - class Hub - class Metrics - delegate :subscribe, :to => ActionSupport::Notifications - - def self.setup_subscriptions - new.setup_subscriptions - end - - def setup_subscriptions - subscribe(/(load|http)\.gh/) do |*args| - name, start, ending, transaction_id, payload = *args - time = ending - start - Metriks.timer(name).update(time) - end - end - end - end -end diff --git a/lib/travis/hub/queue.rb b/lib/travis/hub/queue.rb new file mode 100644 index 000000000..ca17ad7db --- /dev/null +++ b/lib/travis/hub/queue.rb @@ -0,0 +1,55 @@ +require 'coder' + +module Travis + class Hub + class Queue + include Logging + + def self.subscribe(&handler) + new(&handler).subscribe + end + + attr_reader :handler + + def initialize(&handler) + @handler = handler + end + + def subscribe + Travis::Amqp::Consumer.jobs('builds').subscribe(:ack => true, &method(:receive)) + end + + private + + def receive(message, payload) + failsafe(message, payload) do + event = message.properties.type + payload = decode(payload) || raise("no payload for #{event.inspect} (#{message.inspect})") + Travis.uuid = payload.delete('uuid') + handler.call(event, payload) + end + end + + def failsafe(message, payload, options = {}, &block) + Timeout::timeout(options[:timeout] || 60, &block) + rescue Exception => e + begin + puts e.message, e.backtrace + Travis::Exceptions.handle(Hub::Error.new(message.properties.type, payload, e)) + rescue Exception => e + puts "!!!FAILSAFE!!! #{e.message}", e.backtrace + end + ensure + message.ack + end + + def decode(payload) + cleaned = Coder.clean(payload) + MultiJson.decode(cleaned) + rescue StandardError => e + error "[decode error] payload could not be decoded with engine #{MultiJson.engine.to_s}: #{e.inspect} #{payload.inspect}" + nil + end + end + end +end diff --git a/lib/travis/hub/queues.rb b/lib/travis/hub/queues.rb deleted file mode 100644 index 728507b62..000000000 --- a/lib/travis/hub/queues.rb +++ /dev/null @@ -1,75 +0,0 @@ -require 'coder' - -module Travis - class Hub - class Queues - include Logging - - def self.subscribe - new.subscribe - end - - def subscribe - info "Subscribing to reporting.jobs.builds" - Travis::Amqp::Consumer.jobs('builds').subscribe(:ack => true, &method(:receive)) - end - - def receive(message, payload) - event = message.properties.type - # TODO move to instrumentation or remove? - debug "[#{Thread.current.object_id}] Handling event #{event.inspect} with payload : #{(payload.size > 160 ? "#{payload[0..160]} ..." : payload)}" - - payload = decode(payload) || raise("no payload for #{event.inspect} (#{message.inspect})") - Travis.uuid = payload.delete('uuid') - - with(:timeout, :benchmarking, :caching) do - Handler.handle(event, payload) if payload - end - - rescue Exception => e - begin - puts e.message, e.backtrace - Travis::Exceptions.handle(Hub::Error.new(event, payload, e)) - rescue Exception => e - puts "!!!FAILSAFE!!! #{e.message}", e.backtrace - end - - ensure - message.ack - end - - protected - - def timeout(&block) - Timeout::timeout(60, &block) - end - - def benchmarking(&block) - timing = Benchmark.realtime(&block) - debug "[#{Thread.current.object_id}] Completed in #{timing.round(4)} seconds" - end - - def caching(&block) - defined?(ActiveRecord) ? ActiveRecord::Base.cache(&block) : block.call - end - - def decode(payload) - cleaned = Coder.clean(payload) - MultiJson.decode(cleaned) - rescue StandardError => e - error "[#{Thread.current.object_id}] [decode error] payload could not be decoded with engine #{MultiJson.engine.to_s} : #{e.inspect}" - nil - end - - def with(*methods, &block) - if methods.size > 1 - head = methods.shift - with(*methods) { send(head, &block) } - else - send(methods.first, &block) - end - end - - end - end -end diff --git a/spec/travis/hub/handler/job_spec.rb b/spec/travis/hub/handler/job_spec.rb deleted file mode 100644 index 28787d530..000000000 --- a/spec/travis/hub/handler/job_spec.rb +++ /dev/null @@ -1,35 +0,0 @@ -require 'spec_helper' - -describe Travis::Hub::Handler::Job do - let(:payload) { {} } - let(:handler) { Travis::Hub::Handler::Job.new(nil, payload) } - let(:publisher) { stub('publisher', :publish => nil) } - - before :each do - Travis::Features.start - end - - describe '#handle' do - it 'updates job attributes on job:test:started' do - Travis.expects(:run_service).with(:update_job, data: payload, event: 'started') - handler.event = 'job:test:started' - handler.handle - end - - it 're-routes the message to reporting.jobs.logs (:travis_logs enabled)' do - Travis::Features.enable_for_all(:travis_logs) - Travis::Amqp::Publisher.expects(:jobs).with('logs').returns(publisher) - publisher.expects(:publish).with(:data => {}, :uuid => Travis.uuid) - handler.event = 'job:test:log' - handler.handle - end - - it 'appends the log on job:test:log (:travis_logs disabled)' do - Travis::Features.disable_for_all(:travis_logs) - Travis.expects(:run_service).with(:logs_append, data: payload) - handler.event = 'job:test:log' - handler.handle - end - end -end - diff --git a/spec/travis/hub/handler_spec.rb b/spec/travis/hub/handler_spec.rb deleted file mode 100644 index a92785289..000000000 --- a/spec/travis/hub/handler_spec.rb +++ /dev/null @@ -1,45 +0,0 @@ -require 'spec_helper' - -describe Travis::Hub::Handler do - let(:payload) { { :name => 'worker-1', :host => 'ruby-1.worker.travis-ci.org' } } - - describe '.for' do - describe 'given an event namespaced job:*' do - events = %w( - job:config:started - job:config:finished - job:test:started - job:test:log - job:test:finished - ) - - events.each do |event| - it 'returns a Job handler for #{event.inspect}' do - Travis::Hub::Handler.for(event, payload).should be_kind_of(Travis::Hub::Handler::Job) - end - end - end - - describe 'given an event namespaced worker:*' do - events = %w( - worker:status - worker:start - worker:finish - ) - - events.each do |event| - it 'returns a Worker handler for #{event.inspect}' do - Travis::Hub::Handler.for(event, payload).should be_kind_of(Travis::Hub::Handler::Worker) - end - end - end - - describe 'without an event name' do - describe 'for pull and push requests' do - it 'should fetch a Request handler for pull requests' do - Travis::Hub::Handler.for(nil, {'type' => 'pull_request'}).should be_instance_of(Travis::Hub::Handler::Request) - end - end - end - end -end diff --git a/spec/travis/hub/instrument/job_spec.rb b/spec/travis/hub/instrument/job_spec.rb deleted file mode 100644 index c7f9798a2..000000000 --- a/spec/travis/hub/instrument/job_spec.rb +++ /dev/null @@ -1,43 +0,0 @@ -require 'spec_helper' -require 'json' - -describe Travis::Hub::Instrument::Handler::Job do - include Travis::Testing::Stubs - - let(:payload) { { 'id' => 1, 'some' => 'payload' } } - let(:publisher) { Travis::Notification::Publisher::Memory.new } - let(:handler) { Travis::Hub::Handler::Job.new(nil, payload) } - let(:job) { stub('job', :update_attributes => nil) } - let(:event) { publisher.events.last } - - before :each do - Travis::Notification.publishers.replace([publisher]) - Travis.stubs(:run_service) - end - - it 'publishes a payload on update' do - handler.event = 'job:test:started' - handler.handle - - event.should publish_instrumentation_event( - :event => 'travis.hub.handler.job.update:completed', - :message => 'Travis::Hub::Handler::Job#update:completed for #', - ) - event[:data].should == { - :event => 'job:test:started', - :payload => payload - } - end - - # disabled for now cuz it's too spammy - # it 'publishes a payload on log' do - # handler.event = 'job:test:log' - # handler.handle - - # event[:payload].should == { - # :msg => 'Travis::Hub::Handler::Job#log for #', - # :event => 'job:test:log', - # :payload => payload - # } - # end -end diff --git a/spec/travis/hub/queues_spec.rb b/spec/travis/hub/queues_spec.rb index b29206f0b..a32a144a2 100644 --- a/spec/travis/hub/queues_spec.rb +++ b/spec/travis/hub/queues_spec.rb @@ -1,47 +1,36 @@ require 'spec_helper' -describe Travis::Hub::Queues do - let(:hub) { Travis::Hub::Queues.new } +describe Travis::Hub::Queue do + let(:handler) { ->(*) {} } + let(:queue) { Travis::Hub::Queue.new(&handler) } let(:payload) { '{ "foo": "bar", "uuid": "2d931510-d99f-494a-8c67-87feb05e1594" }' } - let(:message) { stub('message', :ack => nil, :properties => stub('properties', :type => 'request') ) } # TODO what are the real event types? - let(:handler) { stub('handler', :handle => nil) } + let(:message) { stub('message', :ack => nil, :properties => stub('properties', :type => 'job:finish') ) } - before :each do - Travis::Hub::Handler.stubs(:for).returns(handler) - end - - describe 'decode' do - it 'decodes a json payload' do - hub.send(:decode, '{ "id": 1 }')['id'].should == 1 - end + def receive + queue.send(:receive, message, payload) end describe 'receive' do it 'sets the given uuid to the current thread' do - hub.send(:receive, message, payload) + receive Thread.current[:uuid].should == '2d931510-d99f-494a-8c67-87feb05e1594' end describe 'with no exception being raised' do - it 'gets a handler for the event type and payload' do - Travis::Hub::Handler.expects(:for).with('request', { 'foo' => 'bar' }).returns(handler) - hub.receive(message, payload) - end - it 'handles the event' do - handler.expects(:handle) - hub.receive(message, payload) + handler.expects(:call).with('job:finish', 'foo' => 'bar') + receive end it 'acknowledges the message' do message.expects(:ack) - hub.receive(message, payload) + receive end end describe 'with an exception being raised' do before :each do - handler.expects(:handle).raises(StandardError.new('message')) + handler.expects(:call).raises(StandardError.new('message')) $stdout = StringIO.new end @@ -50,13 +39,13 @@ end it 'outputs the exception' do - hub.receive(message, payload) + receive $stdout.string.should =~ /message/ end it 'acknowledges the message' do message.expects(:ack) - hub.receive(message, payload) + receive end it 'notifies the error reporter' do @@ -65,8 +54,14 @@ exception.should be_instance_of(Travis::Hub::Error) exception.message.should =~ /message/ end - hub.receive(message, payload) + receive end end end + + describe 'decode' do + it 'decodes a json payload' do + queue.send(:decode, '{ "id": 1 }')['id'].should == 1 + end + end end