Skip to content
This repository has been archived by the owner on May 4, 2021. It is now read-only.

Commit

Permalink
WIP on bag replication - extract class for BagReplication and related…
Browse files Browse the repository at this point in the history
… refactoring
  • Loading branch information
dazza-codes committed Aug 10, 2016
1 parent 4feb97a commit 984ed21
Show file tree
Hide file tree
Showing 3 changed files with 260 additions and 197 deletions.
53 changes: 13 additions & 40 deletions lib/dpn/workers/bag_paths.rb
Original file line number Diff line number Diff line change
@@ -1,60 +1,33 @@
# This class adapts state and logic from models and workers from dpn-server,e.g.
# https://github.com/dpn-admin/dpn-server/tree/master/app/jobs/bag_man
# https://github.com/dpn-admin/dpn-server/blob/master/app/models/bag_man_request.rb
# The code in dpn-server is governed by the following copyright notice:
# Copyright (c) 2015 The Regents of the University of Michigan.
# All Rights Reserved.
# Licensed according to the terms of the Revised BSD License
# See LICENSE.md for details.

require 'forwardable'
require 'pairtree'

module DPN
module Workers
##
# BagReplicatorPaths manages file system paths used in DPN bag replication
# BagPaths manages file system paths used in DPN bag replication
class BagPaths
extend Forwardable

attr_reader :paths
attr_reader :replication

# Provide accessors for SyncSettings.replication parameters
def_delegators :paths, :staging_dir, :storage_dir, :ssh_identity_file
def_delegators :settings, :staging_dir, :storage_dir, :ssh_identity_file

# @param [Hash] replication transfer resource
# @raise RuntimeError
def initialize(replication)
@replication = OpenStruct.new(replication)
settings = SyncSettings.replication
raise 'Cannot access staging directory' unless File.writable? settings.staging_dir
raise 'Cannot access storage directory' unless File.writable? settings.storage_dir
@paths = OpenStruct.new(settings)
end

def retrieved
paths.retrieved ||= begin
path = File.join staging, File.basename(replication.link)
raise "Failed to retrieve" unless File.exist? path
path
end
def initialize
@settings = SyncSettings.replication
raise 'Cannot access staging directory' unless File.writable? @settings.staging_dir
raise 'Cannot access storage directory' unless File.writable? @settings.storage_dir
end

def staging
paths.staging ||= begin
destination = File.join staging_dir, replication.replication_id
FileUtils.mkdir_p(destination) unless File.exist? destination
destination
end
def staging(location)
destination = File.join staging_dir, location
FileUtils.mkdir_p(destination) unless File.exist? destination
destination
end

def storage(location)
paths.storage ||= begin
pairtree = ::Pairtree.at(storage_dir, create: true)
ppath = pairtree.mk(location)
ppath.path
end
pairtree = ::Pairtree.at(storage_dir, create: true)
ppath = pairtree.mk(location)
ppath.path
end
end
end
Expand Down
243 changes: 243 additions & 0 deletions lib/dpn/workers/bag_replication.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
# This class adapts some state and logic from dpn-server, i.e.
# https://github.com/dpn-admin/dpn-server
# The code in dpn-server is governed by the following copyright notice:
# Copyright (c) 2015 The Regents of the University of Michigan.
# All Rights Reserved.
# Licensed according to the terms of the Revised BSD License
# See LICENSE.md for details.

require 'rsync'

module DPN
module Workers
# Ignore :reek:Attribute :reek:TooManyInstanceVariables :reek:TooManyMethods

##
# A Bag Replication-Transfer-Resource
# @see https://github.com/dpn-admin/DPN-REST-Wiki/blob/master/Replication-Transfer-Resource.md
# @see https://wiki.duraspace.org/display/DPNC/BagIt+Specification
class BagReplication

attr_reader :status

# @param [Hash] replication transfer resource
def initialize(replication)
create_attributes(replication)
@_paths = DPN::Workers::BagPaths.new
end

# Replication Request ID - the unique (UUIDv4) identifier for this replication request
# @return [String] replication_id
def id
@replication_id
end

# Replication Bag ID - the unique (UUIDv4) identifier for the DPN bag
# @return [String] bag_id
def bag_id
# The DPN REST spec uses 'bag' but some implementations use 'uuid', so
# provide a fall-back to 'uuid' for compatibility.
@bag || @uuid # or use `baggit.id`?
end

# @return [Boolean] success of preservation
def preserve
preserve_rsync && preserve_validate
update 'stored'
end

# @return [Boolean] success of retrieval
def retrieve
retrieve_rsync && retrieve_validate && retrieve_fixity
update 'received'
end

# @return [Hash]
def to_h
attributes.map do |var|
key = var.to_s.delete('@').to_sym
[key, instance_variable_get(var)]
end.to_h
end

