Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpk debug bundle: save additional admin API requests. #15136

Merged
merged 3 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/go/rpk/pkg/adminapi/api_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,12 @@ func (a *AdminAPI) DisableMaintenanceMode(ctx context.Context, nodeID int, useLe
}
}

// MaintenanceStatus returns the maintenance status of a node.
func (a *AdminAPI) MaintenanceStatus(ctx context.Context) (MaintenanceStatus, error) {
var response MaintenanceStatus
return response, a.sendAny(ctx, http.MethodGet, "/v1/maintenance", nil, nil)
}

func (a *AdminAPI) CancelNodePartitionsMovement(ctx context.Context, node int) ([]PartitionsMovementResult, error) {
var response []PartitionsMovementResult
return response, a.sendAny(ctx, http.MethodPost, fmt.Sprintf("%s/%d/cancel_partition_moves", brokersEndpoint, node), nil, &response)
Expand Down
36 changes: 36 additions & 0 deletions src/go/rpk/pkg/adminapi/api_cloud_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,24 @@ type CloudStorageStatus struct {
MetadataUpdatePending bool `json:"metadata_update_pending"` // If true, the remote metadata may not yet include all segments that have been uploaded.
}

// CloudStorageLifecycle are the lifecycle markers for topics pending deletion.
type CloudStorageLifecycle struct {
Markers []LifecycleMarker `json:"markers"`
}

// LifecycleMarker Is the lifecycle status of a topic (e.g. during deletion).
type LifecycleMarker struct {
Ns string `json:"ns"`
Topic string `json:"topic"`
RevisionID int `json:"revision_id"`
Status string `json:"status"`
}

type (
CloudStorageManifest map[string]any
CloudStorageAnomalies map[string]any
)

// StartAutomatedRecovery starts the automated recovery process by sending a request to the automated recovery API endpoint.
func (a *AdminAPI) StartAutomatedRecovery(ctx context.Context, topicNamesPattern string) (RecoveryStartResponse, error) {
requestParams := &RecoveryRequestParams{
Expand All @@ -82,3 +100,21 @@ func (a *AdminAPI) CloudStorageStatus(ctx context.Context, topic, partition stri
path := fmt.Sprintf("/v1/cloud_storage/status/%s/%s", topic, partition)
return response, a.sendAny(ctx, http.MethodGet, path, http.NoBody, &response)
}

// CloudStorageLifecycle returns lifecycle markers for topics pending deletion.
func (a *AdminAPI) CloudStorageLifecycle(ctx context.Context) (CloudStorageLifecycle, error) {
var response CloudStorageLifecycle
return response, a.sendToLeader(ctx, http.MethodGet, "/v1/cloud_storage/lifecycle", nil, &response)
}

func (a *AdminAPI) CloudStorageManifest(ctx context.Context, topic string, partition int) (CloudStorageManifest, error) {
var response CloudStorageManifest
path := fmt.Sprintf("/v1/cloud_storage/manifest/%v/%v", topic, partition)
return response, a.sendAny(ctx, http.MethodGet, path, nil, &response)
}

func (a *AdminAPI) CloudStorageAnomalies(ctx context.Context, namespace, topic string, partition int) (CloudStorageAnomalies, error) {
var response CloudStorageAnomalies
path := fmt.Sprintf("/v1/cloud_storage/anomalies/%v/%v/%v", namespace, topic, partition)
return response, a.sendAny(ctx, http.MethodGet, path, nil, &response)
}
11 changes: 11 additions & 0 deletions src/go/rpk/pkg/adminapi/api_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ type PartitionsMovementResult struct {
Result string `json:"result,omitempty"`
}

type ClusterUUID struct {
UUID string `json:"cluster_uuid"`
}

// ClusterView represents a cluster view as seen by one node. There are
// many keys returned, so the raw response is just unmarshalled into an
// interface.
Expand All @@ -94,3 +98,10 @@ func (a *AdminAPI) ClusterView(ctx context.Context) (ClusterView, error) {
var response ClusterView
return response, a.sendOne(ctx, http.MethodGet, "/v1/cluster_view", nil, &response, true)
}

// ClusterUUID returns the UUID of the cluster this node belongs to. Not to be
// confused with the configurable cluster identifier used in metrics.
func (a *AdminAPI) ClusterUUID(ctx context.Context) (ClusterUUID, error) {
var response ClusterUUID
return response, a.sendToLeader(ctx, http.MethodGet, "/v1/cluster/uuid", nil, &response)
}
57 changes: 57 additions & 0 deletions src/go/rpk/pkg/adminapi/api_debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,37 @@ type SelfTestRequest struct {
Nodes []int `json:"nodes,omitempty"`
}

// PartitionLeaderTable is the information about leaders, the information comes
// from the Redpanda's partition_leaders_table.
type PartitionLeaderTable struct {
Ns string `json:"ns"`
Topic string `json:"topic"`
PartitionID int `json:"partition_id"`
Leader int `json:"leader"`
PreviousLeader int `json:"previous_leader"`
LastStableLeaderTerm int `json:"last_stable_leader_term"`
UpdateTerm int `json:"update_term"`
PartitionRevision int `json:"partition_revision"`
}

// ControllerStatus is the status of a controller, as seen by a node.
type ControllerStatus struct {
StartOffset int `json:"start_offset"`
LastAppliedOffset int `json:"last_applied_offset"`
CommittedIndex int `json:"committed_index"`
DirtyOffset int `json:"dirty_offset"`
}

// ReplicaState is the partition state of a replica. There are many keys
// returned, so the raw response is just unmarshalled into an interface.
type ReplicaState map[string]any

// DebugPartition is the low level debug information of a partition.
type DebugPartition struct {
Ntp string `json:"ntp"`
Replicas []ReplicaState `json:"replicas"`
}

func (a *AdminAPI) StartSelfTest(ctx context.Context, nodeIds []int, params []any) (string, error) {
var testID string
body := SelfTestRequest{
Expand Down Expand Up @@ -129,3 +160,29 @@ func (a *AdminAPI) SelfTestStatus(ctx context.Context) ([]SelfTestNodeReport, er
err := a.sendAny(ctx, http.MethodGet, fmt.Sprintf("%s/status", debugEndpoint), nil, &response)
return response, err
}

// PartitionLeaderTable returns the partitions leader table for the requested
// node.
func (a *AdminAPI) PartitionLeaderTable(ctx context.Context) ([]PartitionLeaderTable, error) {
var response []PartitionLeaderTable
return response, a.sendAny(ctx, http.MethodGet, "/v1/debug/partition_leaders_table", nil, &response)
}

func (a *AdminAPI) IsNodeIsolated(ctx context.Context) (bool, error) {
var isIsolated bool
return isIsolated, a.sendAny(ctx, http.MethodGet, "/v1/debug/is_node_isolated", nil, &isIsolated)
}

// ControllerStatus returns the controller status, as seen by the requested
// node.
func (a *AdminAPI) ControllerStatus(ctx context.Context) (ControllerStatus, error) {
var response ControllerStatus
return response, a.sendAny(ctx, http.MethodGet, "/v1/debug/controller_status", nil, &response)
}

// DebugPartition returns low level debug information (on any node) of all
// replicas of a given partition.
func (a *AdminAPI) DebugPartition(ctx context.Context, namespace, topic string, partitionID int) (DebugPartition, error) {
var response DebugPartition
return response, a.sendAny(ctx, http.MethodGet, fmt.Sprintf("/v1/debug/partition/%v/%v/%v", namespace, topic, partitionID), nil, &response)
}
4 changes: 2 additions & 2 deletions src/go/rpk/pkg/adminapi/api_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type LicenseProperties struct {
// GetFeatures returns information about the available features.
func (a *AdminAPI) GetFeatures(ctx context.Context) (FeaturesResponse, error) {
var features FeaturesResponse
return features, a.sendAny(
return features, a.sendToLeader(
twmb marked this conversation as resolved.
Show resolved Hide resolved
ctx,
http.MethodGet,
"/v1/features",
Expand All @@ -64,7 +64,7 @@ func (a *AdminAPI) GetFeatures(ctx context.Context) (FeaturesResponse, error) {

func (a *AdminAPI) GetLicenseInfo(ctx context.Context) (License, error) {
var license License
return license, a.sendAny(ctx, http.MethodGet, "/v1/features/license", nil, &license)
return license, a.sendToLeader(ctx, http.MethodGet, "/v1/features/license", nil, &license)
}

func (a *AdminAPI) SetLicense(ctx context.Context, license interface{}) error {
Expand Down
27 changes: 27 additions & 0 deletions src/go/rpk/pkg/adminapi/api_raft.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2023 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package adminapi

import (
"context"
"net/http"
)

type RaftRecoveryStatus struct {
PartitionsToRecover int `json:"partitions_to_recover"`
PartitionsActive int `json:"partitions_active"`
OffsetsPending int `json:"offsets_pending"`
}

// RaftRecoveryStatus returns the node's recovery status.
func (a *AdminAPI) RaftRecoveryStatus(ctx context.Context) (RaftRecoveryStatus, error) {
var status RaftRecoveryStatus
return status, a.sendOne(ctx, http.MethodGet, "/v1/raft/recovery/status", nil, &status, false)
}
29 changes: 29 additions & 0 deletions src/go/rpk/pkg/adminapi/api_storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2023 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package adminapi

import (
"context"
"net/http"
)

// DiskStatInfo is the disk data returned by the /disk_stat endpoint of the
// admin API.
type DiskStatInfo map[string]any

func (a *AdminAPI) DiskCache(ctx context.Context) (DiskStatInfo, error) {
var response DiskStatInfo
return response, a.sendOne(ctx, http.MethodGet, "/v1/debug/storage/disk_stat/cache", nil, &response, false)
}

func (a *AdminAPI) DiskData(ctx context.Context) (DiskStatInfo, error) {
var response DiskStatInfo
return response, a.sendOne(ctx, http.MethodGet, "/v1/debug/storage/disk_stat/data", nil, &response, false)
}
43 changes: 1 addition & 42 deletions src/go/rpk/pkg/cli/cluster/partitions/toggle.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ import (
"context"
"fmt"
"os"
"regexp"
"strconv"
"strings"
"sync"

"github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
Expand Down Expand Up @@ -178,7 +174,7 @@ func runToggle(ctx context.Context, cl *adminapi.AdminAPI, all bool, topicArg, p
}
g, egCtx := errgroup.WithContext(ctx)
for _, ntp := range partitionFlag {
ns, topicName, partitions, err := parsePartition(ntp)
ns, topicName, partitions, err := out.ParsePartitionString(ntp)
if err != nil {
return err
}
Expand All @@ -204,40 +200,3 @@ func runToggle(ctx context.Context, cl *adminapi.AdminAPI, all bool, topicArg, p
}
return g.Wait()
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was moved to the out package. The toggle_test.go file was deleted and moved to in_test.go (inside out package)

var (
partitionRe *regexp.Regexp
partitionReOnce sync.Once
)

// parsePartition parses the partition flag with the format:
// {namespace}/{topic}/[partitions...]
// where namespace and topic are optionals, and partitions are comma-separated
// partitions ID. If namespace is not provided, the function assumes 'kafka'.
func parsePartition(ntp string) (ns, topic string, partitions []int, rerr error) {
partitionReOnce.Do(func() {
// Matches {namespace}/{topic}/[partitions...]
// - Index 0: Full Match.
// - Index 1: Namespace, if present.
// - Index 2: Topic, if present.
// - Index 3: Comma-separated partitions.
partitionRe = regexp.MustCompile(`^(?:(?:([^/]+)/)?([^/]+)/)?(\d+(?:,\d+)*)$`)
})
match := partitionRe.FindStringSubmatch(ntp)
if len(match) == 0 {
return "", "", nil, fmt.Errorf("unable to parse %q: wrong format", ntp)
}
ns = match[1]
if ns == "" {
ns = "kafka"
}
partitionString := strings.Split(match[3], ",")
for _, str := range partitionString {
p, err := strconv.Atoi(str)
if err != nil {
return "", "", nil, fmt.Errorf("unable to parse partition %v in flag %v: %v", str, ntp, err)
}
partitions = append(partitions, p)
}
return ns, match[2], partitions, nil
}
86 changes: 0 additions & 86 deletions src/go/rpk/pkg/cli/cluster/partitions/toggle_test.go

This file was deleted.

Loading