Skip to content
This repository has been archived by the owner on Sep 8, 2021. It is now read-only.

Commit

Permalink
Use beanstalkd to queue harvesting jobs.
Browse files Browse the repository at this point in the history
Currently hardcoded to use beanstalkd on localhost only.
  • Loading branch information
jezcope committed Mar 17, 2013
1 parent 6666f8d commit 739317c
Show file tree
Hide file tree
Showing 4 changed files with 207 additions and 27 deletions.
2 changes: 2 additions & 0 deletions Gemfile
Expand Up @@ -24,6 +24,8 @@ gem 'libxml-ruby'
gem 'activesupport'
gem 'andand'

gem 'beaneater'

group :production do
gem 'pg'
end
Expand Down
2 changes: 2 additions & 0 deletions Gemfile.lock
Expand Up @@ -21,6 +21,7 @@ GEM
sys-uname
autotest-growl (0.2.16)
backports (3.0.3)
beaneater (0.3.0)
builder (3.1.4)
chunky_png (1.2.7)
compass (0.12.2)
Expand Down Expand Up @@ -114,6 +115,7 @@ DEPENDENCIES
andand
autotest-fsevent
autotest-growl
beaneater
bootstrap-sass!
compass
database_cleaner
Expand Down
6 changes: 2 additions & 4 deletions models/repository.rb
@@ -1,6 +1,7 @@
require 'sequel'
require 'andand'
require 'oai'
require 'active_support/core_ext'

require_relative 'doi_mapping'

Expand Down Expand Up @@ -43,11 +44,8 @@ def list_records(options = {})
save = options.fetch(:save, false)
full = options.fetch(:full, false)

list_opts = {}
list_opts[:resumption_token] = options[:resumption_token] if options.has_key?(:resumption_token)

client = OAI::Client.new base_url, parser: 'libxml'
response = client.list_records(list_opts)
response = client.list_records(options.slice(:resumption_token))
resumption_token = response.resumption_token

mappings = []
Expand Down
224 changes: 201 additions & 23 deletions tasks/harvest.thor
@@ -1,5 +1,8 @@
require 'beaneater'
require 'json'
require 'andand'
require 'pp'
require 'logger'

$LOAD_PATH.unshift(File.join(File.dirname(__FILE__), '..'))
require 'db/init'
Expand All @@ -8,6 +11,57 @@ require 'models/doi_mapping'

module Harvest

# Shared harvesting logic mixed into the Thor command classes below.
# Relies on Thor's `say_status` and `options` being provided by the
# including class, so it can only be mixed into Thor commands.
module Helpers

  # Harvest DOI mappings from a single repository, following OAI-PMH
  # resumption tokens until the listing is exhausted.
  #
  # repository - Repository model instance to harvest from
  # logger     - Logger receiving progress/warning messages
  # job        - optional beanstalkd job; when given it is touched after
  #              each page so the reservation does not time out, and
  #              buried (with an early return) once its time allowance
  #              has already run out
  #
  # Saves each mapping, reporting Sequel validation failures without
  # aborting the run.
  def harvest_dois(repository, logger, job = nil)
    logger.info "Harvesting from #{repository.base_url}"
    say_status "harvest", "#{repository.name} (#{repository.base_url})"

    count = 0
    list_opts = {save: false, full: false}
    list_opts[:limit] = options[:limit].to_i if options[:limit]

    response = repository.list_records(list_opts)
    loop do
      response.mappings.each do |mapping|
        begin
          mapping.save
          say_status "doi", "'#{mapping.doi}' => '#{mapping.url}'", :yellow
        rescue Sequel::ValidationFailed
          logger.warn "Invalid DOI: <#{mapping.doi}> => <#{mapping.url}>"
          say_status "invalid", "'#{mapping.doi}' => '#{mapping.url}'", :red
        end
      end

      logger.debug "Got #{response.mappings.length} DOIs for #{repository.base_url}"
      count += response.mappings.length

      # BUG FIX: the previous `until response.resumption_token.nil?` loop
      # never processed the mappings of a response whose token was nil,
      # silently dropping the final page — and everything, for
      # repositories that fit in a single response. Process the page
      # first, then decide whether to continue.
      break if response.resumption_token.nil?

      unless job.nil?
        if job.stats.time_left == 0
          # Already out of time: bury the job so it isn't retried now.
          job.bury
          logger.info "Job #{job.id} timed out: #{repository.base_url}"
          return
        end
        job.touch # keep the beanstalkd reservation alive for the next page
      end

      list_opts[:resumption_token] = response.resumption_token
      response = repository.list_records(list_opts)
    end

    logger.info "Got #{count} DOIs total for #{repository.base_url}"
    say_status "found", "#{count} DOIs", :green
  end

