Skip to content
This repository has been archived by the owner on Feb 27, 2020. It is now read-only.

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
petemoore committed Feb 15, 2017
1 parent 1c0446c commit ef49cf5
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 7 deletions.
21 changes: 16 additions & 5 deletions worker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ type configType struct {
TLSKey string `json:"tlsKey"`
DNSSecret string `json:"statelessDNSSecret"`
DNSDomain string `json:"statelessDNSDomain"`
MaxLifeCycle int `json:"maxLifeCycle"`
MaxLifeCycle int64 `json:"maxLifeCycle"`
MinimumDiskSpace int64 `json:"minimumDiskSpace"`
MinimumMemory int64 `json:"minimumMemory"`
MaxTasksToRun int `json:"numberOfTasksToRun"`
}

type credentials struct {
Expand Down Expand Up @@ -219,16 +220,16 @@ func ConfigSchema() schematypes.Object {
"statelessDNSDomain": schematypes.String{},
"maxLifeCycle": schematypes.Integer{
MetaData: schematypes.MetaData{
Title: "Max life cycle of worker",
Description: "Used to limit validity of hostname",
Title: "Maximum lifetime of the worker in seconds",
Description: "Used to limit the time period for which the DNS server will return an IP for the given worker hostname",
},
Minimum: 5 * 60,
Maximum: 31 * 24 * 60 * 60,
},
"minimumDiskSpace": schematypes.Integer{
MetaData: schematypes.MetaData{
Title: "Minimum Disk Space",
Description: `The minimum amount of disk space to have available
Description: `The minimum amount of disk space in bytes to have available
before starting on the next task. Garbage collector will do a
best-effort attempt at releasing resources to satisfy this limit`,
},
Expand All @@ -238,13 +239,23 @@ func ConfigSchema() schematypes.Object {
"minimumMemory": schematypes.Integer{
MetaData: schematypes.MetaData{
Title: "Minimum Memory",
Description: `The minimum amount of memory to have available
Description: `The minimum amount of memory in bytes to have available
before starting on the next task. Garbage collector will do a
best-effort attempt at releasing resources to satisfy this limit`,
},
Minimum: 0,
Maximum: math.MaxInt64,
},
"numberOfTasksToRun": schematypes.Integer{
MetaData: schematypes.MetaData{
Title: "Number of tasks the worker should run before exiting",
Description: `If set to 0, the worker does not limit the number of tasks
it will claim and execute. For positive values > 0, the worker will
exit if it completes the given number of tasks.`,
},
Minimum: 0,
Maximum: math.MaxInt32,
},
},
Required: []string{
"engine",
Expand Down
18 changes: 16 additions & 2 deletions worker/queueservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,19 @@ type (
workerGroup string
log *logrus.Entry
halt atomics.Bool
maxTasksToRun int
totalClaimed int
}
)

// math.Min is for floats, let's avoid casting and float arithmetic...
func min(a, b int) int {
if a < b {
return a
}
return b
}

// Start will begin the task claiming loop and claim as many tasks as the worker
// capacity allows. Claimed tasks will be returned on a channel for consumers
// to run.
Expand All @@ -86,9 +96,12 @@ func (q *queueService) Start() <-chan *taskClaim {
go func() {
for !q.halt.Get() {
q.mu.RLock()
capacity := q.capacity
numberOfTasksToClaim := min(q.capacity, q.maxTasksToRun-q.totalClaimed)
q.mu.RUnlock()
tasks := q.retrieveTasksFromQueue(capacity)
if numberOfTasksToClaim == 0 {
break
}
tasks := q.retrieveTasksFromQueue(numberOfTasksToClaim)
q.claimTasks(tasks)
time.Sleep(time.Duration(q.interval) * time.Second)
}
Expand Down Expand Up @@ -128,6 +141,7 @@ func (q *queueService) claimTasks(tasks []*taskMessage) {
}
q.mu.Lock()
q.capacity--
q.totalClaimed++
q.mu.Unlock()
q.tc <- claim
}(task)
Expand Down
1 change: 1 addition & 0 deletions worker/taskmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func newTaskManager(
workerType: config.WorkerType,
log: log.WithField("component", "Queue Service"),
expirationOffset: config.ReclaimOffset,
maxTasksToRun: config.MaxTasksToRun,
}

m := &Manager{
Expand Down

0 comments on commit ef49cf5

Please sign in to comment.