reset cluster using API and redis operator yaml #1

Open · wants to merge 1 commit into base: master
21 changes: 11 additions & 10 deletions api/v1beta1/rediscluster_types.go
@@ -29,16 +29,17 @@ type RedisClusterSpec struct {
// +kubebuilder:default:={livenessProbe:{initialDelaySeconds: 1, timeoutSeconds: 1, periodSeconds: 10, successThreshold: 1, failureThreshold:3}, readinessProbe:{initialDelaySeconds: 1, timeoutSeconds: 1, periodSeconds: 10, successThreshold: 1, failureThreshold:3}}
RedisLeader RedisLeader `json:"redisLeader,omitempty"`
// +kubebuilder:default:={livenessProbe:{initialDelaySeconds: 1, timeoutSeconds: 1, periodSeconds: 10, successThreshold: 1, failureThreshold:3}, readinessProbe:{initialDelaySeconds: 1, timeoutSeconds: 1, periodSeconds: 10, successThreshold: 1, failureThreshold:3}}
RedisFollower RedisFollower `json:"redisFollower,omitempty"`
RedisExporter *RedisExporter `json:"redisExporter,omitempty"`
Storage *Storage `json:"storage,omitempty"`
NodeSelector map[string]string `json:"nodeSelector,omitempty"`
SecurityContext *corev1.PodSecurityContext `json:"securityContext,omitempty"`
PriorityClassName string `json:"priorityClassName,omitempty"`
Tolerations *[]corev1.Toleration `json:"tolerations,omitempty"`
Resources *corev1.ResourceRequirements `json:"resources,omitempty"`
TLS *TLSConfig `json:"TLS,omitempty"`
Sidecars *[]Sidecar `json:"sidecars,omitempty"`
RedisFollower RedisFollower `json:"redisFollower,omitempty"`
RedisExporter *RedisExporter `json:"redisExporter,omitempty"`
Storage *Storage `json:"storage,omitempty"`
NodeSelector map[string]string `json:"nodeSelector,omitempty"`
SecurityContext *corev1.PodSecurityContext `json:"securityContext,omitempty"`
PriorityClassName string `json:"priorityClassName,omitempty"`
Tolerations *[]corev1.Toleration `json:"tolerations,omitempty"`
Resources *corev1.ResourceRequirements `json:"resources,omitempty"`
TLS *TLSConfig `json:"TLS,omitempty"`
Sidecars *[]Sidecar `json:"sidecars,omitempty"`
DangerouslyRecreateClusterOnError bool `json:"dangerouslyRecreateClusterOnError,omitempty"`
}

func (cr *RedisClusterSpec) GetReplicaCounts(t string) int32 {
@@ -131,6 +131,8 @@ spec:
format: int32
minimum: 3
type: integer
dangerouslyRecreateClusterOnError:
type: boolean
kubernetesConfig:
description: KubernetesConfig will be the JSON struct for Basic Redis
Config
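For context, the new flag is opt-in on the RedisCluster custom resource. A minimal sketch of how it would be set, assuming the usual redis.redis.opstreelabs.in/v1beta1 RedisCluster shape; clusterSize and the image are placeholder values, and only dangerouslyRecreateClusterOnError is the field added by this PR:

apiVersion: redis.redis.opstreelabs.in/v1beta1
kind: RedisCluster
metadata:
  name: redis-cluster
spec:
  clusterSize: 3
  kubernetesConfig:
    image: quay.io/opstree/redis:v6.2.5
  # Opt-in flag added by this PR: allow the operator to run the destructive
  # failover/reset path when cluster creation or replication commands fail.
  dangerouslyRecreateClusterOnError: true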
2 changes: 2 additions & 0 deletions config/manager/manager.yaml
@@ -33,6 +33,8 @@ spec:
- -zap-log-level=info
image: quay.io/opstree/redis-operator:v0.9.0
imagePullPolicy: Always
ports:
- containerPort: 8090
name: manager
securityContext:
allowPrivilegeEscalation: false
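Since the manager container now listens on 8090 (see the controller changes below), a ClusterIP Service along these lines could expose the endpoint inside the cluster (a sketch, not part of this PR; the namespace and selector labels are assumptions and must match the operator Deployment):

apiVersion: v1
kind: Service
metadata:
  name: redis-operator-command
  namespace: redis-operator
spec:
  selector:
    control-plane: redis-operator   # assumption: must match the manager pod labels
  ports:
    - name: command
      port: 8090
      targetPort: 8090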
86 changes: 76 additions & 10 deletions controllers/rediscluster_controller.go
@@ -18,11 +18,14 @@ package controllers

import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/types"
"net/http"
"os"
"redis-operator/k8sutils"
"strconv"
"time"

"redis-operator/k8sutils"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
@@ -44,43 +44,50 @@ func (r *RedisClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request
reqLogger := r.Log.WithValues("Request.Namespace", req.Namespace, "Request.Name", req.Name)
reqLogger.Info("Reconciling opstree redis Cluster controller")
instance := &redisv1beta1.RedisCluster{}

// NOTE: retrieves the RedisCluster instance details.
// QUERY: why not pass the ctx received by Reconcile instead of context.TODO()?
err := r.Client.Get(context.TODO(), req.NamespacedName, instance)
if err != nil {
if errors.IsNotFound(err) {
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}

// NOTE: retrieve the expected number of leaders and followers from spec (not from cluster)
leaderReplicas := instance.Spec.GetReplicaCounts("leader")
followerReplicas := instance.Spec.GetReplicaCounts("follower")
totalReplicas := leaderReplicas + followerReplicas

// NOTE: if the redis cluster is marked to be deleted then execute deletion workflow.
if err := k8sutils.HandleRedisClusterFinalizer(instance, r.Client); err != nil {
return ctrl.Result{}, err
}

// QUERY: why add a redis cluster finalizer at all? Deletion is already detected via the deletion timestamp, so cleanup could run regardless.
if err := k8sutils.AddRedisClusterFinalizer(instance, r.Client); err != nil {
return ctrl.Result{}, err
}

// NOTE: creates a patch of the StatefulSet definition and applies it.
err = k8sutils.CreateRedisLeader(instance)
if err != nil {
return ctrl.Result{}, err
}
if leaderReplicas != 0 {
// NOTE: same as above: creates a patch for the Service and applies it.
err = k8sutils.CreateRedisLeaderService(instance)
if err != nil {
return ctrl.Result{}, err
}
}

// NOTE: none of our clusters define a PodDisruptionBudget, so this is not applicable here.
err = k8sutils.ReconcileRedisPodDisruptionBudget(instance, "leader", instance.Spec.RedisLeader.PodDisruptionBudget)
if err != nil {
return ctrl.Result{}, err
}

// START: Same for follower.
err = k8sutils.CreateRedisFollower(instance)
if err != nil {
return ctrl.Result{}, err
@@ -96,6 +106,7 @@ if err != nil {
if err != nil {
return ctrl.Result{}, err
}
// END: Same for follower.

redisLeaderInfo, err := k8sutils.GetStatefulSet(instance.Namespace, instance.ObjectMeta.Name+"-leader")
if err != nil {
@@ -115,23 +126,53 @@ func (r *RedisClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request
reqLogger.Info("Redis leader and follower nodes are not ready yet", "Ready.Replicas", strconv.Itoa(int(redisLeaderInfo.Status.ReadyReplicas)), "Expected.Replicas", leaderReplicas)
return ctrl.Result{RequeueAfter: time.Second * 120}, nil
}
reqLogger.Info("Creating redis cluster by executing cluster creation commands", "Leaders.Ready", strconv.Itoa(int(redisLeaderInfo.Status.ReadyReplicas)), "Followers.Ready", strconv.Itoa(int(redisFollowerInfo.Status.ReadyReplicas)))
if k8sutils.CheckRedisNodeCount(instance, "") != totalReplicas {
redisClusterNodes := k8sutils.CheckRedisNodeCount(instance, "")
reqLogger.Info("Creating redis cluster by executing cluster creation commands",
"Leaders.Ready", strconv.Itoa(int(redisLeaderInfo.Status.ReadyReplicas)),
"Followers.Ready", strconv.Itoa(int(redisFollowerInfo.Status.ReadyReplicas)),
"redisClusterNodes", redisClusterNodes)

if redisClusterNodes != totalReplicas {
leaderCount := k8sutils.CheckRedisNodeCount(instance, "leader")
if leaderCount != leaderReplicas {
reqLogger.Info("Not all leader are part of the cluster...", "Leaders.Count", leaderCount, "Instance.Size", leaderReplicas)
k8sutils.ExecuteRedisClusterCommand(instance)
reqLogger.Info("Not all leader are part of the cluster...",
"Leaders.Count", leaderCount,
"Instance.Size", leaderReplicas,
"DangerouslyRecreateClusterOnError", instance.Spec.DangerouslyRecreateClusterOnError)
err := k8sutils.ExecuteRedisClusterCommand(instance)
if err != nil && instance.Spec.DangerouslyRecreateClusterOnError {
reqLogger.Info("Adding Leaders failed. Executing fail-over")
err = k8sutils.ExecuteFailoverOperation(instance)
if err != nil {
return ctrl.Result{RequeueAfter: time.Second * 10}, err
}
return ctrl.Result{RequeueAfter: time.Second * 120}, nil
}
} else {
if followerReplicas > 0 {
reqLogger.Info("All leader are part of the cluster, adding follower/replicas", "Leaders.Count", leaderCount, "Instance.Size", leaderReplicas, "Follower.Replicas", followerReplicas)
k8sutils.ExecuteRedisReplicationCommand(instance)
err := k8sutils.ExecuteRedisReplicationCommand(instance)
if err != nil && instance.Spec.DangerouslyRecreateClusterOnError {
reqLogger.Info("Adding Leaders failed. Executing fail-over")
err = k8sutils.ExecuteFailoverOperation(instance)
if err != nil {
return ctrl.Result{RequeueAfter: time.Second * 10}, err
}
return ctrl.Result{RequeueAfter: time.Second * 120}, nil
}
} else {
reqLogger.Info("no follower/replicas configured, skipping replication configuration", "Leaders.Count", leaderCount, "Leader.Size", leaderReplicas, "Follower.Replicas", followerReplicas)
}
}
} else {
reqLogger.Info("Redis leader count is desired")
if k8sutils.CheckRedisClusterState(instance) >= int(totalReplicas)-1 {
failedNodesCount := k8sutils.CheckRedisClusterState(instance)
executeForceClusterReset := instance.Spec.DangerouslyRecreateClusterOnError && (failedNodesCount > 0)
reqLogger.Info("Dangerously Reset Cluster",
"DangerouslyRecreateClusterOnError", instance.Spec.DangerouslyRecreateClusterOnError,
"failedNodesCount", failedNodesCount)
// PROBLEM: why does the failed-node count have to be this large before a failover is executed?
if failedNodesCount >= int(totalReplicas)-1 || executeForceClusterReset {
reqLogger.Info("Redis leader is not desired, executing failover operation")
err = k8sutils.ExecuteFailoverOperation(instance)
if err != nil {
@@ -150,3 +191,28 @@ func (r *RedisClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
For(&redisv1beta1.RedisCluster{}).
Complete(r)
}

func (r *RedisClusterReconciler) forceRecreateCluster(w http.ResponseWriter, req *http.Request) {
q := req.URL.Query()
ns := q.Get("ns")
name := q.Get("name")
instance := &redisv1beta1.RedisCluster{}
namespacedName := types.NamespacedName{
Name: name,
Namespace: ns,
}
// NOTE: bail out early if the referenced RedisCluster cannot be fetched.
err := r.Client.Get(context.TODO(), namespacedName, instance)
if err != nil {
fmt.Fprintf(w, "ERROR")
return
}
// NOTE: surface failover errors to the caller instead of silently reporting OK.
if err := k8sutils.ExecuteFailoverOperation(instance); err != nil {
fmt.Fprintf(w, "ERROR")
return
}
fmt.Fprintf(w, "OK")
}

func (r *RedisClusterReconciler) SetupHttpCommandServer() {
http.HandleFunc("/force-recreate", r.forceRecreateCluster)
err := http.ListenAndServe(":8090", nil)
if err != nil {
r.Log.Error(err, "unable to start the HTTP command server on :8090")
os.Exit(1)
}
}
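As a usage sketch (not part of this PR), the endpoint could be hit from inside the cluster with a one-off Job; the Service name below assumes the Service sketched earlier, and the ns/name query parameters must reference an existing RedisCluster:

apiVersion: batch/v1
kind: Job
metadata:
  name: force-recreate-redis-cluster
spec:
  template:
    spec:
      restartPolicy: Never
      containers:
        - name: curl
          image: curlimages/curl:8.5.0   # assumption: any image providing curl works
          command:
            - curl
            - -sS
            - "http://redis-operator-command.redis-operator.svc:8090/force-recreate?ns=default&name=redis-cluster"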
28 changes: 18 additions & 10 deletions k8sutils/redis.go
@@ -45,7 +45,7 @@ func getRedisServerIP(redisInfo RedisDetails) string {
}

// ExecuteRedisClusterCommand will execute redis cluster creation command
func ExecuteRedisClusterCommand(cr *redisv1beta1.RedisCluster) {
func ExecuteRedisClusterCommand(cr *redisv1beta1.RedisCluster) error {
logger := generateRedisManagerLogger(cr.Namespace, cr.ObjectMeta.Name)
replicas := cr.Spec.GetReplicaCounts("leader")
cmd := []string{"redis-cli", "--cluster", "create"}
@@ -68,7 +68,7 @@ func ExecuteRedisClusterCommand(cr *redisv1beta1.RedisCluster) {
}
cmd = append(cmd, getRedisTLSArgs(cr.Spec.TLS, cr.ObjectMeta.Name+"-leader-0")...)
logger.Info("Redis cluster creation command is", "Command", cmd)
executeCommand(cr, cmd, cr.ObjectMeta.Name+"-leader-0")
return executeCommand(cr, cmd, cr.ObjectMeta.Name+"-leader-0")
}

func getRedisTLSArgs(tlsConfig *redisv1beta1.TLSConfig, clientHost string) []string {
@@ -105,7 +105,7 @@ func createRedisReplicationCommand(cr *redisv1beta1.RedisCluster, leaderPod Redi
}

// ExecuteRedisReplicationCommand will execute the replication command
func ExecuteRedisReplicationCommand(cr *redisv1beta1.RedisCluster) {
func ExecuteRedisReplicationCommand(cr *redisv1beta1.RedisCluster) error {
logger := generateRedisManagerLogger(cr.Namespace, cr.ObjectMeta.Name)
replicas := cr.Spec.GetReplicaCounts("follower")
nodes := checkRedisCluster(cr)
@@ -122,17 +122,22 @@ func ExecuteRedisReplicationCommand(cr *redisv1beta1.RedisCluster) {
if !checkRedisNodePresence(cr, nodes, podIP) {
logger.Info("Adding node to cluster.", "Node.IP", podIP, "Follower.Pod", followerPod)
cmd := createRedisReplicationCommand(cr, leaderPod, followerPod)
executeCommand(cr, cmd, cr.ObjectMeta.Name+"-leader-0")
err := executeCommand(cr, cmd, cr.ObjectMeta.Name+"-leader-0")
if err != nil {
return err
}
} else {
logger.Info("Skipping Adding node to cluster, already present.", "Follower.Pod", followerPod)
}
}
return nil
}

// checkRedisCluster will check the redis cluster have sufficient nodes or not
func checkRedisCluster(cr *redisv1beta1.RedisCluster) [][]string {
var client *redis.Client
logger := generateRedisManagerLogger(cr.Namespace, cr.ObjectMeta.Name)
// PROBLEM: queries only one leader
client = configureRedisClient(cr, cr.ObjectMeta.Name+"-leader-0")
cmd := redis.NewStringCmd("cluster", "nodes")
err := client.Process(cmd)
@@ -184,6 +189,7 @@ func executeFailoverCommand(cr *redisv1beta1.RedisCluster, role string) error {
err := client.Process(cmd)
if err != nil {
logger.Error(err, "Redis command failed with this error")
// PROBLEM: do we really need a flushall here?
flushcommand := redis.NewStringCmd("flushall")
err := client.Process(flushcommand)
if err != nil {
@@ -235,7 +241,8 @@ func CheckRedisNodeCount(cr *redisv1beta1.RedisCluster, nodeType string) int32 {
return int32(count)
}

// CheckRedisClusterState will check the redis cluster state
// CheckRedisClusterState will return the count of failed or disconnected nodes
// PROBLEM: I think this never returns > 0 (at least based on the logs)
func CheckRedisClusterState(cr *redisv1beta1.RedisCluster) int {
logger := generateRedisManagerLogger(cr.Namespace, cr.ObjectMeta.Name)
clusterNodes := checkRedisCluster(cr)
@@ -282,7 +289,7 @@ func configureRedisClient(cr *redisv1beta1.RedisCluster, podName string) *redis.
}

// executeCommand will execute the commands in pod
func executeCommand(cr *redisv1beta1.RedisCluster, cmd []string, podName string) {
func executeCommand(cr *redisv1beta1.RedisCluster, cmd []string, podName string) error {
var (
execOut bytes.Buffer
execErr bytes.Buffer
@@ -291,12 +298,12 @@ func executeCommand(cr *redisv1beta1.RedisCluster, cmd []string, podName string)
config, err := generateK8sConfig()
if err != nil {
logger.Error(err, "Could not find pod to execute")
return
return err
}
targetContainer, pod := getContainerID(cr, podName)
if targetContainer < 0 {
logger.Error(err, "Could not find pod to execute")
return
return err
}

req := generateK8sClient().CoreV1().RESTClient().Post().Resource("pods").Name(podName).Namespace(cr.Namespace).SubResource("exec")
@@ -309,7 +316,7 @@ func executeCommand(cr *redisv1beta1.RedisCluster, cmd []string, podName string)
exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
if err != nil {
logger.Error(err, "Failed to init executor")
return
return err
}

err = exec.Stream(remotecommand.StreamOptions{
@@ -319,9 +326,10 @@ func executeCommand(cr *redisv1beta1.RedisCluster, cmd []string, podName string)
})
if err != nil {
logger.Error(err, "Could not execute command", "Command", cmd, "Output", execOut.String(), "Error", execErr.String())
return
return err
}
logger.Info("Successfully executed the command", "Command", cmd, "Output", execOut.String())
return nil
}

// getContainerID will return the id of container from pod
12 changes: 12 additions & 0 deletions k8sutils/statefulset.go
@@ -345,6 +345,18 @@ func getVolumeMount(name string, persistenceEnabled *bool, externalConfig *strin

// getProbeInfo generate probe for Redis StatefulSet
func getProbeInfo(probe *redisv1beta1.Probe) *corev1.Probe {
if probe == nil {
return &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
Exec: &corev1.ExecAction{
Command: []string{
"bash",
"/usr/bin/healthcheck.sh",
},
},
},
}
}
return &corev1.Probe{
InitialDelaySeconds: probe.InitialDelaySeconds,
PeriodSeconds: probe.PeriodSeconds,
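For reference, when the CR specifies no probe, the new default branch in getProbeInfo should render roughly as the following container probe in the generated StatefulSet (a sketch of the expected output, not literal operator output; the same shape applies to the readiness probe):

livenessProbe:
  exec:
    command:
      - bash
      - /usr/bin/healthcheck.sh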
6 changes: 4 additions & 2 deletions main.go
@@ -92,11 +92,13 @@ func main() {
setupLog.Error(err, "unable to create controller", "controller", "Redis")
os.Exit(1)
}
if err = (&controllers.RedisClusterReconciler{
redisClusterController := &controllers.RedisClusterReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("RedisCluster"),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
}
go redisClusterController.SetupHttpCommandServer()
if err = redisClusterController.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "RedisCluster")
os.Exit(1)
}