From adda8948f52babd9069e529be4ac33533daeffc4 Mon Sep 17 00:00:00 2001 From: Justin Littman Date: Fri, 3 May 2019 09:04:31 -0400 Subject: [PATCH] Adds redis queueing of next steps. --- Gemfile | 1 + Gemfile.lock | 22 +++++++ app/controllers/steps_controller.rb | 11 +++- app/controllers/versions_controller.rb | 3 +- app/controllers/workflows_controller.rb | 3 +- app/services/queue_service.rb | 58 +++++++++++++++++ app/services/workflow_creator.rb | 11 ++++ config/initializers/redis.rb | 9 +++ config/settings.yml | 7 +++ docker-compose.yml | 11 +++- lib/tasks/workflow.rake | 26 ++++++++ spec/controllers/steps_controller_spec.rb | 63 +++++++++++++++++++ spec/controllers/workflows_controller_spec.rb | 1 + spec/factories/workflow_steps.rb | 1 + spec/models/workflow_step_spec.rb | 2 +- spec/requests/version_close_spec.rb | 1 + spec/requests/workflows/update_step_spec.rb | 2 +- spec/services/queue_service_spec.rb | 46 ++++++++++++++ spec/services/workflow_creator_spec.rb | 6 ++ .../views/workflows/index.xml.builder_spec.rb | 2 +- spec/views/workflows/show.xml.builder_spec.rb | 2 +- 21 files changed, 278 insertions(+), 10 deletions(-) create mode 100644 app/services/queue_service.rb create mode 100644 config/initializers/redis.rb create mode 100644 lib/tasks/workflow.rake create mode 100644 spec/controllers/steps_controller_spec.rb create mode 100644 spec/services/queue_service_spec.rb diff --git a/Gemfile b/Gemfile index f423849d..5724234a 100644 --- a/Gemfile +++ b/Gemfile @@ -16,6 +16,7 @@ gem 'jbuilder', '~> 2.5' gem 'okcomputer' gem 'pg' gem 'puma', '~> 3.11' +gem 'resque', '~> 2.0' gem 'stomp', '~> 1.4' group :development, :test do diff --git a/Gemfile.lock b/Gemfile.lock index 14e9f30c..dcc8d6a8 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -179,9 +179,11 @@ GEM json nokogiri nokogiri-happymapper + mono_logger (1.1.0) msgpack (1.2.10) multi_json (1.13.1) multipart-post (2.0.0) + mustermann (1.0.3) net-http-persistent (3.0.1) connection_pool (~> 2.2) net-scp (2.0.0) @@ -199,6 +201,8 @@ GEM pg (1.1.4) puma (3.12.1) rack (2.0.7) + rack-protection (2.0.5) + rack rack-test (1.1.0) rack (>= 1.0, < 3) rails (5.2.3) @@ -234,6 +238,15 @@ GEM rb-fsevent (0.10.3) rb-inotify (0.10.0) ffi (~> 1.0) + redis (4.1.0) + redis-namespace (1.6.0) + redis (>= 3.0.4) + resque (2.0.0) + mono_logger (~> 1.0) + multi_json (~> 1.0) + redis-namespace (~> 1.6) + sinatra (>= 0.9.2) + vegas (~> 0.1.2) retries (0.0.5) rspec-core (3.8.0) rspec-support (~> 3.8.0) @@ -261,6 +274,11 @@ GEM unicode-display_width (>= 1.4.0, < 1.6) ruby-progressbar (1.10.0) ruby_dep (1.5.0) + sinatra (2.0.5) + mustermann (~> 1.0) + rack (~> 2.0) + rack-protection (= 2.0.5) + tilt (~> 2.0) spring (2.0.2) activesupport (>= 4.2) spring-watcher-listen (2.0.1) @@ -279,9 +297,12 @@ GEM stomp (1.4.8) thor (0.20.3) thread_safe (0.3.6) + tilt (2.0.9) tzinfo (1.2.5) thread_safe (~> 0.1) unicode-display_width (1.5.0) + vegas (0.1.11) + rack (>= 1.0.0) websocket-driver (0.7.0) websocket-extensions (>= 0.1.0) websocket-extensions (0.1.3) @@ -309,6 +330,7 @@ DEPENDENCIES puma (~> 3.11) rails (~> 5.2.0) rails-controller-testing + resque (~> 2.0) rspec-rails rubocop spring diff --git a/app/controllers/steps_controller.rb b/app/controllers/steps_controller.rb index e40dcd7b..7fd0c44f 100644 --- a/app/controllers/steps_controller.rb +++ b/app/controllers/steps_controller.rb @@ -4,6 +4,7 @@ # API for handling requests about a specific step within an object's workflow. class StepsController < ApplicationController # Update a single WorkflowStep + # If there are next steps, they are enqueued. # rubocop:disable Metrics/AbcSize def update parser = ProcessParser.new(process_from_request_body) @@ -14,8 +15,13 @@ def update return render plain: status_mismatch_error(step), status: :conflict if params['current-status'] && step.status != params['current-status'] step.update(parser.to_h) + + # Enqueue next steps + next_steps = NextStepService.for(step: step) + next_steps.each { |next_step| QueueService.enqueue(next_step) } + SendUpdateMessage.publish(druid: step.druid) - render json: { next_steps: NextStepService.for(step: step) } + render json: { next_steps: next_steps } end # rubocop:enable Metrics/AbcSize @@ -48,6 +54,7 @@ def process_from_request_body end def current_version - ObjectVersionService.current_version(params[:druid]) + # Providing the version as a param is for local testing without needing to run DOR services. + params[:version] || ObjectVersionService.current_version(params[:druid]) end end diff --git a/app/controllers/versions_controller.rb b/app/controllers/versions_controller.rb index 5965e683..e0fa19c8 100644 --- a/app/controllers/versions_controller.rb +++ b/app/controllers/versions_controller.rb @@ -42,7 +42,8 @@ def initialize_workflow end def current_version - @current_version ||= ObjectVersionService.current_version(params[:druid]) + # Providing the version as a param is for local testing without needing to run DOR services. + @current_version ||= params[:version] || ObjectVersionService.current_version(params[:druid]) end def initial_workflow diff --git a/app/controllers/workflows_controller.rb b/app/controllers/workflows_controller.rb index 02d165cc..36b5edbd 100644 --- a/app/controllers/workflows_controller.rb +++ b/app/controllers/workflows_controller.rb @@ -92,7 +92,8 @@ def create private def current_version - ObjectVersionService.current_version(params[:druid]) + # Providing the version as a param is for local testing without needing to run DOR services. + params[:version] || ObjectVersionService.current_version(params[:druid]) end def initial_parser diff --git a/app/services/queue_service.rb b/app/services/queue_service.rb new file mode 100644 index 00000000..6abb26b7 --- /dev/null +++ b/app/services/queue_service.rb @@ -0,0 +1,58 @@ +# frozen_string_literal: true + +# Service for add workflow steps to Resqueue queues +class QueueService + # Enqueue the provided step + # @param [WorkflowStep] workflow step to enqueue + def self.enqueue(step) + QueueService.new(step).enqueue + end + + attr_reader :step + + # @param [WorkflowStep] workflow step to enqueue + def initialize(step) + @step = step + end + + # Enqueue the provided step + def enqueue + # Perform the enqueue to Resque + Resque.enqueue_to(queue_name.to_sym, class_name, step.druid) + Rails.logger.info "Enqueued #{class_name} for #{step.druid} to #{queue_name}" + + # Update status + step.status = 'queued' + step.save + end + + private + + # @example + # => dor:assemblyWF:jp2-create + def step_name + @step_name ||= "#{step.repository}:#{step.workflow}:#{step.process}" + end + + # Generate the queue name from step + # + # @example + # => 'dor_assemblyWF_jp2-create_default' + # => 'dor_assemblyWF_jp2-create_mylane' + def queue_name + @queue_name ||= "#{step.repository}_#{step.workflow}_#{step.process}_#{step.lane_id}" + end + + # Converts a given step to the Robot class name + # Based on https://github.com/sul-dlss/lyber-core/blob/master/lib/lyber_core/robot.rb#L33 + # @example + # => 'Robots::DorRepo::Assembly::Jp2Create' + def class_name + @class_name ||= begin + repo = step.repository.camelize + workflow = step.workflow.sub('WF', '').camelize + process = step.process.tr('-', '_').camelize + "Robots::#{repo}Repo::#{workflow}::#{process}" + end + end +end diff --git a/app/services/workflow_creator.rb b/app/services/workflow_creator.rb index 5abad599..e85ceb0f 100644 --- a/app/services/workflow_creator.rb +++ b/app/services/workflow_creator.rb @@ -29,12 +29,23 @@ def create_workflow_steps WorkflowStep.create!(workflow_attributes(process)) end end + enqueue end private attr_reader :processes, :workflow_id + def enqueue + # Get the first step and enqueue any next steps + first_step = WorkflowStep.find_by(workflow: workflow_id, druid: version.druid, active_version: true, + process: processes.first.process) + + # Enqueue next steps + next_steps = NextStepService.for(step: first_step) + next_steps.each { |next_step| QueueService.enqueue(next_step) } + end + def workflow_attributes(process) { workflow: workflow_id, diff --git a/config/initializers/redis.rb b/config/initializers/redis.rb new file mode 100644 index 00000000..93fd9d5f --- /dev/null +++ b/config/initializers/redis.rb @@ -0,0 +1,9 @@ +# frozen_string_literal: true + +# Load Resque configuration and controller +redis = Redis.new(host: Settings.redis.hostname, + port: Settings.redis.port, + thread_safe: true, + db: Settings.redis.db) + +Resque.redis = Redis::Namespace.new(Settings.redis.namespace, redis: redis) diff --git a/config/settings.yml b/config/settings.yml index 8487a973..a5c28a55 100644 --- a/config/settings.yml +++ b/config/settings.yml @@ -9,3 +9,10 @@ messaging: uri: 'failover:(stomp+ssl://localhost:61612,stomp://remotehost:61613)' # fedora_url is sent inside the message as the `entry.author.uri` field fedora_url: 'https://dor-test.stanford.edu' + + +redis: + hostname: localhost + port: 6379 + db: 0 + namespace: 'resque:development' diff --git a/docker-compose.yml b/docker-compose.yml index f3d26a35..efb523ad 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -4,7 +4,7 @@ services: app: build: context: ./ - dockerfile: docker/Dockerfile.dev + dockerfile: Dockerfile environment: - RAILS_LOG_TO_STDOUT=true - DATABASE_NAME=workflow-server @@ -13,7 +13,8 @@ services: - DATABASE_HOSTNAME=db - DATABASE_PORT=5432 - SECRET_KEY_BASE="${SECRET_KEY_BASE}" - image: 'suldlss/workflow-server:latest-dev' + - SETTINGS__ENABLE_STOMP=false + - SETTINGS__REDIS__HOSTNAME=redis volumes: - .:/app ports: @@ -32,5 +33,11 @@ services: - ./postgres-data:/var/lib/postgresql/data networks: - wfs + redis: + image: redis + ports: + - "6379:6379" + networks: + - wfs networks: wfs: diff --git a/lib/tasks/workflow.rake b/lib/tasks/workflow.rake new file mode 100644 index 00000000..1fc5d3a9 --- /dev/null +++ b/lib/tasks/workflow.rake @@ -0,0 +1,26 @@ +# frozen_string_literal: true + +namespace :workflow do + desc 'Update a workflow step' + task :step, %i[repo druid workflow process version status] => :environment do |_task, args| + step = WorkflowStep.find_by( + repository: args[:repo], + druid: args[:druid], + workflow: args[:workflow], + process: args[:process], + version: args[:version] + ) + + raise 'Workflow step does not already exist' if step.nil? + + step.status = args[:status] + step.save + puts("Setting #{args[:process]} to #{args[:status]}") + + # Enqueue next steps + next_steps = NextStepService.for(step: step) + next_steps.each { |next_step| QueueService.enqueue(next_step) } + + SendUpdateMessage.publish(druid: step.druid) + end +end diff --git a/spec/controllers/steps_controller_spec.rb b/spec/controllers/steps_controller_spec.rb new file mode 100644 index 00000000..3e4b4fbe --- /dev/null +++ b/spec/controllers/steps_controller_spec.rb @@ -0,0 +1,63 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe StepsController do + let(:repository) { 'dor' } + let(:client) { instance_double(Dor::Services::Client::Object, current_version: '1') } + let(:druid) { first_step.druid } + let(:workflow_id) { 'accessionWF' } + let(:first_step) { FactoryBot.create(:workflow_step, status: 'completed') } # start-accession, which is already completed + + before do + FactoryBot.create(:workflow_step, druid: druid, process: 'descriptive-metadata') + FactoryBot.create(:workflow_step, druid: druid, process: 'rights-metadata') + allow(Dor::Services::Client).to receive(:object).with(druid).and_return(client) + allow(SendUpdateMessage).to receive(:publish) + allow(QueueService).to receive(:enqueue) + end + + describe 'PUT update' do + context 'when updating a step' do + let(:body) { '' } + + it 'updates the step' do + put :update, body: body, params: { repo: repository, druid: druid, workflow: workflow_id, + process: 'descriptive-metadata', format: :xml } + expect(response.body).to eq('{"next_steps":[]}') + expect(SendUpdateMessage).to have_received(:publish).with(druid: druid) + expect(WorkflowStep.find_by(druid: druid, process: 'descriptive-metadata').status).to eq('test') + expect(QueueService).to_not have_received(:enqueue) + end + + it 'verifies the current status' do + put :update, body: body, params: { repo: repository, druid: druid, workflow: workflow_id, + process: 'descriptive-metadata', 'current-status': 'not-waiting', + format: :xml } + expect(response.body).to eq('Status in params (not-waiting) does not match current status (waiting)') + expect(response.code).to eq('409') + end + + it 'verifies that process in url and body match' do + put :update, body: body, params: { repo: repository, druid: druid, workflow: workflow_id, + process: 'rights-metadata', format: :xml } + expect(response.body).to eq('Process name in body (descriptive-metadata) does not match process name in URI ' \ + '(rights-metadata)') + expect(response.code).to eq('400') + end + end + + context 'when completing a step' do + let(:body) { '' } + it 'updates the step and enqueues next step' do + put :update, body: body, params: { repo: repository, druid: druid, workflow: workflow_id, + process: 'descriptive-metadata', format: :xml } + expect(response.body).to match(/rights-metadata/) + expect(SendUpdateMessage).to have_received(:publish).with(druid: druid) + expect(WorkflowStep.find_by(druid: druid, process: 'descriptive-metadata').status).to eq('completed') + expect(QueueService).to have_received(:enqueue).with(WorkflowStep.find_by(druid: druid, + process: 'rights-metadata')) + end + end + end +end diff --git a/spec/controllers/workflows_controller_spec.rb b/spec/controllers/workflows_controller_spec.rb index a6677166..a5a0d420 100644 --- a/spec/controllers/workflows_controller_spec.rb +++ b/spec/controllers/workflows_controller_spec.rb @@ -11,6 +11,7 @@ before do allow(Dor::Services::Client).to receive(:object).with(druid).and_return(client) + allow(QueueService).to receive(:enqueue) end describe 'GET archive' do diff --git a/spec/factories/workflow_steps.rb b/spec/factories/workflow_steps.rb index e7ee2fc0..b9f70da4 100644 --- a/spec/factories/workflow_steps.rb +++ b/spec/factories/workflow_steps.rb @@ -10,5 +10,6 @@ repository { 'dor' } version { 1 } lane_id { 'default' } + status { 'waiting' } end end diff --git a/spec/models/workflow_step_spec.rb b/spec/models/workflow_step_spec.rb index 140ada12..676bb7c5 100644 --- a/spec/models/workflow_step_spec.rb +++ b/spec/models/workflow_step_spec.rb @@ -49,7 +49,7 @@ elapsed: nil, attempts: 0, datetime: String, - status: nil, + status: 'waiting', name: 'start-accession' ) } diff --git a/spec/requests/version_close_spec.rb b/spec/requests/version_close_spec.rb index 3c519cab..76f7d075 100644 --- a/spec/requests/version_close_spec.rb +++ b/spec/requests/version_close_spec.rb @@ -9,6 +9,7 @@ obj_client = instance_double(Dor::Services::Client::Object, current_version: '2') allow(Dor::Services::Client).to receive(:object).with(druid).and_return(obj_client) + allow(QueueService).to receive(:enqueue) end it 'closes the version' do diff --git a/spec/requests/workflows/update_step_spec.rb b/spec/requests/workflows/update_step_spec.rb index c85b4e33..86c91801 100644 --- a/spec/requests/workflows/update_step_spec.rb +++ b/spec/requests/workflows/update_step_spec.rb @@ -93,7 +93,7 @@ it 'does not update the step' do expect_any_instance_of(WorkflowStep).not_to receive(:update) - put "/dor/objects/#{druid}/workflows/#{wf.workflow}/#{wf.process}?current-status=waiting", params: process_xml + put "/dor/objects/#{druid}/workflows/#{wf.workflow}/#{wf.process}?current-status=not-waiting", params: process_xml # NOTE: `#be_conflict` does not exist as a matcher for 409 errors expect(response.status).to eq 409 diff --git a/spec/services/queue_service_spec.rb b/spec/services/queue_service_spec.rb new file mode 100644 index 00000000..e5a1552e --- /dev/null +++ b/spec/services/queue_service_spec.rb @@ -0,0 +1,46 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe QueueService do + let(:service) { described_class.new step } + + let(:step) { FactoryBot.create(:workflow_step, workflow: 'assemblyWF', process: 'jp2-create') } + + describe '#enqueue' do + before do + allow(Resque).to receive(:enqueue_to) + end + + it 'enqueues to Resque and updates status' do + service.enqueue + expect(Resque).to have_received(:enqueue_to).with(:"dor_assemblyWF_jp2-create_default", + 'Robots::DorRepo::Assembly::Jp2Create', step.druid) + expect(step.status).to eq('queued') + end + end + + describe '#step_name' do + let(:step_name) { service.send(:step_name) } + + it 'create correct step_name' do + expect(step_name).to eq('dor:assemblyWF:jp2-create') + end + end + + describe '#queue_name' do + let(:queue_name) { service.send(:queue_name) } + + it 'create correct queue_name' do + expect(queue_name).to eq('dor_assemblyWF_jp2-create_default') + end + end + + describe '#class_name' do + let(:class_name) { service.send(:class_name) } + + it 'create correct class_name' do + expect(class_name).to eq('Robots::DorRepo::Assembly::Jp2Create') + end + end +end diff --git a/spec/services/workflow_creator_spec.rb b/spec/services/workflow_creator_spec.rb index 2e3edf87..5fa0ce7a 100644 --- a/spec/services/workflow_creator_spec.rb +++ b/spec/services/workflow_creator_spec.rb @@ -23,6 +23,10 @@ ) end + before do + allow(QueueService).to receive(:enqueue) + end + describe '#create_workflow_steps' do subject(:create_workflow_steps) { wf_creator.create_workflow_steps } @@ -32,6 +36,7 @@ end.to change(WorkflowStep, :count).by(13) expect(WorkflowStep.last.druid).to eq druid expect(WorkflowStep.last.repository).to eq repository + expect(QueueService).to have_received(:enqueue).with(WorkflowStep.find_by(druid: druid, process: 'descriptive-metadata')) end context 'when workflow steps already exists' do @@ -43,6 +48,7 @@ expect do create_workflow_steps end.not_to change(WorkflowStep, :count) + expect(QueueService).to have_received(:enqueue).with(WorkflowStep.find_by(druid: druid, process: 'descriptive-metadata')) end end end diff --git a/spec/views/workflows/index.xml.builder_spec.rb b/spec/views/workflows/index.xml.builder_spec.rb index 49b2300c..a5b347ef 100644 --- a/spec/views/workflows/index.xml.builder_spec.rb +++ b/spec/views/workflows/index.xml.builder_spec.rb @@ -47,7 +47,7 @@ ['elapsed', ''], ['attempts', /0/], ['datetime', //], - ['status', ''], + %w[status waiting], %w[name start-accession] ) end diff --git a/spec/views/workflows/show.xml.builder_spec.rb b/spec/views/workflows/show.xml.builder_spec.rb index f0b67fb9..c570c0ba 100644 --- a/spec/views/workflows/show.xml.builder_spec.rb +++ b/spec/views/workflows/show.xml.builder_spec.rb @@ -37,7 +37,7 @@ ['elapsed', ''], ['attempts', /0/], ['datetime', //], - ['status', ''], + %w[status waiting], %w[name start-accession] ) end