Skip to content

Commit

Permalink
Gossip Router based Cross-Site Replication
Browse files Browse the repository at this point in the history
Using Gossip Router and TUNNEL transport for cross-site replication
between Infinispan clusters
  • Loading branch information
pruivo committed Sep 29, 2021
1 parent 6381d8b commit 64ae91c
Show file tree
Hide file tree
Showing 14 changed files with 130 additions and 11 deletions.
2 changes: 2 additions & 0 deletions api/v1/infinispan_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ type InfinispanContainerSpec struct {
type InfinispanSitesLocalSpec struct {
Name string `json:"name"`
Expose CrossSiteExposeSpec `json:"expose"`
// +optional
MaxSiteMasters int32 `json:"maxSiteMasters,omitempty"`
}

type InfinispanSiteLocationSpec struct {
Expand Down
7 changes: 7 additions & 0 deletions api/v1/types_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ const (

SiteServiceNameTemplate = "%v-site"
SiteServiceFQNTemplate = "%s.%s.svc.cluster.local"

GossipRouterDeploymentNameTemplate = "%s-tunnel"
)

type ExternalDependencyType string
Expand Down Expand Up @@ -619,3 +621,8 @@ func (ispn *Infinispan) IsServiceMonitorEnabled() bool {
}
return false
}

// GetGossipRouterDeploymentName returns the Gossip Router deployment name
func (ispn *Infinispan) GetGossipRouterDeploymentName() string {
return fmt.Sprintf(GossipRouterDeploymentNameTemplate, ispn.Name)
}
1 change: 1 addition & 0 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions api/v2alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions config/crd/bases/infinispan.org_infinispans.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,9 @@ spec:
required:
- type
type: object
maxSiteMasters:
format: int32
type: integer
name:
type: string
required:
Expand Down
1 change: 1 addition & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ rules:
resources:
- deployments
verbs:
- create
- get
- list
- watch
Expand Down
39 changes: 38 additions & 1 deletion controllers/infinispan_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (r *InfinispanReconciler) SetupWithManager(mgr ctrl.Manager) error {
// +kubebuilder:rbac:groups=core,resources=pods/exec,verbs=create
// +kubebuilder:rbac:groups=core;events.k8s.io,resources=events,verbs=create;patch

// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=create;get;list;watch
// +kubebuilder:rbac:groups=apps,resources=replicasets,verbs=get
// +kubebuilder:rbac:groups=apps,resources=deployments/finalizers;statefulsets,verbs=get;list;watch;create;update;delete

Expand Down Expand Up @@ -290,6 +290,32 @@ func (reconciler *InfinispanReconciler) Reconcile(ctx context.Context, ctrlReque
}
}

if infinispan.HasSites() {
tunnel := &appsv1.Deployment{}
err = r.Client.Get(context.TODO(), types.NamespacedName{Namespace: infinispan.Namespace, Name: infinispan.GetGossipRouterDeploymentName()}, tunnel)
if err != nil && errors.IsNotFound(err) {
reqLogger.Info("Configuring the Cross-Site Deployment")
tunnel, err = r.createGossipRouterTunnel(infinispan)
if err != nil {
reqLogger.Error(err, "failed to configure new Deployment")
return reconcile.Result{}, err
}
reqLogger.Info("Creating a new Deployment", "Deployment.Name", tunnel.Name)
err = r.Client.Create(context.TODO(), tunnel)
if err != nil {
reqLogger.Error(err, "failed to create new Deployment", "Deployment.Name", tunnel.Name)
return reconcile.Result{}, err
}

// StatefulSet created successfully
reqLogger.Info("End of the Cross-Site tunneling Deployment creation")
}
if err != nil {
reqLogger.Error(err, "Failed to get Cross-Site Deployment")
return reconcile.Result{}, err
}
}

// Reconcile the StatefulSet
// Check if the StatefulSet already exists, if not create a new one
statefulSet := &appsv1.StatefulSet{}
Expand Down Expand Up @@ -627,6 +653,17 @@ func (r *infinispanRequest) destroyResources() error {
return err
}

err = r.Client.Delete(r.ctx,
&appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: infinispan.GetGossipRouterDeploymentName(),
Namespace: infinispan.Namespace,
},
})
if err != nil && !errors.IsNotFound(err) {
return err
}

