
Implement K8S_SYNC without prune feature by using plugin SDK#5624

Merged
ffjlabo merged 14 commits into master from implement-k8s-sync-with-sdk
Mar 6, 2025

Conversation


@ffjlabo ffjlabo commented Mar 5, 2025

What this PR does:

I implemented the K8S_SYNC stage without the prune feature by using the plugin SDK.

The fix summary

  • deployment/plugin.go: Implement deployment.Plugin, mirroring the existing deployment.DeploymentService so that the current implementation keeps working until the re-implementation is finished.
  • deployment/plugin_test.go: Add two tests for the quick sync feature without pruning.

Why we need it:

We want to implement plugins with SDK.

Which issue(s) this PR fixes:

Part of #4980 #5006

Does this PR introduce a user-facing change?:

  • How are users affected by this change:
  • Is this breaking change:
  • How to migrate (if breaking change):

ffjlabo added 12 commits March 6, 2025 11:52
Signed-off-by: Yoshiki Fujikane <ffjlabo@gmail.com>
@ffjlabo ffjlabo force-pushed the implement-k8s-sync-with-sdk branch from d6287cb to 24a5807 Compare March 6, 2025 03:34

codecov bot commented Mar 6, 2025

Codecov Report

Attention: Patch coverage is 44.52055% with 81 lines in your changes missing coverage. Please review.

Project coverage is 26.64%. Comparing base (0137126) to head (4376cac).
Report is 2 commits behind head on master.

Files with missing lines                              | Patch % | Lines
...app/pipedv1/plugin/kubernetes/deployment/plugin.go | 47.32%  | 61 Missing and 8 partials ⚠️
pkg/plugin/sdk/client.go                              | 0.00%   | 9 Missing ⚠️
.../app/pipedv1/plugin/kubernetes/deployment/apply.go | 60.00%  | 2 Missing ⚠️
pkg/app/pipedv1/plugin/kubernetes/main.go             | 0.00%   | 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #5624      +/-   ##
==========================================
+ Coverage   26.57%   26.64%   +0.07%     
==========================================
  Files         477      478       +1     
  Lines       50644    50780     +136     
==========================================
+ Hits        13458    13530      +72     
- Misses      36128    36184      +56     
- Partials     1058     1066       +8     


@ffjlabo ffjlabo changed the title Implement k8s sync with sdk Implement k8s sync with plugin SDK Mar 6, 2025
}

