Permalink
Browse files

Merge remote-tracking branch 'upstream/master'

  • Loading branch information...
2 parents a052416 + e26ced2 commit 4d0744ba2f392e354c22f06db2edf9793c797295 @lukeledet committed Oct 7, 2011
View
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|
s.name = 'cloud-crowd'
- s.version = '0.5.2' # Keep version in sync with cloud-cloud.rb
- s.date = '2010-08-03'
+ s.version = '0.6.2' # Keep version in sync with cloud-cloud.rb
+ s.date = '2011-04-14'
s.homepage = "http://wiki.github.com/documentcloud/cloud-crowd"
s.summary = "Parallel Processing for the Rest of Us"
@@ -20,7 +20,6 @@ Gem::Specification.new do |s|
s.require_paths = ['lib']
s.executables = ['crowd']
- s.has_rdoc = true
s.extra_rdoc_files = ['README']
s.rdoc_options << '--title' << 'CloudCrowd | Parallel Processing for the Rest of Us' <<
'--exclude' << 'test' <<
View
@@ -45,7 +45,7 @@ module CloudCrowd
autoload :WorkUnit, 'cloud_crowd/models'
# Keep this version in sync with the gemspec.
- VERSION = '0.5.2'
+ VERSION = '0.6.2'
# Increment the schema version when there's a backwards incompatible change.
SCHEMA_VERSION = 4
View
@@ -29,7 +29,7 @@ class Action
def initialize(status, input, options, store)
@input, @options, @store = input, options, store
@job_id, @work_unit_id = options['job_id'], options['work_unit_id']
- @work_directory = File.expand_path(File.join(@store.temp_storage_path, storage_prefix))
+ @work_directory = File.expand_path(File.join(@store.temp_storage_path, local_storage_prefix))
FileUtils.mkdir_p(@work_directory) unless File.exists?(@work_directory)
parse_input
download_input
@@ -59,7 +59,7 @@ def download(url, path)
# Takes a local filesystem path, saves the file to S3, and returns the
# public (or authenticated) url on S3 where the file can be accessed.
def save(file_path)
- save_path = File.join(storage_prefix, File.basename(file_path))
+ save_path = File.join(remote_storage_prefix, File.basename(file_path))
@store.save(file_path, save_path)
end
@@ -89,14 +89,18 @@ def safe_filename(url)
File.basename(name, ext).gsub('.', '-') + ext
end
- # The directory prefix to use for both local and S3 storage.
- # [action]/job_[job_id]/unit_[work_unit_it]
- def storage_prefix
- path_parts = []
- path_parts << Inflector.underscore(self.class)
- path_parts << "job_#{@job_id}"
- path_parts << "unit_#{@work_unit_id}" if @work_unit_id
- @storage_prefix ||= File.join(path_parts)
+ # The directory prefix to use for remote storage.
+ # [action]/job_[job_id]
+ def remote_storage_prefix
+ @remote_storage_prefix ||= Inflector.underscore(self.class) +
+ "/job_#{@job_id}" + (@work_unit_id ? "/unit_#{@work_unit_id}" : '')
+ end
+
+ # The directory prefix to use for local storage.
+ # [action]/unit_[work_unit_id]
+ def local_storage_prefix
+ @local_storage_prefix ||= Inflector.underscore(self.class) +
+ (@work_unit_id ? "/unit_#{@work_unit_id}" : '')
end
# If we think that the input is JSON, replace it with the parsed form.
@@ -39,27 +39,35 @@ class WorkUnit < ActiveRecord::Base
# action in question disabled.
def self.distribute_to_nodes
reservation = nil
- filter = {}
loop do
+
+ # Find the available nodes, and determine what actions we're capable
+ # of running at the moment.
+ available_nodes = NodeRecord.available
+ available_actions = available_nodes.map {|node| node.actions }.flatten.uniq
+ filter = "action in (#{available_actions.map{|a| "'#{a}'"}.join(',')})"
+
+ # Reserve a handful of available work units.
WorkUnit.cancel_reservations(reservation) if reservation
return unless reservation = WorkUnit.reserve_available(:limit => RESERVATION_LIMIT, :conditions => filter)
work_units = WorkUnit.reserved(reservation)
- available_nodes = NodeRecord.available
- while node = available_nodes.shift and unit = work_units.shift do
- if node.actions.include?(unit.action)
- if node.send_work_unit(unit)
- available_nodes.push(node) unless node.busy?
- next
+
+ # Round robin through the nodes and units, sending the unit if the node
+ # is able to process it.
+ work_units.each do |unit|
+ available_nodes.each do |node|
+ if node.actions.include? unit.action
+ if node.send_work_unit unit
+ work_units.delete unit
+ available_nodes.delete node if node.busy?
+ break
+ end
end
- else
- unit.cancel_reservation
end
- work_units.push(unit)
- end
- if work_units.any? && available_nodes.any?
- filter = {:action => available_nodes.map {|node| node.actions }.flatten.uniq }
- next
end
+
+ # If we still have units at this point, or we're fresh out of nodes,
+ # that means we're done.
return if work_units.any? || available_nodes.empty?
end
ensure
@@ -70,7 +78,8 @@ def self.distribute_to_nodes
# were none available.
def self.reserve_available(options={})
reservation = ActiveSupport::SecureRandom.random_number(MAX_RESERVATION)
- any = WorkUnit.available.update_all("reservation = #{reservation}", options[:conditions], options) > 0
+ conditions = "reservation is null and node_record_id is null and status in (#{INCOMPLETE.join(',')}) and #{options[:conditions]}"
+ any = WorkUnit.update_all("reservation = #{reservation}", conditions, options) > 0
any && reservation
end
@@ -41,8 +41,6 @@
t.datetime "updated_at"
end
- # Here be indices. After looking, it seems faster not to have them at all.
- #
add_index "jobs", ["status"], :name => "index_jobs_on_status"
add_index "work_units", ["job_id"], :name => "index_work_units_on_job_id"
add_index "work_units", ["worker_pid"], :name => "index_work_units_on_worker_pid"
View
@@ -50,7 +50,7 @@ class ActionTest < Test::Unit::TestCase
end
should "be able to count the number of words in this file" do
- assert @action.process == 274
+ assert @action.process == 266
end
should "raise an exception when backticks fail" do
@@ -60,8 +60,8 @@ def @action.process; `utter failure 2>&1`; end
should "be able to download a remote file" do
path = "temp.txt"
- @action.download('http://example.com', path)
- assert File.read(path).match(/These domain names are reserved for use in documentation/)
+ @action.download('http://www.w3.org', path)
+ assert File.read(path).match(/standards/i)
FileUtils.rm path
end
View
@@ -1,3 +1,16 @@
+h3. Version 0.6.2
+
+* Another attempt at fixing the bug that caused work units to be delivered twice.
+
+h3. Version 0.6.1
+
+* Fixed a bug in 0.6.0 that could occasionally cause a race condition when delivering work units, sending a single work unit to be processed twice.
+
+h3. Version 0.6.0
+
+* Fixed a bug in "distribute_to_nodes" logic that would prevent mismatched actions and workers from ever getting served.
+* Better cleanup of temp directories for worker machines.
+
h3. Version 0.5.2
* Tweaks to timeout settings, name of the worker process, exceptions to be rescued and retried...

0 comments on commit 4d0744b

Please sign in to comment.