Skip to content

Commit

Permalink
Sync dev as of f7869db
Browse files Browse the repository at this point in the history
  • Loading branch information
joejulian committed Apr 3, 2023
1 parent 66e350f commit 4336360
Show file tree
Hide file tree
Showing 78 changed files with 1,607 additions and 1,005 deletions.
9 changes: 8 additions & 1 deletion src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"net/url"
"time"

cmmeta "github.com/jetstack/cert-manager/pkg/apis/meta/v1"
cmmeta "github.com/cert-manager/cert-manager/pkg/apis/meta/v1"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/featuregates"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -170,6 +170,13 @@ type ClusterSpec struct {

// If key is not provided in the SecretRef, Secret data should have key "license"
LicenseRef *SecretKeyRef `json:"licenseRef,omitempty"`

// When InitialValidationForVolume is enabled the mounted Redpanda data folder
// will be checked if:
// - it is dir
// - it has XFS file system
// - it can create test file and delete it
InitialValidationForVolume *bool `json:"initialValidationForVolume,omitempty"`
}

// RestartConfig contains strategies to configure how the cluster behaves when restarting, because of upgrades
Expand Down
5 changes: 3 additions & 2 deletions src/go/k8s/apis/redpanda/v1alpha1/cluster_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ import (
"testing"
"time"

"github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/featuregates"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/utils/pointer"

"github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/featuregates"
)

//nolint:funlen // this is ok for a test
Expand Down
7 changes: 4 additions & 3 deletions src/go/k8s/apis/redpanda/v1alpha1/cluster_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ import (
"regexp"
"strconv"

cmmeta "github.com/jetstack/cert-manager/pkg/apis/meta/v1"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/featuregates"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/utils"
cmmeta "github.com/cert-manager/cert-manager/pkg/apis/meta/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
Expand All @@ -29,6 +27,9 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/webhook"

"github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/featuregates"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/utils"
)

