Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

Merge commit 'mloughran/master'

  • Loading branch information...
commit a44c28c53845dfd7e5bb36285a4d1fe82beb0787 2 parents 3b2627d + 0d7b71f
Keith Rarick authored December 10, 2009

Showing 1 changed file with 14 additions and 2 deletions. Show diff stats Hide diff stats

  1. 16  lib/beanstalk-client/connection.rb
16  lib/beanstalk-client/connection.rb
@@ -19,6 +19,7 @@
19 19
 require 'fcntl'
20 20
 require 'yaml'
21 21
 require 'set'
  22
+require 'thread'
22 23
 require 'beanstalk-client/errors'
23 24
 require 'beanstalk-client/job'
24 25
 
@@ -27,6 +28,7 @@ class Connection
27 28
     attr_reader :addr
28 29
 
29 30
     def initialize(addr, default_tube=nil)
  31
+      @mutex = Mutex.new
30 32
       @waiting = false
31 33
       @addr = addr
32 34
       connect
@@ -80,6 +82,7 @@ def peek_buried()
80 82
 
81 83
     def reserve(timeout=nil)
82 84
       raise WaitingForJobError if @waiting
  85
+      @mutex.lock
83 86
       if timeout.nil?
84 87
         @socket.write("reserve\r\n")
85 88
       else
@@ -97,6 +100,8 @@ def reserve(timeout=nil)
97 100
       end
98 101
 
99 102
       Job.new(self, *read_job('RESERVED'))
  103
+    ensure
  104
+      @mutex.unlock
100 105
     end
101 106
 
102 107
     def delete(id)
@@ -178,10 +183,13 @@ def list_tubes_watched(cached=false)
178 183
 
179 184
     def interact(cmd, rfmt)
180 185
       raise WaitingForJobError if @waiting
  186
+      @mutex.lock
181 187
       @socket.write(cmd)
182 188
       return read_yaml('OK') if rfmt == :yaml
183 189
       return found_job if rfmt == :job
184 190
       check_resp(*rfmt)
  191
+    ensure
  192
+      @mutex.unlock
185 193
     end
186 194
 
187 195
     def get_resp()
@@ -336,7 +344,9 @@ def list_tubes_watched(*args)
336 344
     end
337 345
 
338 346
     def remove(conn)
339  
-      @connections.delete(conn.addr)
  347
+      connection = @connections.delete(conn.addr)
  348
+      connection.close if connection
  349
+      connection
340 350
     end
341 351
 
342 352
     # Close all open connections for this pool
@@ -370,7 +380,9 @@ def peek_job(id)
370 380
     def call_wrap(c, *args)
371 381
       self.last_conn = c
372 382
       c.send(*args)
373  
-    rescue EOFError, Errno::ECONNRESET, Errno::EPIPE, UnexpectedResponse => ex
  383
+    rescue UnexpectedResponse => ex
  384
+      raise ex
  385
+    rescue EOFError, Errno::ECONNRESET, Errno::EPIPE => ex
374 386
       self.remove(c)
375 387
       raise ex
376 388
     end

0 notes on commit a44c28c

Please sign in to comment.
Something went wrong with that request. Please try again.