// FIXME
func (p *Plugin) executeK8sSyncStage(ctx context.Context, input *sdk.ExecuteStageInput, dts []*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig]) sdk.StageStatus {
@ffjlabo ffjlabo Mar 6, 2025

reference implementation:

func (a *DeploymentService) executeK8sSyncStage(ctx context.Context, lp logpersister.StageLogPersister, input *deployment.ExecutePluginInput) model.StageStatus {
	lp.Infof("Start syncing the deployment")

	cfg, err := config.DecodeYAML[*kubeconfig.KubernetesApplicationSpec](input.GetTargetDeploymentSource().GetApplicationConfig())
	if err != nil {
		lp.Errorf("Failed while decoding application config (%v)", err)
		return model.StageStatus_STAGE_FAILURE
	}

	lp.Infof("Loading manifests at commit %s for handling", input.GetDeployment().GetTrigger().GetCommit().GetHash())
	manifests, err := a.loadManifests(ctx, input.GetDeployment(), cfg.Spec, input.GetTargetDeploymentSource())
	if err != nil {
		lp.Errorf("Failed while loading manifests (%v)", err)
		return model.StageStatus_STAGE_FAILURE
	}
	lp.Successf("Successfully loaded %d manifests", len(manifests))

	// Because the loaded manifests are read-only
	// we duplicate them to avoid updating the shared manifests data in cache.
	// TODO: implement duplicateManifests function

	// When addVariantLabelToSelector is true, ensure that all workloads
	// have the variant label in their selector.
	var (
		variantLabel   = cfg.Spec.VariantLabel.Key
		primaryVariant = cfg.Spec.VariantLabel.PrimaryValue
	)
	// TODO: treat the stage options specified under "with"
	if cfg.Spec.QuickSync.AddVariantLabelToSelector {
		workloads := findWorkloadManifests(manifests, cfg.Spec.Workloads)
		for _, m := range workloads {
			if err := ensureVariantSelectorInWorkload(m, variantLabel, primaryVariant); err != nil {
				lp.Errorf("Unable to check/set %q in selector of workload %s (%v)", variantLabel+": "+primaryVariant, m.Key().ReadableString(), err)
				return model.StageStatus_STAGE_FAILURE
			}
		}
	}

	// Add variant annotations to all manifests.
	for i := range manifests {
		manifests[i].AddLabels(map[string]string{
			variantLabel: primaryVariant,
		})
		manifests[i].AddAnnotations(map[string]string{
			variantLabel: primaryVariant,
		})
	}

	if err := annotateConfigHash(manifests); err != nil {
		lp.Errorf("Unable to set %q annotation into the workload manifest (%v)", provider.AnnotationConfigHash, err)
		return model.StageStatus_STAGE_FAILURE
	}

	// Get the deploy target config.
	targets := input.GetDeployment().GetDeployTargets(a.pluginConfig.Name)
	if len(targets) == 0 {
		lp.Errorf("No deploy target was found for the plugin %s", a.pluginConfig.Name)
		return model.StageStatus_STAGE_FAILURE
	}
	deployTargetConfig, err := kubeconfig.FindDeployTarget(a.pluginConfig, targets[0]) // TODO: consider multiple targets
	if err != nil {
		lp.Errorf("Failed while unmarshalling deploy target config (%v)", err)
		return model.StageStatus_STAGE_FAILURE
	}

	// Get the kubectl tool path.
	kubectlPath, err := a.toolRegistry.Kubectl(ctx, cmp.Or(cfg.Spec.Input.KubectlVersion, deployTargetConfig.KubectlVersion, defaultKubectlVersion))
	if err != nil {
		lp.Errorf("Failed while getting kubectl tool (%v)", err)
		return model.StageStatus_STAGE_FAILURE
	}

	// Create the kubectl wrapper for the target cluster.
	kubectl := provider.NewKubectl(kubectlPath)

	// Create the applier for the target cluster.
	applier := provider.NewApplier(kubectl, cfg.Spec.Input, deployTargetConfig, a.logger)

	// Start applying all manifests to add or update running resources.
	if err := applyManifests(ctx, applier, manifests, cfg.Spec.Input.Namespace, lp); err != nil {
		lp.Errorf("Failed while applying manifests (%v)", err)
		return model.StageStatus_STAGE_FAILURE
	}

	// TODO: treat the stage options specified under "with"
	if !cfg.Spec.QuickSync.Prune {
		lp.Info("Resource GC was skipped because sync.prune was not configured")
		return model.StageStatus_STAGE_SUCCESS
	}

	// Wait for all applied manifests to be stable.
	// In theory, we don't need to wait for them to be stable before going to the next step
	// but waiting for a while reduces the number of Kubernetes changes in a short time.
	lp.Info("Waiting for the applied manifests to be stable")
	select {
	case <-time.After(15 * time.Second):
		break
	case <-ctx.Done():
		break
	}

	lp.Info("Start finding all running resources but no longer defined in Git")
	namespacedLiveResources, clusterScopedLiveResources, err := provider.GetLiveResources(ctx, kubectl, deployTargetConfig.KubeConfigPath, input.GetDeployment().GetApplicationId())
	if err != nil {
		lp.Errorf("Failed while getting live resources (%v)", err)
		return model.StageStatus_STAGE_FAILURE
	}

	if len(namespacedLiveResources)+len(clusterScopedLiveResources) == 0 {
		lp.Info("There is no data about live resource so no resource will be removed")
		return model.StageStatus_STAGE_SUCCESS
	}

	lp.Successf("Successfully loaded %d live resources", len(namespacedLiveResources)+len(clusterScopedLiveResources))

	removeKeys := provider.FindRemoveResources(manifests, namespacedLiveResources, clusterScopedLiveResources)
	if len(removeKeys) == 0 {
		lp.Info("There are no live resources should be removed")
		return model.StageStatus_STAGE_SUCCESS
	}

	lp.Infof("Start pruning %d resources", len(removeKeys))
	var deletedCount int
	for _, key := range removeKeys {
		if err := kubectl.Delete(ctx, deployTargetConfig.KubeConfigPath, key.Namespace(), key); err != nil {
			if errors.Is(err, provider.ErrNotFound) {
				lp.Infof("Specified resource does not exist, so skip deleting the resource: %s (%v)", key.ReadableString(), err)
				continue
			}
			lp.Errorf("Failed while deleting resource %s (%v)", key.ReadableString(), err)
			continue // continue to delete other resources
		}
		deletedCount++
		lp.Successf("- deleted resource: %s", key.ReadableString())
	}

	lp.Successf("Successfully deleted %d resources", deletedCount)
	return model.StageStatus_STAGE_SUCCESS
}

return sdk.StageStatusSuccess
}