const (
Expand Down
5 changes: 3 additions & 2 deletions src/go/k8s/apis/redpanda/v1alpha1/cluster_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ import (
"strings"
"testing"

cmmeta "github.com/jetstack/cert-manager/pkg/apis/meta/v1"
"github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1"
cmmeta "github.com/cert-manager/cert-manager/pkg/apis/meta/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
Expand All @@ -25,6 +24,8 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/utils/pointer"

"github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1"
)

//nolint:funlen // this is ok for a test
Expand Down
8 changes: 3 additions & 5 deletions src/go/k8s/apis/redpanda/v1alpha1/webhook_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ import (

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1"
admissionv1beta1 "k8s.io/api/admission/v1beta1" //nolint:goimports // crlfmt
//+kubebuilder:scaffold:imports
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"sigs.k8s.io/controller-runtime/pkg/envtest/printer"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1"
)

// These tests use Ginkgo (BDD-style Go testing framework). Refer to
Expand All @@ -45,9 +45,7 @@ var (
func TestAPIs(t *testing.T) {
RegisterFailHandler(Fail)

RunSpecsWithDefaultAndCustomReporters(t,
"Webhook Suite",
[]Reporter{printer.NewlineReporter{}})
RunSpecs(t, "Webhook Suite")
}

var _ = BeforeSuite(func() {
Expand Down
7 changes: 6 additions & 1 deletion src/go/k8s/apis/redpanda/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

83 changes: 80 additions & 3 deletions src/go/k8s/cmd/configurator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ import (
"log"
"os"
"path"
"path/filepath"
"strconv"
"strings"

"github.com/hashicorp/go-multierror"
"github.com/moby/sys/mountinfo"
"gopkg.in/yaml.v3"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -45,6 +47,7 @@ const (
nodeNameEnvVar = "NODE_NAME"
proxyHostPortEnvVar = "PROXY_HOST_PORT"
rackAwarenessEnvVar = "RACK_AWARENESS"
validateMountedVolumeEnvVar = "VALIDATE_MOUNTED_VOLUME"
redpandaRPCPortEnvVar = "REDPANDA_RPC_PORT"
svcFQDNEnvVar = "SERVICE_FQDN"
)
Expand All @@ -64,6 +67,7 @@ type configuratorConfig struct {
nodeName string
proxyHostPort int
rackAwareness bool
validateMountedVolume bool
redpandaRPCPort int
subdomain string
svcFQDN string
Expand All @@ -82,7 +86,8 @@ func (c *configuratorConfig) String() string {
"redpandaRPCPort: %d\n"+
"hostPort: %d\n"+
"proxyHostPort: %d\n"+
"rackAwareness: %t\n",
"rackAwareness: %t\n"+
"validateMountedVolume: %t\n",
c.hostName,
c.svcFQDN,
c.configSourceDir,
Expand All @@ -94,7 +99,8 @@ func (c *configuratorConfig) String() string {
c.redpandaRPCPort,
c.hostPort,
c.proxyHostPort,
c.rackAwareness)
c.rackAwareness,
c.validateMountedVolume)
}

var errorMissingEnvironmentVariable = errors.New("missing environment variable")
Expand All @@ -120,6 +126,11 @@ func main() {
log.Fatalf("%s", fmt.Errorf("unable to parse the redpanda configuration file, %q: %w", p, err))
}

err = validateMountedVolume(cfg, c.validateMountedVolume)
if err != nil {
log.Fatalf("%s", fmt.Errorf("unable to pass validation for the mounted volume: %w", err))
}

kafkaAPIPort, err := getInternalKafkaAPIPort(cfg)
if err != nil {
log.Fatal(err)
Expand Down Expand Up @@ -181,7 +192,64 @@ func main() {
log.Printf("Configuration saved to: %s", c.configDestination)
}

var errInternalPortMissing = errors.New("port configration is missing internal port")
func validateMountedVolume(cfg *config.Config, validate bool) error {
if !validate {
return nil
}
dir, err := os.Open(cfg.Redpanda.Directory)
if err != nil {
return fmt.Errorf("unable to open Redpanda directory (%s): %w", cfg.Redpanda.Directory, err)
}
defer func() {
if errClose := dir.Close(); errClose != nil {
log.Printf("Error closing file: %s, %s\n", cfg.Redpanda.Directory, errClose)
}
}()

stat, err := dir.Stat()
if err != nil {
return fmt.Errorf("unable to stat the dir: %s: %w", cfg.Redpanda.Directory, err)
}

if !stat.IsDir() {
return fmt.Errorf("%s is not a directory", cfg.Redpanda.Directory) //nolint:goerr113 // Error will not be validated, but rather returned to the end user of configurator
}

info, err := mountinfo.GetMounts(mountinfo.FSTypeFilter("xfs"))
if err != nil {
return fmt.Errorf("%s must have an xfs formatted filesystem. unable to find xfs file system in /proc/self/mountinfo: %w", cfg.Redpanda.Directory, err)
}

if len(info) == 0 {
return fmt.Errorf("%s must have an xfs formatted filesystem. returned mount info (/proc/self/mountinfo) does not have any xfs file system", cfg.Redpanda.Directory) //nolint:goerr113 // Error will not be validated, but rather returned to the end user of configurator
}

found := false
for _, fs := range info {
if fs.Mountpoint == cfg.Redpanda.Directory {
found = true
}
}

if !found {
return fmt.Errorf("returned XFS mount info list (/proc/self/mountinfo) does not have Redpanda directory (%s)", cfg.Redpanda.Directory) //nolint:goerr113 // Error will not be validated, but rather returned to the end user of configurator
}

file := filepath.Join(cfg.Redpanda.Directory, "testing.file")
err = os.WriteFile(file, []byte("test-content"), 0o600)
if err != nil {
return fmt.Errorf("unable to write to test file (%s): %w", file, err)
}

err = os.Remove(file)
if err != nil {
return fmt.Errorf("unable to remove test file (%s): %w", file, err)
}

return nil
}

var errInternalPortMissing = errors.New("port configuration is missing internal port")

func getZoneLabels(nodeName string) (zone, zoneID string, err error) {
node, err := getNode(nodeName)
Expand Down Expand Up @@ -427,6 +495,15 @@ func checkEnvVars() (configuratorConfig, error) {
result = multierror.Append(result, fmt.Errorf("unable to parse bool: %w", err))
}

validateMountedVolume, exist := os.LookupEnv(validateMountedVolumeEnvVar)
if !exist {
result = multierror.Append(result, fmt.Errorf("%s %w", validateMountedVolumeEnvVar, errorMissingEnvironmentVariable))
}
c.validateMountedVolume, err = strconv.ParseBool(validateMountedVolume)
if err != nil {
result = multierror.Append(result, fmt.Errorf("unable to parse bool: %w", err))
}

// Providing the address type is optional.
addressType, exists := os.LookupEnv(externalConnectivityAddressTypeEnvVar)
if exists {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,11 @@ spec:
image:
description: Image is the fully qualified name of the Redpanda container
type: string
initialValidationForVolume:
description: 'When InitialValidationForVolume is enabled the mounted
Redpanda data folder will be checked if: - it is dir - it has XFS
file system - it can create test file and delete it'
type: boolean
kafkaEnableAuthorization:
description: "Enable authorization for Kafka connections. Values are:
\n - `nil`: Ignored. Authorization is enabled with `enable_sasl:
Expand Down
53 changes: 48 additions & 5 deletions src/go/k8s/controllers/redpanda/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func (r *ClusterReconciler) Reconcile(
var raErr *resources.RequeueAfterError
if errors.As(err, &raErr) {
log.Info(raErr.Error())
return ctrl.Result{RequeueAfter: requeueErr.RequeueAfter}, nil
return ctrl.Result{RequeueAfter: raErr.RequeueAfter}, nil
}
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -524,22 +524,65 @@ func (r *ClusterReconciler) setPodNodeIDAnnotation(
if pod.Annotations == nil {
pod.Annotations = make(map[string]string)
}
if _, ok := pod.Annotations[PodAnnotationNodeIDKey]; ok {
nodeIDStr, ok := pod.Annotations[PodAnnotationNodeIDKey]

nodeID, err := r.fetchAdminNodeID(ctx, rp, pod, log)
if err != nil {
return fmt.Errorf(`cannot fetch node id for "%s" node-id annotation: %w`, pod.Name, err)
}

realNodeIDStr := fmt.Sprintf("%d", nodeID)

if ok && realNodeIDStr == nodeIDStr {
continue
}
nodeID, err := r.fetchAdminNodeID(ctx, rp, pod, log)

oldNodeID, err := strconv.Atoi(nodeIDStr)
if err != nil {
return fmt.Errorf("cannot fetch node id for node-id annotation: %w", err)
return fmt.Errorf("unable to convert node ID (%s) to int: %w", nodeIDStr, err)
}

if err = r.decommissionBroker(ctx, rp, oldNodeID, log); err != nil {
return fmt.Errorf("unable to decommission broker: %w", err)
}

log.WithValues("pod-name", pod.Name, "node-id", nodeID).Info("setting node-id annotation")
pod.Annotations[PodAnnotationNodeIDKey] = fmt.Sprintf("%d", nodeID)
pod.Annotations[PodAnnotationNodeIDKey] = realNodeIDStr
if err := r.Update(ctx, pod, &client.UpdateOptions{}); err != nil {
return fmt.Errorf(`unable to update pod "%s" with node-id annotation: %w`, pod.Name, err)
}
}
return nil
}

func (r *ClusterReconciler) decommissionBroker(
ctx context.Context, rp *redpandav1alpha1.Cluster, nodeID int, log logr.Logger,
) error {
log.V(6).WithValues("node-id", nodeID).Info("decommission broker")

redpandaPorts := networking.NewRedpandaPorts(rp)
headlessPorts := collectHeadlessPorts(redpandaPorts)
clusterPorts := collectClusterPorts(redpandaPorts, rp)
headlessSvc := resources.NewHeadlessService(r.Client, rp, r.Scheme, headlessPorts, log)
clusterSvc := resources.NewClusterService(r.Client, rp, r.Scheme, clusterPorts, log)

pki, err := certmanager.NewPki(ctx, r.Client, rp, headlessSvc.HeadlessServiceFQDN(r.clusterDomain), clusterSvc.ServiceFQDN(r.clusterDomain), r.Scheme, log)
if err != nil {
return fmt.Errorf("unable to create pki: %w", err)
}

adminClient, err := r.AdminAPIClientFactory(ctx, r.Client, rp, headlessSvc.HeadlessServiceFQDN(r.clusterDomain), pki.AdminAPIConfigProvider())
if err != nil {
return fmt.Errorf("unable to create admin client: %w", err)
}

err = adminClient.DecommissionBroker(ctx, nodeID)
if err != nil {
return fmt.Errorf("unable to decommission broker: %w", err)
}
return nil
}

func (r *ClusterReconciler) fetchAdminNodeID(ctx context.Context, rp *redpandav1alpha1.Cluster, pod *corev1.Pod, log logr.Logger) (int32, error) {
redpandaPorts := networking.NewRedpandaPorts(rp)
headlessPorts := collectHeadlessPorts(redpandaPorts)
Expand Down
Loading

0 comments on commit 4336360

Please sign in to comment.