Skip to content

Commit

Permalink
Merge pull request #148 from TykTechnologies/add-before-execute-hook
Browse files Browse the repository at this point in the history
TT-3151 add before execute hook
  • Loading branch information
Sergey Petrunin committed Aug 26, 2021
2 parents 470c3bd + 372c8e9 commit 7375417
Show file tree
Hide file tree
Showing 4 changed files with 243 additions and 72 deletions.
18 changes: 16 additions & 2 deletions pkg/graphql/execution_engine_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import (
)

type EngineV2Configuration struct {
schema *Schema
plannerConfig plan.Configuration
schema *Schema
plannerConfig plan.Configuration
websocketBeforeStartHook WebsocketBeforeStartHook
}

func NewEngineV2Configuration(schema *Schema) EngineV2Configuration {
Expand Down Expand Up @@ -52,6 +53,11 @@ func (e *EngineV2Configuration) SetFieldConfigurations(fieldConfigs plan.FieldCo
e.plannerConfig.Fields = fieldConfigs
}

// SetWebsocketBeforeStartHook - sets before start hook which will be called before processing any operation sent over websockets
func (e *EngineV2Configuration) SetWebsocketBeforeStartHook(hook WebsocketBeforeStartHook) {
e.websocketBeforeStartHook = hook
}

type EngineResultWriter struct {
buf *bytes.Buffer
flushCallback func(data []byte)
Expand Down Expand Up @@ -157,6 +163,10 @@ type ExecutionEngineV2 struct {
executionPlanCache *lru.Cache
}

type WebsocketBeforeStartHook interface {
OnBeforeStart(reqCtx context.Context, operation *Request) error
}

type ExecutionOptionsV2 func(ctx *internalExecutionContext)

func WithBeforeFetchHook(hook resolve.BeforeFetchHook) ExecutionOptionsV2 {
Expand Down Expand Up @@ -268,6 +278,10 @@ func (e *ExecutionEngineV2) getCachedPlan(ctx *internalExecutionContext, operati
return p
}

func (e *ExecutionEngineV2) GetWebsocketBeforeStartHook() WebsocketBeforeStartHook {
return e.config.websocketBeforeStartHook
}

func (e *ExecutionEngineV2) getExecutionCtx() *internalExecutionContext {
return e.internalExecutionContextPool.Get().(*internalExecutionContext)
}
Expand Down
12 changes: 9 additions & 3 deletions pkg/subscription/executor_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,22 @@ import (
"github.com/jensneuse/graphql-go-tools/pkg/graphql"
)

// ExecutorV2Pool - provides reusable executors
type ExecutorV2Pool struct {
engine *graphql.ExecutionEngineV2
executorPool *sync.Pool
engine *graphql.ExecutionEngineV2
executorPool *sync.Pool
connectionInitReqCtx context.Context // connectionInitReqCtx - holds original request context used to establish websocket connection
}

func NewExecutorV2Pool(engine *graphql.ExecutionEngineV2) *ExecutorV2Pool {
func NewExecutorV2Pool(engine *graphql.ExecutionEngineV2, connectionInitReqCtx context.Context) *ExecutorV2Pool {
return &ExecutorV2Pool{
engine: engine,
executorPool: &sync.Pool{
New: func() interface{} {
return &ExecutorV2{}
},
},
connectionInitReqCtx: connectionInitReqCtx,
}
}

Expand All @@ -37,6 +40,7 @@ func (e *ExecutorV2Pool) Get(payload []byte) (Executor, error) {
engine: e.engine,
operation: &operation,
context: context.Background(),
reqCtx: e.connectionInitReqCtx,
}, nil
}

Expand All @@ -50,6 +54,7 @@ type ExecutorV2 struct {
engine *graphql.ExecutionEngineV2
operation *graphql.Request
context context.Context
reqCtx context.Context
}

func (e *ExecutorV2) Execute(writer resolve.FlushWriter) error {
Expand All @@ -73,4 +78,5 @@ func (e *ExecutorV2) Reset() {
e.engine = nil
e.operation = nil
e.context = context.Background()
e.reqCtx = context.TODO()
}
20 changes: 19 additions & 1 deletion pkg/subscription/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,11 @@ func (h *Handler) handleStart(id string, payload []byte) {
return
}

if err = h.handleOnBeforeStart(executor); err != nil {
h.handleError(id, graphql.RequestErrorsFromError(err))
return
}

if executor.OperationType() == ast.OperationTypeSubscription {
ctx := h.subCancellations.Add(id)
go h.startSubscription(ctx, id, executor)
Expand All @@ -200,6 +205,19 @@ func (h *Handler) handleStart(id string, payload []byte) {
go h.handleNonSubscriptionOperation(id, executor)
}

func (h *Handler) handleOnBeforeStart(executor Executor) error {
switch e := executor.(type) {
case *ExecutorV2:
if hook := e.engine.GetWebsocketBeforeStartHook(); hook != nil {
return hook.OnBeforeStart(e.reqCtx, e.operation)
}
case *ExecutorV1:
// do nothing
}

return nil
}

// handleNonSubscriptionOperation will handle a non-subscription operation like a query or a mutation.
func (h *Handler) handleNonSubscriptionOperation(id string, executor Executor) {
defer func() {
Expand Down Expand Up @@ -427,4 +445,4 @@ func (h *Handler) handleError(id string, errors graphql.RequestErrors) {
// ActiveSubscriptions will return the actual number of active subscriptions for that client.
func (h *Handler) ActiveSubscriptions() int {
return len(h.subCancellations)
}
}
Loading

0 comments on commit 7375417

Please sign in to comment.