func (p *Plugin) loadManifests(ctx context.Context, deploy *sdk.Deployment, spec *kubeconfig.KubernetesApplicationSpec, deploymentSource *sdk.DeploymentSource, loader loader) ([]provider.Manifest, error) {
@ffjlabo ffjlabo Mar 6, 2025

reference implementation:

func (a *DeploymentService) loadManifests(ctx context.Context, deploy *model.Deployment, spec *kubeconfig.KubernetesApplicationSpec, deploymentSource *common.DeploymentSource) ([]provider.Manifest, error) {
	manifests, err := a.loader.LoadManifests(ctx, provider.LoaderInput{
		PipedID:          deploy.GetPipedId(),
		AppID:            deploy.GetApplicationId(),
		CommitHash:       deploymentSource.GetCommitHash(),
		AppName:          deploy.GetApplicationName(),
		AppDir:           deploymentSource.GetApplicationDirectory(),
		ConfigFilename:   deploymentSource.GetApplicationConfigFilename(),
		Manifests:        spec.Input.Manifests,
		Namespace:        spec.Input.Namespace,
		TemplatingMethod: provider.TemplatingMethodNone, // TODO: Implement detection of templating method or add it to the config spec.
		// TODO: Define other fields for LoaderInput
	})
	if err != nil {
		return nil, err
	}
	return manifests, nil
}

return deployTargetCfg, dynamicClient
}

func TestPlugin_executeK8sSyncStage(t *testing.T) {
@ffjlabo ffjlabo Mar 6, 2025

reference implementation:

func TestDeploymentService_executeK8sSyncStage(t *testing.T) {
	t.Parallel()

	ctx := context.Background()

	// read the application config from the example file
	cfg, err := os.ReadFile(filepath.Join(examplesDir(), "kubernetes", "simple", "app.pipecd.yaml"))
	require.NoError(t, err)

	// prepare the request
	req := &deployment.ExecuteStageRequest{
		Input: &deployment.ExecutePluginInput{
			Deployment: &model.Deployment{
				PipedId:       "piped-id",
				ApplicationId: "app-id",
				DeployTargetsByPlugin: map[string]*model.DeployTargets{
					"kubernetes": {
						DeployTargets: []string{"default"},
					},
				},
			},
			Stage: &model.PipelineStage{
				Id:   "stage-id",
				Name: "K8S_SYNC",
			},
			StageConfig:             []byte(``),
			RunningDeploymentSource: nil,
			TargetDeploymentSource: &common.DeploymentSource{
				ApplicationDirectory:      filepath.Join(examplesDir(), "kubernetes", "simple"),
				CommitHash:                "0123456789",
				ApplicationConfig:         cfg,
				ApplicationConfigFilename: "app.pipecd.yaml",
			},
		},
	}

	// initialize tool registry
	testRegistry := toolregistrytest.NewTestToolRegistry(t)

	// initialize plugin config and dynamic client for assertions with envtest
	pluginCfg, dynamicClient := setupTestPluginConfigAndDynamicClient(t)

	svc := NewDeploymentService(pluginCfg, zaptest.NewLogger(t), testRegistry, logpersistertest.NewTestLogPersister(t))
	resp, err := svc.ExecuteStage(ctx, req)
	require.NoError(t, err)
	assert.Equal(t, model.StageStatus_STAGE_SUCCESS.String(), resp.GetStatus().String())

	deployment, err := dynamicClient.Resource(schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}).Namespace("default").Get(context.Background(), "simple", metav1.GetOptions{})
	require.NoError(t, err)

	assert.Equal(t, "simple", deployment.GetName())
	assert.Equal(t, "simple", deployment.GetLabels()["app"])
	assert.Equal(t, "piped", deployment.GetLabels()["pipecd.dev/managed-by"])
	assert.Equal(t, "piped-id", deployment.GetLabels()["pipecd.dev/piped"])
	assert.Equal(t, "app-id", deployment.GetLabels()["pipecd.dev/application"])
	assert.Equal(t, "0123456789", deployment.GetLabels()["pipecd.dev/commit-hash"])
	assert.Equal(t, "piped", deployment.GetAnnotations()["pipecd.dev/managed-by"])
	assert.Equal(t, "piped-id", deployment.GetAnnotations()["pipecd.dev/piped"])
	assert.Equal(t, "app-id", deployment.GetAnnotations()["pipecd.dev/application"])
	assert.Equal(t, "apps/v1", deployment.GetAnnotations()["pipecd.dev/original-api-version"])
	assert.Equal(t, "apps:Deployment::simple", deployment.GetAnnotations()["pipecd.dev/resource-key"]) // This assertion differs from the non-plugin-arched piped's Kubernetes platform provider, but we decided to change this behavior.
	assert.Equal(t, "0123456789", deployment.GetAnnotations()["pipecd.dev/commit-hash"])
}


