Skip to content
This repository
Browse code

minor code changes to enhance readability

  • Loading branch information...
commit f99d9699d508f900314c83c2d18149a3ffc3c787 1 parent 5c025f4
Ryan LeCompte ryanlecompte authored
2  .gitignore
@@ -4,3 +4,5 @@ Gemfile.lock
4 4 pkg/*
5 5 rbxdb/
6 6 *.rdb
  7 +.idea/*
  8 +
2  README.md
Source Rendered
@@ -36,7 +36,7 @@ In your Rails app, create a `config/initializers/girl_friday.rb` which defines y
36 36 :size is the number of workers to spin up and defaults to 5. Keep in mind, ActiveRecord defaults to a connection pool size of 5 so if your workers are accessing the database you'll want to ensure that the connection pool is large enough by modifying `config/database.yml`.
37 37
38 38 In order to use the Redis backend, you must use a connection pool to share a set of Redis connections with
39   -other threads and GirlFriday queues using the `connection\_pool` gem:
  39 +other threads and GirlFriday queues using the `connection_pool` gem:
40 40
41 41 require 'connection_pool'
42 42
10 lib/girl_friday/persistence.rb
@@ -23,25 +23,23 @@ def size
23 23 class Redis
24 24 def initialize(name, options)
25 25 @opts = options
26   - unless @opts[:pool]
27   - raise ArgumentError, "you must pass in a :pool"
28   - end
  26 + raise ArgumentError, "you must pass in a :pool" unless @opts[:pool]
29 27 @key = "girl_friday-#{name}-#{environment}"
30 28 end
31 29
32 30 def push(work)
33 31 val = Marshal.dump(work)
34   - redis{ |r| r.rpush(@key, val) }
  32 + redis { |r| r.rpush(@key, val) }
35 33 end
36 34 alias_method :<<, :push
37 35
38 36 def pop
39   - val = redis{ |r| r.lpop(@key) }
  37 + val = redis { |r| r.lpop(@key) }
40 38 Marshal.load(val) if val
41 39 end
42 40
43 41 def size
44   - redis.llen(@key)
  42 + redis { |r| r.llen(@key) }
45 43 end
46 44
47 45 private
48 lib/girl_friday/work_queue.rb
@@ -70,17 +70,26 @@ def wait_for_empty
70 70 end
71 71 end
72 72
73   - def shutdown
  73 + def shutdown(&block)
74 74 # Runtime state should never be modified by caller thread,
75 75 # only the Supervisor thread.
76   - @supervisor << Shutdown[block_given? ? Proc.new : nil]
  76 + @supervisor << Shutdown[block]
77 77 end
78 78
79 79 private
80 80
  81 + def running?
  82 + !@shutdown
  83 + end
  84 +
  85 + def handle_error(ex)
  86 + # Redis network error? Log and ignore.
  87 + @error_handlers.each { |handler| handler.handle(ex) }
  88 + end
  89 +
81 90 def on_ready(who)
82 91 @total_processed += 1
83   - if !@shutdown && work = @persister.pop
  92 + if running? && work = @persister.pop
84 93 who.this << work
85 94 drain
86 95 else
@@ -88,22 +97,21 @@ def on_ready(who)
88 97 ready_workers << who.this
89 98 end
90 99 rescue => ex
91   - # Redis network error? Log and ignore.
92   - @error_handlers.each { |handler| handler.handle(ex) }
  100 + handle_error(ex)
93 101 end
94 102
95 103 def shutdown_complete
96 104 begin
97 105 @when_shutdown.call(self) if @when_shutdown
98 106 rescue Exception => ex
99   - @error_handlers.each { |handler| handler.handle(ex) }
  107 + handle_error(ex)
100 108 end
101 109 end
102 110
103 111 def on_work(work)
104 112 @total_queued += 1
105 113
106   - if !@shutdown && worker = ready_workers.pop
  114 + if running? && worker = ready_workers.pop
107 115 @busy_workers << worker
108 116 worker << work
109 117 drain
@@ -111,19 +119,12 @@ def on_work(work)
111 119 @persister << work
112 120 end
113 121 rescue => ex
114   - # Redis network error? Log and ignore.
115   - @error_handlers.each { |handler| handler.handle(ex) }
  122 + handle_error(ex)
116 123 end
117 124
118 125 def ready_workers
119   - @ready_workers ||= begin
120   - workers = []
121   - @size.times do
122   - # start N workers
123   - workers << Actor.spawn_link(&@work_loop)
124   - end
125   - workers
126   - end
  126 + # start N workers
  127 + @ready_workers ||= Array.new(@size) { Actor.spawn_link(&@work_loop) }
127 128 end
128 129
129 130 def start
@@ -132,9 +133,9 @@ def start
132 133 supervisor = Actor.current
133 134 @work_loop = Proc.new do
134 135 Thread.current[:label] = "#{name}-worker"
135   - while !@shutdown do
  136 + while running? do
136 137 work = Actor.receive
137   - if !@shutdown
  138 + if running?
138 139 result = @processor.call(work.msg)
139 140 work.callback.call(result) if work.callback
140 141 supervisor << Ready[Actor.current]
@@ -155,8 +156,7 @@ def start
155 156
156 157 def drain
157 158 # give as much work to as many ready workers as possible
158   - ps = @persister.size
159   - todo = ready_workers.size < ps ? ready_workers.size : ps
  159 + todo = [@persister.size, ready_workers.size].min
160 160 todo.times do
161 161 worker = ready_workers.pop
162 162 @busy_workers << worker
@@ -183,18 +183,18 @@ def supervisor_loop
183 183 return
184 184 end
185 185 f.when(Actor::DeadActorError) do |ex|
186   - if !@shutdown
  186 + if running?
187 187 # TODO Provide current message contents as error context
188 188 @total_errors += 1
189 189 @busy_workers.delete(ex.actor)
190 190 ready_workers << Actor.spawn_link(&@work_loop)
191   - @error_handlers.each { |handler| handler.handle(ex.reason) }
  191 + handle_error(ex.reason)
192 192 end
193 193 end
194 194 end
195 195 end
196 196 end
197   -
198 197 end
  198 +
199 199 Queue = WorkQueue
200 200 end

0 comments on commit f99d969

Please sign in to comment.
Something went wrong with that request. Please try again.