-
Notifications
You must be signed in to change notification settings - Fork 262
/
connection_state_table_updater.go
137 lines (118 loc) · 5.11 KB
/
connection_state_table_updater.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package connection
import (
"context"
"log"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/db/db_common"
"github.com/turbot/steampipe/pkg/db/db_local"
"github.com/turbot/steampipe/pkg/introspection"
"github.com/turbot/steampipe/pkg/steampipeconfig"
)
type connectionStateTableUpdater struct {
updates *steampipeconfig.ConnectionUpdates
pool *pgxpool.Pool
}
func newConnectionStateTableUpdater(updates *steampipeconfig.ConnectionUpdates, pool *pgxpool.Pool) *connectionStateTableUpdater {
log.Println("[DEBUG] newConnectionStateTableUpdater start")
defer log.Println("[DEBUG] newConnectionStateTableUpdater end")
return &connectionStateTableUpdater{
updates: updates,
pool: pool,
}
}
// update connection state table to indicate the updates that will be done
func (u *connectionStateTableUpdater) start(ctx context.Context) error {
log.Println("[DEBUG] connectionStateTableUpdater.start start")
defer log.Println("[DEBUG] connectionStateTableUpdater.start end")
var queries []db_common.QueryWithArgs
// update the conection state table to set appropriate state for all connections
// set updates to "updating"
for name, connectionState := range u.updates.FinalConnectionState {
// set the connection data state based on whether this connection is being created or deleted
if _, updatingConnection := u.updates.Update[name]; updatingConnection {
connectionState.State = constants.ConnectionStateUpdating
connectionState.CommentsSet = false
} else if validationError, connectionIsInvalid := u.updates.InvalidConnections[name]; connectionIsInvalid {
// if this connection has an error, set to error
connectionState.State = constants.ConnectionStateError
connectionState.ConnectionError = &validationError.Message
}
// get the sql to update the connection state in the table to match the struct
queries = append(queries, introspection.GetUpsertConnectionStateSql(connectionState)...)
}
// set deletions to "deleting"
for name := range u.updates.Delete {
// if we are we deleting the schema because schema_import="disabled", DO NOT set state to deleting -
// it will be set to "disabled below
if _, connectionDisabled := u.updates.Disabled[name]; connectionDisabled {
continue
}
queries = append(queries, introspection.GetSetConnectionStateSql(name, constants.ConnectionStateDeleting)...)
}
// set any connections with import_schema=disabled to "disabled"
// also build a lookup of disabled connections
for name := range u.updates.Disabled {
queries = append(queries, introspection.GetSetConnectionStateSql(name, constants.ConnectionStateDisabled)...)
}
conn, err := u.pool.Acquire(ctx)
if err != nil {
return err
}
defer conn.Release()
if _, err = db_local.ExecuteSqlWithArgsInTransaction(ctx, conn.Conn(), queries...); err != nil {
return err
}
return nil
}
func (u *connectionStateTableUpdater) onConnectionReady(ctx context.Context, conn *pgx.Conn, name string) error {
log.Println("[DEBUG] connectionStateTableUpdater.onConnectionReady start")
defer log.Println("[DEBUG] connectionStateTableUpdater.onConnectionReady end")
connection := u.updates.FinalConnectionState[name]
queries := introspection.GetSetConnectionStateSql(connection.ConnectionName, constants.ConnectionStateReady)
for _, q := range queries {
if _, err := conn.Exec(ctx, q.Query, q.Args...); err != nil {
return err
}
}
return nil
}
func (u *connectionStateTableUpdater) onConnectionCommentsLoaded(ctx context.Context, conn *pgx.Conn, name string) error {
log.Println("[DEBUG] connectionStateTableUpdater.onConnectionCommentsLoaded start")
defer log.Println("[DEBUG] connectionStateTableUpdater.onConnectionCommentsLoaded end")
connection := u.updates.FinalConnectionState[name]
queries := introspection.GetSetConnectionStateCommentLoadedSql(connection.ConnectionName, true)
for _, q := range queries {
if _, err := conn.Exec(ctx, q.Query, q.Args...); err != nil {
return err
}
}
return nil
}
func (u *connectionStateTableUpdater) onConnectionDeleted(ctx context.Context, conn *pgx.Conn, name string) error {
log.Println("[DEBUG] connectionStateTableUpdater.onConnectionDeleted start")
defer log.Println("[DEBUG] connectionStateTableUpdater.onConnectionDeleted end")
// if this connection has schema import disabled, DO NOT delete from the conneciotn state table
if _, connectionDisabled := u.updates.Disabled[name]; connectionDisabled {
return nil
}
queries := introspection.GetDeleteConnectionStateSql(name)
for _, q := range queries {
if _, err := conn.Exec(ctx, q.Query, q.Args...); err != nil {
return err
}
}
return nil
}
func (u *connectionStateTableUpdater) onConnectionError(ctx context.Context, conn *pgx.Conn, connectionName string, err error) error {
log.Println("[DEBUG] connectionStateTableUpdater.onConnectionError start")
defer log.Println("[DEBUG] connectionStateTableUpdater.onConnectionError end")
queries := introspection.GetConnectionStateErrorSql(connectionName, err)
for _, q := range queries {
if _, err := conn.Exec(ctx, q.Query, q.Args...); err != nil {
return err
}
}
return nil
}