Skip to content

Commit

Permalink
Generating better names for plan phase
Browse files Browse the repository at this point in the history
  • Loading branch information
marccampbell committed Apr 5, 2020
1 parent 7249f6a commit d265b9b
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 96 deletions.
99 changes: 51 additions & 48 deletions pkg/controller/table/objects.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,73 @@
package table

import (
"context"
"crypto/sha256"
"fmt"

"github.com/pkg/errors"
databasesv1alpha3 "github.com/schemahero/schemahero/pkg/apis/databases/v1alpha3"
schemasv1alpha3 "github.com/schemahero/schemahero/pkg/apis/schemas/v1alpha3"
"gopkg.in/yaml.v2"
corev1 "k8s.io/api/core/v1"
kuberneteserrors "k8s.io/apimachinery/pkg/api/errors"
apimachineryvalidation "k8s.io/apimachinery/pkg/api/validation"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)

func planConfigMap(namespace string, tableName string, tableSpec schemasv1alpha3.TableSpec) (*corev1.ConfigMap, error) {
b, err := yaml.Marshal(tableSpec)
func configMapNameForPlan(database *databasesv1alpha3.Database, table *schemasv1alpha3.Table) string {
shortID, err := getShortIDForTableSpec(table)
if err != nil {
return table.Name
}
configMapName := fmt.Sprintf("%s-%s-%s-plan", database.Name, table.Name, shortID)
if len(apimachineryvalidation.NameIsDNSSubdomain(configMapName, false)) > 0 {
configMapName = fmt.Sprintf("%s-%s-plan", table.Name, shortID)
if len(apimachineryvalidation.NameIsDNSSubdomain(configMapName, false)) > 0 {
configMapName = fmt.Sprintf("%s-plan", shortID)
}
}

return configMapName
}

func podNameForPlan(database *databasesv1alpha3.Database, table *schemasv1alpha3.Table) string {
shortID, err := getShortIDForTableSpec(table)
if err != nil {
return table.Name
}
podName := fmt.Sprintf("%s-%s-%s-plan", database.Name, table.Name, shortID)
if len(apimachineryvalidation.NameIsDNSSubdomain(podName, false)) > 0 {
podName = fmt.Sprintf("%s-%s-plan", table.Name, shortID)
if len(apimachineryvalidation.NameIsDNSSubdomain(podName, false)) > 0 {
podName = fmt.Sprintf("%s-plan", shortID)
}
}

return podName
}

func getShortIDForTableSpec(table *schemasv1alpha3.Table) (string, error) {
b, err := yaml.Marshal(table.Spec)
if err != nil {
return "", errors.Wrap(err, "failed to marshal yaml spec")
}

sum := sha256.Sum256(b)
return fmt.Sprintf("%x", sum)[:7], nil
}

func getPlanConfigMap(database *databasesv1alpha3.Database, table *schemasv1alpha3.Table) (*corev1.ConfigMap, error) {
b, err := yaml.Marshal(table.Spec)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal yaml spec")
}

tableData := make(map[string]string)
tableData["table.yaml"] = string(b)

name := tableName
configMap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Name: configMapNameForPlan(database, table),
Namespace: table.Namespace,
},
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Expand All @@ -39,7 +79,7 @@ func planConfigMap(namespace string, tableName string, tableSpec schemasv1alpha3
return configMap, nil
}

