Skip to content

Commit

Permalink
Merge pull request #34 from sev-2/feature/scheduler-enhance
Browse files Browse the repository at this point in the history
Feature : scheduler improvement
  • Loading branch information
toopay committed Jun 23, 2024
2 parents 68b2a40 + c12f66d commit f308b68
Show file tree
Hide file tree
Showing 5 changed files with 338 additions and 98 deletions.
19 changes: 16 additions & 3 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package raiden
import (
"context"
"encoding/json"
"errors"

"github.com/valyala/fasthttp"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -30,6 +31,7 @@ type (
SetSpan(span trace.Span)

Tracer() trace.Tracer
NewJobCtx() (JobContext, error)

Write(data []byte)
WriteError(err error)
Expand All @@ -42,9 +44,10 @@ type (
Ctx struct {
context.Context
*fasthttp.RequestCtx
config *Config
span trace.Span
tracer trace.Tracer
config *Config
span trace.Span
tracer trace.Tracer
jobChan chan JobParams
}
)

Expand Down Expand Up @@ -81,6 +84,16 @@ func (c *Ctx) Tracer() trace.Tracer {
return c.tracer
}

func (c *Ctx) NewJobCtx() (JobContext, error) {
if c.jobChan != nil {
jobCtx := newJobCtx(c.config, c.jobChan, make(JobData))
spanCtx := trace.SpanContextFromContext(c.Context)
jobCtx.SetContext(trace.ContextWithSpanContext(context.Background(), spanCtx))
return jobCtx, nil
}
return nil, errors.New(("event channel not available, enable scheduler to use this feature"))
}

func (c *Ctx) Ctx() context.Context {
return c.Context
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/mock/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type MockContext struct {
SpanFn func() trace.Span
SetSpanFn func(span trace.Span)
TracerFn func() trace.Tracer
NewJobCtxFn func() (raiden.JobContext, error)
WriteFn func(data []byte)
WriteErrorFn func(err error)
}
Expand Down Expand Up @@ -73,6 +74,10 @@ func (c *MockContext) Tracer() trace.Tracer {
return c.TracerFn()
}

func (c *MockContext) NewJobCtx() (raiden.JobContext, error) {
return c.NewJobCtxFn()
}

func (c *MockContext) Write(data []byte) {
c.WriteFn(data)
}
Expand Down
56 changes: 29 additions & 27 deletions router.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/sev-2/raiden/pkg/logger"
"github.com/sev-2/raiden/pkg/utils"
"github.com/valyala/fasthttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"

fs_router "github.com/fasthttp/router"
Expand Down Expand Up @@ -47,12 +46,6 @@ const (
func NewRouter(config *Config) *router {
engine := fs_router.New()
groups := createRouteGroups(engine)

var tracer trace.Tracer
if config.TraceEnable {
tracer = otel.Tracer(fmt.Sprintf("%s tracer", config.ProjectName))
}

// register native controller
defaultRoutes := []*Route{
{
Expand All @@ -67,7 +60,6 @@ func NewRouter(config *Config) *router {
engine: engine,
config: config,
groups: groups,
tracer: tracer,
routes: defaultRoutes,
}
}
Expand All @@ -79,6 +71,15 @@ type router struct {
middlewares []MiddlewareFn
routes []*Route
tracer trace.Tracer
jobChan chan JobParams
}

func (r *router) SetJobChan(jobChan chan JobParams) {
r.jobChan = jobChan
}

func (r *router) SetTracer(tracer trace.Tracer) {
r.tracer = tracer
}

func (r *router) RegisterMiddlewares(middlewares []MiddlewareFn) *router {
Expand Down Expand Up @@ -165,19 +166,19 @@ func (r *router) bindRoute(chain Chain, route *Route) {
handler := chain.Then(m, route.Type, route.Controller)
switch strings.ToUpper(m) {
case fasthttp.MethodGet:
r.engine.GET(route.Path, buildHandler(r.config, r.tracer, handler))
r.engine.GET(route.Path, buildHandler(r.config, r.tracer, r.jobChan, handler))
case fasthttp.MethodPost:
r.engine.POST(route.Path, buildHandler(r.config, r.tracer, handler))
r.engine.POST(route.Path, buildHandler(r.config, r.tracer, r.jobChan, handler))
case fasthttp.MethodPut:
r.engine.PUT(route.Path, buildHandler(r.config, r.tracer, handler))
r.engine.PUT(route.Path, buildHandler(r.config, r.tracer, r.jobChan, handler))
case fasthttp.MethodPatch:
r.engine.PATCH(route.Path, buildHandler(r.config, r.tracer, handler))
r.engine.PATCH(route.Path, buildHandler(r.config, r.tracer, r.jobChan, handler))
case fasthttp.MethodDelete:
r.engine.DELETE(route.Path, buildHandler(r.config, r.tracer, handler))
r.engine.DELETE(route.Path, buildHandler(r.config, r.tracer, r.jobChan, handler))
case fasthttp.MethodOptions:
r.engine.OPTIONS(route.Path, buildHandler(r.config, r.tracer, handler))
r.engine.OPTIONS(route.Path, buildHandler(r.config, r.tracer, r.jobChan, handler))
case fasthttp.MethodHead:
r.engine.HEAD(route.Path, buildHandler(r.config, r.tracer, handler))
r.engine.HEAD(route.Path, buildHandler(r.config, r.tracer, r.jobChan, handler))
}
}
}
Expand Down Expand Up @@ -207,7 +208,7 @@ func (r *router) registerRpcAndFunctionHandler(route *Route) {
chain = r.buildAppMiddleware(chain)
}
group.POST(route.Path, buildHandler(
r.config, r.tracer, chain.Then(fasthttp.MethodPost, route.Type, route.Controller),
r.config, r.tracer, r.jobChan, chain.Then(fasthttp.MethodPost, route.Type, route.Controller),
))
}
}
Expand Down Expand Up @@ -235,11 +236,11 @@ func (r *router) registerRestHandler(route *Route) {
TableName: GetTableName(route.Model),
}

group.GET(route.Path, buildHandler(r.config, r.tracer, chain.Then(fasthttp.MethodGet, route.Type, restController)))
group.POST(route.Path, buildHandler(r.config, r.tracer, chain.Then(fasthttp.MethodPost, route.Type, restController)))
group.PUT(route.Path, buildHandler(r.config, r.tracer, chain.Then(fasthttp.MethodPut, route.Type, restController)))
group.PATCH(route.Path, buildHandler(r.config, r.tracer, chain.Then(fasthttp.MethodPatch, route.Type, restController)))
group.DELETE(route.Path, buildHandler(r.config, r.tracer, chain.Then(fasthttp.MethodDelete, route.Type, restController)))
group.GET(route.Path, buildHandler(r.config, r.tracer, r.jobChan, chain.Then(fasthttp.MethodGet, route.Type, restController)))
group.POST(route.Path, buildHandler(r.config, r.tracer, r.jobChan, chain.Then(fasthttp.MethodPost, route.Type, restController)))
group.PUT(route.Path, buildHandler(r.config, r.tracer, r.jobChan, chain.Then(fasthttp.MethodPut, route.Type, restController)))
group.PATCH(route.Path, buildHandler(r.config, r.tracer, r.jobChan, chain.Then(fasthttp.MethodPatch, route.Type, restController)))
group.DELETE(route.Path, buildHandler(r.config, r.tracer, r.jobChan, chain.Then(fasthttp.MethodDelete, route.Type, restController)))
}
}

Expand All @@ -257,11 +258,11 @@ func (r *router) registerStorageHandler(route *Route) {
RoutePath: route.Path,
}

group.GET(route.Path+"/{path:*}", buildHandler(r.config, r.tracer, chain.Then(fasthttp.MethodGet, route.Type, restController)))
group.POST(route.Path+"/{path:*}", buildHandler(r.config, r.tracer, chain.Then(fasthttp.MethodPost, route.Type, restController)))
group.PUT(route.Path+"/{path:*}", buildHandler(r.config, r.tracer, chain.Then(fasthttp.MethodPut, route.Type, restController)))
group.PATCH(route.Path+"/{path:*}", buildHandler(r.config, r.tracer, chain.Then(fasthttp.MethodPatch, route.Type, restController)))
group.DELETE(route.Path+"/{path:*}", buildHandler(r.config, r.tracer, chain.Then(fasthttp.MethodDelete, route.Type, restController)))
group.GET(route.Path+"/{path:*}", buildHandler(r.config, r.tracer, r.jobChan, chain.Then(fasthttp.MethodGet, route.Type, restController)))
group.POST(route.Path+"/{path:*}", buildHandler(r.config, r.tracer, r.jobChan, chain.Then(fasthttp.MethodPost, route.Type, restController)))
group.PUT(route.Path+"/{path:*}", buildHandler(r.config, r.tracer, r.jobChan, chain.Then(fasthttp.MethodPut, route.Type, restController)))
group.PATCH(route.Path+"/{path:*}", buildHandler(r.config, r.tracer, r.jobChan, chain.Then(fasthttp.MethodPatch, route.Type, restController)))
group.DELETE(route.Path+"/{path:*}", buildHandler(r.config, r.tracer, r.jobChan, chain.Then(fasthttp.MethodDelete, route.Type, restController)))
}
}

Expand Down Expand Up @@ -300,13 +301,14 @@ func createRouteGroups(engine *fs_router.Router) map[RouteType]*fs_router.Group

// The function "buildHandler" creates a fasthttp.RequestHandler that executes a given RouteHandlerFn
// with a provided Config and trace.Tracer.
func buildHandler(config *Config, tracer trace.Tracer, handler RouteHandlerFn) fasthttp.RequestHandler {
func buildHandler(config *Config, tracer trace.Tracer, jobChan chan JobParams, handler RouteHandlerFn) fasthttp.RequestHandler {
return func(ctx *fasthttp.RequestCtx) {
appContext := &Ctx{
RequestCtx: ctx,
config: config,
tracer: tracer,
Context: context.Background(),
jobChan: jobChan,
}
// execute actual handler from controller
if err := handler(appContext); err != nil {
Expand Down
Loading

0 comments on commit f308b68

Please sign in to comment.