Skip to content

Commit

Permalink
Make task channel smaller:
Browse files Browse the repository at this point in the history
It was too big and made memory
consumption way too high. Also,
the typical/general use case will be
a high number of hosts and a low number
of tasks that are requested close together.

Signed-off-by: Jacob Weinstock <jakobweinstock@gmail.com>
  • Loading branch information
jacobweinstock committed Sep 10, 2023
1 parent e9d0f2f commit 73de1b2
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 13 deletions.
2 changes: 2 additions & 0 deletions grpc/taskrunner/manager.go
@@ -1,5 +1,7 @@
package taskrunner

// copied and modified from https://github.com/zenthangplus/goccm

import "sync/atomic"

type concurrencyManager struct {
Expand Down
20 changes: 10 additions & 10 deletions grpc/taskrunner/run.go
Expand Up @@ -14,9 +14,9 @@ type orchestrator struct {
manager *concurrencyManager
workerIdleTimeout time.Duration
fifoChan chan string
// perIDQueue is a map of hostID to a channel of tasks.
perIDQueue sync.Map
ingestChan chan Task
// perHostChan is a map of hostID to a channel of tasks.
perHostChan sync.Map
ingestChan chan Task
}

// ingest takes a task off the ingestion queue and puts it on the per-host queue
Expand All @@ -32,8 +32,8 @@ func (r *Runner) ingest(ctx context.Context) {
case t := <-r.orchestrator.ingestChan:

// 2. enqueue to the per-host queue
ch := make(chan Task, 5000)
q, exists := r.orchestrator.perIDQueue.LoadOrStore(t.Host, ch)
ch := make(chan Task, 10)
q, exists := r.orchestrator.perHostChan.LoadOrStore(t.Host, ch)
v, ok := q.(chan Task)
if !ok {
fmt.Println("bad type: IngestQueue")
Expand All @@ -59,22 +59,22 @@ func (r *Runner) orchestrate(ctx context.Context) {
// 1. dequeue from fcfs queue
// 2. start workers
for {
time.Sleep(time.Second * 2)
// time.Sleep(time.Second * 3) - this potentially helps with ingestion
r.orchestrator.workers.Range(func(key, value interface{}) bool {
// if worker id exists in o.workers, then move on because the worker is already running.
if value.(bool) {
if value.(bool) { //nolint: forcetypeassert // values are always certain.
return true
}

// wait for a worker to become available
r.orchestrator.manager.Wait()

r.orchestrator.workers.Store(key.(string), true)
v, found := r.orchestrator.perIDQueue.Load(key.(string))
r.orchestrator.workers.Store(key.(string), true) //nolint: forcetypeassert // values are always certain.
v, found := r.orchestrator.perHostChan.Load(key.(string))
if !found {
return false
}
go r.worker(ctx, key.(string), v.(chan Task))
go r.worker(ctx, key.(string), v.(chan Task)) //nolint: forcetypeassert // values are always certain.
return true
})
}
Expand Down
4 changes: 2 additions & 2 deletions grpc/taskrunner/taskrunner.go
Expand Up @@ -44,8 +44,8 @@ func NewRunner(repo repository.Actions, maxWorkers int, workerIdleTimeout time.D
o := &orchestrator{
workers: sync.Map{},
fifoChan: make(chan string, 10000),
// perIDQueue is a map of hostID to a channel of tasks.
perIDQueue: sync.Map{},
// perHostChan is a map of hostID to a channel of tasks.
perHostChan: sync.Map{},
manager: newManager(maxWorkers),
workerIdleTimeout: workerIdleTimeout,
ingestChan: make(chan Task, 10000),
Expand Down
6 changes: 5 additions & 1 deletion pkg/http/http.go
Expand Up @@ -50,7 +50,11 @@ func (h *Server) Run(ctx context.Context) error {
_ = svr.Shutdown(ctx)
}()

return svr.ListenAndServe()
if err := svr.ListenAndServe(); err != nil && err != http.ErrServerClosed {
return err
}

return nil
}

func NewServer(addr string) *Server {
Expand Down

0 comments on commit 73de1b2

Please sign in to comment.