Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrating the whole project from xcontext to go-belt. #164

Merged
merged 5 commits into from
May 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions SPAG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,26 +44,26 @@ In this example we see three steps, one of which uses `cmd` step plugin and two

Step plugin is a Go struct which implements `TestStep` interface. The interface is defined in [pkg/test/step.go](https://github.com/linuxboot/contest/blob/master/pkg/test/step.go) and it looks like this:

```
```go
// TestStep is the interface that all steps need to implement to be executed
// by the TestRunner
type TestStep interface {
// Name returns the name of the step
Name() string
// Run runs the test step. The test step is expected to be synchronous.
Run(ctx xcontext.Context, ch TestStepChannels, params TestStepParameters, ev testevent.Emitter,
Run(ctx context.Context, ch TestStepChannels, params TestStepParameters, ev testevent.Emitter,
resumeState json.RawMessage) (json.RawMessage, error)
// ValidateParameters checks that the parameters are correct before passing
// them to Run.
ValidateParameters(ctx xcontext.Context, params TestStepParameters) error
ValidateParameters(ctx context.Context, params TestStepParameters) error
}
```

`Name` method returns a string - name of the plugin we saw in the descriptor. There is no hard limit for the name’s length, but it should be concise and expose plugin’s purpose. It is **case insensitive** and must be **unique** among registered step plugins.
`Run` is the heart of the plugin. It’s a method where the main code lives. It’s discussed in the “Implementing Run()” chapter. `ValidateParameters`, yeah, validates parameters and we will discuss it in the “Working with parameters” chapter.
There are two additional functions, which also should be implemented: `New` and `Load`. Usually they reside in the same module with the interface implementation. Function `New` returns a reference to a new instance of the plugin (implementation of the TestStep interface) and will be used by Contest to create plugin instances. Function Load is an entry point of the plugin. It’s used during registration process and reports everything Contest needs to know about our plugin. Implementation of the Load function could look like this:

```
```go
// Load returns the name, factory and events which are needed to register the step.
func Load() (string, test.TestStepFactory, []event.Name) {
return Name, New, Events
Expand Down Expand Up @@ -91,8 +91,8 @@ Plugin specific parameters can be passed to a plugin instance through `parameter

Sometimes Contest may need to pause a job. The most common reason for this is restart of the contest process itself. As a plugin author you may need to assist in orderly shutdown if possible. There are two things that can happen to your test step plugin:

* A **cancellation** may come in (`ctx.Done()`). This means the job is cancelled or failed entirely and nothing can be done to save it. Plugins SHOULD immediately cease all actions and return. Plugins MUST return within roughly 30s. Plugins do not have to deal with targets/DUTs, you don’t need to put them into any channels or similar - the job is dead. Return `xcontext.ErrCancelled`
* A **pause request** may come in. This means the contest server wants to restart. Not all plugins can support this easily, you can choose to ignore the pause request and continue as normal, however we want to minimize or completely get rid of these cases in the future. Plugin can either ignore the signal or react to it by serializing the state and returning it as `json.RawMessage` together with `xcontext.ErrPaused`. If you do this, the framework will call Run again after the server has restarted and give your json back in `resumeState`. Note targets will not be re-injected, you need to remember them. However, new targets can still arrive after the resumption. You don’t need to remember or care about targets that you already returned with pass/fail, the framework knows and will send them to the following step if required.
* A **cancellation** may come in (`ctx.Done()`). This means the job is cancelled or failed entirely and nothing can be done to save it. Plugins SHOULD immediately cease all actions and return. Plugins MUST return within roughly 30s. Plugins do not have to deal with targets/DUTs, you don’t need to put them into any channels or similar - the job is dead. Return `context.Canceled`
* A **pause request** may come in. This means the contest server wants to restart. Not all plugins can support this easily, you can choose to ignore the pause request and continue as normal, however we want to minimize or completely get rid of these cases in the future. Plugin can either ignore the signal or react to it by serializing the state and returning it as `json.RawMessage` together with `signals.Paused`. If you do this, the framework will call Run again after the server has restarted and give your json back in `resumeState`. Note targets will not be re-injected, you need to remember them. However, new targets can still arrive after the resumption. You don’t need to remember or care about targets that you already returned with pass/fail, the framework knows and will send them to the following step if required.
mimir-d marked this conversation as resolved.
Show resolved Hide resolved

Further details on the semantics of Pause.

Expand All @@ -102,9 +102,9 @@ Further details on the semantics of Pause.
* An example of a good case for implementing pause: a plugin that launches an external job and wait for it to complete, polling for its status. In this case, upon receiving pause, it should serialize all the targets in flight at the moment along with job tokens, return them, and continue waiting in the next instance with the same targets and tokens.
* When pausing, ConTest will close the plugin’s input channel signaling that no more targets will be coming during lifetime of this Step object. This is done in addition to asserting the pause signal so the simplest plugins do not need to handle it separately. (Note: more targets can come in after resumption)
* Paused plugin retains responsibility for the targets in flight at the time of pausing, i.e. ones that were injected but for which no result was received yet.
* Successful pause is communicated by returning `ErrPaused` from the Run method. If the plugin was responsible for any targets at the time, any other return value (including `nil`) will be considered a failure to pause and will abort the job.
* Successful pause is communicated by returning `signals.Paused` from the Run method. If the plugin was responsible for any targets at the time, any other return value (including `nil`) will be considered a failure to pause and will abort the job.
* Cancellation will follow pause if timeout is about to expire, so both conditions may be asserted on a context, keep that in mind when testing for them.
* When there are no targets in flight and pause is requested, returning `ErrPaused` is allowed (for simplicity). It may be accompanied by `nil` . In general, return value of a plugin not responsible for any targets does not matter.
* When there are no targets in flight and pause is requested, returning `signals.Paused` is allowed (for simplicity). It may be accompanied by `nil` . In general, return value of a plugin not responsible for any targets does not matter.
* When successfully paused, a new instance of the plugin may be resumed with the state returned but it is not a guarantee: another step may have failed to pause correctly in which case entire job would have been aborted and steps that did pause correctly will not be revived.
* On resumption, targets the plugin was responsible for will not be reinjected but plugin is still expected to produce results for them. Therefore, state returned by the plugin must include everything necessary for the new instance to produce results for targets that had been injected into the previous instance.
* Cancel signal may be asserted together with pause, in which case cancellation takes precedence. If both pause and cancel are asserted, there is no need to perform pause-related activities anymore.
Expand Down
11 changes: 6 additions & 5 deletions cmds/admin_server/job/rdb/rdb.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package rdb

import (
"context"
"fmt"

"github.com/google/go-safeweb/safesql"
adminServerJob "github.com/linuxboot/contest/cmds/admin_server/job"
"github.com/linuxboot/contest/pkg/xcontext"
"github.com/linuxboot/contest/pkg/logging"
)

var (
Expand Down Expand Up @@ -39,7 +40,7 @@ func New(dbURI, driveName string) (*Storage, error) {
}

// GetTags returns tags that has a tag matches tagPattern
func (r *Storage) GetTags(ctx xcontext.Context, tagPattern string) ([]adminServerJob.Tag, error) {
func (r *Storage) GetTags(ctx context.Context, tagPattern string) ([]adminServerJob.Tag, error) {
var resultErr error
res := []adminServerJob.Tag{}
doneChan := make(chan struct{})
Expand All @@ -57,7 +58,7 @@ func (r *Storage) GetTags(ctx xcontext.Context, tagPattern string) ([]adminServe
defer func() {
err = rows.Close()
if err != nil {
ctx.Errorf("error while closing the rows reader: %w", err)
logging.Errorf(ctx, "error while closing the rows reader: %w", err)
}
}()

Expand Down Expand Up @@ -87,7 +88,7 @@ func (r *Storage) GetTags(ctx xcontext.Context, tagPattern string) ([]adminServe
}

// GetJobs returns jobs with final report if exists that are under a given tagName
func (r *Storage) GetJobs(ctx xcontext.Context, tagName string) ([]adminServerJob.Job, error) {
func (r *Storage) GetJobs(ctx context.Context, tagName string) ([]adminServerJob.Job, error) {
var resultErr error
res := []adminServerJob.Job{}
doneChan := make(chan struct{})
Expand All @@ -105,7 +106,7 @@ func (r *Storage) GetJobs(ctx xcontext.Context, tagName string) ([]adminServerJo
defer func() {
err = rows.Close()
if err != nil {
ctx.Errorf("error while closing the rows reader: %w", err)
logging.Errorf(ctx, "error while closing the rows reader: %w", err)
}
}()

Expand Down
6 changes: 3 additions & 3 deletions cmds/admin_server/job/storage.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package job

import (
"context"
"time"

"github.com/linuxboot/contest/pkg/types"
"github.com/linuxboot/contest/pkg/xcontext"
)

// DB wraps a job database
type Storage interface {
GetTags(ctx xcontext.Context, tagPattern string) ([]Tag, error)
GetJobs(ctx xcontext.Context, projectName string) ([]Job, error)
GetTags(ctx context.Context, tagPattern string) ([]Tag, error)
GetJobs(ctx context.Context, projectName string) ([]Job, error)
}

// Tag contains metadata about jobs under a given tag
Expand Down
26 changes: 10 additions & 16 deletions cmds/admin_server/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"crypto/tls"
"flag"
"fmt"
Expand All @@ -15,9 +16,8 @@ import (
"github.com/linuxboot/contest/cmds/admin_server/server"
mongoStorage "github.com/linuxboot/contest/cmds/admin_server/storage/mongo"
"github.com/linuxboot/contest/pkg/logging"
"github.com/linuxboot/contest/pkg/xcontext"
"github.com/linuxboot/contest/pkg/xcontext/bundles/logrusctx"
"github.com/linuxboot/contest/pkg/xcontext/logger"

"github.com/facebookincubator/go-belt/tool/logger"
)

var (
Expand All @@ -27,7 +27,7 @@ var (
flagContestDBURI *string
flagTLSCert *string
flagTLSKey *string
flagLogLevel *string
logLevel = logger.LevelDebug
)

func initFlags(cmd string) {
Expand All @@ -37,7 +37,7 @@ func initFlags(cmd string) {
flagContestDBURI = flagSet.String("contestdbURI", "contest:contest@tcp(localhost:3306)/contest_integ?parseTime=true", "Contest Database URI")
flagTLSCert = flagSet.String("tlsCert", "", "Path to the tls cert file")
flagTLSKey = flagSet.String("tlsKey", "", "Path to the tls key file")
flagLogLevel = flagSet.String("logLevel", "debug", "A log level, possible values: debug, info, warning, error, panic, fatal")
flagSet.Var(&logLevel, "logLevel", "A log level, possible values: debug, info, warning, error, panic, fatal")
mimir-d marked this conversation as resolved.
Show resolved Hide resolved

}

Expand All @@ -58,27 +58,21 @@ func main() {
exitWithError(err, 1)
}

logLevel, err := logger.ParseLogLevel(*flagLogLevel)
if err != nil {
exitWithError(err, 1)
}
ctx := logging.WithBelt(context.Background(), logLevel)

ctx, cancel := logrusctx.NewContext(logLevel, logging.DefaultOptions()...)
defer cancel()

storageCtx, cancel := xcontext.WithTimeout(ctx, 10*time.Second)
defer cancel()
storageCtx, storageCtxCancel := context.WithTimeout(ctx, 10*time.Second)
defer storageCtxCancel()

storage, err := mongoStorage.NewMongoStorage(storageCtx, *flagDBURI)
if err != nil {
exitWithError(err, 1)
}
closeCtx, cancel := xcontext.WithTimeout(xcontext.Background(), 10*time.Second)
closeCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
defer storage.Close(closeCtx)

var jobStorage *rdb.Storage
ctx.Debugf("init contest db connection %v \n", *flagContestDBURI)
logging.Debugf(ctx, "init contest db connection %v \n", *flagContestDBURI)
jobStorage, err = rdb.New(*flagContestDBURI, "mysql")
if err != nil {
exitWithError(err, 1)
Expand Down
32 changes: 17 additions & 15 deletions cmds/admin_server/server/server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package server

import (
"context"
"crypto/tls"
"errors"
"fmt"
Expand All @@ -12,9 +13,10 @@ import (
adminServerJob "github.com/linuxboot/contest/cmds/admin_server/job"
"github.com/linuxboot/contest/cmds/admin_server/storage"
"github.com/linuxboot/contest/pkg/job"
"github.com/linuxboot/contest/pkg/logging"
"github.com/linuxboot/contest/pkg/types"
"github.com/linuxboot/contest/pkg/xcontext"
"github.com/linuxboot/contest/pkg/xcontext/logger"

"github.com/facebookincubator/go-belt/tool/logger"
)

var (
Expand Down Expand Up @@ -174,9 +176,9 @@ func (r *RouteHandler) addLogs(c *gin.Context) {
storageLogs = append(storageLogs, log.ToStorageLog())
}

ctx, cancel := xcontext.WithTimeout(xcontext.Background(), DefaultDBAccessTimeout)
ctx, cancel := context.WithTimeout(context.Background(), DefaultDBAccessTimeout)
defer cancel()
ctx = ctx.WithLogger(r.log)
ctx = logger.CtxWithLogger(ctx, r.log)
err := r.storage.StoreLogs(ctx, storageLogs)
if err != nil {
r.log.Errorf("Err while storing logs: %v", err)
Expand All @@ -203,9 +205,9 @@ func (r *RouteHandler) getLogs(c *gin.Context) {
return
}

ctx, cancel := xcontext.WithTimeout(xcontext.Background(), DefaultDBAccessTimeout)
ctx, cancel := context.WithTimeout(context.Background(), DefaultDBAccessTimeout)
defer cancel()
ctx = ctx.WithLogger(r.log)
ctx = logger.CtxWithLogger(ctx, r.log)
result, err := r.storage.GetLogs(ctx, query.ToStorageQuery())
if err != nil {
c.JSON(http.StatusInternalServerError, makeRestErr("error while getting the logs"))
Expand All @@ -227,9 +229,9 @@ func (r *RouteHandler) getTags(c *gin.Context) {
return
}

ctx, cancel := xcontext.WithTimeout(xcontext.Background(), DefaultDBAccessTimeout)
ctx, cancel := context.WithTimeout(context.Background(), DefaultDBAccessTimeout)
defer cancel()
ctx = ctx.WithLogger(r.log)
ctx = logger.CtxWithLogger(ctx, r.log)
res, err := r.jobStorage.GetTags(ctx, query.Text)
if err != nil {
c.JSON(http.StatusInternalServerError, makeRestErr("error while getting the projects"))
Expand All @@ -247,9 +249,9 @@ func (r *RouteHandler) getJobs(c *gin.Context) {
return
}

ctx, cancel := xcontext.WithTimeout(xcontext.Background(), DefaultDBAccessTimeout)
ctx, cancel := context.WithTimeout(context.Background(), DefaultDBAccessTimeout)
defer cancel()
ctx = ctx.WithLogger(r.log)
ctx = logger.CtxWithLogger(ctx, r.log)
mimir-d marked this conversation as resolved.
Show resolved Hide resolved
res, err := r.jobStorage.GetJobs(ctx, projectName)
if err != nil {
c.JSON(http.StatusInternalServerError, makeRestErr("error while getting the jobs"))
Expand All @@ -263,7 +265,7 @@ func makeRestErr(format string, args ...any) gin.H {
return gin.H{"status": "err", "msg": fmt.Sprintf(format, args...)}
}

func initRouter(ctx xcontext.Context, rh RouteHandler, middlewares []gin.HandlerFunc) *gin.Engine {
func initRouter(ctx context.Context, rh RouteHandler, middlewares []gin.HandlerFunc) *gin.Engine {

r := gin.New()
r.Use(gin.Logger())
Expand Down Expand Up @@ -292,11 +294,11 @@ func initRouter(ctx xcontext.Context, rh RouteHandler, middlewares []gin.Handler
return r
}

func Serve(ctx xcontext.Context, port int, storage storage.Storage, jobStorage adminServerJob.Storage, middlewares []gin.HandlerFunc, tlsConfig *tls.Config) error {
func Serve(ctx context.Context, port int, storage storage.Storage, jobStorage adminServerJob.Storage, middlewares []gin.HandlerFunc, tlsConfig *tls.Config) error {
routeHandler := RouteHandler{
storage: storage,
jobStorage: jobStorage,
log: ctx.Logger(),
log: logger.FromCtx(ctx),
}
router := initRouter(ctx, routeHandler, middlewares)
server := &http.Server{
Expand All @@ -308,9 +310,9 @@ func Serve(ctx xcontext.Context, port int, storage storage.Storage, jobStorage a
go func() {
<-ctx.Done()
// on cancel close the server
ctx.Debugf("Closing the server")
logging.Debugf(ctx, "Closing the server")
if err := server.Close(); err != nil {
ctx.Errorf("Error closing the server: %v", err)
logging.Errorf(ctx, "Error closing the server: %v", err)
}
}()

Expand Down
19 changes: 10 additions & 9 deletions cmds/admin_server/storage/mongo/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import (
"time"

"github.com/linuxboot/contest/cmds/admin_server/storage"
"github.com/linuxboot/contest/pkg/xcontext"
"github.com/linuxboot/contest/pkg/logging"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
Expand All @@ -27,7 +28,7 @@ type MongoStorage struct {
collection *mongo.Collection
}

func NewMongoStorage(ctx xcontext.Context, uri string) (*MongoStorage, error) {
func NewMongoStorage(ctx context.Context, uri string) (*MongoStorage, error) {
client, err := mongo.NewClient(options.Client().ApplyURI(uri))
if err != nil {
return nil, err
Expand Down Expand Up @@ -106,25 +107,25 @@ func toMongoQuery(query storage.Query) bson.D {
return q
}

func (s *MongoStorage) StoreLogs(ctx xcontext.Context, logs []storage.Log) error {
func (s *MongoStorage) StoreLogs(ctx context.Context, logs []storage.Log) error {
var mongoLogs []interface{}
for _, log := range logs {
mongoLogs = append(mongoLogs, toMongoLog(&log))
}
_, err := s.collection.InsertMany(ctx, mongoLogs)
if err != nil {
ctx.Errorf("Error while inserting a batch of logs: %v", err)
logging.Errorf(ctx, "Error while inserting a batch of logs: %v", err)
return storage.ErrInsert
}
return nil
}

func (s *MongoStorage) GetLogs(ctx xcontext.Context, query storage.Query) (*storage.Result, error) {
func (s *MongoStorage) GetLogs(ctx context.Context, query storage.Query) (*storage.Result, error) {
q := toMongoQuery(query)
//get the count of the logs
count, err := s.collection.CountDocuments(ctx, q)
if err != nil {
ctx.Errorf("Error while performing count query: %v", err)
logging.Errorf(ctx, "Error while performing count query: %v", err)
return nil, storage.ErrQuery
}

Expand All @@ -134,14 +135,14 @@ func (s *MongoStorage) GetLogs(ctx xcontext.Context, query storage.Query) (*stor

cur, err := s.collection.Find(ctx, q, opts)
if err != nil {
ctx.Errorf("Error while querying logs from db: %v", err)
logging.Errorf(ctx, "Error while querying logs from db: %v", err)
return nil, storage.ErrQuery
}

var logs []Log
err = cur.All(ctx, &logs)
if err != nil {
ctx.Errorf("Error while reading query result from db: %v", err)
logging.Errorf(ctx, "Error while reading query result from db: %v", err)
return nil, storage.ErrQuery
}
// convert to storage logs
Expand All @@ -158,7 +159,7 @@ func (s *MongoStorage) GetLogs(ctx xcontext.Context, query storage.Query) (*stor
}, nil
}

func (s *MongoStorage) Close(ctx xcontext.Context) error {
func (s *MongoStorage) Close(ctx context.Context) error {
return s.dbClient.Disconnect(ctx)
}

Expand Down
Loading
Loading