Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Version 0.2.0 - The great rewrite

  • Loading branch information...
commit 61c30bf988c2e1221de36ddc9ef4e2959cedc6ee 1 parent 239a6f2
@winton authored
View
12 README.md
@@ -32,6 +32,7 @@ Configure
Given the above example, our `execache.yml` looks like this:
redis: localhost:6379/0
+ parallel: 3
some_binary:
command: '/bin/some/binary'
separators:
@@ -52,17 +53,12 @@ Execute Commands
client = Execache::Client.new("localhost:6379/0")
results = client.exec(
+ :ttl => 60,
:some_binary => {
:args => 'preliminary_arg',
:groups => [
- {
- :args => 'arg1a arg1b',
- :ttl => 60
- },
- {
- :args => 'arg2a arg2b',
- :ttl => 60
- }
+ 'arg1a arg1b',
+ 'arg2a arg2b'
]
}
)
View
2  execache.gemspec
@@ -6,7 +6,7 @@ $:.unshift lib unless $:.include?(lib)
Gem::Specification.new do |s|
s.name = "execache"
- s.version = '0.1.10'
+ s.version = '0.2.0'
s.platform = Gem::Platform::RUBY
s.authors = [ "Winton Welsh" ]
s.email = [ "mail@wintoni.us" ]
View
142 lib/execache.rb
@@ -16,6 +16,7 @@ class Execache
def initialize(yaml)
options = YAML.load(File.read(yaml))
+ parallel = options['parallel'] || 3
puts "\nStarting execache server (redis @ #{options['redis']})..."
@@ -24,100 +25,111 @@ def initialize(yaml)
begin
while true
- request = redis.lpop('execache:request')
- if request
+ if request = redis.lpop('execache:request')
Timeout.timeout(60) do
request = Yajl::Parser.parse(request)
+
+ # Options
+ global_cache_key = request.delete('cache_key')
channel = request.delete('channel')
- force = request.delete('channel')
- commands = []
+ force = request.delete('force')
+ ttl = request.delete('ttl')
+
pending = false
+ results = {}
request.each do |cmd_type, cmd_options|
- # Command with preliminary args
+ cache_keys = []
+ groups = []
+
+ # Binary + preliminary arguments
command = [
options[cmd_type]['command'],
cmd_options['args']
- ]
+ ].join(' ')
- # Fill results with caches if present
- cmd_options['groups'].each do |group|
+ # For each argument group...
+ cmd_options['groups'].each do |args|
cache_key = Digest::SHA1.hexdigest(
- "#{group['cache_key'] || cmd_options['args']} #{group['args']}"
+ "#{global_cache_key || command} #{args}"
)
- group['cache_key'] = cache_key = "execache:cache:#{cache_key}"
+ cache_key = "execache:cache:#{cache_key}"
cache = redis.get(cache_key)
-
- if cache && cache == '[PENDING]'
+
+ # If force cache overwrite || no cache || pending cache
+ if force || !cache || cache == '[PENDING]'
pending = true
- group['result'] = true
- elsif !force && !group['force'] && cache
- group['result'] = Yajl::Parser.parse(cache)
+
+ # Else, store cache result
else
- pending = true
+ results[cmd_type] ||= []
+ results[cmd_type] << Yajl::Parser.parse(cache)
+ end
+
+ # If force cache overwrite || no cache
+ if force || !cache
redis.set(cache_key, '[PENDING]')
redis.expire(cache_key, 60) # Timeout incase execution fails
- command << group['args']
+
+ cache_keys << cache_key
+ groups << args
end
end
- # Add command to be executed if not all args are cached
- if command.length > 2
- cmd_options['cmd'] = command.join(' ')
+ # Add to command queue if commands present
+ unless groups.empty?
+ command = {
+ :cache_keys => cache_keys,
+ :cmd_type => cmd_type,
+ :command => command,
+ :groups => groups,
+ :ttl => ttl
+ }
+ redis.rpush("execache:commands", Yajl::Encoder.encode(command))
end
end
+
+ redis.publish(
+ "execache:response:#{channel}",
+ pending ? '[PENDING]' : Yajl::Encoder.encode(results)
+ )
+ end
+ end
- if pending
- # Execute command in thread, cache results
- unless redis.get('execache:wait')
- Thread.new do
- Timeout.timeout(60) do
- request.each do |cmd_type, cmd_options|
- if cmd_options['cmd']
- redis.set('execache:wait', '1')
- redis.expire('execache:wait', 120)
- separators = options[cmd_type]['separators'] || {}
- separators['group'] ||= "[END]"
- separators['result'] ||= "\n"
- output = `#{cmd_options['cmd']}`
- output = output.split(separators['group'] + separators['result'])
- output = output.collect { |r| r.split(separators['result']) }
- redis.del('execache:wait')
- end
-
- cmd_options['groups'].each do |group|
- unless group['result']
- redis.set(
- group['cache_key'],
- Yajl::Encoder.encode(output.shift)
- )
- if group['ttl']
- redis.expire(group['cache_key'], group['ttl'])
- end
- end
- end
- end
- end
- end
- end
- else
- response = request.inject({}) do |hash, (cmd_type, cmd_options)|
- hash[cmd_type] = []
+ # Execute queued commands
+ if redis.get("execache:parallel").to_i <= parallel && cmd = redis.lpop("execache:commands")
+ redis.incr("execache:parallel")
+ Thread.new do
+ Timeout.timeout(60) do
+ cmd = Yajl::Parser.parse(cmd)
- cmd_options['groups'].each do |group|
- hash[cmd_type] << group['result']
- end
+ cache_keys = cmd['cache_keys']
+ cmd_type = cmd['cmd_type']
+ command = cmd['command']
+ groups = cmd['groups']
+ ttl = cmd['ttl']
+
+ separators = options[cmd_type]['separators'] || {}
+ separators['group'] ||= "[END]"
+ separators['result'] ||= "\n"
+
+ results = `#{command} #{groups.join(' ')}`
+ results = results.split(separators['group'] + separators['result'])
+ results = results.collect { |r| r.split(separators['result']) }
- hash
+ redis.decr("execache:parallel")
+
+ results.each_with_index do |result, i|
+ redis.set(
+ cache_keys[i],
+ Yajl::Encoder.encode(result)
+ )
+ redis.expire(cache_keys[i], ttl) if ttl
end
end
-
- redis.publish(
- "execache:response:#{channel}",
- pending ? '[PENDING]' : Yajl::Encoder.encode(response)
- )
end
end
+
sleep(1.0 / 1000.0)
end
rescue Interrupt
View
37 spec/execache_spec.rb
@@ -5,17 +5,12 @@
def client_exec(options={})
@client.exec(
{
+ :ttl => 60,
:some_binary => {
:args => 'preliminary_arg',
:groups => [
- {
- :args => 'arg1a arg1b',
- :ttl => 60
- },
- {
- :args => 'arg2a arg2b',
- :ttl => 60
- }
+ 'arg1a arg1b',
+ 'arg2a arg2b'
]
}
}.merge(options)
@@ -27,7 +22,7 @@ def client_exec(options={})
Execache.new("#{$root}/spec/fixtures/execache.yml")
end
@client = Execache::Client.new("localhost:6379/0")
- @client.redis_1.keys("execache:cache:*").each do |key|
+ @client.redis_1.keys("execache:*").each do |key|
@client.redis_1.del(key)
end
end
@@ -64,14 +59,10 @@ def client_exec(options={})
it "should read from cache for individual groups" do
@client.exec(
+ :ttl => 60,
:some_binary => {
:args => 'preliminary_arg',
- :groups => [
- {
- :args => 'arg2a arg2b',
- :ttl => 60
- }
- ]
+ :groups => [ 'arg2a arg2b' ]
}
).should == {
"some_binary" => [
@@ -80,14 +71,10 @@ def client_exec(options={})
}
@client.exec(
+ :ttl => 60,
:some_binary => {
:args => 'preliminary_arg',
- :groups => [
- {
- :args => 'arg1a arg1b',
- :ttl => 60
- }
- ]
+ :groups => [ 'arg1a arg1b' ]
}
).should == {
"some_binary" => [
@@ -98,14 +85,10 @@ def client_exec(options={})
it "should not read cache if preliminary arg changes" do
@client.exec(
+ :ttl => 60,
:some_binary => {
:args => 'preliminary_arg2',
- :groups => [
- {
- :args => 'arg2a arg2b',
- :ttl => 60
- }
- ]
+ :groups => [ 'arg2a arg2b' ]
}
).should == {
"some_binary" => [
View
1  spec/fixtures/execache.yml
@@ -1,4 +1,5 @@
redis: localhost:6379/0
+parallel: 2
some_binary:
command: 'ruby spec/fixtures/fixture.rb'
separators:
Please sign in to comment.
Something went wrong with that request. Please try again.