Skip to content

Commit

Permalink
Merge 2689fd1 into e67c120
Browse files Browse the repository at this point in the history
  • Loading branch information
SaravShah committed Jul 17, 2018
2 parents e67c120 + 2689fd1 commit 7594d0d
Show file tree
Hide file tree
Showing 27 changed files with 261 additions and 254 deletions.
4 changes: 2 additions & 2 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ Metrics/LineLength:
- 'spec/lib/audit/moab_to_catalog_spec.rb' # 1 line 126
- 'spec/lib/audit/checksum_spec.rb'
- 'spec/models/endpoint_spec.rb' # line 83 is 123
- 'spec/models/archive_endpoint_spec.rb' # tests about have/need scopes are easier to skim with all expectations being one line
- 'spec/models/zip_endpoint_spec.rb' # tests about have/need scopes are easier to skim with all expectations being one line
- 'spec/services/audit_results_spec.rb'
- 'spec/services/checksum_validator_spec.rb'
- 'spec/services/preserved_object_handler_check_exist_spec.rb' # 17 lines, but who's counting, officer?
Expand Down Expand Up @@ -175,7 +175,7 @@ RSpec/MessageSpies:
RSpec/MultipleExpectations:
Max: 5
Exclude:
- 'spec/models/archive_endpoint_spec.rb' # testing queries by adding records and scope results based on different input combos
- 'spec/models/zip_endpoint_spec.rb' # testing queries by adding records and scope results based on different input combos

RSpec/NamedSubject:
Enabled: false
Expand Down
4 changes: 2 additions & 2 deletions app/jobs/plexer_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#
# Responsibilities:
# Record zip part metadata info in DB.
# Split message out to all necessary endpoints.
# Split message out to all necessary zip endpoints.
# For example:
# Endpoint1Delivery.perform_later(druid, version, part_s3_key)
# Endpoint2Delivery.perform_later(druid, version, part_s3_key)
Expand Down Expand Up @@ -54,6 +54,6 @@ def find_or_create_unreplicated_part(apc, part_s3_key, metadata)

# @return [Array<Class>] target delivery worker classes
def deliverers
apcs.map { |apc| apc.archive_endpoint.delivery_class }.uniq
apcs.map { |apc| apc.zip_endpoint.delivery_class }.uniq
end
end
4 changes: 2 additions & 2 deletions app/jobs/replicated_file_check_job.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Confirms existence of PreservedCopy on an endpoint.
# Confirms existence of PreservedCopy on a zip endpoint.
# Confirms the MD5 checksum matches in database and s3.
# Usage info:
# ReplicatedFileCheckJob.set(queue: :endpoint_check_us_west_2).perform_later(pc)
Expand All @@ -8,7 +8,7 @@ class ReplicatedFileCheckJob < ApplicationJob
queue_as :override_this_queue
delegate :bucket, :bucket_name, to: PreservationCatalog::S3

# @param [PreservedCopy] verify that the archived preserved_copy exists on an endpoint
# @param [PreservedCopy] preserved_copy the archived preserved_copy to verify exists on a zip endpoint
def perform(preserved_copy)
if preserved_copy.unreplicated?
Rails.logger.error("#{preserved_copy} should be replicated, but has a status of #{preserved_copy.status}.")
Expand Down
12 changes: 6 additions & 6 deletions app/jobs/results_recorder_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
# If NO, do nothing further.
# If YES, send a message to a non-job pub/sub queue.
class ResultsRecorderJob < ApplicationJob
queue_as :endpoint_events
queue_as :zip_endpoint_events
attr_accessor :apc, :apcs

before_perform do |job|
job.apcs ||= ArchivePreservedCopy
.by_druid(job.arguments.first)
.joins(:archive_endpoint)
.joins(:zip_endpoint)
.where(version: job.arguments.second)
job.apc ||= apcs.find_by!(archive_endpoints: { delivery_class: Object.const_get(job.arguments.fourth) })
job.apc ||= apcs.find_by!(zip_endpoints: { delivery_class: Object.const_get(job.arguments.fourth) })
end

