forked from mytestbed/omf
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Use deferred process to execute incoming messages in resource proxy
- Loading branch information
Jack C Hong
committed
May 24, 2012
1 parent
6e1fc70
commit ba2f343
Showing
1 changed file
with
88 additions
and
127 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,172 +1,133 @@ | ||
require 'omf_common' | ||
require 'omf_rc/deferred_process' | ||
require 'securerandom' | ||
require 'hashie' | ||
|
||
class OmfRc::ResourceProxy::AbstractResource | ||
attr_accessor :uid, :type, :properties, :comm | ||
DISCONNECT_WAIT = 5 | ||
attr_accessor :uid, :hrn, :type, :properties, :comm | ||
attr_reader :opts, :children, :host | ||
|
||
def initialize(type, opts = nil, comm = nil) | ||
@opts = Hashie::Mash.new(opts) | ||
@type = type | ||
@uid = @opts.uid || SecureRandom.uuid | ||
@hrn = @opts.hrn | ||
@properties = Hashie::Mash.new(@opts.properties) | ||
@children ||= [] | ||
@comm = comm || OmfCommon::Comm.new(@opts.dsl) | ||
@host = nil | ||
register_default_comm_callbacks | ||
end | ||
|
||
# Custom validation rules, extend this to validation specific properties | ||
def validate | ||
# Definitely need a type | ||
raise StandardError if type.nil? | ||
end | ||
@comm = comm || OmfCommon::Comm.new(@opts.dsl) | ||
# Fire when connection to pubsub server established | ||
@comm.when_ready do | ||
logger.info "CONNECTED: #{@comm.jid.inspect}" | ||
@host = "#{@opts.pubsub_host}.#{@comm.jid.domain}" | ||
|
||
# Release a child resource | ||
def release(resource, &block) | ||
resource.children.each do |child| | ||
resource.release(child) | ||
# Once connection established, create a pubsub node, then subscribe to it | ||
@comm.create_node(uid, host) do |s| | ||
# Creating node failed, no point to continue; clean up and disconnect | ||
# Otherwise go subscribe to this pubsub node | ||
s.error? ? disconnect : @comm.subscribe(uid, host) | ||
end | ||
end | ||
children.delete(resource) | ||
cleanup if self.class.method_defined? :cleanup | ||
block.call if block | ||
end | ||
|
||
def get_all(conditions) | ||
children.find_all do |v| | ||
flag = true | ||
conditions.each_pair do |key, value| | ||
flag &&= v.send(key) == value | ||
# Fire when message published | ||
@comm.node_event do |e| | ||
logger.info e.node | ||
e.items.each do |item| | ||
process_omf_message(item.payload, e.node) | ||
end | ||
flag | ||
end | ||
end | ||
|
||
# Creates a new resource in the context of this resource. | ||
# | ||
# @param [Hash] opts options to create new resource | ||
# @option opts [String] :type Type of resource | ||
# @option opts [Hash] :properties See +configure+ for explanation | ||
def create(type, opts = nil, &block) | ||
new_resource = OmfRc::ResourceFactory.new(type.to_sym, opts, @comm) | ||
children << new_resource | ||
block.call(new_resource) if block | ||
# Generic pubsub event | ||
@comm.pubsub_event do |e| | ||
logger.debug "PUBSUB GENERIC EVENT: #{e}" | ||
end | ||
end | ||
|
||
# Returns a resource instance if already exists, in the context of this resource, throw exception otherwise. | ||
# | ||
# @param [String] resource_uid Resource' global unique identifier | ||
# @return [Object] resource instance | ||
def get(resource_uid) # String => Resource | ||
resource = children.find { |v| v.uid == resource_uid } | ||
raise Exception, "Resource #{resource_uid} not found" if resource.nil? | ||
resource | ||
# Connect to pubsub server | ||
def connect | ||
@comm.connect(opts.user, opts.password, opts.server) | ||
end | ||
|
||
# Returns a set of child resources based on properties and conditions | ||
def request(properties, conditions = {}, &block) | ||
resources = get_all(conditions).map do |resource| | ||
Hashie::Mash.new.tap do |mash| | ||
properties.each do |key| | ||
mash[key] ||= resource.request_property(key) | ||
end | ||
def disconnect | ||
@comm.pubsub.affiliations(host) do |a| | ||
my_pubsub_nodes = a[:owner] ? a[:owner].size : 0 | ||
if my_pubsub_nodes > 0 | ||
logger.info "Cleaning #{my_pubsub_nodes} pubsub node(s)" | ||
a[:owner].each { |node| @comm.delete_node(node, host) } | ||
else | ||
logger.info "Disconnecting now" | ||
@comm.disconnect(host) | ||
end | ||
end | ||
block.call(resources) if block | ||
end | ||
|
||
# Configure this resource. | ||
# | ||
# @param [Hash] properties property configuration key value pair | ||
def configure(properties, &block) | ||
Hashie::Mash.new(properties).each_pair do |key, value| | ||
configure_property(key, value) | ||
EM.add_timer(DISCONNECT_WAIT) do | ||
logger.info "Disconnecting in #{DISCONNECT_WAIT} seconds" | ||
@comm.disconnect(host) | ||
end | ||
block.call if block | ||
end | ||
|
||
def configure_property(property, value) | ||
properties.send("#{property}=", value) | ||
# Create a new resource in the context of this resource. This resource becomes parent, and newly created resource becomes child | ||
# | ||
def create(context_id, type, opts = nil) | ||
new_resource = OmfRc::ResourceFactory.new(type.to_sym, opts, @comm) | ||
children << new_resource | ||
[new_resource, context_id] | ||
end | ||
|
||
def request_property(property) | ||
properties.send(property) | ||
# Release a resource | ||
# | ||
def release | ||
end | ||
|
||
def register_default_comm_callbacks | ||
@comm.when_ready do | ||
logger.info "CONNECTED: #{@comm.jid.inspect}" | ||
@host = "#{opts.pubsub_host}.#{@comm.jid.domain}" | ||
private | ||
|
||
@comm.create_node(uid, host) do |s| | ||
@comm.subscribe(uid, host) | ||
end | ||
end | ||
|
||
# Fired when message published | ||
@comm.node_item_event do |e| | ||
e.items.each do |item| | ||
m = OmfCommon::Message.parse(item.payload) | ||
logger.error "Invalid Message\n#{m.to_xml}" unless m.valid? | ||
context_id = m.read_element("//context_id").first.content | ||
#logger.info "RECEIVED: #{m.operation.to_s} <Context ID> #{context_id}" | ||
|
||
begin | ||
case m.operation | ||
when :create | ||
create_opts = opts.dup | ||
create_opts.uid = nil | ||
|
||
create(m.read_property(:type), create_opts) do |new_resource| | ||
@comm.create_node(new_resource.uid, host) do | ||
@comm.subscribe(new_resource.uid, host) do | ||
inform_msg = OmfCommon::Message.inform(context_id, 'CREATED') do |i| | ||
i.element('resource_id', new_resource.uid) | ||
i.element('resource_address', new_resource.uid) | ||
end.sign | ||
@comm.publish(uid, inform_msg, host) | ||
end | ||
end | ||
end | ||
when :request | ||
request_cpu_model do |result| | ||
result[:success] | ||
inform_msg = OmfCommon::Message.inform(context_id, 'STATUS') do |i| | ||
i.property('cpu_model') do |p| | ||
p.element('current', result[:success]) | ||
end | ||
end.sign | ||
@comm.publish(uid, inform_msg, host) | ||
end | ||
when :configure | ||
@comm.publish(uid, OmfCommon::Message.inform(context_id, 'STATUS').sign, host) | ||
when :relase | ||
@comm.publish(uid, OmfCommon::Message.inform(context_id, 'STATUS').sign, host) | ||
when :inform | ||
# Parse omf message and execute as instructed by the message | ||
# | ||
def process_omf_message(pubsub_item_payload, node) | ||
dp = OmfRc::DeferredProcess.new | ||
|
||
dp.callback do |result| | ||
if result && result[0].class == self.class | ||
context_id = result[1] | ||
@comm.create_node(result[0].uid, host) do | ||
@comm.subscribe(result[0].uid, host) do | ||
inform_msg = OmfCommon::Message.inform(context_id, 'CREATED') do |i| | ||
i.element('resource_id', result[0].uid) | ||
i.element('resource_address', result[0].uid) | ||
end.sign | ||
@comm.publish(uid, inform_msg, host) | ||
end | ||
rescue => e | ||
logger.error "#{e.message}\n#{e.backtrace.join("\n")}" | ||
end | ||
end | ||
end | ||
|
||
# Fired when node created | ||
@comm.node_event do |e| | ||
logger.info "NODES: #{e.items.map(&:id)}" unless e.items.empty? | ||
dp.errback do |e| | ||
logger.error e.message | ||
logger.error e.backtrace.join("\n") | ||
end | ||
|
||
# Generic pubsub event | ||
@comm.pubsub_event do |e| | ||
logger.debug "PUBSUB GENERIC EVENT: #{e}" | ||
dp.fire do | ||
message = OmfCommon::Message.parse(pubsub_item_payload) | ||
# Get the context id, which will be included when informing | ||
context_id = message.read_content("//context_id") | ||
|
||
obj = node == uid ? self : children.find { |v| v.uid == node } | ||
|
||
case message.operation | ||
when :create | ||
create_opts = opts.dup | ||
create_opts.uid = nil | ||
create(context_id, message.read_property(:type), create_opts) | ||
when :request | ||
nil | ||
when :configure | ||
nil | ||
when :release | ||
nil | ||
else | ||
nil | ||
end | ||
end | ||
end | ||
|
||
def connect | ||
@comm.connect(opts.user, opts.password, opts.server) | ||
end | ||
|
||
def disconnect | ||
@comm.disconnect(host) | ||
end | ||
end |