Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide a way to invoke Thrift service impls from BlockingTaskExecutor #5619

Merged
merged 19 commits into from
Jun 20, 2024

Conversation

ChangguHan
Copy link
Contributor

@ChangguHan ChangguHan commented Apr 18, 2024

Related issue: #4917

Motivation:

There's currently no way to make Thrift services run from
the BlockingTaskExecutor.

Modifications:

  • Added useBlockingTaskExecutor property
  • Added ThriftCallServiceBuilder to build ThriftCallService fluently
  • ThriftCallService now calls the service implementation from
    the BlockingTaskExecutor if configured so.

Result:

@CLAassistant
Copy link

CLAassistant commented Apr 18, 2024

CLA assistant check
All committers have signed the CLA.

Comment on lines 621 to 624
if (useBlockingTaskExecutor) {
ctx.blockingTaskExecutor().execute(
() -> invoke(ctx, serializationFormat, seqId, f, decodedReq, httpRes)
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we think invoke() is only thing that we should wrap with ctx.blockingTaskExecutor().

If not, please share to us~! 🙇

@ikhoon ikhoon added new feature sprint Issues for OSS Sprint participants labels Apr 22, 2024
@Override
public void get(TestServiceRequest request,
AsyncMethodCallback resultHandler) throws TException {
resultHandler.onComplete(new TestServiceResponse(request.getMessage()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Could we check whether the current thread is the blocking task executor?
  • Should we also check TestService.Iface?

Copy link
Contributor Author

@ChangguHan ChangguHan Apr 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we check whether the current thread is the blocking task executor?

I checked it here

Should we also check TestService.Iface?

Fixed it. Because of Iface, only verified the current thread is blocking executor without adding useBlockingTaskExecutor

thrift/thrift0.13/src/test/thrift/Test.thrift Outdated Show resolved Hide resolved
Copy link
Member

@minwoox minwoox left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great to me. 👍
Left minor suggestions. 😉

Copy link
Member

@minwoox minwoox left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great to me, @ChangguHan 👍 👍 👍
Thanks!

@minwoox
Copy link
Member

minwoox commented Apr 23, 2024

Oops, I realized that we also need to fix ThriftCallService not to use blocking task executor twice:

private static void invoke(
        ServiceRequestContext ctx,
        Object impl, ThriftFunction func, List<Object> args, CompletableRpcResponse reply) {

    try {
        final TBase<?, ?> tArgs = func.newArgs(args);
        if (func.isAsync()) {
            invokeAsynchronously(impl, func, tArgs, reply);
        } else if (ctx.eventLoop().inEventLoop()) {
            invokeSynchronously(ctx, impl, func, tArgs, reply);
        } else {
            invokeSynchronously0(ctx, impl, func, tArgs, reply);
        }
    } catch (Throwable t) {
        reply.completeExceptionally(t);
    }
}

private static void invokeSynchronously(
        ServiceRequestContext ctx, Object impl,
        ThriftFunction func, TBase<?, ?> args, CompletableRpcResponse reply) {
    ctx.blockingTaskExecutor().execute(() -> invokeSynchronously0(ctx, impl, func, args, reply));
}

private static void invokeSynchronously0(
        ServiceRequestContext ctx, Object impl,
        ThriftFunction func, TBase<?, ?> args, CompletableRpcResponse reply) {
    ...
}

Copy link
Member

@trustin trustin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whether to delegate the call to the blocking task executor should be decided by ThriftCallService, not THttpService.

@ChangguHan
Copy link
Contributor Author

@trustin
I fix to decide whether to check useBlockingTaskExecutor in ThriftCallService instead of THttpService.
Would you check this?

if (func.isAsync()) {
if (useBlockingTaskExecutor) {
ctx.blockingTaskExecutor().execute(() -> {
try {
invokeAsynchronously(impl, func, tArgs, reply);
} catch (Throwable t) {
reply.completeExceptionally(t);
}
});
} else {
invokeAsynchronously(impl, func, tArgs, reply);
}
} else {
invokeSynchronously(ctx, impl, func, tArgs, reply);
}

@ChangguHan
Copy link
Contributor Author

Oops, I realized that we also need to fix ThriftCallService not to use blocking task executor twice:

private static void invoke(
        ServiceRequestContext ctx,
        Object impl, ThriftFunction func, List<Object> args, CompletableRpcResponse reply) {

    try {
        final TBase<?, ?> tArgs = func.newArgs(args);
        if (func.isAsync()) {
            invokeAsynchronously(impl, func, tArgs, reply);
        } else if (ctx.eventLoop().inEventLoop()) {
            invokeSynchronously(ctx, impl, func, tArgs, reply);
        } else {
            invokeSynchronously0(ctx, impl, func, tArgs, reply);
        }
    } catch (Throwable t) {
        reply.completeExceptionally(t);
    }
}

private static void invokeSynchronously(
        ServiceRequestContext ctx, Object impl,
        ThriftFunction func, TBase<?, ?> args, CompletableRpcResponse reply) {
    ctx.blockingTaskExecutor().execute(() -> invokeSynchronously0(ctx, impl, func, args, reply));
}

private static void invokeSynchronously0(
        ServiceRequestContext ctx, Object impl,
        ThriftFunction func, TBase<?, ?> args, CompletableRpcResponse reply) {
    ...
}

@minwoox Thank you for your review.
I fix to check in ThriftCallService!

Copy link
Contributor

@jrhee17 jrhee17 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks almost done once @trustin 's comments are addressed 👍

*/
public ThriftCallServiceBuilder addService(String key, Object implementation) {
requireNonNull(implementation, "implementation");
this.implementations.put(key, ImmutableList.of(implementation));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
this.implementations.put(key, ImmutableList.of(implementation));
implementations.put(key, ImmutableList.of(implementation));

@ChangguHan ChangguHan requested a review from trustin May 17, 2024 06:59
@ChangguHan ChangguHan requested a review from jrhee17 May 17, 2024 06:59
Copy link
Member

@minwoox minwoox left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good! Left minor comments. 👍

assertTrue(service.entries().containsKey(""));
final Iterator<?> defaultIterator = service.entries().get("").implementations.iterator();
assertEquals(defaultServiceImpl, defaultIterator.next());
assertFalse(defaultIterator.hasNext());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the assertion logic of this test method is not easy to read. How about building a map that contains the expected keys and values and just compare it to the return value of service.entries()? e.g.

final Map<String, List<Object>> actualEntries = service.entries().entrySet().stream()....collect(...);
final Map<String, List<Object>> expectedEntries = ImmutableMap.builder()...build();
assertThat(actualEntries).isEqualsTo(expectedEntries);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added it.
Would you check this?

@ChangguHan ChangguHan requested a review from trustin June 13, 2024 09:38
Copy link
Contributor

@jrhee17 jrhee17 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes look good to me once the build reliably passes 👍 👍 👍

Comment on lines 115 to 116
resultHandler.onComplete(name);
blocking.set(Thread.currentThread().getName().startsWith(BLOCKING_EXECUTOR_PREFIX));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit; swapping these two may help with flakiness. Otherwise, the response may be sent back and the main test thread may proceed before the blocking value is set

Suggested change
resultHandler.onComplete(name);
blocking.set(Thread.currentThread().getName().startsWith(BLOCKING_EXECUTOR_PREFIX));
blocking.set(Thread.currentThread().getName().startsWith(BLOCKING_EXECUTOR_PREFIX));
resultHandler.onComplete(name);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jrhee17 Thank you for your comment. I applied it.
By the way, how can I know which test is failed from failed build?
I cannot find the failed tests from CI/CD

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ChangguHan You can check the build scan reports here: #5619 (comment) which are updated when builds are complete. Open the report and go to the 'Tests' section to browse the test result report.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For example, there's only one failed job and it failed due to a flaky test which seems irrelevant to your work, so it's all good! 🟢 https://ge.armeria.dev/s/yq5kpsjeqryju/tests/overview

Copy link
Contributor

@ikhoon ikhoon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, @ChangguHan! 🙇‍♂️👍

"barMap", ImmutableList.of(barServiceImpl),
"fooIterableMap", ImmutableList.of(fooServiceImpl, barServiceImpl));

assertThat(actualEntries).isEqualTo(expectedEntries);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks a lot for cleaning this up!

Copy link
Member

@trustin trustin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great job, @ChangguHan! 🙇

@trustin trustin changed the title [Issue-4917] Add blockingTaskExecutor in Thrift services Provide a way to invoke Thrift service impls from BlockingTaskExecutor Jun 20, 2024
@trustin trustin merged commit 52114ed into line:main Jun 20, 2024
14 of 15 checks passed
@trustin
Copy link
Member

trustin commented Jun 20, 2024

Revised and cleaned up the PR description and commit message.

@injae-kim
Copy link
Contributor

Nice work @ChangguHan !!! 👍👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
new feature sprint Issues for OSS Sprint participants
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Allow useBlockingTaskExecutor in Thrift services
7 participants