-
Notifications
You must be signed in to change notification settings - Fork 262
/
init_data.go
182 lines (156 loc) · 5.71 KB
/
init_data.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
package initialisation
import (
"context"
"fmt"
"github.com/jackc/pgx/v5"
"github.com/spf13/viper"
"github.com/turbot/go-kit/helpers"
"github.com/turbot/steampipe-plugin-sdk/v5/telemetry"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/db/db_client"
"github.com/turbot/steampipe/pkg/db/db_common"
"github.com/turbot/steampipe/pkg/db/db_local"
"github.com/turbot/steampipe/pkg/export"
"github.com/turbot/steampipe/pkg/modinstaller"
"github.com/turbot/steampipe/pkg/plugin"
"github.com/turbot/steampipe/pkg/statushooks"
"github.com/turbot/steampipe/pkg/steampipeconfig/modconfig"
"github.com/turbot/steampipe/pkg/steampipeconfig/versionmap"
"github.com/turbot/steampipe/pkg/workspace"
)
// InitData holds the state assembled while initialising a command: the
// workspace, the database client, the accumulated result and the export manager.
type InitData struct {
	// Workspace is the workspace being initialised. NewInitData does not set it;
	// Init reads it, so it has to be populated before Init is called.
	Workspace *workspace.Workspace
	// Client is the database client. Init assigns it once a client has been obtained.
	Client db_common.Client
	// Result collects the error, warnings and messages raised during initialisation.
	Result *db_common.InitResult

	// ShutdownTelemetry, when non-nil, is invoked by Cleanup.
	ShutdownTelemetry func()
	// ExportManager holds the exporters added through RegisterExporters.
	ExportManager *export.Manager
}
// NewErrorInitData returns an InitData whose Result already carries err.
// No other field is populated.
func NewErrorInitData(err error) *InitData {
	failed := &db_common.InitResult{Error: err}
	return &InitData{Result: failed}
}
// NewInitData returns an InitData with an empty Result and a freshly created
// export manager. Workspace and Client are left unset.
func NewInitData() *InitData {
	return &InitData{
		Result:        &db_common.InitResult{},
		ExportManager: export.NewManager(),
	}
}
// RegisterExporters registers each of the given exporters with the export
// manager, in the order supplied, and returns the receiver so calls can be chained.
func (i *InitData) RegisterExporters(exporters ...export.Exporter) *InitData {
	for idx := range exporters {
		i.ExportManager.Register(exporters[idx])
	}
	return i
}
// Init runs the initialisation steps and records the outcome on i.Result;
// it has no return value.
//
// i.Workspace must be set before calling Init. If it is nil, Init stores a
// descriptive error in i.Result.Error and returns without doing any work.
//
// The steps, in order, are:
//   - initialise telemetry (a failure is recorded as a warning only)
//   - install mod dependencies when the ArgModInstall setting is enabled
//   - retrieve cloud metadata and attach it to the workspace
//   - list installed plugins and validate mod requirements against them
//     (validation failures are recorded as warnings)
//   - obtain a database client via GetDbClient and store it in i.Client
//
// Any step that fails with an error stores it in i.Result.Error and stops.
// A panic raised along the way is recovered and converted to an error. If no
// error was recorded, i.Result.Error is set to ctx.Err(), which is nil unless
// the context has been cancelled or has expired.
func (i *InitData) Init(ctx context.Context, invoker constants.Invoker) {
	defer func() {
		if r := recover(); r != nil {
			i.Result.Error = helpers.ToError(r)
		}
		// if there is no error, return context cancellation error (if any)
		if i.Result.Error == nil {
			i.Result.Error = ctx.Err()
		}
	}()

	// the code below dereferences i.Workspace unconditionally - report a missing
	// workspace explicitly, before any side effects, rather than relying on the
	// recovered nil pointer panic
	if i.Workspace == nil {
		i.Result.Error = fmt.Errorf("InitData.Init called before Workspace was set")
		return
	}

	statushooks.SetStatus(ctx, "Initializing")

	// initialise telemetry
	shutdownTelemetry, err := telemetry.Init(constants.AppName)
	if err != nil {
		// a telemetry failure is not fatal - record it as a warning
		i.Result.AddWarnings(err.Error())
	} else {
		i.ShutdownTelemetry = shutdownTelemetry
	}

	// install mod dependencies if needed
	if viper.GetBool(constants.ArgModInstall) {
		statushooks.SetStatus(ctx, "Installing workspace dependencies")
		opts := modinstaller.NewInstallOpts(i.Workspace.Mod)
		// use force install so that errors are ignored during installation
		// (we are validating prereqs later)
		opts.Force = true
		_, err := modinstaller.InstallWorkspaceDependencies(ctx, opts)
		if err != nil {
			i.Result.Error = err
			return
		}
	}

	// retrieve cloud metadata
	cloudMetadata, err := getCloudMetadata(ctx)
	if err != nil {
		i.Result.Error = err
		return
	}
	// set cloud metadata (may be nil)
	i.Workspace.CloudMetadata = cloudMetadata

	statushooks.SetStatus(ctx, "Checking for required plugins")
	pluginsInstalled, err := plugin.GetInstalledPlugins()
	if err != nil {
		i.Result.Error = err
		return
	}

	// validate the requirements of the workspace mod and its dependency mods
	// against the installed plugins - failures are reported as warnings
	validationWarnings := validateModRequirementsRecursively(i.Workspace.Mod, pluginsInstalled)
	i.Result.AddWarnings(validationWarnings...)

	// if introspection tables are enabled, setup the session data callback
	var ensureSessionData db_client.DbConnectionCallback
	if viper.GetString(constants.ArgIntrospection) != constants.IntrospectionNone {
		ensureSessionData = func(ctx context.Context, conn *pgx.Conn) error {
			return workspace.EnsureSessionData(ctx, i.Workspace.GetResourceMaps(), conn)
		}
	}

	// get a client
	// add a message rendering function to the context - this is used for the fdw update message and
	// allows us to render it as a standard initialisation message
	getClientCtx := statushooks.AddMessageRendererToContext(ctx, func(format string, a ...any) {
		i.Result.AddMessage(fmt.Sprintf(format, a...))
	})

	statushooks.SetStatus(ctx, "Connecting to steampipe")
	client, errorsAndWarnings := GetDbClient(getClientCtx, invoker, ensureSessionData)
	if errorsAndWarnings.Error != nil {
		i.Result.Error = errorsAndWarnings.Error
		return
	}
	i.Result.AddWarnings(errorsAndWarnings.Warnings...)
	i.Client = client
}
// validateModRequirementsRecursively validates the requirements of mod against
// pluginVersionMap, then does the same for every mod found in its resource map,
// descending recursively. It returns the messages of all validation failures;
// the returned slice is never nil.
func validateModRequirementsRecursively(mod *modconfig.Mod, pluginVersionMap versionmap.VersionMap) []string {
	messages := make([]string, 0)

	// requirements declared by this mod
	for _, failure := range mod.ValidateRequirements(pluginVersionMap) {
		messages = append(messages, failure.Error())
	}

	// requirements declared by the mods this mod depends on
	for name, dep := range mod.ResourceMaps.Mods {
		// TODO : The 'mod.DependencyName == dep.DependencyName' check has to be done because
		// of a bug in the resource loading code which also puts the mod itself into the resource map
		// [https://github.com/turbot/steampipe/issues/3341]
		isSelf := name == "local" || mod.DependencyName == dep.DependencyName
		if !isSelf {
			// only descend into genuine dependencies - following a self reference
			// would recurse forever
			messages = append(messages, validateModRequirementsRecursively(dep, pluginVersionMap)...)
		}
	}
	return messages
}
// GetDbClient returns a database client together with any error and warnings
// raised while obtaining it.
//
// If a connection string is configured (the ArgConnectionString setting is
// non-empty), a client is created for that connection string via
// db_client.NewDbClient; otherwise a local client is obtained via
// db_local.GetLocalClient. onConnectionCallback is passed through to whichever
// constructor is used.
//
// When an error is reported the returned client is a literal nil.
// NOTE(review): the constructors presumably return concrete pointer types - confirm.
// If so, passing a nil pointer straight through would give the caller a non-nil
// interface wrapping nil, so a `client != nil` check would wrongly succeed.
func GetDbClient(ctx context.Context, invoker constants.Invoker, onConnectionCallback db_client.DbConnectionCallback) (db_common.Client, *modconfig.ErrorAndWarnings) {
	if connectionString := viper.GetString(constants.ArgConnectionString); connectionString != "" {
		statushooks.SetStatus(ctx, "Connecting to remote Steampipe database")
		client, err := db_client.NewDbClient(ctx, connectionString, onConnectionCallback)
		if err != nil {
			// return an untyped nil so the interface value itself is nil
			return nil, modconfig.NewErrorsAndWarning(err)
		}
		return client, modconfig.NewErrorsAndWarning(err)
	}

	statushooks.SetStatus(ctx, "Starting local Steampipe database")
	localClient, res := db_local.GetLocalClient(ctx, invoker, onConnectionCallback)
	if res != nil && res.Error != nil {
		// return an untyped nil so the interface value itself is nil
		return nil, res
	}
	return localClient, res
}
// Cleanup releases whatever has been set on the InitData: it closes the client,
// invokes the telemetry shutdown function and closes the workspace, in that
// order. Each step is skipped when the corresponding field is nil.
func (i *InitData) Cleanup(ctx context.Context) {
	if client := i.Client; client != nil {
		client.Close(ctx)
	}
	if shutdown := i.ShutdownTelemetry; shutdown != nil {
		shutdown()
	}
	if ws := i.Workspace; ws != nil {
		ws.Close()
	}
}