Skip to content

Commit

Permalink
Refactor kubernetes executors (#472)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:

**Which issue(s) this PR fixes**:

Fixes #

**Does this PR introduce a user-facing change?**:
<!--
If no, just write "NONE" in the release-note block below.
-->
```release-note
NONE
```

This PR was merged by Kapetanios.
  • Loading branch information
nghialv committed Jul 17, 2020
1 parent 0365176 commit f2c971c
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 77 deletions.
31 changes: 6 additions & 25 deletions pkg/app/piped/executor/kubernetes/baseline.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package kubernetes

import (
"context"
"errors"
"fmt"
"strings"

Expand Down Expand Up @@ -122,33 +121,15 @@ func (e *Executor) removeBaselineResources(ctx context.Context, resources []stri
}

// We delete the service first to close all incoming connections.
for _, k := range serviceKeys {
err := e.provider.Delete(ctx, k)
if err == nil {
e.LogPersister.AppendInfof("Deleted resource %s", k)
continue
}
if errors.Is(err, provider.ErrNotFound) {
e.LogPersister.AppendInfof("No resource %s to delete", k)
continue
}
e.LogPersister.AppendErrorf("Unable to delete resource %s (%v)", k, err)
//return model.StageStatus_STAGE_FAILURE
e.LogPersister.AppendInfo("Starting finding and deleting service resources of BASELINE variant")
if err := e.deleteResources(ctx, serviceKeys); err != nil {
return err
}

// Next, delete all workloads.
for _, k := range workloadKeys {
err := e.provider.Delete(ctx, k)
if err == nil {
e.LogPersister.AppendInfof("Deleted workload resource %s", k)
continue
}
if errors.Is(err, provider.ErrNotFound) {
e.LogPersister.AppendInfof("No worload resource %s to delete", k)
continue
}
e.LogPersister.AppendErrorf("Unable to delete workload resource %s (%v)", k, err)
//return model.StageStatus_STAGE_FAILURE
e.LogPersister.AppendInfo("Starting finding and deleting workload resources of BASELINE variant")
if err := e.deleteResources(ctx, workloadKeys); err != nil {
return err
}

return nil
Expand Down
31 changes: 6 additions & 25 deletions pkg/app/piped/executor/kubernetes/canary.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package kubernetes

import (
"context"
"errors"
"fmt"
"strings"

Expand Down Expand Up @@ -123,33 +122,15 @@ func (e *Executor) removeCanaryResources(ctx context.Context, resources []string
}

// We delete the service first to close all incoming connections.
for _, k := range serviceKeys {
err := e.provider.Delete(ctx, k)
if err == nil {
e.LogPersister.AppendInfof("Deleted resource %s", k)
continue
}
if errors.Is(err, provider.ErrNotFound) {
e.LogPersister.AppendInfof("No resource %s to delete", k)
continue
}
e.LogPersister.AppendErrorf("Unable to delete resource %s (%v)", k, err)
//return model.StageStatus_STAGE_FAILURE
e.LogPersister.AppendInfo("Starting finding and deleting service resources of CANARY variant")
if err := e.deleteResources(ctx, serviceKeys); err != nil {
return err
}

// Next, delete all workloads.
for _, k := range workloadKeys {
err := e.provider.Delete(ctx, k)
if err == nil {
e.LogPersister.AppendInfof("Deleted workload resource %s", k)
continue
}
if errors.Is(err, provider.ErrNotFound) {
e.LogPersister.AppendInfof("No worload resource %s to delete", k)
continue
}
e.LogPersister.AppendErrorf("Unable to delete workload resource %s (%v)", k, err)
//return model.StageStatus_STAGE_FAILURE
e.LogPersister.AppendInfo("Starting finding and deleting workload resources of CANARY variant")
if err := e.deleteResources(ctx, workloadKeys); err != nil {
return err
}

return nil
Expand Down
29 changes: 25 additions & 4 deletions pkg/app/piped/executor/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package kubernetes

import (
"context"
"errors"
"fmt"
"path/filepath"

Expand Down Expand Up @@ -200,14 +201,34 @@ func (e *Executor) applyManifests(ctx context.Context, manifests []provider.Mani
}

func (e *Executor) deleteResources(ctx context.Context, resources []provider.ResourceKey) error {
if len(resources) == 0 {
e.LogPersister.AppendInfo("No resources to delete")
return nil
}

e.LogPersister.AppendInfof("Start deleting %d resources", len(resources))
var deletedCount int

for _, k := range resources {
if err := e.provider.Delete(ctx, k); err != nil {
e.LogPersister.AppendErrorf("Failed to delete resource: %s (%v)", k.ReadableString(), err)
return err
err := e.provider.Delete(ctx, k)
if err == nil {
e.LogPersister.AppendSuccessf("- deleted resource: %s", k.ReadableString())
deletedCount++
continue
}
e.LogPersister.AppendSuccessf("- deleted resource: %s", k.ReadableString())
if errors.Is(err, provider.ErrNotFound) {
e.LogPersister.AppendInfof("- no resource %s to delete", k.ReadableString())
deletedCount++
continue
}
e.LogPersister.AppendErrorf("- unable to delete resource: %s (%v)", k.ReadableString(), err)
}

if deletedCount > 0 {
e.LogPersister.AppendInfof("Deleted %d/%d resources", deletedCount, len(resources))
return fmt.Errorf("unable to delete %d resources", len(resources)-deletedCount)
}

e.LogPersister.AppendSuccessf("Successfully deleted %d resources", len(resources))
return nil
}
Expand Down
55 changes: 34 additions & 21 deletions pkg/app/piped/executor/kubernetes/traffic.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,8 @@ func (e *Executor) ensureTrafficRouting(ctx context.Context) model.StageStatus {
)
}

// Because the loaded maninests are read-only
// so we duplicate them to avoid updating the shared manifests data in cache.
trafficRoutingManifest := duplicateManifest(trafficRoutingManifests[0], "")

trafficRoutingManifest, err = e.generateTrafficRoutingManifest(
trafficRoutingManifest,
trafficRoutingManifest, err := e.generateTrafficRoutingManifest(
trafficRoutingManifests[0],
primaryPercent,
canaryPercent,
baselinePercent,
Expand Down Expand Up @@ -147,15 +143,16 @@ func (e *Executor) generateTrafficRoutingManifest(manifest provider.Manifest, pr
istioConfig = &config.IstioTrafficRouting{}
}

var err error
if strings.HasPrefix(manifest.Key.APIVersion, "v1alpha3") {
err = generateVirtualServiceManifestV1Alpha3(manifest, istioConfig.Host, istioConfig.EditableRoutes, int32(canaryPercent), int32(baselinePercent))
} else {
err = generateVirtualServiceManifest(manifest, istioConfig.Host, istioConfig.EditableRoutes, int32(canaryPercent), int32(baselinePercent))
return generateVirtualServiceManifestV1Alpha3(manifest, istioConfig.Host, istioConfig.EditableRoutes, int32(canaryPercent), int32(baselinePercent))
}
return manifest, err
return generateVirtualServiceManifest(manifest, istioConfig.Host, istioConfig.EditableRoutes, int32(canaryPercent), int32(baselinePercent))
}

// Because the loaded maninests are read-only
// so we duplicate them to avoid updating the shared manifests data in cache.
manifest = duplicateManifest(manifest, "")

// Determine which variant will receive 100% percent of traffic.
var variant string
switch {
Expand Down Expand Up @@ -212,19 +209,23 @@ func findIstioVirtualServiceManifests(manifests []provider.Manifest, cfg config.
return out, nil
}

func generateVirtualServiceManifest(m provider.Manifest, host string, editableRoutes []string, canaryPercent, baselinePercent int32) error {
func generateVirtualServiceManifest(m provider.Manifest, host string, editableRoutes []string, canaryPercent, baselinePercent int32) (provider.Manifest, error) {
// Because the loaded maninests are read-only
// so we duplicate them to avoid updating the shared manifests data in cache.
m = duplicateManifest(m, "")

spec, err := m.GetSpec()
if err != nil {
return err
return m, err
}

vs := istiov1beta1.VirtualService{}
data, err := json.Marshal(spec)
if err != nil {
return err
return m, err
}
if err := json.Unmarshal(data, &vs); err != nil {
return err
return m, err
}

editableMap := make(map[string]struct{}, len(editableRoutes))
Expand Down Expand Up @@ -284,22 +285,30 @@ func generateVirtualServiceManifest(m provider.Manifest, host string, editableRo
http.Route = routes
}

return m.SetStructuredSpec(vs)
if err := m.SetStructuredSpec(vs); err != nil {
return m, err
}

return m, nil
}

func generateVirtualServiceManifestV1Alpha3(m provider.Manifest, host string, editableRoutes []string, canaryPercent, baselinePercent int32) error {
func generateVirtualServiceManifestV1Alpha3(m provider.Manifest, host string, editableRoutes []string, canaryPercent, baselinePercent int32) (provider.Manifest, error) {
// Because the loaded maninests are read-only
// so we duplicate them to avoid updating the shared manifests data in cache.
m = duplicateManifest(m, "")

spec, err := m.GetSpec()
if err != nil {
return err
return m, err
}

vs := istiov1alpha3.VirtualService{}
data, err := json.Marshal(spec)
if err != nil {
return err
return m, err
}
if err := json.Unmarshal(data, &vs); err != nil {
return err
return m, err
}

editableMap := make(map[string]struct{}, len(editableRoutes))
Expand Down Expand Up @@ -359,5 +368,9 @@ func generateVirtualServiceManifestV1Alpha3(m provider.Manifest, host string, ed
http.Route = routes
}

return m.SetStructuredSpec(vs)
if err := m.SetStructuredSpec(vs); err != nil {
return m, err
}

return m, nil
}
4 changes: 2 additions & 2 deletions pkg/app/piped/executor/kubernetes/traffic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestGenerateVirtualServiceManifest(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(manifests))

err = generateVirtualServiceManifest(manifests[0], "helloworld", tc.editableRoutes, 30, 20)
generatedManifest, err := generateVirtualServiceManifest(manifests[0], "helloworld", tc.editableRoutes, 30, 20)
assert.NoError(t, err)

expectedManifests, err := provider.LoadManifestsFromYAMLFile(tc.expectedFile)
Expand All @@ -57,7 +57,7 @@ func TestGenerateVirtualServiceManifest(t *testing.T) {

expected, err := expectedManifests[0].YamlBytes()
require.NoError(t, err)
got, err := manifests[0].YamlBytes()
got, err := generatedManifest.YamlBytes()
require.NoError(t, err)

assert.EqualValues(t, expected, got)
Expand Down

0 comments on commit f2c971c

Please sign in to comment.