func (r *ReconcileTable) planPod(database *databasesv1alpha3.Database, table *schemasv1alpha3.Table) (*corev1.Pod, error) {
func (r *ReconcileTable) getPlanPod(database *databasesv1alpha3.Database, table *schemasv1alpha3.Table) (*corev1.Pod, error) {
imageName := "schemahero/schemahero:alpha"
nodeSelector := make(map[string]string)
driver := ""
Expand Down Expand Up @@ -78,9 +118,6 @@ func (r *ReconcileTable) planPod(database *databasesv1alpha3.Database, table *sc
labels["schemahero-namespace"] = table.Namespace
labels["schemahero-role"] = "plan"

name := fmt.Sprintf("%s-plan", table.Name)
configMapName := table.Name

args := []string{
"plan",
"--driver",
Expand All @@ -93,7 +130,7 @@ func (r *ReconcileTable) planPod(database *databasesv1alpha3.Database, table *sc

pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Name: podNameForPlan(database, table),
Namespace: database.Namespace,
Labels: labels,
},
Expand Down Expand Up @@ -125,7 +162,7 @@ func (r *ReconcileTable) planPod(database *databasesv1alpha3.Database, table *sc
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: configMapName,
Name: configMapNameForPlan(database, table),
},
},
},
Expand All @@ -136,37 +173,3 @@ func (r *ReconcileTable) planPod(database *databasesv1alpha3.Database, table *sc

return pod, nil
}

func (r *ReconcileTable) ensureTableConfigMap(ctx context.Context, desiredConfigMap *corev1.ConfigMap) error {
existingConfigMap := corev1.ConfigMap{}
if err := r.Get(ctx, types.NamespacedName{Name: desiredConfigMap.Name, Namespace: desiredConfigMap.Namespace}, &existingConfigMap); err != nil {
if kuberneteserrors.IsNotFound(err) {
err = r.Create(ctx, desiredConfigMap)
if err != nil {
return errors.Wrap(err, "failed to create configmap")
}
}

return errors.Wrap(err, "failed to get existing configmap")
}

return nil
}

func (r *ReconcileTable) ensureTablePod(ctx context.Context, desiredPod *corev1.Pod) error {
existingPod := corev1.Pod{}
if err := r.Get(ctx, types.NamespacedName{Name: desiredPod.Name, Namespace: desiredPod.Namespace}, &existingPod); err != nil {
if kuberneteserrors.IsNotFound(err) {
err = r.Create(ctx, desiredPod)
if err != nil {
return errors.Wrap(err, "failed to create table migration pod")
}

return nil
}

return errors.Wrap(err, "failed to get existing pod object")
}

return nil
}
41 changes: 23 additions & 18 deletions pkg/controller/table/objects_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,45 +3,50 @@ package table
import (
"testing"

databasesv1alpha3 "github.com/schemahero/schemahero/pkg/apis/databases/v1alpha3"
schemasv1alpha3 "github.com/schemahero/schemahero/pkg/apis/schemas/v1alpha3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
)

func Test_planConfigMap(t *testing.T) {
tests := []struct {
name string
namespace string
tableName string
tableSpec schemasv1alpha3.TableSpec
expect corev1.ConfigMap
name string
table schemasv1alpha3.Table
database databasesv1alpha3.Database
expect string
}{
{
name: "basic test",
namespace: "foo",
tableName: "name",
tableSpec: schemasv1alpha3.TableSpec{
Database: "db",
Name: "name",
Schema: &schemasv1alpha3.TableSchema{
Postgres: &schemasv1alpha3.SQLTableSchema{},
name: "basic test",
table: schemasv1alpha3.Table{
Spec: schemasv1alpha3.TableSpec{
Database: "db",
Name: "name",
Schema: &schemasv1alpha3.TableSchema{
Postgres: &schemasv1alpha3.SQLTableSchema{},
},
},
},
database: databasesv1alpha3.Database{},
expect: `database: db
name: name
schema:
postgres:
primaryKey: []
columns: []
`,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
req := require.New(t)

actual, err := planConfigMap(test.namespace, test.tableName, test.tableSpec)
actual, err := getPlanConfigMap(&test.database, &test.table)
req.NoError(err)

// check some of the fields on the config map
assert.Equal(t, test.tableName, actual.Name)
assert.Len(t, actual.Data, 1)
assert.NotNil(t, actual.Data["table.yaml"], actual.Data["table.yaml"])
assert.Equal(t, actual.Data["table.yaml"], test.expect)
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,29 +188,6 @@ func (r *ReconcileTable) reconcilePod(ctx context.Context, pod *corev1.Pod) (rec
return reconcile.Result{}, nil
}

func (r *ReconcileTable) plan(ctx context.Context, database *databasesv1alpha3.Database, table *schemasv1alpha3.Table) error {
logger.Debug("deploying plan")

configMap, err := planConfigMap(database.Namespace, table.Name, table.Spec)
if err != nil {
return errors.Wrap(err, "failed to get config map object for plan")
}
pod, err := r.planPod(database, table)
if err != nil {
return errors.Wrap(err, "failed to get pod for plan")
}

if err := r.ensureTableConfigMap(ctx, configMap); err != nil {
return errors.Wrap(err, "failed to create config map for plan")
}

if err := r.ensureTablePod(ctx, pod); err != nil {
return errors.Wrap(err, "failerd to create pod for plan")
}

return nil
}

func (r *ReconcileTable) readConnectionURI(namespace string, valueOrValueFrom databasesv1alpha3.ValueOrValueFrom) (string, error) {
if valueOrValueFrom.Value != "" {
return valueOrValueFrom.Value, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,29 @@ import (
databasesclientv1alpha3 "github.com/schemahero/schemahero/pkg/client/schemaheroclientset/typed/databases/v1alpha3"
"github.com/schemahero/schemahero/pkg/logger"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
kuberneteserrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func (r *ReconcileTable) reconcileInstance(ctx context.Context, instance *schemasv1alpha3.Table) (reconcile.Result, error) {
func (r *ReconcileTable) reconcileTable(ctx context.Context, instance *schemasv1alpha3.Table) (reconcile.Result, error) {
logger.Debug("reconciling table",
zap.String("kind", instance.Kind),
zap.String("name", instance.Name),
zap.String("database", instance.Spec.Database))

// get the full database spec from the api
database, err := r.getDatabaseSpec(ctx, instance.Namespace, instance.Spec.Database)
if err != nil {
return reconcile.Result{}, errors.Wrap(err, "failed to get database spec")
}

// the database object might not yet exist
// this can happen if the table was deployed at the same time or before the database object
if database == nil {
// TDOO add a status field with this state
logger.Debug("requeuing table reconcile request for 10 seconds because database instance was not present",
Expand Down Expand Up @@ -64,11 +70,7 @@ func (r *ReconcileTable) reconcileInstance(ctx context.Context, instance *schema
}

// Deploy a pod to calculculate the plan
if err := r.plan(ctx, database, instance); err != nil {
return reconcile.Result{}, errors.Wrap(err, "failed to schedule plan phase")
}

return reconcile.Result{}, nil
return r.deployMigrationPlanPhase(ctx, database, instance)
}

func (r *ReconcileTable) getInstance(request reconcile.Request) (*schemasv1alpha3.Table, error) {
Expand Down Expand Up @@ -140,3 +142,87 @@ func checkDatabaseTypeMatches(connection *databasesv1alpha3.DatabaseConnection,

return false
}

func (r *ReconcileTable) deployMigrationPlanPhase(ctx context.Context, database *databasesv1alpha3.Database, table *schemasv1alpha3.Table) (reconcile.Result, error) {
logger.Debug("deploying plan phase of migration",
zap.String("databaseName", database.Name),
zap.String("tableName", table.Name))

desiredConfigMap, err := getPlanConfigMap(database, table)
if err != nil {
return reconcile.Result{}, errors.Wrap(err, "failed to get config map object for plan")
}
var existingConfigMap corev1.ConfigMap
configMapChanged := false
err = r.Get(ctx, types.NamespacedName{
Name: desiredConfigMap.Name,
Namespace: desiredConfigMap.Namespace,
}, &existingConfigMap)
if kuberneteserrors.IsNotFound(err) {
// create it
if err := controllerutil.SetControllerReference(table, desiredConfigMap, r.scheme); err != nil {
return reconcile.Result{}, errors.Wrap(err, "failed to set owner on configmap")
}
if err := r.Create(ctx, desiredConfigMap); err != nil {
return reconcile.Result{}, errors.Wrap(err, "failed to create config map")
}
} else if err == nil {
// update it
existingConfigMap.Data = map[string]string{}
for k, v := range desiredConfigMap.Data {
d, ok := existingConfigMap.Data[k]
if !ok || d != v {
existingConfigMap.Data[k] = v
configMapChanged = true
}
}
if configMapChanged {
if err := controllerutil.SetControllerReference(table, &existingConfigMap, r.scheme); err != nil {
return reconcile.Result{}, errors.Wrap(err, "failed to update owner on configmap")
}
if err = r.Update(ctx, &existingConfigMap); err != nil {
return reconcile.Result{}, errors.Wrap(err, "failed to update config map")
}
}
} else {
// something bad is happening here
return reconcile.Result{}, errors.Wrap(err, "failed to check if config map exists")
}

desiredPod, err := r.getPlanPod(database, table)
if err != nil {
return reconcile.Result{}, errors.Wrap(err, "failed to get pod for plan")
}
var existingPod corev1.Pod
err = r.Get(ctx, types.NamespacedName{
Name: desiredPod.Name,
Namespace: desiredPod.Namespace,
}, &existingPod)
if kuberneteserrors.IsNotFound(err) {
// create it
if err := r.Create(ctx, desiredPod); err != nil {
return reconcile.Result{}, errors.Wrap(err, "failed to create plan pod")
}
if err := controllerutil.SetControllerReference(table, desiredPod, r.scheme); err != nil {
return reconcile.Result{}, errors.Wrap(err, "failed to set owner on pod")
}
} else if err == nil {
// maybe update it
if configMapChanged {
// restart the pod by deleting and recreating
logger.Debug("deleting plan pod because config map has changed",
zap.String("podName", existingPod.Name))
if err = r.Delete(ctx, &existingPod); err != nil {
return reconcile.Result{}, errors.Wrap(err, "failed to delete pod")
}

// This pod will be recreated later in another exceution of the reconcile loop
// we watch pods, and when that above delete completed, the reconcile will happen again
}
} else {
// again, something bad
return reconcile.Result{}, errors.Wrap(err, "failed to check if pod exists")
}

return reconcile.Result{}, nil
}
2 changes: 1 addition & 1 deletion pkg/controller/table/table_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (r *ReconcileTable) Reconcile(request reconcile.Request) (reconcile.Result,
// so this function is simply an entrypoint that executes the right reconcile loop
instance, instanceErr := r.getInstance(request)
if instanceErr == nil {
result, err := r.reconcileInstance(context.Background(), instance)
result, err := r.reconcileTable(context.Background(), instance)
if err != nil {
logger.Error(err)
}
Expand Down

0 comments on commit d265b9b

Please sign in to comment.