-
Notifications
You must be signed in to change notification settings - Fork 229
/
status.go
104 lines (95 loc) · 2.59 KB
/
status.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package dbsync
import (
"context"
"fmt"
"strconv"
"strings"
"time"
"github.com/alexeyco/simpletable"
"github.com/pkg/errors"
)
// status builds a human-readable, plain-text report for the sync operation.
//
// The report contains, in order:
//   - a table of connection counts to the current database of s.oldDB, grouped
//     by application name and username (from pg_stat_activity);
//   - a table with one row per entry returned by s.NodeStatus();
//   - the node count and the local offset (s.Offset());
//   - the current_state value read from switchover_state on s.oldDB;
//   - the maximum change_log ID on s.oldDB and on s.newDB (0 when empty).
//
// It returns the rendered report, or an empty string and a wrapped error if
// any query, row scan, or row iteration fails.
func (s *Sync) status(ctx context.Context) (string, error) {
	rows, err := s.oldDB.QueryContext(ctx, `
		select count(*), application_name, usename
		from pg_stat_activity
		where datname=current_database()
		group by application_name, usename
		order by application_name, usename
	`)
	if err != nil {
		return "", errors.Wrap(err, "check DB connections")
	}
	// Covers the early-return paths below; the explicit Close after the loop
	// handles the normal path.
	defer rows.Close()

	table := simpletable.New()
	table.Header = &simpletable.Header{
		Cells: []*simpletable.Cell{
			{Text: "Application"},
			{Text: "Username"},
			{Text: "Connections"},
		},
	}
	for rows.Next() {
		var num int
		var name string
		var user string
		// NOTE(review): name and user are plain strings, so a NULL
		// application_name or usename would make Scan fail — confirm these
		// columns are never NULL for rows matching the datname filter.
		err = rows.Scan(&num, &name, &user)
		if err != nil {
			return "", errors.Wrap(err, "scan query results")
		}
		table.Body.Cells = append(table.Body.Cells, []*simpletable.Cell{
			{Text: name},
			{Text: user},
			{Text: strconv.Itoa(num)},
		})
	}
	// rows.Next returns false both at end-of-results and on failure; without
	// this check an interrupted iteration would be reported as a complete
	// (but truncated) table.
	if err = rows.Err(); err != nil {
		return "", errors.Wrap(err, "iterate query results")
	}
	// Close before issuing the remaining queries rather than waiting for the
	// deferred call at function return.
	rows.Close()

	buf := new(strings.Builder)
	buf.WriteString(table.String() + "\n\n")

	table = simpletable.New()
	table.Header = &simpletable.Header{
		Cells: []*simpletable.Cell{
			{Text: "Node ID"},
			{Text: "Status"},
			{Text: "Offset"},
			{Text: "Config"},
			{Text: "Last Seen"},
			{Text: "ActiveRequests"},
		},
	}

	nodes := s.NodeStatus()
	for _, stat := range nodes {
		// "Config" reflects whether the node's status matches s.newURL
		// according to MatchDBNext.
		cfg := "Valid"
		if !stat.MatchDBNext(s.newURL) {
			cfg = "Invalid"
		}
		table.Body.Cells = append(table.Body.Cells, []*simpletable.Cell{
			{Text: stat.NodeID},
			{Text: string(stat.State)},
			{Text: stat.Offset.String()},
			{Text: cfg},
			// Elapsed time since stat.At, truncated to millisecond precision.
			{Text: time.Since(stat.At).Truncate(time.Millisecond).String()},
			{Text: strconv.Itoa(stat.ActiveRequests)},
		})
	}
	buf.WriteString(table.String() + "\n\n")

	fmt.Fprintf(buf, "Node Count: %d\n", len(nodes))
	fmt.Fprintln(buf, "Local offset:", s.Offset())

	var stat string
	err = s.oldDB.QueryRowContext(ctx, `select current_state from switchover_state`).Scan(&stat)
	if err != nil {
		return "", errors.Wrap(err, "lookup switchover state")
	}
	fmt.Fprintln(buf, "Switchover state:", stat)

	// changeMax is reused for both databases; coalesce yields 0 when
	// change_log has no rows.
	var changeMax int
	err = s.oldDB.QueryRowContext(ctx, `select coalesce(max(id),0) from change_log`).Scan(&changeMax)
	if err != nil {
		return "", errors.Wrap(err, "lookup change id")
	}
	fmt.Fprintln(buf, "Max change_log ID:", changeMax)

	err = s.newDB.QueryRowContext(ctx, `select coalesce(max(id),0) from change_log`).Scan(&changeMax)
	if err != nil {
		return "", errors.Wrap(err, "lookup change id (new)")
	}
	fmt.Fprintln(buf, "Max change_log ID (next DB):", changeMax)

	return buf.String(), nil
}