err = r.Client.Delete(r.ctx,
&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Expand Down
4 changes: 1 addition & 3 deletions controllers/infinispan_service_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"reflect"
"strconv"
"strings"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -466,8 +465,7 @@ func computeServiceExternal(ispn *ispnv1.Infinispan) *corev1.Service {

// computeSiteService compute the XSite service
func computeSiteService(ispn *ispnv1.Infinispan) *corev1.Service {
lsPodSelector := PodLabels(ispn.Name)
lsPodSelector[consts.CoordinatorPodLabel] = strconv.FormatBool(true)
lsPodSelector := GossipRouterPodLabels(ispn.Name)

exposeSpec := corev1.ServiceSpec{}
exposeConf := ispn.Spec.Service.Sites.Local.Expose
Expand Down
5 changes: 5 additions & 0 deletions controllers/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,8 @@ func BatchLabels(name string) map[string]string {
"app": "infinispan-batch-pod",
}
}

// GossipRouterPodLabels returns the labels to apply to GossipRouter pod
func GossipRouterPodLabels(name string) map[string]string {
return LabelsResource(name, "infinispan-router-pod")
}
53 changes: 53 additions & 0 deletions controllers/xsite.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ import (
"strconv"
"strings"

infinispanv1 "github.com/infinispan/infinispan-operator/api/v1"
ispnv1 "github.com/infinispan/infinispan-operator/api/v1"
consts "github.com/infinispan/infinispan-operator/controllers/constants"
ispn "github.com/infinispan/infinispan-operator/pkg/infinispan"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

func (r *infinispanRequest) applyLabelsToCoordinatorsPod(podList *corev1.PodList, siteLocations []string, cluster ispn.ClusterInterface) (*ispnv1.InfinispanCondition, error) {
Expand Down Expand Up @@ -58,3 +61,53 @@ func (r *infinispanRequest) applyLabelsToCoordinatorsPod(podList *corev1.PodList
}
return &ispnv1.InfinispanCondition{Type: ispnv1.ConditionCrossSiteViewFormed, Status: metav1.ConditionFalse, Message: "Coordinator not ready"}, nil
}

// returns the tunnel service
func (r *infinispanRequest) createGossipRouterTunnel(m *infinispanv1.Infinispan) (*appsv1.Deployment, error) {
name := m.GetGossipRouterDeploymentName()
lsTunnel := GossipRouterPodLabels(m.Name)

deployment := &appsv1.Deployment{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Deployment",
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: m.ObjectMeta.Namespace,
Labels: lsTunnel,
},
Spec: appsv1.DeploymentSpec{
Selector: &metav1.LabelSelector{
MatchLabels: lsTunnel,
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: m.ObjectMeta.Name,
Namespace: m.ObjectMeta.Namespace,
Labels: lsTunnel,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Name: "gossiprouter",
Image: m.ImageName(),
Command: []string{"/opt/gossiprouter/bin/launch.sh"},
Args: []string{"-port", strconv.Itoa(consts.CrossSitePort)},
Ports: []corev1.ContainerPort{
{
ContainerPort: consts.CrossSitePort,
Name: "tunnel",
Protocol: corev1.ProtocolTCP,
},
},
}},
},
},
},
}
// Set Infinispan instance as the owner and controller
if err := controllerutil.SetControllerReference(m, deployment, r.scheme); err != nil {
return nil, err
}
return deployment, nil
}
13 changes: 10 additions & 3 deletions controllers/xsite_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,17 @@ func ComputeXSite(infinispan *ispnv1.Infinispan, kubernetes *kube.Kubernetes, se

logger.Info("local site service", "service name", siteServiceName, "host", localSiteHost, "port", localSitePort)

maxSiteMasters := infinispan.Spec.Service.Sites.Local.MaxSiteMasters
if maxSiteMasters <= 0 {
maxSiteMasters = 1
}

xsite := &config.XSite{
Address: localSiteHost,
Name: infinispan.Spec.Service.Sites.Local.Name,
Port: localSitePort,
Address: localSiteHost,
Name: infinispan.Spec.Service.Sites.Local.Name,
Port: localSitePort,
Transport: "tunnel",
MaxSiteMasters: maxSiteMasters,
}

for _, remoteLocation := range infinispan.GetRemoteSiteLocations() {
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/integreatly/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 6 additions & 4 deletions pkg/infinispan/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,12 @@ type DNSPing struct {
}

type XSite struct {
Address string `yaml:"address"`
Name string `yaml:"name"`
Port int32 `yaml:"port"`
Backups []BackupSite `yaml:"backups"`
Address string `yaml:"address"`
Name string `yaml:"name"`
Port int32 `yaml:"port"`
Transport string `yaml:"transport"`
MaxSiteMasters int32 `yaml:"maxSiteMasters"`
Backups []BackupSite `yaml:"backups"`
}

type BackupSite struct {
Expand Down
1 change: 1 addition & 0 deletions test/e2e/xsite/xsite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func crossSiteSpec(name string, replicas int32, primarySite, backupSite, siteNam
Type: exposeType,
Port: exposePort,
},
MaxSiteMasters: 2,
},
Locations: []ispnv1.InfinispanSiteLocationSpec{
{
Expand Down

0 comments on commit 64ae91c

Please sign in to comment.