func TestPlugin_executeK8sSyncStage_withInputNamespace(t *testing.T) {
@ffjlabo ffjlabo (Member, Author)

reference implementation:

func TestDeploymentService_executeK8sSyncStage_withInputNamespace(t *testing.T) {
	t.Parallel()

	ctx := context.Background()

	// read the application config from the example file
	cfg, err := os.ReadFile(filepath.Join(examplesDir(), "kubernetes", "simple", "app.pipecd.yaml"))
	require.NoError(t, err)

	// decode and override the autoCreateNamespace and namespace
	spec, err := config.DecodeYAML[*kubeConfigPkg.KubernetesApplicationSpec](cfg)
	require.NoError(t, err)
	spec.Spec.Input.AutoCreateNamespace = true
	spec.Spec.Input.Namespace = "test-namespace"
	cfg, err = yaml.Marshal(spec)
	require.NoError(t, err)

	// prepare the request
	req := &deployment.ExecuteStageRequest{
		Input: &deployment.ExecutePluginInput{
			Deployment: &model.Deployment{
				PipedId:       "piped-id",
				ApplicationId: "app-id",
				DeployTargetsByPlugin: map[string]*model.DeployTargets{
					"kubernetes": {
						DeployTargets: []string{"default"},
					},
				},
			},
			Stage: &model.PipelineStage{
				Id:   "stage-id",
				Name: "K8S_SYNC",
			},
			StageConfig:             []byte(``),
			RunningDeploymentSource: nil,
			TargetDeploymentSource: &common.DeploymentSource{
				ApplicationDirectory:      filepath.Join(examplesDir(), "kubernetes", "simple"),
				CommitHash:                "0123456789",
				ApplicationConfig:         cfg,
				ApplicationConfigFilename: "app.pipecd.yaml",
			},
		},
	}

	// initialize tool registry
	testRegistry := toolregistrytest.NewTestToolRegistry(t)

	// initialize plugin config and dynamic client for assertions with envtest
	pluginCfg, dynamicClient := setupTestPluginConfigAndDynamicClient(t)

	svc := NewDeploymentService(pluginCfg, zaptest.NewLogger(t), testRegistry, logpersistertest.NewTestLogPersister(t))
	resp, err := svc.ExecuteStage(ctx, req)
	require.NoError(t, err)
	assert.Equal(t, model.StageStatus_STAGE_SUCCESS.String(), resp.GetStatus().String())

	deployment, err := dynamicClient.Resource(schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}).Namespace("test-namespace").Get(context.Background(), "simple", metav1.GetOptions{})
	require.NoError(t, err)

	assert.Equal(t, "piped", deployment.GetLabels()["pipecd.dev/managed-by"])
	assert.Equal(t, "piped-id", deployment.GetLabels()["pipecd.dev/piped"])
	assert.Equal(t, "app-id", deployment.GetLabels()["pipecd.dev/application"])
	assert.Equal(t, "0123456789", deployment.GetLabels()["pipecd.dev/commit-hash"])
	assert.Equal(t, "simple", deployment.GetName())
	assert.Equal(t, "simple", deployment.GetLabels()["app"])
	assert.Equal(t, "piped", deployment.GetAnnotations()["pipecd.dev/managed-by"])
	assert.Equal(t, "piped-id", deployment.GetAnnotations()["pipecd.dev/piped"])
	assert.Equal(t, "app-id", deployment.GetAnnotations()["pipecd.dev/application"])
	assert.Equal(t, "apps/v1", deployment.GetAnnotations()["pipecd.dev/original-api-version"])
	assert.Equal(t, "apps:Deployment:test-namespace:simple", deployment.GetAnnotations()["pipecd.dev/resource-key"]) // This assertion differs from the non-plugin-arched piped's Kubernetes platform provider, but we decided to change this behavior.
	assert.Equal(t, "0123456789", deployment.GetAnnotations()["pipecd.dev/commit-hash"])
}

