feat: Add support for logical replication slots with HA (cloudnative-…
sjuls committed May 3, 2023
1 parent b0d6966 commit c75cc45
Showing 13 changed files with 376 additions and 26 deletions.
4 changes: 2 additions & 2 deletions config/manager/manager.yaml
@@ -58,8 +58,8 @@ spec:
value: $(DEFAULT_MONITORING_CONFIGMAP)
resources:
limits:
-cpu: 100m
-memory: 200Mi
+cpu: 500m
+memory: 512Mi
requests:
cpu: 100m
memory: 100Mi
6 changes: 3 additions & 3 deletions docs/src/replication.md
@@ -212,16 +212,16 @@ primary and have lost their slot.

CloudNativePG fills this gap by introducing the concept of cluster-managed
replication slots, starting with high availability clusters. This feature
-automatically manages physical replication slots for each hot standby replica
+automatically manages replication slots for each hot standby replica
in the High Availability cluster, both in the primary and the standby.

In CloudNativePG, we use the terms:

-- **Primary HA slot**: a physical replication slot whose lifecycle is entirely
+- **Primary HA slot**: a replication slot whose lifecycle is entirely
managed by the current primary of the cluster and whose purpose is to map to
a specific standby in streaming replication. Such a slot lives on the primary
only.
-- **Standby HA slot**: a physical replication slot for a standby whose
+- **Standby HA slot**: a replication slot for a standby whose
lifecycle is entirely managed by another standby in the cluster, based on the
content of the `pg_replication_slots` view in the primary, and updated at regular
intervals using `pg_replication_slot_advance()`.
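
For context, the HA slots described above are controlled from the Cluster manifest. Below is a minimal sketch of a Cluster that enables them; the field names follow the CloudNativePG replicationSlots API, and the slotPrefix and updateInterval values shown are the documented defaults, included only for illustration.

apiVersion: postgresql.cnpg.io/v1
kind: Cluster
metadata:
  name: cluster-example
spec:
  instances: 3
  storage:
    size: 1Gi
  replicationSlots:
    updateInterval: 30        # seconds between standby updates of the HA slots
    highAvailability:
      enabled: true           # create and manage one primary HA slot per hot standby
      slotPrefix: _cnpg_      # prefix used when naming the HA slots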
2 changes: 1 addition & 1 deletion hack/e2e/run-e2e.sh
@@ -44,7 +44,7 @@ export E2E_PRE_ROLLING_UPDATE_IMG=${E2E_PRE_ROLLING_UPDATE_IMG:-${POSTGRES_IMG%.
export AZURE_STORAGE_ACCOUNT=${AZURE_STORAGE_ACCOUNT:-''}

# Getting the operator images needs a pull secret
-kubectl create namespace cnpg-system
+kubectl create namespace cnpg-system --dry-run=client -o yaml | kubectl apply -f -
if [ -n "${DOCKER_SERVER-}" ] && [ -n "${DOCKER_USERNAME-}" ] && [ -n "${DOCKER_PASSWORD-}" ]; then
kubectl create secret docker-registry \
-n cnpg-system \
@@ -27,10 +27,14 @@ import (
type Manager interface {
// List the available replication slots
List(ctx context.Context, config *apiv1.ReplicationSlotsConfiguration) (ReplicationSlotList, error)
// ListLogical lists the available logical replication slots
ListLogical(ctx context.Context, config *apiv1.ReplicationSlotsConfiguration) (ReplicationSlotList, error)
// Update the replication slot
Update(ctx context.Context, slot ReplicationSlot) error
// Create the replication slot
Create(ctx context.Context, slot ReplicationSlot) error
// Delete the replication slot
Delete(ctx context.Context, slot ReplicationSlot) error
// GetState returns the raw state of a replication slot
GetState(ctx context.Context, slot ReplicationSlot) ([]byte, error)
}
@@ -19,6 +19,7 @@ package infrastructure
import (
"context"
"database/sql"
"errors"

v1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
"github.com/cloudnative-pg/cloudnative-pg/pkg/management/log"
@@ -46,7 +47,7 @@ func (sm PostgresManager) String() string {
return sm.pool.GetDsn("postgres")
}

-// List the available replication slots
+// List the available managed physical replication slots
func (sm PostgresManager) List(
ctx context.Context,
config *v1.ReplicationSlotsConfiguration,
@@ -92,6 +93,53 @@ func (sm PostgresManager) List(
return status, nil
}

// ListLogical lists the available logical replication slots
func (sm PostgresManager) ListLogical(
ctx context.Context,
config *v1.ReplicationSlotsConfiguration,
) (ReplicationSlotList, error) {
db, err := sm.pool.Connection("postgres")
if err != nil {
return ReplicationSlotList{}, err
}

rows, err := db.QueryContext(
ctx,
`SELECT slot_name, plugin, slot_type, active, coalesce(restart_lsn::TEXT, '') AS restart_lsn, two_phase FROM pg_replication_slots
WHERE NOT temporary AND slot_type = 'logical'`,
)
if err != nil {
return ReplicationSlotList{}, err
}
defer func() {
_ = rows.Close()
}()

var status ReplicationSlotList
for rows.Next() {
var slot ReplicationSlot
err := rows.Scan(
&slot.SlotName,
&slot.Plugin,
&slot.Type,
&slot.Active,
&slot.RestartLSN,
&slot.TwoPhase,
)
if err != nil {
return ReplicationSlotList{}, err
}

status.Items = append(status.Items, slot)
}

if rows.Err() != nil {
return ReplicationSlotList{}, rows.Err()
}

return status, nil
}

// Update the replication slot
func (sm PostgresManager) Update(ctx context.Context, slot ReplicationSlot) error {
contextLog := log.FromContext(ctx).WithName("updateSlot")
@@ -118,11 +166,40 @@ func (sm PostgresManager) Create(ctx context.Context, slot ReplicationSlot) error {
return err
}

_, err = db.ExecContext(ctx, "SELECT pg_create_physical_replication_slot($1, $2)",
slot.SlotName, slot.RestartLSN != "")
switch slot.Type {
case SlotTypePhysical:
_, err = db.ExecContext(ctx, "SELECT pg_create_physical_replication_slot($1, $2)",
slot.SlotName, slot.RestartLSN != "")
case SlotTypeLogical:
_, err = db.ExecContext(ctx, "SELECT pg_create_logical_replication_slot($1, $2, $3, $4)",
slot.SlotName, slot.Plugin, false, slot.TwoPhase)
default:
return errors.New("unsupported replication slot type")
}

return err
}
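
For reference, the two branches of Create above map onto PostgreSQL's built-in slot-creation functions; a rough psql equivalent is sketched below. The slot names and the pgoutput plugin are placeholders, and the fourth argument of pg_create_logical_replication_slot (two-phase decoding) requires PostgreSQL 14 or later.

-- physical slot; the second argument ("immediately reserve WAL") mirrors slot.RestartLSN != ""
SELECT pg_create_physical_replication_slot('_cnpg_example', true);

-- logical slot: decoding plugin, temporary = false, two-phase flag as requested
SELECT pg_create_logical_replication_slot('my_logical_slot', 'pgoutput', false, false);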

// GetState returns the state of the replication slot
func (sm PostgresManager) GetState(ctx context.Context, slot ReplicationSlot) ([]byte, error) {
contextLog := log.FromContext(ctx).WithName("createSlot")
contextLog.Trace("Invoked", "slot", slot)

db, err := sm.pool.Connection("postgres")
if err != nil {
return nil, err
}

var state []byte
err = db.QueryRowContext(
ctx,
`SELECT pg_catalog.pg_read_binary_file('pg_replslot/' || slot_name || '/state') FROM pg_catalog.pg_get_replication_slots() WHERE slot_name = $1`,
slot.SlotName,
).Scan(&state)

return state, err
}
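
GetState above reads a slot's on-disk state file straight out of pg_replslot, presumably so the instance manager can carry a logical slot's state over to the standbys; that usage is not visible in this hunk, so treat it as an assumption. A manual equivalent, runnable by a superuser (or a role granted pg_read_server_files) against the pg_replication_slots view, would look roughly like this:

-- 'my_logical_slot' is a placeholder; the path is relative to the data directory
SELECT pg_read_binary_file('pg_replslot/' || slot_name || '/state')
FROM pg_replication_slots
WHERE slot_name = 'my_logical_slot';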

// Delete the replication slot
func (sm PostgresManager) Delete(ctx context.Context, slot ReplicationSlot) error {
contextLog := log.FromContext(ctx).WithName("dropSlot")
@@ -22,12 +22,17 @@ type SlotType string
// SlotTypePhysical represents the physical replication slot
const SlotTypePhysical SlotType = "physical"

// SlotTypeLogical represents the logical replication slot
const SlotTypeLogical SlotType = "logical"

// ReplicationSlot represents a single replication slot
type ReplicationSlot struct {
SlotName string `json:"slotName,omitempty"`
Plugin string `json:"plugin,omitempty"`
Type SlotType `json:"type,omitempty"`
Active bool `json:"active"`
RestartLSN string `json:"restartLSN,omitempty"`
TwoPhase bool `json:"twoPhase,omitempty"`
}

// ReplicationSlotList contains a list of replication slots
32 changes: 25 additions & 7 deletions internal/management/controller/slots/reconciler/replicationslot.go
@@ -43,7 +43,7 @@ func ReconcileReplicationSlots(
// if the replication slots feature was deactivated, ensure any existing
// replication slots get cleaned up
if !cluster.Spec.ReplicationSlots.HighAvailability.GetEnabled() {
-return dropReplicationSlots(ctx, manager, cluster)
+return dropReplicationSlots(ctx, manager, cluster, instanceName)
}

if cluster.Status.CurrentPrimary == instanceName || cluster.Status.TargetPrimary == instanceName {
@@ -82,7 +82,7 @@ func reconcilePrimaryReplicationSlots(
}

// at this point, the cluster instance does not have a replication slot
-if err := manager.Create(ctx, infrastructure.ReplicationSlot{SlotName: slotName}); err != nil {
+if err := manager.Create(ctx, infrastructure.ReplicationSlot{SlotName: slotName, Type: infrastructure.SlotTypePhysical}); err != nil {
return reconcile.Result{}, fmt.Errorf("creating primary HA replication slots: %w", err)
}
}
@@ -117,11 +117,7 @@ func reconcilePrimaryReplicationSlots(
return reconcile.Result{}, nil
}

-func dropReplicationSlots(
-ctx context.Context,
-manager infrastructure.Manager,
-cluster *apiv1.Cluster,
-) (reconcile.Result, error) {
+func dropReplicationSlots(ctx context.Context, manager infrastructure.Manager, cluster *apiv1.Cluster, instanceName string) (reconcile.Result, error) {
contextLogger := log.FromContext(ctx)

// we fetch all replication slots
@@ -145,6 +141,28 @@
}
}

// clean up HA logical replication slots on standbys
if cluster.Status.CurrentPrimary != instanceName && cluster.Status.TargetPrimary != instanceName {
// we fetch all logical replication slots
logicalSlots, err := manager.ListLogical(ctx, cluster.Spec.ReplicationSlots)
if err != nil {
return reconcile.Result{}, err
}
for _, logicalSlot := range logicalSlots.Items {
if logicalSlot.Active {
contextLogger.Trace("Skipping deletion of logical replication slot because it is active",
"slot", logicalSlot)
needToReschedule = true
continue
}
contextLogger.Trace("Attempt to delete logical replication slot",
"slot", logicalSlot)
if err := manager.Delete(ctx, logicalSlot); err != nil {
return reconcile.Result{}, fmt.Errorf("while disabling standby HA logical replication slots: %w", err)
}
}
}

if needToReschedule {
return reconcile.Result{RequeueAfter: time.Second}, nil
}
@@ -28,6 +28,8 @@ import (
. "github.com/onsi/gomega"
)

var _ infrastructure.Manager = (*fakeReplicationSlotManager)(nil)

type fakeSlot struct {
name string
active bool
@@ -69,6 +71,14 @@ func (fk fakeReplicationSlotManager) List(
return slotList, nil
}

func (fk fakeReplicationSlotManager) ListLogical(ctx context.Context, config *apiv1.ReplicationSlotsConfiguration) (infrastructure.ReplicationSlotList, error) {
return infrastructure.ReplicationSlotList{}, nil
}

func (fk fakeReplicationSlotManager) GetState(ctx context.Context, slot infrastructure.ReplicationSlot) ([]byte, error) {
return nil, nil
}

func makeClusterWithInstanceNames(instanceNames []string, primary string) apiv1.Cluster {
return apiv1.Cluster{
Spec: apiv1.ClusterSpec{
