
Add a PTransform implementation to support Flink SQL. #90

Merged
3 commits merged into linkedin:li_trunk on Jan 18, 2024

Conversation

becketqin
Collaborator

This patch introduces a new PTransform implementation to support Flink SQL.

So far, Flink SQL is only supported in batch mode, because the Beam windowing strategy is difficult to apply to the Flink SQL operators.



@becketqin
Collaborator Author

@yananhao12 Will you help take a look? Thanks!

@xinyuiscool xinyuiscool left a comment

Looks great. I have a few comments about simplifying the APIs a bit. I also want to see whether we can keep the API in terms of Beam types so we don't expose Flink types in the mix.

* Transform.
* @see StreamTableEnvironment#createTemporaryView(String, DataStream)
*/
FlinkSql<InputT, OutputT> withMainInputTableTypeInformation(TypeInformation<InputT> typeInfo) {


Since we expose this as a Beam API, shall we stick to Beam's TypeDescriptor instead of Flink's TypeInformation directly? It seems we can convert between these two classes. Same for the other usages of TypeInformation in the public API methods in this class.

Collaborator Author

This method is not exposed to the end user, but the user-facing API does ask for the TypeInformation. The purpose of letting users provide TypeInformation is to make the Beam record -> Flink Table row conversion more efficient.

I agree that it would be good if we could hide this, but that may come at a cost.

More specifically, Flink has many different types of TypeInformation, including AtomicType, ObjectArrayType, BasicArrayType, ListType, CompositeType, etc. Using a specific type of TypeInformation makes the Java object to Row conversion more efficient.

For example, if a class is a POJO, then PojoTypeInfo will be used, and it is a CompositeType, so when performing the DataStream -> Table conversion, Flink will be able to convert the POJO objects to Flink Table Rows directly, with each POJO field becoming a Table column. For Avro records, we can use AvroTypeInfo, which is also a CompositeType.

If we let users specify only the Beam TypeDescriptor / Coder here, then we will only have an AtomicType, because the CoderTypeInformation that wraps the Beam Coder is an AtomicType. That means each record in the Beam PCollection will be put as a whole into a single column with the data type RAW, and users will need a UDTF to expand a RAW record into multiple columns, which is an extra step. This introduces additional data format conversion cost.
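
For illustration, a minimal, self-contained sketch using plain Flink APIs (this is not code from the patch; the Order POJO is made up):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;

public class TypeInfoExample {

  /** A well-formed Flink POJO: public no-arg constructor and public fields. */
  public static class Order {
    public String product;
    public long quantity;

    public Order() {}
  }

  public static void main(String[] args) {
    // For a well-formed POJO, Flink's type extraction yields a PojoTypeInfo,
    // which is a CompositeType, so each field can become its own Table column
    // in the DataStream -> Table conversion.
    TypeInformation<Order> typeInfo = TypeInformation.of(Order.class);
    System.out.println(typeInfo instanceof PojoTypeInfo); // prints: true

    // By contrast, a Beam Coder wrapped in the runner's CoderTypeInformation is
    // only an AtomicType, so the whole record would land in a single RAW column.
  }
}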

There might be a way to hide TypeInformation without compromising performance for a given set of record types (taking Avro records as an example; a rough sketch follows the list):

  1. Let users pass in a Coder here.
  2. Introduce a new Coder type, called AvroCoder, which wraps an AvroTypeInfo. Users would call AvroCoder.of(MyAvroSpecificRecord.class) to create the AvroCoder.
  3. In the SQL transform translator, check whether the Coder is an instance of AvroCoder. If so, use the TypeInformation of the specific record.
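
A rough sketch of what such a wrapper might look like (the name SqlAvroCoder is used here only to avoid confusion with the existing OSS AvroCoder, and encode/decode are elided; none of this is code from the patch):

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;

public class SqlAvroCoder<T extends SpecificRecordBase> extends CustomCoder<T> {

  private final Class<T> recordClass;

  private SqlAvroCoder(Class<T> recordClass) {
    this.recordClass = recordClass;
  }

  // Step 2: users create the coder from their specific record class.
  public static <T extends SpecificRecordBase> SqlAvroCoder<T> of(Class<T> recordClass) {
    return new SqlAvroCoder<>(recordClass);
  }

  // Step 3: the translator checks for this coder type and uses the composite
  // AvroTypeInfo instead of an AtomicType CoderTypeInformation.
  public TypeInformation<T> getFlinkTypeInfo() {
    return new AvroTypeInfo<>(recordClass);
  }

  @Override
  public void encode(T value, OutputStream outStream) throws IOException {
    // A real implementation would delegate to an actual Avro coder; elided here.
    throw new UnsupportedOperationException("Sketch only");
  }

  @Override
  public T decode(InputStream inStream) throws IOException {
    throw new UnsupportedOperationException("Sketch only");
  }
}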

The downside of this solution is that any record type that wants to be accelerated needs a framework-level code change, and the acceleration contract is somewhat implicit.

We will face a similar situation with the DataType of the output table as well, if we also want to hide DataType from users.

I am leaving the code unchanged for now. Once we decide which way to go, I can update the patch.


Interesting. I was thinking that since for Avro we will have the coder (the OSS AvroCoder) defined, which contains the schema, in theory we can generate the corresponding Flink TypeInfo object. But that requires Avro-specific logic (as you mentioned above), so it is probably only useful for LinkedIn and not worth the extra effort.

* @param <InputT> the input data type.
* @param <OutputT> the output data type.
*/
private static class FlinkSQLTransformTranslator<InputT, OutputT>


Shall we move this to a separate class under the package ...flink.translation.sql? I'm not sure whether we will have slight variations of this in future versions of Flink. I think it might be better to separate this from the FlinkSql API.

Collaborator Author

We can separate this class out.

Package-wise, at this point all the translators are in the FlinkStreamingTransformTranslators class, so theoretically we should put this class there as well. However, if we do that, we will need to put the entire FlinkStreamingTransformTranslators class into the source-override directory. I am a little reluctant to do so because it spreads the critical code all over the place.

I actually tried a few ways to organize the packages and ended up with the current one, which puts all the SQL-related code in the same package. It seems to make the code more readable because I don't have to jump around, and supporting multiple Flink versions also becomes easier, as I just need to add a single package to the source-override dir.

So it seems better to keep FlinkSQLTransformTranslator in the current package, but make it a separate class. In the future, if we need to support multiple Flink versions, we potentially just need to override this class.


Makes sense. In Beam the transform and translator are typically separated because the transform is engine-agnostic. Since this transform is for Flink only, I guess keeping it in the same package doesn't matter too much.

*
* @param name the name of the table to be set as the main output of this Sql transform.
*/
FlinkSql<InputT, OutputT> withMainOutputTableName(String name) {


As commented above, it seems using a default should be enough.

Collaborator Author

This method only needs to be called if there are multiple queries. In that case, users will need to specify the result table of one query as the main output. If there is only one query in the transform, the result table of that query is treated as the main output by default, so users don't need to call this method.

schemaRegistry.getFromRowFunction(outputTypeDescriptor));
} catch (NoSuchSchemaException e) {
try {
out.setCoder(coderRegistry.getCoder(outputTypeDescriptor));


Can we check whether the type is a Beam Row or not, and set up either the schema coder or the specific type coder accordingly? It seems that if the user uses specific types, the schema coder will do extra serde to get to those types.

Collaborator Author

Not quite sure I understand what you mean. This piece of logic is the same as in ParDo.SingleOutput#expand(). From what I understand, it looks up the coder in the SchemaRegistry first, then in the CoderRegistry. If the type is a Beam Row, would the coder be available in the SchemaRegistry already?
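
For reference, the lookup order being described, written out as a small helper that mirrors the ParDo.SingleOutput#expand() pattern (variable and method names here are illustrative, not from the patch):

import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

public class OutputCoderResolution {

  /** Try the SchemaRegistry first; fall back to the CoderRegistry if no schema exists. */
  static <T> void trySetOutputCoder(
      PCollection<T> out,
      SchemaRegistry schemaRegistry,
      CoderRegistry coderRegistry,
      TypeDescriptor<T> type)
      throws CannotProvideCoderException {
    try {
      // If a schema is registered for the output type, use the schema coder.
      out.setSchema(
          schemaRegistry.getSchema(type),
          type,
          schemaRegistry.getToRowFunction(type),
          schemaRegistry.getFromRowFunction(type));
    } catch (NoSuchSchemaException e) {
      // No schema for this type; fall back to a plain coder from the CoderRegistry.
      out.setCoder(coderRegistry.getCoder(type));
    }
  }
}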


Here is what I was thinking about: after applying the FlinkSQLTransform, there are two APIs that can be used:

  1. use typed api:
    input.apply(FlinkSQLTransform).apply(MapElements.of(....)).apply(Count.perKey())

  2. use schema api:
    input.apply(FlinkSQLTransform).apply(Select.fieldNames(..)).apply(Group.byField())

This patch sets the coder to the schema coder, so the second case should be supported naturally. I was thinking about the first case. It should still work as normal, given that Beam can use the schema to serde the records for grouping.
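
For illustration, a hedged sketch of the two downstream styles, assuming for simplicity that the SQL transform's output is a schema'd PCollection of Rows; the field names ("product", "quantity") are made up:

import org.apache.beam.sdk.schemas.transforms.Select;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class DownstreamUsageExample {

  static void downstreamUsage(PCollection<Row> sqlOutput) {
    // 1. Typed API: map the records to a concrete type and aggregate with core transforms.
    PCollection<KV<String, Long>> countsPerProduct =
        sqlOutput
            .apply(
                MapElements.via(
                    new SimpleFunction<Row, KV<String, Long>>() {
                      @Override
                      public KV<String, Long> apply(Row row) {
                        return KV.of(row.getString("product"), row.getInt64("quantity"));
                      }
                    }))
            .apply(Count.perKey());

    // 2. Schema API: stay in Row form and use schema-aware transforms, relying on the
    //    schema coder registered on the output PCollection by this patch.
    PCollection<Row> products = sqlOutput.apply(Select.fieldNames("product"));
  }
}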

RecordsVerifier<T> recordsVerifier = new RecordsVerifier<>(file, clazz);
final int expectedNumRecords = recordsVerifier.expectedRecords.size();

pCollection


Any reason why we are not using PAssert to verify the content of pCollection? Seems easier that way.

Collaborator Author

Good point! I forgot that.
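
For reference, a minimal example of the PAssert-style check suggested above (the test data here is made up):

import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;

public class PAssertExampleTest {

  @Rule public final transient TestPipeline pipeline = TestPipeline.create();

  @Test
  public void verifiesPCollectionContents() {
    PCollection<String> pCollection = pipeline.apply(Create.of("a", "b", "c"));

    // PAssert checks the final contents of the PCollection when the pipeline runs,
    // instead of collecting records manually and comparing them.
    PAssert.that(pCollection).containsInAnyOrder("a", "b", "c");

    pipeline.run().waitUntilFinish();
  }
}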

import org.junit.Test;

/** The unit tests for the Flink SQL PTransform. */
public class FlinkSqlPTransformTest {


Can we add another test with a Row-based transform, e.g. a Beam schema transform like Select or Filter, to make sure we can use those too? Thanks.

Collaborator Author

I don't think those are supported because Beam Row is not supported in Flink SQL to begin with.

If the records of the input PCollection are Beam Rows, the entire record will be converted to a single column of type RAW, and it is up to the user to further parse this Beam Row into a Flink Row. We can potentially provide a built-in UDTF to do this, but that is orthogonal to the SQL PTransform itself. And after the Beam Row is converted to a Flink Row, all the selects and filters will be done in Flink SQL instead of with Beam Select / Filter.


Hmm, it seems you already registered the schema coder for the output PCollection. That's all that is needed to support schema transforms. I asked here so that we can verify the output schema.

@nickpan47 nickpan47 left a comment

Thanks for the PR! Still in the middle of reviewing...
Two main comments/questions:
a) If FlinkSQL is an end-user-facing PTransform, I would suggest keeping it engine-agnostic.
b) If FlinkSQL is always reading a physical data source as the input, shouldn't we model it as a BeamIO?

* <p>See {@link SingleOutputSqlTransform} for more details regarding how to specify the input
* table, output table and SQL logic.
*/
public class FlinkSql<InputT, OutputT> implements Serializable {


QQ: is this FlinkSql class exposed to Beam API users? If it is, my concern is that it leaks the underlying engine details to the user. Shouldn't we go with a more generic SqlPTransform name?

Collaborator Author

That's a fair point. Will update the class name to Sql instead.

* .withAdditionalOutputTable(new TupleTag<ProductAndSales>("SalesByProduct") {});
*
* PCollectionTuple outputs = pipeline
* .apply("DummyInput", Create.empty(TextualIntegerCoder.of()))


Why do we require a dummy input in this pipeline? As I understand from the earlier discussion on the reader functions needed to implement this SQLPTransform, it is designed to read directly from data sources, right? Shouldn't we just provide a SqlIO instead?

* @param ddl the table definition
* @return this {@link SingleOutputSqlTransform} itself.
*/
public SingleOutputSqlTransform<InputT, OutputT> withDDL(String ddl) {


As we discussed offline, we should add a withCatalog() API as well.
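
For context, a sketch of what the DDL and catalog registration look like at the plain Flink level (Flink 1.13+ APIs, not this patch's API); a withDDL()/withCatalog()-style API would presumably carry these through to the TableEnvironment used by the translator:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;

public class CatalogAndDdlExample {

  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

    // A catalog instance, as a withCatalog()-style API would accept.
    tEnv.registerCatalog("my_catalog", new GenericInMemoryCatalog("my_catalog"));

    // A table definition (DDL), as a withDDL()-style API would accept as a string.
    tEnv.executeSql(
        "CREATE TABLE Orders (product STRING, quantity BIGINT) "
            + "WITH ('connector' = 'datagen', 'number-of-rows' = '10')");
  }
}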

@becketqin becketqin left a comment

@xinyuiscool @nickpan47 Thanks for the review. I just updated the patch to address most of the comments, including:

  1. Rename FlinkSql to SqlTransform.
  2. Add catalog support (both via DDL and via a catalog instance).
  3. Add UDF support.
  4. Avoid letting users create a dummy input PCollection.
  5. Some other class structure cleanup.

The main thing left is whether we should expose TypeInformation. I don't have a strong opinion on this. There are pros and cons. Once we make the decision I'll update the patch.

@xinyuiscool xinyuiscool left a comment

LGTM!

@xinyuiscool xinyuiscool merged commit 4551072 into linkedin:li_trunk Jan 18, 2024
3 checks passed
ajothomas added a commit that referenced this pull request Feb 28, 2024