Skip to content

Commit

Permalink
Protect nats client in multi thread environment for base node
Browse files Browse the repository at this point in the history
Change-Id: I6795f10d2e56451a0e53832d8e7b47b46bcf031f
  • Loading branch information
anferneeg committed Nov 3, 2011
1 parent e020356 commit 8fc2a42
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 18 deletions.
8 changes: 8 additions & 0 deletions base/lib/base/base.rb
Expand Up @@ -32,6 +32,8 @@ def initialize(options)
@options = options
@local_ip = VCAP.local_ip(options[:ip_route])
@logger.info("#{service_description}: Initializing")
@nats_lock = Mutex.new

NATS.on_error do |e|
if e.kind_of? NATS::ConnectError
@logger.error("EXITING! NATS connection failed: #{e}")
Expand Down Expand Up @@ -67,6 +69,12 @@ def service_description()
return "#{service_name}-#{flavor}"
end

def publish(reply, msg)
@nats_lock.synchronize do
@node_nats.publish(reply, msg)
end
end

def update_varz()
vz = varz_details
vz.each { |k,v|
Expand Down
36 changes: 18 additions & 18 deletions base/lib/base/node.rb
Expand Up @@ -73,10 +73,10 @@ def on_provision(msg, reply)
credential['node_id'] = @node_id
response.credentials = credential
@logger.debug("#{service_description}: Successfully provisioned service for request #{msg}: #{response.inspect}")
@node_nats.publish(reply, encode_success(response))
publish(reply, encode_success(response))
rescue => e
@logger.warn(e)
@node_nats.publish(reply, encode_failure(response, e))
publish(reply, encode_failure(response, e))
end

def on_unprovision(msg, reply)
Expand All @@ -87,13 +87,13 @@ def on_unprovision(msg, reply)
bindings = unprovision_req.bindings
result = unprovision(name, bindings)
if result
@node_nats.publish(reply, encode_success(response))
publish(reply, encode_success(response))
else
@node_nats.publish(reply, encode_failure(response))
publish(reply, encode_failure(response))
end
rescue => e
@logger.warn(e)
@node_nats.publish(reply, encode_failure(response, e))
publish(reply, encode_failure(response, e))
end

def on_bind(msg, reply)
Expand All @@ -104,10 +104,10 @@ def on_bind(msg, reply)
bind_opts = bind_message.bind_opts
credentials = bind_message.credentials
response.credentials = bind(name, bind_opts, credentials)
@node_nats.publish(reply, encode_success(response))
publish(reply, encode_success(response))
rescue => e
@logger.warn(e)
@node_nats.publish(reply, encode_failure(response, e))
publish(reply, encode_failure(response, e))
end

def on_unbind(msg, reply)
Expand All @@ -116,13 +116,13 @@ def on_unbind(msg, reply)
unbind_req = UnbindRequest.decode(msg)
result = unbind(unbind_req.credentials)
if result
@node_nats.publish(reply, encode_success(response))
publish(reply, encode_success(response))
else
@node_nats.publish(reply, encode_failure(response))
publish(reply, encode_failure(response))
end
rescue => e
@logger.warn(e)
@node_nats.publish(reply, encode_failure(response, e))
publish(reply, encode_failure(response, e))
end

def on_restore(msg, reply)
Expand All @@ -133,13 +133,13 @@ def on_restore(msg, reply)
backup_path = restore_message.backup_path
result = restore(instance_id, backup_path)
if result
@node_nats.publish(reply, encode_success(response))
publish(reply, encode_success(response))
else
@node_nats.publish(reply, encode_failure(response))
publish(reply, encode_failure(response))
end
rescue => e
@logger.warn(e)
@node_nats.publish(reply, encode_failure(response, e))
publish(reply, encode_failure(response, e))
end

# disable and dump instance
Expand All @@ -152,7 +152,7 @@ def on_disable_instance(msg, reply)
FileUtils.mkdir_p(file_path)
result = disable_instance(prov_cred, binding_creds)
result = dump_instance(prov_cred, binding_creds, file_path) if result
@node_nats.publish(reply, Yajl::Encoder.encode(result))
publish(reply, Yajl::Encoder.encode(result))
rescue => e
@logger.warn(e)
end
Expand All @@ -167,7 +167,7 @@ def on_enable_instance(msg, reply)
prov_cred, binding_creds_hash = result
prov_cred['node_id'] = @node_id
result = [prov_cred, binding_creds_hash]
@node_nats.publish(reply, Yajl::Encoder.encode(result))
publish(reply, Yajl::Encoder.encode(result))
rescue => e
@logger.warn(e)
end
Expand All @@ -179,7 +179,7 @@ def on_cleanup_nfs(msg, reply)
prov_cred, binding_creds = request
instance = prov_cred['name']
FileUtils.rm_rf(get_migration_folder(instance))
@node_nats.publish(reply, Yajl::Encoder.encode(true))
publish(reply, Yajl::Encoder.encode(true))
rescue => e
@logger.warn(e)
end
Expand Down Expand Up @@ -259,7 +259,7 @@ def on_import_instance(msg, reply)
instance = prov_cred['name']
file_path = get_migration_folder(instance)
result = import_instance(prov_cred, binding_creds, file_path, plan)
@node_nats.publish(reply, Yajl::Encoder.encode(result))
publish(reply, Yajl::Encoder.encode(result))
rescue => e
@logger.warn(e)
end
Expand All @@ -279,7 +279,7 @@ def send_node_announcement(reply = nil)
@logger.debug("#{service_description}: Sending announcement for #{reply || "everyone"}")
a = announcement
a[:id] = @node_id
@node_nats.publish(reply || "#{service_name}.announce", Yajl::Encoder.encode(a))
publish(reply || "#{service_name}.announce", Yajl::Encoder.encode(a))
rescue
@logger.warn(e)
end
Expand Down

0 comments on commit 8fc2a42

Please sign in to comment.