/
logclient.go
128 lines (92 loc) · 2.61 KB
/
logclient.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package direktiv
import (
"time"
"database/sql"
"encoding/json"
)
// logClient reads and deletes log records kept in a SQL database.
type logClient struct {
	// db is the database handle used for every query. newLogDBClient leaves
	// it nil; start assigns it.
	db *sql.DB
}
// newLogDBClient returns a zero-valued logClient. No database connection is
// opened here; the db field stays nil until start is called.
func newLogDBClient() *logClient {
	return new(logClient)
}
// name returns the fixed identifier string of this component.
func (lc *logClient) name() string {
	const componentName = "logclient"
	return componentName
}
// runLogQuery consumes rows, which must yield exactly one column per row
// containing a JSON object, and returns the decoded objects in row order.
//
// rows is always closed before the function returns, including on the error
// paths, so callers must not use it afterwards. On error the entries decoded
// so far are returned together with the error, which is also logged.
func runLogQuery(rows *sql.Rows) ([]map[string]interface{}, error) {
	// Without this, an early return on a Scan/Unmarshal failure would leave
	// the result set (and its pooled connection) open. Close is idempotent,
	// so it is harmless when Next has already closed rows at end of data.
	defer rows.Close()

	var retArray []map[string]interface{}

	for rows.Next() {
		var data string
		if err := rows.Scan(&data); err != nil {
			appLog.Errorf("error scanning namespace log results: %v", err)
			return retArray, err
		}

		result := make(map[string]interface{})
		if err := json.Unmarshal([]byte(data), &result); err != nil {
			appLog.Errorf("error unmarshaling namespace logs: %v", err)
			return retArray, err
		}

		retArray = append(retArray, result)
	}

	// Next returns false both at end of data and on failure; Err tells the
	// two apart so a broken iteration is not reported as a complete result.
	if err := rows.Err(); err != nil {
		appLog.Errorf("error iterating namespace log results: %v", err)
		return retArray, err
	}

	return retArray, nil
}
// deleteInstanceLogs deletes every row of the fluentbit table whose JSON
// 'instance' field equals id. It returns the error from the delete statement,
// if any.
func (lc *logClient) deleteInstanceLogs(id string) error {
	const stmt = "delete from fluentbit where data->>'instance' = $1"

	if _, err := lc.db.Exec(stmt, id); err != nil {
		return err
	}
	return nil
}
// deleteNamespaceLogs deletes rows of the fluentbit table that have no JSON
// 'instance' field and whose time column is older than ten minutes before
// now. The cutoff is passed to the database as a formatted string built from
// time.Now().
func (lc *logClient) deleteNamespaceLogs() error {
	const (
		retention  = 10 * time.Minute
		timeLayout = "2006-01-02 15:04:05"
		stmt       = "delete from fluentbit where data->>'instance' is null and time < $1"
	)

	cutoff := time.Now().Add(-retention).Format(timeLayout)
	_, err := lc.db.Exec(stmt, cutoff)
	return err
}
// logsForInstance returns the decoded log entries whose JSON 'instance' field
// equals id, ordered by the time column ascending. limit caps the number of
// rows returned and offset skips that many rows first (note the bind order:
// limit is $2, offset is $3).
//
// A query failure is logged and returned with a nil slice; row decoding is
// delegated to runLogQuery.
func (lc *logClient) logsForInstance(id string, offset, limit int32) ([]map[string]interface{}, error) {
	const query = `SELECT data FROM fluentbit WHERE
data->>'instance' = $1 ORDER BY time asc LIMIT $2 OFFSET $3;`

	rows, err := lc.db.Query(query, id, limit, offset)
	if err != nil {
		// This is an instance query; the message previously said
		// "namespace logs", which pointed at the wrong call site.
		appLog.Errorf("error querying instance logs: %v", err)
		return nil, err
	}

	return runLogQuery(rows)
}
// logsForInstanceAfterTime returns the decoded log entries whose JSON
// 'instance' field equals id and whose JSON 'ts' field compares greater than
// time, ordered by the time column.
//
// A query failure is logged and returned with a nil slice; row decoding is
// delegated to runLogQuery.
//
// NOTE(review): `data->>'ts'` yields text, so the `> $2` comparison is
// presumably performed as a string comparison rather than a numeric one.
// Confirm the stored format of 'ts' and consider an explicit cast.
func (lc *logClient) logsForInstanceAfterTime(id string, time float64) ([]map[string]interface{}, error) {
	const query = `SELECT data FROM fluentbit WHERE
data->> 'instance' = $1 AND data->> 'ts' > $2 ORDER BY time;`

	rows, err := lc.db.Query(query, id, time)
	if err != nil {
		// This is an instance query; the message previously said
		// "namespace logs", which pointed at the wrong call site.
		appLog.Errorf("error querying instance logs: %v", err)
		return nil, err
	}

	return runLogQuery(rows)
}
// logsForNamespace returns the decoded log entries whose JSON 'namespace'
// field equals ns and that carry no 'instance' field, ordered by the time
// column descending. limit caps the number of rows returned and offset skips
// that many rows first (limit is bound to $2, offset to $3).
//
// A query failure is logged and returned with a nil slice; row decoding is
// delegated to runLogQuery.
func (lc *logClient) logsForNamespace(ns string, offset, limit int32) ([]map[string]interface{}, error) {
	const query = `SELECT data FROM fluentbit WHERE data->>'namespace' = $1
and data->>'instance' is null ORDER BY time desc LIMIT $2 OFFSET $3;`

	result, queryErr := lc.db.Query(query, ns, limit, offset)
	if queryErr == nil {
		return runLogQuery(result)
	}

	appLog.Errorf("error querying namespace logs: %v", queryErr)
	return nil, queryErr
}
// stop closes the database handle held by the client. It does nothing when
// the handle is nil, which is the case when start was never called or failed
// before assigning it; previously that situation caused a nil-pointer panic.
func (lc *logClient) stop() {
	if lc.db == nil {
		return
	}

	// stop has no error return, so the Close error is deliberately dropped.
	_ = lc.db.Close()
}
// start opens a database handle with the "postgres" driver, using the
// connection string in s.config.Database.DB, stores it on the client and
// pings the database once.
//
// It returns the error from sql.Open or from Ping. When Ping fails the handle
// stays assigned to lc.db. The "postgres" driver must be registered
// elsewhere; that registration is not visible in this file.
func (lc *logClient) start(s *WorkflowServer) error {
	handle, openErr := sql.Open("postgres", s.config.Database.DB)
	if openErr != nil {
		return openErr
	}
	lc.db = handle

	appLog.Debug("ping database")
	return lc.db.Ping()
}