Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ gem 'jbuilder', '~> 2.5'
gem 'okcomputer'
gem 'pg'
gem 'puma', '~> 3.11'
gem 'resque'
gem 'stomp', '~> 1.4'

group :development, :test do
Expand Down
22 changes: 22 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,11 @@ GEM
json
nokogiri
nokogiri-happymapper
mono_logger (1.1.0)
msgpack (1.2.9)
multi_json (1.13.1)
multipart-post (2.0.0)
mustermann (1.0.3)
net-http-persistent (3.0.0)
connection_pool (~> 2.2)
net-scp (2.0.0)
Expand All @@ -200,6 +202,8 @@ GEM
psych (3.1.0)
puma (3.12.1)
rack (2.0.7)
rack-protection (2.0.5)
rack
rack-test (1.1.0)
rack (>= 1.0, < 3)
rails (5.2.3)
Expand Down Expand Up @@ -235,6 +239,15 @@ GEM
rb-fsevent (0.10.3)
rb-inotify (0.10.0)
ffi (~> 1.0)
redis (4.1.0)
redis-namespace (1.6.0)
redis (>= 3.0.4)
resque (2.0.0)
mono_logger (~> 1.0)
multi_json (~> 1.0)
redis-namespace (~> 1.6)
sinatra (>= 0.9.2)
vegas (~> 0.1.2)
retries (0.0.5)
rspec-core (3.8.0)
rspec-support (~> 3.8.0)
Expand Down Expand Up @@ -263,6 +276,11 @@ GEM
unicode-display_width (>= 1.4.0, < 1.6)
ruby-progressbar (1.10.0)
ruby_dep (1.5.0)
sinatra (2.0.5)
mustermann (~> 1.0)
rack (~> 2.0)
rack-protection (= 2.0.5)
tilt (~> 2.0)
spring (2.0.2)
activesupport (>= 4.2)
spring-watcher-listen (2.0.1)
Expand All @@ -281,9 +299,12 @@ GEM
stomp (1.4.8)
thor (0.20.3)
thread_safe (0.3.6)
tilt (2.0.9)
tzinfo (1.2.5)
thread_safe (~> 0.1)
unicode-display_width (1.5.0)
vegas (0.1.11)
rack (>= 1.0.0)
websocket-driver (0.7.0)
websocket-extensions (>= 0.1.0)
websocket-extensions (0.1.3)
Expand Down Expand Up @@ -311,6 +332,7 @@ DEPENDENCIES
puma (~> 3.11)
rails (~> 5.2.0)
rails-controller-testing
resque
rspec-rails
rubocop
spring
Expand Down
19 changes: 19 additions & 0 deletions app/controllers/steps_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,25 @@ def update
end
# rubocop:enable Metrics/AbcSize

# Update a single WorkflowStep and if the status was "completed", enqueue the next.
# rubocop:disable Metrics/AbcSize
def next
parser = ProcessParser.new(process_from_request_body)
step = find_or_create_step_for_process

return render plain: process_mismatch_error(parser), status: :bad_request if parser.process != params[:process]

return render plain: status_mismatch_error(step), status: :conflict if params['current-status'] && step.status != params['current-status']

step.update(parser.to_h)
SendUpdateMessage.publish(druid: step.druid)

next_steps = NextStepService.for(step: step)
WorkerQueue.enqueue_steps(next_steps) if step.complete?
render json: { next_steps: next_steps }
end
# rubocop:enable Metrics/AbcSize

private

def process_mismatch_error(parser)
Expand Down
4 changes: 4 additions & 0 deletions app/models/workflow_step.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,8 @@ def attributes_for_process
end
end
# rubocop:enable Metrics/MethodLength

def complete?
status == 'completed'
end
end
38 changes: 38 additions & 0 deletions app/services/worker_queue.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# frozen_string_literal: true

# Pushes workflow_step's into the Resque queue so that work can begin
#
# You may set the environment variable SETTINGS__ENABLE_QUEUING=false to
# prevent sending work to the Resque queue.
class WorkerQueue
# @param [ActiveRecord::Relation<WorkflowStep>] steps to enqueue
def self.enqueue_steps(steps)
return unless Settings.enable_queuing

steps.each do |step|
Resque.enqueue_to queue_name(step), job_name(step), step.druid
step.update(status: 'queued')
end
end

def self.job_name(step)
[
'Robots',
step.repository.camelcase + 'Repo', # 'Dor' conflicts with dor-services
step.workflow.sub('WF', '').camelcase,
step.process.tr('-', '_').camelcase
].join('::')
end
private_class_method :job_name

def self.queue_name(step)
[
step.repository,
step.workflow,
step.process,
'default'
].join('_')
end

private_class_method :queue_name
end
3 changes: 3 additions & 0 deletions config/initializers/resque.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# frozen_string_literal: true

