Skip to content

Commit

Permalink
Rename things in related classes and specs
Browse files Browse the repository at this point in the history
  • Loading branch information
tallenaz committed Jul 14, 2018
1 parent 878162a commit 0d113f7
Show file tree
Hide file tree
Showing 13 changed files with 80 additions and 80 deletions.
18 changes: 9 additions & 9 deletions app/jobs/plexer_job.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Preconditions:
# All needed PreservedCopy and ArchivePreservedCopy rows are already made.
# All needed PreservedCopy and ZippedMoabVersion rows are already made.
# Possible TODO: replace precondition w/ invocation of PO2PC/PC2APC method.
#
# Responsibilities:
Expand Down Expand Up @@ -29,21 +29,21 @@ class PlexerJob < ZipPartJobBase
# @option metadata [String] :zip_cmd
# @option metadata [String] :zip_version
def perform(druid, version, part_s3_key, metadata)
apcs.each do |apc|
find_or_create_unreplicated_part(apc, part_s3_key, metadata)
zmvs.each do |zmv|
find_or_create_unreplicated_part(zmv, part_s3_key, metadata)
end
deliverers.each { |worker| worker.perform_later(druid, version, part_s3_key, metadata) }
end

private

# @return [ActiveRecord::Relation] effectively an Array of ArchivePreservedCopy objects
def apcs
@apcs ||= ArchivePreservedCopy.by_druid(zip.druid.id).where(version: zip.version)
# @return [ActiveRecord::Relation] effectively an Array of ZippedMoabVersion objects
def zmvs
@zmvs ||= ZippedMoabVersion.by_druid(zip.druid.id).where(version: zip.version)
end

