Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ gem 'jbuilder', '~> 2.5'
gem 'okcomputer'
gem 'pg'
gem 'puma', '~> 3.11'
gem 'resque', '~> 2.0'
gem 'stomp', '~> 1.4'

group :development, :test do
Expand Down
22 changes: 22 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,11 @@ GEM
json
nokogiri
nokogiri-happymapper
mono_logger (1.1.0)
msgpack (1.2.10)
multi_json (1.13.1)
multipart-post (2.0.0)
mustermann (1.0.3)
net-http-persistent (3.0.1)
connection_pool (~> 2.2)
net-scp (2.0.0)
Expand All @@ -199,6 +201,8 @@ GEM
pg (1.1.4)
puma (3.12.1)
rack (2.0.7)
rack-protection (2.0.5)
rack
rack-test (1.1.0)
rack (>= 1.0, < 3)
rails (5.2.3)
Expand Down Expand Up @@ -234,6 +238,15 @@ GEM
rb-fsevent (0.10.3)
rb-inotify (0.10.0)
ffi (~> 1.0)
redis (4.1.0)
redis-namespace (1.6.0)
redis (>= 3.0.4)
resque (2.0.0)
mono_logger (~> 1.0)
multi_json (~> 1.0)
redis-namespace (~> 1.6)
sinatra (>= 0.9.2)
vegas (~> 0.1.2)
retries (0.0.5)
rspec-core (3.8.0)
rspec-support (~> 3.8.0)
Expand Down Expand Up @@ -261,6 +274,11 @@ GEM
unicode-display_width (>= 1.4.0, < 1.6)
ruby-progressbar (1.10.0)
ruby_dep (1.5.0)
sinatra (2.0.5)
mustermann (~> 1.0)
rack (~> 2.0)
rack-protection (= 2.0.5)
tilt (~> 2.0)
spring (2.0.2)
activesupport (>= 4.2)
spring-watcher-listen (2.0.1)
Expand All @@ -279,9 +297,12 @@ GEM
stomp (1.4.8)
thor (0.20.3)
thread_safe (0.3.6)
tilt (2.0.9)
tzinfo (1.2.5)
thread_safe (~> 0.1)
unicode-display_width (1.5.0)
vegas (0.1.11)
rack (>= 1.0.0)
websocket-driver (0.7.0)
websocket-extensions (>= 0.1.0)
websocket-extensions (0.1.3)
Expand Down Expand Up @@ -309,6 +330,7 @@ DEPENDENCIES
puma (~> 3.11)
rails (~> 5.2.0)
rails-controller-testing
resque (~> 2.0)
rspec-rails
rubocop
spring
Expand Down
11 changes: 9 additions & 2 deletions app/controllers/steps_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# API for handling requests about a specific step within an object's workflow.
class StepsController < ApplicationController
# Update a single WorkflowStep
# If there are next steps, they are enqueued.
# rubocop:disable Metrics/AbcSize
def update
parser = ProcessParser.new(process_from_request_body)
Expand All @@ -14,8 +15,13 @@ def update
return render plain: status_mismatch_error(step), status: :conflict if params['current-status'] && step.status != params['current-status']

step.update(parser.to_h)

# Enqueue next steps
next_steps = NextStepService.for(step: step)
next_steps.each { |next_step| QueueService.enqueue(next_step) }

SendUpdateMessage.publish(druid: step.druid)
render json: { next_steps: NextStepService.for(step: step) }
render json: { next_steps: next_steps }
end
# rubocop:enable Metrics/AbcSize

Expand Down Expand Up @@ -48,6 +54,7 @@ def process_from_request_body
end

def current_version
ObjectVersionService.current_version(params[:druid])
# Providing the version as a param is for local testing without needing to run DOR services.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like us to move to this pattern in production too.

params[:version] || ObjectVersionService.current_version(params[:druid])
end
end
3 changes: 2 additions & 1 deletion app/controllers/versions_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ def initialize_workflow
end

def current_version
@current_version ||= ObjectVersionService.current_version(params[:druid])
# Providing the version as a param is for local testing without needing to run DOR services.
@current_version ||= params[:version] || ObjectVersionService.current_version(params[:druid])
end

def initial_workflow
Expand Down
3 changes: 2 additions & 1 deletion app/controllers/workflows_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ def create
private

def current_version
ObjectVersionService.current_version(params[:druid])
# Providing the version as a param is for local testing without needing to run DOR services.
params[:version] || ObjectVersionService.current_version(params[:druid])
end

