Skip to content

Commit

Permalink
feat(api): add external service (#3055)
Browse files Browse the repository at this point in the history
  • Loading branch information
sguiheux authored and fsamin committed Jul 23, 2018
1 parent 6573e5d commit 578ae13
Show file tree
Hide file tree
Showing 19 changed files with 271 additions and 199 deletions.
14 changes: 5 additions & 9 deletions engine/api/admin.go
Expand Up @@ -45,13 +45,12 @@ func (api *API) deleteAdminMaintenanceHandler() Handler {

func (api *API) getAdminServicesHandler() Handler {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
q := services.Querier(api.mustDB(), api.Cache)
srvs := []sdk.Service{}
var err error
if r.FormValue("type") != "" {
srvs, err = q.FindByType(r.FormValue("type"))
srvs, err = services.FindByType(api.mustDB(), r.FormValue("type"))
} else {
srvs, err = q.All()
srvs, err = services.All(api.mustDB())
}

if err != nil {
Expand All @@ -65,8 +64,7 @@ func (api *API) getAdminServiceHandler() Handler {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
vars := mux.Vars(r)
name := vars["name"]
q := services.Querier(api.mustDB(), api.Cache)
srv, err := q.FindByName(name)
srv, err := services.FindByName(api.mustDB(), name)
if err != nil {
return sdk.WrapError(err, "getAdminServiceHandler")
}
Expand All @@ -92,8 +90,7 @@ func (api *API) putAdminServiceCallHandler() Handler {

func selectDeleteAdminServiceCallHandler(api *API, method string) Handler {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
q := services.Querier(api.mustDB(), api.Cache)
srvs, err := q.FindByType(r.FormValue("type"))
srvs, err := services.FindByType(api.mustDB(), r.FormValue("type"))
if err != nil {
return sdk.WrapError(err, "selectDeleteAdminServiceCallHandler")
}
Expand All @@ -117,8 +114,7 @@ func selectDeleteAdminServiceCallHandler(api *API, method string) Handler {

func putPostAdminServiceCallHandler(api *API, method string) Handler {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
q := services.Querier(api.mustDB(), api.Cache)
srvs, err := q.FindByType(r.FormValue("type"))
srvs, err := services.FindByType(api.mustDB(), r.FormValue("type"))
if err != nil {
return sdk.WrapError(err, "putPostAdminServiceCallHandler")
}
Expand Down
34 changes: 33 additions & 1 deletion engine/api/api.go
Expand Up @@ -143,6 +143,7 @@ type Configuration struct {
ConfigurationKey string `toml:"configurationKey"`
} `toml:"vault"`
Providers []ProviderConfiguration `toml:"providers" comment:"###########################\n CDS Providers Settings \n##########################"`
Services []ServiceConfiguration `toml:"services" comment:"###########################\n CDS Providers Settings \n##########################"`
Tracing tracing.Configuration `toml:"tracing" comment:"###########################\n CDS Tracing Settings \n##########################"`
DefaultOS string `toml:"defaultOS" default:"linux" comment:"if no model and os/arch is specified in your job's requirements then spawn worker on this operating system (example: freebsd, linux, windows)"`
DefaultArch string `toml:"defaultArch" default:"amd64" comment:"if no model and no os/arch is specified in your job's requirements then spawn worker on this architecture (example: amd64, arm, 386)"`
Expand All @@ -154,6 +155,16 @@ type ProviderConfiguration struct {
Token string `toml:"token"`
}

// ServiceConfiguration is the configuration of external service
type ServiceConfiguration struct {
Name string `toml:"name"`
URL string `toml:"url"`
Port int `toml:"port"`
HealthPath string `toml:"healthPath"`
HealthPort int `toml:"healthPort"`
Type string `toml:"type"`
}

// DefaultValues is the struc for API Default configuration default values
type DefaultValues struct {
ServerSecretsKey string
Expand Down Expand Up @@ -597,7 +608,7 @@ func (a *API) Serve(ctx context.Context) error {
sdk.GoRoutine("repositoriesmanager.ReceiveEvents", func() { repositoriesmanager.ReceiveEvents(ctx, a.DBConnectionFactory.GetDBMap, a.Cache) })
sdk.GoRoutine("action.RequirementsCacheLoader", func() { action.RequirementsCacheLoader(ctx, 5*time.Second, a.DBConnectionFactory.GetDBMap, a.Cache) })
sdk.GoRoutine("hookRecoverer(ctx", func() { hookRecoverer(ctx, a.DBConnectionFactory.GetDBMap, a.Cache) })
sdk.GoRoutine("services.KillDeadServices", func() { services.KillDeadServices(ctx, services.NewRepository(a.mustDB, a.Cache)) })
sdk.GoRoutine("services.KillDeadServices", func() { services.KillDeadServices(ctx, a.mustDB) })
sdk.GoRoutine("poller.Initialize", func() { poller.Initialize(ctx, a.Cache, 10, a.DBConnectionFactory.GetDBMap) })
sdk.GoRoutine("migrate.CleanOldWorkflow", func() { migrate.CleanOldWorkflow(ctx, a.Cache, a.DBConnectionFactory.GetDBMap, a.Config.URL.API) })
sdk.GoRoutine("migrate.KeyMigration", func() { migrate.KeyMigration(a.Cache, a.DBConnectionFactory.GetDBMap, &sdk.User{Admin: true}) })
Expand All @@ -615,6 +626,27 @@ func (a *API) Serve(ctx context.Context) error {
}()
}

// Init Services
externalServices := make([]sdk.ExternalService, 0, len(a.Config.Services))
for _, s := range a.Config.Services {
serv := sdk.ExternalService{
Service: sdk.Service{
Name: s.Name,
Type: s.Type,
HTTPURL: fmt.Sprintf("%s:%d", s.URL, s.Port),
},
HealthPort: s.HealthPort,
HealthPath: s.HealthPath,
Port: s.Port,
URLWithoutPort: s.URL,
}
externalServices = append(externalServices, serv)
}
if err := services.InitExternal(a.mustDB, a.Cache, externalServices); err != nil {
return fmt.Errorf("unable to init external service: %v", err)
}
sdk.GoRoutine("pings-external-services", func() { services.Pings(ctx, a.mustDB, externalServices) })

// TODO: to delete after migration
if os.Getenv("CDS_MIGRATE_GIT_CLONE") == "true" {
go func() {
Expand Down
28 changes: 9 additions & 19 deletions engine/api/ascode_test.go
Expand Up @@ -9,7 +9,6 @@ import (
"strings"
"testing"

"github.com/go-gorp/gorp"
"github.com/ovh/cds/sdk"
izanami "github.com/ovhlabs/izanami-go-client"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -58,15 +57,12 @@ func Test_postImportAsCodeHandler(t *testing.T) {
},
}))

repositoryService := services.NewRepository(func() *gorp.DbMap {
return db
}, api.Cache)
mockService := &sdk.Service{Name: "Test_postImportAsCodeHandler", Type: services.TypeRepositories}
repositoryService.Delete(mockService)
test.NoError(t, repositoryService.Insert(mockService))
_ = services.Delete(api.mustDB(), mockService)
test.NoError(t, services.Insert(api.mustDB(), mockService))

mockVCSservice := &sdk.Service{Name: "Test_VCSService", Type: services.TypeVCS}
test.NoError(t, repositoryService.Insert(mockVCSservice))
test.NoError(t, services.Insert(api.mustDB(), mockVCSservice))

//This is a mock for the repositories service
services.HTTPClient = mock(
Expand Down Expand Up @@ -167,12 +163,9 @@ func Test_getImportAsCodeHandler(t *testing.T) {
api, db, _ := newTestAPI(t)
u, pass := assets.InsertAdminUser(db)

repositoryService := services.NewRepository(func() *gorp.DbMap {
return db
}, api.Cache)
mockService := &sdk.Service{Name: "Test_getImportAsCodeHandler", Type: services.TypeRepositories}
repositoryService.Delete(mockService)
test.NoError(t, repositoryService.Insert(mockService))
_ = services.Delete(db, mockService)
test.NoError(t, services.Insert(db, mockService))

UUID := sdk.UUID()

Expand Down Expand Up @@ -233,16 +226,13 @@ func Test_postPerformImportAsCodeHandler(t *testing.T) {
pkey := sdk.RandomString(10)
_ = assets.InsertTestProject(t, db, api.Cache, pkey, pkey, u)

repositoryService := services.NewRepository(func() *gorp.DbMap {
return db
}, api.Cache)
mockService := &sdk.Service{Name: "Test_postPerformImportAsCodeHandler_Repo", Type: services.TypeRepositories}
repositoryService.Delete(mockService)
test.NoError(t, repositoryService.Insert(mockService))
_ = services.Delete(db, mockService)
test.NoError(t, services.Insert(db, mockService))

mockService = &sdk.Service{Name: "Test_postPerformImportAsCodeHandler_VCS", Type: services.TypeHooks}
repositoryService.Delete(mockService)
test.NoError(t, repositoryService.Insert(mockService))
services.Delete(db, mockService)
test.NoError(t, services.Insert(db, mockService))

UUID := sdk.UUID()

Expand Down
3 changes: 1 addition & 2 deletions engine/api/auth/auth.go
Expand Up @@ -118,8 +118,7 @@ func GetService(db *gorp.DbMap, store cache.Store, hash string) (*sdk.Service, e
// Else load it from DB
if !store.Get(key, srv) {
var err error
repo := services.NewRepository(func() *gorp.DbMap { return db }, store)
srv, err = repo.FindByHash(hash)
srv, err = services.FindByHash(db, hash)
if err != nil {
return nil, fmt.Errorf("cannot load service: %s", err)
}
Expand Down
8 changes: 2 additions & 6 deletions engine/api/event/elasticsearch.go
Expand Up @@ -15,8 +15,6 @@ import (

// PushInElasticSearch pushes event to an elasticsearch
func PushInElasticSearch(c context.Context, db gorp.SqlExecutor, store cache.Store) {
querier := services.Querier(db, store)

eventChan := make(chan sdk.Event, 10)
Subscribe(eventChan)

Expand All @@ -28,8 +26,7 @@ func PushInElasticSearch(c context.Context, db gorp.SqlExecutor, store cache.Sto
return
}
case e := <-eventChan:

esServices, errS := querier.FindByType(services.TypeElasticsearch)
esServices, errS := services.FindByType(db, services.TypeElasticsearch)
if errS != nil {
log.Error("PushInElasticSearch> Unable to get elasticsearch service: %v", errS)
continue
Expand All @@ -55,8 +52,7 @@ func PushInElasticSearch(c context.Context, db gorp.SqlExecutor, store cache.Sto

// GetEvent retrieves events from elasticsearch
func GetEvents(db gorp.SqlExecutor, store cache.Store, filters sdk.EventFilter) ([]json.RawMessage, error) {
querier := services.Querier(db, store)
srvs, err := querier.FindByType(services.TypeElasticsearch)
srvs, err := services.FindByType(db, services.TypeElasticsearch)
if err != nil {
return nil, sdk.WrapError(err, "GetEvent> Unable to get elasticsearch service")
}
Expand Down
15 changes: 5 additions & 10 deletions engine/api/repositoriesmanager/repositories_manager.go
Expand Up @@ -21,8 +21,7 @@ import (

//LoadAll Load all RepositoriesManager from the database
func LoadAll(db *gorp.DbMap, store cache.Store) ([]string, error) {
serviceDAO := services.NewRepository(func() *gorp.DbMap { return db }, store)
srvs, err := serviceDAO.FindByType(services.TypeVCS)
srvs, err := services.FindByType(db, services.TypeVCS)
if err != nil {
return nil, sdk.WrapError(err, "repositoriesmanager.LoadAll> Unable to load services")
}
Expand Down Expand Up @@ -77,8 +76,7 @@ func NewVCSServerConsumer(dbFunc func() *gorp.DbMap, store cache.Store, name str
}

func (c *vcsConsumer) AuthorizeRedirect() (string, string, error) {
srvDAO := services.Querier(c.dbFunc(), c.cache)
srv, err := srvDAO.FindByType(services.TypeVCS)
srv, err := services.FindByType(c.dbFunc(), services.TypeVCS)
if err != nil {
return "", "", err
}
Expand All @@ -94,8 +92,7 @@ func (c *vcsConsumer) AuthorizeRedirect() (string, string, error) {
}

func (c *vcsConsumer) AuthorizeToken(token string, secret string) (string, string, error) {
srvDAO := services.Querier(c.dbFunc(), c.cache)
srv, err := srvDAO.FindByType(services.TypeVCS)
srv, err := services.FindByType(c.dbFunc(), services.TypeVCS)
if err != nil {
return "", "", err
}
Expand All @@ -120,8 +117,7 @@ func (c *vcsConsumer) GetAuthorizedClient(token string, secret string) (sdk.VCSA
return nil, sdk.ErrNoReposManagerClientAuth
}

servicesDao := services.Querier(c.dbFunc(), c.cache)
srvs, err := servicesDao.FindByType(services.TypeVCS)
srvs, err := services.FindByType(c.dbFunc(), services.TypeVCS)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -174,8 +170,7 @@ func AuthorizedClient(db gorp.SqlExecutor, store cache.Store, repo *sdk.ProjectV
return vcs, nil
}

servicesDao := services.Querier(db, store)
srvs, err := servicesDao.FindByType(services.TypeVCS)
srvs, err := services.FindByType(db, services.TypeVCS)
if err != nil {
return nil, err
}
Expand Down
41 changes: 19 additions & 22 deletions engine/api/services.go
Expand Up @@ -5,6 +5,8 @@ import (
"net/http"
"time"

"github.com/go-gorp/gorp"

"github.com/ovh/cds/engine/api/event"
"github.com/ovh/cds/engine/api/group"
"github.com/ovh/cds/engine/api/services"
Expand Down Expand Up @@ -34,13 +36,14 @@ func (api *API) postServiceRegisterHandler() Handler {
}

//Insert or update the service
repo := services.NewRepository(api.mustDB, api.Cache)
if err := repo.Begin(); err != nil {
tx, err := api.mustDB().Begin()
if err != nil {
return sdk.WrapError(err, "postServiceRegisterHandler")
}
defer tx.Rollback() // nolint

//Try to find the service, and keep; else generate a new one
oldSrv, errOldSrv := repo.FindByName(srv.Name)
oldSrv, errOldSrv := services.FindByName(tx, srv.Name)
if oldSrv != nil {
srv.Hash = oldSrv.Hash
} else if errOldSrv == sdk.ErrNotFound {
Expand All @@ -57,19 +60,17 @@ func (api *API) postServiceRegisterHandler() Handler {
srv.LastHeartbeat = time.Now()
srv.Token = ""

defer repo.Rollback()

if oldSrv != nil {
if err := repo.Update(srv); err != nil {
if err := services.Update(tx, srv); err != nil {
return sdk.WrapError(err, "postServiceRegisterHandler> Unable to update service %s", srv.Name)
}
} else {
if err := repo.Insert(srv); err != nil {
if err := services.Insert(tx, srv); err != nil {
return sdk.WrapError(err, "postServiceRegisterHandler> Unable to insert service %s", srv.Name)
}
}

if err := repo.Commit(); err != nil {
if err := tx.Commit(); err != nil {
return sdk.WrapError(err, "postServiceRegisterHandler")
}

Expand All @@ -80,16 +81,14 @@ func (api *API) postServiceRegisterHandler() Handler {
func (api *API) serviceAPIHeartbeat(c context.Context) {
tick := time.NewTicker(30 * time.Second).C

repo := services.NewRepository(api.mustDB, api.Cache)

hash, errsession := sessionstore.NewSessionKey()
if errsession != nil {
log.Error("serviceAPIHeartbeat> Unable to create session:%v", errsession)
return
}

// first call
api.serviceAPIHeartbeatUpdate(c, repo, hash)
api.serviceAPIHeartbeatUpdate(c, api.mustDB(), hash)

for {
select {
Expand All @@ -99,16 +98,18 @@ func (api *API) serviceAPIHeartbeat(c context.Context) {
return
}
case <-tick:
api.serviceAPIHeartbeatUpdate(c, repo, hash)
api.serviceAPIHeartbeatUpdate(c, api.mustDB(), hash)
}
}
}

func (api *API) serviceAPIHeartbeatUpdate(c context.Context, repo *services.Repository, hash sessionstore.SessionKey) {
if err := repo.Begin(); err != nil {
func (api *API) serviceAPIHeartbeatUpdate(c context.Context, db *gorp.DbMap, hash sessionstore.SessionKey) {
tx, err := db.Begin()
if err != nil {
log.Error("serviceAPIHeartbeat> error on repo.Begin:%v", err)
return
}
defer tx.Rollback() // nolint

srv := &sdk.Service{
Name: event.GetCDSName(),
Expand All @@ -119,30 +120,26 @@ func (api *API) serviceAPIHeartbeatUpdate(c context.Context, repo *services.Repo
}

//Try to find the service, and keep; else generate a new one
oldSrv, errOldSrv := repo.FindByName(srv.Name)
oldSrv, errOldSrv := services.FindByName(tx, srv.Name)
if errOldSrv != nil && errOldSrv != sdk.ErrNotFound {
log.Error("serviceAPIHeartbeat> Unable to find by name:%v", errOldSrv)
repo.Rollback()
return
}

if oldSrv != nil {
if err := repo.Update(srv); err != nil {
if err := services.Update(tx, srv); err != nil {
log.Error("serviceAPIHeartbeat> Unable to update service %s: %v", srv.Name, err)
repo.Rollback()
return
}
} else {
if err := repo.Insert(srv); err != nil {
if err := services.Insert(tx, srv); err != nil {
log.Error("serviceAPIHeartbeat> Unable to insert service %s: %v", srv.Name, err)
repo.Rollback()
return
}
}

if err := repo.Commit(); err != nil {
if err := tx.Commit(); err != nil {
log.Error("serviceAPIHeartbeat> error on repo.Commit: %v", err)
repo.Rollback()
return
}
}

0 comments on commit 578ae13

Please sign in to comment.