Permalink
Browse files

First import

  • Loading branch information...
0 parents commit 674ff32b4f1a2e531ade90c06306a2ce23a26c6f Nico committed Apr 15, 2010
Showing with 291 additions and 0 deletions.
  1. +55 −0 clients/mc-xen
  2. +172 −0 clients/mc-xen-balancer
  3. +2 −0 etc/xen-mappings.csv
  4. +62 −0 plugins/agents/xenagent.rb
@@ -0,0 +1,55 @@
+#!/usr/bin/env ruby
+
+require 'mcollective'
+include MCollective::RPC
+
+options = rpcoptions do |parser, options|
+ parser.define_head "Xen Client"
+ parser.banner = "Usage: [options] [filters]"
+
+ parser.on('-a', '--action ACTION', 'action to run') do |v|
+ options[:action] = v
+ end
+
+ parser.on('-d', '--domu NAME', 'domU name to search') do |v|
+ options[:name] = v
+ end
+end
+
+unless options.include?(:action)
+ puts "You must pass at least an action"
+ exit! 1
+end
+
+if options[:action] == "find" then
+ unless options.include?(:name)
+ puts("You need to specify a domU to find with --domu")
+ exit! 1
+ end
+end
+
+mc = rpcclient("xenagent")
+mc.progress = false
+client = mc.client
+
+if options[:action] == "find" then
+ mc.find(:name => options[:name]).each do |resp|
+ printf("%-25s: %s\n", resp[:sender], resp[:data])
+ end
+end
+
+if options[:action] == "list" then
+ mc.list.each do |resp|
+ printf("%-25s\n", resp[:sender])
+ # drop domain 0 from list
+ resp[:data][:slices].delete("Domain-0")
+ if resp[:data][:slices] != [] then
+ resp[:data][:slices].each { |d|
+ puts "\t #{d}"
+ }
+ else
+ puts "\t no domU running"
+ end
+ puts ""
+ end
+end
@@ -0,0 +1,172 @@
+#!/usr/bin/env ruby
+
+# A (wannabe) load balancer for Xen host
+# based on mcollective, and inspired by VmWare features
+
+require 'mcollective'
+require 'csv'
+
+include MCollective::RPC
+
+mc = rpcclient("xenagent")
+mc.progress = false
+
+@config = { :interval => 30, # polling interval
+ :load_threshold => 0.4, # cpu max load
+ :daemonize => false, # do we go in background ?
+ :max_over => 1, # max MINUTES where load is over threshold.
+ # this should be _at least_ interval*2
+ :debug => true, # being verbose
+ :max_vm_per_host => 10, # try not to go over 9000^Wthis many VMs per host
+ :max_load_candidate => 0.8, # if we reach this load we're
+ # no more eligible as a host for vms
+ :host_mapping => "/etc/mcollective/xen-mappings.csv" # the hypervisor name / IP mapping
+}
+
+@load_counter = {}
+@domu_times = {}
+@domu_counter = {}
+@hypervisors = {}
+
+def load_hypervisors()
+ CSV.read(@config[:host_mapping]).find_all { |r|
+ @hypervisors[r[0].to_s]=r[1].to_s
+ }
+end
+
+def debug(msg)
+ if @config[:debug] then
+ puts "[+] " + msg
+ end
+end
+
+def log(msg)
+ puts msg
+end
+
+def choose_from_times(hostname)
+ if !@domu_times_used.empty? then
+ highest = @domu_times_used.sort_by { |v| v[1] }
+ debug("VM key : " + highest[-1][0])
+ debug("Time consumed in a run (interval is "+@config[:interval].to_s+"s) : " + highest[-1][1].to_s)
+ domu_name = highest[-1][0].to_s.sub(/^#{hostname}-/,"")
+ else
+ nil
+ end
+end
+
+loop do
+ load_hypervisors()
+
+ mc.stat.each do |resp|
+ # clean up time consumption at each loop
+ @domu_times_used = {}
+ # know how many VMs each host has (may be used when migrating)
+ @domu_counter[resp[:sender]]=resp[:data][:slices].count
+
+ debug(resp[:sender] + " : " + resp[:data][:load].to_s + " load and " + @domu_counter[resp[:sender]].to_s + " slice(s) running")
+
+ # Do we hit the limit ?
+ if resp[:data][:load] > @config[:load_threshold] then
+ if @load_counter[resp[:sender]] then
+ @load_counter[resp[:sender]] +=1
+ else # init/reset it, we want load spikes to be consecutives
+ @load_counter[resp[:sender]] = 1
+ end
+ else # if no reinit counter
+ @load_counter[resp[:sender]] = 0
+ debug("init/reset load counter for " + resp[:sender])
+ end
+
+ # store & calculate domU time consumption
+ if !resp[:data][:times].empty? then
+ resp[:data][:times].each_pair { |k,v|
+ v=v.to_f
+ key=resp[:sender]+"-"+k
+ # does not have a value for this domU on this dom0
+ if !@domu_times.has_key?(key) then
+ @domu_times[key]=v
+ @domu_times_used[key]=0
+ debug("added #{k} on #{resp[:sender]} with 0 CPU time (registered #{@domu_times[key]} as a reference)")
+ else # we have one, calculate time consumption
+ @domu_times_used[key]=v-@domu_times[key]
+ @domu_times[key]=v
+ debug("updated #{k} on #{resp[:sender]} with #{@domu_times_used[key]} CPU time eaten (registered #{@domu_times[key]} as a reference)")
+ end
+ }
+ else
+ debug("#{resp[:sender]} has no slices consuming CPU time")
+ end
+ end # End of mc.stat loop
+
+ # Time for analysis & decision
+ @load_counter.each_pair { |k,v|
+ # Yes, this machine has reached the limit
+ if v > (@config[:max_over]*60)/@config[:interval] then
+ debug("#{k} has #{v} threshold overload")
+ debug("Time to see if we can migrate a VM from #{k}")
+ vm=choose_from_times(k)
+ # now look for a new home
+ if vm != nil
+ # Let's exclude those that don't match config criterias
+ # first, max vms :
+ host_list=@domu_counter.map { |d| if d[1] < @config[:max_vm_per_host] then d[0] end }
+ host_list.delete(nil) # drop nil
+ host_list.delete(k) # drop ourself
+ if !host_list.empty? and @config[:debug] then
+ host_list.each { |h|
+ debug("#{h} is a candidate for being a host (step 1 : max VMs)")
+ }
+ end
+ # next, max load
+ load_excluded = @load_counter.map { |l| if l[1] > @config[:max_load_candidate] then l[0] end }
+ host_list -= load_excluded
+ if !host_list.empty? and @config[:debug] then
+ host_list.each { |h|
+ debug("#{h} is a candidate for being a host (step 2 : max load)")
+ }
+
+ # there is at least 1 host, so let's sort by load and take the first one
+ new_host = @load_counter.sort_by { |h| h[1] }[0][0]
+ if @hypervisors.has_key? new_host then
+ new_host_ip = @hypervisors[new_host]
+ else
+ puts "FIXME : implement DNS resolution"
+ exit!
+ end
+
+ log("trying to migrate #{vm} from #{k} to #{new_host} (#{new_host_ip})")
+ # We create a new client, with filters
+ mc_migration = rpcclient("xenagent")
+ mc_migration.progress = false
+ mc_migration.verbose = false
+ mc_migration.fact_filter("hostname",k)
+ result = mc_migration.migrate(:slice => vm, :newhost => new_host_ip)
+ if result[0][:data][:result] == true then
+ log("Successfully migrated #{vm} !")
+ else
+ log("Failed to migrate #{vm}. You should take a look")
+ end
+
+ # VM migrated, reset load_counter
+ @load_counter[k]=0
+ # drop times values
+ @domu_times.delete(vm)
+ @domu_times_used.delete(vm)
+ else
+ debug("no more candidates, doing nothing...")
+ end
+ else
+ debug("could not guess the name of the VM to migrate, weird")
+ exit! 1
+ end
+ end
+ }
+
+
+
+ debug("sleeping for " + @config[:interval].to_s + " seconds")
+ log("")
+ sleep(@config[:interval])
+end
+
@@ -0,0 +1,2 @@
+hypervisor2,10.0.0.2
+hypervisor3,10.0.0.3
@@ -0,0 +1,62 @@
+require 'xen'
+
+module MCollective
+ module Agent
+ class Xenagent < RPC::Agent
+ def startup_hook
+ # Increase timeout because migration can be longer than the default one
+ # Yes, it is quite bad
+ @timeout = 30
+ end
+
+ # Basic echo server, kept for tests
+ def echo_action
+ validate :msg, String
+
+ reply.data = request[:msg]
+ end
+
+ def find_action
+ validate :name, String
+
+ x=Xen::XenServer.new
+ if x.has? request[:name] then
+ reply.data = "Present"
+ else
+ reply.data = "Absent"
+ end
+ end
+
+ def list_action
+ x=Xen::XenServer.new
+ reply[:slices] = x.slices
+ end
+
+
+ def stat_action
+ x=Xen::XenServer.new
+ # We don't care about domain 0
+ slices = x.slices
+ slices.delete("Domain-0")
+ reply[:slices] = slices
+ # UGLY -- this is purely linux -- need to be fixed
+ reply[:load] = open("/proc/loadavg","r").readline.split()[0].to_f
+ reply[:times] = {}
+ if !slices.empty? then
+ slices.each { |s| reply[:times][s]=x.get(s).time }
+ end
+ end
+
+ def migrate_action
+ validate :slice, String
+ validate :newhost, :ipv4address
+
+ x=Xen::XenServer.new
+ result=x.migrate(request[:slice],request[:newhost])
+ reply[:result]=result
+ end
+
+ # end of class
+ end
+ end
+end

0 comments on commit 674ff32

Please sign in to comment.