diff --git a/src/go/rpk/pkg/adminapi/api_broker.go b/src/go/rpk/pkg/adminapi/api_broker.go
index 4fc87b23ee85..e2340c78460e 100644
--- a/src/go/rpk/pkg/adminapi/api_broker.go
+++ b/src/go/rpk/pkg/adminapi/api_broker.go
@@ -159,6 +159,12 @@ func (a *AdminAPI) DisableMaintenanceMode(ctx context.Context, nodeID int, useLe
 	}
 }
 
+// MaintenanceStatus returns the maintenance status of the node that handles
+// the request.
+func (a *AdminAPI) MaintenanceStatus(ctx context.Context) (MaintenanceStatus, error) {
+	var response MaintenanceStatus
+	return response, a.sendAny(ctx, http.MethodGet, "/v1/maintenance", nil, &response)
+}
+
 func (a *AdminAPI) CancelNodePartitionsMovement(ctx context.Context, node int) ([]PartitionsMovementResult, error) {
 	var response []PartitionsMovementResult
 	return response, a.sendAny(ctx, http.MethodPost, fmt.Sprintf("%s/%d/cancel_partition_moves", brokersEndpoint, node), nil, &response)
diff --git a/src/go/rpk/pkg/adminapi/api_cloud_storage.go b/src/go/rpk/pkg/adminapi/api_cloud_storage.go
index cdc0389c0d2d..1723ecf50568 100644
--- a/src/go/rpk/pkg/adminapi/api_cloud_storage.go
+++ b/src/go/rpk/pkg/adminapi/api_cloud_storage.go
@@ -61,6 +61,24 @@ type CloudStorageStatus struct {
 	MetadataUpdatePending bool `json:"metadata_update_pending"` // If true, the remote metadata may not yet include all segments that have been uploaded.
 }
 
+// CloudStorageLifecycle contains the lifecycle markers for topics pending deletion.
+type CloudStorageLifecycle struct {
+	Markers []LifecycleMarker `json:"markers"`
+}
+
+// LifecycleMarker is the lifecycle status of a topic (e.g. during deletion).
+type LifecycleMarker struct {
+	Ns         string `json:"ns"`
+	Topic      string `json:"topic"`
+	RevisionID int    `json:"revision_id"`
+	Status     string `json:"status"`
+}
+
+type (
+	CloudStorageManifest  map[string]any
+	CloudStorageAnomalies map[string]any
+)
+
 // StartAutomatedRecovery starts the automated recovery process by sending a request to the automated recovery API endpoint.
 func (a *AdminAPI) StartAutomatedRecovery(ctx context.Context, topicNamesPattern string) (RecoveryStartResponse, error) {
 	requestParams := &RecoveryRequestParams{
@@ -82,3 +100,21 @@ func (a *AdminAPI) CloudStorageStatus(ctx context.Context, topic, partition stri
 	path := fmt.Sprintf("/v1/cloud_storage/status/%s/%s", topic, partition)
 	return response, a.sendAny(ctx, http.MethodGet, path, http.NoBody, &response)
 }
+
+// CloudStorageLifecycle returns lifecycle markers for topics pending deletion.
+func (a *AdminAPI) CloudStorageLifecycle(ctx context.Context) (CloudStorageLifecycle, error) {
+	var response CloudStorageLifecycle
+	return response, a.sendToLeader(ctx, http.MethodGet, "/v1/cloud_storage/lifecycle", nil, &response)
+}
+
+func (a *AdminAPI) CloudStorageManifest(ctx context.Context, topic string, partition int) (CloudStorageManifest, error) {
+	var response CloudStorageManifest
+	path := fmt.Sprintf("/v1/cloud_storage/manifest/%v/%v", topic, partition)
+	return response, a.sendAny(ctx, http.MethodGet, path, nil, &response)
+}
+
+func (a *AdminAPI) CloudStorageAnomalies(ctx context.Context, namespace, topic string, partition int) (CloudStorageAnomalies, error) {
+	var response CloudStorageAnomalies
+	path := fmt.Sprintf("/v1/cloud_storage/anomalies/%v/%v/%v", namespace, topic, partition)
+	return response, a.sendAny(ctx, http.MethodGet, path, nil, &response)
+}
diff --git a/src/go/rpk/pkg/adminapi/api_cluster.go b/src/go/rpk/pkg/adminapi/api_cluster.go
index 3c18cb221920..67f69b388c9f 100644
--- a/src/go/rpk/pkg/adminapi/api_cluster.go
+++ b/src/go/rpk/pkg/adminapi/api_cluster.go
@@ -69,6 +69,10 @@ type PartitionsMovementResult struct {
 	Result string `json:"result,omitempty"`
 }
 
+type ClusterUUID struct {
+	UUID string `json:"cluster_uuid"`
+}
+
 // ClusterView represents a cluster view as seen by one node. There are
 // many keys returned, so the raw response is just unmarshalled into an
 // interface.
@@ -94,3 +98,10 @@ func (a *AdminAPI) ClusterView(ctx context.Context) (ClusterView, error) {
 	var response ClusterView
 	return response, a.sendOne(ctx, http.MethodGet, "/v1/cluster_view", nil, &response, true)
 }
+
+// ClusterUUID returns the UUID of the cluster this node belongs to. Not to be
+// confused with the configurable cluster identifier used in metrics.
+func (a *AdminAPI) ClusterUUID(ctx context.Context) (ClusterUUID, error) {
+	var response ClusterUUID
+	return response, a.sendToLeader(ctx, http.MethodGet, "/v1/cluster/uuid", nil, &response)
+}
diff --git a/src/go/rpk/pkg/adminapi/api_debug.go b/src/go/rpk/pkg/adminapi/api_debug.go
index e0f188e1b393..9557229f58f0 100644
--- a/src/go/rpk/pkg/adminapi/api_debug.go
+++ b/src/go/rpk/pkg/adminapi/api_debug.go
@@ -100,6 +100,37 @@ type SelfTestRequest struct {
 	Nodes []int `json:"nodes,omitempty"`
 }
 
+// PartitionLeaderTable describes the leader of one partition, as reported by
+// Redpanda's partition_leaders_table.
+type PartitionLeaderTable struct {
+	Ns                   string `json:"ns"`
+	Topic                string `json:"topic"`
+	PartitionID          int    `json:"partition_id"`
+	Leader               int    `json:"leader"`
+	PreviousLeader       int    `json:"previous_leader"`
+	LastStableLeaderTerm int    `json:"last_stable_leader_term"`
+	UpdateTerm           int    `json:"update_term"`
+	PartitionRevision    int    `json:"partition_revision"`
+}
+
+// ControllerStatus is the status of a controller, as seen by a node.
+type ControllerStatus struct {
+	StartOffset       int `json:"start_offset"`
+	LastAppliedOffset int `json:"last_applied_offset"`
+	CommittedIndex    int `json:"committed_index"`
+	DirtyOffset       int `json:"dirty_offset"`
+}
+
+// ReplicaState is the partition state of a replica. There are many keys
+// returned, so the raw response is just unmarshalled into an interface.
+type ReplicaState map[string]any
+
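For context, here is a minimal sketch of how these new types and the debug getters added later in this file are meant to be consumed. The example package, the dumpDebugState helper, and the already-configured client are hypothetical illustrations, not part of this change:

package example

import (
	"context"
	"fmt"

	"github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi"
)

// dumpDebugState prints a few of the new debug endpoints. It assumes the
// caller already holds a configured *adminapi.AdminAPI client.
func dumpDebugState(ctx context.Context, cl *adminapi.AdminAPI) error {
	// Leader table for every partition known to the answering node.
	leaders, err := cl.PartitionLeaderTable(ctx)
	if err != nil {
		return err
	}
	for _, l := range leaders {
		fmt.Printf("%s/%s/%d -> leader %d (update term %d)\n", l.Ns, l.Topic, l.PartitionID, l.Leader, l.UpdateTerm)
	}

	// ControllerStatus is a fixed struct, so its fields are typed...
	status, err := cl.ControllerStatus(ctx)
	if err != nil {
		return err
	}
	fmt.Printf("controller: committed index %d, dirty offset %d\n", status.CommittedIndex, status.DirtyOffset)

	// ...while ReplicaState is an open map, so replica keys must be
	// inspected dynamically after decoding. "kafka"/"foo"/0 is a
	// placeholder partition.
	dp, err := cl.DebugPartition(ctx, "kafka", "foo", 0)
	if err != nil {
		return err
	}
	for _, r := range dp.Replicas {
		fmt.Println(dp.Ntp, "replica state keys:", len(r))
	}
	return nil
}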
+// DebugPartition is the low-level debug information of a partition.
+type DebugPartition struct {
+	Ntp      string         `json:"ntp"`
+	Replicas []ReplicaState `json:"replicas"`
+}
+
 func (a *AdminAPI) StartSelfTest(ctx context.Context, nodeIds []int, params []any) (string, error) {
 	var testID string
 	body := SelfTestRequest{
@@ -129,3 +160,29 @@ func (a *AdminAPI) SelfTestStatus(ctx context.Context) ([]SelfTestNodeReport, er
 	err := a.sendAny(ctx, http.MethodGet, fmt.Sprintf("%s/status", debugEndpoint), nil, &response)
 	return response, err
 }
+
+// PartitionLeaderTable returns the partition leaders table as seen by the
+// requested node.
+func (a *AdminAPI) PartitionLeaderTable(ctx context.Context) ([]PartitionLeaderTable, error) {
+	var response []PartitionLeaderTable
+	return response, a.sendAny(ctx, http.MethodGet, "/v1/debug/partition_leaders_table", nil, &response)
+}
+
+func (a *AdminAPI) IsNodeIsolated(ctx context.Context) (bool, error) {
+	var isIsolated bool
+	return isIsolated, a.sendAny(ctx, http.MethodGet, "/v1/debug/is_node_isolated", nil, &isIsolated)
+}
+
+// ControllerStatus returns the controller status, as seen by the requested
+// node.
+func (a *AdminAPI) ControllerStatus(ctx context.Context) (ControllerStatus, error) {
+	var response ControllerStatus
+	return response, a.sendAny(ctx, http.MethodGet, "/v1/debug/controller_status", nil, &response)
+}
+
+// DebugPartition returns low-level debug information (on any node) of all
+// replicas of a given partition.
+func (a *AdminAPI) DebugPartition(ctx context.Context, namespace, topic string, partitionID int) (DebugPartition, error) {
+	var response DebugPartition
+	return response, a.sendAny(ctx, http.MethodGet, fmt.Sprintf("/v1/debug/partition/%v/%v/%v", namespace, topic, partitionID), nil, &response)
+}
diff --git a/src/go/rpk/pkg/adminapi/api_features.go b/src/go/rpk/pkg/adminapi/api_features.go
index 95a4fb2803bd..5d498b77350e 100644
--- a/src/go/rpk/pkg/adminapi/api_features.go
+++ b/src/go/rpk/pkg/adminapi/api_features.go
@@ -54,7 +54,7 @@ type LicenseProperties struct {
 // GetFeatures returns information about the available features.
 func (a *AdminAPI) GetFeatures(ctx context.Context) (FeaturesResponse, error) {
 	var features FeaturesResponse
-	return features, a.sendAny(
+	return features, a.sendToLeader(
 		ctx,
 		http.MethodGet,
 		"/v1/features",
@@ -64,7 +64,7 @@ func (a *AdminAPI) GetFeatures(ctx context.Context) (FeaturesResponse, error) {
 
 func (a *AdminAPI) GetLicenseInfo(ctx context.Context) (License, error) {
 	var license License
-	return license, a.sendAny(ctx, http.MethodGet, "/v1/features/license", nil, &license)
+	return license, a.sendToLeader(ctx, http.MethodGet, "/v1/features/license", nil, &license)
 }
 
 func (a *AdminAPI) SetLicense(ctx context.Context, license interface{}) error {
diff --git a/src/go/rpk/pkg/adminapi/api_raft.go b/src/go/rpk/pkg/adminapi/api_raft.go
new file mode 100644
index 000000000000..961bb5c6b716
--- /dev/null
+++ b/src/go/rpk/pkg/adminapi/api_raft.go
@@ -0,0 +1,27 @@
+// Copyright 2023 Redpanda Data, Inc.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.md
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0
+
+package adminapi
+
+import (
+	"context"
+	"net/http"
+)
+
+type RaftRecoveryStatus struct {
+	PartitionsToRecover int `json:"partitions_to_recover"`
+	PartitionsActive    int `json:"partitions_active"`
+	OffsetsPending      int `json:"offsets_pending"`
+}
+
+// RaftRecoveryStatus returns the node's recovery status.
+func (a *AdminAPI) RaftRecoveryStatus(ctx context.Context) (RaftRecoveryStatus, error) {
+	var status RaftRecoveryStatus
+	return status, a.sendOne(ctx, http.MethodGet, "/v1/raft/recovery/status", nil, &status, false)
+}
diff --git a/src/go/rpk/pkg/adminapi/api_storage.go b/src/go/rpk/pkg/adminapi/api_storage.go
new file mode 100644
index 000000000000..a67a0a93e6d7
--- /dev/null
+++ b/src/go/rpk/pkg/adminapi/api_storage.go
@@ -0,0 +1,29 @@
+// Copyright 2023 Redpanda Data, Inc.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.md
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0
+
+package adminapi
+
+import (
+	"context"
+	"net/http"
+)
+
+// DiskStatInfo is the disk data returned by the /disk_stat endpoint of the
+// admin API.
+type DiskStatInfo map[string]any
+
+func (a *AdminAPI) DiskCache(ctx context.Context) (DiskStatInfo, error) {
+	var response DiskStatInfo
+	return response, a.sendOne(ctx, http.MethodGet, "/v1/debug/storage/disk_stat/cache", nil, &response, false)
+}
+
+func (a *AdminAPI) DiskData(ctx context.Context) (DiskStatInfo, error) {
+	var response DiskStatInfo
+	return response, a.sendOne(ctx, http.MethodGet, "/v1/debug/storage/disk_stat/data", nil, &response, false)
+}
diff --git a/src/go/rpk/pkg/cli/cluster/partitions/toggle.go b/src/go/rpk/pkg/cli/cluster/partitions/toggle.go
index ccfdcc44f138..69f6cbe85345 100644
--- a/src/go/rpk/pkg/cli/cluster/partitions/toggle.go
+++ b/src/go/rpk/pkg/cli/cluster/partitions/toggle.go
@@ -13,10 +13,6 @@ import (
 	"context"
 	"fmt"
 	"os"
-	"regexp"
-	"strconv"
-	"strings"
-	"sync"
 
 	"github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi"
 	"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
@@ -178,7 +174,7 @@ func runToggle(ctx context.Context, cl *adminapi.AdminAPI, all bool, topicArg, p
 	}
 	g, egCtx := errgroup.WithContext(ctx)
 	for _, ntp := range partitionFlag {
-		ns, topicName, partitions, err := parsePartition(ntp)
+		ns, topicName, partitions, err := out.ParsePartitionString(ntp)
 		if err != nil {
 			return err
 		}
@@ -204,40 +200,3 @@ func runToggle(ctx context.Context, cl *adminapi.AdminAPI, all bool, topicArg, p
 	}
 	return g.Wait()
 }
-
-var (
-	partitionRe     *regexp.Regexp
-	partitionReOnce sync.Once
-)
-
-// parsePartition parses the partition flag with the format:
-//    {namespace}/{topic}/[partitions...]
-// where namespace and topic are optionals, and partitions are comma-separated
-// partitions ID. If namespace is not provided, the function assumes 'kafka'.
-func parsePartition(ntp string) (ns, topic string, partitions []int, rerr error) {
-	partitionReOnce.Do(func() {
-		// Matches {namespace}/{topic}/[partitions...]
-		//   - Index 0: Full Match.
-		//   - Index 1: Namespace, if present.
-		//   - Index 2: Topic, if present.
-		//   - Index 3: Comma-separated partitions.
-		partitionRe = regexp.MustCompile(`^(?:(?:([^/]+)/)?([^/]+)/)?(\d+(?:,\d+)*)$`)
-	})
-	match := partitionRe.FindStringSubmatch(ntp)
-	if len(match) == 0 {
-		return "", "", nil, fmt.Errorf("unable to parse %q: wrong format", ntp)
-	}
-	ns = match[1]
-	if ns == "" {
-		ns = "kafka"
-	}
-	partitionString := strings.Split(match[3], ",")
-	for _, str := range partitionString {
-		p, err := strconv.Atoi(str)
-		if err != nil {
-			return "", "", nil, fmt.Errorf("unable to parse partition %v in flag %v: %v", str, ntp, err)
-		}
-		partitions = append(partitions, p)
-	}
-	return ns, match[2], partitions, nil
-}
diff --git a/src/go/rpk/pkg/cli/cluster/partitions/toggle_test.go b/src/go/rpk/pkg/cli/cluster/partitions/toggle_test.go
deleted file mode 100644
index 376a28f1466d..000000000000
--- a/src/go/rpk/pkg/cli/cluster/partitions/toggle_test.go
+++ /dev/null
@@ -1,86 +0,0 @@
-// Copyright 2023 Redpanda Data, Inc.
-//
-// Use of this software is governed by the Business Source License
-// included in the file licenses/BSL.md
-//
-// As of the Change Date specified in that file, in accordance with
-// the Business Source License, use of this software will be governed
-// by the Apache License, Version 2.0
-
-package partitions
-
-import (
-	"testing"
-
-	"github.com/stretchr/testify/require"
-)
-
-func Test_parsePartition(t *testing.T) {
-	for _, tt := range []struct {
-		name          string
-		input         string
-		expNs         string
-		expTopic      string
-		expPartitions []int
-		expErr        bool
-	}{
-		{
-			name:          "complete",
-			input:         "_redpanda_internal/topic-foo1/2,3,1",
-			expNs:         "_redpanda_internal",
-			expTopic:      "topic-foo1",
-			expPartitions: []int{2, 3, 1},
-		}, {
-			name:          "topic and partitions",
-			input:         "myTopic/1,2",
-			expNs:         "kafka",
-			expTopic:      "myTopic",
-			expPartitions: []int{1, 2},
-		}, {
-			name:          "topic and single partition",
-			input:         "myTopic/12",
-			expNs:         "kafka",
-			expTopic:      "myTopic",
-			expPartitions: []int{12},
-		}, {
-			name:          "just partitions",
-			input:         "1,2,3,5,8,13,21",
-			expNs:         "kafka",
-			expTopic:      "",
-			expPartitions: []int{1, 2, 3, 5, 8, 13, 21},
-		}, {
-			name:          "single partition",
-			input:         "13",
-			expNs:         "kafka",
-			expTopic:      "",
-			expPartitions: []int{13},
-		}, {
-			name:          "topic with dot",
-			input:         "my.topic.foo/1",
-			expNs:         "kafka",
-			expTopic:      "my.topic.foo",
-			expPartitions: []int{1},
-		}, {
-			name:   "wrong format 1",
-			input:  "thirteen",
-			expErr: true,
-		}, {
-			name:   "wrong format 2",
-			input:  "_internal|foo|1,2,3",
-			expErr: true,
-		},
-	} {
-		t.Run(tt.name, func(t *testing.T) {
-			gotNs, gotTopic, gotPartitions, err := parsePartition(tt.input)
-			if tt.expErr {
-				require.Error(t, err)
-				return
-			}
-			require.NoError(t, err)
-
-			require.Equal(t, tt.expNs, gotNs)
-			require.Equal(t, tt.expTopic, gotTopic)
-			require.Equal(t, tt.expPartitions, gotPartitions)
-		})
-	}
-}
diff --git a/src/go/rpk/pkg/cli/debug/bundle/bundle.go b/src/go/rpk/pkg/cli/debug/bundle/bundle.go
index 2a650c16e951..e67700b95365 100644
--- a/src/go/rpk/pkg/cli/debug/bundle/bundle.go
+++ b/src/go/rpk/pkg/cli/debug/bundle/bundle.go
@@ -40,6 +40,13 @@ type bundleParams struct {
 	controllerLogLimitBytes int
 	timeout                 time.Duration
 	metricsInterval         time.Duration
+	partitions              []topicPartitionFilter
+}
+
+type topicPartitionFilter struct {
+	namespace    string
+	topic        string
+	partitionsID []int
 }
 
 func NewCommand(fs afero.Fs, p *config.Params) *cobra.Command {
@@ -54,6 +61,7 @@ func NewCommand(fs afero.Fs, p *config.Params) *cobra.Command {
 
 		controllerLogsSizeLimit string
 		namespace               string
+		partitionFlag           []string
 
 		timeout         time.Duration
 		metricsInterval time.Duration
@@ -80,6 +88,9 @@ func NewCommand(fs afero.Fs, p *config.Params) *cobra.Command {
 				yActual = y
 			}
 
+			partitions, err := parsePartitionFlag(partitionFlag)
+			out.MaybeDie(err, "unable to parse partition flag %v: %v", partitionFlag, err)
+
 			cl, err := kafka.NewFranzClient(fs, p)
 			out.MaybeDie(err, "unable to initialize kafka client: %v", err)
 			defer cl.Close()
@@ -103,6 +114,7 @@ func NewCommand(fs afero.Fs, p *config.Params) *cobra.Command {
 				controllerLogLimitBytes: int(controllerLogsLimit),
 				timeout:                 timeout,
 				metricsInterval:         metricsInterval,
+				partitions:              partitions,
 			}
 
 			// To execute the appropriate bundle we look for
@@ -136,6 +148,7 @@ func NewCommand(fs afero.Fs, p *config.Params) *cobra.Command {
 	f.StringVar(&controllerLogsSizeLimit, "controller-logs-size-limit", "20MB", "The size limit of the controller logs that can be stored in the bundle (e.g. 3MB, 1GiB)")
 	f.StringVar(&uploadURL, "upload-url", "", "If provided, where to upload the bundle in addition to creating a copy on disk")
 	f.StringVarP(&namespace, "namespace", "n", "redpanda", "The namespace to use to collect the resources from (k8s only)")
+	f.StringArrayVarP(&partitionFlag, "partition", "p", nil, "Comma-separated partition IDs; when provided, rpk saves extra admin API requests for those partitions. Check help for extended usage")
 
 	return cmd
 }
@@ -158,6 +171,24 @@ func uploadBundle(ctx context.Context, filepath, uploadURL string) error {
 	return cl.Put(ctx, uploadURL, nil, uploadFile, nil)
 }
 
+func parsePartitionFlag(flags []string) (filters []topicPartitionFilter, rerr error) {
+	for _, flag := range flags {
+		ns, topic, partitions, err := out.ParsePartitionString(flag)
+		if err != nil {
+			return nil, err
+		}
+		if topic == "" {
+			return nil, fmt.Errorf("unable to parse %q: you must provide a topic", flag)
+		}
+		filters = append(filters, topicPartitionFilter{
+			namespace:    ns,
+			topic:        topic,
+			partitionsID: partitions,
+		})
+	}
+	return
+}
+
 // S3EndpointError is the error that we get when calling an S3 url.
 type S3EndpointError struct {
 	XMLName xml.Name `xml:"Error"`
@@ -201,8 +232,9 @@ COMMON FILES
  - Clock drift: The ntp clock delta (using pool.ntp.org as a reference) & round
    trip time.
 
- - Admin API calls: Cluster and broker configurations, cluster health data, and
-   license key information.
+ - Admin API calls: Multiple requests to gather information such as cluster and
+   broker configurations, cluster health data, balancer status, cloud storage
+   status, and license key information.
 
  - Broker metrics: The broker's Prometheus metrics, fetched through its admin
    API (/metrics and /public_metrics).
@@ -242,6 +274,20 @@ KUBERNETES
    --logs-since is passed, only the logs within the given timeframe are
    included.
 
+EXTRA REQUESTS FOR PARTITIONS
+
+You can provide a list of partitions to save additional admin API requests
+specifically for those partitions.
+
+The --partition flag accepts the format {namespace}/{topic}/{partitions...},
+where the namespace is optional; if the namespace is not provided, rpk assumes
+'kafka'. For example:
+
+Topic 'foo', partitions 1, 2 and 3:
+  --partition foo/1,2,3
+
+Namespace '_redpanda-internal', topic 'bar', partition 2:
+  --partition _redpanda-internal/bar/2
 
 If you have an upload URL from the Redpanda support team, provide it in
 the --upload-url flag to upload your diagnostics bundle to Redpanda.
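To make the accepted shapes concrete, here is a small test-style sketch of what parsePartitionFlag produces for the two help-text examples above. It is a hypothetical test, not part of this change; it assumes it sits next to parsePartitionFlag in package bundle:

package bundle

import (
	"reflect"
	"testing"
)

// TestParsePartitionFlagExamples pins down the two examples from the command
// help text: one topic/partitions string with the default namespace, and one
// with an explicit namespace.
func TestParsePartitionFlagExamples(t *testing.T) {
	got, err := parsePartitionFlag([]string{
		"foo/1,2,3",                // namespace omitted, defaults to "kafka"
		"_redpanda-internal/bar/2", // explicit namespace
	})
	if err != nil {
		t.Fatal(err)
	}
	exp := []topicPartitionFilter{
		{namespace: "kafka", topic: "foo", partitionsID: []int{1, 2, 3}},
		{namespace: "_redpanda-internal", topic: "bar", partitionsID: []int{2}},
	}
	if !reflect.DeepEqual(got, exp) {
		t.Fatalf("got %+v, expected %+v", got, exp)
	}
}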
diff --git a/src/go/rpk/pkg/cli/debug/bundle/bundle_k8s_linux.go b/src/go/rpk/pkg/cli/debug/bundle/bundle_k8s_linux.go
index e73a479f3fd9..cdbaacce37fb 100644
--- a/src/go/rpk/pkg/cli/debug/bundle/bundle_k8s_linux.go
+++ b/src/go/rpk/pkg/cli/debug/bundle/bundle_k8s_linux.go
@@ -20,6 +20,7 @@ import (
 	"os"
 	"path/filepath"
 	"regexp"
+	"strconv"
 	"strings"
 	"time"
 
@@ -83,7 +84,7 @@ func executeK8SBundle(ctx context.Context, bp bundleParams) error {
 		errs = multierror.Append(errs, fmt.Errorf("skipping admin API calls, unable to get admin API addresses: %v", err))
 	} else {
 		steps = append(steps, []step{
-			saveClusterAdminAPICalls(ctx, ps, bp.fs, bp.p, adminAddresses),
+			saveClusterAdminAPICalls(ctx, ps, bp.fs, bp.p, adminAddresses, bp.partitions),
 			saveSingleAdminAPICalls(ctx, ps, bp.fs, bp.p, adminAddresses, bp.metricsInterval),
 		}...)
 	}
@@ -195,13 +196,9 @@ func getClusterDomain() string {
 	return clusterDomain
 }
 
-// saveClusterAdminAPICalls save the following admin API request to the zip:
-//   - Cluster Health: /v1/cluster/health_overview
-//   - Brokers: /v1/brokers
-//   - License Info: /v1/features/license
-//   - Cluster Config: /v1/cluster_config
-//   - Reconfigurations: /v1/partitions/reconfigurations
-func saveClusterAdminAPICalls(ctx context.Context, ps *stepParams, fs afero.Fs, p *config.RpkProfile, adminAddresses []string) step {
+// saveClusterAdminAPICalls saves per-cluster Admin API requests in the 'admin/'
+// directory of the bundle zip.
+func saveClusterAdminAPICalls(ctx context.Context, ps *stepParams, fs afero.Fs, p *config.RpkProfile, adminAddresses []string, partitions []topicPartitionFilter) step {
 	return func() error {
 		p = &config.RpkProfile{
 			KafkaAPI: config.RpkKafkaAPI{
@@ -218,19 +215,45 @@ func saveClusterAdminAPICalls(ctx context.Context, ps *stepParams, fs afero.Fs,
 		}
 
 		var grp multierror.Group
-		for _, f := range []func() error{
+		reqFuncs := []func() error{
 			func() error { return requestAndSave(ctx, ps, "admin/brokers.json", cl.Brokers) },
 			func() error { return requestAndSave(ctx, ps, "admin/health_overview.json", cl.GetHealthOverview) },
 			func() error { return requestAndSave(ctx, ps, "admin/license.json", cl.GetLicenseInfo) },
 			func() error { return requestAndSave(ctx, ps, "admin/reconfigurations.json", cl.Reconfigurations) },
+			func() error { return requestAndSave(ctx, ps, "admin/features.json", cl.GetFeatures) },
+			func() error { return requestAndSave(ctx, ps, "admin/uuid.json", cl.ClusterUUID) },
+			func() error {
+				return requestAndSave(ctx, ps, "admin/automated_recovery.json", cl.PollAutomatedRecoveryStatus)
+			},
+			func() error {
+				return requestAndSave(ctx, ps, "admin/cloud_storage_lifecycle.json", cl.CloudStorageLifecycle)
+			},
+			func() error {
+				return requestAndSave(ctx, ps, "admin/partition_balancer_status.json", cl.GetPartitionStatus)
+			},
 			func() error {
 				// Need to wrap this function because cl.Config receives an additional 'includeDefaults' param.
 				f := func(ctx context.Context) (adminapi.Config, error) {
 					return cl.Config(ctx, true)
 				}
 				return requestAndSave(ctx, ps, "admin/cluster_config.json", f)
+			}, func() error {
+				f := func(ctx context.Context) (adminapi.ConfigStatusResponse, error) {
+					return cl.ClusterConfigStatus(ctx, true)
+				}
+				return requestAndSave(ctx, ps, "admin/cluster_config_status.json", f)
+			}, func() error {
+				f := func(ctx context.Context) ([]adminapi.ClusterPartition, error) {
+					return cl.AllClusterPartitions(ctx, true, false) // include internal topics, exclude disabled partitions.
+				}
+				return requestAndSave(ctx, ps, "admin/cluster_partitions.json", f)
 			},
-		} {
+		}
+		if partitions != nil {
+			extraFuncs := saveExtraFuncs(ctx, ps, cl, partitions)
+			reqFuncs = append(reqFuncs, extraFuncs...)
+		}
+		for _, f := range reqFuncs {
 			grp.Go(f)
 		}
 		errs := grp.Wait()
@@ -238,11 +261,8 @@ func saveClusterAdminAPICalls(ctx context.Context, ps *stepParams, fs afero.Fs,
 	}
 }
 
-// saveSingleAdminAPICalls save the following per-node admin API request to the
-// zip:
-//   - Node Config: /v1/node_config
-//   - Prometheus Metrics: /metrics and /public_metrics
-//   - Cluster View: v1/cluster_view
+// saveSingleAdminAPICalls saves per-node admin API requests in the 'admin/'
+// directory of the bundle zip.
 func saveSingleAdminAPICalls(ctx context.Context, ps *stepParams, fs afero.Fs, p *config.RpkProfile, adminAddresses []string, metricsInterval time.Duration) step {
 	return func() error {
 		var rerrs *multierror.Error
@@ -272,6 +292,27 @@ func saveSingleAdminAPICalls(ctx context.Context, ps *stepParams, fs afero.Fs, p
 			func() error {
 				return requestAndSave(ctx, ps, fmt.Sprintf("admin/cluster_view_%v.json", aName), cl.ClusterView)
 			},
+			func() error {
+				return requestAndSave(ctx, ps, fmt.Sprintf("admin/maintenance_status_%v.json", aName), cl.MaintenanceStatus)
+			},
+			func() error {
+				return requestAndSave(ctx, ps, fmt.Sprintf("admin/raft_status_%v.json", aName), cl.RaftRecoveryStatus)
+			},
+			func() error {
+				return requestAndSave(ctx, ps, fmt.Sprintf("admin/partition_leader_table_%v.json", aName), cl.PartitionLeaderTable)
+			},
+			func() error {
+				return requestAndSave(ctx, ps, fmt.Sprintf("admin/is_node_isolated_%v.json", aName), cl.IsNodeIsolated)
+			},
+			func() error {
+				return requestAndSave(ctx, ps, fmt.Sprintf("admin/controller_status_%v.json", aName), cl.ControllerStatus)
+			},
+			func() error {
+				return requestAndSave(ctx, ps, fmt.Sprintf("admin/disk_stat_data_%v.json", aName), cl.DiskData)
+			},
+			func() error {
+				return requestAndSave(ctx, ps, fmt.Sprintf("admin/disk_stat_cache_%v.json", aName), cl.DiskCache)
+			},
 			func() error {
 				err := requestAndSave(ctx, ps, fmt.Sprintf("metrics/%v/t0_metrics.txt", aName), cl.PrometheusMetrics)
 				if err != nil {
@@ -494,3 +535,45 @@ func sanitizeName(name string) string {
 	}
 	return r
 }
+
+func saveExtraFuncs(ctx context.Context, ps *stepParams, cl *adminapi.AdminAPI, partitionFilters []topicPartitionFilter) (funcs []func() error) {
+	for _, tpf := range partitionFilters {
+		tpf := tpf // capture range variable for the closures below
+		for _, p := range tpf.partitionsID {
+			p := p // capture range variable for the closures below
+			funcs = append(funcs, []func() error{
+				func() error {
+					f := func(ctx context.Context) (adminapi.Partition, error) {
+						return cl.GetPartition(ctx, tpf.namespace, tpf.topic, p)
+					}
+					return requestAndSave(ctx, ps, fmt.Sprintf("partitions/info_%v_%v_%v.json", tpf.namespace, tpf.topic, p), f)
+				},
+				func() error {
+					f := func(ctx context.Context) (adminapi.DebugPartition, error) {
+						return cl.DebugPartition(ctx, tpf.namespace, tpf.topic, p)
+					}
+					return requestAndSave(ctx, ps, fmt.Sprintf("partitions/debug_%v_%v_%v.json", tpf.namespace, tpf.topic, p), f)
+				},
+				func() error {
+					f := func(ctx context.Context) (adminapi.CloudStorageStatus, error) {
+						return cl.CloudStorageStatus(ctx, tpf.topic, strconv.Itoa(p))
+					}
+					return requestAndSave(ctx, ps, fmt.Sprintf("partitions/cloud_status_%v_%v.json", tpf.topic, p), f)
+				},
+				func() error {
+					f := func(ctx context.Context) (adminapi.CloudStorageManifest, error) {
+						return cl.CloudStorageManifest(ctx, tpf.topic, p)
+					}
+					return requestAndSave(ctx, ps, fmt.Sprintf("partitions/cloud_manifest_%v_%v.json", tpf.topic, p), f)
fmt.Sprintf("partitions/cloud_manifest_%v_%v.json", tpf.topic, p), f) + }, + func() error { + f := func(ctx context.Context) (adminapi.CloudStorageAnomalies, error) { + return cl.CloudStorageAnomalies(ctx, tpf.namespace, tpf.topic, p) + } + return requestAndSave(ctx, ps, fmt.Sprintf("partitions/cloud_anomalies_%v_%v_%v.json", tpf.namespace, tpf.topic, p), f) + }, + }...) + } + } + return +} diff --git a/src/go/rpk/pkg/cli/debug/bundle/bundle_linux.go b/src/go/rpk/pkg/cli/debug/bundle/bundle_linux.go index 7aa8339c88dc..a60337b54b57 100644 --- a/src/go/rpk/pkg/cli/debug/bundle/bundle_linux.go +++ b/src/go/rpk/pkg/cli/debug/bundle/bundle_linux.go @@ -126,7 +126,7 @@ func executeBundle(ctx context.Context, bp bundleParams) error { steps := []step{ saveCPUInfo(ps), - saveClusterAdminAPICalls(ctx, ps, bp.fs, bp.p, addrs), + saveClusterAdminAPICalls(ctx, ps, bp.fs, bp.p, addrs, bp.partitions), saveCmdLine(ps), saveConfig(ps, bp.y), saveControllerLogDir(ps, bp.y, bp.controllerLogLimitBytes), diff --git a/src/go/rpk/pkg/out/in.go b/src/go/rpk/pkg/out/in.go index ccf2a6599a7f..8fdba4080206 100644 --- a/src/go/rpk/pkg/out/in.go +++ b/src/go/rpk/pkg/out/in.go @@ -17,8 +17,10 @@ import ( "io" "path/filepath" "reflect" + "regexp" "strconv" "strings" + "sync" "github.com/spf13/afero" "gopkg.in/yaml.v3" @@ -162,3 +164,40 @@ func ParseTopicPartitions(list []string) (map[string][]int32, error) { } return tps, nil } + +var ( + partitionRe *regexp.Regexp + partitionReOnce sync.Once +) + +// ParsePartitionString parses a partition string with the format: +// {namespace}/{topic}/[partitions...] +// where namespace and topic are optionals, and partitions are comma-separated +// partitions ID. If namespace is not provided, the function assumes 'kafka'. +func ParsePartitionString(ntp string) (ns, topic string, partitions []int, rerr error) { + partitionReOnce.Do(func() { + // Matches {namespace}/{topic}/[partitions...] + // - Index 0: Full Match. + // - Index 1: Namespace, if present. + // - Index 2: Topic, if present. + // - Index 3: Comma-separated partitions. 
+		partitionRe = regexp.MustCompile(`^(?:(?:([^/]+)/)?([^/]+)/)?(\d+(?:,\d+)*)$`)
+	})
+	match := partitionRe.FindStringSubmatch(ntp)
+	if len(match) == 0 {
+		return "", "", nil, fmt.Errorf("unable to parse %q: wrong format", ntp)
+	}
+	ns = match[1]
+	if ns == "" {
+		ns = "kafka"
+	}
+	partitionString := strings.Split(match[3], ",")
+	for _, str := range partitionString {
+		p, err := strconv.Atoi(str)
+		if err != nil {
+			return "", "", nil, fmt.Errorf("unable to parse partition %v from string %v: %v", str, ntp, err)
+		}
+		partitions = append(partitions, p)
+	}
+	return ns, match[2], partitions, nil
+}
diff --git a/src/go/rpk/pkg/out/in_test.go b/src/go/rpk/pkg/out/in_test.go
index 7acfdb8875e7..a5138dd41a61 100644
--- a/src/go/rpk/pkg/out/in_test.go
+++ b/src/go/rpk/pkg/out/in_test.go
@@ -14,6 +14,7 @@ import (
 	"testing"
 
 	"github.com/redpanda-data/redpanda/src/go/rpk/pkg/testfs"
+	"github.com/stretchr/testify/require"
 )
 
 func TestParseFileArray(t *testing.T) {
@@ -142,3 +143,77 @@ third 5 s2 true 3 0
 		})
 	}
 }
+
+func TestParsePartitionString(t *testing.T) {
+	for _, tt := range []struct {
+		name          string
+		input         string
+		expNs         string
+		expTopic      string
+		expPartitions []int
+		expErr        bool
+	}{
+		{
+			name:          "complete",
+			input:         "_redpanda_internal/topic-foo1/2,3,1",
+			expNs:         "_redpanda_internal",
+			expTopic:      "topic-foo1",
+			expPartitions: []int{2, 3, 1},
+		}, {
+			name:          "topic and partitions",
+			input:         "myTopic/1,2",
+			expNs:         "kafka",
+			expTopic:      "myTopic",
+			expPartitions: []int{1, 2},
+		}, {
+			name:          "topic and single partition",
+			input:         "myTopic/12",
+			expNs:         "kafka",
+			expTopic:      "myTopic",
+			expPartitions: []int{12},
+		}, {
+			name:          "just partitions",
+			input:         "1,2,3,5,8,13,21",
+			expNs:         "kafka",
+			expTopic:      "",
+			expPartitions: []int{1, 2, 3, 5, 8, 13, 21},
+		}, {
+			name:          "single partition",
+			input:         "13",
+			expNs:         "kafka",
+			expTopic:      "",
+			expPartitions: []int{13},
+		}, {
+			name:          "topic with dot",
+			input:         "my.topic.foo/1",
+			expNs:         "kafka",
+			expTopic:      "my.topic.foo",
+			expPartitions: []int{1},
+		}, {
+			name:   "wrong format 1",
+			input:  "thirteen",
+			expErr: true,
+		}, {
+			name:   "wrong format 2",
+			input:  "_internal|foo|1,2,3",
+			expErr: true,
+		}, {
+			name:   "empty input",
+			input:  "",
+			expErr: true,
+		},
+	} {
+		t.Run(tt.name, func(t *testing.T) {
+			gotNs, gotTopic, gotPartitions, err := ParsePartitionString(tt.input)
+			if tt.expErr {
+				require.Error(t, err)
+				return
+			}
+			require.NoError(t, err)
+
+			require.Equal(t, tt.expNs, gotNs)
+			require.Equal(t, tt.expTopic, gotTopic)
+			require.Equal(t, tt.expPartitions, gotPartitions)
+		})
+	}
+}
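Since ParsePartitionString is now exported from pkg/out, a godoc example can pin down the namespace-defaulting behavior for callers. The following is a hypothetical example test (it would live in a package out_test file, and is not part of this change) mirroring the table-driven cases above:

package out_test

import (
	"fmt"

	"github.com/redpanda-data/redpanda/src/go/rpk/pkg/out"
)

// ExampleParsePartitionString shows the namespace defaulting: the first input
// names a namespace explicitly, the second falls back to "kafka".
func ExampleParsePartitionString() {
	ns, topic, parts, _ := out.ParsePartitionString("_redpanda_internal/topic-foo1/2,3,1")
	fmt.Println(ns, topic, parts)

	ns, topic, parts, _ = out.ParsePartitionString("foo/1,2,3")
	fmt.Println(ns, topic, parts)

	// Output:
	// _redpanda_internal topic-foo1 [2 3 1]
	// kafka foo [1 2 3]
}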