Skip to content

Commit

Permalink
rpk: list virtual cluster on cloud login
Browse files Browse the repository at this point in the history
Now rpk cloud login will display a list of
virtual clusters and clusters when you succesfully
logged in. Then you can pick one of them and rpk
will create a profile with the selection.

If there is just one cluster, it will select that
cluster by default and create a profile.
  • Loading branch information
r-vasquez committed Jun 14, 2023
1 parent f9de4c6 commit c2c9ef2
Show file tree
Hide file tree
Showing 7 changed files with 213 additions and 43 deletions.
6 changes: 1 addition & 5 deletions src/go/rpk/pkg/api/admin/api_user.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,9 @@ type newUser struct {
}

const (
// Redpanda supports only SCRAM at the moment, which has two varieties.
//
// Both of the below technically go against the Go naming conventions
// for acronyms, but 8 uppercase letters for two merged acronyms is a
// bit odd.
ScramSha256 = "SCRAM-SHA-256"
ScramSha512 = "SCRAM-SHA-512"
CloudOIDC = "oidc_from_cloud_auth"
)

// CreateUser creates a user with the given username and password using the
Expand Down
132 changes: 106 additions & 26 deletions src/go/rpk/pkg/cli/cloud/login.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"fmt"
"os"
"sort"
"strings"
"time"

"github.com/redpanda-data/redpanda/src/go/rpk/pkg/cli/profile"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/out"
"github.com/spf13/afero"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
)

