Skip to content

Commit

Permalink
feat(engine): hatchery as a uservice - auth (#3326)
Browse files Browse the repository at this point in the history
  • Loading branch information
yesnault committed Sep 20, 2018
1 parent f8b1e57 commit 5470fc5
Show file tree
Hide file tree
Showing 86 changed files with 1,857 additions and 4,927 deletions.
11 changes: 0 additions & 11 deletions cli/cdsctl/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ var (
health = cli.NewCommand(healthCmd, nil,
[]*cobra.Command{
cli.NewListCommand(healthStatusCmd, healthStatusRun, nil),
cli.NewGetCommand(healthMonDBTimesCmd, healthMonDBTimesRun, nil),
cli.NewListCommand(healthMonDBMigrateCmd, healthMonDBMigrateRun, nil),
})
)
Expand All @@ -33,16 +32,6 @@ func healthStatusRun(v cli.Values) (cli.ListResult, error) {
return cli.AsListResult(s.Lines), nil
}

var healthMonDBTimesCmd = cli.Command{
Name: "dbtimes",
Short: "Show some DB Times",
}

func healthMonDBTimesRun(v cli.Values) (interface{}, error) {
s, err := client.MonDBTimes()
return *s, err
}

var healthMonDBMigrateCmd = cli.Command{
Name: "dbmigrate",
Short: "Show DB Migrate status",
Expand Down
6 changes: 3 additions & 3 deletions cli/cdsctl/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func (ui *Termui) computeStatusHatcheriesWorkers(workers []sdk.Worker) {

for _, w := range workers {
var name string
if w.HatcheryID == 0 {
if w.HatcheryName == "" {
name = "Without hatchery"
} else {
name = w.HatcheryName
Expand Down Expand Up @@ -444,7 +444,7 @@ func (ui *Termui) updateQueue(baseURL string) string {
idx++
}
start = time.Now()
nWJobs, errw := client.QueueCountWorkflowNodeJobRun(nil, nil)
nWJobs, errw := client.QueueCountWorkflowNodeJobRun(nil, nil, "", nil)
elapsed = time.Since(start)
if errw != nil {
ui.msg = fmt.Sprintf("[%s](bg-red)", errw.Error())
Expand All @@ -464,7 +464,7 @@ func (ui *Termui) updateQueue(baseURL string) string {
return msg
}

func (ui *Termui) updateQueueJob(idx int, maxQueued time.Duration, id int64, isWJob bool, parameters []sdk.Parameter, executedJob sdk.ExecutedJob, queued time.Time, bookedBy sdk.Hatchery, baseURL, status string) (string, time.Duration) {
func (ui *Termui) updateQueueJob(idx int, maxQueued time.Duration, id int64, isWJob bool, parameters []sdk.Parameter, executedJob sdk.ExecutedJob, queued time.Time, bookedBy sdk.Service, baseURL, status string) (string, time.Duration) {
req := ""
for _, r := range executedJob.Job.Action.Requirements {
req += fmt.Sprintf("%s:%s ", r.Type, r.Value)
Expand Down
10 changes: 4 additions & 6 deletions engine/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/ovh/cds/engine/api/database"
"github.com/ovh/cds/engine/api/event"
"github.com/ovh/cds/engine/api/feature"
"github.com/ovh/cds/engine/api/hatchery"
"github.com/ovh/cds/engine/api/hook"
"github.com/ovh/cds/engine/api/mail"
"github.com/ovh/cds/engine/api/metrics"
Expand Down Expand Up @@ -377,9 +376,9 @@ func getAgent(r *http.Request) string {
return r.Header.Get("User-Agent")
}

func isHatcheryOrWorker(r *http.Request) bool {
func isServiceOrWorker(r *http.Request) bool {
switch getAgent(r) {
case sdk.HatcheryAgent:
case sdk.ServiceAgent:
return true
case sdk.WorkerAgent:
return true
Expand All @@ -400,12 +399,12 @@ func getWorker(c context.Context) *sdk.Worker {
return u
}

func getHatchery(c context.Context) *sdk.Hatchery {
func getHatchery(c context.Context) *sdk.Service {
i := c.Value(auth.ContextHatchery)
if i == nil {
return nil
}
u, ok := i.(*sdk.Hatchery)
u, ok := i.(*sdk.Service)
if !ok {
return nil
}
Expand Down Expand Up @@ -635,7 +634,6 @@ func (a *API) Serve(ctx context.Context) error {
sdk.GoRoutine("warning.Start", func() { warning.Start(ctx, a.DBConnectionFactory.GetDBMap, a.warnChan) })
sdk.GoRoutine("queue.Pipelines", func() { queue.Pipelines(ctx, a.Cache, a.DBConnectionFactory.GetDBMap) })
sdk.GoRoutine("pipeline.AWOLPipelineKiller", func() { pipeline.AWOLPipelineKiller(ctx, a.DBConnectionFactory.GetDBMap, a.Cache) })
sdk.GoRoutine("hatchery.Heartbeat", func() { hatchery.Heartbeat(ctx, a.DBConnectionFactory.GetDBMap) })
sdk.GoRoutine("auditCleanerRoutine(ctx", func() { auditCleanerRoutine(ctx, a.DBConnectionFactory.GetDBMap) })
sdk.GoRoutine("metrics.Initialize", func() { metrics.Initialize(ctx, a.DBConnectionFactory.GetDBMap, a.Config.Name) })
sdk.GoRoutine("repositoriesmanager.ReceiveEvents", func() { repositoriesmanager.ReceiveEvents(ctx, a.DBConnectionFactory.GetDBMap, a.Cache) })
Expand Down
3 changes: 0 additions & 3 deletions engine/api/api_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,7 @@ func (api *API) InitRouter() {
r.Handle("/group/{permGroupName}/token/{tokenid}", r.DELETE(api.deleteTokenHandler))

// Hatchery
r.Handle("/hatchery", r.POST(api.registerHatcheryHandler, Auth(false)))
r.Handle("/hatchery/count/{workflowNodeRunID}", r.GET(api.hatcheryCountHandler))
r.Handle("/hatchery/{id}", r.PUT(api.refreshHatcheryHandler))

// Hooks
r.Handle("/hook", r.POST(api.receiveHookHandler, Auth(false) /* Public handler called by third parties */))
Expand All @@ -100,7 +98,6 @@ func (api *API) InitRouter() {
r.Handle("/mon/smtp/ping", r.GET(api.smtpPingHandler, Auth(true)))
r.Handle("/mon/version", r.GET(VersionHandler, Auth(false)))
r.Handle("/mon/db/migrate", r.GET(api.getMonDBStatusMigrateHandler, NeedAdmin(true)))
r.Handle("/mon/db/times", r.GET(api.getMonDBTimesDBHandler, NeedAdmin(true)))
r.Handle("/mon/building", r.GET(api.getBuildingPipelinesHandler))
r.Handle("/mon/building/{hash}", r.GET(api.getPipelineBuildingCommitHandler))
r.Handle("/mon/metrics", r.GET(api.getMetricsHandler, Auth(false)))
Expand Down
32 changes: 12 additions & 20 deletions engine/api/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/go-gorp/gorp"

"github.com/ovh/cds/engine/api/cache"
"github.com/ovh/cds/engine/api/hatchery"
"github.com/ovh/cds/engine/api/group"
"github.com/ovh/cds/engine/api/services"
"github.com/ovh/cds/engine/api/sessionstore"
"github.com/ovh/cds/engine/api/worker"
Expand Down Expand Up @@ -122,9 +122,12 @@ func GetService(db *gorp.DbMap, store cache.Store, hash string) (*sdk.Service, e
if err != nil {
return nil, fmt.Errorf("cannot load service: %s", err)
}
if srv.GroupID != nil && group.SharedInfraGroup.ID == *srv.GroupID {
srv.IsSharedInfra = true
srv.Uptodate = srv.Version == sdk.VERSION
}
store.Set(key, srv)
}

return srv, nil
}

Expand Down Expand Up @@ -165,31 +168,20 @@ func CheckServiceAuth(ctx context.Context, db *gorp.DbMap, store cache.Store, he
}

serviceHash := string(id)
if serviceHash == "" {
return ctx, fmt.Errorf("bad service id")
}

srv, err := GetService(db, store, serviceHash)
if err != nil {
return ctx, err
}

ctx = context.WithValue(ctx, ContextUser, &sdk.User{Username: srv.Name})
ctx = context.WithValue(ctx, ContextService, srv)
return ctx, nil
}

// CheckHatcheryAuth checks hatchery authentication
func CheckHatcheryAuth(ctx context.Context, db *gorp.DbMap, headers http.Header) (context.Context, error) {
uid, err := base64.StdEncoding.DecodeString(headers.Get(sdk.AuthHeader))
if err != nil {
return ctx, fmt.Errorf("bad worker key syntax: %s", err)
}

name := headers.Get(cdsclient.RequestedNameHeader)
h, err := hatchery.LoadHatchery(db, string(uid), name)
if err != nil {
return ctx, fmt.Errorf("Invalid Hatchery UID:%s name:%s err:%s", string(uid), name, err)
if srv.Type == services.TypeHatchery {
ctx = context.WithValue(ctx, ContextHatchery, srv)
} else {
ctx = context.WithValue(ctx, ContextService, srv)
}

ctx = context.WithValue(ctx, ContextUser, &sdk.User{Username: h.Name})
ctx = context.WithValue(ctx, ContextHatchery, h)
return ctx, nil
}
7 changes: 5 additions & 2 deletions engine/api/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,8 +601,11 @@ func (api *API) getQueueHandler() service.Handler {
var queue []sdk.PipelineBuildJob
var errQ error
switch getAgent(r) {
case sdk.HatcheryAgent:
queue, errQ = pipeline.LoadGroupWaitingQueue(api.mustDB(), api.Cache, getHatchery(ctx).GroupID)
case sdk.ServiceAgent:
h := getHatchery(ctx)
if h != nil && h.GroupID != nil {
queue, errQ = pipeline.LoadGroupWaitingQueue(api.mustDB(), api.Cache, *h.GroupID)
}
case sdk.WorkerAgent:
queue, errQ = pipeline.LoadGroupWaitingQueue(api.mustDB(), api.Cache, getWorker(ctx).GroupID)
default:
Expand Down
25 changes: 17 additions & 8 deletions engine/api/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,19 @@ import (
"strconv"
"testing"

"github.com/ovh/cds/sdk/namesgenerator"
"github.com/stretchr/testify/assert"

"github.com/ovh/cds/engine/api/application"
"github.com/ovh/cds/engine/api/group"
"github.com/ovh/cds/engine/api/hatchery"
"github.com/ovh/cds/engine/api/pipeline"
"github.com/ovh/cds/engine/api/services"
"github.com/ovh/cds/engine/api/sessionstore"
"github.com/ovh/cds/engine/api/test"
"github.com/ovh/cds/engine/api/test/assets"
"github.com/ovh/cds/engine/api/token"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/cdsclient"
"github.com/ovh/cds/sdk/namesgenerator"
)

func Test_updateStepStatusHandler(t *testing.T) {
Expand Down Expand Up @@ -188,7 +189,7 @@ func Test_addSpawnInfosPipelineBuildJobHandler(t *testing.T) {
}

h := http.Header{}
h.Set("User-Agent", string(sdk.HatcheryAgent))
h.Set("User-Agent", string(sdk.ServiceAgent))

tk, errg := token.GenerateToken()
if errg != nil {
Expand All @@ -198,12 +199,20 @@ func Test_addSpawnInfosPipelineBuildJobHandler(t *testing.T) {
t.Fatal(err)
}

//Generate a hash
hash, errsession := sessionstore.NewSessionKey()
if errsession != nil {
t.Fatal(errsession)
}

name := "HATCHERY_TEST_" + namesgenerator.GetRandomNameCDS(0)
hatch := sdk.Hatchery{
hatch := sdk.Service{
Name: name,
GroupID: g.ID,
Type: services.TypeHatchery,
GroupID: &g.ID,
Hash: string(hash),
}
if err := hatchery.InsertHatchery(api.mustDB(), &hatch); err != nil {
if err := services.Insert(api.mustDB(), &hatch); err != nil {
t.Fatal(err)
}

Expand All @@ -218,11 +227,11 @@ func Test_addSpawnInfosPipelineBuildJobHandler(t *testing.T) {
uri := router.GetRoute("POST", api.addSpawnInfosPipelineBuildJobHandler, vars)
req, err := http.NewRequest("POST", uri, body)
test.NoError(t, err)
basedHash := base64.StdEncoding.EncodeToString([]byte(hatch.UID))
basedHash := base64.StdEncoding.EncodeToString([]byte(hatch.Hash))
req.Header.Add(sdk.AuthHeader, basedHash)
req.Header.Add(cdsclient.RequestedNameHeader, name)
req.Header.Add(sdk.SessionTokenHeader, tk)
req.Header.Add("User-Agent", sdk.HatcheryAgent)
req.Header.Add("User-Agent", sdk.ServiceAgent)

// Do the request
w := httptest.NewRecorder()
Expand Down
60 changes: 2 additions & 58 deletions engine/api/hatchery.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,75 +4,19 @@ import (
"context"
"net/http"

"github.com/gorilla/mux"

"github.com/ovh/cds/engine/api/group"
"github.com/ovh/cds/engine/api/hatchery"
"github.com/ovh/cds/engine/api/token"
"github.com/ovh/cds/engine/api/services"
"github.com/ovh/cds/engine/service"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/log"
)

func (api *API) registerHatcheryHandler() service.Handler {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
hatch := sdk.Hatchery{}
if err := UnmarshalBody(r, &hatch); err != nil {
return err
}

// Load token
tk, err := token.LoadToken(api.mustDB(), hatch.UID)
if err != nil {
return sdk.WrapError(sdk.ErrUnauthorized, "registerHatcheryHandler> Invalid token")
}
hatch.GroupID = tk.GroupID
hatch.IsSharedInfra = tk.GroupID == group.SharedInfraGroup.ID

oldH, errL := hatchery.LoadHatcheryByNameAndToken(api.mustDB(), hatch.Name, tk.Token)
if errL != nil && errL != sdk.ErrNoHatchery {
return sdk.WrapError(err, "registerHatcheryHandler> Cannot load hatchery %s", hatch.Name)
}

if oldH != nil {
hatch.ID = oldH.ID
hatch.Model.ID = oldH.Model.ID
if err := hatchery.Update(api.mustDB(), hatch); err != nil {
return sdk.WrapError(err, "registerHatcheryHandler> Cannot insert new hatchery")
}
} else {
if err := hatchery.InsertHatchery(api.mustDB(), &hatch); err != nil {
return sdk.WrapError(err, "registerHatcheryHandler> Cannot insert new hatchery")
}
}

hatch.Uptodate = hatch.Version == sdk.VERSION

log.Debug("registerHatcheryHandler> Welcome %d", hatch.ID)
return service.WriteJSON(w, hatch, http.StatusOK)
}
}

func (api *API) refreshHatcheryHandler() service.Handler {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
vars := mux.Vars(r)
hatcheryID := vars["id"]

if err := hatchery.RefreshHatchery(api.mustDB(), hatcheryID); err != nil {
return sdk.WrapError(err, "refreshHatcheryHandler> cannot refresh last beat of %s", hatcheryID)
}
return nil
}
}

func (api *API) hatcheryCountHandler() service.Handler {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
wfNodeRunID, err := requestVarInt(r, "workflowNodeRunID")
if err != nil {
return sdk.WrapError(err, "cannot convert workflow node run ID")
}

count, err := hatchery.CountHatcheries(api.mustDB(), wfNodeRunID)
count, err := services.CountHatcheries(api.mustDB(), wfNodeRunID)
if err != nil {
return sdk.WrapError(err, "hatcheryCountHandler> cannot get hatcheries count")
}
Expand Down
Loading

0 comments on commit 5470fc5

Please sign in to comment.