Skip to content

Commit

Permalink
meets min to use knative to get k8s client
Browse files Browse the repository at this point in the history
rh-pre-commit.version: 2.2.0
rh-pre-commit.check-secrets: ENABLED
  • Loading branch information
gabemontero committed Feb 21, 2024
1 parent 1ddc253 commit 83dfded
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 20 deletions.
26 changes: 10 additions & 16 deletions pkg/watcher/reconciler/dynamic/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (

"github.com/fatih/color"
"github.com/jonboulle/clockwork"
"github.com/tektoncd/cli/pkg/cli"
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
pipelinev1beta1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
"github.com/tektoncd/results/pkg/api/server/v1alpha2/log"
Expand Down Expand Up @@ -55,6 +54,7 @@ var (
// Reconciler implements common reconciler behavior across different Tekton Run
// Object types.
type Reconciler struct {
kubernetesClientset kubernetes.Interface
resultsClient *results.Client
objectClient ObjectClient
cfg *reconciler.Config
Expand All @@ -80,11 +80,12 @@ type IsReadyForDeletion func(ctx context.Context, object results.Object) (bool,
type AfterDeletion func(ctx context.Context, object results.Object) error

// NewDynamicReconciler creates a new dynamic Reconciler.
func NewDynamicReconciler(rc pb.ResultsClient, lc pb.LogsClient, oc ObjectClient, cfg *reconciler.Config) *Reconciler {
func NewDynamicReconciler(rc pb.ResultsClient, lc pb.LogsClient, oc ObjectClient, kc kubernetes.Interface, cfg *reconciler.Config) *Reconciler {
return &Reconciler{
resultsClient: results.NewClient(rc, lc),
objectClient: oc,
cfg: cfg,
kubernetesClientset: kc,
resultsClient: results.NewClient(rc, lc),
objectClient: oc,
cfg: cfg,
// Always true predicate.
IsReadyForDeletionFunc: func(ctx context.Context, object results.Object) (bool, error) {
return true, nil
Expand Down Expand Up @@ -357,11 +358,11 @@ func (r *Reconciler) sendLog(ctx context.Context, o results.Object) error {
return nil
}

func getPodLogs(ctx context.Context, client kubernetes.Interface, ns, pod, container string) ([]byte, error) {
func (r *Reconciler) getPodLogs(ctx context.Context, ns, pod, container string) ([]byte, error) {
podLogOpts := corev1.PodLogOptions{
Container: container,
}
req := client.CoreV1().Pods(ns).GetLogs(pod, &podLogOpts)
req := r.kubernetesClientset.CoreV1().Pods(ns).GetLogs(pod, &podLogOpts)
podLogs, err := req.Stream(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -392,18 +393,11 @@ func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, labelKey,

inMemWriteBufferStdout := bytes.NewBuffer(make([]byte, 0))

tknParams := &cli.TektonParams{}
tknParams.SetNamespace(o.GetNamespace())
k8sClient, err := tknParams.KubeClient()
if err != nil {
return err
}

lo := metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s", labelKey, o.GetName()),
}
var pods *corev1.PodList
pods, err = k8sClient.CoreV1().Pods(o.GetNamespace()).List(ctx, lo)
pods, err = r.kubernetesClientset.CoreV1().Pods(o.GetNamespace()).List(ctx, lo)
if err != nil {
return err
}
Expand All @@ -414,7 +408,7 @@ func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, labelKey,
copy(containers, pod.Spec.InitContainers)
containers = append(containers, pod.Spec.Containers...)
for _, container := range containers {
ba, podLogsErr := getPodLogs(ctx, k8sClient, o.GetNamespace(), pod.Name, container.Name)
ba, podLogsErr := r.getPodLogs(ctx, o.GetNamespace(), pod.Name, container.Name)
if podLogsErr != nil {
return podLogsErr
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/watcher/reconciler/dynamic/dynamic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
fakekubeclientset "k8s.io/client-go/kubernetes/fake"
"knative.dev/pkg/apis"
duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1"
"knative.dev/pkg/controller"
Expand Down Expand Up @@ -140,7 +141,8 @@ func TestReconcile_TaskRun(t *testing.T) {
RequeueInterval: 1 * time.Second,
}

r := NewDynamicReconciler(resultsClient, logsClient, trclient, cfg)
clientset := fakekubeclientset.NewSimpleClientset()
r := NewDynamicReconciler(resultsClient, logsClient, trclient, clientset, cfg)
if err := r.Reconcile(ctx, taskrun); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -429,7 +431,8 @@ func TestReconcile_PipelineRun(t *testing.T) {
t.Fatal(err)
}

r := NewDynamicReconciler(resultsClient, logsClient, prclient, nil)
clientset := fakekubeclientset.NewSimpleClientset()
r := NewDynamicReconciler(resultsClient, logsClient, prclient, clientset, nil)
if err := r.Reconcile(ctx, pipelinerun); err != nil {
t.Fatal(err)
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/watcher/reconciler/pipelinerun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/tektoncd/results/pkg/watcher/reconciler/leaderelection"
pb "github.com/tektoncd/results/proto/v1alpha2/results_go_proto"
"k8s.io/client-go/tools/cache"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
)
Expand All @@ -42,12 +43,14 @@ func NewController(ctx context.Context, resultsClient pb.ResultsClient, cmw conf
func NewControllerWithConfig(ctx context.Context, resultsClient pb.ResultsClient, cfg *reconciler.Config, cmw configmap.Watcher) *controller.Impl {
pipelineRunInformer := pipelineruninformer.Get(ctx)
pipelineRunLister := pipelineRunInformer.Lister()
kubeclientset := kubeclient.Get(ctx)
logger := logging.FromContext(ctx)
configStore := config.NewStore(logger.Named("config-store"), pipelinerunmetrics.MetricsOnStore(logger))
configStore.WatchConfigs(cmw)

c := &Reconciler{
LeaderAwareFuncs: leaderelection.NewLeaderAwareFuncs(pipelineRunLister.List),
kubeClientSet: kubeclientset,
resultsClient: resultsClient,
logsClient: logs.Get(ctx),
pipelineRunLister: pipelineRunLister,
Expand Down
4 changes: 3 additions & 1 deletion pkg/watcher/reconciler/pipelinerun/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"go.uber.org/zap"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
Expand All @@ -43,6 +44,7 @@ type Reconciler struct {
// Inline LeaderAwareFuncs to support leader election.
knativereconciler.LeaderAwareFuncs

kubeClientSet kubernetes.Interface
resultsClient pb.ResultsClient
logsClient pb.LogsClient
pipelineRunLister pipelinev1beta1listers.PipelineRunLister
Expand Down Expand Up @@ -86,7 +88,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, key string) error {
PipelineRunInterface: r.pipelineClient.TektonV1beta1().PipelineRuns(namespace),
}

dyn := dynamic.NewDynamicReconciler(r.resultsClient, r.logsClient, pipelineRunClient, r.cfg)
dyn := dynamic.NewDynamicReconciler(r.resultsClient, r.logsClient, pipelineRunClient, r.kubeClientSet, r.cfg)
// Tell the dynamic reconciler to wait until all underlying TaskRuns are
// ready for deletion before deleting the PipelineRun. This guarantees
// that the TaskRuns will not be deleted before their final state being
Expand Down
3 changes: 3 additions & 0 deletions pkg/watcher/reconciler/taskrun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/tektoncd/results/pkg/watcher/reconciler/leaderelection"
pb "github.com/tektoncd/results/proto/v1alpha2/results_go_proto"
"k8s.io/client-go/tools/cache"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
)
Expand All @@ -41,12 +42,14 @@ func NewController(ctx context.Context, resultsClient pb.ResultsClient, cmw conf
func NewControllerWithConfig(ctx context.Context, resultsClient pb.ResultsClient, cfg *reconciler.Config, cmw configmap.Watcher) *controller.Impl {
informer := taskruninformer.Get(ctx)
lister := informer.Lister()
kubeclientset := kubeclient.Get(ctx)
logger := logging.FromContext(ctx)
configStore := config.NewStore(logger.Named("config-store"), taskrunmetrics.MetricsOnStore(logger))
configStore.WatchConfigs(cmw)

c := &Reconciler{
LeaderAwareFuncs: leaderelection.NewLeaderAwareFuncs(lister.List),
kubeClientset: kubeclientset,
resultsClient: resultsClient,
logsClient: logs.Get(ctx),
lister: lister,
Expand Down
4 changes: 3 additions & 1 deletion pkg/watcher/reconciler/taskrun/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"go.uber.org/zap"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"knative.dev/pkg/logging"
)
Expand All @@ -29,6 +30,7 @@ type Reconciler struct {
// Inline LeaderAwareFuncs to support leader election.
knativereconciler.LeaderAwareFuncs

kubeClientset kubernetes.Interface
resultsClient pb.ResultsClient
logsClient pb.LogsClient
lister v1beta1.TaskRunLister
Expand Down Expand Up @@ -71,7 +73,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, key string) error {
TaskRunInterface: r.pipelineClient.TektonV1beta1().TaskRuns(namespace),
}

dyn := dynamic.NewDynamicReconciler(r.resultsClient, r.logsClient, taskRunClient, r.cfg)
dyn := dynamic.NewDynamicReconciler(r.resultsClient, r.logsClient, taskRunClient, r.kubeClientset, r.cfg)
dyn.AfterDeletion = func(ctx context.Context, o results.Object) error {
tr := o.(*pipelinev1beta1.TaskRun)
return r.metrics.DurationAndCountDeleted(ctx, r.configStore.Load().Metrics, tr)
Expand Down

0 comments on commit 83dfded

Please sign in to comment.