Skip to content

Commit

Permalink
Working, untested
Browse files Browse the repository at this point in the history
  • Loading branch information
winton committed Oct 11, 2011
1 parent ae5da8a commit 9f6873d
Show file tree
Hide file tree
Showing 8 changed files with 250 additions and 27 deletions.
81 changes: 60 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,36 +1,75 @@
Execache
===========
========

A gem template for new projects.
Run commands in parallel and cache the output. Redis queues jobs and stores the result.

Requirements
------------

<pre>
gem install stencil
gem install execache
</pre>

Setup the template
------------------
How Your Binaries Should Behave
-------------------------------

You only have to do this once.
Execache assumes that the script or binary you are executing has multiple results and sometimes multiple groups of results.

<pre>
git clone git@github.com:winton/execache.git
cd execache
stencil
</pre>
Example output:

Setup a new project
-------------------
$ bin/some/binary preliminary_arg arg1a arg1b arg2a arg2b
$ arg1_result_1
$ arg1_result_2
$ [END]
$ arg2_result_1
$ arg2_result_2

Do this for every new project.
Your binary may take zero or more preliminary arguments (e.g. `preliminary_arg`), followed by argument "groups" that dictate output (e.g. `arg1a arg1b`).

<pre>
mkdir my_project
git init
stencil execache
rake rename
</pre>
Configure
---------

Given the above example, our `execache.yml` looks like this:

redis: localhost:6379/0
some_binary:
command: '/bin/some/binary'
separators:
result: "\n"
group: "[END]"

Start the Server
----------------

$ execache /path/to/execache.yml

Execute Commands
----------------

require 'rubygems'
require 'execache'

client = Execache::Client.new("localhost:6379/0")

results = client.exec(
:some_binary => {
:args => 'preliminary_arg',
:groups => [
{
:args => 'arg1a arg1b',
:ttl => 60
},
{
:args => 'arg2a arg2b',
:ttl => 60
}
]
}
)

The last command does a find-replace (gem\_template -> my\_project) on files and filenames.
results == {
:some_binary => [
[ 'arg1_result_1', 'arg1_result_2' ],
[ 'arg2_result_1', 'arg2_result_2' ]
]
}
13 changes: 8 additions & 5 deletions execache.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,19 @@ Gem::Specification.new do |s|
s.name = "execache"
s.version = '0.1.0'
s.platform = Gem::Platform::RUBY
s.authors = []
s.email = []
s.homepage = "http://"
s.summary = %q{}
s.description = %q{}
s.authors = [ "Winton Welsh" ]
s.email = [ "mail@wintoni.us" ]
s.homepage = "http://github.com/winton/execache"
s.summary = %q{Run commands in parallel and cache the output, controlled by Redis}
s.description = %q{Run commands in parallel and cache the output. Redis queues jobs and stores the result.}

s.executables = `cd #{root} && git ls-files bin/*`.split("\n").collect { |f| File.basename(f) }
s.files = `cd #{root} && git ls-files`.split("\n")
s.require_paths = %w(lib)
s.test_files = `cd #{root} && git ls-files -- {features,test,spec}/*`.split("\n")

s.add_development_dependency "rspec", "~> 1.0"

s.add_dependency "redis", "~> 2.2.2"
s.add_dependency "yajl-ruby", "~> 1.0.0"
end
113 changes: 112 additions & 1 deletion lib/execache.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,115 @@
require "digest/sha1"
require "timeout"
require "yaml"

gem "yajl-ruby", "~> 1.0.0"
require "yajl"

gem "redis", "~> 2.2.2"
require "redis"

$:.unshift File.dirname(__FILE__)

module Execache
require 'execache/client'

class Execache

def initialize(yaml)
options = YAML.load(File.read(yaml))

puts "\nStarting execache server (redis @ #{options['redis']})..."

redis = Redis.connect(:url => "redis://#{options['redis']}")
retries = 0

