Clean up unused code:
Update the metric server to handle slow loris attacks.

Signed-off-by: Jacob Weinstock <jakobweinstock@gmail.com>
jacobweinstock committed Sep 10, 2023
1 parent 4877220 commit e9d0f2f
Showing 12 changed files with 47 additions and 223 deletions.
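
The hunks expanded on this page cover the queue and debug-code cleanup; the metrics-server change mentioned in the commit message is not shown here. For context, the usual way to blunt a slow loris attack on a Go HTTP endpoint is to set read and idle timeouts on the http.Server. The sketch below is illustrative only; the function name, address, handler, and timeout values are assumptions, not code from this commit.

package main

import (
	"net/http"
	"time"

	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// newMetricsServer returns an *http.Server with read timeouts set so that
// clients cannot hold connections open indefinitely by sending headers or
// body bytes arbitrarily slowly (a slow loris attack).
func newMetricsServer(addr string) *http.Server {
	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.Handler())

	return &http.Server{
		Addr:              addr,
		Handler:           mux,
		ReadHeaderTimeout: 5 * time.Second,  // bound time to receive request headers
		ReadTimeout:       10 * time.Second, // bound time to receive the full request
		WriteTimeout:      10 * time.Second, // bound time to write the response
		IdleTimeout:       60 * time.Second, // close idle keep-alive connections
	}
}
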
3 changes: 0 additions & 3 deletions cmd/server.go
@@ -3,7 +3,6 @@ package cmd
import (
"context"
"errors"
"fmt"
"os"
"strings"
"time"
@@ -110,8 +109,6 @@ var (
opts = append(opts, grpcsvr.WithSkipRedfishVersions(versions))
}

fmt.Println("maxWorkers", maxWorkers)

if err := grpcsvr.RunServer(ctx, logger, grpcServer, port, httpServer, opts...); err != nil {
logger.Error(err, "error running server")
os.Exit(1)
1 change: 0 additions & 1 deletion go.mod
@@ -3,7 +3,6 @@ module github.com/tinkerbell/pbnj
go 1.20

require (
github.com/adrianbrad/queue v1.2.1
github.com/bmc-toolbox/bmclib v0.5.7
github.com/bmc-toolbox/bmclib/v2 v2.0.1-0.20230515164712-2714c7479477
github.com/cristalhq/jwt/v3 v3.1.0
2 changes: 0 additions & 2 deletions go.sum
@@ -46,8 +46,6 @@ github.com/VictorLowther/simplexml v0.0.0-20180716164440-0bff93621230 h1:t95Grn2
github.com/VictorLowther/simplexml v0.0.0-20180716164440-0bff93621230/go.mod h1:t2EzW1qybnPDQ3LR/GgeF0GOzHUXT5IVMLP2gkW1cmc=
github.com/VictorLowther/soap v0.0.0-20150314151524-8e36fca84b22 h1:a0MBqYm44o0NcthLKCljZHe1mxlN6oahCQHHThnSwB4=
github.com/VictorLowther/soap v0.0.0-20150314151524-8e36fca84b22/go.mod h1:/B7V22rcz4860iDqstGvia/2+IYWXf3/JdQCVd/1D2A=
github.com/adrianbrad/queue v1.2.1 h1:CEVsjFQyuR0s5Hty/HJGWBZHsJ3KMmii0kEgLeam/mk=
github.com/adrianbrad/queue v1.2.1/go.mod h1:wYiPC/3MPbyT45QHLrPR4zcqJWPePubM1oEP/xTwhUs=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
2 changes: 1 addition & 1 deletion grpc/rpc/bmc_test.go
@@ -42,7 +42,7 @@ func setup() {
Ctx: ctx,
}

tr = taskrunner.NewRunner(repo, 100, 100, time.Second)
tr = taskrunner.NewRunner(repo, 100, time.Second)
tr.Start(ctx)
bmcService = BmcService{
TaskRunner: tr,
8 changes: 4 additions & 4 deletions grpc/rpc/machine.go
@@ -54,7 +54,7 @@ func (m *MachineService) BootDevice(ctx context.Context, in *v1.DeviceRequest) (
defer cancel()
return mbd.BootDeviceSet(taskCtx, in.BootDevice.String(), in.Persistent, in.EfiBoot)
}
go m.TaskRunner.Execute(ctx, l, "setting boot device", taskID, in.GetAuthn().GetDirectAuthn().GetHost().GetHost(), execFunc)
m.TaskRunner.Execute(ctx, l, "setting boot device", taskID, in.GetAuthn().GetDirectAuthn().GetHost().GetHost(), execFunc)

return &v1.DeviceResponse{TaskId: taskID}, nil
}
@@ -64,14 +64,14 @@ func (m *MachineService) Power(ctx context.Context, in *v1.PowerRequest) (*v1.Po
l := logging.ExtractLogr(ctx)
taskID := xid.New().String()
l = l.WithValues("taskID", taskID, "bmcIP", in.GetAuthn().GetDirectAuthn().GetHost().GetHost())
/*l.Info(
l.Info(
"start Power request",
"username", in.GetAuthn().GetDirectAuthn().GetUsername(),
"vendor", in.Vendor.GetName(),
"powerAction", in.GetPowerAction().String(),
"softTimeout", in.SoftTimeout,
"OffDuration", in.OffDuration,
)*/
)

execFunc := func(s chan string) (string, error) {
mp, err := machine.NewPowerSetter(
@@ -89,7 +89,7 @@ func (m *MachineService) Power(ctx context.Context, in *v1.PowerRequest) (*v1.Po
defer cancel()
return mp.PowerSet(taskCtx, in.PowerAction.String())
}
go m.TaskRunner.Execute(ctx, l, "power action: "+in.GetPowerAction().String(), taskID, in.GetAuthn().GetDirectAuthn().GetHost().GetHost(), execFunc)
m.TaskRunner.Execute(ctx, l, "power action: "+in.GetPowerAction().String(), taskID, in.GetAuthn().GetDirectAuthn().GetHost().GetHost(), execFunc)

return &v1.PowerResponse{TaskId: taskID}, nil
}
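
Both handlers above now call TaskRunner.Execute without the go prefix. That stays cheap for the RPC path as long as Execute itself only enqueues the work. The sketch below shows that shape; the Task field names are modeled on how run.go consumes them, while everything else is illustrative and not the repository's actual implementation.

package sketch

import (
	"context"

	"github.com/go-logr/logr"
)

// Task carries one unit of work. The field names mirror how run.go consumes
// them (t.Log, t.Description, t.ID, t.Action); anything else here is assumed.
type Task struct {
	ID          string
	Description string
	Log         logr.Logger
	Action      func(chan string) (string, error)
}

// Runner is reduced to the single field this sketch needs.
type Runner struct {
	ingestChan chan Task
}

// Execute only hands the task to the ingestion channel; the per-host workers
// perform the actual BMC interaction later, which is why the RPC handlers no
// longer need to wrap the call in a goroutine.
func (r *Runner) Execute(ctx context.Context, l logr.Logger, description, taskID, host string, action func(chan string) (string, error)) {
	t := Task{ID: taskID, Description: description, Log: l, Action: action}
	select {
	case r.ingestChan <- t:
	case <-ctx.Done():
		l.Info("context cancelled before the task was enqueued", "taskID", taskID, "host", host)
	}
}
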
3 changes: 2 additions & 1 deletion grpc/rpc/task_test.go
@@ -34,9 +34,10 @@ func TestTaskFound(t *testing.T) {
t.Fatalf("expected no error, got: %v", err)
}

time.Sleep(time.Second)
time.Sleep(time.Second * 3)
taskReq := &v1.StatusRequest{TaskId: resp.TaskId}
taskResp, _ := taskService.Status(context.Background(), taskReq)
t.Logf("Got response: %+v", taskResp)
if taskResp.Id != resp.TaskId {
t.Fatalf("got: %+v", taskResp)
}
33 changes: 5 additions & 28 deletions grpc/server.go
@@ -2,11 +2,9 @@ package grpc

import (
"context"
"fmt"
"net"
"os"
"os/signal"
"syscall"
"time"

"github.com/go-logr/logr"
@@ -42,9 +40,6 @@ type Server struct {
maxWorkers int
// workerIdleTimeout is the idle timeout for workers. If no tasks are received within the timeout, the worker will exit.
workerIdleTimeout time.Duration
// maxIngestionWorkers is the maximum number of concurrent workers that will be allowed.
// These are the workers that handle ingesting tasks from RPC endpoints and writing them to the map of per Host ID queues.
maxIngestionWorkers int
}

// ServerOption for setting optional values.
@@ -77,12 +72,6 @@ func WithWorkerIdleTimeout(t time.Duration) ServerOption {
return func(args *Server) { args.workerIdleTimeout = t }
}

// WithMaxIngestionWorkers sets the max number of concurrent workers that will be allowed.
// These are the workers that handle ingesting tasks from RPC endpoints and writing them to the map of per Host ID queues.
func WithMaxIngestionWorkers(max int) ServerOption {
return func(args *Server) { args.maxIngestionWorkers = max }
}

// RunServer registers all services and runs the server.
func RunServer(ctx context.Context, log logr.Logger, grpcServer *grpc.Server, port string, httpServer *http.Server, opts ...ServerOption) error {
ctx, cancel := context.WithCancel(ctx)
@@ -97,19 +86,17 @@ func RunServer(ctx context.Context, log logr.Logger, grpcServer *grpc.Server, po
}

defaultServer := &Server{
Actions: repo,
bmcTimeout: oob.DefaultBMCTimeout,
maxWorkers: 1000,
workerIdleTimeout: time.Second * 30,
maxIngestionWorkers: 1000,
Actions: repo,
bmcTimeout: oob.DefaultBMCTimeout,
maxWorkers: 1000,
workerIdleTimeout: time.Second * 30,
}

for _, opt := range opts {
opt(defaultServer)
}
fmt.Printf("maxWorkers: %d\n", defaultServer.maxWorkers)

tr := taskrunner.NewRunner(repo, defaultServer.maxIngestionWorkers, defaultServer.maxWorkers, defaultServer.workerIdleTimeout)
tr := taskrunner.NewRunner(repo, defaultServer.maxWorkers, defaultServer.workerIdleTimeout)
tr.Start(ctx)

ms := rpc.MachineService{
@@ -165,16 +152,6 @@ func RunServer(ctx context.Context, log logr.Logger, grpcServer *grpc.Server, po
}
}()

msgChan := make(chan os.Signal)
signal.Notify(msgChan, syscall.SIGUSR1)
go func() {
for range msgChan {
fmt.Println("======")
tr.Print()
fmt.Println("======")
}
}()

go func() {
<-ctx.Done()
log.Info("ctx cancelled, shutting down PBnJ")
128 changes: 0 additions & 128 deletions grpc/taskrunner/queue.go

This file was deleted.

44 changes: 10 additions & 34 deletions grpc/taskrunner/run.go
@@ -13,36 +13,15 @@ type orchestrator struct {
workers sync.Map
manager *concurrencyManager
workerIdleTimeout time.Duration

ingestManager *concurrencyManager

fifoQueue *hostQueue
fifoChan chan host
ingestionQueue *IngestQueue
fifoChan chan string
// perIDQueue is a map of hostID to a channel of tasks.
perIDQueue sync.Map

//testing new stuff
ingestChan chan Task
}

func (r *Runner) Print() {
one := r.orchestrator.ingestionQueue.Size()
two := r.orchestrator.fifoQueue.Size()
var three int
r.orchestrator.perIDQueue.Range(func(key, value interface{}) bool {
three++
return true
})
fmt.Printf("ingestion queue size: %d\n", one)
fmt.Printf("fcfs queue size: %d\n", two)
fmt.Printf("perID queue size: %d\n", three)
}

// ingest takes a task off the ingestion queue and puts it on the perID queue
// and adds the host ID to the fcfs queue.
func (r *Runner) ingest(ctx context.Context) {
//func (o *orchestrator) ingest(ctx context.Context) {
// dequeue from ingestion queue
// enqueue to perID queue
// enqueue to fcfs queue
@@ -81,7 +60,6 @@ func (r *Runner) orchestrate(ctx context.Context) {
// 2. start workers
for {
time.Sleep(time.Second * 2)
// r.orchestrator.perIDQueue.Range(func(key, value interface{}) bool {
r.orchestrator.workers.Range(func(key, value interface{}) bool {
// if worker id exists in o.workers, then move on because the worker is already running.
if value.(bool) {
@@ -92,7 +70,10 @@ func (r *Runner) orchestrate(ctx context.Context) {
r.orchestrator.manager.Wait()

r.orchestrator.workers.Store(key.(string), true)
v, _ := r.orchestrator.perIDQueue.Load(key.(string))
v, found := r.orchestrator.perIDQueue.Load(key.(string))
if !found {
return false
}
go r.worker(ctx, key.(string), v.(chan Task))
return true
})
@@ -101,25 +82,20 @@ func (r *Runner) orchestrate(ctx context.Context) {

func (r *Runner) worker(ctx context.Context, hostID string, q chan Task) {
defer r.orchestrator.manager.Done()
defer func() {
r.orchestrator.workers.Range(func(key, value interface{}) bool {
if key.(string) == hostID { //nolint:forcetypeassert // good
r.orchestrator.workers.Delete(key.(string))
return true //nolint:revive // this is needed to satisfy the func parameter
}
return true //nolint:revive // this is needed to satisfy the func parameter
})

}()
defer r.orchestrator.workers.Delete(hostID)

for {
select {
case <-ctx.Done():
// TODO: check queue length before returning maybe?
// For 175000 tasks, I found there would occasionally be 1 or 2 that didn't get processed.
// They still seemed to be in the queue/chan.
return
case t := <-q:
r.process(ctx, t.Log, t.Description, t.ID, t.Action)
metrics.PerIDQueue.WithLabelValues(hostID).Dec()
case <-time.After(r.orchestrator.workerIdleTimeout):
// TODO: check queue length before returning maybe?
return
}
}
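
Pieced together, the per-host scheduling in run.go follows a familiar pattern: a sync.Map of per-host channels, with one worker per host that drains its channel serially and exits after an idle timeout. The self-contained sketch below shows that pattern only; queue sizes, names, metrics, and restart handling are simplified, and the real runner coordinates worker start-up through its orchestrate loop and a concurrency manager.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// dispatcher keeps one buffered channel of work per host ID and runs at most
// one worker per host, so tasks for the same BMC are processed in order.
type dispatcher struct {
	queues      sync.Map // map[string]chan func()
	idleTimeout time.Duration
}

// enqueue places work on the host's channel, starting a worker if the host
// has no queue yet.
func (d *dispatcher) enqueue(ctx context.Context, host string, work func()) {
	v, loaded := d.queues.LoadOrStore(host, make(chan func(), 100))
	q := v.(chan func())
	if !loaded {
		go d.worker(ctx, host, q)
	}
	q <- work
}

// worker drains the host's channel and exits when the context is cancelled or
// no work arrives within idleTimeout. As the TODO in run.go notes, a task
// enqueued just as the worker times out can be missed; the real runner starts
// workers from its orchestrate loop, which can pick the queue back up.
func (d *dispatcher) worker(ctx context.Context, host string, q chan func()) {
	defer d.queues.Delete(host)
	for {
		select {
		case <-ctx.Done():
			return
		case work := <-q:
			work()
		case <-time.After(d.idleTimeout):
			return
		}
	}
}

func main() {
	d := &dispatcher{idleTimeout: 2 * time.Second}
	ctx := context.Background()
	d.enqueue(ctx, "10.0.0.1", func() { fmt.Println("power on 10.0.0.1") })
	d.enqueue(ctx, "10.0.0.2", func() { fmt.Println("set boot device 10.0.0.2") })
	time.Sleep(time.Second) // give the workers a moment before main exits
}
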
