-
Notifications
You must be signed in to change notification settings - Fork 261
/
stop_services.go
355 lines (301 loc) · 10.3 KB
/
stop_services.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
package db_local
import (
"context"
"fmt"
"log"
"os"
"strings"
"syscall"
"time"
psutils "github.com/shirou/gopsutil/process"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/constants/runtime"
"github.com/turbot/steampipe/pkg/error_helpers"
"github.com/turbot/steampipe/pkg/filepaths"
"github.com/turbot/steampipe/pkg/statushooks"
"github.com/turbot/steampipe/pkg/utils"
"github.com/turbot/steampipe/pluginmanager"
)
// StopStatus is a pseudoEnum for service stop result
type StopStatus int
const (
// start from 1 to prevent confusion with int zero-value
ServiceStopped StopStatus = iota + 1
ServiceNotRunning
ServiceStopFailed
ServiceStopTimedOut
)
// ShutdownService stops the database instance if the given 'invoker' matches
func ShutdownService(ctx context.Context, invoker constants.Invoker) {
utils.LogTime("db_local.ShutdownService start")
defer utils.LogTime("db_local.ShutdownService end")
if error_helpers.IsContextCanceled(ctx) {
ctx = context.Background()
}
status, _ := GetState()
// if the service is not running or it was invoked by 'steampipe service',
// then we don't shut it down
if status == nil || status.Invoker == constants.InvokerService {
return
}
// how many clients are connected
// under a fresh context
clientCounts, err := GetClientCount(context.Background())
// if there are other clients connected
// and if there's no error
if err == nil && clientCounts.SteampipeClients > 0 {
// there are other steampipe clients connected to the database
// we don't need to stop the service
// the last one to exit will shutdown the service
log.Printf("[TRACE] ShutdownService not closing database service - %d steampipe %s connected", clientCounts.SteampipeClients, utils.Pluralize("client", clientCounts.SteampipeClients))
return
}
// we can shut down the database
stopStatus, err := StopServices(ctx, false, invoker)
if err != nil {
error_helpers.ShowError(ctx, err)
}
if stopStatus == ServiceStopped {
return
}
// shutdown failed - try to force stop
_, err = StopServices(ctx, true, invoker)
if err != nil {
error_helpers.ShowError(ctx, err)
}
}
type ClientCount struct {
SteampipeClients int
PluginManagerClients int
TotalClients int
}
// GetClientCount returns the number of connections to the service from anyone other than
// _this_execution_ of steampipe
//
// We assume that any connections from this execution will eventually be closed
// - if there are any other external connections, we cannot shut down the database
//
// this is to handle cases where either a third party tool is connected to the database,
// or other Steampipe sessions are attached to an already running Steampipe service
// - we do not want the db service being closed underneath them
//
// note: we need the PgClientAppName check to handle the case where there may be one or more open DB connections
// from this instance at the time of shutdown - for example when a control run is cancelled
// If we do not exclude connections from this execution, the DB will not be shut down after a cancellation
func GetClientCount(ctx context.Context) (*ClientCount, error) {
utils.LogTime("db_local.GetClientCount start")
defer utils.LogTime(fmt.Sprintf("db_local.GetClientCount end"))
rootClient, err := CreateLocalDbConnection(ctx, &CreateDbOptions{Username: constants.DatabaseSuperUser})
if err != nil {
return nil, err
}
defer rootClient.Close(ctx)
query := `
SELECT
application_name,
count(*)
FROM
pg_stat_activity
WHERE
-- get only the network client processes
client_port IS NOT NULL
AND
-- which are client backends
backend_type=$1
AND
-- which are not connections from this application
application_name!=$2
GROUP BY application_name
`
counts := &ClientCount{}
rows, err := rootClient.Query(ctx, query, "client backend", runtime.PgClientAppName)
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var appName string
var count int
if err := rows.Scan(&appName, &count); err != nil {
return nil, err
}
counts.TotalClients += count
if strings.HasPrefix(appName, constants.AppName) {
counts.SteampipeClients += count
}
if strings.HasPrefix(appName, runtime.PgClientAppNamePluginManagerPrefix) {
counts.PluginManagerClients += count
}
}
return counts, nil
}
// StopServices searches for and stops the running instance. Does nothing if an instance was not found
func StopServices(ctx context.Context, force bool, invoker constants.Invoker) (status StopStatus, e error) {
log.Printf("[TRACE] StopDB invoker %s, force %v", invoker, force)
utils.LogTime("db_local.StopDB start")
defer func() {
if e == nil {
os.Remove(filepaths.RunningInfoFilePath())
}
utils.LogTime("db_local.StopDB end")
}()
// stop the plugin manager
// this means it may be stopped even if we fail to stop the service - that is ok - we will restart it if needed
pluginManagerStopError := pluginmanager.Stop()
// stop the DB Service
stopResult, dbStopError := stopDBService(ctx, force)
return stopResult, error_helpers.CombineErrors(dbStopError, pluginManagerStopError)
}
func stopDBService(ctx context.Context, force bool) (StopStatus, error) {
if force {
// check if we have a process from another install-dir
statushooks.SetStatus(ctx, "Checking for running instances...")
defer statushooks.Done(ctx)
// do not use a context that can be cancelled
anyStopped := killInstanceIfAny(context.Background())
if anyStopped {
return ServiceStopped, nil
}
return ServiceNotRunning, nil
}
dbState, err := GetState()
if err != nil {
return ServiceStopFailed, err
}
if dbState == nil {
// we do not have a info file
// assume that the service is not running
return ServiceNotRunning, nil
}
// GetStatus has made sure that the process exists
process, err := psutils.NewProcess(int32(dbState.Pid))
if err != nil {
return ServiceStopFailed, err
}
err = doThreeStepPostgresExit(ctx, process)
if err != nil {
// we couldn't stop it still.
// timeout
return ServiceStopTimedOut, err
}
return ServiceStopped, nil
}
/*
Postgres has three levels of shutdown:
- SIGTERM - Smart Shutdown : Wait for children to end normally - exit self
- SIGINT - Fast Shutdown : SIGTERM children, causing them to abort current
transations and exit - wait for children to exit -
exit self
- SIGQUIT - Immediate Shutdown : SIGQUIT children - wait at most 5 seconds,
send SIGKILL to children - exit self immediately
Postgres recommended shutdown is to send a SIGTERM - which initiates
a Smart-Shutdown sequence.
IMPORTANT:
As per documentation, it is best not to use SIGKILL
to shut down postgres. Doing so will prevent the server
from releasing shared memory and semaphores.
Reference:
https://www.postgresql.org/docs/12/server-shutdown.html
By the time we actually try to run this sequence, we will have
checked that the service can indeed shutdown gracefully,
the sequence is there only as a backup.
*/
func doThreeStepPostgresExit(ctx context.Context, process *psutils.Process) error {
utils.LogTime("db_local.doThreeStepPostgresExit start")
defer utils.LogTime("db_local.doThreeStepPostgresExit end")
var err error
var exitSuccessful bool
// send a SIGTERM
err = process.SendSignal(syscall.SIGTERM)
if err != nil {
return err
}
exitSuccessful = waitForProcessExit(process, 2*time.Second)
if !exitSuccessful {
// process didn't quit
// set status, as this is taking time
statushooks.SetStatus(ctx, "Shutting down...")
defer statushooks.Done(ctx)
// try a SIGINT
err = process.SendSignal(syscall.SIGINT)
if err != nil {
return err
}
exitSuccessful = waitForProcessExit(process, 2*time.Second)
}
if !exitSuccessful {
// process didn't quit
// desperation prevails
err = process.SendSignal(syscall.SIGQUIT)
if err != nil {
return err
}
exitSuccessful = waitForProcessExit(process, 5*time.Second)
}
if !exitSuccessful {
log.Println("[ERROR] Failed to stop service")
log.Printf("[ERROR] Service Details:\n%s\n", getPrintableProcessDetails(process, 0))
return fmt.Errorf("service shutdown timed out")
}
return nil
}
func waitForProcessExit(process *psutils.Process, waitFor time.Duration) bool {
utils.LogTime("db_local.waitForProcessExit start")
defer utils.LogTime("db_local.waitForProcessExit end")
checkTimer := time.NewTicker(50 * time.Millisecond)
timeoutAt := time.After(waitFor)
for {
select {
case <-checkTimer.C:
pEx, _ := utils.PidExists(int(process.Pid))
if pEx {
continue
}
return true
case <-timeoutAt:
checkTimer.Stop()
return false
}
}
}
func getPrintableProcessDetails(process *psutils.Process, indent int) string {
utils.LogTime("db_local.getPrintableProcessDetails start")
defer utils.LogTime("db_local.getPrintableProcessDetails end")
indentString := strings.Repeat(" ", indent)
appendTo := []string{}
if name, err := process.Name(); err == nil {
appendTo = append(appendTo, fmt.Sprintf("%s> Name: %s", indentString, name))
}
if cmdLine, err := process.Cmdline(); err == nil {
appendTo = append(appendTo, fmt.Sprintf("%s> CmdLine: %s", indentString, cmdLine))
}
if status, err := process.Status(); err == nil {
appendTo = append(appendTo, fmt.Sprintf("%s> Status: %s", indentString, status))
}
if cwd, err := process.Cwd(); err == nil {
appendTo = append(appendTo, fmt.Sprintf("%s> CWD: %s", indentString, cwd))
}
if executable, err := process.Exe(); err == nil {
appendTo = append(appendTo, fmt.Sprintf("%s> Executable: %s", indentString, executable))
}
if username, err := process.Username(); err == nil {
appendTo = append(appendTo, fmt.Sprintf("%s> Username: %s", indentString, username))
}
if indent == 0 {
// I do not care about the parent of my parent
if parent, err := process.Parent(); err == nil && parent != nil {
appendTo = append(appendTo, "", fmt.Sprintf("%s> Parent Details", indentString))
parentLog := getPrintableProcessDetails(parent, indent+1)
appendTo = append(appendTo, parentLog, "")
}
// I do not care about all the children of my parent
if children, err := process.Children(); err == nil && len(children) > 0 {
appendTo = append(appendTo, fmt.Sprintf("%s> Children Details", indentString))
for _, child := range children {
childLog := getPrintableProcessDetails(child, indent+1)
appendTo = append(appendTo, childLog, "")
}
}
}
return strings.Join(appendTo, "\n")
}