end

# Raised by the queue worker when a harvest job references a
# repository id that is not present in the database.
class NonExistentRepository < ArgumentError
end

class Repositories < Thor

desc 'list', 'list registered repositories'
Expand All @@ -18,51 +72,67 @@ module Harvest
end

desc 'add BASE_URL', 'register a new repository'
option :identify, default: true
# Register a repository by its OAI-PMH base URL. Unless --no-identify
# is given, an Identify request is made to fill in the repository's
# metadata; identify or save failures are reported but do not abort
# the command. A URL that is already registered is reported instead of
# silently ignored.
def add(base_url)
  if Repository.find(base_url: base_url).nil?
    say_status "add", base_url
    r = Repository.create(base_url: base_url)

    if options[:identify]
      say_status "identify", base_url
      begin
        r.identify!
      rescue StandardError
        say_status "error", "identifying #{base_url}", :red
      end
    end

    begin
      r.save
    rescue StandardError
      say_status "error", "saving #{base_url}", :red
    end
  else
    # Previously the command exited silently; tell the user why
    # nothing happened.
    say_status "exists", base_url, :yellow
  end
end

desc 'rm BASE_URL', 'remove a repository'
# Delete a repository and all of its harvested DOI mappings. Reports
# a "not found" status (instead of crashing) when the URL is unknown.
def rm(base_url)
  repository = Repository.find(base_url: base_url)
  if repository.nil?
    say_status "not found", "base_url: #{base_url}", :red
  else
    # Destroy dependent DOI mappings before the repository itself.
    repository.doi_mappings.each(&:destroy)
    repository.destroy
    say_status "deleted", "base_url: #{base_url}"
  end
end

end

class Dois < Thor
include Harvest::Helpers

desc 'harvest_all', 'harvest DOIs'
option :limit, type: :numeric
# Harvest DOIs from every registered repository in turn, logging
# progress to harvest.log. --limit is passed through to each
# repository listing via harvest_dois.
def harvest_all
  logger = Logger.new('harvest.log')
  logger.formatter = Logger::Formatter.new
  logger.progname = 'harvest:dois:harvest_all'
  Repository.all.each do |r|
    harvest_dois(r, logger)
  end
end

desc 'harvest ID', 'harvest DOIs'
option :limit, type: :numeric
# Harvest DOIs from the single repository with the given database id.
def harvest(id)
  logger = Logger.new('harvest.log')
  logger.formatter = Logger::Formatter.new
  logger.progname = 'harvest:dois:harvest'
  r = Repository.find(id: id)
  if r.nil?
    # Consistent with `repositories rm`: report rather than crash on an
    # unknown id (harvest_dois would raise NoMethodError on nil).
    say_status "not found", "id: #{id}", :red
  else
    harvest_dois(r, logger)
  end
end

desc 'count', 'print number of DOI records in database'
def count
say_status "count", "#{DoiMapping.count} DOI records"
Expand All @@ -77,4 +147,112 @@ module Harvest

end

