Skip to content
Browse files

svn merge -r 1845:1860 svn+ssh://phlapjack@rubyforge.org/var/svn/skyn…

…et/branches/ts_manager

- Skynet manager now supports being daemonized by running ./script/skynet start and ./script/skynet stop!
- Change mysql text fields to longtext in migration and schema files
- Include some extras including our init.d script, our nagios monitoring script, rails controller and view for monitoring
- Created a new skynet rails initializer which gets installed from skynet_install --rails
- Modified the skynet_install skynet runner to take skynet initializer into account
- Skynet::Manager - Work on the shutdown proceedure. Much more graceful now.
- Changed some of the debug levels in Skynet::Worker and Skynet::Manager
- Skynet::ActiveRecordExtensions Fixed a bug where it would fail if your table had fewer than 1000 rows.  It performs a count
 first now to make sure there are enough rows.
- Fix a comment in Skynet::MapreduceHelper.  self.reduce should ahve been self.reduce_each
- Fix some bugs in skynet_config where it would choke on Skynet::Config.logfile_location if the SKYNET_LOG_FILE was set to a filehandle like STDOUT.
- Fix a skynet manager bug where it would get weird empty BigDecimals from the worker statuses
- Added Skynet::Job.results_by_job_id to retrieve results from asyncronous jobs
- Moved all worker stats into the manager class which now does stats no matter what worker_queue_adapter your using.  The stats are more detailed now as well.
- Removed all stats code from worker queue classes
- Added a stats_for_hosts method to Skynet::Manager which aggregates stats accross many managers.
- Move worker methods out of the mysql message queue adapter into a new mysql worker queue adapter
- Added a Skynet::Manager#stats method
- Skynet::Manager now saves its stats to a file and reloads them when it starts.
- Skynet::Manager.get returns the current local manager drb object
- Workers now just add their statuses to the worker queue (without trying to take their old one off)
- Managers just keep polling the queue, taking worker statuses off, and holding an internal hash of worker statuses.
- The Manager doesn't pass around the worker_pids or worker_queue anymore, just updates when needed and uses the cached one.
- Changed how the skynet pid file and log file are specified.  You can now specify the directory and file separately.
- Lowered the WORKER_CHECK_DELAY
- Made some API changes to Skynet::Config so you can call class or instance methods for all the CONFIG options.
- Add new WORKER_QUEUE_ADAPTER config setting
- Create Skynet::MessageQueue.start_or_connect
- More work on Skynet::WorkerQueue class
  • Loading branch information...
