Skip to content

[APIE-24] Add support for CP Flink SQL Shell #3110

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 70 commits into
base: main
Choose a base branch
from
Open

Conversation

channingdong
Copy link
Contributor

@channingdong channingdong commented May 27, 2025

Release Notes

Breaking Changes

  • PLACEHOLDER

New Features

  • Add the on-premises confluent flink shell command

Bug Fixes

  • PLACEHOLDER

Checklist

  • I have successfully built and used a custom CLI binary, without linter issues from this PR.
  • I have clearly specified in the What section below whether this PR applies to Confluent Cloud, Confluent Platform, or both.
  • I have verified this PR in Confluent Cloud pre-prod or production environment, if applicable.
  • I have verified this PR in Confluent Platform on-premises environment, if applicable.
  • I have attached manual CLI verification results or screenshots in the Test & Review section below.
  • I have added appropriate CLI integration or unit tests for any new or updated commands and functionality.
  • I confirm that this PR introduces no breaking changes or backward compatibility issues.
  • I have indicated the potential customer impact if something goes wrong in the Blast Radius section below.
  • I have put checkmarks below confirming that the feature associated with this PR is enabled in:
    • Confluent Cloud prod
    • Confluent Cloud stag
    • Confluent Platform
    • Check this box if the feature is enabled for certain organizations only

What

Add an on-premises version of the confluent flink shell command.

Blast Radius

Any potential impacts would be limited to the cloud confluent flink shell command since some small code changes were made to the cloud shell code to make it reusable in the on-premises version.

References

CP Flink SQL Shell Setup:
https://confluent.slack.com/archives/C078W406JQN/p1748430281177909

How to run CP Docker Image:
https://confluentinc.atlassian.net/wiki/spaces/~7120200093b08d224f47afa6c9f99df824f505/pages/4392780091/How+to+run+the+CP+Flink+Docker+Images

Test & Review

New unit and integration tests

Manual testing results, part 1: https://docs.google.com/document/d/1LLgPD9s-IN4srYvvwI2swaLnhclH4JE7tSp-2oCibeM/edit?usp=sharing

Manual testing results, part 2: https://confluentinc.atlassian.net/wiki/spaces/~629aabc3e000550067e8491f/pages/4550725061/Testing+Interactive+CP+Flink+SQL+Shell

@Copilot Copilot AI review requested due to automatic review settings May 27, 2025 06:51
@channingdong channingdong requested a review from a team as a code owner May 27, 2025 06:51
@confluent-cla-assistant
Copy link

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

Copy link

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR adds Confluent Platform (on-prem) support for Flink compute pools and catalogs, refactors command registration to branch on cloud vs on-prem login, and streamlines web UI forwarding logic.

  • Introduce on-prem create/describe/delete commands for Flink compute pools
  • Refactor newComputePoolCommand to register cloud or on-prem subcommands based on cfg.IsCloudLogin()
  • Add catalog commands and centralize Flink web UI forwarding via a new handler

Reviewed Changes

Copilot reviewed 60 out of 60 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
internal/flink/command_compute_pool_create_onprem.go New on-prem compute-pool create command and output formatting
internal/flink/command_compute_pool.go Refactored newComputePoolCommand to take cfg and branch logic
internal/flink/command.go Reordered root commands, added catalog, compute-pool, shell, etc.
go.mod Added a local replace directive for development
internal/flink/command_application_list.go Updated application list logic to parse and validate job status
Comments suppressed due to low confidence (2)

internal/flink/command_application_list.go:52

  • [nitpick] This error message is repetitive and may confuse users. Consider rephrasing to something like "missing jobStatus field in application status", or include the application name for context.
return fmt.Errorf("job status not found in flink job status")

go.mod:286

  • This local filesystem replace directive should not be committed. Remove or convert it to a replace pointing at a versioned module reference to avoid breaking CI and other developers' builds.
replace github.com/confluentinc/cmf-sdk-go => /Users/channingdong/cmf-sdk-go

@sonarqube-confluent

This comment has been minimized.

@sonarqube-confluent

This comment has been minimized.

@sonarqube-confluent

This comment has been minimized.

@sonarqube-confluent

This comment has been minimized.

@sonarqube-confluent

This comment has been minimized.

@sonarqube-confluent

This comment has been minimized.

@sonarqube-confluent

This comment has been minimized.

@sonarqube-confluent

This comment has been minimized.

@sonarqube-confluent

This comment has been minimized.

@sonarqube-confluent

This comment has been minimized.

1 similar comment
@sonarqube-confluent

This comment has been minimized.

@sgagniere sgagniere changed the title [APIE-24] Add support for CP Flink SQL Shell (WIP) [APIE-24] Add support for CP Flink SQL Shell Jun 26, 2025
@sonarqube-confluent

This comment has been minimized.

@sonarqube-confluent

This comment has been minimized.

@sonarqube-confluent

This comment has been minimized.

Copy link
Contributor Author

@channingdong channingdong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM overall, only a few nits and comments to clarify, thanks.

Comment on lines 171 to 180
if cloud {
completer.AddCompleter(autocomplete.ExamplesCompleterCloud)
}
completer.
AddCompleter(autocomplete.ExamplesCompleter).
AddCompleter(autocomplete.SetCompleter).
AddCompleter(autocomplete.ShowCompleter)
AddCompleter(autocomplete.ExamplesCompleterCommon).
AddCompleter(autocomplete.SetCompleterCommon)
if cloud {
completer.AddCompleter(autocomplete.SetCompleterCloud)
}
completer.AddCompleter(autocomplete.ShowCompleter)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can combine these 2 if statement

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The autocomplete suggestions actually show up in the same order that they're added here, so I split this up to preserve the same ordering that the Cloud currently has.

}

func (s *StoreOnPrem) ProcessLocalStatement(statement string) (*types.ProcessedStatement, *types.StatementError) {
switch statementType := parseStatementType(statement); statementType {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see we have a defer s.persistUserProperties() for the cloud counterpart but not here, is there a particular reason to exclude this for the on-prem function?

Copy link
Member

@sgagniere sgagniere Jun 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's setting Cloud context parameters, which we generally don't have access to for on-prem login contexts. And in the case where we're not using this while logged in, we don't have a login context at all.

Comment on lines 485 to 493
for _, test := range tests {
test.onprem = true
s.runFlinkShellTest(test)
}
s.loginOnPrem(s.T())
for _, test := range tests {
test.onprem = true
s.runFlinkShellTest(test)
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we are testing with 2 conditions, not log in and log in?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup!

@sonarqube-confluent
Copy link

Passed

Analysis Details

8 Issues

  • Bug 0 Bugs
  • Vulnerability 0 Vulnerabilities
  • Code Smell 8 Code Smells

Coverage and Duplications

  • Coverage 91.10% Coverage (77.60% Estimated after merge)
  • Duplications No duplication information (0.10% Estimated after merge)

Project ID: cli

View in SonarQube

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants