feat(protobuf): support any for protobuf message source #12291

Merged
merged 36 commits into from
Oct 31, 2023

Conversation

xzhseh
Contributor

@xzhseh xzhseh commented Sep 13, 2023

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

resolve #12246

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

The protobuf source now supports the Any type; the relevant documentation may need to be updated.

@xzhseh

This comment was marked as outdated.

@xzhseh xzhseh requested a review from fuyufjh September 13, 2023 22:50
@xzhseh xzhseh self-assigned this Sep 13, 2023
Member

@fuyufjh fuyufjh left a comment

Rest LGTM

src/connector/src/parser/protobuf/parser.rs Outdated Show resolved Hide resolved
src/connector/src/parser/protobuf/parser.rs Show resolved Hide resolved
@tabVersion
Contributor

I'm curious where's the encoding part of the message

RisingWave doesn't wrap messages in Any. The producer may have such logic.

and which semantic of decoding we should follow when dealing with payload

I don't get the question

@xzhseh

This comment was marked as outdated.

@xzhseh
Contributor Author

xzhseh commented Sep 20, 2023

The Any type is now supported; the returned type is a ScalarImpl::Struct(...).
PTAL if this is expected, cc @fuyufjh @tabVersion.

@albertoforcato

@BugenZhao When will this PR be merged ?

@xzhseh xzhseh requested a review from fuyufjh September 21, 2023 15:41
@BugenZhao
Member

@BugenZhao When will this PR be merged ?

We are currently awaiting approval from the module owners for this PR. We appreciate your patience, and you can subscribe to the notifications of this PR to get informed.

@codecov

codecov bot commented Sep 22, 2023

Codecov Report

Merging #12291 (ffb9e87) into main (c97e08c) will decrease coverage by 0.11%.
Report is 2 commits behind head on main.
The diff coverage is 49.90%.

@@            Coverage Diff             @@
##             main   #12291      +/-   ##
==========================================
- Coverage   68.26%   68.16%   -0.11%     
==========================================
  Files        1505     1506       +1     
  Lines      254901   255327     +426     
==========================================
+ Hits       174015   174034      +19     
- Misses      80886    81293     +407     
Flag Coverage Δ
rust 68.16% <49.90%> (-0.11%) ⬇️

Flags with carried forward coverage won't be shown.

Files Coverage Δ
src/common/src/metrics.rs 13.15% <ø> (ø)
src/connector/src/parser/unified/protobuf.rs 86.36% <100.00%> (+3.03%) ⬆️
src/connector/src/source/monitor/metrics.rs 95.23% <ø> (-0.87%) ⬇️
src/frontend/src/expr/pure.rs 87.69% <ø> (ø)
src/jni_core/src/macros.rs 100.00% <ø> (ø)
src/stream/src/executor/monitor/streaming_stats.rs 95.69% <ø> (-0.12%) ⬇️
src/frontend/src/binder/expr/binary_op.rs 70.40% <0.00%> (ø)
src/expr/impl/src/scalar/jsonb_access.rs 0.00% <0.00%> (ø)
src/stream/src/executor/actor.rs 56.81% <0.00%> (+1.26%) ⬆️
src/stream/src/executor/source/source_executor.rs 78.22% <0.00%> (+0.58%) ⬆️
... and 6 more

... and 20 files with indirect coverage changes

@tabVersion
Contributor

The Any type is now supported; the returned type is a ScalarImpl::Struct(...). PTAL if this is expected, cc @fuyufjh @tabVersion.

I think it is not expected.
At the time a source is created, RisingWave has not received any actual payload. You only see Any in the schema and know nothing about what is inside.
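
For context, a google.protobuf.Any value carries only a type_url string and the serialized value bytes, so the schema alone says nothing about the payload. A minimal sketch of that shape follows. The `Any` struct and the `type_name` helper here are illustrative stand-ins, not RisingWave's types or any protobuf crate's API.

```rust
/// Illustrative stand-in for `google.protobuf.Any` (not a real crate type).
#[derive(Debug, Clone, PartialEq)]
pub struct Any {
    /// For example "type.googleapis.com/test.StringValue".
    pub type_url: String,
    /// Serialized bytes of the packed message; opaque without its descriptor.
    pub value: Vec<u8>,
}

/// Returns the fully qualified message name, i.e. the text after the last '/'.
/// If the URL contains no '/', the whole string is returned.
pub fn type_name(any: &Any) -> &str {
    match any.type_url.rfind('/') {
        Some(idx) => &any.type_url[idx + 1..],
        None => any.type_url.as_str(),
    }
}
```

Only once a concrete message arrives can the type name be read from type_url and used to pick a descriptor for the bytes.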

src/connector/src/parser/protobuf/parser.rs Outdated Show resolved Hide resolved
src/connector/src/parser/protobuf/parser.rs Outdated Show resolved Hide resolved
src/connector/src/parser/protobuf/parser.rs Outdated Show resolved Hide resolved
@xzhseh xzhseh requested a review from a team as a code owner October 18, 2023 01:32
@Rossil2012 Rossil2012 self-requested a review October 19, 2023 09:33
@xzhseh
Contributor Author

xzhseh commented Oct 26, 2023

A naming conflict is possible. Suppose the user gives the int64 fields of B and C the same name, e.g.,

syntax = "proto3";
import "google/protobuf/any.proto";

message A {
  google.protobuf.Any v1 = 1;
  int64 v2 = 2;
}
message B {
  google.protobuf.Any v3 = 1;
  int64 v4 = 2;
}
message C {
  int64 v4 = 1;
  string v5 = 2;
  bytes v6 = 3;
}

With the schema above, two kinds of ingested messages are possible:
1. A.v1 -> B.v4 ...
2. A.v1 -> C.v4 ...
If the user extracts a field by field name alone, e.g. select ((msg).A -> 'v1' -> 'v4')::bigint, the result is ambiguous and nondeterministic.
This comes down to how much we want to restrict or control user behavior.
Would you prefer that users avoid such naming conflicts themselves from the start, with a hidden _type_ field added as a name hint, or that we handle these cases for them by requiring fully qualified names, which would look like select ((msg).A -> 'A.v1' -> 'B.v4' or 'C.v4')::bigint?
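
To make the ambiguity concrete, here is a minimal Rust sketch that assumes a toy `Value` type in place of JSONB. The names `Value`, `payload_b`, `payload_c`, and `get` are hypothetical and not part of RisingWave, and the field values are invented for illustration.

```rust
use std::collections::BTreeMap;

/// Toy stand-in for a decoded JSON value (hypothetical, not RisingWave's JSONB).
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    Int(i64),
    Str(String),
    Object(BTreeMap<String, Value>),
}

/// What `A.v1` might decode to when it wraps a `B` (illustrative values).
pub fn payload_b() -> Value {
    let mut fields = BTreeMap::new();
    fields.insert("v4".to_string(), Value::Int(2));
    Value::Object(fields)
}

/// What `A.v1` might decode to when it wraps a `C` (illustrative values).
pub fn payload_c() -> Value {
    let mut fields = BTreeMap::new();
    fields.insert("v4".to_string(), Value::Int(1));
    fields.insert("v5".to_string(), Value::Str("text".to_string()));
    Value::Object(fields)
}

/// Name-only lookup, analogous to `-> 'v4'` in the query above.
pub fn get<'a>(value: &'a Value, key: &str) -> Option<&'a Value> {
    match value {
        Value::Object(fields) => fields.get(key),
        _ => None,
    }
}
```

Looking up "v4" succeeds on both payloads, and nothing in the result says whether the value came from B or from C.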

@xzhseh
Contributor Author

xzhseh commented Oct 26, 2023

In addition, any idea on the gitguardian error?

@tabVersion
Contributor

In addition, any idea on the gitguardian error?

Just ignore it, it is not related to the topic

@tabVersion
Contributor

tabVersion commented Oct 26, 2023

Yes, you are right: it can be ambiguous if the names collide. But we should change the original message content as little as possible, and prefixing every field goes against that idea.
We encourage users to match on the message name and then do the analysis, which matches the practice I've seen at my last company.

select case (msg).A -> '_type_'
  when 'messageB' then ... 
  when 'messageC' then ...
end

I think it is not confusing.

Rw gets both messageA and messageB wrapped in messageC. Rw will extract the inner fields without telling it is messageA or messageB

Yes, the customer told us that they will ensure only one message type appears in the any field, so I had not considered the name collision problem. On second thought, I think adding a _type_ field is reasonable and enough to fix the problem.
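
The same match-on-type idea as the SQL CASE above can be sketched in Rust. This is only an illustration under assumptions: `Value`, `Object`, `object`, and `read_v4` are hypothetical names, the hint key is spelled `_type_` as in the query above, and the message names "messageB" and "messageC" are taken from that query.

```rust
use std::collections::BTreeMap;

/// Toy stand-in for a decoded JSON scalar (hypothetical).
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    Int(i64),
    Str(String),
}

/// A flat decoded object: field name to value.
pub type Object = BTreeMap<String, Value>;

/// Builds a decoded payload carrying a `_type_` hint and a `v4` field.
pub fn object(type_name: &str, v4: i64) -> Object {
    let mut obj = Object::new();
    obj.insert("_type_".to_string(), Value::Str(type_name.to_string()));
    obj.insert("v4".to_string(), Value::Int(v4));
    obj
}

/// Reads `v4` only after checking which message the payload claims to be.
/// Returns the message name with the value, or `None` when the hint is
/// missing or names a message this sketch does not handle.
pub fn read_v4(obj: &Object) -> Option<(&'static str, i64)> {
    let source = match obj.get("_type_")? {
        Value::Str(name) => match name.as_str() {
            "messageB" => "messageB",
            "messageC" => "messageC",
            _ => return None,
        },
        _ => return None,
    };
    match obj.get("v4")? {
        Value::Int(n) => Some((source, *n)),
        _ => None,
    }
}
```

With the hint present, a colliding field name such as v4 is no longer ambiguous because the caller always knows which message it came from.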

@fuyufjh
Member

fuyufjh commented Oct 26, 2023

It sounds like the conclusion will be this, right?

{
  "__type": B.full_name(),
  "v3": {
    "__type": C.full_name(),
    "v5": 2,
    "v6": "some string",
    "v7": NULL // <-- json cannot handle bytes type, I think it is ok to leave NULL here and give a warning
  },
  "v4": 1,
}

LGTM 👍

A little suggestion: a single underscore (_type) is my favorite style, considering our hidden column _row_id follows this style, too. (Another common style is rw_, but it seems a bit weird here)

@xzhseh
Contributor Author

xzhseh commented Oct 26, 2023

The current message will be displayed like this.
[screenshot: CleanShot 2023-10-26 at 15 26 07@2x]

The example use case is as below.
[screenshot: CleanShot 2023-10-26 at 15 37 39@2x]

The schema of the messages is as below.

message TestAny {
  int32 id = 1;
  google.protobuf.Any any_value = 2;
}
message StringStringInt32Value {
  string first = 1;
  StringInt32Value second = 2;
  Float32StringValue third = 3;
}

Note that we now support the _type hint and name mapping for nested Any messages (e.g., A.v1 -> B.v3 -> C from the discussion above), since the decoding in from_protobuf_value recursively resolves Any types from bottom to top. See test_any_recursive for details.

match fields[1].clone() {
    Some(ScalarImpl::Jsonb(jv)) => {
        assert_eq!(
            jv,
            JsonbVal::from(json!({
                "_type": "test.AnyValue",
                "any_value_1": {
                    "_type": "test.StringValue",
                    "value": "114514",
                },
                "any_value_2": {
                    "_type": "test.Int32Value",
                    "value": 114514,
                }
            }))
        );
    }
    _ => panic!("Expected ScalarImpl::Jsonb"),
}
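
The bottom-to-top resolution can be pictured with a small stand-alone sketch. This is not the code in from_protobuf_value: `Field`, `Json`, and `resolve` are hypothetical, and the sketch assumes each payload has already been matched to its message type, whereas real decoding would first look up a descriptor from the type_url.

```rust
use std::collections::BTreeMap;

/// Toy input: a field is a scalar or a (possibly nested) packed `Any`.
#[derive(Debug, Clone, PartialEq)]
pub enum Field {
    Int(i64),
    Str(String),
    /// A packed message: its full type name plus its named fields.
    Any {
        type_name: String,
        fields: Vec<(String, Field)>,
    },
}

/// Toy JSON-like output (hypothetical stand-in for JSONB).
#[derive(Debug, Clone, PartialEq)]
pub enum Json {
    Int(i64),
    Str(String),
    Object(BTreeMap<String, Json>),
}

/// Resolves nested `Any` values depth-first: every inner payload is fully
/// converted before the enclosing object is returned, and each resulting
/// object is tagged with a `_type` entry holding the message's full name.
pub fn resolve(field: &Field) -> Json {
    match field {
        Field::Int(n) => Json::Int(*n),
        Field::Str(s) => Json::Str(s.clone()),
        Field::Any { type_name, fields } => {
            let mut obj = BTreeMap::new();
            obj.insert("_type".to_string(), Json::Str(type_name.clone()));
            for (name, inner) in fields {
                obj.insert(name.clone(), resolve(inner));
            }
            Json::Object(obj)
        }
    }
}
```

Feeding it a test.AnyValue that wraps a test.StringValue and a test.Int32Value yields the same nesting as the assertion above, with a `_type` entry at every level.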

This PR could be merged after final reviews, cc @tabVersion @fuyufjh @Rossil2012.

@xzhseh xzhseh dismissed Rossil2012’s stale review October 27, 2023 05:40

Dismiss stale reviews.

@xzhseh xzhseh added the user-facing-changes Contains changes that are visible to users label Oct 27, 2023
Contributor

@tabVersion tabVersion left a comment

LGTM! Thanks for finishing the pr🥹

Comment on lines +297 to +299
if key.is_empty() {
    key = "Int16".to_string();
}
Contributor

I think we always get a struct here. When would we hit the missing-key case?

Contributor Author

When an Any value holds a "nested" struct, we cannot get the actual field names inside that struct.
For example, with the proto below we can only get the type_url and value for the second and third fields in StringStringInt32Value, rather than the actual field names in those structs. In this case, the current workaround is to use the anonymous type name as the key; that is when full_name_vec turns out to be None.

message TestAny {
  int32 id = 1;
  google.protobuf.Any any_value = 2;
}
message StringStringInt32Value {
  string first = 1;
  StringInt32Value second = 2;
  Float32StringValue third = 3;
}
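
The fallback described here can be sketched as a small helper. The function `field_key` and its signature are hypothetical and only illustrate the idea of substituting a type name when no field name is available; they are not the code in parser.rs.

```rust
/// Picks the JSON key for a decoded field: the field name when one is
/// known and non-empty, otherwise the type name as a stand-in key.
pub fn field_key(field_name: Option<&str>, type_name: &str) -> String {
    match field_name {
        Some(name) if !name.is_empty() => name.to_string(),
        _ => type_name.to_string(),
    }
}
```

A caller would pass whatever name the descriptor yields, so a missing or empty name falls through to the type name (for instance "Int16" in the reviewed lines).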

src/connector/src/parser/protobuf/parser.rs Outdated Show resolved Hide resolved
@xzhseh xzhseh added this pull request to the merge queue Oct 31, 2023
Merged via the queue into main with commit 99fe0eb Oct 31, 2023
24 of 27 checks passed
@xzhseh xzhseh deleted the xzhseh/feat-protobuf-any branch October 31, 2023 20:21
Labels
type/feature user-facing-changes Contains changes that are visible to users
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support Any for protobuf message Source
6 participants