Resque.redis = Settings.redis.url
1 change: 1 addition & 0 deletions config/routes.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
# Create should be a POST, but this is what the Java WFS app did.
put ':workflow', to: 'workflows#create'
put ':workflow/:process', to: 'steps#update'
post ':workflow/:process/next', to: 'steps#next'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR is looking good so far. I'm curious about mixing post and put here. Does steps#next
function more like a create operation? The methods look similar to me.

end
end
end
Expand Down
4 changes: 4 additions & 0 deletions config/settings.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ dor_services:
password: 'password'

enable_stomp: true
enable_queuing: true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you say more about what this change adds?


messaging:
uri: 'failover:(stomp+ssl://localhost:61612,stomp://remotehost:61613)'
# fedora_url is sent inside the message as the `entry.author.uri` field
fedora_url: 'https://dor-test.stanford.edu'

redis:
url: localhost:6379
138 changes: 138 additions & 0 deletions spec/requests/workflows/next_step_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
# frozen_string_literal: true

require 'rails_helper'

RSpec.describe 'Start the next workflow step for an object', type: :request do
let(:client) { instance_double(Dor::Services::Client::Object, current_version: '1') }
let(:druid) { wf.druid }

before do
allow(Dor::Services::Client).to receive(:object).with(druid).and_return(client)
end

before do
allow(SendUpdateMessage).to receive(:publish)
end

context 'with XML indicating success' do
let(:process_xml) do
'<process name="descriptive-metadata" status="completed" elapsed="3" laneId="default" note="Yay"/>'
end

let(:wf) do
FactoryBot.create(:workflow_step,
workflow: 'accessionWF',
process: 'descriptive-metadata',
status: 'error',
error_msg: 'Bang!',
lifecycle: 'submitted')
end

let!(:next_step) do
FactoryBot.create(:workflow_step,
druid: wf.druid,
workflow: wf.workflow,
process: 'rights-metadata',
status: 'waiting')
end

before do
allow(Resque).to receive(:enqueue_to)
end

it 'enqueues a message to start the next step and sets the status to queued' do
post "/dor/objects/#{druid}/workflows/#{wf.workflow}/#{wf.process}/next", params: process_xml

wf.reload
expect(wf.status).to eq 'completed'
expect(wf.error_msg).to be_nil

expect(wf.lifecycle).to eq 'submitted'
json = JSON.parse(response.body)
expect(json['next_steps']).to eq [JSON.parse(next_step.reload.to_json)]
expect(SendUpdateMessage).to have_received(:publish).with(druid: wf.druid)
expect(Resque).to have_received(:enqueue_to)
.with('dor_accessionWF_rights-metadata_default',
'Robots::DorRepo::Accession::RightsMetadata',
wf.druid)

expect(next_step.status).to eq 'queued'
end
end

context 'with XML indicating failure' do
let(:process_xml) do
'<process name="descriptive-metadata" status="error" elapsed="3" laneId="default" note="Yay"/>'
end

let(:wf) do
FactoryBot.create(:workflow_step,
workflow: 'accessionWF',
process: 'descriptive-metadata',
status: 'error',
error_msg: 'Bang!',
lifecycle: 'submitted')
end

let!(:next_step) do
FactoryBot.create(:workflow_step,
druid: wf.druid,
workflow: wf.workflow,
process: 'rights-metadata',
status: 'waiting')
end

before do
allow(Resque).to receive(:enqueue_to)
end

it "doesn't enqueue the next step " do
post "/dor/objects/#{druid}/workflows/#{wf.workflow}/#{wf.process}/next", params: process_xml

wf.reload
expect(wf.status).to eq 'error'
expect(wf.error_msg).to be_nil

expect(wf.lifecycle).to eq 'submitted'

expect(response.body).to eq '{"next_steps":[]}'
expect(SendUpdateMessage).to have_received(:publish).with(druid: wf.druid)
expect(Resque).not_to have_received(:enqueue_to)
end
end

context 'when the next step does not get enqueued' do
let(:process_xml) do
'<process name="sdr-ingest-transfer" status="completed" elapsed="3" laneId="default" note="Yay"/>'
end

let(:wf) do
FactoryBot.create(:workflow_step,
workflow: 'accessionWF',
process: 'sdr-ingest-transfer',
status: 'waiting')
end

let!(:next_step) do
FactoryBot.create(:workflow_step,
druid: wf.druid,
workflow: wf.workflow,
process: 'sdr-ingest-received',
status: 'waiting')
end

before do
allow(Resque).to receive(:enqueue_to)
end

it "doesn't enqueue the next step " do
post "/dor/objects/#{druid}/workflows/#{wf.workflow}/#{wf.process}/next", params: process_xml

wf.reload
expect(wf.status).to eq 'completed'
expect(response.body).to eq '{"next_steps":[]}'
expect(SendUpdateMessage).to have_received(:publish).with(druid: wf.druid)
expect(Resque).not_to have_received(:enqueue_to)
end
end
end