Skip to content

Commit

Permalink
Adding ability to wait or not wait for response
Browse files Browse the repository at this point in the history
  • Loading branch information
winton committed Oct 14, 2011
1 parent 0bb5a92 commit 9b95321
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 46 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,6 +1,7 @@
.DS_Store
*.gem
.bundle
dump.rdb
Gemfile.lock
pkg
tmp
67 changes: 40 additions & 27 deletions lib/execache.rb
Expand Up @@ -26,10 +26,11 @@ def initialize(yaml)
while true
request = redis.lpop('execache:request')
if request
Thread.new do
Timeout.timeout(60) do
request = Yajl::Parser.parse(request)
channel = request.delete('channel')
commands = []
pending = false

request.each do |cmd_type, cmd_options|
# Command with preliminary args
Expand All @@ -46,11 +47,14 @@ def initialize(yaml)
group['cache_key'] = cache_key = "execache:cache:#{cache_key}"
cache = redis.get(cache_key)

if cache
if cache && cache == '[PENDING]'
pending = true
elsif cache
group['result'] = Yajl::Parser.parse(cache)
else
pending = true
redis.set(cache_key, '[PENDING]')
command << group['args']
nil
end
end

Expand All @@ -60,40 +64,49 @@ def initialize(yaml)
end
end

# Build response
response = request.inject({}) do |hash, (cmd_type, cmd_options)|
hash[cmd_type] = []
if pending
# Execute command in thread, cache results
Thread.new do
Timeout.timeout(60) do
request.each do |cmd_type, cmd_options|
if cmd_options['cmd']
separators = options[cmd_type]['separators'] || {}
separators['group'] ||= "[END]"
separators['result'] ||= "\n"
output = `#{cmd_options['cmd']}`
output = output.split(separators['group'] + separators['result'])
output = output.collect { |r| r.split(separators['result']) }
end

if cmd_options['cmd']
separators = options[cmd_type]['separators'] || {}
separators['group'] ||= "[END]"
separators['result'] ||= "\n"
output = `#{cmd_options['cmd']}`
output = output.split(separators['group'] + separators['result'])
output = output.collect { |r| r.split(separators['result']) }
cmd_options['groups'].each do |group|
unless group['result']
redis.set(
group['cache_key'],
Yajl::Encoder.encode(output.shift)
)
if group['ttl']
redis.expire(group['cache_key'], group['ttl'])
end
end
end
end
end
end
else
response = request.inject({}) do |hash, (cmd_type, cmd_options)|
hash[cmd_type] = []

cmd_options['groups'].each do |group|
if group['result']
cmd_options['groups'].each do |group|
hash[cmd_type] << group['result']
else
hash[cmd_type] << output.shift
redis.set(
group['cache_key'],
Yajl::Encoder.encode(hash[cmd_type].last)
)
if group['ttl']
redis.expire(group['cache_key'], group['ttl'])
end
end
end

hash
hash
end
end

redis.publish(
"execache:response:#{channel}",
Yajl::Encoder.encode(response)
pending ? '[PENDING]' : Yajl::Encoder.encode(response)
)
end
end
Expand Down
21 changes: 16 additions & 5 deletions lib/execache/client.rb
Expand Up @@ -9,18 +9,29 @@ def initialize(redis_url)
end

def exec(options)
options[:channel] = Digest::SHA1.hexdigest("#{rand}")
wait = options.delete(:wait)
subscribe_to = options[:channel] = Digest::SHA1.hexdigest("#{rand}")
options = Yajl::Encoder.encode(options)
response = nil

Timeout.timeout(60) do
@redis_1.subscribe("execache:response:#{options[:channel]}") do |on|
@redis_1.subscribe("execache:response:#{subscribe_to}") do |on|
on.subscribe do |channel, subscriptions|
@redis_2.rpush "execache:request", Yajl::Encoder.encode(options)
@redis_2.rpush "execache:request", options
end

on.message do |channel, message|
response = Yajl::Parser.parse(message)
@redis_1.unsubscribe
if message.include?('[PENDING]')
if wait == false
response = false
@redis_1.unsubscribe
else
@redis_2.rpush "execache:request", options
end
else
response = Yajl::Parser.parse(message)
@redis_1.unsubscribe
end
end
end
end
Expand Down
43 changes: 29 additions & 14 deletions spec/execache_spec.rb
Expand Up @@ -2,21 +2,23 @@

describe Execache do

def client_exec
def client_exec(options={})
@client.exec(
:some_binary => {
:args => 'preliminary_arg',
:groups => [
{
:args => 'arg1a arg1b',
:ttl => 60
},
{
:args => 'arg2a arg2b',
:ttl => 60
}
]
}
{
:some_binary => {
:args => 'preliminary_arg',
:groups => [
{
:args => 'arg1a arg1b',
:ttl => 60
},
{
:args => 'arg2a arg2b',
:ttl => 60
}
]
}
}.merge(options)
)
end

Expand Down Expand Up @@ -120,4 +122,17 @@ def client_exec
]
}
end

it "should respect wait option" do
@client.redis_1.keys("execache:cache:*").each do |key|
@client.redis_1.del(key)
end
client_exec(:wait => false).should == false
client_exec.should == {
"some_binary" => [
["arg1_result_1", "arg1_result_2"],
["arg2_result_1", "arg2_result_2"]
]
}
end
end

0 comments on commit 9b95321

Please sign in to comment.