Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Simple Pipeline Controller with tests #75

Merged
merged 5 commits into from
Oct 3, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/knative/build-pipeline/pkg/logging"

"github.com/knative/build-pipeline/pkg/reconciler"
"github.com/knative/build-pipeline/pkg/reconciler/v1alpha1/pipelinerun"
"github.com/knative/build-pipeline/pkg/reconciler/v1alpha1/taskrun"
"github.com/knative/build-pipeline/pkg/system"
sharedclientset "github.com/knative/pkg/client/clientset/versioned"
Expand Down Expand Up @@ -64,7 +65,7 @@ func main() {
logger, atomicLevel := logging.NewLoggerFromConfig(loggingConfig, logging.ControllerLogKey)
defer logger.Sync()

logger.Info("Starting the Pipeline Controller")
logger.Info("Starting the Build Controller")
tejal29 marked this conversation as resolved.
Show resolved Hide resolved

// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()
tejal29 marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -113,6 +114,8 @@ func main() {
buildTemplateInformer := knativebuildInformerFactory.Build().V1alpha1().BuildTemplates()
clusterBuildTemplateInformer := knativebuildInformerFactory.Build().V1alpha1().ClusterBuildTemplates()
tejal29 marked this conversation as resolved.
Show resolved Hide resolved

pipelineInformer := pipelineInformerFactory.Pipeline().V1alpha1().Pipelines()
pipelineRunInformer := pipelineInformerFactory.Pipeline().V1alpha1().PipelineRuns()
// Build all of our controllers, with the clients constructed above.
controllers := []*controller.Impl{
// Pipeline Controllers
Expand All @@ -123,6 +126,10 @@ func main() {
buildTemplateInformer,
clusterBuildTemplateInformer,
),
pipelinerun.NewController(opt,
pipelineRunInformer,
pipelineInformer,
),
}

// Watch the logging config map and dynamically update logging levels.
Expand Down
172 changes: 172 additions & 0 deletions pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
Copyright 2018 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package pipelinerun

import (
"context"
"fmt"
"reflect"

"github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1"
"github.com/knative/build-pipeline/pkg/reconciler"
"github.com/knative/pkg/controller"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/tools/cache"

informers "github.com/knative/build-pipeline/pkg/client/informers/externalversions/pipeline/v1alpha1"
listers "github.com/knative/build-pipeline/pkg/client/listers/pipeline/v1alpha1"
)

const (
// pipelineRunAgentName defines logging agent name for PipelineRun Controller
pipelineRunAgentName = "pipeline-controller"
// pipelineRunControllerName defines name for PipelineRun Controller
pipelineRunControllerName = "PipelineRun"
)

type reconcilerConfig struct {
pipelineRunLister listers.PipelineRunLister
pipelineLister listers.PipelineLister
}

// Reconciler implements controller.Reconciler for Configuration resources.
type Reconciler struct {
*reconciler.Base

// listers index properties about resources
pipelineRunLister listers.PipelineRunLister
pipelineLister listers.PipelineLister
}

// Check that our Reconciler implements controller.Reconciler
var _ controller.Reconciler = (*Reconciler)(nil)

// NewController creates a new Configuration controller
func NewController(
opt reconciler.Options,
pipelineRunInformer informers.PipelineRunInformer,
pipelineInformer informers.PipelineInformer,
) *controller.Impl {

rc := &reconcilerConfig{
pipelineRunLister: pipelineRunInformer.Lister(),
pipelineLister: pipelineInformer.Lister(),
}
return new(opt, pipelineRunInformer, pipelineInformer, rc)
}

func new(opt reconciler.Options,
pipelineRunInformer informers.PipelineRunInformer,
pipelineInformer informers.PipelineInformer,
rc *reconcilerConfig,
) *controller.Impl {

r := &Reconciler{
Base: reconciler.NewBase(opt, pipelineRunAgentName),
pipelineRunLister: rc.pipelineRunLister,
pipelineLister: rc.pipelineLister,
}

impl := controller.NewImpl(r, r.Logger, pipelineRunControllerName)

r.Logger.Info("Setting up event handlers")
pipelineRunInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: impl.Enqueue,
UpdateFunc: controller.PassNew(impl.Enqueue),
DeleteFunc: impl.Enqueue,
})

pipelineInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: impl.Enqueue,
UpdateFunc: controller.PassNew(impl.Enqueue),
DeleteFunc: impl.Enqueue,
})

return impl
}

// Reconcile compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the Task Run
// resource with the current status of the resource.
func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
// Convert the namespace/name string into a distinct namespace and name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
c.Logger.Errorf("invalid resource key: %s", key)
return nil
}

// Get the Pipeline Run resource with this namespace/name
original, err := c.pipelineRunLister.PipelineRuns(namespace).Get(name)
if errors.IsNotFound(err) {
// TODO: create pipelineRun here
// The resource no longer exists, in which case we stop processing.
c.Logger.Errorf("pipeline run %q in work queue no longer exists", key)
return nil
} else if err != nil {
return err
}

// Don't modify the informer's copy.
pr := original.DeepCopy()

// Reconcile this copy of the task run and then write back any status
// updates regardless of whether the reconciliation errored out.
err = c.reconcile(ctx, pr)
if equality.Semantic.DeepEqual(original.Status, pr.Status) {
// If we didn't change anything then don't call updateStatus.
// This is important because the copy we loaded from the informer's
// cache may be stale and we don't want to overwrite a prior update
// to status with this stale state.
} else if _, err := c.updateStatus(pr); err != nil {
c.Logger.Warn("Failed to update PipelineRun status", zap.Error(err))
return err
}
return err
}

func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) error {
// fetch the equivelant pipeline for this pipelinerun Run
name := pr.Spec.PipelineRef.Name
if _, err := c.pipelineLister.Pipelines(pr.Namespace).Get(name); err != nil {
c.Logger.Errorf("%q failed to Get Pipeline: %q",
fmt.Sprintf("%s/%s", pr.Namespace, pr.Name),
fmt.Sprintf("%s/%s", pr.Namespace, name))
return nil
}

// TODO fetch the taskruns status for this pipeline run.

// TODO check status of tasks and update status of PipelineRuns

return nil
}

func (c *Reconciler) updateStatus(pr *v1alpha1.PipelineRun) (*v1alpha1.PipelineRun, error) {
newPr, err := c.pipelineRunLister.PipelineRuns(pr.Namespace).Get(pr.Name)
if err != nil {
return nil, err
}
if !reflect.DeepEqual(newPr.Status, pr.Status) {
newPr.Status = pr.Status
return c.PipelineClientSet.PipelineV1alpha1().PipelineRuns(pr.Namespace).Update(newPr)
}
return newPr, nil
}
Loading