Skip to content

Commit

Permalink
Merge pull request #138 from schemahero/0.8.0-alpha.2
Browse files Browse the repository at this point in the history
Generating better names for plan phase
  • Loading branch information
marccampbell committed Apr 5, 2020
2 parents e99f531 + 5399cc3 commit 88c618d
Show file tree
Hide file tree
Showing 9 changed files with 226 additions and 124 deletions.
14 changes: 12 additions & 2 deletions pkg/controller/migration/migration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (r *ReconcileMigration) Reconcile(request reconcile.Request) (reconcile.Res
// so this function is simply an entrypoint that executes the right reconcile loop
instance, instanceErr := r.getInstance(request)
if instanceErr == nil {
result, err := r.reconcileInstance(context.Background(), instance)
result, err := r.reconcileMigration(context.Background(), instance)
if err != nil {
logger.Error(err)
}
Expand All @@ -117,7 +117,7 @@ func (r *ReconcileMigration) Reconcile(request reconcile.Request) (reconcile.Res
pod := &corev1.Pod{}
podErr := r.Get(context.Background(), request.NamespacedName, pod)
if podErr == nil {
result, err := r.reconcilePod(pod)
result, err := r.reconcilePod(context.Background(), pod)
if err != nil {
logger.Error(err)
}
Expand All @@ -127,6 +127,16 @@ func (r *ReconcileMigration) Reconcile(request reconcile.Request) (reconcile.Res
return reconcile.Result{}, errors.New("unknown error in migration reconciler")
}

func (r *ReconcileMigration) getInstance(request reconcile.Request) (*schemasv1alpha3.Migration, error) {
v1alpha3instance := &schemasv1alpha3.Migration{}
err := r.Get(context.Background(), request.NamespacedName, v1alpha3instance)
if err != nil {
return nil, err // don't wrap
}

return v1alpha3instance, nil
}

func (r *ReconcileMigration) readConnectionURI(database *databasesv1alpha3.Database) (string, error) {
var valueOrValueFrom *databasesv1alpha3.ValueOrValueFrom

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func (r *ReconcileMigration) getInstance(request reconcile.Request) (*schemasv1alpha3.Migration, error) {
v1alpha3instance := &schemasv1alpha3.Migration{}
err := r.Get(context.Background(), request.NamespacedName, v1alpha3instance)
if err != nil {
return nil, err // don't wrap
}

return v1alpha3instance, nil
}

func (r *ReconcileMigration) reconcileInstance(ctx context.Context, instance *schemasv1alpha3.Migration) (reconcile.Result, error) {
func (r *ReconcileMigration) reconcileMigration(ctx context.Context, instance *schemasv1alpha3.Migration) (reconcile.Result, error) {
logger.Debug("reconciling migration",
zap.String("kind", instance.Kind),
zap.String("name", instance.Name),
Expand Down Expand Up @@ -122,13 +112,3 @@ func (r *ReconcileMigration) reconcileInstance(ctx context.Context, instance *sc

return reconcile.Result{}, nil
}

func (r *ReconcileMigration) reconcilePod(pod *corev1.Pod) (reconcile.Result, error) {
// podLabels := pod.GetObjectMeta().GetLabels()
// role, ok := podLabels["schemahero-role"]
// if !ok {
// return reconcile.Result{}, nil
// }

return reconcile.Result{}, nil
}
41 changes: 41 additions & 0 deletions pkg/controller/migration/reconcile_pod.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package migration

import (
"context"

"github.com/pkg/errors"
"github.com/schemahero/schemahero/pkg/logger"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func (r *ReconcileMigration) reconcilePod(ctx context.Context, pod *corev1.Pod) (reconcile.Result, error) {
podLabels := pod.GetObjectMeta().GetLabels()
role, ok := podLabels["schemahero-role"]
if !ok {
return reconcile.Result{}, nil
}

if role != "" && role != "apply" {
// make sure we are filtering to relevant events
return reconcile.Result{}, nil
}

logger.Debug("reconciling schemahero pod",
zap.String("kind", pod.Kind),
zap.String("name", pod.Name),
zap.String("role", role),
zap.String("podPhase", string(pod.Status.Phase)))

if pod.Status.Phase != corev1.PodSucceeded {
return reconcile.Result{}, nil
}

// delete it
if err := r.Delete(ctx, pod); err != nil {
return reconcile.Result{}, errors.Wrap(err, "failed to delete apply pod")
}

return reconcile.Result{}, nil
}
99 changes: 51 additions & 48 deletions pkg/controller/table/objects.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,73 @@
package table

import (
"context"
"crypto/sha256"
"fmt"

"github.com/pkg/errors"
databasesv1alpha3 "github.com/schemahero/schemahero/pkg/apis/databases/v1alpha3"
schemasv1alpha3 "github.com/schemahero/schemahero/pkg/apis/schemas/v1alpha3"
"gopkg.in/yaml.v2"
corev1 "k8s.io/api/core/v1"
kuberneteserrors "k8s.io/apimachinery/pkg/api/errors"
apimachineryvalidation "k8s.io/apimachinery/pkg/api/validation"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)

func planConfigMap(namespace string, tableName string, tableSpec schemasv1alpha3.TableSpec) (*corev1.ConfigMap, error) {
b, err := yaml.Marshal(tableSpec)
func configMapNameForPlan(database *databasesv1alpha3.Database, table *schemasv1alpha3.Table) string {
shortID, err := getShortIDForTableSpec(table)
if err != nil {
return table.Name
}
configMapName := fmt.Sprintf("%s-%s-%s-plan", database.Name, table.Name, shortID)
if len(apimachineryvalidation.NameIsDNSSubdomain(configMapName, false)) > 0 {
configMapName = fmt.Sprintf("%s-%s-plan", table.Name, shortID)
if len(apimachineryvalidation.NameIsDNSSubdomain(configMapName, false)) > 0 {
configMapName = fmt.Sprintf("%s-plan", shortID)
}
}

return configMapName
}

func podNameForPlan(database *databasesv1alpha3.Database, table *schemasv1alpha3.Table) string {
shortID, err := getShortIDForTableSpec(table)
if err != nil {
return table.Name
}
podName := fmt.Sprintf("%s-%s-%s-plan", database.Name, table.Name, shortID)
if len(apimachineryvalidation.NameIsDNSSubdomain(podName, false)) > 0 {
podName = fmt.Sprintf("%s-%s-plan", table.Name, shortID)
if len(apimachineryvalidation.NameIsDNSSubdomain(podName, false)) > 0 {
podName = fmt.Sprintf("%s-plan", shortID)
}
}

return podName
}

func getShortIDForTableSpec(table *schemasv1alpha3.Table) (string, error) {
b, err := yaml.Marshal(table.Spec)
if err != nil {
return "", errors.Wrap(err, "failed to marshal yaml spec")
}

sum := sha256.Sum256(b)
return fmt.Sprintf("%x", sum)[:7], nil
}

func getPlanConfigMap(database *databasesv1alpha3.Database, table *schemasv1alpha3.Table) (*corev1.ConfigMap, error) {
b, err := yaml.Marshal(table.Spec)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal yaml spec")
}

tableData := make(map[string]string)
tableData["table.yaml"] = string(b)

name := tableName
configMap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Name: configMapNameForPlan(database, table),
Namespace: table.Namespace,
},
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Expand All @@ -39,7 +79,7 @@ func planConfigMap(namespace string, tableName string, tableSpec schemasv1alpha3
return configMap, nil
}

func (r *ReconcileTable) planPod(database *databasesv1alpha3.Database, table *schemasv1alpha3.Table) (*corev1.Pod, error) {
func (r *ReconcileTable) getPlanPod(database *databasesv1alpha3.Database, table *schemasv1alpha3.Table) (*corev1.Pod, error) {
imageName := "schemahero/schemahero:alpha"
nodeSelector := make(map[string]string)
driver := ""
Expand Down Expand Up @@ -78,9 +118,6 @@ func (r *ReconcileTable) planPod(database *databasesv1alpha3.Database, table *sc
labels["schemahero-namespace"] = table.Namespace
labels["schemahero-role"] = "plan"

name := fmt.Sprintf("%s-plan", table.Name)
configMapName := table.Name

args := []string{
"plan",
"--driver",
Expand All @@ -93,7 +130,7 @@ func (r *ReconcileTable) planPod(database *databasesv1alpha3.Database, table *sc

pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Name: podNameForPlan(database, table),
Namespace: database.Namespace,
Labels: labels,
},
Expand Down Expand Up @@ -125,7 +162,7 @@ func (r *ReconcileTable) planPod(database *databasesv1alpha3.Database, table *sc
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: configMapName,
Name: configMapNameForPlan(database, table),
},
},
},
Expand All @@ -136,37 +173,3 @@ func (r *ReconcileTable) planPod(database *databasesv1alpha3.Database, table *sc

return pod, nil
}

func (r *ReconcileTable) ensureTableConfigMap(ctx context.Context, desiredConfigMap *corev1.ConfigMap) error {
existingConfigMap := corev1.ConfigMap{}
if err := r.Get(ctx, types.NamespacedName{Name: desiredConfigMap.Name, Namespace: desiredConfigMap.Namespace}, &existingConfigMap); err != nil {
if kuberneteserrors.IsNotFound(err) {
err = r.Create(ctx, desiredConfigMap)
if err != nil {
return errors.Wrap(err, "failed to create configmap")
}
}

return errors.Wrap(err, "failed to get existing configmap")
}

return nil
}

func (r *ReconcileTable) ensureTablePod(ctx context.Context, desiredPod *corev1.Pod) error {
existingPod := corev1.Pod{}
if err := r.Get(ctx, types.NamespacedName{Name: desiredPod.Name, Namespace: desiredPod.Namespace}, &existingPod); err != nil {
if kuberneteserrors.IsNotFound(err) {
err = r.Create(ctx, desiredPod)
if err != nil {
return errors.Wrap(err, "failed to create table migration pod")
}

return nil
}

return errors.Wrap(err, "failed to get existing pod object")
}

return nil
}
41 changes: 23 additions & 18 deletions pkg/controller/table/objects_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,45 +3,50 @@ package table
import (
"testing"

databasesv1alpha3 "github.com/schemahero/schemahero/pkg/apis/databases/v1alpha3"
schemasv1alpha3 "github.com/schemahero/schemahero/pkg/apis/schemas/v1alpha3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
)

func Test_planConfigMap(t *testing.T) {
tests := []struct {
name string
namespace string
tableName string
tableSpec schemasv1alpha3.TableSpec
expect corev1.ConfigMap
name string
table schemasv1alpha3.Table
database databasesv1alpha3.Database
expect string
}{
{
name: "basic test",
namespace: "foo",
tableName: "name",
tableSpec: schemasv1alpha3.TableSpec{
Database: "db",
Name: "name",
Schema: &schemasv1alpha3.TableSchema{
Postgres: &schemasv1alpha3.SQLTableSchema{},
name: "basic test",
table: schemasv1alpha3.Table{
Spec: schemasv1alpha3.TableSpec{
Database: "db",
Name: "name",
Schema: &schemasv1alpha3.TableSchema{
Postgres: &schemasv1alpha3.SQLTableSchema{},
},
},
},
database: databasesv1alpha3.Database{},
expect: `database: db
name: name
schema:
postgres:
primaryKey: []
columns: []
`,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
req := require.New(t)

actual, err := planConfigMap(test.namespace, test.tableName, test.tableSpec)
actual, err := getPlanConfigMap(&test.database, &test.table)
req.NoError(err)

// check some of the fields on the config map
assert.Equal(t, test.tableName, actual.Name)
assert.Len(t, actual.Data, 1)
assert.NotNil(t, actual.Data["table.yaml"], actual.Data["table.yaml"])
assert.Equal(t, actual.Data["table.yaml"], test.expect)
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,17 @@ func (r *ReconcileTable) reconcilePod(ctx context.Context, pod *corev1.Pod) (rec
return reconcile.Result{}, nil
}

if role != "table" && role != "plan" {
// we want to avoid migration pods in this reconciler
return reconcile.Result{}, nil
}

logger.Debug("reconciling schemahero pod",
zap.String("kind", pod.Kind),
zap.String("name", pod.Name),
zap.String("role", role),
zap.String("podPhase", string(pod.Status.Phase)))

if role != "table" && role != "plan" {
// we want to avoid migration pods in this reconciler
return reconcile.Result{}, nil
}

if pod.Status.Phase != corev1.PodSucceeded {
return reconcile.Result{}, nil
}
Expand Down Expand Up @@ -188,29 +188,6 @@ func (r *ReconcileTable) reconcilePod(ctx context.Context, pod *corev1.Pod) (rec
return reconcile.Result{}, nil
}

func (r *ReconcileTable) plan(ctx context.Context, database *databasesv1alpha3.Database, table *schemasv1alpha3.Table) error {
logger.Debug("deploying plan")

configMap, err := planConfigMap(database.Namespace, table.Name, table.Spec)
if err != nil {
return errors.Wrap(err, "failed to get config map object for plan")
}
pod, err := r.planPod(database, table)
if err != nil {
return errors.Wrap(err, "failed to get pod for plan")
}

if err := r.ensureTableConfigMap(ctx, configMap); err != nil {
return errors.Wrap(err, "failed to create config map for plan")
}

if err := r.ensureTablePod(ctx, pod); err != nil {
return errors.Wrap(err, "failerd to create pod for plan")
}

return nil
}

func (r *ReconcileTable) readConnectionURI(namespace string, valueOrValueFrom databasesv1alpha3.ValueOrValueFrom) (string, error) {
if valueOrValueFrom.Value != "" {
return valueOrValueFrom.Value, nil
Expand Down
File renamed without changes.

0 comments on commit 88c618d

Please sign in to comment.