/
azurebatch_helpers.go
104 lines (83 loc) · 3.25 KB
/
azurebatch_helpers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package providers
// Helpers for Azure Batch: looking up an active pool, creating or reusing a wrapper job, building the account base URL, and producing log fields for a task.
import (
"context"
"fmt"
"time"
log "github.com/sirupsen/logrus"
"github.com/Azure/go-autorest/autorest"
"github.com/Azure/azure-sdk-for-go/services/batch/2017-09-01.6.0/batch"
)
// getPool fetches the Azure Batch pool poolID from the account at batchBaseURL
// and returns a PoolClient bound to that account if the pool is active.
//
// RetryAttempts is set to 0 on the client (presumably to disable autorest's
// automatic retries so failures surface immediately — confirm). A 404 is
// expected on first run, before the pool has been created; it is logged and
// returned to the caller like any other Get error. The error from Get is always
// returned unchanged so callers can inspect it. If the pool exists but is not
// in the active state, an error naming the current state is returned.
func getPool(ctx context.Context, batchBaseURL, poolID string, auth autorest.Authorizer) (*batch.PoolClient, error) {
	poolClient := batch.NewPoolClientWithBaseURI(batchBaseURL)
	poolClient.Authorizer = auth
	poolClient.RetryAttempts = 0

	pool, err := poolClient.Get(ctx, poolID, "*", "", nil, nil, nil, nil, "", "", nil, nil)
	if err != nil {
		// The cases differ only in what gets logged. The nil-response check must
		// come first: StatusCode is read through the embedded *http.Response.
		switch {
		case pool.Response.Response == nil:
			log.WithError(err).Errorf("Failed to get pool. nil response %v", poolID)
		case pool.StatusCode == 404:
			log.WithError(err).Errorf("Pool doesn't exist 404 received PoolID: %v", poolID)
		default:
			log.WithError(err).Errorf("Failed to get pool. Response:%v", pool.Response)
		}
		return nil, err
	}

	if pool.State != batch.PoolStateActive {
		return nil, fmt.Errorf("Pool not in active state: %v", pool.State)
	}
	log.Println("Pool active and running...")
	return &poolClient, nil
}
// createOrGetJob returns a JobClient for the Azure Batch account at
// batchBaseURL, ensuring a job with ID jobID exists. It behaves as follows:
//
//   - If the job exists and is active, the client is returned.
//   - If the Get request returns 404, the job is created on poolID and the
//     client is returned (or the Add error).
//   - If the job is being deleted, it waits one minute, or until ctx is done
//     (returning ctx.Err()), and then checks again.
//   - Any other Get error is returned unchanged.
//   - If the job exists in any other state, a descriptive error is returned
//     rather than a nil client with a nil error.
func createOrGetJob(ctx context.Context, batchBaseURL, jobID, poolID string, auth autorest.Authorizer) (*batch.JobClient, error) {
	jobClient := batch.NewJobClientWithBaseURI(batchBaseURL)
	jobClient.Authorizer = auth

	for {
		// Check whether the job exists already.
		currentJob, err := jobClient.Get(ctx, jobID, "", "", nil, nil, nil, nil, "", "", nil, nil)

		// StatusCode is read through the embedded *http.Response, which is nil
		// when no HTTP response was received (e.g. a transport error).
		notFound := currentJob.Response.Response != nil && currentJob.StatusCode == 404

		switch {
		case err == nil && currentJob.State == batch.JobStateActive:
			log.Println("Wrapper job already exists...")
			return &jobClient, nil

		case notFound:
			log.Println("Wrapper job missing... creating...")
			wrapperJob := batch.JobAddParameter{
				ID: &jobID,
				PoolInfo: &batch.PoolInformation{
					PoolID: &poolID,
				},
			}
			res, addErr := jobClient.Add(ctx, wrapperJob, nil, nil, nil, nil)
			log.WithField("jobReponse", res).Debug("created job")
			if addErr != nil {
				return nil, addErr
			}
			return &jobClient, nil

		case err != nil:
			return nil, err

		case currentJob.State == batch.JobStateDeleting:
			log.Info("Job is being deleted... Waiting then will retry")
			timer := time.NewTimer(time.Minute)
			select {
			case <-ctx.Done():
				timer.Stop()
				return nil, ctx.Err()
			case <-timer.C:
			}
			// Loop around and query the job again.

		default:
			return nil, fmt.Errorf("job %q exists but is in state %q, expected %q", jobID, currentJob.State, batch.JobStateActive)
		}
	}
}
// getBatchBaseURL builds the endpoint of an Azure Batch account in the form
// https://<account>.<location>.batch.azure.com. The inputs are inserted
// verbatim; no validation or escaping is performed.
func getBatchBaseURL(batchAccountName, batchAccountLocation string) string {
	baseURL := fmt.Sprintf("https://%s.%s.batch.azure.com", batchAccountName, batchAccountLocation)
	return baseURL
}
// logFieldsForTask builds logrus fields describing batchTask, containing:
//
//   - taskID and taskName, always present.
//   - taskPoolID and taskNodeID, when NodeInfo is set.
//   - startTime, endTime and taskDurationSec (a float64 number of seconds),
//     when ExecutionInfo has both a start and an end time.
//
// Optional string fields are dereferenced so log output shows their values
// rather than pointer addresses; nil strings are logged as "". A nil
// batchTask yields an empty field set instead of panicking.
func logFieldsForTask(batchTask *batch.CloudTask) log.Fields {
	if batchTask == nil {
		return log.Fields{}
	}

	str := func(s *string) string {
		if s == nil {
			return ""
		}
		return *s
	}

	fields := log.Fields{
		"taskID":   str(batchTask.ID),
		"taskName": str(batchTask.DisplayName),
	}
	if node := batchTask.NodeInfo; node != nil {
		fields["taskPoolID"] = str(node.PoolID)
		fields["taskNodeID"] = str(node.NodeID)
	}
	if info := batchTask.ExecutionInfo; info != nil && info.StartTime != nil && info.EndTime != nil {
		fields["endTime"] = *info.EndTime
		fields["startTime"] = *info.StartTime
		// Seconds must be called; storing the method value would log a func, not the duration.
		fields["taskDurationSec"] = info.EndTime.Sub(info.StartTime.Time).Seconds()
	}
	return fields
}