# Thor commands for driving harvests through a beanstalkd work queue.
# The beanstalkd server location is currently hard-coded to localhost.
class Queue < Thor
  include Harvest::Helpers

  desc 'process [TUBE_NAME]', 'process jobs from beanstalkd'
  # Run a worker loop: reserve jobs from TUBE_NAME and harvest the
  # repository each one names. Blocks until interrupted with ^C.
  def process(tube_name = 'doi2oa')
    logger = queue_logger('process')
    pool = beanstalk_pool

    pool.jobs.register(tube_name) do |job|
      begin
        logger.debug "Claimed job #{job.id}: #{job.body}"
        job_info = JSON.parse(job.body)
        r = Repository.find(id: job_info['repository_id'].to_i)

        if r.nil?
          logger.warn "Tried to harvest from non-existent repository #{job_info['repository_id']}"
          raise NonExistentRepository
        end

        logger.info "Begin harvesting from #{r.base_url}"
        harvest_dois(r, logger, job)
        logger.info "Finished harvesting from #{r.base_url}"
      rescue Exception => e
        # NOTE(review): rescuing Exception looks deliberate — every
        # failure is logged before re-raising for beaneater to handle —
        # but it also catches Interrupt/SystemExit; confirm intended.
        logger.error "Error: #{e.inspect} on job #{job.inspect}"
        raise
      end
    end

    logger.info "Ready to process jobs"

    begin
      pool.jobs.process!
    rescue Interrupt
      puts "Received ^C: exiting..."
    end
  end

  desc 'add_all [TUBE_NAME]', 'enqueue all repositories for harvesting'
  # Enqueue one harvest job per registered repository.
  def add_all(tube_name = 'doi2oa')
    logger = queue_logger('add_all')
    tube = beanstalk_pool.tubes[tube_name]

    Repository.all.each do |r|
      enqueue_harvest(tube, tube_name, r, logger)
    end
  end

  desc 'add ID [TUBE_NAME]', 'enqueue a repository for harvesting'
  # Enqueue a harvest job for the repository with the given id.
  def add(id, tube_name = 'doi2oa')
    logger = queue_logger('add')
    tube = beanstalk_pool.tubes[tube_name]

    r = Repository.find(id: id)
    if r.nil?
      # BUG FIX: previously the job was put on the tube *before* the
      # repository lookup was used, then crashed on r.base_url for an
      # unknown id — leaving a poison job behind. Validate first.
      say_status "not found", "id: #{id}", :red
    else
      enqueue_harvest(tube, tube_name, r, logger)
    end
  end

  desc 'delete_buried [TUBE_NAME]', 'delete buried jobs'
  def delete_buried(tube_name = 'doi2oa')
    drain_tube(tube_name, :buried, 'Buried', queue_logger('delete_buried'))
  end

  desc 'delete_ready [TUBE_NAME]', 'delete waiting jobs'
  def delete_ready(tube_name = 'doi2oa')
    drain_tube(tube_name, :ready, 'Waiting', queue_logger('delete_ready'))
  end

  private
  # Thor does not expose private methods as commands, so the helpers
  # below are internal only.

  # Build the shared harvest.log logger tagged with the subcommand name.
  def queue_logger(command)
    logger = Logger.new('harvest.log')
    logger.formatter = Logger::Formatter.new
    logger.progname = "harvest:queue:#{command}"
    logger
  end

  # Connect to beanstalkd. Hard-coded to localhost for now.
  def beanstalk_pool
    Beaneater::Pool.new(%w{localhost:11300})
  end

  # Put a harvest job for repository r onto tube and log it.
  def enqueue_harvest(tube, tube_name, r, logger)
    job = {command: :harvest, repository_id: r.id}
    tube.put job.to_json
    logger.info "Enqueued #{r.base_url} in tube '#{tube_name}' for harvesting"
    logger.debug "Job info: #{job.to_json}"
  end

  # Peek-and-delete every job in the given state until none remain.
  def drain_tube(tube_name, state, label, logger)
    tube = beanstalk_pool.tubes[tube_name]
    while (job = tube.peek(state))
      logger.info "Deleting #{label.downcase} job #{job.id} #{job.body}"
      say_status "delete", "#{label} job #{job.id} #{job.body}"
      job.delete
    end
  end
end

end

0 comments on commit 739317c

Please sign in to comment.