Skip to content
This repository has been archived by the owner on Aug 10, 2021. It is now read-only.

Commit

Permalink
Put a lock around tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
lizrice committed Apr 8, 2016
1 parent f168d67 commit 22fa0a3
Show file tree
Hide file tree
Showing 10 changed files with 71 additions and 35 deletions.
9 changes: 9 additions & 0 deletions demand/interface.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
// The demand package defines the interface for demand models
package demand

import (
"sync"
)

type Tasks struct {
Tasks map[string]Task
sync.RWMutex
}

type Task struct {
Demand int
Requested int
Expand Down
12 changes: 8 additions & 4 deletions demandchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,27 @@ import (
)

// handleDemandChange updates to changed demand
func handleDemandChange(td []api.TaskDemand, s scheduler.Scheduler, tasks map[string]demand.Task) (err error) {
func handleDemandChange(td []api.TaskDemand, s scheduler.Scheduler, running *demand.Tasks) (err error) {
running.Lock()
defer running.Unlock()
runningTasks := running.Tasks

var demandChanged = false
for _, task := range td {
name := task.App

if existingTask, ok := tasks[name]; ok {
if existingTask, ok := runningTasks[name]; ok {
if existingTask.Demand != task.DemandCount {
demandChanged = true
}
existingTask.Demand = task.DemandCount
tasks[name] = existingTask
runningTasks[name] = existingTask
}
}

if demandChanged {
// Ask the scheduler to make the changes
err = s.StopStartTasks(tasks)
err = s.StopStartTasks(running.Tasks)
if err != nil {
log.Errorf("Failed to stop / start tasks. %v", err)
}
Expand Down
12 changes: 7 additions & 5 deletions demandchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,16 @@ import (
)

func TestHandleDemandChange(t *testing.T) {
tasks = make(map[string]demand.Task)
tasks["priority1"] = demand.Task{
var tasks *demand.Tasks = new(demand.Tasks)
tasks.Tasks = make(map[string]demand.Task)

tasks.Tasks["priority1"] = demand.Task{
FamilyName: "p1family",
Demand: 4,
Requested: 0,
}

tasks["priority2"] = demand.Task{
tasks.Tasks["priority2"] = demand.Task{
FamilyName: "p2family",
Demand: 3,
Requested: 0,
Expand Down Expand Up @@ -65,8 +67,8 @@ func TestHandleDemandChange(t *testing.T) {
t.Fatalf("handleDemandChange failed")
}
log.Info(tasks)
if !reflect.DeepEqual(tasks, test.newtasks) {
t.Fatalf("Expected %v tasks, got %v", test.newtasks, tasks)
if !reflect.DeepEqual(tasks.Tasks, test.newtasks) {
t.Fatalf("Expected %v tasks, got %v", test.newtasks, tasks.Tasks)
}
}
}
27 changes: 18 additions & 9 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,25 +44,27 @@ import (
const constSendMetricsSleep = 500 // milliseconds - delay before we send state on the metrics API

var (
tasks map[string]demand.Task
log = logging.MustGetLogger("mssagent")
log = logging.MustGetLogger("mssagent")
)

func init() {
initLogging()
}

// cleanup resets demand for all tasks to 0 before we quit
func cleanup(s scheduler.Scheduler, tasks map[string]demand.Task) {
var err error
func cleanup(s scheduler.Scheduler, running *demand.Tasks) {
running.Lock()

tasks := running.Tasks
for name, task := range tasks {
task.Demand = 0
tasks[name] = task
}

log.Info("Reset tasks to 0 for cleanup")
err = s.StopStartTasks(tasks)
running.Unlock()

log.Debugf("Reset tasks to 0 for cleanup")
err := s.StopStartTasks(tasks)
if err != nil {
log.Errorf("Failed to cleanup tasks. %v", err)
}
Expand All @@ -71,6 +73,7 @@ func cleanup(s scheduler.Scheduler, tasks map[string]demand.Task) {
// For this simple prototype, Microscaling sits in a loop checking for demand changes every X milliseconds
func main() {
var err error
var tasks *demand.Tasks

st := getSettings()

Expand All @@ -80,10 +83,10 @@ func main() {
return
}

tasks := getTasks(st)
tasks = getTasks(st)

// Let the scheduler know about the task types.
for name, task := range tasks {
for name, task := range tasks.Tasks {
err = s.InitScheduler(name, &task)
if err != nil {
log.Errorf("Failed to start task %s: %v", name, err)
Expand All @@ -97,6 +100,12 @@ func main() {
log.Errorf("Failed to count containers. %v", err)
}

// Set the initial requested counts to match what's running
for name, task := range tasks.Tasks {
task.Requested = task.Running
tasks.Tasks[name] = task
}

// Prepare for cleanup when we receive an interrupt
closedown := make(chan os.Signal, 1)
signal.Notify(closedown, os.Interrupt)
Expand Down Expand Up @@ -158,7 +167,7 @@ func main() {
log.Errorf("Failed to count containers. %v", err)
}

err = api.SendMetrics(ws, st.userID, tasks)
err = api.SendMetrics(ws, st.userID, tasks.Tasks)
if err != nil {
log.Errorf("Failed to send metrics. %v", err)
}
Expand Down
6 changes: 5 additions & 1 deletion scheduler/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (c *DockerScheduler) StopStartTasks(tasks map[string]demand.Task) error {
return err
}

func (c *DockerScheduler) CountAllTasks(tasks map[string]demand.Task) error {
func (c *DockerScheduler) CountAllTasks(running *demand.Tasks) error {
// Docker Remote API https://docs.docker.com/reference/api/docker_remote_api_v1.20/
// get /containers/json
var err error
Expand All @@ -186,6 +186,10 @@ func (c *DockerScheduler) CountAllTasks(tasks map[string]demand.Task) error {
return fmt.Errorf("Failed to list containers: %v", err)
}

running.Lock()
defer running.Unlock()
tasks := running.Tasks

// Reset all the running counts to 0
for name, t := range tasks {
t.Running = 0
Expand Down
8 changes: 4 additions & 4 deletions scheduler/docker/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ func TestDockerScheduler(t *testing.T) {
fmt.Printf("Error %v", err)
}

var tasks map[string]demand.Task
tasks = make(map[string]demand.Task)
tasks["anything"] = task
d.CountAllTasks(tasks)
var tasks demand.Tasks
tasks.Tasks = make(map[string]demand.Task)
tasks.Tasks["anything"] = task
d.CountAllTasks(&tasks)
}
2 changes: 1 addition & 1 deletion scheduler/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ type Scheduler interface {
StopStartTasks(tasks map[string]demand.Task) error

// CountAllTasks updates task.Running to tell us how many instances of each task are currently running
CountAllTasks(tasks map[string]demand.Task) error
CountAllTasks(tasks *demand.Tasks) error
}
6 changes: 5 additions & 1 deletion scheduler/toy/toy.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ func (t *ToyScheduler) StopStartTasks(tasks map[string]demand.Task) error {
}

// CountAllTasks for the Toy scheduler simply reflects back what has been requested
func (t *ToyScheduler) CountAllTasks(tasks map[string]demand.Task) error {
func (t *ToyScheduler) CountAllTasks(running *demand.Tasks) error {
running.Lock()
defer running.Unlock()
tasks := running.Tasks

for name, task := range tasks {
task.Running = task.Requested
tasks[name] = task
Expand Down
15 changes: 8 additions & 7 deletions scheduler/toy/toy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,21 @@ import (
)

func TestToyScheduler(t *testing.T) {
var tasks map[string]demand.Task = make(map[string]demand.Task)
var tasks demand.Tasks
tasks.Tasks = make(map[string]demand.Task)

tasks["anything"] = demand.Task{Demand: 8, Requested: 3}
tasks.Tasks["anything"] = demand.Task{Demand: 8, Requested: 3}
m := NewScheduler()

task := tasks["anything"]
task := tasks.Tasks["anything"]
m.InitScheduler("anything", &task)

log.Debugf("before start/stop: demand %d, requested %d, running %d", task.Demand, task.Requested, task.Running)
err := m.StopStartTasks(tasks)
err := m.StopStartTasks(tasks.Tasks)
if err != nil {
t.Fatalf("Error %v", err)
}
task = tasks["anything"]
task = tasks.Tasks["anything"]
log.Debugf("after start/stop: demand %d, requested %d, running %d", task.Demand, task.Requested, task.Running)

if err != nil {
Expand All @@ -29,8 +30,8 @@ func TestToyScheduler(t *testing.T) {
t.Fatalf("Requested should have been updated")
}

err = m.CountAllTasks(tasks)
for name, task := range tasks {
err = m.CountAllTasks(&tasks)
for name, task := range tasks.Tasks {
if task.Running != task.Requested || task.Running != task.Demand {
t.Fatalf("Task %s running is not what was requested or demanded", name)
}
Expand Down
9 changes: 6 additions & 3 deletions settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func getScheduler(st settings) (scheduler.Scheduler, error) {
return s, nil
}

func getTasks(st settings) map[string]demand.Task {
func getTasks(st settings) (tasks *demand.Tasks) {
var t map[string]demand.Task

// Get the tasks that have been configured by this user
Expand All @@ -101,8 +101,11 @@ func getTasks(st settings) map[string]demand.Task {
log.Errorf("Error getting tasks: %v", err)
}

log.Debug(t)
return t
log.Debugf("Tasks: %v", t)

tasks = new(demand.Tasks)
tasks.Tasks = t
return tasks
}

func getEnvOrDefault(name string, defaultValue string) string {
Expand Down

0 comments on commit 22fa0a3

Please sign in to comment.