-
Notifications
You must be signed in to change notification settings - Fork 9
/
migration.go
120 lines (100 loc) · 3.65 KB
/
migration.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
// Copyright 2022 Namespace Labs Inc; All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
package orchestration
import (
"context"
"fmt"
"os"
"time"
"github.com/slack-go/slack"
"namespacelabs.dev/foundation/internal/console"
"namespacelabs.dev/foundation/internal/fnerrors"
"namespacelabs.dev/foundation/internal/planning/deploy"
"namespacelabs.dev/foundation/internal/runtime"
"namespacelabs.dev/foundation/orchestration/client"
"namespacelabs.dev/foundation/schema"
orchpb "namespacelabs.dev/foundation/schema/orchestration"
"namespacelabs.dev/foundation/std/cfg"
"namespacelabs.dev/foundation/std/execution"
"namespacelabs.dev/foundation/std/tasks"
)
// orchestratorVersion is the value ExecuteOpts reports as OrchestratorVersion.
// Bumping this value leads to an orchestrator upgrade.
const orchestratorVersion = 25

var (
	// DeployWithOrchestrator selects how Deploy applies a plan: when false the
	// plan is executed in-process, when true it is handed to the orchestrator.
	DeployWithOrchestrator bool

	// DeployUpdateSlackChannel, when non-empty, makes an in-process Deploy
	// post a message to this Slack channel and update it once execution
	// finishes. The value is passed through os.ExpandEnv before use.
	DeployUpdateSlackChannel string

	// SlackToken is the token used to talk to Slack. It is required whenever
	// DeployUpdateSlackChannel is set and is passed through os.ExpandEnv
	// before use.
	SlackToken string
)
// ExecuteOpts returns the execution options used for deployments: execution
// does not continue on errors, and the orchestrator version is pinned to
// orchestratorVersion.
func ExecuteOpts() execution.ExecuteOpts {
	var opts execution.ExecuteOpts
	opts.ContinueOnErrors = false
	opts.OrchestratorVersion = orchestratorVersion
	return opts
}
// Deploy applies plan to cluster, either by executing it in-process or by
// handing it to the orchestrator, depending on DeployWithOrchestrator.
//
// In-process (DeployWithOrchestrator == false): wait must be true, otherwise
// a BadInputError is returned. If DeployUpdateSlackChannel is set, SlackToken
// must be set as well (BadInputError otherwise); a message is posted to the
// channel before execution and updated with the outcome afterwards. Failures
// to talk to Slack are written to console.Warnings and do not affect the
// returned error, which is the result of executing the plan.
//
// Orchestrator (DeployWithOrchestrator == true): the plan is submitted with
// client.CallDeploy. If wait is false Deploy returns right after submission;
// otherwise it blocks in client.WireDeploymentStatus, rendering progress via
// deploy.MaybeRenderBlock when outputProgress is set.
func Deploy(ctx context.Context, env cfg.Context, cluster runtime.ClusterNamespace, plan *schema.DeployPlan, wait, outputProgress bool) error {
	if !DeployWithOrchestrator {
		if !wait {
			return fnerrors.BadInputError("waiting is mandatory without the orchestrator")
		}

		// No-op unless a Slack message was successfully posted below.
		observeError := func(context.Context, error) {}

		if DeployUpdateSlackChannel != "" {
			if SlackToken == "" {
				return fnerrors.BadInputError("a slack token is required to be able to update a channel")
			}

			start := time.Now()

			slackcli := slack.New(os.ExpandEnv(SlackToken))
			chid, ts, err := slackcli.PostMessageContext(ctx, os.ExpandEnv(DeployUpdateSlackChannel), slack.MsgOptionBlocks(renderSlackMessage(plan, start, time.Time{}, nil)...))
			if err != nil {
				// Slack is best-effort: warn and carry on with the deployment.
				fmt.Fprintf(console.Warnings(ctx), "Failed to post to Slack: %v\n", err)
			} else {
				// Only update the message if the initial post went through.
				// deployErr is the deployment outcome; updateErr is the Slack
				// API failure, kept distinct to avoid shadowing.
				observeError = func(ctx context.Context, deployErr error) {
					blocks := renderSlackMessage(plan, start, time.Now(), deployErr)
					if _, _, _, updateErr := slackcli.UpdateMessageContext(ctx, chid, ts, slack.MsgOptionBlocks(blocks...)); updateErr != nil {
						fmt.Fprintf(console.Warnings(ctx), "Failed to update Slack: %v\n", updateErr)
					}
				}
			}
		}

		// Use the nil-safe getters, as the orchestrator branch does, so that a
		// plan without a program yields an empty execution plan instead of a
		// nil-pointer panic.
		p := execution.NewPlan(plan.GetProgram().GetInvocation()...)

		// Make sure that the cluster is accessible to a serialized invocation implementation.
		execErr := execution.ExecuteExt(ctx, "deployment.execute", p,
			deploy.MaybeRenderBlock(env, cluster, outputProgress),
			ExecuteOpts(),
			execution.FromContext(env),
			runtime.InjectCluster(cluster))

		observeError(ctx, execErr)

		return execErr
	}

	return tasks.Action("orchestrator.deploy").Scope(schema.PackageNames(plan.FocusServer...)...).
		Run(ctx, func(ctx context.Context) error {
			// Dump the program being deployed to the debug output.
			debug := console.Debug(ctx)
			fmt.Fprintf(debug, "deploying program:\n")
			for k, inv := range plan.GetProgram().GetInvocation() {
				fmt.Fprintf(debug, " #%d %q --> cats:%v after:%v\n", k, inv.Description,
					inv.GetOrder().GetSchedCategory(),
					inv.GetOrder().GetSchedAfterCategory())
			}

			conn, err := client.ConnectToOrchestrator(ctx, cluster.Cluster())
			if err != nil {
				return err
			}
			defer conn.Close()

			id, err := client.CallDeploy(ctx, env, conn, plan)
			if err != nil {
				return err
			}

			if !wait {
				return nil
			}

			// ch and cleanup stay nil when progress output was not requested.
			var ch chan *orchpb.Event
			var cleanup func(ctx context.Context) error

			if outputProgress {
				ch, cleanup = deploy.MaybeRenderBlock(env, cluster, true)(ctx)
			}

			statusErr := client.WireDeploymentStatus(ctx, conn, id, ch)
			if cleanup != nil {
				// Always run cleanup; its error is only surfaced when the
				// deployment itself succeeded.
				cleanupErr := cleanup(ctx)
				if statusErr == nil {
					return cleanupErr
				}
			}

			return statusErr
		})
}