Comment on lines -45 to -66
type toolRegistry interface {
	Kubectl(ctx context.Context, version string) (string, error)
	Kustomize(ctx context.Context, version string) (string, error)
	Helm(ctx context.Context, version string) (string, error)
}

type loader interface {
	// LoadManifests renders and loads all manifests for application.
	LoadManifests(ctx context.Context, input provider.LoaderInput) ([]provider.Manifest, error)
}

type applier interface {
	// ApplyManifest does applying the given manifest.
	ApplyManifest(ctx context.Context, manifest provider.Manifest) error
	// CreateManifest does creating resource from given manifest.
	CreateManifest(ctx context.Context, manifest provider.Manifest) error
	// ReplaceManifest does replacing resource from given manifest.
	ReplaceManifest(ctx context.Context, manifest provider.Manifest) error
	// ForceReplaceManifest does force replacing resource from given manifest.
	ForceReplaceManifest(ctx context.Context, manifest provider.Manifest) error
}

@ffjlabo ffjlabo (Member, Author)

Moved them to plugin.go to remove server.go at the end.

Comment on lines +52 to +60
func NewClient(base *pipedapi.PipedServiceClient, pluginName, applicationID, stageID string, lp StageLogPersister, tr *toolregistry.ToolRegistry) *Client {
	return &Client{
		base:          base,
		pluginName:    pluginName,
		applicationID: applicationID,
		stageID:       stageID,
		logPersister:  lp,
		toolRegistry:  tr,
	}
@ffjlabo ffjlabo (Member, Author)

I implemented it to use sdk.Client in the test.
It might not be the best way, but I want to prioritize implementing the quick sync logic for now.

@t-kikuc (Member)

// Currently, this is used to create the Client in the test for the plugin made by the SDK.

Let's emphasize this comment since the sdk package is visible to plugin developers.

for example:

// DO NOT USE this function except in tests.
// FIXME: ...... (Remove this function and make a better way for tests??)
// NewClient creates a new client.

@ffjlabo ffjlabo (Member, Author)

@t-kikuc
Thanks, sounds nice! Fixed in 4376cac.

Signed-off-by: Yoshiki Fujikane <ffjlabo@gmail.com>
@ffjlabo ffjlabo marked this pull request as ready for review March 6, 2025 07:01
@ffjlabo ffjlabo changed the title Implement k8s sync with plugin SDK Implement K8S_SYNC without prune feature by using plugin SDK Mar 6, 2025
@t-kikuc t-kikuc left a comment

Almost LGTM


Signed-off-by: Yoshiki Fujikane <ffjlabo@gmail.com>
@ffjlabo ffjlabo requested a review from t-kikuc March 6, 2025 08:00
@ffjlabo ffjlabo merged commit 691a510 into master Mar 6, 2025
18 checks passed
@ffjlabo ffjlabo deleted the implement-k8s-sync-with-sdk branch March 6, 2025 08:24
@github-actions github-actions bot mentioned this pull request Mar 25, 2025