limit the number of parallel things happening via proxmox API
Grant Gongaware committed Jul 21, 2017
Commit 365b131 (parent: 2f2c5df)
Showing 2 changed files with 91 additions and 19 deletions.
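The cap works as a counting semaphore built from sync.Mutex and sync.Cond: providerConfiguration gains MaxParallel, CurrentParallel, Mutex and Cond fields (MaxParallel comes from the new pm_parallel provider argument, default 4), and pmParallelBegin/pmParallelEnd claim and release a slot around each block of Proxmox API work. A minimal, self-contained sketch of that pattern follows; the limiter type and its names are illustrative and not code from this repository.

// Counting semaphore via sync.Mutex + sync.Cond, mirroring the
// pmParallelBegin/pmParallelEnd helpers added in proxmox/provider.go below.
package main

import (
	"fmt"
	"sync"
	"time"
)

type limiter struct {
	mu      sync.Mutex
	cond    *sync.Cond
	current int
	max     int
}

func newLimiter(max int) *limiter {
	l := &limiter{max: max}
	l.cond = sync.NewCond(&l.mu)
	return l
}

// begin blocks until fewer than max callers are active, then claims a slot.
func (l *limiter) begin() {
	l.mu.Lock()
	for l.current >= l.max {
		l.cond.Wait() // releases mu while waiting, re-locks before returning
	}
	l.current++
	l.mu.Unlock()
}

// end releases a slot and wakes one waiting caller, if any.
func (l *limiter) end() {
	l.mu.Lock()
	l.current--
	l.cond.Signal()
	l.mu.Unlock()
}

func main() {
	l := newLimiter(2) // e.g. pm_parallel = 2
	var wg sync.WaitGroup
	for i := 1; i <= 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			l.begin()
			fmt.Printf("worker %d: calling the Proxmox API\n", id)
			time.Sleep(200 * time.Millisecond) // stand-in for an API call
			l.end()
		}(i)
	}
	wg.Wait()
}

With the cap set to 2, at most two of the five workers are between begin and end at any one time.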
proxmox/provider.go (53 changes: 41 additions & 12 deletions)
@@ -10,7 +10,12 @@ import (
)

type providerConfiguration struct {
Client *pxapi.Client
Client *pxapi.Client
MaxParallel int
CurrentParallel int
MaxVmId int
Mutex *sync.Mutex
Cond *sync.Cond
}

func Provider() *schema.Provider {
@@ -36,6 +41,11 @@ func Provider() *schema.Provider {
DefaultFunc: schema.EnvDefaultFunc("PM_API_URL", nil),
Description: "https://host.fqdn:8006/api2/json",
},
"pm_parallel": {
Type: schema.TypeInt,
Optional: true,
Default: 4,
},
},

ResourcesMap: map[string]*schema.Resource{
@@ -54,8 +64,14 @@ func providerConfigure(d *schema.ResourceData) (interface{}, error) {
if err != nil {
return nil, err
}
var mut sync.Mutex
return &providerConfiguration{
Client: client,
Client: client,
MaxParallel: d.Get("pm_parallel").(int),
CurrentParallel: 0,
MaxVmId: 0,
Mutex: &mut,
Cond: sync.NewCond(&mut),
}, nil
}

@@ -68,23 +84,36 @@ func getClient(pm_api_url string, pm_user string, pm_password string) (*pxapi.Cl
return client, nil
}

var mutex = &sync.Mutex{}
var maxVmId = 0

func nextVmId(client *pxapi.Client) (nextId int, err error) {
mutex.Lock()
if maxVmId == 0 {
maxVmId, err = pxapi.MaxVmId(client)
func nextVmId(pconf *providerConfiguration) (nextId int, err error) {
pconf.Mutex.Lock()
if pconf.MaxVmId == 0 {
pconf.MaxVmId, err = pxapi.MaxVmId(pconf.Client)
if err != nil {
return 0, err
}
}
maxVmId++
nextId = maxVmId
mutex.Unlock()
pconf.MaxVmId++
nextId = pconf.MaxVmId
pconf.Mutex.Unlock()
return nextId, nil
}

func pmParallelBegin(pconf *providerConfiguration) {
pconf.Mutex.Lock()
for pconf.CurrentParallel >= pconf.MaxParallel {
pconf.Cond.Wait()
}
pconf.CurrentParallel++
pconf.Mutex.Unlock()
}

func pmParallelEnd(pconf *providerConfiguration) {
pconf.Mutex.Lock()
pconf.CurrentParallel--
pconf.Cond.Signal()
pconf.Mutex.Unlock()
}

func resourceId(targetNode string, resType string, vmId int) string {
return fmt.Sprintf("%s/%s/%d", targetNode, resType, vmId)
}
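For the VM ids, nextVmId above asks the API for the current maximum only once (pxapi.MaxVmId), caches it in MaxVmId, and then allocates ids by incrementing under the shared mutex, so concurrent creates each receive a unique id with a single API round-trip. A small stand-alone sketch of that scheme; idAllocator and fetchMaxVmId are placeholders rather than code from this repository, and the sketch uses defer so the lock is also released on the error path.

// Lazy-seeded, mutex-guarded id allocation, in the style of nextVmId.
package main

import (
	"fmt"
	"sync"
)

type idAllocator struct {
	mu    sync.Mutex
	maxId int
}

// fetchMaxVmId is a placeholder for the Proxmox API lookup.
func fetchMaxVmId() (int, error) { return 100, nil }

func (a *idAllocator) next() (int, error) {
	a.mu.Lock()
	defer a.mu.Unlock()
	if a.maxId == 0 { // first call: seed the cache from the API
		id, err := fetchMaxVmId()
		if err != nil {
			return 0, err
		}
		a.maxId = id
	}
	a.maxId++
	return a.maxId, nil
}

func main() {
	a := &idAllocator{}
	for i := 0; i < 3; i++ {
		id, _ := a.next()
		fmt.Println(id) // 101, 102, 103
	}
}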
proxmox/resource_vm_qemu.go (57 changes: 50 additions & 7 deletions)
@@ -123,7 +123,9 @@ func resourceVmQemu() *schema.Resource {
}

func resourceVmQemuCreate(d *schema.ResourceData, meta interface{}) error {
client := meta.(*providerConfiguration).Client
pconf := meta.(*providerConfiguration)
pmParallelBegin(pconf)
client := pconf.Client
vmName := d.Get("name").(string)
disk_gb := d.Get("disk_gb").(float64)
config := pxapi.ConfigQemu{
@@ -146,17 +148,20 @@ func resourceVmQemuCreate(d *schema.ResourceData, meta interface{}) error {
targetNode := d.Get("target_node").(string)

if dupVmr != nil && forceCreate {
pmParallelEnd(pconf)
return fmt.Errorf("Duplicate VM name (%s) with vmId: %d. Set force_create=false to recycle", vmName, dupVmr.VmId())
} else if dupVmr != nil && dupVmr.Node() != targetNode {
pmParallelEnd(pconf)
return fmt.Errorf("Duplicate VM name (%s) with vmId: %d on different target_node=%s", vmName, dupVmr.VmId(), dupVmr.Node())
}

vmr := dupVmr

if vmr == nil {
// get unique id
nextid, err := nextVmId(client)
nextid, err := nextVmId(pconf)
if err != nil {
pmParallelEnd(pconf)
return err
}
vmr = pxapi.NewVmRef(nextid)
@@ -166,23 +171,30 @@ func resourceVmQemuCreate(d *schema.ResourceData, meta interface{}) error {
if d.Get("clone").(string) != "" {
sourceVmr, err := client.GetVmRefByName(d.Get("clone").(string))
if err != nil {
pmParallelEnd(pconf)
return err
}
log.Print("[DEBUG] cloning VM")
err = config.CloneVm(sourceVmr, vmr, client)
if err != nil {
pmParallelEnd(pconf)
return err
}

// give sometime to proxmox to catchup
time.Sleep(5 * time.Second)

err = prepareDiskSize(client, vmr, disk_gb)
if err != nil {
pmParallelEnd(pconf)
return err
}

} else if d.Get("iso").(string) != "" {
config.QemuIso = d.Get("iso").(string)
err := config.CreateVm(vmr, client)
if err != nil {
pmParallelEnd(pconf)
return err
}
}
@@ -193,26 +205,40 @@ func resourceVmQemuCreate(d *schema.ResourceData, meta interface{}) error {

err := config.UpdateConfig(vmr, client)
if err != nil {
pmParallelEnd(pconf)
return err
}

// give sometime to proxmox to catchup
time.Sleep(5 * time.Second)

err = prepareDiskSize(client, vmr, disk_gb)
if err != nil {
pmParallelEnd(pconf)
return err
}
}
d.SetId(resourceId(targetNode, "qemu", vmr.VmId()))

// give sometime to proxmox to catchup
time.Sleep(5 * time.Second)

log.Print("[DEBUG] starting VM")
_, err := client.StartVm(vmr)
if err != nil {
pmParallelEnd(pconf)
return err
}
log.Print("[DEBUG] setting up SSH forward")
sshPort, err := pxapi.SshForwardUsernet(vmr, client)
if err != nil {
pmParallelEnd(pconf)
return err
}

// Done with proxmox API, end parallel and do the SSH things
pmParallelEnd(pconf)

d.SetConnInfo(map[string]string{
"type": "ssh",
"host": d.Get("ssh_forward_ip").(string),
@@ -228,15 +254,15 @@ func resourceVmQemuCreate(d *schema.ResourceData, meta interface{}) error {

case "ubuntu":
// give sometime to bootup
time.Sleep(5 * time.Second)
time.Sleep(9 * time.Second)
err = preProvisionUbuntu(d)
if err != nil {
return err
}

case "centos":
// give sometime to bootup
time.Sleep(8 * time.Second)
time.Sleep(9 * time.Second)
err = preProvisionCentos(d)
if err != nil {
return err
Expand All @@ -245,13 +271,17 @@ func resourceVmQemuCreate(d *schema.ResourceData, meta interface{}) error {
default:
return fmt.Errorf("Unknown os_type: %s", d.Get("os_type").(string))
}

return nil
}

func resourceVmQemuUpdate(d *schema.ResourceData, meta interface{}) error {
client := meta.(*providerConfiguration).Client
pconf := meta.(*providerConfiguration)
pmParallelBegin(pconf)
client := pconf.Client
vmr, err := client.GetVmRefByName(d.Get("name").(string))
if err != nil {
pmParallelEnd(pconf)
return err
}
vmName := d.Get("name").(string)
@@ -276,6 +306,7 @@ func resourceVmQemuUpdate(d *schema.ResourceData, meta interface{}) error {

sshPort, err := pxapi.SshForwardUsernet(vmr, client)
if err != nil {
pmParallelEnd(pconf)
return err
}
d.SetConnInfo(map[string]string{
@@ -285,17 +316,21 @@ func resourceVmQemuUpdate(d *schema.ResourceData, meta interface{}) error {
"user": d.Get("ssh_user").(string),
"private_key": d.Get("ssh_private_key").(string),
})
pmParallelEnd(pconf)
return nil
}

func resourceVmQemuRead(d *schema.ResourceData, meta interface{}) error {
client := meta.(*providerConfiguration).Client
pconf := meta.(*providerConfiguration)
pmParallelBegin(pconf)
client := pconf.Client
vmr, err := client.GetVmRefByName(d.Get("name").(string))
if err != nil {
return err
}
config, err := pxapi.NewConfigQemuFromApi(vmr, client)
if err != nil {
pmParallelEnd(pconf)
return err
}
d.SetId(resourceId(vmr.Node(), "qemu", vmr.VmId()))
@@ -311,23 +346,31 @@ func resourceVmQemuRead(d *schema.ResourceData, meta interface{}) error {
d.Set("nic", config.QemuNicModel)
d.Set("bridge", config.QemuBrige)
d.Set("vlan", config.QemuVlanTag)
pmParallelEnd(pconf)
return nil
}

func resourceVmQemuImport(d *schema.ResourceData, meta interface{}) ([]*schema.ResourceData, error) {
// TODO: research proper import
err := resourceVmQemuRead(d, meta)
return []*schema.ResourceData{d}, err
}

func resourceVmQemuDelete(d *schema.ResourceData, meta interface{}) error {
client := meta.(*providerConfiguration).Client
pconf := meta.(*providerConfiguration)
pmParallelBegin(pconf)
client := pconf.Client
vmId, _ := strconv.Atoi(path.Base(d.Id()))
vmr := pxapi.NewVmRef(vmId)
_, err := client.StopVm(vmr)
if err != nil {
pmParallelEnd(pconf)
return err
}
// give sometime to proxmox to catchup
time.Sleep(2 * time.Second)
_, err = client.DeleteVm(vmr)
pmParallelEnd(pconf)
return err
}