# @param [String] druid
Expand All @@ -25,8 +25,8 @@ class ResultsRecorderJob < ApplicationJob
def perform(druid, version, s3_part_key, _delivery_class)
part = apc_part!(s3_part_key)
part.ok!
apc.ok! if part.all_parts_replicated? # are all of the parts replicated for this endpoint?
# only publish result if all of the parts replicated for all endpoints
apc.ok! if part.all_parts_replicated? # are all of the parts replicated for this zip_endpoint?
# only publish result if all of the parts replicated for all zip_endpoints
return unless apcs.reload.all?(&:ok?)
publish_result(message(druid, version).to_json)
end
Expand All @@ -46,7 +46,7 @@ def message(druid, version)
{
druid: druid,
version: version,
endpoints: apcs.pluck(:endpoint_name)
zip_endpoints: apcs.pluck(:endpoint_name)
}
end

Expand Down
8 changes: 4 additions & 4 deletions app/models/archive_preserved_copy.rb
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
# Corresponds to a Moab-Version on an ArchiveEndpoint.
# Corresponds to a Moab-Version on a ZipEndpoint.
# There will be individual parts (at least one) - see ArchivePreservedCopyPart.
# For a fully consistent system, given an (Online) PreservedCopy, the number of associated
# ArchivePreservedCopy objects should be:
# pc.preserved_object.current_version * number_of_archive_endpoints
# pc.preserved_object.current_version * number_of_zip_endpoints
#
# @note Does not have size independent of part(s)
class ArchivePreservedCopy < ApplicationRecord
belongs_to :preserved_copy
belongs_to :archive_endpoint
belongs_to :zip_endpoint
has_many :archive_preserved_copy_parts, dependent: :destroy, inverse_of: :archive_preserved_copy
has_one :preserved_object, through: :preserved_copy, dependent: :restrict_with_exception
delegate :preserved_object, to: :preserved_copy
Expand All @@ -21,7 +21,7 @@ class ArchivePreservedCopy < ApplicationRecord
'invalid_checksum' => 3
}

validates :archive_endpoint, presence: true
validates :zip_endpoint, presence: true
validates :preserved_copy, presence: true
validates :status, inclusion: { in: statuses.keys }
validates :version, presence: true
Expand Down
4 changes: 2 additions & 2 deletions app/models/archive_preserved_copy_part.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# This model's data is populated by PlexerJob.
class ArchivePreservedCopyPart < ApplicationRecord
belongs_to :archive_preserved_copy, inverse_of: :archive_preserved_copy_parts
delegate :archive_endpoint, :preserved_copy, to: :archive_preserved_copy
delegate :zip_endpoint, :preserved_copy, to: :archive_preserved_copy
delegate :preserved_object, to: :preserved_copy

