Skip to content

Commit

Permalink
Wiring up the migration controller
Browse files Browse the repository at this point in the history
  • Loading branch information
marccampbell committed Mar 21, 2020
1 parent 7096df8 commit d47cccc
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 15 deletions.
55 changes: 44 additions & 11 deletions pkg/controller/migration/migration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@ package migration

import (
"context"
"time"

"github.com/pkg/errors"
databasesv1alpha3 "github.com/schemahero/schemahero/pkg/apis/databases/v1alpha3"
schemasv1alpha3 "github.com/schemahero/schemahero/pkg/apis/schemas/v1alpha3"
"github.com/schemahero/schemahero/pkg/logger"
corev1 "k8s.io/api/core/v1"
kuberneteserrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand Down Expand Up @@ -55,10 +59,29 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {

// Watch for changes to Migration
err = c.Watch(&source.Kind{Type: &schemasv1alpha3.Migration{}}, &handler.EnqueueRequestForObject{})
if err != nil {
return errors.Wrap(err, "failed to start watch on migrations")
}

// Migrations are executed as pods, so we should also watch pod lifecycle
generatedClient := kubernetes.NewForConfigOrDie(mgr.GetConfig())
generatedInformers := kubeinformers.NewSharedInformerFactory(generatedClient, time.Minute)
err = mgr.Add(manager.RunnableFunc(func(s <-chan struct{}) error {
generatedInformers.Start(s)
<-s
return nil
}))
if err != nil {
return err
}

err = c.Watch(&source.Informer{
Informer: generatedInformers.Core().V1().Pods().Informer(),
}, &handler.EnqueueRequestForObject{})
if err != nil {
return errors.Wrap(err, "failed to start watch on pods")
}

return nil
}

Expand All @@ -78,20 +101,30 @@ type ReconcileMigration struct {
// +kubebuilder:rbac:groups=schemas.schemahero.io,resources=migrations,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=schemas.schemahero.io,resources=migrations/status,verbs=get;update;patch
func (r *ReconcileMigration) Reconcile(request reconcile.Request) (reconcile.Result, error) {
// Fetch the Migration instance
instance := &schemasv1alpha3.Migration{}
err := r.Get(context.TODO(), request.NamespacedName, instance)
if err != nil {
if kuberneteserrors.IsNotFound(err) {
// Object not found, return. Created objects are automatically garbage collected.
// For additional cleanup logic use finalizers.
return reconcile.Result{}, nil
// This reconcile loop will be called for all Migration objects and all pods
// because of the informer that we have set up
// The behavior here is pretty different depending on the type
// so this function is simply an entrypoint that executes the right reconcile loop
instance, instanceErr := r.getInstance(request)
if instanceErr == nil {
result, err := r.reconcileInstance(instance)
if err != nil {
logger.Error(err)
}
return result, err
}

pod := &corev1.Pod{}
podErr := r.Get(context.Background(), request.NamespacedName, pod)
if podErr == nil {
result, err := r.reconcilePod(pod)
if err != nil {
logger.Error(err)
}
// Error reading the object - requeue the request.
return reconcile.Result{}, err
return result, err
}

return reconcile.Result{}, nil
return reconcile.Result{}, errors.New("unknown error in migration reconciler")
}

func (r *ReconcileMigration) readConnectionURI(namespace string, valueOrValueFrom databasesv1alpha3.ValueOrValueFrom) (string, error) {
Expand Down
40 changes: 40 additions & 0 deletions pkg/controller/migration/reconcile.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package migration

import (
"context"

schemasv1alpha3 "github.com/schemahero/schemahero/pkg/apis/schemas/v1alpha3"
"github.com/schemahero/schemahero/pkg/logger"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func (r *ReconcileMigration) getInstance(request reconcile.Request) (*schemasv1alpha3.Migration, error) {
v1alpha3instance := &schemasv1alpha3.Migration{}
err := r.Get(context.Background(), request.NamespacedName, v1alpha3instance)
if err != nil {
return nil, err // don't wrap
}

return v1alpha3instance, nil
}

func (r *ReconcileMigration) reconcileInstance(instance *schemasv1alpha3.Migration) (reconcile.Result, error) {
logger.Debug("reconciling migration",
zap.String("kind", instance.Kind),
zap.String("name", instance.Name),
zap.String("tableName", instance.Spec.TableName))

return reconcile.Result{}, nil
}

func (r *ReconcileMigration) reconcilePod(pod *corev1.Pod) (reconcile.Result, error) {
// podLabels := pod.GetObjectMeta().GetLabels()
// role, ok := podLabels["schemahero-role"]
// if !ok {
// return reconcile.Result{}, nil
// }

return reconcile.Result{}, nil
}
4 changes: 2 additions & 2 deletions pkg/controller/table/objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (r *ReconcileTable) planPod(database *databasesv1alpha3.Database, table *sc

func (r *ReconcileTable) ensureTableConfigMap(desiredConfigMap *corev1.ConfigMap) error {
existingConfigMap := corev1.ConfigMap{}
if err := r.Get(context.TODO(), types.NamespacedName{Name: desiredConfigMap.Name, Namespace: desiredConfigMap.Namespace}, &existingConfigMap); err != nil {
if err := r.Get(context.Background(), types.NamespacedName{Name: desiredConfigMap.Name, Namespace: desiredConfigMap.Namespace}, &existingConfigMap); err != nil {
if kuberneteserrors.IsNotFound(err) {
err = r.Create(context.Background(), desiredConfigMap)
if err != nil {
Expand All @@ -155,7 +155,7 @@ func (r *ReconcileTable) ensureTableConfigMap(desiredConfigMap *corev1.ConfigMap

func (r *ReconcileTable) ensureTablePod(desiredPod *corev1.Pod) error {
existingPod := corev1.Pod{}
if err := r.Get(context.TODO(), types.NamespacedName{Name: desiredPod.Name, Namespace: desiredPod.Namespace}, &existingPod); err != nil {
if err := r.Get(context.Background(), types.NamespacedName{Name: desiredPod.Name, Namespace: desiredPod.Namespace}, &existingPod); err != nil {
if kuberneteserrors.IsNotFound(err) {
err = r.Create(context.Background(), desiredPod)
if err != nil {
Expand Down
16 changes: 16 additions & 0 deletions pkg/controller/table/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/pkg/errors"
databasesv1alpha3 "github.com/schemahero/schemahero/pkg/apis/databases/v1alpha3"
schemasv1alpha3 "github.com/schemahero/schemahero/pkg/apis/schemas/v1alpha3"
databasesclientv1alpha3 "github.com/schemahero/schemahero/pkg/client/schemaheroclientset/typed/databases/v1alpha3"
schemasclientv1alpha3 "github.com/schemahero/schemahero/pkg/client/schemaheroclientset/typed/schemas/v1alpha3"
"github.com/schemahero/schemahero/pkg/logger"
"go.uber.org/zap"
Expand Down Expand Up @@ -122,6 +123,21 @@ func (r *ReconcileTable) reconcilePod(pod *corev1.Pod) (reconcile.Result, error)
},
}

// If the database is set to immediate deploy, then set it as approved also
databasesClient, err := databasesclientv1alpha3.NewForConfig(cfg)
if err != nil {
return reconcile.Result{}, errors.Wrap(err, "Failed to create database client")
}

database, err := databasesClient.Databases(tableNamespace).Get(table.Spec.Database, metav1.GetOptions{})
if err != nil {
return reconcile.Result{}, errors.Wrap(err, "failed to get database")
}

if database.Spec.ImmediateDeploy {
migration.Status.ApprovedAt = time.Now().Unix()
}

if err := r.Create(context.Background(), &migration); err != nil {
return reconcile.Result{}, errors.Wrap(err, "failed to create migration resource")
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/table/table_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
// Watch for changes to Table
err = c.Watch(&source.Kind{Type: &schemasv1alpha3.Table{}}, &handler.EnqueueRequestForObject{})
if err != nil {
return err
return errors.Wrap(err, "failed to start watch on tables")
}

// Add an informer on pods, which are created to deploy schemas. the informer will
Expand All @@ -79,7 +79,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
Informer: generatedInformers.Core().V1().Pods().Informer(),
}, &handler.EnqueueRequestForObject{})
if err != nil {
return err
return errors.Wrap(err, "failed to start watch on pods")
}

return nil
Expand Down

0 comments on commit d47cccc

Please sign in to comment.