Skip to content

Commit

Permalink
Merge "make mysql_node thread safe" into services-r6
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Liu authored and testazuretrain committed Nov 4, 2011
2 parents af6c167 + a7bc972 commit da17472
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 34 deletions.
88 changes: 55 additions & 33 deletions mysql/lib/mysql_service/node.rb
Expand Up @@ -7,6 +7,7 @@
require "uuidtools"
require "mysql"
require "open3"
require "thread"

$LOAD_PATH.unshift File.join(File.dirname(__FILE__), '..', '..', '..', 'base', 'lib')
require 'base/node'
Expand Down Expand Up @@ -80,6 +81,7 @@ def initialize(options)
check_db_consistency()

@available_storage = options[:available_storage] * 1024 * 1024
@available_storage_lock = Mutex.new
@node_capacity = @available_storage
ProvisionedService.all.each do |provisioned_service|
@available_storage -= storage_for_service(provisioned_service)
Expand All @@ -91,6 +93,7 @@ def initialize(options)
get_qps
@long_queries_killed = 0
@long_tx_killed = 0
@statistics_lock = Mutex.new
@provision_served = 0
@binding_served = 0
end
Expand All @@ -110,10 +113,12 @@ def all_bindings_list
end

def announcement
a = {
:available_storage => @available_storage
}
a
@available_storage_lock.synchronize do
a = {
:available_storage => @available_storage
}
a
end
end

def check_db_consistency()
Expand Down Expand Up @@ -214,31 +219,41 @@ def kill_long_transaction

def provision(plan, credential=nil)
provisioned_service = ProvisionedService.new
if credential
name, user, password = %w(name user password).map{|key| credential[key]}
provisioned_service.name = name
provisioned_service.user = user
provisioned_service.password = password
else
# mysql database name should start with alphabet character
provisioned_service.name = 'd' + UUIDTools::UUID.random_create.to_s.gsub(/-/, '')
provisioned_service.user = 'u' + generate_credential
provisioned_service.password = 'p' + generate_credential
end
provisioned_service.plan = plan
storage = storage_for_service(provisioned_service)
begin
@available_storage_lock.synchronize do
@available_storage -= storage
end
if credential
name, user, password = %w(name user password).map{|key| credential[key]}
provisioned_service.name = name
provisioned_service.user = user
provisioned_service.password = password
else
# mysql database name should start with alphabet character
provisioned_service.name = 'd' + UUIDTools::UUID.random_create.to_s.gsub(/-/, '')
provisioned_service.user = 'u' + generate_credential
provisioned_service.password = 'p' + generate_credential
end
raise "Could not create database" unless create_database(provisioned_service)

raise "Could not create database" unless create_database(provisioned_service)

if not provisioned_service.save
@logger.error("Could not save entry: #{provisioned_service.errors.inspect}")
raise MysqlError.new(MysqlError::MYSQL_LOCAL_DB_ERROR)
if not provisioned_service.save
@logger.error("Could not save entry: #{provisioned_service.errors.inspect}")
raise MysqlError.new(MysqlError::MYSQL_LOCAL_DB_ERROR)
end
response = gen_credential(provisioned_service.name, provisioned_service.user, provisioned_service.password)
@statistics_lock.synchronize do
@provision_served += 1
end
return response
rescue => e
@available_storage_lock.synchronize do
@available_storage += storage
end
delete_database(provisioned_service)
raise e
end
response = gen_credential(provisioned_service.name, provisioned_service.user, provisioned_service.password)
@provision_served += 1
return response
rescue => e
delete_database(provisioned_service)
raise e
end

def unprovision(name, credentials)
Expand All @@ -256,11 +271,14 @@ def unprovision(name, credentials)
end
delete_database(provisioned_service)
storage = storage_for_service(provisioned_service)
@available_storage += storage
if not provisioned_service.destroy
@logger.error("Could not delete service: #{provisioned_service.errors.inspect}")
raise MysqlError.new(MysqError::MYSQL_LOCAL_DB_ERROR)
end
# the order is important, restore quota only when record is deleted from local db.
@available_storage_lock.synchronize do
@available_storage += storage
end
@logger.debug("Successfully fulfilled unprovision request: #{name}")
true
end
Expand Down Expand Up @@ -290,7 +308,9 @@ def bind(name, bind_opts, credential=nil)

response = gen_credential(name, binding[:user], binding[:password])
@logger.debug("Bind response: #{response.inspect}")
@binding_served += 1
@statistics_lock.synchronize do
@binding_served += 1
end
return response
rescue => e
delete_database_user(binding[:user]) if binding
Expand All @@ -316,8 +336,6 @@ def create_database(provisioned_service)
@logger.debug("Creating: #{provisioned_service.inspect}")
@connection.query("CREATE DATABASE #{name}")
create_database_user(name, user, password)
storage = storage_for_service(provisioned_service)
@available_storage -= storage
@logger.debug("Done creating #{provisioned_service.inspect}. Took #{Time.now - start}.")
return true
rescue Mysql::Error => e
Expand Down Expand Up @@ -507,13 +525,17 @@ def varz_details()
varz[:database_status] = status
# node capacity
varz[:node_storage_capacity] = @node_capacity
varz[:node_storage_used] = @node_capacity - @available_storage
@available_storage_lock.synchronize do
varz[:node_storage_used] = @node_capacity - @available_storage
end
# how many long queries and long txs are killed.
varz[:long_queries_killed] = @long_queries_killed
varz[:long_transactions_killed] = @long_tx_killed
# how many provision/binding operations since startup.
varz[:provision_served] = @provision_served
varz[:binding_served] = @binding_served
@statistics_lock.synchronize do
varz[:provision_served] = @provision_served
varz[:binding_served] = @binding_served
end
varz
rescue => e
@logger.error("Error during generate varz: #{e}")
Expand Down
24 changes: 23 additions & 1 deletion mysql/spec/mysql_node_spec.rb
Expand Up @@ -13,7 +13,7 @@ module VCAP
module Services
module Mysql
class Node
attr_reader :connection, :logger, :available_storage
attr_reader :connection, :logger, :available_storage, :provision_served, :binding_served
end
end
end
Expand Down Expand Up @@ -583,6 +583,28 @@ class MysqlError
end
end

it "should be thread safe" do
EM.run do
available_storage = @node.available_storage
provision_served = @node.provision_served
binding_served = @node.binding_served
NUM = 20
threads = []
NUM.times do
threads << Thread.new do
db = @node.provision(@default_plan)
binding = @node.bind(db["name"], @default_opts)
@node.unprovision(db["name"], [binding])
end
end
threads.each {|t| t.join}
available_storage.should == @node.available_storage
provision_served.should == @node.provision_served - NUM
binding_served.should == @node.binding_served - NUM
EM.stop
end
end

it "should enforce max connection limitation per user account" do
EM.run do
opts = @opts.dup
Expand Down

0 comments on commit da17472

Please sign in to comment.