Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clarification on workflow.Interceptor #374

Closed
ostafen opened this issue Nov 24, 2022 · 16 comments
Closed

Clarification on workflow.Interceptor #374

ostafen opened this issue Nov 24, 2022 · 16 comments

Comments

@ostafen
Copy link
Contributor

ostafen commented Nov 24, 2022

Is it safe to use a grpc client for which I have installed a workflow.Interceptor also outside a workflow definition?
Or will all calls to the client be intercepted and cached by the interceptor?

@yedf2
Copy link
Contributor

yedf2 commented Nov 24, 2022

it will only intercept request with context of workflow

@ostafen
Copy link
Contributor Author

ostafen commented Nov 24, 2022

I'm trying to use custom metadata inside a worflow context. However, it seems that dtm is not sending headers when using the interceptor.

Even if I try to put myself metadata on the context object, the interceptor is replacing any existing metadata with the call to NewOutgoingContext here:

// TransInfo2Ctx add trans info to grpc context
func TransInfo2Ctx(ctx context.Context, gid, transType, branchID, op, dtm string) context.Context {
	md := metadata.Pairs(
		dtmpre+"gid", gid,
		dtmpre+"trans_type", transType,
		dtmpre+"branch_id", branchID,
		dtmpre+"op", op,
		dtmpre+"dtm", dtm,
	)
	nctx := ctx
	if ctx == nil {
		nctx = context.Background()
	}
	return metadata.NewOutgoingContext(nctx, md)
}

I think that we should replace the call to metadata.NewOutgoingContext(nctx, md) with metadata.AppendToOutgoingContext in order to keep existing metadata which have been set on the context object.

The following piece of code is not working also:
I'm setting the headers in the following way:

workflow.Register(s.Name, func(wf *workflow.Workflow, data []byte) error {
    wf.BranchHeaders = myHeaders
}

@yedf2
Copy link
Contributor

yedf2 commented Nov 25, 2022

client v1.16.4 has been released. In this new version, TransInfo2Ctx use AppendToOutgoingContext

@ostafen
Copy link
Contributor Author

ostafen commented Nov 25, 2022

Thank you, but now if I try to extend the workflow context with my headers

wf.Context = metadata.NewOutgoingContext(wf.Context, metadata.Pairs(pairs...))

, then I get:

dtmimp.PanicIf(wf.currentActionAdded, fmt.Errorf("one branch can have only on action"))

@ostafen
Copy link
Contributor Author

ostafen commented Nov 25, 2022

The interceptor has currently another problem:

func Interceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	logger.Debugf("grpc client calling: %s%s %v", cc.Target(), method, dtmimp.MustMarshalString(req))
	wf := ctx.Value(wfMeta{}).(*Workflow)

	origin := func() error {
		ctx1 := dtmgimp.TransInfo2Ctx(ctx, wf.Gid, wf.TransType, wf.currentBranch, wf.currentOp, wf.Dtm)
		err := invoker(ctx1, method, req, reply, cc, opts...)
		res := fmt.Sprintf("grpc client called: %s%s %s result: %s err: %v",
			cc.Target(), method, dtmimp.MustMarshalString(req), dtmimp.MustMarshalString(reply), err)
		if err != nil {
			logger.Errorf("%s", res)
		} else {
			logger.Debugf("%s", res)
		}
		return err
	}
	if wf.currentOp != dtmimp.OpAction {
		return origin()
	}
	sr := wf.recordedDo(func(bb *dtmcli.BranchBarrier) *stepResult {
		err := origin()
		return wf.stepResultFromGrpc(reply, err)
	})
	return wf.stepResultToGrpc(sr, reply)
}

Basically, if a context without the wfMeta{} key is intercepted, the code will panic, 'cause there is no null check for wf.
I think fix should be:

wf := ctx.Value(wfMeta{}).(*Workflow)
if wf == nil {
       return nil
}

@yedf2
Copy link
Contributor

yedf2 commented Nov 25, 2022

can you make a pr to dtm?

@yedf2
Copy link
Contributor

yedf2 commented Nov 25, 2022

you can pass a ctx to workflow's constructor

@ostafen
Copy link
Contributor Author

ostafen commented Nov 25, 2022

can you make a pr to dtm?

Of course.

you can pass a ctx to workflow's constructor

What do you mean?

@yedf2
Copy link
Contributor

yedf2 commented Nov 25, 2022

Thank you, but now if I try to extend the workflow context with my headers

wf.Context = metadata.NewOutgoingContext(wf.Context, metadata.Pairs(pairs...))

, then I get:

dtmimp.PanicIf(wf.currentActionAdded, fmt.Errorf("one branch can have only on action"))

This error hint is saying that, you should call NewBranch if you are adding another action
I have check your code, add added to my test, and tests passed. you can search it in the newest code in dtm's test

@yedf2
Copy link
Contributor

yedf2 commented Nov 25, 2022

you can pass a ctx to workflow's constructor

What do you mean?

I am sorry, please ignore it

@ostafen
Copy link
Contributor Author

ostafen commented Nov 25, 2022

Oh, I see, so basically every branch should be followed by at most one grpc call, shouldn't it?
Like so:

wf.NewBranch()...
myOp1
wf.NewBranch()...
myOp2
...

While the following is wrong:

wf.NewBranch()...
myOp1
myOp2
...

@yedf2
Copy link
Contributor

yedf2 commented Nov 25, 2022

yes

@ostafen
Copy link
Contributor Author

ostafen commented Nov 25, 2022

Great, I confirm you that this is working. Submitted PR #375. Can you create a release with the fix when it will be merged?

@yedf2
Copy link
Contributor

yedf2 commented Nov 25, 2022

I will create a release with this fix tomorrow

@ostafen ostafen closed this as completed Nov 25, 2022
@yedf2
Copy link
Contributor

yedf2 commented Nov 26, 2022

released

@yedf2
Copy link
Contributor

yedf2 commented Nov 26, 2022

When you deploy dtm in your production, please help to fill the user list
#7

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants