diff --git a/base/lib/base/base.rb b/base/lib/base/base.rb index 84abf69b..1b24a68b 100644 --- a/base/lib/base/base.rb +++ b/base/lib/base/base.rb @@ -32,6 +32,8 @@ def initialize(options) @options = options @local_ip = VCAP.local_ip(options[:ip_route]) @logger.info("#{service_description}: Initializing") + @nats_lock = Mutex.new + NATS.on_error do |e| if e.kind_of? NATS::ConnectError @logger.error("EXITING! NATS connection failed: #{e}") @@ -67,6 +69,12 @@ def service_description() return "#{service_name}-#{flavor}" end + def publish(reply, msg) + @nats_lock.synchronize do + @node_nats.publish(reply, msg) + end + end + def update_varz() vz = varz_details vz.each { |k,v| diff --git a/base/lib/base/node.rb b/base/lib/base/node.rb index 652857f7..b85820df 100644 --- a/base/lib/base/node.rb +++ b/base/lib/base/node.rb @@ -73,10 +73,10 @@ def on_provision(msg, reply) credential['node_id'] = @node_id response.credentials = credential @logger.debug("#{service_description}: Successfully provisioned service for request #{msg}: #{response.inspect}") - @node_nats.publish(reply, encode_success(response)) + publish(reply, encode_success(response)) rescue => e @logger.warn(e) - @node_nats.publish(reply, encode_failure(response, e)) + publish(reply, encode_failure(response, e)) end def on_unprovision(msg, reply) @@ -87,13 +87,13 @@ def on_unprovision(msg, reply) bindings = unprovision_req.bindings result = unprovision(name, bindings) if result - @node_nats.publish(reply, encode_success(response)) + publish(reply, encode_success(response)) else - @node_nats.publish(reply, encode_failure(response)) + publish(reply, encode_failure(response)) end rescue => e @logger.warn(e) - @node_nats.publish(reply, encode_failure(response, e)) + publish(reply, encode_failure(response, e)) end def on_bind(msg, reply) @@ -104,10 +104,10 @@ def on_bind(msg, reply) bind_opts = bind_message.bind_opts credentials = bind_message.credentials response.credentials = bind(name, bind_opts, credentials) - @node_nats.publish(reply, encode_success(response)) + publish(reply, encode_success(response)) rescue => e @logger.warn(e) - @node_nats.publish(reply, encode_failure(response, e)) + publish(reply, encode_failure(response, e)) end def on_unbind(msg, reply) @@ -116,13 +116,13 @@ def on_unbind(msg, reply) unbind_req = UnbindRequest.decode(msg) result = unbind(unbind_req.credentials) if result - @node_nats.publish(reply, encode_success(response)) + publish(reply, encode_success(response)) else - @node_nats.publish(reply, encode_failure(response)) + publish(reply, encode_failure(response)) end rescue => e @logger.warn(e) - @node_nats.publish(reply, encode_failure(response, e)) + publish(reply, encode_failure(response, e)) end def on_restore(msg, reply) @@ -133,13 +133,13 @@ def on_restore(msg, reply) backup_path = restore_message.backup_path result = restore(instance_id, backup_path) if result - @node_nats.publish(reply, encode_success(response)) + publish(reply, encode_success(response)) else - @node_nats.publish(reply, encode_failure(response)) + publish(reply, encode_failure(response)) end rescue => e @logger.warn(e) - @node_nats.publish(reply, encode_failure(response, e)) + publish(reply, encode_failure(response, e)) end # disable and dump instance @@ -152,7 +152,7 @@ def on_disable_instance(msg, reply) FileUtils.mkdir_p(file_path) result = disable_instance(prov_cred, binding_creds) result = dump_instance(prov_cred, binding_creds, file_path) if result - @node_nats.publish(reply, Yajl::Encoder.encode(result)) + publish(reply, Yajl::Encoder.encode(result)) rescue => e @logger.warn(e) end @@ -167,7 +167,7 @@ def on_enable_instance(msg, reply) prov_cred, binding_creds_hash = result prov_cred['node_id'] = @node_id result = [prov_cred, binding_creds_hash] - @node_nats.publish(reply, Yajl::Encoder.encode(result)) + publish(reply, Yajl::Encoder.encode(result)) rescue => e @logger.warn(e) end @@ -179,7 +179,7 @@ def on_cleanup_nfs(msg, reply) prov_cred, binding_creds = request instance = prov_cred['name'] FileUtils.rm_rf(get_migration_folder(instance)) - @node_nats.publish(reply, Yajl::Encoder.encode(true)) + publish(reply, Yajl::Encoder.encode(true)) rescue => e @logger.warn(e) end @@ -259,7 +259,7 @@ def on_import_instance(msg, reply) instance = prov_cred['name'] file_path = get_migration_folder(instance) result = import_instance(prov_cred, binding_creds, file_path, plan) - @node_nats.publish(reply, Yajl::Encoder.encode(result)) + publish(reply, Yajl::Encoder.encode(result)) rescue => e @logger.warn(e) end @@ -279,7 +279,7 @@ def send_node_announcement(reply = nil) @logger.debug("#{service_description}: Sending announcement for #{reply || "everyone"}") a = announcement a[:id] = @node_id - @node_nats.publish(reply || "#{service_name}.announce", Yajl::Encoder.encode(a)) + publish(reply || "#{service_name}.announce", Yajl::Encoder.encode(a)) rescue @logger.warn(e) end