def find_or_create_unreplicated_part(apc, part_s3_key, metadata)
apc.zip_parts.find_or_create_by(
def find_or_create_unreplicated_part(zmv, part_s3_key, metadata)
zmv.zip_parts.find_or_create_by(
create_info: metadata.slice(:zip_cmd, :zip_version).to_s,
md5: metadata[:checksum_md5],
parts_count: metadata[:parts_count],
Expand All @@ -54,6 +54,6 @@ def find_or_create_unreplicated_part(apc, part_s3_key, metadata)

# @return [Array<Class>] target delivery worker classes
def deliverers
apcs.map { |apc| apc.archive_endpoint.delivery_class }.uniq
zmvs.map { |zmv| zmv.archive_endpoint.delivery_class }.uniq
end
end
20 changes: 10 additions & 10 deletions app/jobs/results_recorder_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,34 @@
# If YES, send a message to a non-job pub/sub queue.
class ResultsRecorderJob < ApplicationJob
queue_as :endpoint_events
attr_accessor :apc, :apcs
attr_accessor :zmv, :zmvs

before_perform do |job|
job.apcs ||= ArchivePreservedCopy
job.zmvs ||= ZippedMoabVersion
.by_druid(job.arguments.first)
.joins(:archive_endpoint)
.where(version: job.arguments.second)
job.apc ||= apcs.find_by!(archive_endpoints: { delivery_class: Object.const_get(job.arguments.fourth) })
job.zmv ||= zmvs.find_by!(archive_endpoints: { delivery_class: Object.const_get(job.arguments.fourth) })
end

# @param [String] druid
# @param [Integer] version
# @param [String] s3_part_key
# @param [String] delivery_class Name of the worker class that performed delivery
def perform(druid, version, s3_part_key, _delivery_class)
part = apc_part!(s3_part_key)
part = zmv_part!(s3_part_key)
part.ok!
apc.ok! if part.all_parts_replicated? # are all of the parts replicated for this endpoint?
zmv.ok! if part.all_parts_replicated? # are all of the parts replicated for this endpoint?
# only publish result if all of the parts replicated for all endpoints
return unless apcs.reload.all?(&:ok?)
return unless zmvs.reload.all?(&:ok?)
publish_result(message(druid, version).to_json)
end

private

def apc_part!(s3_part_key)
raise "Status shifted underneath replication: #{apc.inspect}" unless apc.unreplicated?
apc.zip_parts.find_by!(
def zmv_part!(s3_part_key)
raise "Status shifted underneath replication: #{zmv.inspect}" unless zmv.unreplicated?
zmv.zip_parts.find_by!(
suffix: File.extname(s3_part_key),
status: 'unreplicated'
)
Expand All @@ -46,7 +46,7 @@ def message(druid, version)
{
druid: druid,
version: version,
endpoints: apcs.pluck(:endpoint_name)
endpoints: zmvs.pluck(:endpoint_name)
}
end

Expand Down
6 changes: 3 additions & 3 deletions app/models/archive_endpoint.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Metadata about an endpoint which stores zipped archives of version directories from Moab
# objects.
class ArchiveEndpoint < ApplicationRecord
has_many :archive_preserved_copies, dependent: :restrict_with_exception
has_many :zipped_moab_versions, dependent: :restrict_with_exception
has_and_belongs_to_many :preservation_policies

# @note Hash values cannot be modified without migrating any associated persisted data.
Expand All @@ -23,10 +23,10 @@ class ArchiveEndpoint < ApplicationRecord

# for a given druid, which archive endpoints have an archive copy of the given version?
scope :which_have_archive_copy, lambda { |druid, version|
joins(archive_preserved_copies: [:preserved_object])
joins(zipped_moab_versions: [:preserved_object])
.where(
preserved_objects: { druid: druid },
archive_preserved_copies: { version: version }
zipped_moab_versions: { version: version }
)
}

Expand Down
12 changes: 6 additions & 6 deletions app/models/preserved_copy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class PreservedCopy < ApplicationRecord
belongs_to :preserved_object, inverse_of: :preserved_copies
belongs_to :endpoint, inverse_of: :preserved_copies
has_many :zip_checksums, dependent: :restrict_with_exception
has_many :archive_preserved_copies, dependent: :restrict_with_exception, inverse_of: :preserved_copy
has_many :zipped_moab_versions, dependent: :restrict_with_exception, inverse_of: :preserved_copy

delegate :s3_key, to: :druid_version_zip

Expand Down Expand Up @@ -70,21 +70,21 @@ class PreservedCopy < ApplicationRecord
# to 0 for nulls, which sorts before 1 for non-nulls, which are then sorted by last_checksum_validation)
}

# given a version, create any ArchivePreservedCopy records for that version which don't yet exist for archive
# given a version, create any ZippedMoabVersion records for that version which don't yet exist for archive
# endpoints which implement the parent PreservedObject's PreservationPolicy.
# @param archive_vers [Integer] the version for which archive preserved copies should be created. must be between
# 1 and this PreservedCopy's version (inclusive). Because there's an ArchivePreservedCopy for
# 1 and this PreservedCopy's version (inclusive). Because there's an ZippedMoabVersion for
# each version for each endpoint (whereas there is one PreservedCopy for an entire online Moab).
# @return [Array<ArchivePreservedCopy>] the ArchivePreservedCopy records that were created
def create_archive_preserved_copies!(archive_vers)
# @return [Array<ZippedMoabVersion>] the ZippedMoabVersion records that were created
def create_zipped_moab_versions!(archive_vers)
unless archive_vers > 0 && archive_vers <= version
raise ArgumentError, "archive_vers (#{archive_vers}) must be between 0 and version (#{version})"
end

params = ArchiveEndpoint.which_need_archive_copy(preserved_object.druid, archive_vers).map do |aep|
{ version: archive_vers, archive_endpoint: aep, status: 'unreplicated' }
end
archive_preserved_copies.create!(params)
zipped_moab_versions.create!(params)
end

# Send to asynchronous replication pipeline
Expand Down
8 changes: 4 additions & 4 deletions app/models/zip_part.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@
# completely unwieldy file sizes. This represents metadata for one such part.
# This model's data is populated by PlexerJob.
class ZipPart < ApplicationRecord
belongs_to :archive_preserved_copy, inverse_of: :zip_parts
delegate :archive_endpoint, :preserved_copy, to: :archive_preserved_copy
belongs_to :zipped_moab_version, inverse_of: :zip_parts
delegate :archive_endpoint, :preserved_copy, to: :zipped_moab_version
delegate :preserved_object, to: :preserved_copy

enum status: {
'ok' => 0,
'unreplicated' => 1 # DB-level default
}

validates :archive_preserved_copy, :create_info, presence: true
validates :zipped_moab_version, :create_info, presence: true
validates :md5, presence: true, format: { with: /\A[0-9a-f]{32}\z/ }
validates :size, presence: true, numericality: { only_integer: true, greater_than: 0 }
validates :suffix, presence: true, format: { with: /\A\.z(ip|[0-9]+)\z/ }
Expand All @@ -21,7 +21,7 @@ class ZipPart < ApplicationRecord
# @return [Boolean] true if all expected parts are now replicated
def all_parts_replicated?
return false unless persisted? && ok?
parts = archive_preserved_copy.zip_parts.where(suffix: suffixes_in_set)
parts = zipped_moab_version.zip_parts.where(suffix: suffixes_in_set)
parts.count == parts_count && parts.all?(&:ok?)
end

Expand Down
6 changes: 3 additions & 3 deletions db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,15 @@

create_table "zip_parts", force: :cascade do |t|
t.bigint "size"
t.bigint "archive_preserved_copy_id", null: false
t.bigint "zipped_moab_version_id", null: false
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.string "md5", null: false
t.string "create_info", null: false
t.integer "parts_count", null: false
t.string "suffix", null: false
t.integer "status", default: 1, null: false
t.index ["archive_preserved_copy_id"], name: "index_zip_parts_on_archive_preserved_copy_id"
t.index ["zipped_moab_version_id"], name: "index_zip_parts_on_zipped_moab_version_id"
end

create_table "zipped_moab_versions", force: :cascade do |t|
Expand All @@ -136,7 +136,7 @@
add_foreign_key "preserved_copies", "preserved_objects"
add_foreign_key "preserved_objects", "preservation_policies"
add_foreign_key "zip_checksums", "preserved_copies"
add_foreign_key "zip_parts", "zipped_moab_versions", column: "archive_preserved_copy_id"
add_foreign_key "zip_parts", "zipped_moab_versions"
add_foreign_key "zipped_moab_versions", "archive_endpoints"
add_foreign_key "zipped_moab_versions", "preserved_copies"
end
2 changes: 1 addition & 1 deletion spec/factories/zip_parts.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@
size 1234
status 'unreplicated'
suffix { parts_count == 1 ? '.zip' : format('.z%02d', parts_count) }
archive_preserved_copy
zipped_moab_version
end
end
10 changes: 5 additions & 5 deletions spec/jobs/integration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
describe 'the whole replication pipeline', type: :job do # rubocop:disable RSpec/DescribeClass
let(:s3_object) { instance_double(Aws::S3::Object, exists?: false, put: true) }
let(:bucket) { instance_double(Aws::S3::Bucket, object: s3_object) }
let(:apc) { create(:archive_preserved_copy) }
let(:druid) { apc.preserved_object.druid }
let(:version) { apc.version }
let(:deliverer) { apc.archive_endpoint.delivery_class.to_s }
let(:zmv) { create(:zipped_moab_version) }
let(:druid) { zmv.preserved_object.druid }
let(:version) { zmv.version }
let(:deliverer) { zmv.archive_endpoint.delivery_class.to_s }
let(:hash) do
{ druid: druid, version: version, endpoints: [apc.archive_endpoint.endpoint_name] }
{ druid: druid, version: version, endpoints: [zmv.archive_endpoint.endpoint_name] }
end
let(:s3_key) { 'bj/102/hs/9687/bj102hs9687.v0001.zip' }

Expand Down
4 changes: 2 additions & 2 deletions spec/jobs/plexer_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
describe '#perform' do
let(:east_ep) { create(:archive_endpoint, delivery_class: 2) }
let(:pc) { create(:preserved_copy, preserved_object: po) }
let!(:apc1) { create(:archive_preserved_copy, preserved_copy: pc, version: version) }
let!(:apc2) { create(:archive_preserved_copy, preserved_copy: pc, version: version, archive_endpoint: east_ep) }
let!(:apc1) { create(:zipped_moab_version, preserved_copy: pc, version: version) }
let!(:apc2) { create(:zipped_moab_version, preserved_copy: pc, version: version, archive_endpoint: east_ep) }
let(:s3_key) { job.zip.s3_key(metadata[:suffix]) }

it 'splits the message out to endpoints' do
Expand Down
26 changes: 13 additions & 13 deletions spec/jobs/results_recorder_job_spec.rb
Original file line number Diff line number Diff line change
@@ -1,50 +1,50 @@
require 'rails_helper'

describe ResultsRecorderJob, type: :job do
let(:apc) { create(:archive_preserved_copy) }
let(:druid) { apc.preserved_object.druid }
let(:endpoint) { apc.archive_endpoint }
let(:zmv) { create(:zipped_moab_version) }
let(:druid) { zmv.preserved_object.druid }
let(:endpoint) { zmv.archive_endpoint }

before { apc.zip_parts.create(attributes_for(:zip_part)) }
before { zmv.zip_parts.create(attributes_for(:zip_part)) }

it 'descends from ApplicationJob' do
expect(described_class.new).to be_an(ApplicationJob)
end

context 'when all parts for endpoint are replicated' do
it 'sets the ArchivePreservedCopy status to ok' do
described_class.perform_now(druid, apc.version, 'fake.zip', endpoint.delivery_class.to_s)
expect(apc.reload).to be_ok
it 'sets the ZippedMoabVersion status to ok' do
described_class.perform_now(druid, zmv.version, 'fake.zip', endpoint.delivery_class.to_s)
expect(zmv.reload).to be_ok
end
it 'sets part status to ok' do
skip 'write test for individual part status'
end
end

context 'when some parts for endpoint are replicated' do
it 'does not set parent archive_preserved_copy status to ok' do
skip 'write test for parent apc status'
it 'does not set parent zipped_moab_version status to ok' do
skip 'write test for parent zmv status'
end
end

context 'when all endpoints are fulfilled' do
it 'posts a message to replication.results queue' do
hash = { druid: druid, version: apc.version, endpoints: [endpoint.endpoint_name] }
hash = { druid: druid, version: zmv.version, endpoints: [endpoint.endpoint_name] }
expect(Resque.redis.redis).to receive(:lpush).with('replication.results', hash.to_json)
described_class.perform_now(druid, apc.version, 'fake.zip', endpoint.delivery_class.to_s)
described_class.perform_now(druid, zmv.version, 'fake.zip', endpoint.delivery_class.to_s)
end
end

context 'when other endpoints remain unreplicated' do
let(:other_ep) { create(:archive_endpoint, delivery_class: 2) }

before do
create(:archive_preserved_copy, preserved_copy: apc.preserved_copy, archive_endpoint: other_ep)
create(:zipped_moab_version, preserved_copy: zmv.preserved_copy, archive_endpoint: other_ep)
end

it 'does not send to replication.results queue' do
expect(Resque.redis.redis).not_to receive(:lpush)
described_class.perform_now(druid, apc.version, 'fake.zip', endpoint.delivery_class.to_s)
described_class.perform_now(druid, zmv.version, 'fake.zip', endpoint.delivery_class.to_s)
end
end
end
14 changes: 7 additions & 7 deletions spec/models/archive_endpoint_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
expect(described_class.delivery_classes).to include(S3WestDeliveryJob, S3EastDeliveryJob)
end

it { is_expected.to have_many(:archive_preserved_copies) }
it { is_expected.to have_many(:zipped_moab_versions) }
it { is_expected.to have_db_index(:endpoint_name) }
# TODO: add indexes
# it { is_expected.to have_db_index(:endpoint_node) }
Expand Down Expand Up @@ -91,7 +91,7 @@
end
end

context 'ArchivePreservedCopy presence on ArchiveEndpoint' do
context 'ZippedMoabVersion presence on ArchiveEndpoint' do
let(:version) { 3 }
let!(:po) { create(:preserved_object, current_version: version, druid: druid) }
let(:pc) { create(:preserved_copy, preserved_object: po) }
Expand All @@ -109,19 +109,19 @@
expect(ArchiveEndpoint.which_have_archive_copy(other_druid, version).pluck(:endpoint_name)).to eq []
expect(ArchiveEndpoint.which_have_archive_copy(other_druid, version - 1).pluck(:endpoint_name)).to eq []

create(:archive_preserved_copy, preserved_copy: pc, version: version, archive_endpoint: ma1_ep)
create(:zipped_moab_version, preserved_copy: pc, version: version, archive_endpoint: ma1_ep)
expect(ArchiveEndpoint.which_have_archive_copy(druid, version).pluck(:endpoint_name)).to eq %w[mock_archive1]
expect(ArchiveEndpoint.which_have_archive_copy(druid, version - 1).pluck(:endpoint_name)).to eq []
expect(ArchiveEndpoint.which_have_archive_copy(other_druid, version).pluck(:endpoint_name)).to eq []
expect(ArchiveEndpoint.which_have_archive_copy(other_druid, version - 1).pluck(:endpoint_name)).to eq []

create(:archive_preserved_copy, preserved_copy: pc_other_druid, version: version - 1, archive_endpoint: ma1_ep)
create(:zipped_moab_version, preserved_copy: pc_other_druid, version: version - 1, archive_endpoint: ma1_ep)
expect(ArchiveEndpoint.which_have_archive_copy(druid, version).pluck(:endpoint_name)).to eq %w[mock_archive1]
expect(ArchiveEndpoint.which_have_archive_copy(druid, version - 1).pluck(:endpoint_name)).to eq []
expect(ArchiveEndpoint.which_have_archive_copy(other_druid, version).pluck(:endpoint_name)).to eq []
expect(ArchiveEndpoint.which_have_archive_copy(other_druid, version - 1).pluck(:endpoint_name)).to eq %w[mock_archive1]

create(:archive_preserved_copy, preserved_copy: pc_other_druid, version: version - 1, archive_endpoint: archive_endpoint)
create(:zipped_moab_version, preserved_copy: pc_other_druid, version: version - 1, archive_endpoint: archive_endpoint)
expect(ArchiveEndpoint.which_have_archive_copy(druid, version).pluck(:endpoint_name)).to eq %w[mock_archive1]
expect(ArchiveEndpoint.which_have_archive_copy(druid, version - 1).pluck(:endpoint_name)).to eq []
expect(ArchiveEndpoint.which_have_archive_copy(other_druid, version).pluck(:endpoint_name)).to eq []
Expand All @@ -136,13 +136,13 @@
expect(ArchiveEndpoint.which_need_archive_copy(other_druid, version).pluck(:endpoint_name).sort).to eq %w[archive-endpoint mock_archive1]
expect(ArchiveEndpoint.which_need_archive_copy(other_druid, version - 1).pluck(:endpoint_name).sort).to eq %w[archive-endpoint mock_archive1]

create(:archive_preserved_copy, preserved_copy: pc, version: version, archive_endpoint: ma1_ep)
create(:zipped_moab_version, preserved_copy: pc, version: version, archive_endpoint: ma1_ep)
expect(ArchiveEndpoint.which_need_archive_copy(druid, version).pluck(:endpoint_name)).to eq %w[archive-endpoint]
expect(ArchiveEndpoint.which_need_archive_copy(druid, version - 1).pluck(:endpoint_name).sort).to eq %w[archive-endpoint mock_archive1]
expect(ArchiveEndpoint.which_need_archive_copy(other_druid, version).pluck(:endpoint_name).sort).to eq %w[archive-endpoint mock_archive1]
expect(ArchiveEndpoint.which_need_archive_copy(other_druid, version - 1).pluck(:endpoint_name).sort).to eq %w[archive-endpoint mock_archive1]

create(:archive_preserved_copy, preserved_copy: pc_other_druid, version: version - 1, archive_endpoint: ma1_ep)
create(:zipped_moab_version, preserved_copy: pc_other_druid, version: version - 1, archive_endpoint: ma1_ep)
expect(ArchiveEndpoint.which_need_archive_copy(druid, version).pluck(:endpoint_name)).to eq %w[archive-endpoint]
expect(ArchiveEndpoint.which_need_archive_copy(druid, version - 1).pluck(:endpoint_name).sort).to eq %w[archive-endpoint mock_archive1]
expect(ArchiveEndpoint.which_need_archive_copy(other_druid, version).pluck(:endpoint_name).sort).to eq %w[archive-endpoint mock_archive1]
Expand Down

0 comments on commit 0d113f7

Please sign in to comment.