Skip to content

Commit

Permalink
Convert "large" CV jobs to async basis
Browse files Browse the repository at this point in the history
We dispense with the old ordering here in order to use the conventional
`.find_each`.  Note: order should not be a part of a scope in any case.

Async CV performance should be good enough and flexible enough that we
are less concerned about which row gets processed "first", since all of
them will be processed "soon enough".  Alternatively, we can circle back
with a multi-query approach that first enumerates the distinct values of
`DATE_TRUNC('day', last_checksum_validation) AS last_cv ORDER BY last_cv`

for expired checksums and then retrieves a batch for each day. But that
can wait.
  • Loading branch information
atz authored and SaravShah committed Jun 28, 2018
1 parent ced00f5 commit 72cd2b1
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 118 deletions.
18 changes: 5 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,27 +151,19 @@ this will generate a log at, for example, `log/profile_C2M_check_version_all_dir
- Parse all manifestInventory.xml and most recent signatureCatalog.xml for stored checksums and verify against computed checksums.
- To run rake tasks below, give the name of the endpoint (e.g. from settings/development.yml)

Note: CV jobs that are asynchronous execute in other processes. Therefore they have no `profile` option,
because the computational work that would be valuable to profile happens elsewhere.

### Single Root
- Without profiling
This queues objects for asynchronous CV:
```sh
RAILS_ENV=production bundle exec rake cv:one_root[fixture_sr3]
```
- With profiling
```sh
RAILS_ENV=production bundle exec rake cv:one_root[fixture_sr3,profile]
```
this will generate a log at, for example, `log/profile_cv_validate_disk2018-01-01T14:25:31-flat.txt`

### All Roots
- Without profiling:
This is also asynchronous:
```sh
RAILS_ENV=production bundle exec rake cv:all_roots
```
- With profiling:
```sh
RAILS_ENV=production bundle exec rake cv:all_roots[profile]
```
this will generate a log at, for example, `log/profile_cv_validate_disk_all_endpoints2018-01-01T14:25:31-flat.txt`

### Single Druid
- Without profiling:
Expand Down
25 changes: 6 additions & 19 deletions app/lib/audit/checksum.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,24 @@ class << self
delegate :logger, to: ::PreservationCatalog::Application
end

# Queues asynchronous CV
def self.validate_disk(endpoint_name, limit=Settings.c2m_sql_limit)
logger.info "#{Time.now.utc.iso8601} CV validate_disk starting for #{endpoint_name}"
# pres_copies is an AR Relation; it could return a lot of results, so we want to process it in
# batches. we can't use ActiveRecord's .find_each, because that'll disregard the order .fixity_check_expired
# specified. so we use our own batch processing method, which does respect Relation order.
pres_copies = PreservedCopy.by_endpoint_name(endpoint_name).for_online_endpoints.fixity_check_expired
logger.info "Number of Preserved Copies to be checksum validated: #{pres_copies.count}"
ActiveRecordUtils.process_in_batches(pres_copies, limit) do |pc|
ChecksumValidator.new(pc).validate_checksums
end
logger.info "Number of Preserved Copies to be enqueued for CV: #{pres_copies.count}"
pres_copies.find_each(&:validate_checksums!)
ensure
logger.info "#{Time.now.utc.iso8601} CV validate_disk ended for #{endpoint_name}"
end

def self.validate_disk_profiled(endpoint_name)
Profiler.print_profile('cv_validate_disk') { validate_disk(endpoint_name) }
logger.info "#{Time.now.utc.iso8601} CV validate_disk for #{endpoint_name}"
end

# Asynchronous
def self.validate_disk_all_endpoints
logger.info "#{Time.now.utc.iso8601} CV validate_disk_all_endpoints starting"
HostSettings.storage_roots.to_h.each_key do |strg_root_name|
validate_disk(strg_root_name)
end
HostSettings.storage_roots.to_h.each_key { |key| validate_disk(key) }
ensure
logger.info "#{Time.now.utc.iso8601} CV validate_disk_all_endpoints ended"
end

def self.validate_disk_all_endpoints_profiled
Profiler.print_profile('cv_validate_disk_all_endpoints') { validate_disk_all_endpoints }
end

def self.validate_druid(druid)
logger.info "#{Time.now.utc.iso8601} CV validate_druid starting for #{druid}"
pres_copies = PreservedCopy.by_druid(druid).for_online_endpoints
Expand Down
27 changes: 6 additions & 21 deletions lib/tasks/cv_tasks.rake
Original file line number Diff line number Diff line change
@@ -1,33 +1,18 @@
namespace :cv do

desc "Run CV (checksum validation) on a single storage root"
task :one_root, [:storage_root, :profile] => [:environment] do |_t, args|
unless args[:profile] == 'profile' || args[:profile].nil?
p "usage: rake cv:one_root[storage_root] || rake cv:one_root[storage_root,profile]"
task :one_root, [:storage_root] => [:environment] do |_t, args|
if args[:storage_root].nil?
p "usage: rake cv:one_root[storage_root]"
exit 1
end
storage_root = args[:storage_root].to_sym
if args[:profile] == 'profile'
puts "When done, check log/profile_cv_validate_disk[TIMESTAMP] for profiling details"
Audit::Checksum.validate_disk_profiled(storage_root)
elsif args[:profile].nil?
Audit::Checksum.validate_disk(storage_root)
end
Audit::Checksum.validate_disk(args[:storage_root].to_sym)
puts "#{Time.now.utc.iso8601} Checksum Validation on #{storage_root} is done."
end

desc "Run CV (checksum validation) on all storage roots"
task :all_roots, [:profile] => [:environment] do |_t, args|
unless args[:profile] == 'profile' || args[:profile].nil?
p "usage: rake cv:all_roots || rake cv:all_roots[profile]"
exit 1
end
if args[:profile] == 'profile'
puts "When done, check log/profile_cv_validate_disk_all_endpoints[TIMESTAMP].txt for profiling details"
Audit::Checksum.validate_disk_all_endpoints_profiled
elsif args[:profile].nil?
Audit::Checksum.validate_disk_all_endpoints
end
task all_roots: [:environment] do
Audit::Checksum.validate_disk_all_endpoints
puts "#{Time.now.utc.iso8601} Checksum Validation on all storage roots is done."
end

Expand Down
73 changes: 9 additions & 64 deletions spec/lib/audit/checksum_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,87 +13,32 @@

describe '.validate_disk' do
include_context 'fixture moabs in db'
let(:subject) { described_class.validate_disk(endpoint_name, limit) }

context 'when there are PreservedCopies to check' do
let(:cv_mock) { instance_double(ChecksumValidator) }

it 'creates an instance and calls #validate_checksums for every result when results are in a single batch' do
allow(ChecksumValidator).to receive(:new).and_return(cv_mock)
expect(cv_mock).to receive(:validate_checksums).exactly(3).times
described_class.validate_disk(endpoint_name, limit)
end

it 'creates an instance and calls #validate_checksums on everything in batches' do
pcs_from_scope = PreservedCopy.by_endpoint_name(endpoint_name).fixity_check_expired
cv_list = pcs_from_scope.map { |pc| ChecksumValidator.new(pc) }
expect(cv_list.size).to eq 3
cv_list.each do |cv|
allow(ChecksumValidator).to receive(:new).with(cv.preserved_copy).and_return(cv)
expect(cv).to receive(:validate_checksums).exactly(1).times.and_call_original
end
described_class.validate_disk(endpoint_name, 2)
end
it 'enqueues matching PCs for CV check' do
expect(ChecksumValidationJob).to receive(:perform_later).with(PreservedCopy).exactly(3).times
described_class.validate_disk(endpoint_name, limit)
end

context 'when there are no PreservedCopies to check' do
it 'will not create an instance of ChecksumValidator' do
allow(ChecksumValidator).to receive(:new)
it 'will not enqueue PCs' do
expect(ChecksumValidationJob).not_to receive(:perform_later)
PreservedCopy.all.update(last_checksum_validation: (Time.now.utc + 2.days))
expect(ChecksumValidator).not_to receive(:new)
subject
described_class.validate_disk(endpoint_name, limit)
end
end
end

describe ".validate_disk_profiled" do
let(:subject) { described_class.validate_disk_profiled('fixture_sr3') }

it "spins up a profiler, calling profiling and printing methods on it" do
mock_profiler = instance_double(Profiler)
expect(Profiler).to receive(:new).and_return(mock_profiler)
expect(mock_profiler).to receive(:prof)
expect(mock_profiler).to receive(:print_results_flat).with('cv_validate_disk')
subject
end

it "calls .validate_disk" do
expect(described_class).to receive(:validate_disk)
subject
end
end

describe ".validate_disk_all_endpoints" do
let(:subject) { described_class.validate_disk_all_endpoints }

it 'calls validate_disk once per storage root' do
expect(described_class).to receive(:validate_disk).exactly(HostSettings.storage_roots.entries.count).times
subject
described_class.validate_disk_all_endpoints
end

it 'calls validate_disk with the right arguments' do
HostSettings.storage_roots.to_h.each_key do |storage_name|
expect(described_class).to receive(:validate_disk).with(
storage_name
)
expect(described_class).to receive(:validate_disk).with(storage_name)
end
subject
end
end

describe ".validate_disk_all_endpoints_profiled" do
let(:subject) { described_class.validate_disk_all_endpoints_profiled }

it "spins up a profiler, calling profiling and printing methods on it" do
mock_profiler = instance_double(Profiler)
expect(Profiler).to receive(:new).and_return(mock_profiler)
expect(mock_profiler).to receive(:prof)
expect(mock_profiler).to receive(:print_results_flat).with('cv_validate_disk_all_endpoints')
subject
end
it "calls .validate_disk_all_endpoints" do
expect(described_class).to receive(:validate_disk_all_endpoints)
subject
described_class.validate_disk_all_endpoints
end
end

Expand Down
2 changes: 1 addition & 1 deletion spec/models/preserved_copy_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@

describe '#validate_checksums!' do
it 'passes self to ChecksumValidationJob' do
expect(ChecksumValidationJob).to receive(:perform_later).with(preserved_copy)
expect(ChecksumValidationJob).to receive(:perform_later).with(described_class)
preserved_copy.validate_checksums!
end
end
Expand Down

0 comments on commit 72cd2b1

Please sign in to comment.