Skip to content

Commit

Permalink
Gossip Router based Cross-Site Replication
Browse files Browse the repository at this point in the history
Using Gossip Router and TUNNEL transport for cross-site replication
between Infinispan clusters
  • Loading branch information
pruivo committed Oct 4, 2021
1 parent 6381d8b commit 2d8f235
Show file tree
Hide file tree
Showing 16 changed files with 196 additions and 34 deletions.
3 changes: 3 additions & 0 deletions api/v1/infinispan_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ type InfinispanContainerSpec struct {
type InfinispanSitesLocalSpec struct {
Name string `json:"name"`
Expose CrossSiteExposeSpec `json:"expose"`
// +optional
MaxRelayNodes int32 `json:"maxRelayNodes,omitempty"`
}

type InfinispanSiteLocationSpec struct {
Expand Down Expand Up @@ -335,6 +337,7 @@ const (
ConditionUpgrade ConditionType = "Upgrade"
ConditionWellFormed ConditionType = "WellFormed"
ConditionCrossSiteViewFormed ConditionType = "CrossSiteViewFormed"
ConditionGossipRouterReady ConditionType = "GossipRouterReady"
)

// InfinispanCondition define a condition of the cluster
Expand Down
7 changes: 7 additions & 0 deletions api/v1/types_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ const (

SiteServiceNameTemplate = "%v-site"
SiteServiceFQNTemplate = "%s.%s.svc.cluster.local"

GossipRouterDeploymentNameTemplate = "%s-tunnel"
)

type ExternalDependencyType string
Expand Down Expand Up @@ -619,3 +621,8 @@ func (ispn *Infinispan) IsServiceMonitorEnabled() bool {
}
return false
}

// GetGossipRouterDeploymentName returns the Gossip Router deployment name
func (ispn *Infinispan) GetGossipRouterDeploymentName() string {
return fmt.Sprintf(GossipRouterDeploymentNameTemplate, ispn.Name)
}
1 change: 1 addition & 0 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions api/v2alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions config/crd/bases/infinispan.org_infinispans.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,9 @@ spec:
required:
- type
type: object
maxRelayNodes:
format: int32
type: integer
name:
type: string
required:
Expand Down
3 changes: 3 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ rules:
resources:
- deployments
verbs:
- create
- delete
- get
- list
- update
- watch
- apiGroups:
- apps
Expand Down
2 changes: 0 additions & 2 deletions controllers/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ var (
SystemPodLabels = map[string]bool{
appsv1.StatefulSetPodNameLabel: true,
appsv1.StatefulSetRevisionLabel: true,
CoordinatorPodLabel: true,
}
)

Expand All @@ -57,7 +56,6 @@ const (
InfinispanUserPort = 11222
CrossSitePort = 7900
CrossSitePortName = "xsite"
CoordinatorPodLabel = "coordinator"
StaticCrossSiteUriSchema = "infinispan+xsite"
// DefaultCacheManagerName default cache manager name used for cross site
DefaultCacheManagerName = "default"
Expand Down
72 changes: 68 additions & 4 deletions controllers/infinispan_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (r *InfinispanReconciler) SetupWithManager(mgr ctrl.Manager) error {

// TODO(user): Modify this to be the types you create that are owned by the primary resource
// Watch for changes to secondary resource Pods and requeue the owner Infinispan
secondaryResourceTypes := []client.Object{&appsv1.StatefulSet{}, &corev1.ConfigMap{}, &corev1.Secret{}}
secondaryResourceTypes := []client.Object{&appsv1.StatefulSet{}, &corev1.ConfigMap{}, &corev1.Secret{}, &appsv1.Deployment{}}
for _, secondaryResource := range secondaryResourceTypes {
builder.Owns(secondaryResource)
}
Expand Down Expand Up @@ -180,7 +180,7 @@ func (r *InfinispanReconciler) SetupWithManager(mgr ctrl.Manager) error {
// +kubebuilder:rbac:groups=core,resources=pods/exec,verbs=create
// +kubebuilder:rbac:groups=core;events.k8s.io,resources=events,verbs=create;patch

// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;delete
// +kubebuilder:rbac:groups=apps,resources=replicasets,verbs=get
// +kubebuilder:rbac:groups=apps,resources=deployments/finalizers;statefulsets,verbs=get;list;watch;create;update;delete

Expand Down Expand Up @@ -290,6 +290,55 @@ func (reconciler *InfinispanReconciler) Reconcile(ctx context.Context, ctrlReque
}
}

if infinispan.HasSites() {
reqLogger.Info("Creating the Cross-Site Deployment (Gossip Router)")
tunnelDeployment := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: infinispan.Name,
Namespace: infinispan.Namespace,
},
}
result, err := controllerutil.CreateOrUpdate(r.ctx, r.Client, tunnelDeployment, func() error {
tunnel := r.GetGossipRouterDeployment(infinispan)
tunnelDeployment.Spec = tunnel.Spec
tunnelDeployment.Labels = tunnel.Labels
if tunnelDeployment.CreationTimestamp.IsZero() {
reqLogger.Info("Creating the Cross-Site Deployment (Gossip Router)")
return controllerutil.SetControllerReference(r.infinispan, tunnelDeployment, r.scheme)
}
return nil
})
if err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
} else {
reqLogger.Error(err, "Failed to configure Cross-Site Deployment")
return reconcile.Result{}, err
}
}
if result != controllerutil.OperationResultNone {
reqLogger.Info(fmt.Sprintf("Cross-site deployment %s", string(result)))
}