private

attr_reader :bag
attr_accessor :bag_valid

attr_reader :replication_id,
:from_node,
:to_node,
:protocol,
:link,
:created_at, :updated_at

attr_reader :fixity_accept, :fixity_algorithm, :fixity_nonce
attr_accessor :fixity_value

attr_writer :status

def baggit
@_baggit
end

def paths
@_paths
end

# @return [Array] instance variable symbols
def attributes
instance_variables.select { |var| !var.to_s.start_with?('@_') }
end

# @param [Hash] opts
def create_attributes(opts)
opts.each { |key, val| instance_variable_set("@#{key}", val) }
end

# Replication file - the basename for the replication link
# @return [String] file
def file
@_file ||= File.basename(link)
end

# @return [Boolean] success of rsync transfer
def preserve_rsync
options = [
'--copy-dirlinks',
'--copy-unsafe-links',
'--partial',
'--quiet',
'--recursive'
].join(' ')
Rsync.run(baggit.location, storage_path, options) do |result|
raise "Failed to preserve: #{result.error}" unless result.success?
end
true
end

# @return [Boolean] validity of preserved bag
def preserve_validate
@_baggit = DPN::Bagit::Bag.new(storage_path)
validate
# TODO: cleanup the staging path?
end

# Administrative node that issued the replication transfer request
# @return [DPN::Workers::Node] remote node
def remote_node
@_remote_node ||= begin
DPN::Workers.nodes.remote_node from_node
end
end

# Calculate bag fixity to set the replication fixity_value
# and verify the fixity with the admin node
# @return [Boolean] fixity accepted
def retrieve_fixity
@_fixity ||= begin
return false unless baggit
@fixity_value = baggit.fixity(:sha256)
update
raise 'Admin node did not accept fixity' unless fixity_accept
fixity_accept
end
end

# @return [String]
def retrieve_path
@_retrieve_path ||= begin
path = File.join staging_path, file
raise "Failed to retrieve" unless File.exist? path
path
end
end

# @return [Boolean] success of rsync transfer
def retrieve_rsync
options = [
'--archive',
'--copy-dirlinks',
'--copy-unsafe-links',
'--partial',
'--quiet',
retrieve_ssh
].join(' ')
Rsync.run(link, staging_path, options) do |result|
raise "Failed to retrieve: #{result.error}" unless result.success?
end
true
end

# Construct an ssh command for rsync, if an ssh identity file is
# provided in the SyncSettings.replication configuration.
# @return [String] ssh command
def retrieve_ssh
@_retrieve_ssh ||= begin
ssh_id = paths.ssh_identity_file
return '' unless ssh_id
ssh_cmd = [
'ssh',
'-o PasswordAuthentication=no',
'-o UserKnownHostsFile=/dev/null',
'-o StrictHostKeyChecking=no',
"-i #{ssh_id}"
].join(' ')
"-e '#{ssh_cmd}'"
end
end

# @return [Boolean] success of unpacking the replication .tar archive
def retrieve_validate
if File.directory?(retrieve_path)
@_baggit = DPN::Bagit::Bag.new(retrieve_path)
else
case File.extname retrieve_path
when ".tar"
serialized_bag = DPN::Bagit::SerializedBag.new(retrieve_path)
@_baggit = serialized_bag.unserialize!
else
raise "Could not unpack file type"
end
end
validate
end

# @return [String] staging_path
def staging_path
@_staging ||= paths.staging(replication_id)
end

# @return [String] storage_path
def storage_path
@_storage ||= paths.storage(bag_id)
end

# Update the replication transfer resource status on the remote node
# @param [String|nil] status of replication transfer
# 'requested' - set by the from_node to indicate the bag is staged for transfer and awaiting response from to_node.
# 'rejected' - set by the to_node to indicate it will not perform the transfer.
# 'received' - set by the to_node to indicate it has performed the transfer.
# 'confirmed' - set by the from_node after it receives all data to confirm a good transfer.
# 'stored' - set by the to_node to indicate the bag has been transferred into its storage repository from the staging area. The to_node promises to fulfill replicating node duties by setting this status.
# 'cancelled' - set by either node to indicate the transfer was cancelled.
# @return [Boolean]
def update(status = nil)
@status = status if status
response = remote_node.client.update_replication(to_h)
success = response.success?
# TODO: check the response.body can be assigned OK here
create_attributes(response.body) if success
success
end

# Validate the DPN bag
# @return [Boolean] valid
def validate
@bag_valid = baggit.valid?
raise "Bag invalid: #{baggit.errors}" unless bag_valid
bag_valid
end
end
end
end
Loading

0 comments on commit 984ed21

Please sign in to comment.