-
Notifications
You must be signed in to change notification settings - Fork 18
[APIE-24] Add support for CP Flink SQL Shell #3110
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
🎉 All Contributor License Agreements have been signed. Ready to merge. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR adds Confluent Platform (on-prem) support for Flink compute pools and catalogs, refactors command registration to branch on cloud vs on-prem login, and streamlines web UI forwarding logic.
- Introduce on-prem create/describe/delete commands for Flink compute pools
- Refactor
newComputePoolCommand
to register cloud or on-prem subcommands based oncfg.IsCloudLogin()
- Add
catalog
commands and centralize Flink web UI forwarding via a new handler
Reviewed Changes
Copilot reviewed 60 out of 60 changed files in this pull request and generated 2 comments.
Show a summary per file
File | Description |
---|---|
internal/flink/command_compute_pool_create_onprem.go | New on-prem compute-pool create command and output formatting |
internal/flink/command_compute_pool.go | Refactored newComputePoolCommand to take cfg and branch logic |
internal/flink/command.go | Reordered root commands, added catalog, compute-pool, shell, etc. |
go.mod | Added a local replace directive for development |
internal/flink/command_application_list.go | Updated application list logic to parse and validate job status |
Comments suppressed due to low confidence (2)
internal/flink/command_application_list.go:52
- [nitpick] This error message is repetitive and may confuse users. Consider rephrasing to something like "missing jobStatus field in application status", or include the application name for context.
return fmt.Errorf("job status not found in flink job status")
go.mod:286
- This local filesystem replace directive should not be committed. Remove or convert it to a replace pointing at a versioned module reference to avoid breaking CI and other developers' builds.
replace github.com/confluentinc/cmf-sdk-go => /Users/channingdong/cmf-sdk-go
…application options struct
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
…add new integration tests
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
1 similar comment
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM overall, only a few nits and comments to clarify, thanks.
if cloud { | ||
completer.AddCompleter(autocomplete.ExamplesCompleterCloud) | ||
} | ||
completer. | ||
AddCompleter(autocomplete.ExamplesCompleter). | ||
AddCompleter(autocomplete.SetCompleter). | ||
AddCompleter(autocomplete.ShowCompleter) | ||
AddCompleter(autocomplete.ExamplesCompleterCommon). | ||
AddCompleter(autocomplete.SetCompleterCommon) | ||
if cloud { | ||
completer.AddCompleter(autocomplete.SetCompleterCloud) | ||
} | ||
completer.AddCompleter(autocomplete.ShowCompleter) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can combine these 2 if statement
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The autocomplete suggestions actually show up in the same order that they're added here, so I split this up to preserve the same ordering that the Cloud currently has.
} | ||
|
||
func (s *StoreOnPrem) ProcessLocalStatement(statement string) (*types.ProcessedStatement, *types.StatementError) { | ||
switch statementType := parseStatementType(statement); statementType { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see we have a defer s.persistUserProperties()
for the cloud counterpart but not here, is there a particular reason to exclude this for the on-prem function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that's setting Cloud context parameters, which we generally don't have access to for on-prem login contexts. And in the case where we're not using this while logged in, we don't have a login context at all.
test/flink_onprem_test.go
Outdated
for _, test := range tests { | ||
test.onprem = true | ||
s.runFlinkShellTest(test) | ||
} | ||
s.loginOnPrem(s.T()) | ||
for _, test := range tests { | ||
test.onprem = true | ||
s.runFlinkShellTest(test) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So we are testing with 2 conditions, not log in and log in?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup!
Release Notes
Breaking Changes
New Features
confluent flink shell
commandBug Fixes
Checklist
What
section below whether this PR applies to Confluent Cloud, Confluent Platform, or both.Test & Review
section below.Blast Radius
section below.What
Add an on-premises version of the
confluent flink shell
command.Blast Radius
Any potential impacts would be limited to the cloud
confluent flink shell
command since some small code changes were made to the cloud shell code to make it reusable in the on-premises version.References
CP Flink SQL Shell Setup:
https://confluent.slack.com/archives/C078W406JQN/p1748430281177909
How to run CP Docker Image:
https://confluentinc.atlassian.net/wiki/spaces/~7120200093b08d224f47afa6c9f99df824f505/pages/4392780091/How+to+run+the+CP+Flink+Docker+Images
Test & Review
New unit and integration tests
Manual testing results, part 1: https://docs.google.com/document/d/1LLgPD9s-IN4srYvvwI2swaLnhclH4JE7tSp-2oCibeM/edit?usp=sharing
Manual testing results, part 2: https://confluentinc.atlassian.net/wiki/spaces/~629aabc3e000550067e8491f/pages/4550725061/Testing+Interactive+CP+Flink+SQL+Shell