Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add limit pushdown for Kafka connectors, and further refactor and enhance predicate pushdown in Kafka connector #20146

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

xxzhky
Copy link

@xxzhky xxzhky commented Dec 17, 2023

Description

Add limit pushdown for Kafka connectors, and further refactor and enhance predicate pushdown in Kafka connector

Additional context and related issues

Add limit pushdown, and enhance predicate pushdown in Kafka connector

Significant changes have been made to improve predicate push-down support in Kafka connector. The code has been refactored to better handle intersection of old and new domains.

Additionally, the functionality to override partition end and begin offsets with range has been enhanced.

Lastly, an option to enable or disable predicate pushdown at runtime through session properties has been introduced.
By default, it supports predicate push-down. Push-down also can be disabled via kafka.predicate-force-push-down-enabled config prop or kafka.predicate_force_push_down_enabled session prop.

I try my test to make your life easier so that extensive comments have been added across multiple files in the Kafka connector for better readability and understanding of the code.

Release notes

(1) Add limit pushdown for Kafka connectors,
(2) Enhance predicate pushdown in Kafka connector.

@cla-bot cla-bot bot added the cla-signed label Dec 17, 2023
Copy link

This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua

@github-actions github-actions bot added the stale label Jan 10, 2024
@xxzhky
Copy link
Author

xxzhky commented Jan 11, 2024 via email

@mosabua
Copy link
Member

mosabua commented Jan 11, 2024

Could @wendigo @findepi @elonazoulay or anybody else with more knowledge about the Kafka connector help @xxzhky with a review please to move this PR forward.

Also @xxzhky can you confirm that the build passes locally and the CI failures are wrong, or otherwise fix these issues.

@github-actions github-actions bot removed the stale label Jan 11, 2024
Copy link

github-actions bot commented Feb 2, 2024

This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua

@github-actions github-actions bot added the stale label Feb 2, 2024
@mosabua
Copy link
Member

mosabua commented Feb 2, 2024

@xxzhky could you rebase and fix any build issues

@xxzhky
Copy link
Author

xxzhky commented Feb 4, 2024

@mosabua yes, I confirm that the build passes locally, however, some irrational checking issues happen to me. i.g.
ci / maven-checks

@wendigo
Copy link
Contributor

wendigo commented Feb 4, 2024

I've rebased this PR locally and checked the dependency scope failure and here's the fix:

commit d896b0cf12d7b4c16818467166f225dbe38546fc
Author: Mateusz "Serafin" Gajewski <mateusz.gajewski@gmail.com>
Date:   Sun Feb 4 11:19:06 2024 +0100

    Fix

diff --git a/plugin/trino-kafka/pom.xml b/plugin/trino-kafka/pom.xml
index ce1fd85e77..6099a921f3 100644
--- a/plugin/trino-kafka/pom.xml
+++ b/plugin/trino-kafka/pom.xml
@@ -274,6 +274,12 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-parser</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>io.trino</groupId>
             <artifactId>trino-record-decoder</artifactId>
diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestConstraintExtractor.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestConstraintExtractor.java
index 3f98d51064..47e7a597c2 100644
--- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestConstraintExtractor.java
+++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestConstraintExtractor.java
@@ -29,6 +29,7 @@ import io.trino.spi.type.TimestampType;
 import io.trino.spi.type.TimestampWithTimeZoneType;
 import io.trino.spi.type.Type;
 import io.trino.sql.planner.ConnectorExpressionTranslator;
+import io.trino.sql.planner.IrTypeAnalyzer;
 import io.trino.sql.planner.LiteralEncoder;
 import io.trino.sql.planner.Symbol;
 import io.trino.sql.planner.TypeProvider;
@@ -69,7 +70,6 @@ import static io.trino.spi.type.VarcharType.createVarcharType;
 import static io.trino.sql.analyzer.TypeSignatureProvider.fromTypes;
 import static io.trino.sql.analyzer.TypeSignatureTranslator.toSqlType;
 import static io.trino.sql.planner.TestingPlannerContext.PLANNER_CONTEXT;
-import static io.trino.sql.planner.TypeAnalyzer.createTestingTypeAnalyzer;
 import static io.trino.sql.tree.ComparisonExpression.Operator.EQUAL;
 import static io.trino.sql.tree.ComparisonExpression.Operator.GREATER_THAN;
 import static io.trino.sql.tree.ComparisonExpression.Operator.GREATER_THAN_OR_EQUAL;
@@ -743,7 +743,7 @@ public class TestConstraintExtractor
                         TypeProvider.viewOf(symbolTypes.entrySet().stream()
                                 .collect(toImmutableMap(entry -> new Symbol(entry.getKey()), Map.Entry::getValue))),
                         PLANNER_CONTEXT,