1 parent 819f49b commit 6693f8107e33bf9422eac85db0fb03e42dfbb615 phlapjack committed May 3, 2008
View
1 History.txt
@@ -1,4 +1,5 @@
== 0.9.3 2008-04-10
+ - Support starting Skynet with ./script/skynet start and stop to daemonize
- Close file handles on exec.
Skynet::Worker and Skynet::Manager now call Skynet.fork_and_exec instead of their own versions.
Skynet.fork_and_exec prevents file descriptor exhaustion by calling Skynet.close_file_handles.
View
10 Manifest.txt
@@ -7,12 +7,18 @@ app_generators/skynet_install/USAGE
app_generators/skynet_install/skynet_install_generator.rb
app_generators/skynet_install/templates/migration.rb
app_generators/skynet_install/templates/skynet
+app_generators/skynet_install/templates/skynet_initializer.rb
app_generators/skynet_install/templates/skynet_mysql_schema.sql
bin/skynet
bin/skynet_install
bin/skynet_tuplespace_server
config/hoe.rb
config/requirements.rb
+extras/README
+extras/init.d/skynet
+extras/nagios/check_skynet.sh
+extras/rails/controllers/skynet_controller.rb
+extras/rails/views/skynet/index.rhtml
lib/skynet.rb
lib/skynet/mapreduce_helper.rb
lib/skynet/mapreduce_test.rb
@@ -36,7 +42,10 @@ lib/skynet/skynet_ruby_extensions.rb
lib/skynet/skynet_task.rb
lib/skynet/skynet_tuplespace_server.rb
lib/skynet/skynet_worker.rb
+lib/skynet/skynet_worker_queue.rb
lib/skynet/version.rb
+lib/skynet/worker_queue_adapters/mysql.rb
+lib/skynet/worker_queue_adapters/tuple_space.rb
log/debug.log
log/skynet.log
log/skynet_tuplespace_server.log
@@ -45,7 +54,6 @@ script/destroy
script/generate
script/txt2html
setup.rb
-skynet.log
tasks/deployment.rake
tasks/environment.rake
tasks/website.rake
View
13 app_generators/skynet_install/skynet_install_generator.rb
@@ -26,8 +26,12 @@ def manifest
# Create stubs
m.template "skynet", "script/skynet", :collision => :ask, :chmod => 0775, :shebang => options[:shebang]
+ if @in_rails
+ m.template "skynet_initializer.rb", "config/initializers/skynet.rb", :collision => :ask, :chmod => 0655
+ m.directory 'config/initializers'
+ end
if @mysql
- m.template "skynet_schema.sql", "db/skynet_schema.sql", :collision => :ask, :chmod => 0655
+ m.template "skynet_mysql_schema.sql", "db/skynet_mysql_schema.sql", :collision => :ask, :chmod => 0655
m.directory 'db/migrate'
m.migration_template "migration.rb", "db/migrate",
:collision => :ask,
@@ -58,13 +62,16 @@ def add_options!(opts)
opts.on("--mysql",
"Include mysql migration if you want to use mysql as your message queue.
Installs:
- ./db/skynet_schema.sql
+ ./db/skynet_mysql_schema.sql
./db/migrate/db/migrate/###_create_skynet_tables.rb
") do |mysql|
options[:mysql] = true if mysql
end
opts.on("-r", "--rails",
- "Install into rails app",
+ "Install into rails app.
+ Installs:
+ ./config/initializers/skynet.rb
+ (If using rails 1, make sure to add require 'skynet' to your environment.rb)",
"Default: false") do |rails|
options[:rails] = true if rails
end
View
2 app_generators/skynet_install/templates/migration.rb
@@ -29,7 +29,7 @@ def self.up
t.column :tasktype, :string
t.column :task_id, 'bigint unsigned'
t.column :job_id, 'bigint unsigned'
- t.column :raw_payload, :text
+ t.column :raw_payload, "longtext"
t.column :payload_type, :string
t.column :name, :string
t.column :expiry, :integer
View
16 app_generators/skynet_install/templates/skynet
@@ -5,24 +5,20 @@
require File.expand_path(File.dirname(__FILE__) + '/../config/boot')
require File.expand_path(File.dirname(__FILE__)) + '/../config/environment'
<% end -%>
-
require 'rubygems'
require 'skynet'
-Skynet::CONFIG[:LAUNCHER_PATH] = File.expand_path(__FILE__)
-Skynet::CONFIG[:SKYNET_LOG_LEVEL] = Logger::ERROR
-<% if in_rails -%>
-Skynet::CONFIG[:SKYNET_PIDS_FILE] = File.expand_path("#{RAILS_ROOT}/log/skynet_#{RAILS_ENV}.pid")
-Skynet::CONFIG[:SKYNET_LOG_FILE] = File.expand_path("#{RAILS_ROOT}/log/skynet_#{RAILS_ENV}.log")
-<% else -%>
-Skynet::CONFIG[:SKYNET_PIDS_FILE] = File.expand_path(File.dirname(__FILE__) + "/../log/skynet.pid")
-Skynet::CONFIG[:SKYNET_LOG_FILE] = File.expand_path(File.dirname(__FILE__) + "/../log/skynet.log")
+Skynet::CONFIG[:LAUNCHER_PATH] = File.expand_path(__FILE__)
+Skynet::CONFIG[:SKYNET_LOG_LEVEL] = Logger::ERROR
+<% if not in_rails -%>
+Skynet::CONFIG[:SKYNET_LOG_DIR] = File.expand_path(File.dirname(__FILE__) + "/../log")
+Skynet::CONFIG[:SKYNET_PID_DIR] = File.expand_path(File.dirname(__FILE__) + "/../log")
<% end -%>
<% if mysql -%>
+<% if not in_rails -%>
# Use the mysql message queue adapter
Skynet::CONFIG[:MESSAGE_QUEUE_ADAPTER] = "Skynet::MessageQueueAdapter::Mysql"
-<% if not in_rails -%>
Skynet::CONFIG[:MYSQL_HOST] = "localhost"
Skynet::CONFIG[:MYSQL_USERNAME] = "root"
Skynet::CONFIG[:MYSQL_PASSWORD] = ""
View
9 app_generators/skynet_install/templates/skynet_initializer.rb
@@ -0,0 +1,9 @@
+require 'skynet'
+Skynet::CONFIG[:SKYNET_LOG_DIR] = File.expand_path("#{RAILS_ROOT}/log")
+Skynet::CONFIG[:SKYNET_LOG_FILE] = "skynet_#{RAILS_ENV}.log"
+Skynet::CONFIG[:SKYNET_PID_DIR] = File.expand_path("#{RAILS_ROOT}/log")
+Skynet::CONFIG[:SKYNET_LOG_LEVEL] = Logger::ERROR
+<% if mysql -%>
+# Use the mysql message queue adapter
+Skynet::CONFIG[:MESSAGE_QUEUE_ADAPTER] = "Skynet::MessageQueueAdapter::Mysql"
+<% end %>
View
2 app_generators/skynet_install/templates/skynet_mysql_schema.sql
@@ -7,7 +7,7 @@ CREATE TABLE skynet_message_queues (
tasktype varchar(255) default NULL,
task_id bigint(20) unsigned default NULL,
job_id bigint(20) unsigned default NULL,
- raw_payload text,
+ raw_payload longtext,
payload_type varchar(255) default NULL,
name varchar(255) default NULL,
expiry int(11) default NULL,
View
8 bin/skynet_tuplespace_server
@@ -15,12 +15,12 @@ require 'rubygems'
require 'daemons'
require 'pp'
-require File.expand_path(File.dirname(__FILE__)) + '/../lib/skynet.rb'
+require File.expand_path(File.dirname(__FILE__)) + '/../lib/skynet.rb'
options = {
:port => 7647,
- :logfile => Skynet::CONFIG[:SKYNET_LOG_FILE],
- :loglevel => "DEBUG",
+ :logfile => Skynet::Config.pidfile_location,
+ :loglevel => "ERROR",
:piddir => Skynet::CONFIG[:SKYNET_PID_DIR]
}
@@ -32,7 +32,7 @@ OptionParser.new do |opt|
opt.on('-p', '--port PORT', 'Port to listen on. default 7647') do |v|
options[:port] = v.to_i
end
- opt.on('-o', '--log LOGFILE', 'Logfile to log to') do |v|
+ opt.on('-o', '--logfile LOGFILE', 'Logfile to log to') do |v|
options[:logfile] = v
end
opt.on('-l', '--loglevel LOGLEVEL', 'Log level defaults to DEBUG') do |v|
View
7 extras/README
@@ -0,0 +1,7 @@
+I've included some extras to help you deploy and monitor skynet.
+
+init.d/skynet # sample init.d script to start/stop/restart skynet
+
+nagios/check_skynet.sh # A script we use to have nagios (a monitoring tool) check our skynet Q.
+
+rails/* # I've included the controller and view we use to monitor our skynet Q
View
87 extras/init.d/skynet
@@ -0,0 +1,87 @@
+#!/bin/sh
+#
+# skynet start/stop skynet
+#
+# processname: solr
+#
+
+# Source function library
+. /etc/init.d/functions
+
+# Get network config
+. /etc/sysconfig/network
+
+HOME=/skynet/current
+RAILS_ENV=production
+
+cd $HOME
+RETVAL=0
+start() {
+ echo -n $"Starting Skynet: "
+ ./script/skynet &
+ RETVAL=$?
+ echo
+ return $RETVAL
+}
+
+stop() {
+ echo -n $"Stopping Skynet: "
+ pid=`cat /skynet/logs/skynet_production.pid`
+ kill ${pid}
+ RETVAL=$?
+ return $RETVAL
+}
+
+restart() {
+ echo -n $"Restarting Skynet: "
+ ./script/skynet -r
+ RETVAL=$?
+ echo
+ return $RETVAL
+}
+
+
+reload() {
+ stop
+ start
+}
+
+case "$1" in
+ start)
+ start
+ ;;
+ stop)
+ stop
+ ;;
+ status)
+ exit
+ ;;
+ restart)
+ restart
+ ;;
+ condrestart)
+ [ -f /var/lock/subsys/skynet ] && restart || :
+ ;;
+ reload)
+ reload
+ ;;
+ *)
+ echo $"Usage: $0 {start|stop|status|restart|condrestart|reload}"
+ exit 1
+esac
+get_pid_for_cmd ()
+{
+ pid=`cat /skynet/logs/skynet_worker7647.pid`
+ return pid
+
+ #for pid in `ls -t /proc`; do
+ # if [ -d /proc/$pid ] && [ -f /proc/$pid/cmdline ]; then
+ # if [ "$1" = "$(</proc/$pid/cmdline)" ]; then
+ # return
+ # fi
+ # fi
+ #done
+ #pid=0
+}
+
+exit $?
View
121 extras/nagios/check_skynet.sh
@@ -0,0 +1,121 @@
+#! /bin/sh
+#
+# Usage: ./check_skynet --help
+#
+# Examples:
+# ./check_skynet -w 300 -c 2000 -u
+
+# Paths to commands used in this script. These
+# may have to be modified to match your system setup.
+
+PATH=""
+
+ECHO="/bin/echo"
+SED="/bin/sed"
+GREP="/bin/grep"
+TAIL="/bin/tail"
+CAT="/bin/cat"
+CUT="/bin/cut"
+WC="/bin/wc"
+CURL="/usr/bin/curl -f"
+
+PROGNAME=`/bin/basename $0`
+PROGPATH=`echo $0 | /bin/sed -e 's,[\\/][^\\/][^\\/]*$,,'`
+REVISION=`echo '$Revision: 0.1 $' | /bin/sed -e 's/[^0-9.]//g'`
+
+. /usr/local/nagios/libexec/utils.sh
+
+print_usage() {
+ echo "Usage: $PROGNAME -w <threshold> -c <threshold> -u <url of skynet status page>"
+ echo "Usage: $PROGNAME --help"
+ echo "Usage: $PROGNAME --version"
+}
+
+print_help() {
+ print_revision $PROGNAME $REVISION
+ echo ""
+ print_usage
+ echo ""
+ echo "Check Skynet's untaken_tasks"
+ echo ""
+ support
+}
+
+# Make sure the correct number of command line
+# arguments have been supplied
+
+if [ $# -lt 3 ]; then
+ print_usage
+ exit $STATE_UNKNOWN
+fi
+
+# Grab the command line arguments
+
+exitstatus=$STATE_UNKNOWN #default
+while test -n "$1"; do
+ case "$1" in
+ --help)
+ print_help
+ exit $STATE_OK
+ ;;
+ -h)
+ print_help
+ exit $STATE_OK
+ ;;
+ --version)
+ print_revision $PROGNAME $VERSION
+ exit $STATE_OK
+ ;;
+ -V)
+ print_revision $PROGNAME $VERSION
+ exit $STATE_OK
+ ;;
+ -w)
+ WARNING=$2;
+ shift;
+ ;;
+ -c)
+ CRITICAL=$2;
+ shift;
+ ;;
+ -u)
+ URL=$2;
+ shift;
+ ;;
+ *)
+ echo "Unknown argument: $1"
+ print_usage
+ exit $STATE_UNKNOWN
+ ;;
+ esac
+ shift
+done
+
+
+CURRENT=$($CURL $URL | $GREP "untaken_tasks\|down"| $CUT -d: -f2)
+
+if [ -z $CURRENT ] ;then
+ $ECHO "CANNOT GATHER SKYNET TASKS CALL NOPS"
+ exit $STATE_UNKNOWN
+fi
+
+if [ $CURRENT -ge $CRITICAL ]; then
+ $ECHO "Skynet untaken_tasks: $CURRENT threshold: $CRITICAL CRITICAL"
+ $ECHO " "
+ exit $STATE_CRITICAL
+fi
+
+if [ $CURRENT -ge $WARNING ]; then
+ $ECHO "Skynet untaken_tasks: $CURRENT threshold: $WARNING WARNING"
+ $ECHO " "
+ exit $STATE_WARNING
+fi
+
+if [[ $CURRENT -lt $CRITICAL && $CURRENT -lt $WARNING ]]; then
+ $ECHO "Skynet untaken_tasks: $CURRENT threshold critical: $CRITICAL threshold warning: $WARNING OK"
+ $ECHO " "
+ exit $STATE_OK
+
+fi
+$ECHO "NO SCRIPT OUTPUT CALL NOPS!"
+exit $STATE_CRITICAL
View
41 extras/rails/controllers/skynet_controller.rb
@@ -0,0 +1,41 @@
+class Admin::SkynetController < AdminController
+
+ def index
+ begin
+ setup
+ if params[:skynet_message_queue]
+ Skynet.configure(:MYSQL_MESSAGE_QUEUE_TABLE => params[:skynet_message_queue]) do
+ @stats = @mq.stats
+ end
+ else
+ @stats = @mq.stats
+ end
+ @stats.merge!(Skynet::Manager.stats_for_hosts)
+ @stats[:hosts] = @stats[:servers].size
+ rescue Exception => e
+ logger.error "ERROR #{e.inspect} #{e.backtrace.join("\n")}"
+ end
+ end
+
+ # plain text page that will be used by monitoring scripts
+ def status
+ begin
+ setup
+ stats = @mq.stats
+ stats[:servers] = stats[:servers].keys.join(",")
+ stats.each { |k,v| stats[k.to_s] = stats.delete(k) }
+ text = stats.keys.sort.collect{ |k| "#{k}:#{stats[k]}" }.join("\n") + "\n"
+ render :text => text, :content_type => 'text/plain'
+ rescue Exception => e
+ render :text => "skynet is down\n", :content_type => 'text/plain'
+ end
+ end
+
+ private
+
+ def setup
+ @mq ||= Skynet::MessageQueue.new(Skynet::CONFIG[:MESSAGE_QUEUE_ADAPTER])
+ @last_updated = Time.now.strftime('%r')
+ end
+
+end
View
137 extras/rails/views/skynet/index.rhtml
@@ -0,0 +1,137 @@
+
+ <div class="hd">
+ <h1>SKYNET STATUS</h1>
+ <span>Last updated: <%= @last_updated || 'N/A' %></span>
+ <br />
+ <br />
+ </div>
+
+ <% if @stats %>
+
+ <h2>Overall Stats</h2>
+ <table class="admin_table">
+ <thead>
+ <tr>
+ <th>Hosts</th>
+ <th>Workers</th>
+ <th>Active Workers<BR>(Masters / Tasks / Either)</th>
+ <th>Idle Workers<BR>(Masters / Tasks / Either)</th>
+ <th>Untaken Tasks<BR>(Master / Task)</th>
+ <th>Taken Tasks<BR>(Master / Task)</th>
+ <th>Results</th>
+ <th>Processed <br />(by active workers)</th>
+ <th>Process Time</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td class="numeric"><%= @stats[:hosts] %></td>
+ <td class="numeric"><%= @stats[:number_of_workers] %></td>
+ <td class="numeric"><b><%= @stats[:active_workers] %></b> (<%= @stats[:active_masters] %> / <%= @stats[:active_taskworkers] %> / <%= @stats[:active_master_or_task_workers] %>)</td>
+ <td class="numeric"><b><%= @stats[:idle_workers] %></b> (<%= @stats[:idle_masters] %> / <%= @stats[:idle_taskworkers] %> / <%= @stats[:idle_master_or_task_workers] %>)</td>
+ <td class="numeric"><%= @stats[:untaken_tasks] %> (<b><%= @stats[:untaken_master_tasks] %></b> / <%= @stats[:untaken_task_tasks] %>)</td>
+ <td class="numeric"><b><%= @stats[:taken_tasks] %></b> (<%= @stats[:taken_master_tasks] %> / <%= @stats[:taken_task_tasks] %>)</td>
+ <td class="numeric"><%= @stats[:results] %></td>
+ <td class="numeric"><%= @stats[:processed] %> (<%= @stats[:processed_by_active_workers] %>)</td>
+ <td><%= @stats[:time] %></td>
+ </tr>
+ </tbody>
+ </table>
+
+
+ <BR>
+ <h2>Servers</h2>
+ <table class="admin_table">
+ <colgroup>
+ <col span="1" width="3%" />
+ <col span="1" width="10%" />
+ <col span="1" width="5%" />
+ <col span="1" width="5%" />
+ <col span="1" width="5%" />
+ <col span="1" width="5%" />
+ </colgroup>
+ <thead>
+ <tr>
+ <th class="numeric">#</th>
+ <th>Hostname</th>
+ <th>Number of Workers</th>
+ <th>Active Workers<BR>Masters / Tasks / Either</th>
+ <th>Idle Workers<BR>Masters / Tasks / Either</th>
+ <th>Processed <br />(by active workers)</th>
+ </tr>
+ </thead>
+
+ <% if @stats[:servers] %>
+ <tbody>
+ <% @stats[:servers].keys.sort.each do |hostname| %>
+ <% server = @stats[:servers][hostname] %>
+ <% i = 1 %>
+ <tr>
+ <td class="numeric"><%= i %></td>
+ <td><%= server[:hostname] %></td>
+ <td class="numeric"><%= server[:number_of_workers] %></td>
+ <td class="numeric"><%= server[:active_workers] %> (<%= server[:active_masters] %> / <%= server[:active_taskworkers] %> / <%= server[:active_master_or_task_workers] %>) </td>
+ <td class="numeric"><%= server[:idle_workers] %> (<%= server[:idle_masters] %> / <%= server[:idle_taskworkers] %> / <%= server[:idle_master_or_task_workers] %>) </td>
+ <td class="numeric"><%= server[:processed] %> (<%= server[:processed_by_active_workers] %>)</td>
+ </tr>
+ <% i += 1 %>
+ <% end %>
+ </tbody>
+ <% end %>
+
+ </table>
+
+ <% end %>
+
+ <!-- <table class="admin_table">
+ <colgroup>
+ <col span="1" width="5%" />
+ <col span="1" width="8%" />
+ <col span="1" width="10%" />
+ <col span="1" width="10%" />
+ <col span="1" width="5%" />
+ <col span="1" width="5%" />
+ </colgroup>
+ <thead>
+ <tr>
+ <th class="numeric">#</th>
+ <th>Worker ID</th>
+ <th>Hostname</th>
+ <th>PID</th>
+ <th>Job ID</th>
+ <th>Task ID</th>
+ <th>Version</th>
+ <th>Processed</th>
+ <th>M/R</th>
+ <th>Name</th>
+ </tr>
+ </thead>
+
+ <% if false and @servers %>
+ <tbody>
+ <% @servers.each do |server| %>
+ <tr>
+ <td colspan="10"><%= server[:manager] %></td>
+ </tr>
+ <% i = 1 %>
+ <% server[:workers].each do |w| %>
+ <tr>
+ <td class="numeric"><%= i %></td>
+ <td><%= w.worker_id %></td>
+ <td><%= w.hostname %></td>
+ <td class="numeric"><%= w.process_id %></td>
+ <td class="numeric"><%= w.job_id %></td>
+ <td class="numeric"><%= w.task_id %></td>
+ <td class="numeric"><%= w.version %></td>
+ <td class="numeric"><%= w.processed %></td>
+ <td><%= w.map_or_reduce || '-' %></td>
+ <td><%= w.name %></td>
+ </tr>
+ <% i += 1 %>
+ <% end %>
+ <% end %>
+ </tbody>
+ <% end %>
+
+ </table> -->
+
View
16 lib/skynet.rb
@@ -17,7 +17,9 @@
require 'skynet_message'
require 'message_queue_adapters/message_queue_adapter'
require 'message_queue_adapters/tuple_space'
+require 'worker_queue_adapters/tuple_space'
require "skynet_message_queue"
+require "skynet_worker_queue"
require 'skynet_partitioners'
require 'skynet_job'
require 'skynet_worker'
@@ -29,6 +31,7 @@
require 'active_record'
require 'skynet_active_record_extensions'
require 'message_queue_adapters/mysql'
+ require 'worker_queue_adapters/mysql'
rescue LoadError => e
end
require 'mapreduce_test'
@@ -51,14 +54,25 @@ def self.fork_and_exec(command)
sleep 0.01 # remove contention on manager drb object
log = Skynet::Logger.get
info "executing /bin/sh -c \"#{command}\""
- pid = fork do
+ pid = safefork do
close_files
exec("/bin/sh -c \"#{command}\"")
exit
end
Process.detach(pid)
pid
end
+
+ def self.safefork (&block)
+ @fork_tries ||= 0
+ fork(&block)
+ rescue Errno::EWOULDBLOCK
+ raise if @fork_tries >= 20
+ @fork_tries += 1
+ sleep 5
+ retry
+ end
+
# close open file descriptors starting with STDERR+1
def self.close_files(from=3, to=50)
View
2 lib/skynet/mapreduce_helper.rb
@@ -22,7 +22,7 @@ module MapreduceHelper
# SomeUrlSlurper.gather_results(url) # returns an array of urls of sites that link to the given url
# end
#
-# def self.reduce(linked_from_url)
+# def self.reduce_each(linked_from_url)
# SomeUrlSluper.find_text("mysite", linked_from_url) # finds all the times "mysite" appears in the given url, which we know links to the url given in the map_data
# end
# end
View
954 lib/skynet/message_queue_adapters/mysql.rb
@@ -17,360 +17,305 @@ class RequestExpiredError < Skynet::Error
class InvalidMessage < Skynet::Error
end
-
- class MessageQueueAdapter
- class Mysql < Skynet::MessageQueueAdapter
-
- include SkynetDebugger
- include Skynet::GuidGenerator
+ class MessageQueueAdapter::Mysql < Skynet::MessageQueueAdapter
+
+ include SkynetDebugger
+ include Skynet::GuidGenerator
- SEARCH_FIELDS = [:tasktype, :task_id, :job_id, :payload_type, :expire_time, :iteration, :version] unless defined?(SEARCH_FIELDS)
-
- Skynet::CONFIG[:MYSQL_MESSAGE_QUEUE_TEMP_CHECK_DELAY] ||= 30
-
- @@db_set = false
-
- def self.adapter
- :mysql
- end
+ SEARCH_FIELDS = [:tasktype, :task_id, :job_id, :payload_type, :expire_time, :iteration, :version] unless defined?(SEARCH_FIELDS)
+
+ Skynet::CONFIG[:MYSQL_MESSAGE_QUEUE_TEMP_CHECK_DELAY] ||= 30
+
+ @@db_set = false
+
+ def self.adapter
+ :mysql
+ end
- def initialize
- if Skynet::CONFIG[:MYSQL_MESSAGE_QUEUE_TABLE]
- SkynetMessageQueue.table_name = Skynet::CONFIG[:MYSQL_MESSAGE_QUEUE_TABLE]
- end
- if not @@db_set
- if Skynet::CONFIG[:MYSQL_QUEUE_DATABASE]
- begin
- SkynetMessageQueue.establish_connection Skynet::CONFIG[:MYSQL_QUEUE_DATABASE]
- SkynetWorkerQueue.establish_connection Skynet::CONFIG[:MYSQL_QUEUE_DATABASE]
- rescue ActiveRecord::AdapterNotSpecified => e
- error "#{Skynet::CONFIG[:MYSQL_QUEUE_DATABASE]} not defined as a database adaptor #{e.message}"
- end
- elsif not ActiveRecord::Base.connected?
- db_options = {
- :adapter => Skynet::CONFIG[:MYSQL_ADAPTER],
- :host => Skynet::CONFIG[:MYSQL_HOST],
- :username => Skynet::CONFIG[:MYSQL_USERNAME],
- :password => Skynet::CONFIG[:MYSQL_PASSWORD],
- :database => Skynet::CONFIG[:MYSQL_DATABASE]
- }
- ActiveRecord::Base.establish_connection(db_options)
+ def self.start_or_connect(options={})
+ new
+ end
+
+ def initialize
+ if Skynet::CONFIG[:MYSQL_MESSAGE_QUEUE_TABLE]
+ SkynetMessageQueue.table_name = Skynet::CONFIG[:MYSQL_MESSAGE_QUEUE_TABLE]
+ end
+ if not @@db_set
+ if Skynet::CONFIG[:MYSQL_QUEUE_DATABASE]
+ begin
+ SkynetMessageQueue.establish_connection Skynet::CONFIG[:MYSQL_QUEUE_DATABASE]
+ SkynetWorkerQueue.establish_connection Skynet::CONFIG[:MYSQL_QUEUE_DATABASE]
+ rescue ActiveRecord::AdapterNotSpecified => e
+ error "#{Skynet::CONFIG[:MYSQL_QUEUE_DATABASE]} not defined as a database adaptor #{e.message}"
end
+ elsif (not ActiveRecord::Base.connected?) and Skynet::CONFIG[:MYSQL_DATABASE]
+ db_options = {
+ :adapter => Skynet::CONFIG[:MYSQL_ADAPTER],
+ :host => Skynet::CONFIG[:MYSQL_HOST],
+ :username => Skynet::CONFIG[:MYSQL_USERNAME],
+ :password => Skynet::CONFIG[:MYSQL_PASSWORD],
+ :database => Skynet::CONFIG[:MYSQL_DATABASE]
+ }
+ ActiveRecord::Base.establish_connection(db_options)
end
- @@db_set = true
-
- end
-
- def message_queue_table
- Skynet::CONFIG[:MYSQL_MESSAGE_QUEUE_TABLE] || SkynetMessageQueue.table_name
- end
-
- def self.debug_class_desc
- "MYSQLMQ"
end
+ @@db_set = true
- def message_to_conditions(message)
- template_to_conditions(message.to_a)
- end
+ end
+
+ def message_queue_table
+ Skynet::CONFIG[:MYSQL_MESSAGE_QUEUE_TABLE] || SkynetMessageQueue.table_name
+ end
+
+ def self.debug_class_desc
+ "MYSQLMQ"
+ end
+
+ def message_to_conditions(message)
+ template_to_conditions(message.to_a)
+ end
- def template_to_conditions(template,fields=Skynet::Message.fields)
- conditions = []
- values = []
-
- fields.each_with_index do |field,ii|
- value = template[ii]
- next unless value
- if value.is_a?(Range)
- conditions << "#{field} BETWEEN #{value.first} AND #{value.last}"
- elsif value.is_a?(Symbol) or value.is_a?(String)
- conditions << "#{field} = '#{value}'"
- else
- conditions << "#{field} = #{value}"
- end
- end
- return '' if conditions.empty?
- return conditions.join(" AND ")
+ def template_to_conditions(template,fields=Skynet::Message.fields)
+ conditions = []
+ values = []
+
+ fields.each_with_index do |field,ii|
+ value = template[ii]
+ next unless value
+ if value.is_a?(Range)
+ conditions << "#{field} BETWEEN #{value.first} AND #{value.last}"
+ elsif value.is_a?(Symbol) or value.is_a?(String)
+ conditions << "#{field} = '#{value}'"
+ else
+ conditions << "#{field} = #{value}"
+ end
end
-
- def message_to_hash(message,timeout=nil,fields=Skynet::Message.fields)
- timeout ||= message.expiry
- hash = {}
- fields.each do |field|
- next if field == :drburi
- # next unless message.send(field)
- if message.send(field).is_a?(Symbol)
- hash[field] = message.send(field).to_s
- elsif field == :payload
- hash[:raw_payload] = message.raw_payload
- else
- hash[field] = message.send(field)
- end
- end
- if timeout
- hash[:timeout] = timeout
- hash[:expire_time] = (Time.now.to_f + timeout) unless hash[:expire_time]
+ return '' if conditions.empty?
+ return conditions.join(" AND ")
+ end
+
+ def message_to_hash(message,timeout=nil,fields=Skynet::Message.fields)
+ timeout ||= message.expiry
+ hash = {}
+ fields.each do |field|
+ next if field == :drburi
+ # next unless message.send(field)
+ if message.send(field).is_a?(Symbol)
+ hash[field] = message.send(field).to_s
+ elsif field == :payload
+ hash[:raw_payload] = message.raw_payload
+ else
+ hash[field] = message.send(field)
end
- hash
end
-
- def write_fallback_message(message_row, message)
- tran_id = get_unique_id(1)
- ftm = message.fallback_task_message
- update_sql = %{
- update #{message_queue_table}
- SET iteration = #{ftm.iteration },
- expire_time = #{ftm.expire_time},
- updated_on = '#{Time.now.strftime('%Y-%m-%d %H:%M:%S')}',
- tran_id = #{tran_id}
- WHERE id = #{message_row.id} AND iteration = #{message.iteration}
- AND tran_id #{(message_row.tran_id ? " =#{message_row.tran_id}" : ' IS NULL')}
- }
- rows = update(update_sql) || 0
- message_row.tran_id = tran_id if rows == 1
- rows
+ if timeout
+ hash[:timeout] = timeout
+ hash[:expire_time] = (Time.now.to_f + timeout) unless hash[:expire_time]
end
-
- def take_next_task(curver,timeout=0.5,payload_type=nil,queue_id=0)
- timeout = Skynet::CONFIG[:MYSQL_NEXT_TASK_TIMEOUT] if timeout < 1
- debug "TASK NEXT TASK!!!!!!! timeout: #{timeout} queue_id:#{queue_id}"
+ hash
+ end
+
+ def write_fallback_message(message_row, message)
+ tran_id = get_unique_id(1)
+ ftm = message.fallback_task_message
+ update_sql = %{
+ update #{message_queue_table}
+ SET iteration = #{ftm.iteration },
+ expire_time = #{ftm.expire_time},
+ updated_on = '#{Time.now.strftime('%Y-%m-%d %H:%M:%S')}',
+ tran_id = #{tran_id}
+ WHERE id = #{message_row.id} AND iteration = #{message.iteration}
+ AND tran_id #{(message_row.tran_id ? " =#{message_row.tran_id}" : ' IS NULL')}
+ }
+ rows = update(update_sql) || 0
+ message_row.tran_id = tran_id if rows == 1
+ rows
+ end
+
+ def take_next_task(curver,timeout=0.5,payload_type=nil,queue_id=0)
+ timeout = Skynet::CONFIG[:MYSQL_NEXT_TASK_TIMEOUT] if timeout < 1
+ debug "TASK NEXT TASK!!!!!!! timeout: #{timeout} queue_id:#{queue_id}"
- start = Time.now
- template = Skynet::Message.next_task_template(curver, payload_type)
+ start = Time.now
+ template = Skynet::Message.next_task_template(curver, payload_type)
+ message = nil
+
+ loop do
+ rows = 0
message = nil
-
- loop do
- rows = 0
- message = nil
- template = Skynet::Message.next_task_template(curver, payload_type)
- begin
- message_row = find_next_message(template, payload_type)
- if message_row
- message = Skynet::Message.new(message_row.attributes)
- rows = write_fallback_message(message_row, message)
-
- if rows < 1
- old_temp = temperature(payload_type)
- set_temperature(payload_type, template_to_conditions(template), queue_id)
- debug "MISSCOLLISION PTYPE #{payload_type} OLDTEMP: #{old_temp} NEWTEMP: #{temperature(payload_type)}"
- else
- break
- end
- else # no messages on queue with this temp
- old_temp = temperature(payload_type)
- if old_temp > 1
- set_temperature(payload_type, template_to_conditions(template), queue_id)
- end
- debug "MISS PTYPE #{payload_type} OLDTEMP: #{old_temp} NEWTEMP: #{temperature(payload_type)}"
- end
- rescue Skynet::Message::BadMessage => e
- message_row.destroy
- next
- rescue ActiveRecord::StatementInvalid => e
- if e.message =~ /Deadlock/
+ template = Skynet::Message.next_task_template(curver, payload_type)
+ begin
+ message_row = find_next_message(template, payload_type)
+ if message_row
+ message = Skynet::Message.new(message_row.attributes)
+ rows = write_fallback_message(message_row, message)
+
+ if rows < 1
old_temp = temperature(payload_type)
set_temperature(payload_type, template_to_conditions(template), queue_id)
- debug "COLLISION PTYPE #{payload_type} OLDTEMP: #{old_temp} NEWTEMP: #{temperature(payload_type)}"
+ debug "MISSCOLLISION PTYPE #{payload_type} OLDTEMP: #{old_temp} NEWTEMP: #{temperature(payload_type)}"
else
- raise e
+ break
+ end
+ else # no messages on queue with this temp
+ old_temp = temperature(payload_type)
+ if old_temp > 1
+ set_temperature(payload_type, template_to_conditions(template), queue_id)
end
+ debug "MISS PTYPE #{payload_type} OLDTEMP: #{old_temp} NEWTEMP: #{temperature(payload_type)}"
end
- if Time.now.to_f > start.to_f + timeout
- debug "MISSTIMEOUT PTYPE #{payload_type} #{temperature(payload_type)}"
- raise Skynet::RequestExpiredError.new
- else
- sleepy = rand(timeout * 0.5 )
- debug "EMPTY QUEUE #{temperature(payload_type)} SLEEPING: #{timeout} / #{sleepy}"
- sleep sleepy
+ rescue Skynet::Message::BadMessage => e
+ message_row.destroy
+ next
+ rescue ActiveRecord::StatementInvalid => e
+ if e.message =~ /Deadlock/
+ old_temp = temperature(payload_type)
+ set_temperature(payload_type, template_to_conditions(template), queue_id)
+ debug "COLLISION PTYPE #{payload_type} OLDTEMP: #{old_temp} NEWTEMP: #{temperature(payload_type)}"
+ else
+ raise e
end
end
-
- return message
- end
-
- def write_message(message,timeout=nil)
- timeout ||= message.expiry
- SkynetMessageQueue.create(message_to_hash(message, timeout))
- end
-
- def write_result(message,result=[],timeout=nil)
- timeout ||= message.expiry
- result_message = message.result_message(result)
- result_message.expire_time = nil
- update_message_with_result(result_message,timeout)
- end
-
- def update_message_with_result(message,timeout=nil)
- timeout ||= message.expiry
- timeout_sql = (timeout ? ", timeout = #{timeout}, expire_time = #{Time.now.to_f + timeout}" : '')
- rows = 0
- raw_payload_sql = " raw_payload = "
- raw_payload_sql << (message.raw_payload ? "'#{::Mysql.escape_string(message.raw_payload)}'" : 'NULL')
- update_sql = %{
- update #{message_queue_table}
- set tasktype = "#{message.tasktype}",
- #{raw_payload_sql},
- payload_type = "#{message.payload_type}",
- updated_on = "#{Time.now.strftime('%Y-%m-%d %H:%M:%S')}",
- tran_id = NULL
- #{timeout_sql}
- where task_id = #{message.task_id}
- }
- rows = update(update_sql)
-
- raise Skynet::RequestExpiredError.new() if rows == 0
- end
-
- def take_result(job_id,timeout=1)
- start = Time.now
- result = nil
- sleep_time = 10
- if timeout < 1
- sleep_time = 1
- elsif timeout > 10
- sleep_time = 10
- else
- sleep_time = timeout * 0.25
+ if Time.now.to_f > start.to_f + timeout
+ debug "MISSTIMEOUT PTYPE #{payload_type} #{temperature(payload_type)}"
+ raise Skynet::RequestExpiredError.new
+ else
+ sleepy = rand(timeout * 0.5 )
+ debug "EMPTY QUEUE #{temperature(payload_type)} SLEEPING: #{timeout} / #{sleepy}"
+ sleep sleepy
end
- message_row = nil
-
- loop do
- # message_row = take(Skynet::Message.result_template(job_id), start, timeout,sleep_time)
- conditions = template_to_conditions(Skynet::Message.result_template(job_id))
- # sleep_time ||= timeout
-
- message_row = SkynetMessageQueue.find(:first,:conditions => conditions)
-
- break if message_row
+ end
- if Time.now.to_f > start.to_f + timeout
- raise Skynet::RequestExpiredError.new
- else
- sleepy = rand(sleep_time)
- # error "RESULT EMPTY SLEEPING: #{sleepy}"
- sleep sleepy
- next
- end
- next
- end
+ return message
+ end
+
+ def write_message(message,timeout=nil)
+ timeout ||= message.expiry
+ SkynetMessageQueue.create(message_to_hash(message, timeout))
+ end
+
+ def write_result(message,result=[],timeout=nil)
+ timeout ||= message.expiry
+ result_message = message.result_message(result)
+ result_message.expire_time = nil
+ update_message_with_result(result_message,timeout)
+ end
+
+ def update_message_with_result(message,timeout=nil)
+ timeout ||= message.expiry
+ timeout_sql = (timeout ? ", timeout = #{timeout}, expire_time = #{Time.now.to_f + timeout}" : '')
+ rows = 0
+ raw_payload_sql = " raw_payload = "
+ raw_payload_sql << (message.raw_payload ? "'#{::Mysql.escape_string(message.raw_payload)}'" : 'NULL')
+ update_sql = %{
+ update #{message_queue_table}
+ set tasktype = "#{message.tasktype}",
+ #{raw_payload_sql},
+ payload_type = "#{message.payload_type}",
+ updated_on = "#{Time.now.strftime('%Y-%m-%d %H:%M:%S')}",
+ tran_id = NULL
+ #{timeout_sql}
+ where task_id = #{message.task_id}
+ }
+ rows = update(update_sql)
+
+ raise Skynet::RequestExpiredError.new() if rows == 0
+ end
- result = Skynet::Message.new(message_row.clone.attributes)
- message_row.destroy
- return result if result
- end
-
- def list_tasks(iteration=nil)
- conditions = template_to_conditions(Skynet::Message.outstanding_tasks_template(iteration))
- SkynetMessageQueue.find(:all,:conditions => conditions)
+ def take_result(job_id,timeout=1)
+ start = Time.now
+ result = nil
+ sleep_time = 10
+ if timeout < 1
+ sleep_time = 1
+ elsif timeout > 10
+ sleep_time = 10
+ else
+ sleep_time = timeout * 0.25
end
+ message_row = nil
- def list_results
- conditions = template_to_conditions(Skynet::Message.outstanding_results_template)
- SkynetMessageQueue.find(:all,:conditions => conditions)
- end
+ loop do
+ # message_row = take(Skynet::Message.result_template(job_id), start, timeout,sleep_time)
+ conditions = template_to_conditions(Skynet::Message.result_template(job_id))
+ # sleep_time ||= timeout
- def write_error(message,error='',timeout=nil)
- message.expire_time = nil
- update_message_with_result(message.error_message(error),timeout)
- end
-
- def write_worker_status(task, timeout=nil)
- message = Skynet::WorkerStatusMessage.new(task)
- update_hash = message_to_hash(message, timeout, Skynet::WorkerStatusMessage.fields)
- update_hash.each do |k,v|
- if not v
- update_hash[k] = "NULL"
- elsif v.kind_of?(String) or v.kind_of?(Symbol)
- update_hash[k] = "'#{v}'"
- end
- end
+ message_row = SkynetMessageQueue.find(:first,:conditions => conditions)
+
+ break if message_row
- update_sql = "UPDATE skynet_worker_queues SET #{update_hash.collect{|k,v| "#{k}=#{v}"}.join(',')} WHERE worker_id=#{message.worker_id}"
- rows = update(update_sql)
- if rows == 0
- begin
- insert_sql = "INSERT INTO skynet_worker_queues (#{update_hash.keys.join(',')}) VALUES (#{update_hash.values.join(',')})"
- rows = update(insert_sql)
- rescue ActiveRecord::StatementInvalid => e
- if e.message =~ /Duplicate/
- error "DUPLICATE WORKER #{e.message} #{e.backtrace.join("\n")}"
- else
- raise e
- end
- end
- end
- return rows
- end
-
- def take_worker_status(task, timeout=nil)
- conditions = template_to_conditions(Skynet::WorkerStatusMessage.worker_status_template(task), Skynet::WorkerStatusMessage.fields)
- worker_status = nil
- SkynetWorkerQueue.transaction do
- worker_row = SkynetWorkerQueue.find(:first, :conditions => conditions)
- return unless worker_row
- worker_status = Skynet::WorkerStatusMessage.new(worker_row.clone.attributes)
- worker_row.destroy
- end
- worker_status
- end
-
- def read_all_worker_statuses(hostname=nil,process_id=nil)
- ws = Skynet::WorkerStatusMessage.all_workers_template(hostname)
- ws[4] = process_id if process_id
- conditions = template_to_conditions(ws,Skynet::WorkerStatusMessage.fields)
- rows = SkynetWorkerQueue.find(:all, :conditions => conditions)
- workers = rows.collect{ |w| Skynet::WorkerStatusMessage.new(w.attributes) }#.sort{ |a,b| a.process_id <=> b.process_id }
- end
+ if Time.now.to_f > start.to_f + timeout
+ raise Skynet::RequestExpiredError.new
+ else
+ sleepy = rand(sleep_time)
+ # error "RESULT EMPTY SLEEPING: #{sleepy}"
+ sleep sleepy
+ next
+ end
+ next
+ end
+ result = Skynet::Message.new(message_row.clone.attributes)
+ message_row.destroy
+ return result if result
+ end
+
+ def list_tasks(iteration=nil)
+ conditions = template_to_conditions(Skynet::Message.outstanding_tasks_template(iteration))
+ SkynetMessageQueue.find(:all,:conditions => conditions)
+ end
+
+ def list_results
+ conditions = template_to_conditions(Skynet::Message.outstanding_results_template)
+ SkynetMessageQueue.find(:all,:conditions => conditions)
+ end
- def clear_worker_status(hostname=nil)
- sql = "delete from skynet_worker_queues "
- if hostname
- sql << "where hostname = '#{hostname}'"
- end
- SkynetWorkerQueue.connection.execute(sql)
- end
+ def write_error(message,error='',timeout=nil)
+ message.expire_time = nil
+ update_message_with_result(message.error_message(error),timeout)
+ end
- def version_active?(curver=nil, queue_id=0)
- return true unless curver
- message_row = find_next_message(Skynet::Message.next_task_template(curver, nil, queue_id), :any, 1)
- if message_row or curver.to_i == get_worker_version.to_i
- true
- else
- false
- end
+ def version_active?(curver=nil, queue_id=0)
+ return true unless curver
+ message_row = find_next_message(Skynet::Message.next_task_template(curver, nil, queue_id), :any, 1)
+ if message_row or curver.to_i == get_worker_version.to_i
+ true
+ else
+ false
end
+ end
- def set_worker_version(ver=nil)
- ver ||= 1
- SkynetMessageQueue.connection.insert("replace #{message_queue_table} (tran_id, task_id, tasktype, version) values (0, 0, 'version',#{ver})")
- ver
- end
+ def set_worker_version(ver=nil)
+ ver ||= 1
+ SkynetMessageQueue.connection.insert("replace #{message_queue_table} (tran_id, task_id, tasktype, version) values (0, 0, 'version',#{ver})")
+ ver
+ end
- def get_worker_version
- ver = SkynetMessageQueue.connection.select_value("select version from #{message_queue_table} where tran_id = 0 and tasktype = 'version'")
- if not ver
- begin
- SkynetMessageQueue.connection.insert("insert into #{message_queue_table} (tran_id, task_id, tasktype, version) values (0, 0, 'version', 1)")
- rescue ActiveRecord::StatementInvalid => e
- if e.message =~ /Duplicate/
- return get_worker_version
- else
- raise e
- end
+ def get_worker_version
+ ver = SkynetMessageQueue.connection.select_value("select version from #{message_queue_table} where tran_id = 0 and tasktype = 'version'")
+ if not ver
+ begin
+ SkynetMessageQueue.connection.insert("insert into #{message_queue_table} (tran_id, task_id, tasktype, version) values (0, 0, 'version', 1)")
+ rescue ActiveRecord::StatementInvalid => e
+ if e.message =~ /Duplicate/
+ return get_worker_version
+ else
+ raise e
end
- ver = 1
end
- ver.to_i
+ ver = 1
end
-
- def clear_outstanding_tasks
- SkynetMessageQueue.connection.execute("delete from #{message_queue_table} where tasktype = 'task'")
- end
+ ver.to_i
+ end
+
+ def clear_outstanding_tasks
+ SkynetMessageQueue.connection.execute("delete from #{message_queue_table} where tasktype = 'task'")
+ end
- def delete_expired_messages
- SkynetMessageQueue.connection.execute("delete from #{message_queue_table} where (tasktype='result' and expire_time < #{Time.now.to_i}) OR (expire_time BETWEEN 1 AND '#{Time.now.to_i - 7200}' and iteration = -1) OR (expire_time BETWEEN 1 AND '#{Time.now.to_i - 36000}')")
- end
+ def delete_expired_messages
+ SkynetMessageQueue.connection.execute("delete from #{message_queue_table} where (tasktype='result' and expire_time < #{Time.now.to_i}) OR (expire_time BETWEEN 1 AND '#{Time.now.to_i - 7200}' and iteration = -1) OR (expire_time BETWEEN 1 AND '#{Time.now.to_i - 36000}')")
+ end
# select hostname, iteration, count(id) as number_of_workers, count(iteration) as iteration, sum(processed) as processed, max(started_at) as most_recent_task_time from skynet_worker_queues where tasksubtype = 'worker' group by hostname, iteration;
#
@@ -379,248 +324,179 @@ def delete_expired_messages
#
# from skynet_worker_queues where tasksubtype = 'worker' group by hostname;
- def stats
- stats = {
- :servers => {},
- :results => 0,
- :taken_tasks => 0,
- :untaken_tasks => 0,
- :taken_master_tasks => 0,
- :taken_task_tasks => 0,
- :untaken_master_tasks => 0,
- :untaken_task_tasks => 0,
- :failed_tasks => 0,
- :processed => 0,
- :number_of_workers => 0,
- :active_workers => 0,
- :idle_workers => 0,
- :hosts => 0,
- :masters => 0,
- :taskworkers => 0,
- :time => Time.now.to_f
- }
-
- stat_rows = SkynetWorkerQueue.connection.select_all(%{
- SELECT tasktype, payload_type, iteration, count(id) as number_of_tasks
- FROM #{message_queue_table}
- GROUP BY tasktype, payload_type, iteration
- })
- # pp stat_rows
- stat_rows.each do |row|
- if row["tasktype"] == "result" or row["payload_type"] == "result"
- stats[:results] += row["number_of_tasks"].to_i
- elsif row["tasktype"] == "task"
- type_of_tasks = nil
- if row["payload_type"] == "master"
- type_of_tasks = :master_tasks
- elsif row["payload_type"] == "task"
- type_of_tasks = :task_tasks
- end
- if row["iteration"].to_i > 0
- stats["taken_#{type_of_tasks}".to_sym] += row["number_of_tasks"].to_i
- stats[:taken_tasks] += row["number_of_tasks"].to_i
- elsif row["iteration"].to_i == 0
- stats["untaken_#{type_of_tasks}".to_sym] += row["number_of_tasks"].to_i
- stats[:untaken_tasks] += row["number_of_tasks"].to_i
- else
- stats[:failed_tasks] += row["number_of_tasks"].to_i
- end
- end
- end
-
- servers = {}
-
- stat_sql = <<-SQL
- select hostname, map_or_reduce, count(id) number_of_workers, sum(processed) as processed,
- max(started_at) as most_recent_task_time, iteration
- FROM skynet_worker_queues
- WHERE skynet_worker_queues.tasksubtype = 'worker'
- SQL
-
- stat_rows = SkynetWorkerQueue.connection.select_all("#{stat_sql} GROUP BY hostname, map_or_reduce").each do |row|
- servers[row["hostname"]] ||= {
- :processed => 0,
- :hostname => row["hostname"],
- :number_of_workers => 0,
- :active_workers => 0,
- :idle_workers => 0,
- }
-
- servers[row["hostname"]][:processed] += row["processed"].to_i
- servers[row["hostname"]][:number_of_workers] += row["number_of_workers"].to_i
- servers[row["hostname"]][:active_workers] += 0
- servers[row["hostname"]][:idle_workers] += row["number_of_workers"].to_i
- stats[:processed] += row["processed"].to_i
- stats[:number_of_workers] += row["number_of_workers"].to_i
- stats[:idle_workers] += row["number_of_workers"].to_i
- end
-
- SkynetWorkerQueue.connection.select_all(%{
- #{stat_sql} AND skynet_worker_queues.iteration IS NOT NULL
- GROUP BY hostname, map_or_reduce
- }).each do |row|
- map_or_reduce = nil
- if row["map_or_reduce"] == "master"
- map_or_reduce = :masters
+ def stats
+ stats = {
+ :servers => {},
+ :results => 0,
+ :taken_tasks => 0,
+ :untaken_tasks => 0,
+ :taken_master_tasks => 0,
+ :taken_task_tasks => 0,
+ :untaken_master_tasks => 0,
+ :untaken_task_tasks => 0,
+ :failed_tasks => 0,
+ :time => Time.now.to_f,
+ }
+
+ stat_rows = SkynetWorkerQueue.connection.select_all(%{
+ SELECT tasktype, payload_type, iteration, count(id) as number_of_tasks, expire_time
+ FROM #{message_queue_table}
+ WHERE expire_time <= #{Time.now.to_i}
+ GROUP BY tasktype, payload_type, iteration
+ })
+ stat_rows.each do |row|
+ if row["tasktype"] == "result" or row["payload_type"] == "result"
+ stats[:results] += row["number_of_tasks"].to_i
+ elsif row["tasktype"] == "task"
+ type_of_tasks = nil
+ if row["payload_type"] == "master"
+ type_of_tasks = :master_tasks
+ elsif row["payload_type"] == "task"
+ type_of_tasks = :task_tasks
+ end
+ if row["iteration"].to_i == 0
+ stats["untaken_#{type_of_tasks}".to_sym] += row["number_of_tasks"].to_i
+ stats[:untaken_tasks] += row["number_of_tasks"].to_i
+ elsif row["expire_time"].to_i < Time.now.to_i
+ stats[:failed_tasks] += row["number_of_tasks"].to_i
else
- map_or_reduce = :taskworkers
+ stats["taken_#{type_of_tasks}".to_sym] += row["number_of_tasks"].to_i
+ stats[:taken_tasks] += row["number_of_tasks"].to_i
end
- servers[row["hostname"]][:active_workers] += row["number_of_workers"].to_i
- servers[row["hostname"]][:idle_workers] -= row["number_of_workers"].to_i
- servers[row["hostname"]][map_or_reduce] ||= 0
- servers[row["hostname"]][map_or_reduce] += row["number_of_workers"].to_i
- stats[map_or_reduce] += row["number_of_workers"].to_i
- stats[:active_workers] += row["number_of_workers"].to_i
- stats[:idle_workers] -= row["number_of_workers"].to_i
- end
-
- stats[:servers] = servers
- stats[:hosts] = servers.keys.size
- stats[:time] = Time.now.to_f - stats[:time]
- stats
- end
-
- def processed(sleepy=5,tim=10)
- last_time = Time.now
- last_count = Skynet::MessageQueue.new.stats[:processed]
- tim.times do
- new_count = Skynet::MessageQueue.new.stats[:processed]
- new_time = Time.now
- puts "Processed #{new_count - last_count} in #{new_time - last_time}"
- last_time = new_time
- last_count = new_count
- sleep sleepy
end
end
-
- private
- def update(sql)
- rows = 0
- 3.times do
- begin
- SkynetMessageQueue.transaction do
- rows = SkynetMessageQueue.connection.update(sql)
- end
- return rows
- rescue ActiveRecord::StatementInvalid => e
- if e.message =~ /Deadlock/ or e.message =~ /Transaction/
- error "#{self.class} update had collision #{e.message}"
- sleep 0.2
- next
- else
- raise e
- end
+ stats[:time] = Time.now.to_f - stats[:time]
+ stats
+ end
+
+ private
+
+ def update(sql)
+ rows = 0
+ 3.times do
+ begin
+ SkynetMessageQueue.transaction do
+ rows = SkynetMessageQueue.connection.update(sql)
+ end
+ return rows
+ rescue ActiveRecord::StatementInvalid => e
+ if e.message =~ /Deadlock/ or e.message =~ /Transaction/
+ error "#{self.class} update had collision #{e.message}"
+ sleep 0.2
+ next
+ else
+ raise e
end
end
- return rows
end
+ return rows
+ end
+
+ Skynet::CONFIG[:MYSQL_TEMPERATURE_CHANGE_SLEEP] ||= 40
- Skynet::CONFIG[:MYSQL_TEMPERATURE_CHANGE_SLEEP] ||= 40
-
- def find_next_message(template, payload_type, temperature=nil)
- conditions = template_to_conditions(template)
- temperature ||= temperature(payload_type)
- temperature_sql = (temperature > 1 ? " AND id % #{temperature.ceil} = #{rand(temperature).to_i} " : '')
-
- ### Mqke sure we get the old ones. If we order by on ever select its VERY expensive.
- order_by = (payload_type != :master and rand(100) < 5) ? "ORDER BY payload_type desc, created_on desc" : ''
-
- sql = <<-SQL
- SELECT *
- FROM #{message_queue_table}
- WHERE #{conditions} #{temperature_sql}
- #{order_by}
- LIMIT 1
- SQL
-
- SkynetMessageQueue.find_by_sql(sql).first
- end
+ def find_next_message(template, payload_type, temperature=nil)
+ conditions = template_to_conditions(template)
+ temperature ||= temperature(payload_type)
+ temperature_sql = (temperature > 1 ? " AND id % #{temperature.ceil} = #{rand(temperature).to_i} " : '')
- # Skynet::CONFIG[:temperature_growth_rate] ||= 2
- # Skynet::CONFIG[:temperature_backoff_rate] ||= 0.75
+ ### Mqke sure we get the old ones. If we order by on ever select its VERY expensive.
+ order_by = (payload_type != :master and rand(100) < 5) ? "ORDER BY payload_type desc, created_on desc" : ''
- # TUNEABLE_SETTINGS = [:temp_pow, :temp_interval, :sleep_time]
- #
- # def write_score(new_values,new_result,score)
- # values ||= {}
- # set = new_values.keys.sort.collect{|k|[k,new_values[k]]}.join(",")
- # if not values[set]
- # values[set] ||= {}
- # values[set][:results] ||= []
- # values[set][:scores] ||= []
- # values[set][:settings] ||= {}
- # values[set][:total_score] = 0
- # TUNEABLE_SETTINGS.each do |setting|
- # values[set][:settings][setting] = []
- # end
- # end
- # TUNEABLE_SETTINGS.each do |setting, value|
- # values[set][:settings][setting] << value
- # end
- # values[set][:results] << new_result
- # values[set][:scores] << score + values[set][:total_score]
- # values[set][:total_score] += score
- # end
-
- @@temperature ||= {}
- @@temperature[:task] ||= 1
- @@temperature[:master] ||= 1
- @@temperature[:any] ||= 1
+ sql = <<-SQL
+ SELECT *
+ FROM #{message_queue_table}
+ WHERE #{conditions} #{temperature_sql}
+ #{order_by}
+ LIMIT 1
+ SQL
+
+ SkynetMessageQueue.find_by_sql(sql).first
+ end
- def temperature(payload_type)
- payload_type ||= :any
- payload_type = payload_type.to_sym
- @@temperature[payload_type.to_sym]
- end
+ # Skynet::CONFIG[:temperature_growth_rate] ||= 2
+ # Skynet::CONFIG[:temperature_backoff_rate] ||= 0.75
+
+ # TUNEABLE_SETTINGS = [:temp_pow, :temp_interval, :sleep_time]
+ #
+ # def write_score(new_values,new_result,score)
+ # values ||= {}
+ # set = new_values.keys.sort.collect{|k|[k,new_values[k]]}.join(",")
+ # if not values[set]
+ # values[set] ||= {}
+ # values[set][:results] ||= []
+ # values[set][:scores] ||= []
+ # values[set][:settings] ||= {}
+ # values[set][:total_score] = 0
+ # TUNEABLE_SETTINGS.each do |setting|
+ # values[set][:settings][setting] = []
+ # end
+ # end
+ # TUNEABLE_SETTINGS.each do |setting, value|
+ # values[set][:settings][setting] << value
+ # end
+ # values[set][:results] << new_result
+ # values[set][:scores] << score + values[set][:total_score]
+ # values[set][:total_score] += score
+ # end
- Skynet::CONFIG[:MYSQL_QUEUE_TEMP_POW] ||= 0.6
-
- def set_temperature(payload_type, conditions, queue_id=0)
- payload_type ||= :any
- payload_type = payload_type.to_sym
-
- temp_q_conditions = "queue_id = #{queue_id} AND type = '#{payload_type}' AND updated_on < '#{(Time.now - 5).strftime('%Y-%m-%d %H:%M:%S')}'"
+ @@temperature ||= {}
+ @@temperature[:task] ||= 1
+ @@temperature[:master] ||= 1
+ @@temperature[:any] ||= 1
+
+ def temperature(payload_type)
+ payload_type ||= :any
+ payload_type = payload_type.to_sym
+ @@temperature[payload_type.to_sym]
+ end
+
+ Skynet::CONFIG[:MYSQL_QUEUE_TEMP_POW] ||= 0.6
+
+ def set_temperature(payload_type, conditions, queue_id=0)
+ payload_type ||= :any
+ payload_type = payload_type.to_sym
+
+ temp_q_conditions = "queue_id = #{queue_id} AND type = '#{payload_type}' AND updated_on < '#{(Time.now - 5).strftime('%Y-%m-%d %H:%M:%S')}'"
# "POW(#{(rand(40) + 40) * 0.01})"
# its almost like the temperature table needs to store the POW and adjust that to be adaptive. Like some % of the time it
# uses the one in the table, and some % it tries a new one and scores it.
- begin
- temperature = SkynetMessageQueue.connection.select_value(%{select (
- CASE WHEN (@t:=FLOOR(
- POW(@c:=(SELECT count(*) FROM #{message_queue_table} WHERE #{conditions}
- ),#{Skynet::CONFIG[:MYSQL_QUEUE_TEMP_POW]}))) < 1 THEN 1 ELSE @t END) from skynet_queue_temperature WHERE #{temp_q_conditions}
- })
- if temperature
- rows = update("UPDATE skynet_queue_temperature SET temperature = #{temperature} WHERE #{temp_q_conditions}")
- @@temperature[payload_type.to_sym] = temperature.to_f
- else
- sleepy = rand Skynet::CONFIG[:MYSQL_TEMPERATURE_CHANGE_SLEEP]
- sleep sleepy
- @@temperature[payload_type.to_sym] = get_temperature(payload_type, queue_id)
- end
- rescue ActiveRecord::StatementInvalid => e
- if e.message =~ /away/
- ActiveRecord::Base.connection.reconnect!
- SkynetMessageQueue.connection.reconnect!
- end
+ begin
+ temperature = SkynetMessageQueue.connection.select_value(%{select (
+ CASE WHEN (@t:=FLOOR(
+ POW(@c:=(SELECT count(*) FROM #{message_queue_table} WHERE #{conditions}
+ ),#{Skynet::CONFIG[:MYSQL_QUEUE_TEMP_POW]}))) < 1 THEN 1 ELSE @t END) from skynet_queue_temperature WHERE #{temp_q_conditions}
+ })
+ if temperature
+ rows = update("UPDATE skynet_queue_temperature SET temperature = #{temperature} WHERE #{temp_q_conditions}")
+ @@temperature[payload_type.to_sym] = temperature.to_f
+ else
+ sleepy = rand Skynet::CONFIG[:MYSQL_TEMPERATURE_CHANGE_SLEEP]
+ sleep sleepy
+ @@temperature[payload_type.to_sym] = get_temperature(payload_type, queue_id)
end
- # update("UPDATE skynet_queue_temperature SET type = '#{payload_type}', temperature = CASE WHEN @t:=FLOOR(SQRT(select count(*) from #{message_queue_table} WHERE #{conditions})) < 1 THEN 1 ELSE @t END")
- # tasks = SkynetMessageQueue.connection.select_value("select count(*) from #{message_queue_table} WHERE #{conditions}").to_i
- # sleep 4 if payload_type == :tasks and tasks < 100
- # @@temperature[payload_type.to_sym] = tasks ** 0.5
- # @@temperature[payload_type.to_sym] *= multiplier
- @@temperature[payload_type.to_sym] = 1 if @@temperature[payload_type.to_sym] < 1
- end
-
- def get_temperature(payload_type, queue_id=0)
- payload_type ||= :any
- payload_type = payload_type.to_sym
- value = SkynetMessageQueue.connection.select_value("select temperature from skynet_queue_temperature WHERE type = '#{payload_type}'").to_f
- if not value
- SkynetMessageQueue.connection.execute("insert into skynet_queue_temperature (queue_id,type,temperature) values (#{queue_id},'#{payload_type}',#{@@temperature[payload_type.to_sym]})")
+ rescue ActiveRecord::StatementInvalid => e
+ if e.message =~ /away/
+ ActiveRecord::Base.connection.reconnect!
+ SkynetMessageQueue.connection.reconnect!
end
- value
end
+ # update("UPDATE skynet_queue_temperature SET type = '#{payload_type}', temperature = CASE WHEN @t:=FLOOR(SQRT(select count(*) from #{message_queue_table} WHERE #{conditions})) < 1 THEN 1 ELSE @t END")
+ # tasks = SkynetMessageQueue.connection.select_value("select count(*) from #{message_queue_table} WHERE #{conditions}").to_i
+ # sleep 4 if payload_type == :tasks and tasks < 100
+ # @@temperature[payload_type.to_sym] = tasks ** 0.5
+ # @@temperature[payload_type.to_sym] *= multiplier
+ @@temperature[payload_type.to_sym] = 1 if @@temperature[payload_type.to_sym] < 1
+ end
+
+ def get_temperature(payload_type, queue_id=0)
+ payload_type ||= :any
+ payload_type = payload_type.to_sym
+ value = SkynetMessageQueue.connection.select_value("select temperature from skynet_queue_temperature WHERE type = '#{payload_type}'").to_f
+ if not value
+ SkynetMessageQueue.connection.execute("insert into skynet_queue_temperature (queue_id,type,temperature) values (#{queue_id},'#{payload_type}',#{@@temperature[payload_type.to_sym]})")
+ end
+ value
end
end
end
View
76 lib/skynet/message_queue_adapters/tuple_space.rb
@@ -10,17 +10,17 @@ def take(tuple, sec=nil, &block)
class Skynet
class Error < StandardError
end
-
+
class RequestExpiredError < Skynet::Error
end
-
+
class InvalidMessage < Skynet::Error
end
class MessageQueueAdapter
-
+
class TupleSpace < Skynet::MessageQueueAdapter
-
+
include SkynetDebugger
USE_FALLBACK_TASKS = true
@@ -36,6 +36,18 @@ def self.adapter
:tuplespace
end
+ def self.start_or_connect(options={})
+ begin
+ mq = new
+ rescue Skynet::ConnectionError
+ pid = fork do
+ exec("skynet_tuplespace_server start")
+ end
+ sleep 5
+ mq = new
+ end
+ end
+
def initialize(ts=nil)
if not ts
ts = self.class.get_tuple_space
@@ -72,36 +84,6 @@ def write_error(message,error='',timeout=nil)
take_fallback_message(message)
end
-
- def write_worker_status(task, timeout=nil)
- begin
- take_worker_status(task,0.00001)
- rescue Skynet::RequestExpiredError
- end
- write(Skynet::WorkerStatusMessage.new(task), timeout)
- end
-
- def take_worker_status(task, timeout=nil)
- Skynet::WorkerStatusMessage.new(take(Skynet::WorkerStatusMessage.worker_status_template(task), timeout))
- end
-
- def read_all_worker_statuses(hostname=nil)
- ws = Skynet::WorkerStatusMessage.all_workers_template(hostname)
- workers = read_all(ws).collect{ |w| Skynet::WorkerStatusMessage.new(w) }#.sort{ |a,b| a.process_id <=> b.process_id }
- end
-
- def clear_worker_status(hostname=nil)
- cnt = 0
- begin
- loop do
- take(Skynet::WorkerStatusMessage.new([:status, :worker, hostname, nil, nil]),0.01)
- cnt += 1
- end
- rescue Skynet::RequestExpiredError
- end
- cnt
- end
-
def list_tasks(iteration=nil,queue_id=0)
read_all(Skynet::Message.outstanding_tasks_template(iteration,queue_id))
end
@@ -165,20 +147,20 @@ def clear_outstanding_tasks
rescue DRb::DRbConnError, Errno::ECONNREFUSED => e
error "ERROR #{e.inspect}", caller
end
-
+
tasks.size.times do |ii|
take(Skynet::Message.outstanding_tasks_template,0.00001)
end
-
+
results = read_all(Skynet::Message.outstanding_results_template)
results.size.times do |ii|
take(Skynet::Message.outstanding_results_template,0.00001)
end
-
+
task_tuples = read_all(Skynet::Message.outstanding_tasks_template)
result_tuples = read_all(Skynet::Message.outstanding_results_template)
return task_tuples + result_tuples
- end
+ end
def stats
t1 = Time.now
@@ -188,7 +170,7 @@ def stats
p_tasks = tasks.partition {|task| task[9] == 0}
{:taken_tasks => p_tasks[1].size, :untaken_tasks => p_tasks[0].size, :results => list_results.size, :time => t2.to_f}
end
-
+
private
attr_accessor :ts
@@ -208,7 +190,7 @@ def read(template,timeout=nil)
def read_all(template)
ts_command(:read_all,template)
end
-
+
###### FALLBACK METHODS
def write_fallback_task(message)
return unless USE_FALLBACK_TASKS
@@ -243,14 +225,14 @@ def ts_command(command,message,timeout=nil)
else
raise InvalidMessage.new("You must provide a valid Skynet::Message object when calling #{command}. You passed #{message.inspect}.")
end
-
+
begin
if command==:read_all
return ts.send(command,tuple)
else
return ts.send(command,tuple,timeout)
end
-
+
rescue Rinda::RequestExpiredError
raise Skynet::RequestExpiredError.new
rescue DRb::DRbConnError => e
@@ -276,7 +258,7 @@ def ts_command(command,message,timeout=nil)
####################################
### XXX ACCEPT MULTIPLE TUPLE SPACES and a flag whether to use replication or failover.
-
+
def self.get_tuple_space
return @@ts if is_valid_tuplespace?(@@ts)
loop do
@@ -289,7 +271,7 @@ def self.get_tuple_space
@@ts = connect_to_tuple_space(host,port)
else
drburi = Skynet::CONFIG[:TS_DRBURIS].first
- drburi = "druby://#{drburi}" unless drburi =~ %r{druby://}
+ drburi = "druby://#{drburi}" unless drburi =~ %r{druby://}
@@ts = get_tuple_space_from_drburi(drburi)
log.info "#{self} CONNECTED TO #{drburi}"
end
@@ -303,12 +285,12 @@ def self.get_tuple_space
raise Skynet::ConnectionError.new("Can't find ring finger @ #{Skynet::CONFIG[:TS_SERVER_HOSTS][@@curhostidx]}. #{e.class} #{e.message}")
end
rescue Exception => e
- raise Skynet::ConnectionError.new("Error getting tuplespace @ #{Skynet::CONFIG[:TS_SERVER_HOSTS][@@curhostidx]}. #{e.class} #{e.message}")
+ raise Skynet::ConnectionError.new("Error getting tuplespace @ #{Skynet::CONFIG[:TS_SERVER_HOSTS][@@curhostidx]}. #{e.class} #{e.message}")
end
end
return @@ts
end
-
+
def self.connect_to_tuple_space(host,port)
log.info "#{self} trying to connect to #{host}:#{port}"
if Skynet::CONFIG[:TS_USE_RINGSERVER]
@@ -323,7 +305,7 @@ def self.connect_to_tuple_space(host,port)
log.info "#{self} CONNECTED TO #{host}:#{port}"
ts
end
-
+
def self.get_tuple_space_from_drburi(drburi)
DRbObject.new(nil, drburi)
end
View
6 lib/skynet/skynet_active_record_extensions.rb
@@ -102,6 +102,12 @@ def log
def each_range(opts={})
opts = opts.clone
opts[:id] || opts[:id] = 0
+ count = model_klass.count(:all,:conditions => opts[:conditions], :joins => opts[:joins])
+ Skynet::Logger.get.error "COUNT #{count}"
+ if count <= batch_size
+ return yield({"first" => 0, "last" => nil, "cnt" => 0}, 0)
+ end
+
rows = chunk_query(opts)
# log.error "ROWS, #{rows.pretty_print_inspect}"
View
61 lib/skynet/skynet_config.rb
@@ -1,16 +1,18 @@
class Skynet
- LOGDIR = "/var/log"
+ DEFAULT_LOG_FILE_LOCATION = ENV["HOME"]
CONFIG = {
:ENABLE => true,
:SOLO => false,
- :SKYNET_LOG_DIR => LOGDIR,
+ :SKYNET_LOG_DIR => DEFAULT_LOG_FILE_LOCATION,
:SKYNET_PID_DIR => "/tmp",
- :SKYNET_PIDS_FILE => "/tmp/skynet.pid",
+ :SKYNET_PID_FILE => "skynet.pid",
:SKYNET_LOG_FILE => "skynet.log",
+ :SKYNET_MANAGER_STATS_FILE => "skynet_manager_stats.txt",
:SKYNET_LOG_LEVEL => Logger::ERROR,
:SKYNET_LOCAL_MANAGER_URL => "druby://localhost:40000",
:MESSAGE_QUEUE_ADAPTER => ("Skynet::MessageQueueAdapter::TupleSpace" || "Skynet::MessageQueueAdapter::Mysql"),
+ :WORKER_QUEUE_ADAPTER => ("Skynet::WorkerQueueAdapter::TupleSpace" || "Skynet::WorkerQueueAdapter::Mysql"),
:TS_USE_RINGSERVER => true,
:TS_DRBURIS => ["druby://localhost:47647"], # If you do not use RINGSERVER, you must specifiy the DRBURI
:TS_SERVER_HOSTS => ["localhost:7647"],
@@ -22,11 +24,11 @@ class Skynet
:MYSQL_NEXT_TASK_TIMEOUT => 60,
:MYSQL_ADAPTER => "mysql",
:MYSQL_HOST => "localhost",
- :MYSQL_DATABASE => "skynet",
- :MYSQL_USERNAME => "root",
+ :MYSQL_DATABASE => nil, # 'skynet'
+ :MYSQL_USERNAME => nil,
:MYSQL_PASSWORD => "",
:NUMBER_OF_WORKERS => 4,
- :WORKER_CHECK_DELAY => 40,
+ :WORKER_CHECK_DELAY => 10,
:WORKER_MAX_MEMORY => 500,
:WORKER_MAX_PROCESSED => 1000,
:WORKER_VERSION_CHECK_DELAY => 30,
@@ -65,6 +67,10 @@ def self.configure(config={})
ret
end
end
+
+ def self.config
+ Skynet::Config.new
+ end
def self.solo(config = {})
raise Skynet::Error.new("You provide a code block to Skynet.solo") unless block_given?
@@ -100,10 +106,10 @@ def self.solo(config = {})
# Skynet.configure(
# :ENABLE => true,
# :SOLO => false,
- # :SKYNET_LOG_DIR => LOGDIR,
# :SKYNET_PID_DIR => "/tmp",
- # :SKYNET_PIDS_FILE => "/tmp/skynet.pid",
- # :SKYNET_LOG_FILE => STDOUT,
+ # :SKYNET_PID_FILE => "skynet.pid",
+ # :SKYNET_LOG_DIR => ENV["HOME"],
+ # :SKYNET_LOG_FILE => "skynet.log",
# :SKYNET_LOG_LEVEL => Logger::ERROR,
# :SKYNET_LOCAL_MANAGER_URL => "druby://localhost:40000",
# :MESSAGE_QUEUE_ADAPTER => "Skynet::MessageQueueAdapter::TupleSpace",
@@ -162,19 +168,19 @@ def each
Skynet::CONFIG.each {|k,v| yield k,v}
end
- def add_message_queue(queue_name)
+ def self.add_message_queue(queue_name)
self.message_queues << queue_name
end
- def queue_id_by_name(queue_name)
+ def self.queue_id_by_name(queue_name)
if Skynet::CONFIG[:MESSAGE_QUEUES].index(queue_name)
return Skynet::CONFIG[:MESSAGE_QUEUES].index(queue_name)
else
raise Skynet::Error("#{queue_name} is not a valid queue")
end
end
- def queue_name_by_id(queue_id)
+ def self.queue_name_by_id(queue_id)
queue_id = queue_id.to_i
if Skynet::CONFIG[:MESSAGE_QUEUES][queue_id]
return Skynet::CONFIG[:MESSAGE_QUEUES][queue_id]
@@ -183,9 +189,38 @@ def queue_name_by_id(queue_id)
end
end
+ def self.logfile_location
+ if skynet_log_file.is_a?(String)
+ skynet_log_dir.sub(/\/$/,'') + "/" + skynet_log_file.sub(/^\//,'')
+ else
+ skynet_log_file
+ end
+ end
+
+ def self.pidfile_location
+ if skynet_pid_dir.is_a?(String)
+ skynet_pid_dir.sub(/\/$/,'') + "/" + skynet_pid_file.sub(/^\//,'')
+ else
+ skynet_pid_dir
+ end
+ end
+ def manager_statfile_location
+ if skynet_log_dir.is_a?(String)
+ skynet_log_dir.sub(/\/$/,'') + "/" + skynet_manager_stats_file.sub(/^\//,'')
+ else
+ skynet_log_dir
+ end
+ end
+ def method_missing(name,*args)
+ if self.class.respond_to?(name)
+ self.class.send(name,*args)
+ else
+ self.class.method_missing(name,*args)
+ end
+ end
- def method_missing(name, *args)
+ def self.method_missing(name, *args)
name = name.to_s.upcase.to_sym
if name.to_s =~ /^(.*)=$/
name = $1.to_sym
View
8 lib/skynet/skynet_job.rb
@@ -408,6 +408,14 @@ def enqueue_messages(messages)
end
end
+ # Given a job_id, returns the results from the message queue. Used to retrieve results of asyncronous jobs.
+ def self.results_by_job_id(job_id,timeout=2)
+ result_message = mq.take_result(job_id,timeout)
+ result = result_message.payload
+ return nil unless result
+ return result
+ end
+
def gather_results(number_of_tasks, timeout=nil, description=nil)
debug "GATHER RESULTS job_id: #{job_id} - NOT AN ASYNC JOB"
results = {}
View
10 lib/skynet/skynet_launcher.rb
@@ -8,7 +8,15 @@ def self.start(options={})
elsif options[:worker_type] or ARGV.detect {|a| a =~ /worker_type/ }
Skynet::Worker.start(options)
else
- Skynet::Manager.start(options)
+ if Skynet::CONFIG[:SKYNET_LOG_DIR] == Skynet::DEFAULT_LOG_FILE_LOCATION
+ puts "Logging to the default log: #{File.expand_path(Skynet::CONFIG[:SKYNET_LOG_FILE])}. Set Skynet::CONFIG[:SKYNET_LOG_FILE] to change.\nYou will no longer see this warning once the Skynet::CONFIG[:SKYNET_LOG_FILE] is set."
+ end
+ if ARGV.include?('stop')
+ Skynet::Manager.stop(options)
+ else
+ options["daemonize"] = true if ARGV.include?('start')
+ Skynet::Manager.start(options)
+ end
end
end
View
2 lib/skynet/skynet_logger.rb
@@ -16,7 +16,7 @@ class Logger < ::Logger
def self.get
if not @@log
- @@log = self.new(Skynet::CONFIG[:SKYNET_LOG_FILE])
+ @@log = self.new(Skynet::Config.new.logfile_location)
@@log.level = Skynet::CONFIG[:SKYNET_LOG_LEVEL]
end
@@log