begin
while true
request = redis.lpop('execache:request')
if request
#Thread.new do
request = Yajl::Parser.parse(request)
channel = request.delete('channel')
commands = []

request.each do |cmd_type, cmd_options|
# Command with preliminary args
command = [
options[cmd_type]['command'],
cmd_options['args']
]

# Fill results with caches if present
cmd_options['groups'].each do |group|
cache_key = Digest::SHA1.hexdigest(
"#{cmd_options['args']} #{group['args']}"
)
group['cache_key'] = cache_key = "execache:cache:#{cache_key}"
cache = redis.get(cache_key)

if cache
group['result'] = Yajl::Parser.parse(cache)
else
command << group['args']
nil
end
end

# Add command to be executed if not all args are cached
if command.length > 2
cmd_options['cmd'] = command.join(' ')
end
end

# Build response
response = request.inject({}) do |hash, (cmd_type, cmd_options)|
hash[cmd_type] = []

if cmd_options['cmd']
separators = options[cmd_type]['separators']
output = `#{cmd_options['cmd']}`
output = output.split(separators['group'] + separators['result'])
output = output.collect { |r| r.split(separators['result']) }
end

cmd_options['groups'].each do |group|
if group['result']
hash[cmd_type] << group['result']
else
hash[cmd_type] << output.shift
redis.set(
group['cache_key'],
Yajl::Encoder.encode(hash[cmd_type].last)
)
if group['ttl']
redis.expire(group['cache_key'], group['ttl'])
end
end
end

hash
end

redis.publish(
"execache:response:#{channel}",
Yajl::Encoder.encode(response)
)
#end
end
sleep(1.0 / 1000.0)
end
rescue Interrupt
shut_down
rescue Exception => e
puts "\nError: #{e.message}"
puts "\t#{e.backtrace.join("\n\t")}"
retries += 1
shut_down if retries >= 10
retry
end
end

def shut_down
puts "\nShutting down execache server..."
exit
end
end
29 changes: 29 additions & 0 deletions lib/execache/client.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
class Execache
class Client

def initialize(redis_url)
@redis_1 = Redis.connect(:url => "redis://#{redis_url}")
@redis_2 = Redis.connect(:url => "redis://#{redis_url}")
end

def exec(options)
options[:channel] = Digest::SHA1.hexdigest("#{rand}")
response = nil

Timeout.timeout(60) do
@redis_1.subscribe("execache:response:#{options[:channel]}") do |on|
on.subscribe do |channel, subscriptions|
@redis_2.rpush "execache:request", Yajl::Encoder.encode(options)
end

on.message do |channel, message|
response = Yajl::Parser.parse(message)
@redis_1.unsubscribe
end
end
end

response
end
end
end
29 changes: 29 additions & 0 deletions spec/execache_spec.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,33 @@
require 'spec_helper'

describe Execache do

before(:all) do
@thread = Thread.new do
Execache.new("#{$root}/spec/fixtures/execache.yml")
end
@client = Execache::Client.new("localhost:6379/0")
end

after(:all) do
@thread.kill
end

it "should" do
puts @client.exec(
:some_binary => {
:args => 'preliminary_arg',
:groups => [
{
:args => 'arg1a arg1b',
:ttl => 60
},
{
:args => 'arg2a arg2b',
:ttl => 60
}
]
}
).inspect
end
end
6 changes: 6 additions & 0 deletions spec/fixtures/execache.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
redis: localhost:6379/0
some_binary:
command: 'ruby spec/fixtures/fixture.rb'
separators:
result: "\n"
group: "[END]"
5 changes: 5 additions & 0 deletions spec/fixtures/fixture.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
puts "arg1_result_1
arg1_result_2
[END]
arg2_result_1
arg2_result_2"
1 change: 1 addition & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
require "pp"
require "bundler"

Bundler.require(:default)
Bundler.require(:development)

$root = File.expand_path('../../', __FILE__)
Expand Down

0 comments on commit 9f6873d

Please sign in to comment.