-                        createTestingTypeAnalyzer(PLANNER_CONTEXT))
+                        new IrTypeAnalyzer(PLANNER_CONTEXT))
                 .orElseThrow(() -> new RuntimeException("Translation to ConnectorExpression failed for: " + expression));
     }

@xxzhky
Copy link
Author

xxzhky commented Feb 5, 2024

@wendigo OK. Thanks very much, and could you like to help me fix it.

@xxzhky
Copy link
Author

xxzhky commented Feb 5, 2024

FYI,here attached the screenshot which the build passes locally.
image

@github-actions github-actions bot removed the stale label Feb 5, 2024
@wendigo
Copy link
Contributor

wendigo commented Feb 5, 2024

@xxzhky failure is in the dependency scope check, not tests. Tests are passing just fine on the CI.

@mosabua
Copy link
Member

mosabua commented Feb 5, 2024

@xxzhky the diff in the comment above is the fix .. just apply that locally and amend the commit and force push.. then CI should pass

@xxzhky xxzhky force-pushed the 435-limit-pushdown-09 branch 2 times, most recently from 1bea5b1 to 8917ade Compare February 10, 2024 01:01
refactor and enhance predicate pushdown in Kafka connector

Extensive comments have been added across multiple files in the Kafka connector for better readability and understanding of the code. The commit also results in better maintainability and improved performance of the code.

Significant changes have been made to improve predicate push-down support in Kafka connector. The code has been refactored to better handle intersection of old and new domains.

Additionally, the functionality to override partition end and begin offsets with range has been enhanced.

Lastly, an option to enable or disable predicate pushdown at runtime through session properties has been introduced.
1. By default, it supports predicate push-down. Push-down can be disabled via ``kafka.predicate-force-push-down-enabled`` config prop  or ``kafka.predicate_force_push_down_enabled`` session prop.

Update Kafka config in tests
…ate constraint

1.This commit introduces the ConstraintExtractor class in the Kafka connector.

This class is used to extract and manipulate constraint related data and provides static functions for different operations.

It is primarily used to handle different operations such as transforming constraints and expressions, managing conjuncts and converting and intersecting tuple domains.

Moreover, it also handles specific cases such as timestamp casting, truncation and other function(year).

2.Replace createTestingTypeAnalyzer with new IrTypeAnalyzer

The Kafka plugin test class TestConstraintExtractor has been updated, replacing `createTestingTypeAnalyzer` with a new instantiation of `IrTypeAnalyzer`. Furthermore, `trino-parser` dependency was added into `pom.xml` to ensure correct symbol translation, while enhancing the test coverage and improving the overall efficiency.
@xxzhky
Copy link
Author

xxzhky commented Feb 10, 2024

@mosabua @wendigo all checks done

Copy link

github-actions bot commented Mar 4, 2024

This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua

@github-actions github-actions bot added the stale label Mar 4, 2024
@xxzhky
Copy link
Author

xxzhky commented Mar 5, 2024

@mosabua @wendigo why has it gone again

@github-actions github-actions bot removed the stale label Mar 5, 2024
@xxzhky xxzhky requested a review from ebyhr March 13, 2024 02:57
@Praveen2112 Praveen2112 self-requested a review March 19, 2024 17:56
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to split them into smaller commits like

  • Add limit pushdown
  • Refactor
  • Enhanced pushdown

This would help to ease up the review process

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Praveen2112 ok. I'll split them into more parts. Hong long could you get the review process done?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can start reviewing it by this week.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice if we could have some test coverage in BaseConnectorTest

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of converting ConnectorExpression into a TupleDomain - Can we ConnectorExpressionRewriter which would support the same for nested expression as well.

@ebyhr ebyhr removed their request for review March 25, 2024 00:53
Copy link

This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua

@github-actions github-actions bot added the stale label Apr 15, 2024
@xxzhky
Copy link
Author

xxzhky commented Apr 23, 2024

@bitsondatadev @colebow @mosabua I am so busy recently, and I will handle it in the near future.

@mosabua
Copy link
Member

mosabua commented Apr 23, 2024

Sounds good @xxzhky .. adding stale-ignore label.

@mosabua mosabua added stale-ignore Use this label on PRs that should be ignored by the stale bot so they are not flagged or closed. and removed stale labels Apr 23, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
cla-signed stale-ignore Use this label on PRs that should be ignored by the stale bot so they are not flagged or closed.
Development

Successfully merging this pull request may close these issues.

None yet

5 participants