gossipRouterPods, err := GossipRouterPodList(infinispan, r.kubernetes, r.ctx)
if err != nil {
reqLogger.Error(err, "Failed to fetch Gossip Router pod")
return reconcile.Result{}, err
}
if !kube.AreAllPodsReady(gossipRouterPods) {
reqLogger.Info("Gossip Router pod is not ready")
return reconcile.Result{}, r.update(func() {
r.infinispan.SetCondition(infinispanv1.ConditionGossipRouterReady, metav1.ConditionFalse, "Gossip Router pod not ready")
})
}
if err = r.update(func() {
r.infinispan.SetCondition(infinispanv1.ConditionGossipRouterReady, metav1.ConditionTrue, "")
}); err != nil {
reqLogger.Error(err, "Failed to set Gossip Router pod condition")
return reconcile.Result{}, err
}
}

// Reconcile the StatefulSet
// Check if the StatefulSet already exists, if not create a new one
statefulSet := &appsv1.StatefulSet{}
Expand Down Expand Up @@ -523,9 +572,8 @@ func (reconciler *InfinispanReconciler) Reconcile(ctx context.Context, ctrlReque
}
}

// If x-site enable configure the coordinator pods to be selected by the x-site service
if infinispan.HasSites() {
crossSiteViewCondition, err := r.applyLabelsToCoordinatorsPod(podList, infinispan.GetSiteLocationsName(), cluster)
crossSiteViewCondition, err := r.GetCrossSiteViewCondition(podList, infinispan.GetSiteLocationsName(), cluster)
if err != nil {
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -627,6 +675,17 @@ func (r *infinispanRequest) destroyResources() error {
return err
}

err = r.Client.Delete(r.ctx,
&appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: infinispan.GetGossipRouterDeploymentName(),
Namespace: infinispan.Namespace,
},
})
if err != nil && !errors.IsNotFound(err) {
return err
}