def initial_parser
Expand Down
58 changes: 58 additions & 0 deletions app/services/queue_service.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# frozen_string_literal: true

# Service for add workflow steps to Resqueue queues
class QueueService
# Enqueue the provided step
# @param [WorkflowStep] workflow step to enqueue
def self.enqueue(step)
QueueService.new(step).enqueue
end

attr_reader :step

# @param [WorkflowStep] workflow step to enqueue
def initialize(step)
@step = step
end

# Enqueue the provided step
def enqueue
# Perform the enqueue to Resque
Resque.enqueue_to(queue_name.to_sym, class_name, step.druid)
Rails.logger.info "Enqueued #{class_name} for #{step.druid} to #{queue_name}"

# Update status
step.status = 'queued'
step.save
end

private

# @example
# => dor:assemblyWF:jp2-create
def step_name
@step_name ||= "#{step.repository}:#{step.workflow}:#{step.process}"
end

# Generate the queue name from step
#
# @example
# => 'dor_assemblyWF_jp2-create_default'
# => 'dor_assemblyWF_jp2-create_mylane'
def queue_name
@queue_name ||= "#{step.repository}_#{step.workflow}_#{step.process}_#{step.lane_id}"
end

# Converts a given step to the Robot class name
# Based on https://github.com/sul-dlss/lyber-core/blob/master/lib/lyber_core/robot.rb#L33
# @example
# => 'Robots::DorRepo::Assembly::Jp2Create'
def class_name
@class_name ||= begin
repo = step.repository.camelize
workflow = step.workflow.sub('WF', '').camelize
process = step.process.tr('-', '_').camelize
"Robots::#{repo}Repo::#{workflow}::#{process}"
end
end
end
11 changes: 11 additions & 0 deletions app/services/workflow_creator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,23 @@ def create_workflow_steps
WorkflowStep.create!(workflow_attributes(process))
end
end
enqueue
end

private

attr_reader :processes, :workflow_id

def enqueue
# Get the first step and enqueue any next steps
first_step = WorkflowStep.find_by(workflow: workflow_id, druid: version.druid, active_version: true,
process: processes.first.process)

# Enqueue next steps
next_steps = NextStepService.for(step: first_step)
next_steps.each { |next_step| QueueService.enqueue(next_step) }
end

