diff --git a/e2e/backup_restore_test.go b/e2e/backup_restore_test.go index accd8d38..0f3de4ce 100644 --- a/e2e/backup_restore_test.go +++ b/e2e/backup_restore_test.go @@ -526,8 +526,11 @@ func TestS3CreateDBFromBackup(t *testing.T) { PatroniPort: pointerTo(0), Nodes: []*controlplane.DatabaseNodeSpec{ { - Name: "n1", - HostIds: []controlplane.Identifier{controlplane.Identifier(host2)}, + Name: "n1", + HostIds: []controlplane.Identifier{ + controlplane.Identifier(host1), + controlplane.Identifier(host2), + }, }, }, RestoreConfig: &controlplane.RestoreConfigSpec{ diff --git a/server/internal/database/instance_resource.go b/server/internal/database/instance_resource.go index c9eb6eff..daf93b95 100644 --- a/server/internal/database/instance_resource.go +++ b/server/internal/database/instance_resource.go @@ -12,6 +12,7 @@ import ( "github.com/samber/do" "github.com/pgEdge/control-plane/server/internal/certificates" + "github.com/pgEdge/control-plane/server/internal/ds" "github.com/pgEdge/control-plane/server/internal/patroni" "github.com/pgEdge/control-plane/server/internal/postgres" "github.com/pgEdge/control-plane/server/internal/resource" @@ -153,6 +154,32 @@ func (r *InstanceResource) Connection(ctx context.Context, rc *resource.Context, return conn, nil } +func (r *InstanceResource) InstanceID() string { + return r.Spec.InstanceID +} + +func (r *InstanceResource) PostgresVersion() (*ds.Version, error) { + if r.Spec.PgEdgeVersion == nil { + return nil, errors.New("instance spec is missing a pgedge version") + } + if r.Spec.PgEdgeVersion.PostgresVersion == nil { + return nil, errors.New("instance spec is missing a postgres version") + } + return r.Spec.PgEdgeVersion.PostgresVersion, nil +} + +func (r *InstanceResource) Paths(orchestrator Orchestrator) (InstancePaths, error) { + postgresVersion, err := r.PostgresVersion() + if err != nil { + return InstancePaths{}, err + } + paths, err := orchestrator.InstancePaths(postgresVersion, r.InstanceID()) + if err != nil { + return InstancePaths{}, fmt.Errorf("failed to compute instance paths: %w", err) + } + return paths, nil +} + func (r *InstanceResource) initializeInstance(ctx context.Context, rc *resource.Context) error { if err := r.updateConnectionInfo(ctx, rc); err != nil { return err diff --git a/server/internal/database/operations/add_nodes.go b/server/internal/database/operations/add_nodes.go index fb95bea5..5d540520 100644 --- a/server/internal/database/operations/add_nodes.go +++ b/server/internal/database/operations/add_nodes.go @@ -18,7 +18,7 @@ func AddNode(node *NodeResources) ([]*resource.State, error) { states := make([]*resource.State, 0, 2) - primary, err := instanceState(node.InstanceResources[0]) + primary, err := node.InstanceResources[0].InstanceState() if err != nil { return nil, err } @@ -26,7 +26,7 @@ func AddNode(node *NodeResources) ([]*resource.State, error) { var replicas *resource.State for _, inst := range node.InstanceResources[1:] { - replica, err := instanceState(inst) + replica, err := inst.InstanceState() if err != nil { return nil, fmt.Errorf("failed to compute replica instance resource state: %w", err) } diff --git a/server/internal/database/operations/add_nodes_test.go b/server/internal/database/operations/add_nodes_test.go index 3a4f9d0f..908705e0 100644 --- a/server/internal/database/operations/add_nodes_test.go +++ b/server/internal/database/operations/add_nodes_test.go @@ -15,6 +15,16 @@ func TestAddNode(t *testing.T) { instance1 := makeInstance(t, "n1", 1) instance2 := makeInstance(t, "n1", 2) instance3 := makeInstance(t, "n1", 3) + // This is similar to how the PgBackRestStanza resource is returned with the + // instance resources, but it's not a dependency of the instance and it + // depends on the node resource. + n1NodeDependent := makeNodeDependentResource(t, "n1", 1) + instance1OrchestratorResource := makeOrchestratorResource(t, "n1", 1, 1) + instance1WithNodeDependent := makeInstance(t, "n1", 1, instance1OrchestratorResource) + instance1WithNodeDependent.AddNodeDependents(n1NodeDependent) + instance2OrchestratorResource := makeOrchestratorResource(t, "n1", 2, 1) + instance2WithNodeDependent := makeInstance(t, "n1", 2, instance2OrchestratorResource) + instance2WithNodeDependent.AddNodeDependents(n1NodeDependent) for _, tc := range []struct { name string @@ -40,7 +50,7 @@ func TestAddNode(t *testing.T) { InstanceIDs: []string{instance1.InstanceID()}, }, }, - instance1.Resources, + instance1.InstanceDependencies, ), }, }, @@ -62,7 +72,7 @@ func TestAddNode(t *testing.T) { []resource.Resource{ instance1.Instance, }, - instance1.Resources, + instance1.InstanceDependencies, ), makeState(t, []resource.Resource{ @@ -75,7 +85,7 @@ func TestAddNode(t *testing.T) { }, }, }, - instance2.Resources, + instance2.InstanceDependencies, ), }, }, @@ -98,7 +108,7 @@ func TestAddNode(t *testing.T) { []resource.Resource{ instance1.Instance, }, - instance1.Resources, + instance1.InstanceDependencies, ), makeState(t, []resource.Resource{ @@ -114,8 +124,8 @@ func TestAddNode(t *testing.T) { }, }, slices.Concat( - instance2.Resources, - instance3.Resources, + instance2.InstanceDependencies, + instance3.InstanceDependencies, ), ), }, @@ -125,6 +135,41 @@ func TestAddNode(t *testing.T) { input: &operations.NodeResources{NodeName: "n1"}, expectedErr: "got empty instances for node n1", }, + { + name: "two instances with node dependent resource", + input: &operations.NodeResources{ + DatabaseName: "test", + NodeName: "n1", + InstanceResources: []*database.InstanceResources{ + instance1WithNodeDependent, + instance2WithNodeDependent, + }, + }, + expected: []*resource.State{ + makeState(t, + []resource.Resource{ + instance1WithNodeDependent.Instance, + instance1OrchestratorResource, + }, + nil, + ), + makeState(t, + []resource.Resource{ + instance2WithNodeDependent.Instance, + instance2OrchestratorResource, + &database.NodeResource{ + Name: "n1", + InstanceIDs: []string{ + instance1WithNodeDependent.InstanceID(), + instance2WithNodeDependent.InstanceID(), + }, + }, + n1NodeDependent, + }, + nil, + ), + }, + }, } { t.Run(tc.name, func(t *testing.T) { out, err := operations.AddNode(tc.input) @@ -170,7 +215,7 @@ func TestAddNodes(t *testing.T) { InstanceIDs: []string{n1Instance1.InstanceID()}, }, }, - n1Instance1.Resources, + n1Instance1.InstanceDependencies, ), }, }, @@ -204,8 +249,8 @@ func TestAddNodes(t *testing.T) { }, }, slices.Concat( - n1Instance1.Resources, - n2Instance1.Resources, + n1Instance1.InstanceDependencies, + n2Instance1.InstanceDependencies, ), ), }, @@ -240,8 +285,8 @@ func TestAddNodes(t *testing.T) { }, }, slices.Concat( - n1Instance1.Resources, - n2Instance1.Resources, + n1Instance1.InstanceDependencies, + n2Instance1.InstanceDependencies, ), ), makeState(t, @@ -256,7 +301,7 @@ func TestAddNodes(t *testing.T) { }, }, slices.Concat( - n1Instance2.Resources, + n1Instance2.InstanceDependencies, ), ), }, @@ -290,8 +335,8 @@ func TestAddNodes(t *testing.T) { n2Instance1.Instance, }, slices.Concat( - n1Instance1.Resources, - n2Instance1.Resources, + n1Instance1.InstanceDependencies, + n2Instance1.InstanceDependencies, ), ), makeState(t, @@ -314,8 +359,8 @@ func TestAddNodes(t *testing.T) { }, }, slices.Concat( - n1Instance2.Resources, - n2Instance2.Resources, + n1Instance2.InstanceDependencies, + n2Instance2.InstanceDependencies, ), ), }, diff --git a/server/internal/database/operations/common.go b/server/internal/database/operations/common.go index 5c277c00..82e47a1f 100644 --- a/server/internal/database/operations/common.go +++ b/server/internal/database/operations/common.go @@ -35,6 +35,7 @@ func (n *NodeResources) nodeResourceState() (*resource.State, error) { state := resource.NewState() for _, instance := range n.InstanceResources { instanceIDs = append(instanceIDs, instance.InstanceID()) + state.Add(instance.NodeDependents...) } err := state.AddResource(&database.NodeResource{ @@ -91,14 +92,6 @@ func (n *NodeResources) databaseResourceState() (*resource.State, error) { return state, nil } -func instanceState(inst *database.InstanceResources) (*resource.State, error) { - state, err := inst.State() - if err != nil { - return nil, fmt.Errorf("failed to compute updated instance state: %w", err) - } - return state, nil -} - func mergePartialStates(in [][]*resource.State) []*resource.State { var out []*resource.State diff --git a/server/internal/database/operations/end.go b/server/internal/database/operations/end.go index 3bc76f37..8ced1f80 100644 --- a/server/internal/database/operations/end.go +++ b/server/internal/database/operations/end.go @@ -16,7 +16,7 @@ func EndState(nodes []*NodeResources, services []*ServiceResources) (*resource.S var resources []resource.Resource for _, inst := range node.InstanceResources { - state, err := instanceState(inst) + state, err := inst.InstanceState() if err != nil { return nil, err } diff --git a/server/internal/database/operations/helpers_test.go b/server/internal/database/operations/helpers_test.go index 070821f8..54d68217 100644 --- a/server/internal/database/operations/helpers_test.go +++ b/server/internal/database/operations/helpers_test.go @@ -78,6 +78,7 @@ func makeInstance(t testing.TB, node string, num int, dependencies ...resource.R }, dependencies, nil, + nil, ) if err != nil { t.Fatal(err) @@ -304,3 +305,32 @@ func makeServiceResources(t testing.TB, databaseID, serviceID, hostID string, no MonitorResource: monitorResource, } } + +var _ resource.Resource = (*nodeDependentResource)(nil) + +func makeNodeDependentResource(t testing.TB, node string, depNum int) *nodeDependentResource { + t.Helper() + + return &nodeDependentResource{ + orchestratorResource: orchestratorResource{ + ID: fmt.Sprintf("%s-node-dependent-%d-id", node, depNum), + }, + node: node, + } +} + +type nodeDependentResource struct { + orchestratorResource + node string +} + +func (r *nodeDependentResource) Identifier() resource.Identifier { + return resource.Identifier{ + ID: r.ID, + Type: "orchestrator.node_dependent_resource", + } +} + +func (r *nodeDependentResource) Dependencies() []resource.Identifier { + return []resource.Identifier{database.NodeResourceIdentifier(r.node)} +} diff --git a/server/internal/database/operations/restore_database.go b/server/internal/database/operations/restore_database.go index 4f6cad26..f01e578e 100644 --- a/server/internal/database/operations/restore_database.go +++ b/server/internal/database/operations/restore_database.go @@ -67,25 +67,25 @@ func RestoreNode(node *NodeRestoreResources) ([]*resource.State, error) { // The pre-restore state only contains the orchestrator resources. preRestoreState := resource.NewState() - preRestoreState.Add(node.PrimaryInstance.Resources...) + preRestoreState.Add(node.PrimaryInstance.InstanceDependencies...) states = append(states, preRestoreState) // The restore state has the restore resources, the instance and the // instance monitor. - restoreState, err := instanceState(node.RestoreInstance) + restoreState, err := node.RestoreInstance.InstanceState() if err != nil { return nil, fmt.Errorf("failed to compute state for restore resources: %w", err) } states = append(states, restoreState) - postRestore, err := instanceState(node.PrimaryInstance) + postRestore, err := node.PrimaryInstance.InstanceState() if err != nil { return nil, fmt.Errorf("failed to compute post-restore state for primary instance: %w", err) } states = append(states, postRestore) for _, inst := range node.ReplicaInstances { - replica, err := instanceState(inst) + replica, err := inst.InstanceState() if err != nil { return nil, fmt.Errorf("failed to compute post-restore state for replica instance: %w", err) } diff --git a/server/internal/database/operations/restore_database_test.go b/server/internal/database/operations/restore_database_test.go index cecea7b6..15ae5424 100644 --- a/server/internal/database/operations/restore_database_test.go +++ b/server/internal/database/operations/restore_database_test.go @@ -39,7 +39,7 @@ func TestRestoreDatabase(t *testing.T) { DatabaseName: "test", }, }, - n1Instance1.Resources, + n1Instance1.InstanceDependencies, ) twoNodeState := makeState(t, []resource.Resource{ @@ -87,8 +87,8 @@ func TestRestoreDatabase(t *testing.T) { }, }, slices.Concat( - n1Instance1.Resources, - n2Instance1.Resources, + n1Instance1.InstanceDependencies, + n2Instance1.InstanceDependencies, ), ) twoNodeStateWithReplica := makeState(t, @@ -147,9 +147,9 @@ func TestRestoreDatabase(t *testing.T) { }, }, slices.Concat( - n1Instance1.Resources, - n1Instance2.Resources, - n2Instance1.Resources, + n1Instance1.InstanceDependencies, + n1Instance2.InstanceDependencies, + n2Instance1.InstanceDependencies, ), ) diff --git a/server/internal/database/operations/update_database_test.go b/server/internal/database/operations/update_database_test.go index 93e1dc44..c1ab528c 100644 --- a/server/internal/database/operations/update_database_test.go +++ b/server/internal/database/operations/update_database_test.go @@ -37,7 +37,7 @@ func TestUpdateDatabase(t *testing.T) { DatabaseName: "test", }, }, - n1Instance1.Resources, + n1Instance1.InstanceDependencies, ) twoNodeState := makeState(t, []resource.Resource{ @@ -85,8 +85,8 @@ func TestUpdateDatabase(t *testing.T) { }, }, slices.Concat( - n1Instance1.Resources, - n2Instance1.Resources, + n1Instance1.InstanceDependencies, + n2Instance1.InstanceDependencies, ), ) @@ -187,9 +187,9 @@ func TestUpdateDatabase(t *testing.T) { }, }, slices.Concat( - n1Instance1.Resources, - n2Instance1.Resources, - n3Instance1.Resources, + n1Instance1.InstanceDependencies, + n2Instance1.InstanceDependencies, + n3Instance1.InstanceDependencies, ), ) @@ -211,7 +211,7 @@ func TestUpdateDatabase(t *testing.T) { svcRes.MonitorResource, }, slices.Concat( - n1Instance1.Resources, + n1Instance1.InstanceDependencies, svcRes.Resources, ), ) diff --git a/server/internal/database/operations/update_nodes.go b/server/internal/database/operations/update_nodes.go index 4afede45..03328267 100644 --- a/server/internal/database/operations/update_nodes.go +++ b/server/internal/database/operations/update_nodes.go @@ -21,7 +21,7 @@ func UpdateNode(node *NodeResources) ([]*resource.State, error) { for _, inst := range node.InstanceResources { instanceID := inst.InstanceID() - state, err := instanceState(inst) + state, err := inst.InstanceState() if err != nil { return nil, err } diff --git a/server/internal/database/operations/update_nodes_test.go b/server/internal/database/operations/update_nodes_test.go index e8f5864f..84109f0d 100644 --- a/server/internal/database/operations/update_nodes_test.go +++ b/server/internal/database/operations/update_nodes_test.go @@ -42,7 +42,7 @@ func TestUpdateNode(t *testing.T) { InstanceIDs: []string{instance1.InstanceID()}, }, }, - instance1.Resources, + instance1.InstanceDependencies, ), }, }, @@ -65,7 +65,7 @@ func TestUpdateNode(t *testing.T) { []resource.Resource{ instance2.Instance, }, - instance2.Resources, + instance2.InstanceDependencies, ), makeState(t, []resource.Resource{ @@ -83,7 +83,7 @@ func TestUpdateNode(t *testing.T) { TargetRole: patroni.InstanceRolePrimary, }, }, - instance1.Resources, + instance1.InstanceDependencies, ), }, }, @@ -106,13 +106,13 @@ func TestUpdateNode(t *testing.T) { []resource.Resource{ instance2.Instance, }, - instance2.Resources, + instance2.InstanceDependencies, ), makeState(t, []resource.Resource{ instance3.Instance, }, - instance3.Resources, + instance3.InstanceDependencies, ), makeState(t, []resource.Resource{ @@ -131,7 +131,7 @@ func TestUpdateNode(t *testing.T) { TargetRole: patroni.InstanceRolePrimary, }, }, - instance1.Resources, + instance1.InstanceDependencies, ), }, }, @@ -195,7 +195,7 @@ func TestRollingUpdateNodes(t *testing.T) { InstanceIDs: []string{n1Instance1.InstanceID()}, }, }, - n1Instance1.Resources, + n1Instance1.InstanceDependencies, ), }, }, @@ -225,7 +225,7 @@ func TestRollingUpdateNodes(t *testing.T) { InstanceIDs: []string{n1Instance1.InstanceID()}, }, }, - n1Instance1.Resources, + n1Instance1.InstanceDependencies, ), makeState(t, []resource.Resource{ @@ -235,7 +235,7 @@ func TestRollingUpdateNodes(t *testing.T) { InstanceIDs: []string{n2Instance1.InstanceID()}, }, }, - n2Instance1.Resources, + n2Instance1.InstanceDependencies, ), }, }, @@ -266,7 +266,7 @@ func TestRollingUpdateNodes(t *testing.T) { []resource.Resource{ n1Instance2.Instance, }, - n1Instance2.Resources, + n1Instance2.InstanceDependencies, ), makeState(t, []resource.Resource{ @@ -284,7 +284,7 @@ func TestRollingUpdateNodes(t *testing.T) { TargetRole: patroni.InstanceRolePrimary, }, }, - n1Instance1.Resources, + n1Instance1.InstanceDependencies, ), makeState(t, []resource.Resource{ @@ -294,7 +294,7 @@ func TestRollingUpdateNodes(t *testing.T) { InstanceIDs: []string{n2Instance1.InstanceID()}, }, }, - n2Instance1.Resources, + n2Instance1.InstanceDependencies, ), }, }, @@ -327,7 +327,7 @@ func TestRollingUpdateNodes(t *testing.T) { []resource.Resource{ n1Instance2.Instance, }, - n1Instance2.Resources, + n1Instance2.InstanceDependencies, ), makeState(t, []resource.Resource{ @@ -345,13 +345,13 @@ func TestRollingUpdateNodes(t *testing.T) { TargetRole: patroni.InstanceRolePrimary, }, }, - n1Instance1.Resources, + n1Instance1.InstanceDependencies, ), makeState(t, []resource.Resource{ n2Instance2.Instance, }, - n2Instance2.Resources, + n2Instance2.InstanceDependencies, ), makeState(t, []resource.Resource{ @@ -369,7 +369,7 @@ func TestRollingUpdateNodes(t *testing.T) { TargetRole: patroni.InstanceRolePrimary, }, }, - n2Instance1.Resources, + n2Instance1.InstanceDependencies, ), }, }, @@ -419,7 +419,7 @@ func TestConcurrentUpdateNodes(t *testing.T) { InstanceIDs: []string{n1Instance1.InstanceID()}, }, }, - n1Instance1.Resources, + n1Instance1.InstanceDependencies, ), }, }, @@ -455,8 +455,8 @@ func TestConcurrentUpdateNodes(t *testing.T) { }, }, slices.Concat( - n1Instance1.Resources, - n2Instance1.Resources, + n1Instance1.InstanceDependencies, + n2Instance1.InstanceDependencies, ), ), }, @@ -494,8 +494,8 @@ func TestConcurrentUpdateNodes(t *testing.T) { }, }, slices.Concat( - n1Instance2.Resources, - n2Instance1.Resources, + n1Instance2.InstanceDependencies, + n2Instance1.InstanceDependencies, ), ), makeState(t, @@ -514,7 +514,7 @@ func TestConcurrentUpdateNodes(t *testing.T) { TargetRole: patroni.InstanceRolePrimary, }, }, - n1Instance1.Resources, + n1Instance1.InstanceDependencies, ), }, }, @@ -549,8 +549,8 @@ func TestConcurrentUpdateNodes(t *testing.T) { n2Instance2.Instance, }, slices.Concat( - n1Instance2.Resources, - n2Instance2.Resources, + n1Instance2.InstanceDependencies, + n2Instance2.InstanceDependencies, ), ), makeState(t, @@ -583,8 +583,8 @@ func TestConcurrentUpdateNodes(t *testing.T) { }, }, slices.Concat( - n1Instance1.Resources, - n2Instance1.Resources, + n1Instance1.InstanceDependencies, + n2Instance1.InstanceDependencies, ), ), }, diff --git a/server/internal/database/orchestrator.go b/server/internal/database/orchestrator.go index e2cc087d..8f42a569 100644 --- a/server/internal/database/orchestrator.go +++ b/server/internal/database/orchestrator.go @@ -24,24 +24,29 @@ const ResourceTypeServiceInstance = "swarm.service_instance" type InstanceResources struct { Instance *InstanceResource - Resources []*resource.ResourceData + InstanceDependencies []*resource.ResourceData DatabaseDependencies []*resource.ResourceData + NodeDependents []*resource.ResourceData } func NewInstanceResources( instance *InstanceResource, - resources []resource.Resource, + instanceDependencies []resource.Resource, databaseDependencies []resource.Resource, + nodeDependents []resource.Resource, ) (*InstanceResources, error) { inst := &InstanceResources{ Instance: instance, } - if err := inst.AddResources(resources...); err != nil { + if err := inst.AddInstanceDependencies(instanceDependencies...); err != nil { return nil, err } if err := inst.AddDatabaseDependencies(databaseDependencies...); err != nil { return nil, err } + if err := inst.AddNodeDependents(nodeDependents...); err != nil { + return nil, err + } return inst, nil } @@ -55,12 +60,12 @@ func (r *InstanceResources) DatabaseDependencyIdentifiers() []resource.Identifie return ids } -func (r *InstanceResources) AddResources(resources ...resource.Resource) error { +func (r *InstanceResources) AddInstanceDependencies(resources ...resource.Resource) error { resourceDataSlice, err := resource.ToResourceDataSlice(resources...) if err != nil { return fmt.Errorf("failed to convert instance resources: %w", err) } - r.Resources = append(r.Resources, resourceDataSlice...) + r.InstanceDependencies = append(r.InstanceDependencies, resourceDataSlice...) return nil } @@ -75,6 +80,16 @@ func (r *InstanceResources) AddDatabaseDependencies(resources ...resource.Resour return nil } +func (r *InstanceResources) AddNodeDependents(resources ...resource.Resource) error { + nodeDataSlice, err := resource.ToResourceDataSlice(resources...) + if err != nil { + return fmt.Errorf("failed to convert node dependent resources: %w", err) + } + r.NodeDependents = append(r.NodeDependents, nodeDataSlice...) + + return nil +} + func (r *InstanceResources) InstanceID() string { return r.Instance.Spec.InstanceID } @@ -95,9 +110,9 @@ func (r *InstanceResources) NodeName() string { return r.Instance.Spec.NodeName } -func (r *InstanceResources) State() (*resource.State, error) { +func (r *InstanceResources) InstanceState() (*resource.State, error) { state := resource.NewState() - state.Add(r.Resources...) + state.Add(r.InstanceDependencies...) if err := state.AddResource(r.Instance); err != nil { return nil, fmt.Errorf("failed to add instance to state: %w", err) @@ -165,4 +180,5 @@ type Orchestrator interface { StopInstance(ctx context.Context, instanceID string) error StartInstance(ctx context.Context, instanceID string) error NodeDSN(ctx context.Context, rc *resource.Context, nodeName string, fromInstanceID string, dbName string) (*postgres.DSN, error) + InstancePaths(pgVersion *ds.Version, instanceID string) (InstancePaths, error) } diff --git a/server/internal/orchestrator/common/paths.go b/server/internal/database/paths.go similarity index 69% rename from server/internal/orchestrator/common/paths.go rename to server/internal/database/paths.go index 7a1ee94f..76133060 100644 --- a/server/internal/orchestrator/common/paths.go +++ b/server/internal/database/paths.go @@ -1,4 +1,4 @@ -package common +package database import ( "fmt" @@ -9,6 +9,22 @@ import ( "github.com/pgEdge/control-plane/server/internal/pgbackrest" ) +const ( + EtcdCaCertName = "ca.crt" + EtcdClientCertName = "client.crt" + EtcdClientKeyName = "client.key" +) + +const ( + PostgresCaCertName = "ca.crt" + PostgresServerCertName = "server.crt" + PostgresServerKeyName = "server.key" + PostgresSuperuserCertName = "superuser.crt" + PostgresSuperuserKeyName = "superuser.key" + PostgresReplicatorCertName = "replication.crt" + PostgresReplicatorKeyName = "replication.key" +) + type InstancePaths struct { Instance Paths `json:"instance"` Host Paths `json:"host"` @@ -27,7 +43,7 @@ func (p *InstancePaths) InstanceMvRestoreToDataCmd() []string { func (p *InstancePaths) PgBackRestBackupCmd(command string, args ...string) pgbackrest.Cmd { return pgbackrest.Cmd{ PgBackrestCmd: p.PgBackRestPath, - Config: p.Instance.PgBackRestConfig(PgBackRestConfigTypeBackup), + Config: p.Instance.PgBackRestConfig(pgbackrest.ConfigTypeBackup), Stanza: "db", Command: command, Args: args, @@ -54,8 +70,8 @@ func (p *InstancePaths) PgBackRestRestoreCmd(command string, args ...string) pgb if arg == "--type" && i+1 < len(args) { restoreType = args[i+1] i++ // skip the next arg since it's the value of --type - } else if strings.HasPrefix(arg, "--type=") { - restoreType = strings.TrimPrefix(arg, "--type=") + } else if after, ok := strings.CutPrefix(arg, "--type="); ok { + restoreType = after } else { continue } @@ -69,7 +85,7 @@ func (p *InstancePaths) PgBackRestRestoreCmd(command string, args ...string) pgb return pgbackrest.Cmd{ PgBackrestCmd: p.PgBackRestPath, - Config: p.Instance.PgBackRestConfig(PgBackRestConfigTypeRestore), + Config: p.Instance.PgBackRestConfig(pgbackrest.ConfigTypeRestore), Stanza: "db", Command: command, Args: args, @@ -104,7 +120,7 @@ func (p *Paths) PatroniConfig() string { return filepath.Join(p.Configs(), "patroni.yaml") } -func (p *Paths) PgBackRestConfig(confType PgBackRestConfigType) string { +func (p *Paths) PgBackRestConfig(confType pgbackrest.ConfigType) string { return filepath.Join(p.Configs(), fmt.Sprintf("pgbackrest.%s.conf", confType)) } @@ -113,15 +129,15 @@ func (p *Paths) EtcdCertificates() string { } func (p *Paths) EtcdCaCert() string { - return filepath.Join(p.EtcdCertificates(), etcdCaCertName) + return filepath.Join(p.EtcdCertificates(), EtcdCaCertName) } func (p *Paths) EtcdClientCert() string { - return filepath.Join(p.EtcdCertificates(), etcdClientCertName) + return filepath.Join(p.EtcdCertificates(), EtcdClientCertName) } func (p *Paths) EtcdClientKey() string { - return filepath.Join(p.EtcdCertificates(), etcdClientKeyName) + return filepath.Join(p.EtcdCertificates(), EtcdClientKeyName) } func (p *Paths) PostgresCertificates() string { @@ -129,29 +145,29 @@ func (p *Paths) PostgresCertificates() string { } func (p *Paths) PostgresCaCert() string { - return filepath.Join(p.PostgresCertificates(), postgresCaCertName) + return filepath.Join(p.PostgresCertificates(), PostgresCaCertName) } func (p *Paths) PostgresServerCert() string { - return filepath.Join(p.PostgresCertificates(), postgresServerCertName) + return filepath.Join(p.PostgresCertificates(), PostgresServerCertName) } func (p *Paths) PostgresServerKey() string { - return filepath.Join(p.PostgresCertificates(), postgresServerKeyName) + return filepath.Join(p.PostgresCertificates(), PostgresServerKeyName) } func (p *Paths) PostgresSuperuserCert() string { - return filepath.Join(p.PostgresCertificates(), postgresSuperuserCertName) + return filepath.Join(p.PostgresCertificates(), PostgresSuperuserCertName) } func (p *Paths) PostgresSuperuserKey() string { - return filepath.Join(p.PostgresCertificates(), postgresSuperuserKeyName) + return filepath.Join(p.PostgresCertificates(), PostgresSuperuserKeyName) } func (p *Paths) PostgresReplicatorCert() string { - return filepath.Join(p.PostgresCertificates(), postgresReplicatorCertName) + return filepath.Join(p.PostgresCertificates(), PostgresReplicatorCertName) } func (p *Paths) PostgresReplicatorKey() string { - return filepath.Join(p.PostgresCertificates(), postgresReplicatorKeyName) + return filepath.Join(p.PostgresCertificates(), PostgresReplicatorKeyName) } diff --git a/server/internal/orchestrator/common/paths_test.go b/server/internal/database/paths_test.go similarity index 89% rename from server/internal/orchestrator/common/paths_test.go rename to server/internal/database/paths_test.go index 4c9afb61..0908cc51 100644 --- a/server/internal/orchestrator/common/paths_test.go +++ b/server/internal/database/paths_test.go @@ -1,11 +1,11 @@ -package common_test +package database_test import ( "testing" "github.com/stretchr/testify/assert" - "github.com/pgEdge/control-plane/server/internal/orchestrator/common" + "github.com/pgEdge/control-plane/server/internal/database" "github.com/pgEdge/control-plane/server/internal/pgbackrest" ) @@ -79,9 +79,9 @@ func TestInstancePaths(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - paths := &common.InstancePaths{ - Instance: common.Paths{BaseDir: "/opt/pgedge"}, - Host: common.Paths{BaseDir: "/data/control-plane/instances/storefront-n1-689qacsi"}, + paths := &database.InstancePaths{ + Instance: database.Paths{BaseDir: "/opt/pgedge"}, + Host: database.Paths{BaseDir: "/data/control-plane/instances/storefront-n1-689qacsi"}, PgBackRestPath: "/usr/bin/pgbackrest", PatroniPath: "/usr/bin/patroni", } diff --git a/server/internal/orchestrator/common/etcd_creds.go b/server/internal/orchestrator/common/etcd_creds.go index b00dd410..96070d2a 100644 --- a/server/internal/orchestrator/common/etcd_creds.go +++ b/server/internal/orchestrator/common/etcd_creds.go @@ -11,18 +11,13 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" "github.com/pgEdge/control-plane/server/internal/certificates" + "github.com/pgEdge/control-plane/server/internal/database" "github.com/pgEdge/control-plane/server/internal/etcd" "github.com/pgEdge/control-plane/server/internal/filesystem" "github.com/pgEdge/control-plane/server/internal/patroni" "github.com/pgEdge/control-plane/server/internal/resource" ) -const ( - etcdCaCertName = "ca.crt" - etcdClientCertName = "client.crt" - etcdClientKeyName = "client.key" -) - var _ resource.Resource = (*EtcdCreds)(nil) const ResourceTypeEtcdCreds resource.Type = "common.etcd_creds" @@ -93,15 +88,15 @@ func (c *EtcdCreds) Refresh(ctx context.Context, rc *resource.Context) error { } certsDir := filepath.Join(parentFullPath, "etcd") - caCert, err := ReadResourceFile(fs, filepath.Join(certsDir, etcdCaCertName)) + caCert, err := ReadResourceFile(fs, filepath.Join(certsDir, database.EtcdCaCertName)) if err != nil { return fmt.Errorf("failed to read CA cert: %w", err) } - clientCert, err := ReadResourceFile(fs, filepath.Join(certsDir, etcdClientCertName)) + clientCert, err := ReadResourceFile(fs, filepath.Join(certsDir, database.EtcdClientCertName)) if err != nil { return fmt.Errorf("failed to read client cert: %w", err) } - clientKey, err := ReadResourceFile(fs, filepath.Join(certsDir, etcdClientKeyName)) + clientKey, err := ReadResourceFile(fs, filepath.Join(certsDir, database.EtcdClientKeyName)) if err != nil { return fmt.Errorf("failed to read client key: %w", err) } @@ -160,9 +155,9 @@ func (c *EtcdCreds) Create(ctx context.Context, rc *resource.Context) error { } files := map[string][]byte{ - etcdCaCertName: c.CaCert, - etcdClientCertName: c.ClientCert, - etcdClientKeyName: c.ClientKey, + database.EtcdCaCertName: c.CaCert, + database.EtcdClientCertName: c.ClientCert, + database.EtcdClientKeyName: c.ClientKey, } for name, content := range files { diff --git a/server/internal/orchestrator/common/patroni_config_generator.go b/server/internal/orchestrator/common/patroni_config_generator.go index 69895c14..0873fac4 100644 --- a/server/internal/orchestrator/common/patroni_config_generator.go +++ b/server/internal/orchestrator/common/patroni_config_generator.go @@ -87,7 +87,7 @@ type PatroniConfigGeneratorOptions struct { // PostgresPort is the port that Postgres will listen on. PostgresPort int // Paths is used to compute the paths of directories and executables. - Paths InstancePaths + Paths database.InstancePaths } func NewPatroniConfigGenerator(opts PatroniConfigGeneratorOptions) *PatroniConfigGenerator { @@ -185,9 +185,9 @@ func (p *PatroniConfigGenerator) parameters() map[string]any { maps.Copy(parameters, postgres.DefaultTunableGUCs(p.MemoryBytes, p.CPUs, p.ClusterSize)) maps.Copy(parameters, map[string]any{ "ssl": "on", - "ssl_ca_file": filepath.Join(p.PostgresCertsDir, postgresCaCertName), - "ssl_cert_file": filepath.Join(p.PostgresCertsDir, postgresServerCertName), - "ssl_key_file": filepath.Join(p.PostgresCertsDir, postgresServerKeyName), + "ssl_ca_file": filepath.Join(p.PostgresCertsDir, database.PostgresCaCertName), + "ssl_cert_file": filepath.Join(p.PostgresCertsDir, database.PostgresServerCertName), + "ssl_key_file": filepath.Join(p.PostgresCertsDir, database.PostgresServerKeyName), }) maps.Copy(parameters, p.OrchestratorParameters) if p.ArchiveCommand != "" { @@ -253,9 +253,9 @@ func (p *PatroniConfigGenerator) log() *patroni.Log { func (p *PatroniConfigGenerator) etcd(hosts []string, creds *EtcdCreds) *patroni.Etcd { return &patroni.Etcd{ Hosts: &hosts, - CACert: utils.PointerTo(filepath.Join(p.EtcdCertsDir, etcdCaCertName)), - Cert: utils.PointerTo(filepath.Join(p.EtcdCertsDir, etcdClientCertName)), - Key: utils.PointerTo(filepath.Join(p.EtcdCertsDir, etcdClientKeyName)), + CACert: utils.PointerTo(filepath.Join(p.EtcdCertsDir, database.EtcdCaCertName)), + Cert: utils.PointerTo(filepath.Join(p.EtcdCertsDir, database.EtcdClientCertName)), + Key: utils.PointerTo(filepath.Join(p.EtcdCertsDir, database.EtcdClientKeyName)), Username: &creds.Username, Password: &creds.Password, Protocol: utils.PointerTo("https"), @@ -315,16 +315,16 @@ func (p *PatroniConfigGenerator) authentication() *patroni.Authentication { return &patroni.Authentication{ Superuser: &patroni.User{ Username: utils.PointerTo("pgedge"), - SSLRootCert: utils.PointerTo(filepath.Join(p.PostgresCertsDir, postgresCaCertName)), - SSLCert: utils.PointerTo(filepath.Join(p.PostgresCertsDir, postgresSuperuserCertName)), - SSLKey: utils.PointerTo(filepath.Join(p.PostgresCertsDir, postgresSuperuserKeyName)), + SSLRootCert: utils.PointerTo(filepath.Join(p.PostgresCertsDir, database.PostgresCaCertName)), + SSLCert: utils.PointerTo(filepath.Join(p.PostgresCertsDir, database.PostgresSuperuserCertName)), + SSLKey: utils.PointerTo(filepath.Join(p.PostgresCertsDir, database.PostgresSuperuserKeyName)), SSLMode: utils.PointerTo("verify-full"), }, Replication: &patroni.User{ Username: utils.PointerTo("patroni_replicator"), - SSLRootCert: utils.PointerTo(filepath.Join(p.PostgresCertsDir, postgresCaCertName)), - SSLCert: utils.PointerTo(filepath.Join(p.PostgresCertsDir, postgresReplicatorCertName)), - SSLKey: utils.PointerTo(filepath.Join(p.PostgresCertsDir, postgresReplicatorKeyName)), + SSLRootCert: utils.PointerTo(filepath.Join(p.PostgresCertsDir, database.PostgresCaCertName)), + SSLCert: utils.PointerTo(filepath.Join(p.PostgresCertsDir, database.PostgresReplicatorCertName)), + SSLKey: utils.PointerTo(filepath.Join(p.PostgresCertsDir, database.PostgresReplicatorKeyName)), SSLMode: utils.PointerTo("verify-full"), }, } diff --git a/server/internal/orchestrator/common/patroni_config_generator_test.go b/server/internal/orchestrator/common/patroni_config_generator_test.go index a0547155..25a5ce5c 100644 --- a/server/internal/orchestrator/common/patroni_config_generator_test.go +++ b/server/internal/orchestrator/common/patroni_config_generator_test.go @@ -63,9 +63,9 @@ func TestPatroniConfigGenerator(t *testing.T) { }, PatroniPort: 8888, PostgresPort: 5432, - Paths: common.InstancePaths{ - Instance: common.Paths{BaseDir: "/opt/pgedge"}, - Host: common.Paths{BaseDir: "/data/control-plane/instances/storefront-n1-689qacsi"}, + Paths: database.InstancePaths{ + Instance: database.Paths{BaseDir: "/opt/pgedge"}, + Host: database.Paths{BaseDir: "/data/control-plane/instances/storefront-n1-689qacsi"}, PgBackRestPath: "/usr/bin/pgbackrest", PatroniPath: "/usr/local/bin/patroni", }, @@ -120,9 +120,9 @@ func TestPatroniConfigGenerator(t *testing.T) { }, PatroniPort: 8888, PostgresPort: 5432, - Paths: common.InstancePaths{ - Instance: common.Paths{BaseDir: "/opt/pgedge"}, - Host: common.Paths{BaseDir: "/data/control-plane/instances/storefront-n1-689qacsi"}, + Paths: database.InstancePaths{ + Instance: database.Paths{BaseDir: "/opt/pgedge"}, + Host: database.Paths{BaseDir: "/data/control-plane/instances/storefront-n1-689qacsi"}, PgBackRestPath: "/usr/bin/pgbackrest", PatroniPath: "/usr/local/bin/patroni", }, @@ -177,9 +177,9 @@ func TestPatroniConfigGenerator(t *testing.T) { }, PatroniPort: 8888, PostgresPort: 5432, - Paths: common.InstancePaths{ - Instance: common.Paths{BaseDir: "/opt/pgedge"}, - Host: common.Paths{BaseDir: "/data/control-plane/instances/storefront-n1-689qacsi"}, + Paths: database.InstancePaths{ + Instance: database.Paths{BaseDir: "/opt/pgedge"}, + Host: database.Paths{BaseDir: "/data/control-plane/instances/storefront-n1-689qacsi"}, PgBackRestPath: "/usr/bin/pgbackrest", PatroniPath: "/usr/local/bin/patroni", }, @@ -235,9 +235,9 @@ func TestPatroniConfigGenerator(t *testing.T) { }, PatroniPort: 8888, PostgresPort: 5432, - Paths: common.InstancePaths{ - Instance: common.Paths{BaseDir: "/opt/pgedge"}, - Host: common.Paths{BaseDir: "/data/control-plane/instances/storefront-n1-689qacsi"}, + Paths: database.InstancePaths{ + Instance: database.Paths{BaseDir: "/opt/pgedge"}, + Host: database.Paths{BaseDir: "/data/control-plane/instances/storefront-n1-689qacsi"}, PgBackRestPath: "/usr/bin/pgbackrest", PatroniPath: "/usr/local/bin/patroni", }, @@ -291,9 +291,9 @@ func TestPatroniConfigGenerator(t *testing.T) { }, PatroniPort: 8888, PostgresPort: 5432, - Paths: common.InstancePaths{ - Instance: common.Paths{BaseDir: "/var/lib/pgsql/18/storefront-n1-689qacsi"}, - Host: common.Paths{BaseDir: "/var/lib/pgsql/18/storefront-n1-689qacsi"}, + Paths: database.InstancePaths{ + Instance: database.Paths{BaseDir: "/var/lib/pgsql/18/storefront-n1-689qacsi"}, + Host: database.Paths{BaseDir: "/var/lib/pgsql/18/storefront-n1-689qacsi"}, PgBackRestPath: "/usr/bin/pgbackrest", PatroniPath: "/usr/local/bin/patroni", }, @@ -332,9 +332,9 @@ func TestPatroniConfigGenerator(t *testing.T) { }, PatroniPort: 8888, PostgresPort: 5432, - Paths: common.InstancePaths{ - Instance: common.Paths{BaseDir: "/var/lib/pgsql/storefront-n1-689qacsi"}, - Host: common.Paths{BaseDir: "/var/lib/pgsql/storefront-n1-689qacsi"}, + Paths: database.InstancePaths{ + Instance: database.Paths{BaseDir: "/var/lib/pgsql/storefront-n1-689qacsi"}, + Host: database.Paths{BaseDir: "/var/lib/pgsql/storefront-n1-689qacsi"}, PgBackRestPath: "/usr/bin/pgbackrest", PatroniPath: "/usr/local/bin/patroni", }, diff --git a/server/internal/orchestrator/common/pgbackrest_config.go b/server/internal/orchestrator/common/pgbackrest_config.go index 243f755f..665ddad7 100644 --- a/server/internal/orchestrator/common/pgbackrest_config.go +++ b/server/internal/orchestrator/common/pgbackrest_config.go @@ -10,27 +10,17 @@ import ( "github.com/samber/do" "github.com/spf13/afero" + "github.com/pgEdge/control-plane/server/internal/database" "github.com/pgEdge/control-plane/server/internal/filesystem" "github.com/pgEdge/control-plane/server/internal/pgbackrest" "github.com/pgEdge/control-plane/server/internal/resource" ) -type PgBackRestConfigType string - -func (t PgBackRestConfigType) String() string { - return string(t) -} - -const ( - PgBackRestConfigTypeBackup PgBackRestConfigType = "backup" - PgBackRestConfigTypeRestore PgBackRestConfigType = "restore" -) - var _ resource.Resource = (*PgBackRestConfig)(nil) const ResourceTypePgBackRestConfig resource.Type = "common.pgbackrest_config" -func PgBackRestConfigIdentifier(instanceID string, configType PgBackRestConfigType) resource.Identifier { +func PgBackRestConfigIdentifier(instanceID string, configType pgbackrest.ConfigType) resource.Identifier { return resource.Identifier{ ID: instanceID + "-" + configType.String(), Type: ResourceTypePgBackRestConfig, @@ -44,10 +34,10 @@ type PgBackRestConfig struct { NodeName string `json:"node_name"` Repositories []*pgbackrest.Repository `json:"repositories"` ParentID string `json:"parent_id"` - Type PgBackRestConfigType `json:"type"` + Type pgbackrest.ConfigType `json:"type"` OwnerUID int `json:"owner_uid"` OwnerGID int `json:"owner_gid"` - Paths InstancePaths `json:"paths"` + Paths database.InstancePaths `json:"paths"` Port int `json:"port"` } diff --git a/server/internal/orchestrator/common/pgbackrest_stanza.go b/server/internal/orchestrator/common/pgbackrest_stanza.go index 386334ca..d5cfaa8e 100644 --- a/server/internal/orchestrator/common/pgbackrest_stanza.go +++ b/server/internal/orchestrator/common/pgbackrest_stanza.go @@ -24,9 +24,8 @@ func PgBackRestStanzaIdentifier(nodeName string) resource.Identifier { } type PgBackRestStanza struct { - DatabaseID string `json:"database_id"` - NodeName string `json:"node_name"` - Paths InstancePaths `json:"paths"` + DatabaseID string `json:"database_id"` + NodeName string `json:"node_name"` } func (p *PgBackRestStanza) ResourceVersion() string { @@ -60,14 +59,18 @@ func (p *PgBackRestStanza) Refresh(ctx context.Context, rc *resource.Context) er if err != nil { return err } - node, err := resource.FromContext[*database.NodeResource](rc, database.NodeResourceIdentifier(p.NodeName)) + primary, err := database.GetPrimaryInstance(ctx, rc, p.NodeName) if err != nil { - return fmt.Errorf("failed to get node %q: %w", p.NodeName, err) + return err + } + paths, err := primary.Paths(orchestrator) + if err != nil { + return fmt.Errorf("failed to get primary instance paths: %w", err) } var output bytes.Buffer - infoCmd := p.Paths.PgBackRestBackupCmd("info", "--output=json").StringSlice() - err = orchestrator.ExecuteInstanceCommand(ctx, &output, p.DatabaseID, node.PrimaryInstanceID, infoCmd...) + infoCmd := paths.PgBackRestBackupCmd("info", "--output=json").StringSlice() + err = orchestrator.ExecuteInstanceCommand(ctx, &output, p.DatabaseID, primary.InstanceID(), infoCmd...) if err != nil { // pgbackrest info returns a 0 exit code even if the stanza doesn't // exist, so an error here means something else went wrong. @@ -96,20 +99,24 @@ func (p *PgBackRestStanza) Create(ctx context.Context, rc *resource.Context) err if err != nil { return err } - node, err := resource.FromContext[*database.NodeResource](rc, database.NodeResourceIdentifier(p.NodeName)) + primary, err := database.GetPrimaryInstance(ctx, rc, p.NodeName) + if err != nil { + return err + } + paths, err := primary.Paths(orchestrator) if err != nil { - return fmt.Errorf("failed to get node %q: %w", p.NodeName, err) + return fmt.Errorf("failed to get primary instance paths: %w", err) } var stanzaCreateOut bytes.Buffer - createCmd := p.Paths.PgBackRestBackupCmd("stanza-create", "--io-timeout=10s").StringSlice() - err = orchestrator.ExecuteInstanceCommand(ctx, &stanzaCreateOut, p.DatabaseID, node.PrimaryInstanceID, createCmd...) + createCmd := paths.PgBackRestBackupCmd("stanza-create", "--io-timeout=10s").StringSlice() + err = orchestrator.ExecuteInstanceCommand(ctx, &stanzaCreateOut, p.DatabaseID, primary.InstanceID(), createCmd...) if err != nil { return fmt.Errorf("failed to exec pgbackrest stanza-create: %w, output: %s", err, stanzaCreateOut.String()) } var checkOut bytes.Buffer - checkCmd := p.Paths.PgBackRestBackupCmd("check").StringSlice() - err = orchestrator.ExecuteInstanceCommand(ctx, &checkOut, p.DatabaseID, node.PrimaryInstanceID, checkCmd...) + checkCmd := paths.PgBackRestBackupCmd("check").StringSlice() + err = orchestrator.ExecuteInstanceCommand(ctx, &checkOut, p.DatabaseID, primary.InstanceID(), checkCmd...) if err != nil { return fmt.Errorf("failed to exec pgbackrest check: %w, output: %s", err, checkOut.String()) } diff --git a/server/internal/orchestrator/common/postgres_certs.go b/server/internal/orchestrator/common/postgres_certs.go index 05aecd97..c698dc64 100644 --- a/server/internal/orchestrator/common/postgres_certs.go +++ b/server/internal/orchestrator/common/postgres_certs.go @@ -10,21 +10,12 @@ import ( "github.com/spf13/afero" "github.com/pgEdge/control-plane/server/internal/certificates" + "github.com/pgEdge/control-plane/server/internal/database" "github.com/pgEdge/control-plane/server/internal/ds" "github.com/pgEdge/control-plane/server/internal/filesystem" "github.com/pgEdge/control-plane/server/internal/resource" ) -const ( - postgresCaCertName = "ca.crt" - postgresServerCertName = "server.crt" - postgresServerKeyName = "server.key" - postgresSuperuserCertName = "superuser.crt" - postgresSuperuserKeyName = "superuser.key" - postgresReplicatorCertName = "replication.crt" - postgresReplicatorKeyName = "replication.key" -) - var _ resource.Resource = (*PostgresCerts)(nil) const ResourceTypePostgresCerts resource.Type = "common.postgres_certs" @@ -98,31 +89,31 @@ func (c *PostgresCerts) Refresh(ctx context.Context, rc *resource.Context) error } certsDir := filepath.Join(parentFullPath, "postgres") - caCert, err := ReadResourceFile(fs, filepath.Join(certsDir, postgresCaCertName)) + caCert, err := ReadResourceFile(fs, filepath.Join(certsDir, database.PostgresCaCertName)) if err != nil { return fmt.Errorf("failed to read CA cert: %w", err) } - serverCert, err := ReadResourceFile(fs, filepath.Join(certsDir, postgresServerCertName)) + serverCert, err := ReadResourceFile(fs, filepath.Join(certsDir, database.PostgresServerCertName)) if err != nil { return fmt.Errorf("failed to read server cert: %w", err) } - serverKey, err := ReadResourceFile(fs, filepath.Join(certsDir, postgresServerKeyName)) + serverKey, err := ReadResourceFile(fs, filepath.Join(certsDir, database.PostgresServerKeyName)) if err != nil { return fmt.Errorf("failed to read server key: %w", err) } - superuserCert, err := ReadResourceFile(fs, filepath.Join(certsDir, postgresSuperuserCertName)) + superuserCert, err := ReadResourceFile(fs, filepath.Join(certsDir, database.PostgresSuperuserCertName)) if err != nil { return fmt.Errorf("failed to read superuser cert: %w", err) } - superuserKey, err := ReadResourceFile(fs, filepath.Join(certsDir, postgresSuperuserKeyName)) + superuserKey, err := ReadResourceFile(fs, filepath.Join(certsDir, database.PostgresSuperuserKeyName)) if err != nil { return fmt.Errorf("failed to read superuser key: %w", err) } - replicationCert, err := ReadResourceFile(fs, filepath.Join(certsDir, postgresReplicatorCertName)) + replicationCert, err := ReadResourceFile(fs, filepath.Join(certsDir, database.PostgresReplicatorCertName)) if err != nil { return fmt.Errorf("failed to read replication cert: %w", err) } - replicationKey, err := ReadResourceFile(fs, filepath.Join(certsDir, postgresReplicatorKeyName)) + replicationKey, err := ReadResourceFile(fs, filepath.Join(certsDir, database.PostgresReplicatorKeyName)) if err != nil { return fmt.Errorf("failed to read replication key: %w", err) } @@ -190,13 +181,13 @@ func (c *PostgresCerts) Create(ctx context.Context, rc *resource.Context) error } files := map[string][]byte{ - postgresCaCertName: c.CaCert, - postgresServerCertName: c.ServerCert, - postgresServerKeyName: c.ServerKey, - postgresSuperuserCertName: c.SuperuserCert, - postgresSuperuserKeyName: c.SuperuserKey, - postgresReplicatorCertName: c.ReplicationCert, - postgresReplicatorKeyName: c.ReplicationKey, + database.PostgresCaCertName: c.CaCert, + database.PostgresServerCertName: c.ServerCert, + database.PostgresServerKeyName: c.ServerKey, + database.PostgresSuperuserCertName: c.SuperuserCert, + database.PostgresSuperuserKeyName: c.SuperuserKey, + database.PostgresReplicatorCertName: c.ReplicationCert, + database.PostgresReplicatorKeyName: c.ReplicationKey, } for name, content := range files { diff --git a/server/internal/orchestrator/swarm/orchestrator.go b/server/internal/orchestrator/swarm/orchestrator.go index fd06dfd2..8cc3cb54 100644 --- a/server/internal/orchestrator/swarm/orchestrator.go +++ b/server/internal/orchestrator/swarm/orchestrator.go @@ -149,12 +149,12 @@ func (o *Orchestrator) PopulateHostStatus(ctx context.Context, status *host.Host } func (o *Orchestrator) GenerateInstanceResources(spec *database.InstanceSpec, scripts database.Scripts) (*database.InstanceResources, error) { - instance, orchestratorResources, err := o.instanceResources(spec, scripts) + instance, instanceDependencies, nodeDependents, err := o.instanceResources(spec, scripts) if err != nil { return nil, err } - resources, err := database.NewInstanceResources(instance, orchestratorResources, nil) + resources, err := database.NewInstanceResources(instance, instanceDependencies, nil, nodeDependents) if err != nil { return nil, fmt.Errorf("failed to create instance resources: %w", err) } @@ -171,10 +171,10 @@ func ServiceInstanceName(databaseID, serviceID, hostID string) string { return fmt.Sprintf("%s-%s-%s", databaseID, serviceID, base36[:8]) } -func (o *Orchestrator) instanceResources(spec *database.InstanceSpec, scripts database.Scripts) (*database.InstanceResource, []resource.Resource, error) { +func (o *Orchestrator) instanceResources(spec *database.InstanceSpec, scripts database.Scripts) (*database.InstanceResource, []resource.Resource, []resource.Resource, error) { images, err := o.versions.GetImages(spec.PgEdgeVersion) if err != nil { - return nil, nil, fmt.Errorf("failed to get images: %w", err) + return nil, nil, nil, fmt.Errorf("failed to get images: %w", err) } instanceHostname := fmt.Sprintf("postgres-%s", spec.InstanceID) @@ -300,7 +300,7 @@ func (o *Orchestrator) instanceResources(spec *database.InstanceSpec, scripts da }, } - orchestratorResources := []resource.Resource{ + instanceDependencies := []resource.Resource{ databaseNetwork, patroniCluster, patroniMember, @@ -317,8 +317,10 @@ func (o *Orchestrator) instanceResources(spec *database.InstanceSpec, scripts da service, } + var nodeDependents []resource.Resource + if spec.BackupConfig != nil { - orchestratorResources = append(orchestratorResources, + instanceDependencies = append(instanceDependencies, &PgBackRestConfig{ InstanceID: spec.InstanceID, HostID: spec.HostID, @@ -330,12 +332,14 @@ func (o *Orchestrator) instanceResources(spec *database.InstanceSpec, scripts da OwnerUID: o.cfg.DatabaseOwnerUID, OwnerGID: o.cfg.DatabaseOwnerUID, }, + ) + nodeDependents = append(nodeDependents, &PgBackRestStanza{ NodeName: spec.NodeName, }, ) for _, schedule := range spec.BackupConfig.Schedules { - orchestratorResources = append(orchestratorResources, scheduler.NewScheduledJobResource( + nodeDependents = append(nodeDependents, scheduler.NewScheduledJobResource( fmt.Sprintf("%s-%s-%s", schedule.ID, spec.DatabaseID, spec.NodeName), schedule.CronExpression, scheduler.WorkflowCreatePgBackRestBackup, @@ -350,7 +354,7 @@ func (o *Orchestrator) instanceResources(spec *database.InstanceSpec, scripts da } if spec.RestoreConfig != nil { - orchestratorResources = append(orchestratorResources, &PgBackRestConfig{ + instanceDependencies = append(instanceDependencies, &PgBackRestConfig{ InstanceID: spec.InstanceID, HostID: spec.HostID, DatabaseID: spec.RestoreConfig.SourceDatabaseID, @@ -363,7 +367,7 @@ func (o *Orchestrator) instanceResources(spec *database.InstanceSpec, scripts da }) } - return instance, orchestratorResources, nil + return instance, instanceDependencies, nodeDependents, nil } func (o *Orchestrator) GenerateInstanceRestoreResources(spec *database.InstanceSpec, taskID uuid.UUID) (*database.InstanceResources, error) { @@ -373,12 +377,12 @@ func (o *Orchestrator) GenerateInstanceRestoreResources(spec *database.InstanceS spec.InPlaceRestore = true - instance, resources, err := o.instanceResources(spec, nil) + instance, instanceDependencies, nodeDependents, err := o.instanceResources(spec, nil) if err != nil { return nil, fmt.Errorf("failed to generate instance resources: %w", err) } - resources = append(resources, + instanceDependencies = append(instanceDependencies, &ScaleService{ InstanceID: spec.InstanceID, ScaleDirection: ScaleDirectionDOWN, @@ -401,7 +405,7 @@ func (o *Orchestrator) GenerateInstanceRestoreResources(spec *database.InstanceS instance.OrchestratorDependencies = append(instance.OrchestratorDependencies, ScaleServiceResourceIdentifier(spec.InstanceID, ScaleDirectionUP)) - instanceResources, err := database.NewInstanceResources(instance, resources, nil) + instanceResources, err := database.NewInstanceResources(instance, instanceDependencies, nil, nodeDependents) if err != nil { return nil, fmt.Errorf("failed to initialize instance resources: %w", err) } @@ -750,14 +754,14 @@ func (o *Orchestrator) generateRAGInstanceResources(spec *database.ServiceInstan // KeysDirID is the parent data dir; the actual keys subdir path is derived at runtime. serviceName := ServiceInstanceName(spec.DatabaseID, spec.ServiceSpec.ServiceID, spec.HostID) serviceInstanceSpec := &ServiceInstanceSpecResource{ - ServiceInstanceID: spec.ServiceInstanceID, - ServiceSpec: spec.ServiceSpec, - DatabaseID: spec.DatabaseID, - DatabaseName: spec.DatabaseName, - HostID: spec.HostID, - ServiceName: serviceName, - Hostname: serviceName, - CohortMemberID: o.swarmNodeID, + ServiceInstanceID: spec.ServiceInstanceID, + ServiceSpec: spec.ServiceSpec, + DatabaseID: spec.DatabaseID, + DatabaseName: spec.DatabaseName, + HostID: spec.HostID, + ServiceName: serviceName, + Hostname: serviceName, + CohortMemberID: o.swarmNodeID, ServiceImage: serviceImage, Credentials: spec.Credentials, DatabaseNetworkID: databaseNetwork.Name, @@ -1019,6 +1023,15 @@ func (o *Orchestrator) NodeDSN(ctx context.Context, rc *resource.Context, nodeNa return node.DSN(ctx, rc, instance, dbName) } +func (o *Orchestrator) InstancePaths(_ *ds.Version, instanceID string) (database.InstancePaths, error) { + return database.InstancePaths{ + Instance: database.Paths{BaseDir: "/opt/pgedge"}, + Host: database.Paths{BaseDir: filepath.Join(o.cfg.DataDir, "instances", instanceID)}, + PgBackRestPath: "/usr/bin/pgbackrest", + PatroniPath: "/usr/local/bin/patroni", + }, nil +} + func (o *Orchestrator) scaleInstance( ctx context.Context, instanceID string, diff --git a/server/internal/orchestrator/systemd/orchestrator.go b/server/internal/orchestrator/systemd/orchestrator.go index db78e9df..dba7414e 100644 --- a/server/internal/orchestrator/systemd/orchestrator.go +++ b/server/internal/orchestrator/systemd/orchestrator.go @@ -147,7 +147,7 @@ func (o *Orchestrator) PopulateHostStatus(ctx context.Context, h *host.HostStatu } func (o *Orchestrator) GenerateInstanceResources(spec *database.InstanceSpec, scripts database.Scripts) (*database.InstanceResources, error) { - paths, err := o.instancePaths(spec.PgEdgeVersion.PostgresVersion, spec.InstanceID) + paths, err := o.InstancePaths(spec.PgEdgeVersion.PostgresVersion, spec.InstanceID) if err != nil { return nil, err } @@ -280,7 +280,7 @@ func (o *Orchestrator) GenerateInstanceResources(spec *database.InstanceSpec, sc }, } - orchestratorResources := []resource.Resource{ + instanceDependencies := []resource.Resource{ patroniCluster, patroniMember, instanceDir, @@ -293,7 +293,7 @@ func (o *Orchestrator) GenerateInstanceResources(spec *database.InstanceSpec, sc patroniUnit, } - dbDependencyResources := []resource.Resource{&common.PgServiceConf{ + dbDependencies := []resource.Resource{&common.PgServiceConf{ ParentID: configsDir.ID, HostID: spec.HostID, InstanceID: spec.InstanceID, @@ -301,8 +301,9 @@ func (o *Orchestrator) GenerateInstanceResources(spec *database.InstanceSpec, sc OwnerGID: o.cfg.DatabaseOwnerUID, }} + var nodeDependents []resource.Resource if spec.BackupConfig != nil { - orchestratorResources = append(orchestratorResources, + instanceDependencies = append(instanceDependencies, &common.PgBackRestConfig{ InstanceID: spec.InstanceID, HostID: spec.HostID, @@ -310,20 +311,21 @@ func (o *Orchestrator) GenerateInstanceResources(spec *database.InstanceSpec, sc NodeName: spec.NodeName, Repositories: spec.BackupConfig.Repositories, ParentID: configsDir.ID, - Type: common.PgBackRestConfigTypeBackup, + Type: pgbackrest.ConfigTypeBackup, OwnerUID: o.cfg.DatabaseOwnerUID, OwnerGID: o.cfg.DatabaseOwnerUID, Paths: paths, Port: postgresPort, }, + ) + nodeDependents = append(nodeDependents, &common.PgBackRestStanza{ DatabaseID: spec.DatabaseID, NodeName: spec.NodeName, - Paths: paths, }, ) for _, schedule := range spec.BackupConfig.Schedules { - orchestratorResources = append(orchestratorResources, scheduler.NewScheduledJobResource( + nodeDependents = append(nodeDependents, scheduler.NewScheduledJobResource( fmt.Sprintf("%s-%s-%s", schedule.ID, spec.DatabaseID, spec.NodeName), schedule.CronExpression, scheduler.WorkflowCreatePgBackRestBackup, @@ -338,14 +340,14 @@ func (o *Orchestrator) GenerateInstanceResources(spec *database.InstanceSpec, sc } if spec.RestoreConfig != nil { - orchestratorResources = append(orchestratorResources, &common.PgBackRestConfig{ + instanceDependencies = append(instanceDependencies, &common.PgBackRestConfig{ InstanceID: spec.InstanceID, HostID: spec.HostID, DatabaseID: spec.RestoreConfig.SourceDatabaseID, NodeName: spec.RestoreConfig.SourceNodeName, Repositories: []*pgbackrest.Repository{spec.RestoreConfig.Repository}, ParentID: configsDir.ID, - Type: common.PgBackRestConfigTypeRestore, + Type: pgbackrest.ConfigTypeRestore, OwnerUID: o.cfg.DatabaseOwnerUID, OwnerGID: o.cfg.DatabaseOwnerUID, Paths: paths, @@ -353,7 +355,7 @@ func (o *Orchestrator) GenerateInstanceResources(spec *database.InstanceSpec, sc }) } - return database.NewInstanceResources(instance, orchestratorResources, dbDependencyResources) + return database.NewInstanceResources(instance, instanceDependencies, dbDependencies, nodeDependents) } func (o *Orchestrator) GenerateServiceInstanceResources(spec *database.ServiceInstanceSpec) (*database.ServiceInstanceResources, error) { @@ -364,7 +366,7 @@ func (o *Orchestrator) GenerateInstanceRestoreResources(spec *database.InstanceS if spec.RestoreConfig == nil { return nil, fmt.Errorf("missing restore config for node %s instance %s", spec.NodeName, spec.InstanceID) } - paths, err := o.instancePaths(spec.PgEdgeVersion.PostgresVersion, spec.InstanceID) + paths, err := o.InstancePaths(spec.PgEdgeVersion.PostgresVersion, spec.InstanceID) if err != nil { return nil, err } @@ -377,7 +379,7 @@ func (o *Orchestrator) GenerateInstanceRestoreResources(spec *database.InstanceS return nil, err } - err = instance.AddResources(&PgBackRestRestore{ + err = instance.AddInstanceDependencies(&PgBackRestRestore{ DatabaseID: spec.DatabaseID, HostID: spec.HostID, InstanceID: spec.InstanceID, @@ -408,7 +410,7 @@ func (o *Orchestrator) GetInstanceConnectionInfo(ctx context.Context, return nil, fmt.Errorf("postgres version is not yet recorded for this instance") } - paths, err := o.instancePaths(pgEdgeVersion.PostgresVersion, instanceID) + paths, err := o.InstancePaths(pgEdgeVersion.PostgresVersion, instanceID) if err != nil { return nil, err } @@ -455,7 +457,7 @@ func (o *Orchestrator) ExecuteInstanceCommand(ctx context.Context, w io.Writer, } func (o *Orchestrator) CreatePgBackRestBackup(ctx context.Context, w io.Writer, spec *database.InstanceSpec, options *pgbackrest.BackupOptions) error { - paths, err := o.instancePaths(spec.PgEdgeVersion.PostgresVersion, spec.InstanceID) + paths, err := o.InstancePaths(spec.PgEdgeVersion.PostgresVersion, spec.InstanceID) if err != nil { return err } @@ -559,10 +561,10 @@ func (o *Orchestrator) NodeDSN(ctx context.Context, rc *resource.Context, nodeNa }, nil } -func (o *Orchestrator) instancePaths(pgVersion *ds.Version, instanceID string) (common.InstancePaths, error) { +func (o *Orchestrator) InstancePaths(pgVersion *ds.Version, instanceID string) (database.InstancePaths, error) { pgMajor, ok := pgVersion.MajorString() if !ok { - return common.InstancePaths{}, errors.New("got empty postgres version") + return database.InstancePaths{}, errors.New("got empty postgres version") } var baseDir string @@ -572,9 +574,9 @@ func (o *Orchestrator) instancePaths(pgVersion *ds.Version, instanceID string) ( baseDir = filepath.Join(o.packageManager.InstanceDataBaseDir(pgMajor), instanceID) } - return common.InstancePaths{ - Instance: common.Paths{BaseDir: baseDir}, - Host: common.Paths{BaseDir: baseDir}, + return database.InstancePaths{ + Instance: database.Paths{BaseDir: baseDir}, + Host: database.Paths{BaseDir: baseDir}, PgBackRestPath: o.cfg.SystemD.PgBackRestPath, PatroniPath: o.cfg.SystemD.PatroniPath, }, nil diff --git a/server/internal/orchestrator/systemd/patroni_unit.go b/server/internal/orchestrator/systemd/patroni_unit.go index 05994ed2..5dbadea2 100644 --- a/server/internal/orchestrator/systemd/patroni_unit.go +++ b/server/internal/orchestrator/systemd/patroni_unit.go @@ -6,11 +6,11 @@ import ( "path/filepath" "github.com/coreos/go-systemd/v22/unit" - "github.com/pgEdge/control-plane/server/internal/orchestrator/common" + "github.com/pgEdge/control-plane/server/internal/database" ) func PatroniUnitOptions( - paths common.InstancePaths, + paths database.InstancePaths, pgBinPath string, cpus float64, memoryBytes uint64, diff --git a/server/internal/orchestrator/systemd/pgbackrest_restore.go b/server/internal/orchestrator/systemd/pgbackrest_restore.go index ab51f74c..e958180e 100644 --- a/server/internal/orchestrator/systemd/pgbackrest_restore.go +++ b/server/internal/orchestrator/systemd/pgbackrest_restore.go @@ -13,6 +13,7 @@ import ( "github.com/pgEdge/control-plane/server/internal/database" "github.com/pgEdge/control-plane/server/internal/orchestrator/common" + "github.com/pgEdge/control-plane/server/internal/pgbackrest" "github.com/pgEdge/control-plane/server/internal/resource" "github.com/pgEdge/control-plane/server/internal/task" "github.com/pgEdge/control-plane/server/internal/utils" @@ -30,13 +31,13 @@ func PgBackRestRestoreResourceIdentifier(instanceID string) resource.Identifier } type PgBackRestRestore struct { - DatabaseID string `json:"database_id"` - HostID string `json:"host_id"` - InstanceID string `json:"instance_id"` - TaskID uuid.UUID `json:"task_id"` - NodeName string `json:"node_name"` - Paths common.InstancePaths `json:"paths"` - RestoreOptions map[string]string `json:"restore_options"` + DatabaseID string `json:"database_id"` + HostID string `json:"host_id"` + InstanceID string `json:"instance_id"` + TaskID uuid.UUID `json:"task_id"` + NodeName string `json:"node_name"` + Paths database.InstancePaths `json:"paths"` + RestoreOptions map[string]string `json:"restore_options"` } func (p *PgBackRestRestore) ResourceVersion() string { @@ -57,7 +58,7 @@ func (p *PgBackRestRestore) Identifier() resource.Identifier { func (p *PgBackRestRestore) Dependencies() []resource.Identifier { return []resource.Identifier{ - common.PgBackRestConfigIdentifier(p.InstanceID, common.PgBackRestConfigTypeRestore), + common.PgBackRestConfigIdentifier(p.InstanceID, pgbackrest.ConfigTypeRestore), common.PatroniClusterResourceIdentifier(p.NodeName), UnitResourceIdentifier(patroniServiceName(p.InstanceID), p.DatabaseID, p.HostID), } diff --git a/server/internal/orchestrator/systemd/unit_options_test.go b/server/internal/orchestrator/systemd/unit_options_test.go index 9201deb9..0e431bf1 100644 --- a/server/internal/orchestrator/systemd/unit_options_test.go +++ b/server/internal/orchestrator/systemd/unit_options_test.go @@ -7,7 +7,7 @@ import ( "testing" "github.com/coreos/go-systemd/v22/unit" - "github.com/pgEdge/control-plane/server/internal/orchestrator/common" + "github.com/pgEdge/control-plane/server/internal/database" "github.com/pgEdge/control-plane/server/internal/orchestrator/systemd" "github.com/pgEdge/control-plane/server/internal/testutils" ) @@ -45,9 +45,9 @@ func TestUnitOptions(t *testing.T) { } t.Run("PatroniUnitOptions", func(t *testing.T) { - paths := common.InstancePaths{ - Instance: common.Paths{BaseDir: "/var/lib/pgsql/18/storefront-n1-689qacsi"}, - Host: common.Paths{BaseDir: "/var/lib/pgsql/18/storefront-n1-689qacsi"}, + paths := database.InstancePaths{ + Instance: database.Paths{BaseDir: "/var/lib/pgsql/18/storefront-n1-689qacsi"}, + Host: database.Paths{BaseDir: "/var/lib/pgsql/18/storefront-n1-689qacsi"}, PgBackRestPath: "/usr/bin/pgbackrest", PatroniPath: "/usr/local/bin/patroni", } @@ -55,7 +55,7 @@ func TestUnitOptions(t *testing.T) { for _, tc := range []struct { name string - paths common.InstancePaths + paths database.InstancePaths pgBinPath string cpus float64 memoryBytes uint64 diff --git a/server/internal/pgbackrest/config.go b/server/internal/pgbackrest/config.go index f7e3c040..b580ae6a 100644 --- a/server/internal/pgbackrest/config.go +++ b/server/internal/pgbackrest/config.go @@ -13,6 +13,17 @@ import ( "gopkg.in/ini.v1" ) +type ConfigType string + +func (t ConfigType) String() string { + return string(t) +} + +const ( + ConfigTypeBackup ConfigType = "backup" + ConfigTypeRestore ConfigType = "restore" +) + type RepositoryType string const (