err = r.Client.Delete(r.ctx,
&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -1485,3 +1544,8 @@ func (r *infinispanRequest) update(update UpdateFn, ignoreNotFound ...bool) erro
func (reconciler *InfinispanReconciler) isTypeSupported(kind string) bool {
return reconciler.supportedTypes[kind].GroupVersionSupported
}

func GossipRouterPodList(infinispan *infinispanv1.Infinispan, kube *kube.Kubernetes, ctx context.Context) (*corev1.PodList, error) {
podList := &corev1.PodList{}
return podList, kube.ResourcesList(infinispan.Namespace, GossipRouterPodLabels(infinispan.Name), podList, ctx)
}
4 changes: 1 addition & 3 deletions controllers/infinispan_service_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"reflect"
"strconv"
"strings"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -466,8 +465,7 @@ func computeServiceExternal(ispn *ispnv1.Infinispan) *corev1.Service {

// computeSiteService compute the XSite service
func computeSiteService(ispn *ispnv1.Infinispan) *corev1.Service {
lsPodSelector := PodLabels(ispn.Name)
lsPodSelector[consts.CoordinatorPodLabel] = strconv.FormatBool(true)
lsPodSelector := GossipRouterPodLabels(ispn.Name)

exposeSpec := corev1.ServiceSpec{}
exposeConf := ispn.Spec.Service.Sites.Local.Expose
Expand Down
5 changes: 5 additions & 0 deletions controllers/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,8 @@ func BatchLabels(name string) map[string]string {
"app": "infinispan-batch-pod",
}
}

// GossipRouterPodLabels returns the labels to apply to GossipRouter pod
func GossipRouterPodLabels(name string) map[string]string {
return LabelsResource(name, "infinispan-router-pod")
}
27 changes: 27 additions & 0 deletions controllers/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,33 @@ func probe(failureThreshold, initialDelay, period, successThreshold, timeout int
}
}

func GossipRouterLivenessProbe() *corev1.Probe {
return TcpProbe(consts.CrossSitePort, 5, 5, 10, 1, 60)
}

func GossipRouterReadinessProbe() *corev1.Probe {
return TcpProbe(consts.CrossSitePort, 5, 5, 10, 1, 60)
}

func GossipRouterStartupProbe() *corev1.Probe {
return TcpProbe(consts.CrossSitePort, 5, 5, 10, 1, 60)
}

func TcpProbe(port, failureThreshold, initialDelay, period, successThreshold, timeout int32) *corev1.Probe {
return &corev1.Probe{
Handler: corev1.Handler{
TCPSocket: &corev1.TCPSocketAction{
Port: intstr.IntOrString{IntVal: port},
},
},
FailureThreshold: failureThreshold,
InitialDelaySeconds: initialDelay,
PeriodSeconds: period,
SuccessThreshold: successThreshold,
TimeoutSeconds: timeout,
}
}

func PodResources(spec infinispanv1.InfinispanContainerSpec) (*corev1.ResourceRequirements, error) {
memory, err := resource.ParseQuantity(spec.Memory)
if err != nil {
Expand Down
77 changes: 59 additions & 18 deletions controllers/xsite.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,21 @@ import (
"strconv"
"strings"

infinispanv1 "github.com/infinispan/infinispan-operator/api/v1"
ispnv1 "github.com/infinispan/infinispan-operator/api/v1"
consts "github.com/infinispan/infinispan-operator/controllers/constants"
ispn "github.com/infinispan/infinispan-operator/pkg/infinispan"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
)

func (r *infinispanRequest) applyLabelsToCoordinatorsPod(podList *corev1.PodList, siteLocations []string, cluster ispn.ClusterInterface) (*ispnv1.InfinispanCondition, error) {
func (r *infinispanRequest) GetCrossSiteViewCondition(podList *corev1.PodList, siteLocations []string, cluster ispn.ClusterInterface) (*ispnv1.InfinispanCondition, error) {
for _, item := range podList.Items {
cacheManagerInfo, err := cluster.GetCacheManagerInfo(consts.DefaultCacheManagerName, item.Name)
if err == nil {
lab, ok := item.Labels[consts.CoordinatorPodLabel]
if cacheManagerInfo.Coordinator {
if !ok || lab != strconv.FormatBool(cacheManagerInfo.Coordinator) {
item.Labels[consts.CoordinatorPodLabel] = strconv.FormatBool(cacheManagerInfo.Coordinator)
if err = r.Client.Update(r.ctx, &item); err != nil {
return nil, err
}
}
// Perform cross-site view validation
crossSiteViewFormed := &ispnv1.InfinispanCondition{Type: ispnv1.ConditionCrossSiteViewFormed, Status: metav1.ConditionTrue}
sitesView, err := cacheManagerInfo.GetSitesView()
Expand All @@ -43,18 +39,63 @@ func (r *infinispanRequest) applyLabelsToCoordinatorsPod(podList *corev1.PodList
crossSiteViewFormed.Message = fmt.Sprintf("Error: %s", err.Error())
}
return crossSiteViewFormed, nil
} else {
if ok && lab == strconv.FormatBool(ok) {
// If present leave the label but false the value
if ok {
item.Labels[consts.CoordinatorPodLabel] = strconv.FormatBool(cacheManagerInfo.Coordinator)
if err = r.Client.Update(r.ctx, &item); err != nil {
return nil, err
}
}
}
}
}
}
return &ispnv1.InfinispanCondition{Type: ispnv1.ConditionCrossSiteViewFormed, Status: metav1.ConditionFalse, Message: "Coordinator not ready"}, nil
}

// GetGossipRouterDeployment returns the deployment for the Gossip Router pod
func (r *infinispanRequest) GetGossipRouterDeployment(m *infinispanv1.Infinispan) *appsv1.Deployment {
lsTunnel := GossipRouterPodLabels(m.Name)
replicas := int32(1)

// if the user configures 0 replicas, shutdown the gossip router pod too.
if m.Spec.Replicas <= 0 {
replicas = 0
}

deployment := &appsv1.Deployment{
TypeMeta: metav1.TypeMeta{
APIVersion: "apps/v1",
Kind: "Deployment",
},
ObjectMeta: metav1.ObjectMeta{
Name: m.GetGossipRouterDeploymentName(),
Namespace: m.Namespace,
Labels: lsTunnel,
},
Spec: appsv1.DeploymentSpec{
Selector: &metav1.LabelSelector{
MatchLabels: lsTunnel,
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: m.ObjectMeta.Name,
Namespace: m.ObjectMeta.Namespace,
Labels: lsTunnel,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Name: "gossiprouter",
Image: m.ImageName(),
Command: []string{"/opt/gossiprouter/bin/launch.sh"},
Args: []string{"-port", strconv.Itoa(consts.CrossSitePort), "-dump_msgs", "registration"},
Ports: []corev1.ContainerPort{
{
ContainerPort: consts.CrossSitePort,
Name: "tunnel",
Protocol: corev1.ProtocolTCP,
},
},
LivenessProbe: GossipRouterLivenessProbe(),
ReadinessProbe: GossipRouterLivenessProbe(),
StartupProbe: GossipRouterStartupProbe(),
}},
},
},
Replicas: pointer.Int32Ptr(replicas),
},
}
return deployment
}
13 changes: 10 additions & 3 deletions controllers/xsite_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,17 @@ func ComputeXSite(infinispan *ispnv1.Infinispan, kubernetes *kube.Kubernetes, se

logger.Info("local site service", "service name", siteServiceName, "host", localSiteHost, "port", localSitePort)

maxRelayNodes := infinispan.Spec.Service.Sites.Local.MaxRelayNodes
if maxRelayNodes <= 0 {
maxRelayNodes = 1
}

xsite := &config.XSite{
Address: localSiteHost,
Name: infinispan.Spec.Service.Sites.Local.Name,
Port: localSitePort,
Address: localSiteHost,
Name: infinispan.Spec.Service.Sites.Local.Name,
Port: localSitePort,
Transport: "tunnel",
MaxSiteMasters: maxRelayNodes,
}

for _, remoteLocation := range infinispan.GetRemoteSiteLocations() {
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/integreatly/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 2d8f235

Please sign in to comment.