def workflow_attributes(process)
{
workflow: workflow_id,
Expand Down
9 changes: 9 additions & 0 deletions config/initializers/redis.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# frozen_string_literal: true

# Load Resque configuration and controller
redis = Redis.new(host: Settings.redis.hostname,
port: Settings.redis.port,
thread_safe: true,
db: Settings.redis.db)

Resque.redis = Redis::Namespace.new(Settings.redis.namespace, redis: redis)
7 changes: 7 additions & 0 deletions config/settings.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,10 @@ messaging:
uri: 'failover:(stomp+ssl://localhost:61612,stomp://remotehost:61613)'
# fedora_url is sent inside the message as the `entry.author.uri` field
fedora_url: 'https://dor-test.stanford.edu'


redis:
hostname: localhost
port: 6379
db: 0
namespace: 'resque:development'
11 changes: 9 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ services:
app:
build:
context: ./
dockerfile: docker/Dockerfile.dev
dockerfile: Dockerfile
environment:
- RAILS_LOG_TO_STDOUT=true
- DATABASE_NAME=workflow-server
Expand All @@ -13,7 +13,8 @@ services:
- DATABASE_HOSTNAME=db
- DATABASE_PORT=5432
- SECRET_KEY_BASE="${SECRET_KEY_BASE}"
image: 'suldlss/workflow-server:latest-dev'
- SETTINGS__ENABLE_STOMP=false
- SETTINGS__REDIS__HOSTNAME=redis
volumes:
- .:/app
ports:
Expand All @@ -32,5 +33,11 @@ services:
- ./postgres-data:/var/lib/postgresql/data
networks:
- wfs
redis:
image: redis
ports:
- "6379:6379"
networks:
- wfs
networks:
wfs:
26 changes: 26 additions & 0 deletions lib/tasks/workflow.rake
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# frozen_string_literal: true

namespace :workflow do
desc 'Update a workflow step'
task :step, %i[repo druid workflow process version status] => :environment do |_task, args|
step = WorkflowStep.find_by(
repository: args[:repo],
druid: args[:druid],
workflow: args[:workflow],
process: args[:process],
version: args[:version]
)

raise 'Workflow step does not already exist' if step.nil?

step.status = args[:status]
step.save
puts("Setting #{args[:process]} to #{args[:status]}")

# Enqueue next steps
next_steps = NextStepService.for(step: step)
next_steps.each { |next_step| QueueService.enqueue(next_step) }

SendUpdateMessage.publish(druid: step.druid)
end
end
63 changes: 63 additions & 0 deletions spec/controllers/steps_controller_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# frozen_string_literal: true

require 'rails_helper'

RSpec.describe StepsController do
let(:repository) { 'dor' }
let(:client) { instance_double(Dor::Services::Client::Object, current_version: '1') }
let(:druid) { first_step.druid }
let(:workflow_id) { 'accessionWF' }
let(:first_step) { FactoryBot.create(:workflow_step, status: 'completed') } # start-accession, which is already completed

before do
FactoryBot.create(:workflow_step, druid: druid, process: 'descriptive-metadata')
FactoryBot.create(:workflow_step, druid: druid, process: 'rights-metadata')
allow(Dor::Services::Client).to receive(:object).with(druid).and_return(client)
allow(SendUpdateMessage).to receive(:publish)
allow(QueueService).to receive(:enqueue)
end

describe 'PUT update' do
context 'when updating a step' do
let(:body) { '<process name="descriptive-metadata" status="test" />' }

it 'updates the step' do
put :update, body: body, params: { repo: repository, druid: druid, workflow: workflow_id,
process: 'descriptive-metadata', format: :xml }
expect(response.body).to eq('{"next_steps":[]}')
expect(SendUpdateMessage).to have_received(:publish).with(druid: druid)
expect(WorkflowStep.find_by(druid: druid, process: 'descriptive-metadata').status).to eq('test')
expect(QueueService).to_not have_received(:enqueue)
end

it 'verifies the current status' do
put :update, body: body, params: { repo: repository, druid: druid, workflow: workflow_id,
process: 'descriptive-metadata', 'current-status': 'not-waiting',
format: :xml }
expect(response.body).to eq('Status in params (not-waiting) does not match current status (waiting)')
expect(response.code).to eq('409')
end

it 'verifies that process in url and body match' do
put :update, body: body, params: { repo: repository, druid: druid, workflow: workflow_id,
process: 'rights-metadata', format: :xml }
expect(response.body).to eq('Process name in body (descriptive-metadata) does not match process name in URI ' \
'(rights-metadata)')
expect(response.code).to eq('400')
end
end

context 'when completing a step' do
let(:body) { '<process name="descriptive-metadata" status="completed" />' }
it 'updates the step and enqueues next step' do
put :update, body: body, params: { repo: repository, druid: druid, workflow: workflow_id,
process: 'descriptive-metadata', format: :xml }
expect(response.body).to match(/rights-metadata/)
expect(SendUpdateMessage).to have_received(:publish).with(druid: druid)
expect(WorkflowStep.find_by(druid: druid, process: 'descriptive-metadata').status).to eq('completed')
expect(QueueService).to have_received(:enqueue).with(WorkflowStep.find_by(druid: druid,
process: 'rights-metadata'))
end
end
end
end
1 change: 1 addition & 0 deletions spec/controllers/workflows_controller_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

before do
allow(Dor::Services::Client).to receive(:object).with(druid).and_return(client)
allow(QueueService).to receive(:enqueue)
end

describe 'GET archive' do
Expand Down
1 change: 1 addition & 0 deletions spec/factories/workflow_steps.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@
repository { 'dor' }
version { 1 }
lane_id { 'default' }
status { 'waiting' }
end
end
2 changes: 1 addition & 1 deletion spec/models/workflow_step_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
elapsed: nil,
attempts: 0,
datetime: String,
status: nil,
status: 'waiting',
name: 'start-accession'
)
}
Expand Down
1 change: 1 addition & 0 deletions spec/requests/version_close_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
obj_client = instance_double(Dor::Services::Client::Object, current_version: '2')

allow(Dor::Services::Client).to receive(:object).with(druid).and_return(obj_client)
allow(QueueService).to receive(:enqueue)
end

it 'closes the version' do
Expand Down
2 changes: 1 addition & 1 deletion spec/requests/workflows/update_step_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
it 'does not update the step' do
expect_any_instance_of(WorkflowStep).not_to receive(:update)

put "/dor/objects/#{druid}/workflows/#{wf.workflow}/#{wf.process}?current-status=waiting", params: process_xml
put "/dor/objects/#{druid}/workflows/#{wf.workflow}/#{wf.process}?current-status=not-waiting", params: process_xml

# NOTE: `#be_conflict` does not exist as a matcher for 409 errors
expect(response.status).to eq 409
Expand Down
Loading