func newLoginCommand(fs afero.Fs, p *config.Params) *cobra.Command {
Expand Down Expand Up @@ -117,6 +119,15 @@ and then re-specify the client credentials next time you log in.`)
return cmd
}

// nameAndCluster it's a temporary type used to describe a cluster name in the
// form of <namespace>/<cluster-name> and the cluster content (either a virtual
// cluster or a normal cluster).
type nameAndCluster struct {
name string
c *cloudapi.Cluster
vc *cloudapi.VirtualCluster
}

func loginProfileFlow(ctx context.Context, fs afero.Fs, y *config.RpkYaml, auth *config.RpkCloudAuth, overrideCloudURL string) (string, error) {
// If our current profile is a cloud cluster, we exit.
// If one cloud profile exists, we switch to it.
Expand Down Expand Up @@ -161,16 +172,28 @@ func loginProfileFlow(ctx context.Context, fs afero.Fs, y *config.RpkYaml, auth
// * Multiple clusters exist: prompt which to choose and swap.
cl := cloudapi.NewClient(overrideCloudURL, auth.AuthToken, httpapi.ReqTimeout(10*time.Second))

cs, err := cl.Clusters(ctx)
vcs, cs, err := clusterList(ctx, cl)
if err != nil {
return "", fmt.Errorf("unable to get list of clusters: %w", err)
return "", err
}

var c cloudapi.Cluster
if len(cs) == 0 {
if len(cs) == 0 && len(vcs) == 0 {
return `You currently have no cloud clusters, when you create one you can run
'rpk profile create --from-cloud {cluster_id}' to create a profile for it.`, nil
} else if len(cs) > 0 {
}

// If there is just 1 cluster/virtual-cluster we go ahead and select that.
var selected nameAndCluster
if len(cs) == 1 && len(vcs) == 0 {
selected = nameAndCluster{
name: cs[0].Name,
c: &cs[0],
}
} else if len(vcs) == 1 && len(cs) == 0 {
selected = nameAndCluster{
name: vcs[0].Name,
vc: &vcs[0],
}
} else {
ns, err := cl.Namespaces(ctx)
if err != nil {
return "", fmt.Errorf("unable to get list of namespaces: %w", err)
Expand All @@ -179,21 +202,8 @@ func loginProfileFlow(ctx context.Context, fs afero.Fs, y *config.RpkYaml, auth
for _, n := range ns {
nsIDToName[n.ID] = n.Name
}
type nameAndC struct {
name string
c cloudapi.Cluster
}
var nameAndCs []nameAndC
for _, c := range cs {
c := c
nameAndCs = append(nameAndCs, nameAndC{
name: fmt.Sprintf("%s/%s", nsIDToName[c.NamespaceUUID], c.Name),
c: c,
})
}
sort.Slice(nameAndCs, func(i, j int) bool {
return nameAndCs[i].name < nameAndCs[j].name
})
nameAndCs := combineClusterNames(vcs, cs, nsIDToName)

var names []string
for _, nc := range nameAndCs {
names = append(names, nc.name)
Expand All @@ -202,17 +212,29 @@ func loginProfileFlow(ctx context.Context, fs afero.Fs, y *config.RpkYaml, auth
if err != nil {
return "", err
}
c = nameAndCs[idx].c
selected = nameAndCs[idx]
}

// We have a cluster selected, but the list response does not return
// all information we need. We need to now directly request this
// cluster's information.
c, err = cl.Cluster(ctx, c.ID)
if err != nil {
return "", fmt.Errorf("unable to get cluster %q information: %w", c.ID, err)
var (
requiresMTLS, requiresSASL bool
p config.RpkProfile
)
if selected.c != nil {
c, err := cl.Cluster(ctx, selected.c.ID)
if err != nil {
return "", fmt.Errorf("unable to get cluster %q information: %w", c.ID, err)
}
p, requiresMTLS, requiresSASL = profile.FromCloudCluster(c)
} else {
c, err := cl.VirtualCluster(ctx, selected.vc.ID)
if err != nil {
return "", fmt.Errorf("unable to get cluster %q information: %w", c.ID, err)
}
p, requiresMTLS, requiresSASL = profile.FromVirtualCluster(c)
}
p, requiresMTLS, requiresSASL := profile.FromCloudCluster(c)

// Before pushing this profile, we first check if the name exists. If
// so, we prompt.
Expand All @@ -237,3 +259,61 @@ func loginProfileFlow(ctx context.Context, fs afero.Fs, y *config.RpkYaml, auth
}
return msg, y.Write(fs)
}

func clusterList(ctx context.Context, cl *cloudapi.Client) (vcs []cloudapi.VirtualCluster, cs []cloudapi.Cluster, err error) {
g, egCtx := errgroup.WithContext(ctx)
g.Go(func() (rerr error) {
vcs, rerr = cl.VirtualClusters(egCtx)
if rerr != nil {
return fmt.Errorf("unable to get the list of virtual clusters: %v", rerr)
}
return nil
})
g.Go(func() (rerr error) {
cs, rerr = cl.Clusters(egCtx)
if rerr != nil {
return fmt.Errorf("unable to get the list of clusters: %v", rerr)
}
return nil
})
if err := g.Wait(); err != nil {
return nil, nil, err
}
return
}

// combineClusterNames combines the names of Virtual Clusters and Clusters,
// sorted alphabetically, and returns a list of nameAndCluster structs
// representing the combined clusters (VClusters first, then Clusters).
func combineClusterNames(vcs []cloudapi.VirtualCluster, cs []cloudapi.Cluster, nsIDToName map[string]string) []nameAndCluster {
// First we display the Virtual Clusters
var vNameAndCs []nameAndCluster
for _, vc := range vcs {
vc := vc
if strings.ToLower(vc.State) != cloudapi.ClusterStateReady {
continue
}
vNameAndCs = append(vNameAndCs, nameAndCluster{
name: fmt.Sprintf("%s/%s", nsIDToName[vc.NamespaceUUID], vc.Name),
vc: &vc,
})
}
sort.Slice(vNameAndCs, func(i, j int) bool {
return vNameAndCs[i].name < vNameAndCs[j].name
})

// Then we append the cluster names
var nameAndCs []nameAndCluster
for _, c := range cs {
c := c
nameAndCs = append(nameAndCs, nameAndCluster{
name: fmt.Sprintf("%s/%s", nsIDToName[c.NamespaceUUID], c.Name),
c: &c,
})
}
sort.Slice(nameAndCs, func(i, j int) bool {
return nameAndCs[i].name < nameAndCs[j].name
})

return append(vNameAndCs, nameAndCs...)
}
12 changes: 12 additions & 0 deletions src/go/rpk/pkg/cli/profile/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"os"
"time"

"github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/cloudapi"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/oauth"
Expand Down Expand Up @@ -252,6 +253,17 @@ func FromCloudCluster(c cloudapi.Cluster) (p config.RpkProfile, isMTLS, isSASL b
return p, isMTLS, isSASL
}

func FromVirtualCluster(vc cloudapi.VirtualCluster) (p config.RpkProfile, isMTLS, isSASL bool) {
p = config.RpkProfile{
Name: vc.Name,
FromCloud: true,
}
p.KafkaAPI.Brokers = vc.Status.Created.VirtualRedpandaDetails.SeedAddresses
p.KafkaAPI.SASL.Mechanism = admin.CloudOIDC
// isSASL is false here since we don't need to print the SASL-required msg.
return p, false, false
}

// RequiresMTLSMessage returns the message to print if the cluster requires
// mTLS.
func RequiresMTLSMessage() string {
Expand Down
65 changes: 65 additions & 0 deletions src/go/rpk/pkg/cloudapi/api_virtual_cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2023 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package cloudapi

import (
"context"
"time"

"github.com/redpanda-data/redpanda/src/go/rpk/pkg/httpapi"
)

const (
virtualClusterPath = "/api/v1/virtual-clusters"
ClusterStateReady = "ready"
)

type (
VirtualClusterSpec struct {
ServerlessRegionID string `json:"serverlessRegionId"`
}
VirtualClusterStatus struct {
Created struct {
VirtualRedpandaDetails struct {
AdminAPIURL string `json:"adminApiUrl"`
SeedAddresses []string `json:"seedAddresses"`
} `json:"virtualRedpandaDetails"`
Metadata struct {
Source string `json:"source"`
ReportedAt string `json:"reportedAt"`
} `json:"metadata"`
} `json:"created"`
}

VirtualCluster struct {
NameID
NamespaceUUID string `json:"namespaceUuid"`
CreatedAt time.Time `json:"createdAt"`
State string `json:"state"`

Spec VirtualClusterSpec `json:"spec"`
Status VirtualClusterStatus `json:"status"`
}
// VirtualClusters is a set of Redpanda virtual clusters.
VirtualClusters []VirtualCluster
)

func (cl *Client) VirtualClusters(ctx context.Context) (VirtualClusters, error) {
var cs []VirtualCluster
err := cl.cl.Get(ctx, virtualClusterPath, nil, &cs)
return cs, err
}

func (cl *Client) VirtualCluster(ctx context.Context, vClusterID string) (VirtualCluster, error) {
path := httpapi.Pathfmt(virtualClusterPath+"/%s", vClusterID)
var vc VirtualCluster
err := cl.cl.Get(ctx, path, nil, &vc)
return vc, err
}
2 changes: 1 addition & 1 deletion src/go/rpk/pkg/cloudapi/cloudapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func NewClient(host, authToken string, hopts ...httpapi.Opt) *Client {
return &Client{cl: httpapi.NewClient(opts...)}
}

// NameID is a common type used in may endpoints / many structs.
// NameID is a common type used in many endpoints / many structs.
type NameID struct {
ID string `json:"id,omitempty"`
Name string `json:"name,omitempty"`
Expand Down
5 changes: 5 additions & 0 deletions src/go/rpk/pkg/config/rpk_yaml.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,11 @@ func (p *RpkProfile) Defaults() *RpkDefaults {
return &p.c.rpkYaml.Defaults
}

// CurrentAuth returns the current cloud Auth.
func (p *RpkProfile) CurrentAuth() *RpkCloudAuth {
return p.c.rpkYaml.Auth(p.c.rpkYaml.CurrentCloudAuth)
}

// HasClientCredentials returns if both ClientID and ClientSecret are empty.
func (a *RpkCloudAuth) HasClientCredentials() bool {
k, _ := a.Kind()
Expand Down
34 changes: 23 additions & 11 deletions src/go/rpk/pkg/kafka/client_franz.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ import (
"strings"
"time"

"github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
"github.com/spf13/afero"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
"github.com/twmb/franz-go/pkg/sasl/oauth"
"github.com/twmb/franz-go/pkg/sasl/scram"
"github.com/twmb/franz-go/plugin/kzap"
)
Expand Down Expand Up @@ -93,17 +95,27 @@ func NewFranzClient(fs afero.Fs, p *config.RpkProfile, extraOpts ...kgo.Opt) (*k
}

if k.SASL != nil {
mech := scram.Auth{
User: k.SASL.User,
Pass: k.SASL.Password,
}
switch name := strings.ToUpper(k.SASL.Mechanism); name {
case "SCRAM-SHA-256", "": // we default to SCRAM-SHA-256 -- people commonly specify user & pass without --sasl-mechanism
opts = append(opts, kgo.SASL(mech.AsSha256Mechanism()))
case "SCRAM-SHA-512":
opts = append(opts, kgo.SASL(mech.AsSha512Mechanism()))
default:
return nil, fmt.Errorf("unknown SASL mechanism %q, supported: [SCRAM-SHA-256, SCRAM-SHA-512]", name)
if k.SASL.Mechanism == admin.CloudOIDC {
a := p.CurrentAuth()
if a == nil || a.AuthToken == "" {
return nil, fmt.Errorf("please login to our cloud using 'rpk cloud login'") // TODO
}
opts = append(opts, kgo.SASL((oauth.Auth{
Token: a.AuthToken,
}).AsMechanism()))
} else {
mech := scram.Auth{
User: k.SASL.User,
Pass: k.SASL.Password,
}
switch name := strings.ToUpper(k.SASL.Mechanism); name {
case "SCRAM-SHA-256", "": // we default to SCRAM-SHA-256 -- people commonly specify user & pass without --sasl-mechanism
opts = append(opts, kgo.SASL(mech.AsSha256Mechanism()))
case "SCRAM-SHA-512":
opts = append(opts, kgo.SASL(mech.AsSha512Mechanism()))
default:
return nil, fmt.Errorf("unknown SASL mechanism %q, supported: [SCRAM-SHA-256, SCRAM-SHA-512]", name)
}
}
}

Expand Down

0 comments on commit c2c9ef2

Please sign in to comment.