Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
  • 4 commits
  • 1 file changed
  • 0 comments
  • 1 contributor
Apr 10, 2012
Alan Harper Remove .rvmrc 1220cb4
Jun 05, 2012
Alan Harper Merge branch 'master' of github.com:square-circle-triangle/bottle
* 'master' of github.com:square-circle-triangle/bottle:
  handle silent bunny timeouts * Generate an error if the given block is not yielded to: * This implies that no msg was received and the timeout has triggered an exit
  added retry_on_exception module\n * Enable error events to gracefully retry a set number of times * before giving up and raising a fatal exception
  enable auto-recovery after TCP connection loss
  fixed timeout issues
04e764b
Alan Harper Catch psych's special exceptions that are otherwise uncatchable 2a2c15f
Alan Harper Catch Pysch's special errors bb8af73

Showing 1 changed file with 18 additions and 10 deletions. Show diff stats Hide diff stats

  1. 28  lib/bottle/listener.rb
28  lib/bottle/listener.rb
@@ -5,7 +5,7 @@ class Listener
5 5
 
6 6
     def initialize(channel, queue_name = AMQ::Protocol::EMPTY_STRING, consumer = nil)
7 7
       @queue_name = queue_name
8  
-      @channel    = channel
  8
+      @channel = channel
9 9
       @channel.auto_recovery = true
10 10
       @channel.on_error(&method(:handle_channel_exception))
11 11
       #@consumer   = consumer
@@ -16,21 +16,29 @@ def initialize(channel, queue_name = AMQ::Protocol::EMPTY_STRING, consumer = nil
16 16
     def start
17 17
       puts "binding to #{@queue_name}, on exchange #{@exchange.name}"
18 18
       @queue.bind(@exchange, :routing_key => @queue_name).subscribe(:ack => true, &method(:handle_message))
19  
-    end 
  19
+    end
20 20
 
21  
-    def handle_message(metadata,payload)
  21
+    def handle_message(metadata, payload)
22 22
       worker_class = Bottle::Foreman.registered_workers[metadata.type]
23 23
       if worker_class.nil?
24  
-        respond({:state => 'error', :message => "Failed to find suitable worker for #{metadata.type}" }, metadata)
  24
+        respond({:state => 'error', :message => "Failed to find suitable worker for #{metadata.type}"}, metadata)
25 25
         false
26 26
       else
27  
-        payload = YAML.load(payload)
28  
-        respond worker_class.process(payload), metadata
29  
-        true
  27
+        begin
  28
+          #puts "GOT PAYLOAD: #{payload.inspect}"
  29
+          payload = YAML.load(payload)
  30
+
  31
+          respond worker_class.process(payload), metadata
  32
+          true
  33
+        rescue Psych::SyntaxError
  34
+          Airbrake.notify e
  35
+          puts e.inspect
  36
+          false
  37
+        end
30 38
       end
31 39
     rescue => e
32 40
       puts "Error processing message! #{e.message}"
33  
-      respond({:state => 'error', :message => e.message }, metadata)
  41
+      respond({:state => 'error', :message => e.message}, metadata)
34 42
       false
35 43
     ensure
36 44
       metadata.ack
@@ -40,9 +48,9 @@ def respond(payload, metadata)
40 48
       return if metadata.reply_to.nil?
41 49
       puts "Responding with #{payload.inspect} to: #{metadata.reply_to} : #{metadata.message_id}"
42 50
       @channel.default_exchange.publish(payload.to_yaml,
43  
-                                        :routing_key    => metadata.reply_to,
  51
+                                        :routing_key => metadata.reply_to,
44 52
                                         :correlation_id => metadata.message_id,
45  
-                                        :mandatory      => true)
  53
+                                        :mandatory => true)
46 54
     end
47 55
 
48 56
 

No commit comments for this range

Something went wrong with that request. Please try again.