enum status: {
Expand All @@ -17,7 +17,7 @@ class ArchivePreservedCopyPart < ApplicationRecord
validates :suffix, presence: true, format: { with: /\A\.z(ip|[0-9]+)\z/ }
validates :parts_count, presence: true, numericality: { only_integer: true, greater_than: 0 }

# For this persisted part, are it and all its cohort now replicated (to one endpoint)?
# For this persisted part, are it and all its cohort now replicated (to one zip_endpoint)?
# @return [Boolean] true if all expected parts are now replicated
def all_parts_replicated?
return false unless persisted? && ok?
Expand Down
2 changes: 1 addition & 1 deletion app/models/preservation_policy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ class PreservationPolicy < ApplicationRecord
# NOTE: The time to live (ttl) fields stored in PreservationPolicy are Integer measurements in seconds
has_many :preserved_objects, dependent: :restrict_with_exception
has_and_belongs_to_many :endpoints
has_and_belongs_to_many :archive_endpoints
has_and_belongs_to_many :zip_endpoints

validates :preservation_policy_name, presence: true, uniqueness: true
validates :archive_ttl, presence: true, numericality: { only_integer: true, greater_than: 0 }
Expand Down
4 changes: 2 additions & 2 deletions app/models/preserved_copy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ def create_archive_preserved_copies!(archive_vers)
raise ArgumentError, "archive_vers (#{archive_vers}) must be between 0 and version (#{version})"
end

params = ArchiveEndpoint.which_need_archive_copy(preserved_object.druid, archive_vers).map do |aep|
{ version: archive_vers, archive_endpoint: aep, status: 'unreplicated' }
params = ZipEndpoint.which_need_archive_copy(preserved_object.druid, archive_vers).map do |zep|
{ version: archive_vers, zip_endpoint: zep, status: 'unreplicated' }
end
archive_preserved_copies.create!(params)
end
Expand Down
2 changes: 1 addition & 1 deletion app/models/preserved_object.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class PreservedObject < ApplicationRecord
# Log an error message.
# Calls ReplicatedFileCheckJob
# This builds off of #917
def check_endpoints!
def check_zip_endpoints!
# FIXME: STUB
# Ticket: 920
end
Expand Down
36 changes: 18 additions & 18 deletions app/models/archive_endpoint.rb → app/models/zip_endpoint.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Metadata about an endpoint which stores zipped archives of version directories from Moab
# Metadata about a zip endpoint which stores zipped archives of version directories from Moab
# objects.
class ArchiveEndpoint < ApplicationRecord
class ZipEndpoint < ApplicationRecord
has_many :archive_preserved_copies, dependent: :restrict_with_exception
has_and_belongs_to_many :preservation_policies

Expand All @@ -16,12 +16,12 @@ class ArchiveEndpoint < ApplicationRecord
# TODO: after switching to string, validate that input resolves to class which #is_a class of the right type?
validates :delivery_class, presence: true

# for the given druid, which archive endpoints should have archive copies, as per the preservation_policy?
scope :archive_targets, lambda { |druid|
# for the given druid, which zip endpoints should have archive copies, as per the preservation_policy?
scope :zip_targets, lambda { |druid|
joins(preservation_policies: [:preserved_objects]).where(preserved_objects: { druid: druid })
}

# for a given druid, which archive endpoints have an archive copy of the given version?
# for a given druid, which zip endpoints have an archive copy of the given version?
scope :which_have_archive_copy, lambda { |druid, version|
joins(archive_preserved_copies: [:preserved_object])
.where(
Expand All @@ -30,27 +30,27 @@ class ArchiveEndpoint < ApplicationRecord
)
}

# for a given version of a druid, which archive endpoints need an archive copy, based on the governing pres policy?
# for a given version of a druid, which zip endpoints need an archive copy, based on the governing pres policy?
scope :which_need_archive_copy, lambda { |druid, version|
archive_targets(druid).where.not(id: which_have_archive_copy(druid, version))
zip_targets(druid).where.not(id: which_have_archive_copy(druid, version))
}

# iterates over the archive endpoints enumerated in settings, creating an ArchiveEndpoint for each if one doesn't
# iterates over the zip endpoints enumerated in settings, creating a ZipEndpoint for each if one doesn't
# already exist.
# @param preservation_policies [Enumerable<PreservationPolicy>] the list of preservation policies
# which the newly created endpoints implement.
# @return [Array<ArchiveEndpoint>] the ArchiveEndpoint list for the archive endpoints defined in the config (all
# which the newly created zip endpoints implement.
# @return [Array<ZipEndpoint>] the ZipEndpoint list for the zip endpoints defined in the config (all
# entries, including any entries that may have been seeded already)
# @note this adds new entries from the config, and leaves existing entries alone, but won't delete anything.
# TODO: figure out deletion/update based on config?
def self.seed_archive_endpoints_from_config(preservation_policies)
return unless Settings.archive_endpoints
Settings.archive_endpoints.map do |endpoint_name, endpoint_config|
find_or_create_by!(endpoint_name: endpoint_name.to_s) do |endpoint|
endpoint.endpoint_node = endpoint_config.endpoint_node
endpoint.storage_location = endpoint_config.storage_location
endpoint.preservation_policies = preservation_policies
endpoint.delivery_class = delivery_classes[endpoint_config.delivery_class.constantize]
def self.seed_zip_endpoints_from_config(preservation_policies)
return unless Settings.zip_endpoints
Settings.zip_endpoints.map do |endpoint_name, endpoint_config|
find_or_create_by!(endpoint_name: endpoint_name.to_s) do |zip_endpoint|
zip_endpoint.endpoint_node = endpoint_config.endpoint_node
zip_endpoint.storage_location = endpoint_config.storage_location
zip_endpoint.preservation_policies = preservation_policies
zip_endpoint.delivery_class = delivery_classes[endpoint_config.delivery_class.constantize]
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion config/initializers/okcomputer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def check
workflows_url = "#{Settings.workflow_services_url}sdr/objects/druid:oo000oo0000/workflows"
OkComputer::Registry.register "external-workflow-services-url", OkComputer::HttpCheck.new(workflows_url)

# Replication (only) uses zip_storage directory to build the zips to send to endpoints
# Replication (only) uses zip_storage directory to build the zips to send to zip endpoints
OkComputer::Registry.register "feature-zip_storage_dir", OkComputer::DirectoryCheck.new(Settings.zip_storage)

# check PreservedCopy#last_version_audit to ensure it isn't too old
Expand Down
2 changes: 1 addition & 1 deletion config/resque-pool.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@
# These are queue names (like would be passed as QUEUES=... rake resque:work)
# and the count of workers that should be spun up to service them.
checksum_validation: 3
endpoint_events: 4
zip_endpoint_events: 4
"s3_us_east_1_delivery,s3_us_west_2_delivery": 6
"zipmaker,zips_made": 4
2 changes: 1 addition & 1 deletion config/settings/development.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ storage_root_map:
preservation_catalog_prod_a:
fixture_sr1: 'spec/fixtures/storage_root01'
fixture_sr2: 'spec/fixtures/storage_root02'
archive_endpoints:
zip_endpoints:
mock_archive1:
endpoint_node: 'localhost'
storage_location: 'bucket_name'
Expand Down
2 changes: 1 addition & 1 deletion config/settings/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ storage_root_map:
preservation_catalog_prod_a:
fixture_sr1: 'spec/fixtures/storage_root01'
fixture_sr2: 'spec/fixtures/storage_root02'
archive_endpoints:
zip_endpoints:
mock_archive1:
endpoint_node: 'localhost'
storage_location: 'bucket_name'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Renames the archive_endpoints table, its join table with preservation_policies,
# and the archive_endpoint_id foreign-key columns to their zip_endpoint equivalents.
class RenameArchiveEndpointToZipEndpoint < ActiveRecord::Migration[5.1]
  def change
    # The join table's indexes are removed here and added back below, to keep the
    # index names consistent with the updated table names.
    # `column:` is given alongside `name:` because `remove_index` is only reversible
    # inside `change` when the column is known; with `name:` alone a rollback raises
    # ActiveRecord::IrreversibleMigration.
    remove_index :archive_endpoints_preservation_policies,
                 column: :preservation_policy_id,
                 name: "index_archive_endpoints_pres_policies_on_pres_policy_id"
    remove_index :archive_endpoints_preservation_policies,
                 column: :archive_endpoint_id,
                 name: "index_archive_endpoints_pres_policies_on_archive_endpoint_id"

    rename_table :archive_endpoints, :zip_endpoints
    rename_column :archive_preserved_copies, :archive_endpoint_id, :zip_endpoint_id

    rename_table :archive_endpoints_preservation_policies, :preservation_policies_zip_endpoints
    rename_column :preservation_policies_zip_endpoints, :archive_endpoint_id, :zip_endpoint_id

    add_index :preservation_policies_zip_endpoints, :zip_endpoint_id,
              name: "index_pres_policies_zip_endpoints_on_zip_endpoint_id"
    add_index :preservation_policies_zip_endpoints, :preservation_policy_id,
              name: "index_pres_policies_zip_endpoints_on_pres_policy_id"
  end
end
47 changes: 23 additions & 24 deletions db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,40 +10,22 @@
#
# It's strongly recommended that you check this file into your version control system.

ActiveRecord::Schema.define(version: 20180711204251) do

ActiveRecord::Schema.define(version: 20180712203748) do
# These are extensions that must be enabled in order to support this database
enable_extension "plpgsql"

create_table "archive_endpoints", force: :cascade do |t|
t.string "endpoint_name", null: false
t.integer "delivery_class", null: false
t.string "endpoint_node"
t.string "storage_location"
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index ["endpoint_name"], name: "index_archive_endpoints_on_endpoint_name", unique: true
end

create_table "archive_endpoints_preservation_policies", force: :cascade do |t|
t.bigint "preservation_policy_id", null: false
t.bigint "archive_endpoint_id", null: false
t.index ["archive_endpoint_id"], name: "index_archive_endpoints_pres_policies_on_archive_endpoint_id"
t.index ["preservation_policy_id"], name: "index_archive_endpoints_pres_policies_on_pres_policy_id"
end

create_table "archive_preserved_copies", force: :cascade do |t|
t.integer "version", null: false
t.datetime "last_existence_check"
t.bigint "preserved_copy_id", null: false
t.bigint "archive_endpoint_id", null: false
t.bigint "zip_endpoint_id", null: false
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.integer "status", null: false
t.index ["archive_endpoint_id"], name: "index_archive_preserved_copies_on_archive_endpoint_id"
t.index ["last_existence_check"], name: "index_archive_preserved_copies_on_last_existence_check"
t.index ["preserved_copy_id"], name: "index_archive_preserved_copies_on_preserved_copy_id"
t.index ["status"], name: "index_archive_preserved_copies_on_status"
t.index ["zip_endpoint_id"], name: "index_archive_preserved_copies_on_zip_endpoint_id"
end

create_table "archive_preserved_copy_parts", force: :cascade do |t|
Expand Down Expand Up @@ -85,6 +67,13 @@
t.index ["preservation_policy_name"], name: "index_preservation_policies_on_preservation_policy_name", unique: true
end

create_table "preservation_policies_zip_endpoints", force: :cascade do |t|
t.bigint "preservation_policy_id", null: false
t.bigint "zip_endpoint_id", null: false
t.index ["preservation_policy_id"], name: "index_pres_policies_zip_endpoints_on_pres_policy_id"
t.index ["zip_endpoint_id"], name: "index_pres_policies_zip_endpoints_on_zip_endpoint_id"
end

create_table "preserved_copies", force: :cascade do |t|
t.integer "version", null: false
t.bigint "preserved_object_id", null: false
Expand Down Expand Up @@ -119,13 +108,23 @@
t.index ["updated_at"], name: "index_preserved_objects_on_updated_at"
end

add_foreign_key "archive_endpoints_preservation_policies", "archive_endpoints"
add_foreign_key "archive_endpoints_preservation_policies", "preservation_policies"
add_foreign_key "archive_preserved_copies", "archive_endpoints"
create_table "zip_endpoints", force: :cascade do |t|
t.string "endpoint_name", null: false
t.integer "delivery_class", null: false
t.string "endpoint_node"
t.string "storage_location"
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index ["endpoint_name"], name: "index_zip_endpoints_on_endpoint_name", unique: true
end

add_foreign_key "archive_preserved_copies", "preserved_copies"
add_foreign_key "archive_preserved_copies", "zip_endpoints"
add_foreign_key "archive_preserved_copy_parts", "archive_preserved_copies"
add_foreign_key "endpoints_preservation_policies", "endpoints"
add_foreign_key "endpoints_preservation_policies", "preservation_policies"
add_foreign_key "preservation_policies_zip_endpoints", "preservation_policies"
add_foreign_key "preservation_policies_zip_endpoints", "zip_endpoints"
add_foreign_key "preserved_copies", "endpoints"
add_foreign_key "preserved_copies", "preserved_objects"
add_foreign_key "preserved_objects", "preservation_policies"
Expand Down
4 changes: 2 additions & 2 deletions db/seeds.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
ApplicationRecord.transaction(isolation: :serializable) do
PreservationPolicy.seed_from_config
Endpoint.seed_storage_root_endpoints_from_config([PreservationPolicy.default_policy])
ArchiveEndpoint.seed_archive_endpoints_from_config([PreservationPolicy.default_policy])
ZipEndpoint.seed_zip_endpoints_from_config([PreservationPolicy.default_policy])
end

puts "seeded database. state of seeded object types after seeding:"
puts "> PreservationPolicy.all: #{PreservationPolicy.all.to_a}"
puts "> Endpoint.all: #{Endpoint.all.to_a}"
puts "> ArchiveEndpoint.all: #{ArchiveEndpoint.all.to_a}"
puts "> ZipEndpoint.all: #{ZipEndpoint.all.to_a}"
2 changes: 1 addition & 1 deletion spec/factories/archive_preserved_copies.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
factory :archive_preserved_copy do
status 'unreplicated'
version 1
archive_endpoint
zip_endpoint
preserved_copy
end
end
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
FactoryBot.define do
factory :archive_endpoint do
factory :zip_endpoint do
sequence(:endpoint_name) { |n| "endpoint#{format('%02d', n)}" }
sequence(:endpoint_node) { |n| "us-west-#{format('%02d', n)}" }
storage_location 'bucket_name'
Expand Down

0 comments on commit 7594d0d

Please sign in to comment.