diff --git a/Gemfile b/Gemfile
index f423849d..5724234a 100644
--- a/Gemfile
+++ b/Gemfile
@@ -16,6 +16,7 @@ gem 'jbuilder', '~> 2.5'
gem 'okcomputer'
gem 'pg'
gem 'puma', '~> 3.11'
+gem 'resque', '~> 2.0'
gem 'stomp', '~> 1.4'
group :development, :test do
diff --git a/Gemfile.lock b/Gemfile.lock
index 14e9f30c..dcc8d6a8 100644
--- a/Gemfile.lock
+++ b/Gemfile.lock
@@ -179,9 +179,11 @@ GEM
json
nokogiri
nokogiri-happymapper
+ mono_logger (1.1.0)
msgpack (1.2.10)
multi_json (1.13.1)
multipart-post (2.0.0)
+ mustermann (1.0.3)
net-http-persistent (3.0.1)
connection_pool (~> 2.2)
net-scp (2.0.0)
@@ -199,6 +201,8 @@ GEM
pg (1.1.4)
puma (3.12.1)
rack (2.0.7)
+ rack-protection (2.0.5)
+ rack
rack-test (1.1.0)
rack (>= 1.0, < 3)
rails (5.2.3)
@@ -234,6 +238,15 @@ GEM
rb-fsevent (0.10.3)
rb-inotify (0.10.0)
ffi (~> 1.0)
+ redis (4.1.0)
+ redis-namespace (1.6.0)
+ redis (>= 3.0.4)
+ resque (2.0.0)
+ mono_logger (~> 1.0)
+ multi_json (~> 1.0)
+ redis-namespace (~> 1.6)
+ sinatra (>= 0.9.2)
+ vegas (~> 0.1.2)
retries (0.0.5)
rspec-core (3.8.0)
rspec-support (~> 3.8.0)
@@ -261,6 +274,11 @@ GEM
unicode-display_width (>= 1.4.0, < 1.6)
ruby-progressbar (1.10.0)
ruby_dep (1.5.0)
+ sinatra (2.0.5)
+ mustermann (~> 1.0)
+ rack (~> 2.0)
+ rack-protection (= 2.0.5)
+ tilt (~> 2.0)
spring (2.0.2)
activesupport (>= 4.2)
spring-watcher-listen (2.0.1)
@@ -279,9 +297,12 @@ GEM
stomp (1.4.8)
thor (0.20.3)
thread_safe (0.3.6)
+ tilt (2.0.9)
tzinfo (1.2.5)
thread_safe (~> 0.1)
unicode-display_width (1.5.0)
+ vegas (0.1.11)
+ rack (>= 1.0.0)
websocket-driver (0.7.0)
websocket-extensions (>= 0.1.0)
websocket-extensions (0.1.3)
@@ -309,6 +330,7 @@ DEPENDENCIES
puma (~> 3.11)
rails (~> 5.2.0)
rails-controller-testing
+ resque (~> 2.0)
rspec-rails
rubocop
spring
diff --git a/app/controllers/steps_controller.rb b/app/controllers/steps_controller.rb
index e40dcd7b..7fd0c44f 100644
--- a/app/controllers/steps_controller.rb
+++ b/app/controllers/steps_controller.rb
@@ -4,6 +4,7 @@
# API for handling requests about a specific step within an object's workflow.
class StepsController < ApplicationController
# Update a single WorkflowStep
+ # If there are next steps, they are enqueued.
# rubocop:disable Metrics/AbcSize
def update
parser = ProcessParser.new(process_from_request_body)
@@ -14,8 +15,13 @@ def update
return render plain: status_mismatch_error(step), status: :conflict if params['current-status'] && step.status != params['current-status']
step.update(parser.to_h)
+
+ # Enqueue next steps
+ next_steps = NextStepService.for(step: step)
+ next_steps.each { |next_step| QueueService.enqueue(next_step) }
+
SendUpdateMessage.publish(druid: step.druid)
- render json: { next_steps: NextStepService.for(step: step) }
+ render json: { next_steps: next_steps }
end
# rubocop:enable Metrics/AbcSize
@@ -48,6 +54,7 @@ def process_from_request_body
end
def current_version
- ObjectVersionService.current_version(params[:druid])
+ # Providing the version as a param is for local testing without needing to run DOR services.
+ params[:version] || ObjectVersionService.current_version(params[:druid])
end
end
diff --git a/app/controllers/versions_controller.rb b/app/controllers/versions_controller.rb
index 5965e683..e0fa19c8 100644
--- a/app/controllers/versions_controller.rb
+++ b/app/controllers/versions_controller.rb
@@ -42,7 +42,8 @@ def initialize_workflow
end
def current_version
- @current_version ||= ObjectVersionService.current_version(params[:druid])
+ # Providing the version as a param is for local testing without needing to run DOR services.
+ @current_version ||= params[:version] || ObjectVersionService.current_version(params[:druid])
end
def initial_workflow
diff --git a/app/controllers/workflows_controller.rb b/app/controllers/workflows_controller.rb
index 02d165cc..36b5edbd 100644
--- a/app/controllers/workflows_controller.rb
+++ b/app/controllers/workflows_controller.rb
@@ -92,7 +92,8 @@ def create
private
def current_version
- ObjectVersionService.current_version(params[:druid])
+ # Providing the version as a param is for local testing without needing to run DOR services.
+ params[:version] || ObjectVersionService.current_version(params[:druid])
end
def initial_parser
diff --git a/app/services/queue_service.rb b/app/services/queue_service.rb
new file mode 100644
index 00000000..6abb26b7
--- /dev/null
+++ b/app/services/queue_service.rb
@@ -0,0 +1,58 @@
+# frozen_string_literal: true
+
+# Service for add workflow steps to Resqueue queues
+class QueueService
+ # Enqueue the provided step
+ # @param [WorkflowStep] workflow step to enqueue
+ def self.enqueue(step)
+ QueueService.new(step).enqueue
+ end
+
+ attr_reader :step
+
+ # @param [WorkflowStep] workflow step to enqueue
+ def initialize(step)
+ @step = step
+ end
+
+ # Enqueue the provided step
+ def enqueue
+ # Perform the enqueue to Resque
+ Resque.enqueue_to(queue_name.to_sym, class_name, step.druid)
+ Rails.logger.info "Enqueued #{class_name} for #{step.druid} to #{queue_name}"
+
+ # Update status
+ step.status = 'queued'
+ step.save
+ end
+
+ private
+
+ # @example
+ # => dor:assemblyWF:jp2-create
+ def step_name
+ @step_name ||= "#{step.repository}:#{step.workflow}:#{step.process}"
+ end
+
+ # Generate the queue name from step
+ #
+ # @example
+ # => 'dor_assemblyWF_jp2-create_default'
+ # => 'dor_assemblyWF_jp2-create_mylane'
+ def queue_name
+ @queue_name ||= "#{step.repository}_#{step.workflow}_#{step.process}_#{step.lane_id}"
+ end
+
+ # Converts a given step to the Robot class name
+ # Based on https://github.com/sul-dlss/lyber-core/blob/master/lib/lyber_core/robot.rb#L33
+ # @example
+ # => 'Robots::DorRepo::Assembly::Jp2Create'
+ def class_name
+ @class_name ||= begin
+ repo = step.repository.camelize
+ workflow = step.workflow.sub('WF', '').camelize
+ process = step.process.tr('-', '_').camelize
+ "Robots::#{repo}Repo::#{workflow}::#{process}"
+ end
+ end
+end
diff --git a/app/services/workflow_creator.rb b/app/services/workflow_creator.rb
index 5abad599..e85ceb0f 100644
--- a/app/services/workflow_creator.rb
+++ b/app/services/workflow_creator.rb
@@ -29,12 +29,23 @@ def create_workflow_steps
WorkflowStep.create!(workflow_attributes(process))
end
end
+ enqueue
end
private
attr_reader :processes, :workflow_id
+ def enqueue
+ # Get the first step and enqueue any next steps
+ first_step = WorkflowStep.find_by(workflow: workflow_id, druid: version.druid, active_version: true,
+ process: processes.first.process)
+
+ # Enqueue next steps
+ next_steps = NextStepService.for(step: first_step)
+ next_steps.each { |next_step| QueueService.enqueue(next_step) }
+ end
+
def workflow_attributes(process)
{
workflow: workflow_id,
diff --git a/config/initializers/redis.rb b/config/initializers/redis.rb
new file mode 100644
index 00000000..93fd9d5f
--- /dev/null
+++ b/config/initializers/redis.rb
@@ -0,0 +1,9 @@
+# frozen_string_literal: true
+
+# Load Resque configuration and controller
+redis = Redis.new(host: Settings.redis.hostname,
+ port: Settings.redis.port,
+ thread_safe: true,
+ db: Settings.redis.db)
+
+Resque.redis = Redis::Namespace.new(Settings.redis.namespace, redis: redis)
diff --git a/config/settings.yml b/config/settings.yml
index 8487a973..a5c28a55 100644
--- a/config/settings.yml
+++ b/config/settings.yml
@@ -9,3 +9,10 @@ messaging:
uri: 'failover:(stomp+ssl://localhost:61612,stomp://remotehost:61613)'
# fedora_url is sent inside the message as the `entry.author.uri` field
fedora_url: 'https://dor-test.stanford.edu'
+
+
+redis:
+ hostname: localhost
+ port: 6379
+ db: 0
+ namespace: 'resque:development'
diff --git a/docker-compose.yml b/docker-compose.yml
index f3d26a35..efb523ad 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -4,7 +4,7 @@ services:
app:
build:
context: ./
- dockerfile: docker/Dockerfile.dev
+ dockerfile: Dockerfile
environment:
- RAILS_LOG_TO_STDOUT=true
- DATABASE_NAME=workflow-server
@@ -13,7 +13,8 @@ services:
- DATABASE_HOSTNAME=db
- DATABASE_PORT=5432
- SECRET_KEY_BASE="${SECRET_KEY_BASE}"
- image: 'suldlss/workflow-server:latest-dev'
+ - SETTINGS__ENABLE_STOMP=false
+ - SETTINGS__REDIS__HOSTNAME=redis
volumes:
- .:/app
ports:
@@ -32,5 +33,11 @@ services:
- ./postgres-data:/var/lib/postgresql/data
networks:
- wfs
+ redis:
+ image: redis
+ ports:
+ - "6379:6379"
+ networks:
+ - wfs
networks:
wfs:
diff --git a/lib/tasks/workflow.rake b/lib/tasks/workflow.rake
new file mode 100644
index 00000000..1fc5d3a9
--- /dev/null
+++ b/lib/tasks/workflow.rake
@@ -0,0 +1,26 @@
+# frozen_string_literal: true
+
+namespace :workflow do
+ desc 'Update a workflow step'
+ task :step, %i[repo druid workflow process version status] => :environment do |_task, args|
+ step = WorkflowStep.find_by(
+ repository: args[:repo],
+ druid: args[:druid],
+ workflow: args[:workflow],
+ process: args[:process],
+ version: args[:version]
+ )
+
+ raise 'Workflow step does not already exist' if step.nil?
+
+ step.status = args[:status]
+ step.save
+ puts("Setting #{args[:process]} to #{args[:status]}")
+
+ # Enqueue next steps
+ next_steps = NextStepService.for(step: step)
+ next_steps.each { |next_step| QueueService.enqueue(next_step) }
+
+ SendUpdateMessage.publish(druid: step.druid)
+ end
+end
diff --git a/spec/controllers/steps_controller_spec.rb b/spec/controllers/steps_controller_spec.rb
new file mode 100644
index 00000000..3e4b4fbe
--- /dev/null
+++ b/spec/controllers/steps_controller_spec.rb
@@ -0,0 +1,63 @@
+# frozen_string_literal: true
+
+require 'rails_helper'
+
+RSpec.describe StepsController do
+ let(:repository) { 'dor' }
+ let(:client) { instance_double(Dor::Services::Client::Object, current_version: '1') }
+ let(:druid) { first_step.druid }
+ let(:workflow_id) { 'accessionWF' }
+ let(:first_step) { FactoryBot.create(:workflow_step, status: 'completed') } # start-accession, which is already completed
+
+ before do
+ FactoryBot.create(:workflow_step, druid: druid, process: 'descriptive-metadata')
+ FactoryBot.create(:workflow_step, druid: druid, process: 'rights-metadata')
+ allow(Dor::Services::Client).to receive(:object).with(druid).and_return(client)
+ allow(SendUpdateMessage).to receive(:publish)
+ allow(QueueService).to receive(:enqueue)
+ end
+
+ describe 'PUT update' do
+ context 'when updating a step' do
+ let(:body) { '' }
+
+ it 'updates the step' do
+ put :update, body: body, params: { repo: repository, druid: druid, workflow: workflow_id,
+ process: 'descriptive-metadata', format: :xml }
+ expect(response.body).to eq('{"next_steps":[]}')
+ expect(SendUpdateMessage).to have_received(:publish).with(druid: druid)
+ expect(WorkflowStep.find_by(druid: druid, process: 'descriptive-metadata').status).to eq('test')
+ expect(QueueService).to_not have_received(:enqueue)
+ end
+
+ it 'verifies the current status' do
+ put :update, body: body, params: { repo: repository, druid: druid, workflow: workflow_id,
+ process: 'descriptive-metadata', 'current-status': 'not-waiting',
+ format: :xml }
+ expect(response.body).to eq('Status in params (not-waiting) does not match current status (waiting)')
+ expect(response.code).to eq('409')
+ end
+
+ it 'verifies that process in url and body match' do
+ put :update, body: body, params: { repo: repository, druid: druid, workflow: workflow_id,
+ process: 'rights-metadata', format: :xml }
+ expect(response.body).to eq('Process name in body (descriptive-metadata) does not match process name in URI ' \
+ '(rights-metadata)')
+ expect(response.code).to eq('400')
+ end
+ end
+
+ context 'when completing a step' do
+ let(:body) { '' }
+ it 'updates the step and enqueues next step' do
+ put :update, body: body, params: { repo: repository, druid: druid, workflow: workflow_id,
+ process: 'descriptive-metadata', format: :xml }
+ expect(response.body).to match(/rights-metadata/)
+ expect(SendUpdateMessage).to have_received(:publish).with(druid: druid)
+ expect(WorkflowStep.find_by(druid: druid, process: 'descriptive-metadata').status).to eq('completed')
+ expect(QueueService).to have_received(:enqueue).with(WorkflowStep.find_by(druid: druid,
+ process: 'rights-metadata'))
+ end
+ end
+ end
+end
diff --git a/spec/controllers/workflows_controller_spec.rb b/spec/controllers/workflows_controller_spec.rb
index a6677166..a5a0d420 100644
--- a/spec/controllers/workflows_controller_spec.rb
+++ b/spec/controllers/workflows_controller_spec.rb
@@ -11,6 +11,7 @@
before do
allow(Dor::Services::Client).to receive(:object).with(druid).and_return(client)
+ allow(QueueService).to receive(:enqueue)
end
describe 'GET archive' do
diff --git a/spec/factories/workflow_steps.rb b/spec/factories/workflow_steps.rb
index e7ee2fc0..b9f70da4 100644
--- a/spec/factories/workflow_steps.rb
+++ b/spec/factories/workflow_steps.rb
@@ -10,5 +10,6 @@
repository { 'dor' }
version { 1 }
lane_id { 'default' }
+ status { 'waiting' }
end
end
diff --git a/spec/models/workflow_step_spec.rb b/spec/models/workflow_step_spec.rb
index 140ada12..676bb7c5 100644
--- a/spec/models/workflow_step_spec.rb
+++ b/spec/models/workflow_step_spec.rb
@@ -49,7 +49,7 @@
elapsed: nil,
attempts: 0,
datetime: String,
- status: nil,
+ status: 'waiting',
name: 'start-accession'
)
}
diff --git a/spec/requests/version_close_spec.rb b/spec/requests/version_close_spec.rb
index 3c519cab..76f7d075 100644
--- a/spec/requests/version_close_spec.rb
+++ b/spec/requests/version_close_spec.rb
@@ -9,6 +9,7 @@
obj_client = instance_double(Dor::Services::Client::Object, current_version: '2')
allow(Dor::Services::Client).to receive(:object).with(druid).and_return(obj_client)
+ allow(QueueService).to receive(:enqueue)
end
it 'closes the version' do
diff --git a/spec/requests/workflows/update_step_spec.rb b/spec/requests/workflows/update_step_spec.rb
index c85b4e33..86c91801 100644
--- a/spec/requests/workflows/update_step_spec.rb
+++ b/spec/requests/workflows/update_step_spec.rb
@@ -93,7 +93,7 @@
it 'does not update the step' do
expect_any_instance_of(WorkflowStep).not_to receive(:update)
- put "/dor/objects/#{druid}/workflows/#{wf.workflow}/#{wf.process}?current-status=waiting", params: process_xml
+ put "/dor/objects/#{druid}/workflows/#{wf.workflow}/#{wf.process}?current-status=not-waiting", params: process_xml
# NOTE: `#be_conflict` does not exist as a matcher for 409 errors
expect(response.status).to eq 409
diff --git a/spec/services/queue_service_spec.rb b/spec/services/queue_service_spec.rb
new file mode 100644
index 00000000..e5a1552e
--- /dev/null
+++ b/spec/services/queue_service_spec.rb
@@ -0,0 +1,46 @@
+# frozen_string_literal: true
+
+require 'rails_helper'
+
+RSpec.describe QueueService do
+ let(:service) { described_class.new step }
+
+ let(:step) { FactoryBot.create(:workflow_step, workflow: 'assemblyWF', process: 'jp2-create') }
+
+ describe '#enqueue' do
+ before do
+ allow(Resque).to receive(:enqueue_to)
+ end
+
+ it 'enqueues to Resque and updates status' do
+ service.enqueue
+ expect(Resque).to have_received(:enqueue_to).with(:"dor_assemblyWF_jp2-create_default",
+ 'Robots::DorRepo::Assembly::Jp2Create', step.druid)
+ expect(step.status).to eq('queued')
+ end
+ end
+
+ describe '#step_name' do
+ let(:step_name) { service.send(:step_name) }
+
+ it 'create correct step_name' do
+ expect(step_name).to eq('dor:assemblyWF:jp2-create')
+ end
+ end
+
+ describe '#queue_name' do
+ let(:queue_name) { service.send(:queue_name) }
+
+ it 'create correct queue_name' do
+ expect(queue_name).to eq('dor_assemblyWF_jp2-create_default')
+ end
+ end
+
+ describe '#class_name' do
+ let(:class_name) { service.send(:class_name) }
+
+ it 'create correct class_name' do
+ expect(class_name).to eq('Robots::DorRepo::Assembly::Jp2Create')
+ end
+ end
+end
diff --git a/spec/services/workflow_creator_spec.rb b/spec/services/workflow_creator_spec.rb
index 2e3edf87..5fa0ce7a 100644
--- a/spec/services/workflow_creator_spec.rb
+++ b/spec/services/workflow_creator_spec.rb
@@ -23,6 +23,10 @@
)
end
+ before do
+ allow(QueueService).to receive(:enqueue)
+ end
+
describe '#create_workflow_steps' do
subject(:create_workflow_steps) { wf_creator.create_workflow_steps }
@@ -32,6 +36,7 @@
end.to change(WorkflowStep, :count).by(13)
expect(WorkflowStep.last.druid).to eq druid
expect(WorkflowStep.last.repository).to eq repository
+ expect(QueueService).to have_received(:enqueue).with(WorkflowStep.find_by(druid: druid, process: 'descriptive-metadata'))
end
context 'when workflow steps already exists' do
@@ -43,6 +48,7 @@
expect do
create_workflow_steps
end.not_to change(WorkflowStep, :count)
+ expect(QueueService).to have_received(:enqueue).with(WorkflowStep.find_by(druid: druid, process: 'descriptive-metadata'))
end
end
end
diff --git a/spec/views/workflows/index.xml.builder_spec.rb b/spec/views/workflows/index.xml.builder_spec.rb
index 49b2300c..a5b347ef 100644
--- a/spec/views/workflows/index.xml.builder_spec.rb
+++ b/spec/views/workflows/index.xml.builder_spec.rb
@@ -47,7 +47,7 @@
['elapsed', ''],
['attempts', /0/],
['datetime', //],
- ['status', ''],
+ %w[status waiting],
%w[name start-accession]
)
end
diff --git a/spec/views/workflows/show.xml.builder_spec.rb b/spec/views/workflows/show.xml.builder_spec.rb
index f0b67fb9..c570c0ba 100644
--- a/spec/views/workflows/show.xml.builder_spec.rb
+++ b/spec/views/workflows/show.xml.builder_spec.rb
@@ -37,7 +37,7 @@
['elapsed', ''],
['attempts', /0/],
['datetime', //],
- ['status', ''],
+ %w[status waiting],
%w[name start-accession]
)
end