/
kxtsdb.go
154 lines (134 loc) · 4.13 KB
/
kxtsdb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package kxcommon
import (
"fmt"
"time"
_ "github.com/influxdata/influxdb1-client" // this is important because of the bug in go mod
influxdb "github.com/influxdata/influxdb1-client/v2"
)
// TSDB holds the connection settings and the client handle for a time-series
// database reached through the InfluxDB HTTP client.
type TSDB struct {
// hostName and port are combined into the "http://host:port" address by OpenTSDB.
hostName string
port int
// userName and password are the credentials handed to the HTTP client.
userName string
password string
// precision is sent as the Precision of time-range queries; TSDBNew sets it to "ns".
precision string
// connection is the client created by OpenTSDB; TSDBNew leaves it nil.
connection influxdb.Client
// isConnected is set to true by OpenTSDB once the client has been created.
isConnected bool
}
// TSRecord is one time-series record: a timestamp together with a set of
// string-valued tags and a set of named field values of arbitrary type.
// NOTE(review): the type is not used in the code visible here; presumably it
// mirrors an InfluxDB point (tags + fields) — confirm against callers.
type TSRecord struct {
// TimeStamp is the instant the record refers to.
TimeStamp time.Time
// Tags maps tag name to tag value.
Tags map[string]string
// Fields maps field name to field value; values may be of any type.
Fields map[string]interface{}
}
// KXHistTSRecord defines the structure of a historical time-series record as
// stored in the TSDB.
// NOTE(review): the field meanings below are inferred from names and types
// only; nothing visible here reads or writes them — confirm against callers.
type KXHistTSRecord struct {
// Tag is presumably the tag (point) name the record belongs to.
Tag string
// PType is presumably the point/value type discriminator — TODO confirm.
PType string
// Value carries a numeric value.
Value float64
// ValueStr carries a string value; presumably used when the value is not numeric.
ValueStr string
// Quality is presumably a quality/status indicator for the sample — TODO confirm.
Quality string
// TimeStamp is the time of the sample.
TimeStamp time.Time
}
var (
)
// TSDBNew builds a TSDB handle from the given host, port and credentials.
//
// The handle is configured for nanosecond ("ns") precision. No client is
// created here: connection stays nil and isConnected stays false until
// OpenTSDB is called.
func TSDBNew(hostName string, port int, userName string, password string) *TSDB {
	return &TSDB{
		hostName:    hostName,
		port:        port,
		userName:    userName,
		password:    password,
		precision:   "ns",
		connection:  nil,
		isConnected: false,
	}
}
// OpenTSDB creates the InfluxDB HTTP client for the configured host, port and
// credentials and stores it on the receiver.
//
// On success the client is kept in tsdb.connection and tsdb.isConnected is set
// to true. On failure the receiver is left untouched and the error from
// influxdb.NewHTTPClient is returned unchanged.
func (tsdb *TSDB) OpenTSDB() error {
	cfg := influxdb.HTTPConfig{
		Username: tsdb.userName,
		Password: tsdb.password,
	}
	cfg.Addr = fmt.Sprintf("http://%s:%d", tsdb.hostName, tsdb.port)

	client, err := influxdb.NewHTTPClient(cfg)
	if err == nil {
		tsdb.connection = client
		tsdb.isConnected = true
	}
	return err
}
// CloseTSDB closes the client created by OpenTSDB.
//
// It is safe to call on a handle that was never opened (or whose OpenTSDB call
// failed): there is nothing to close in that case, so nil is returned instead
// of panicking on the nil client. Otherwise the handle is marked as no longer
// connected and the error from the client's Close is returned.
func (tsdb *TSDB) CloseTSDB() error {
	if tsdb == nil || tsdb.connection == nil {
		return nil
	}
	// Reset the flag before closing so it never reports a closed client as live.
	tsdb.isConnected = false
	return tsdb.connection.Close()
}
// QueryTSOneTagTimeRange returns every record stored for one tag within a
// closed time range.
//
// It runs a statement of the form
//
//	select * from <table> where "tag" = '<tag>' and time >= <start> and time <= <end>
//
// against database, with both bounds expressed as Unix nanoseconds and the
// query Precision taken from tsdb.precision. Each returned row is a map from
// column name to the raw value decoded by the client; rows of all result
// series are appended in the order the server returned them.
//
// The result is nil when no row matches. An error is returned when the handle
// has no client, when the request fails, or when the server reports a
// query-level error inside an otherwise successful response (the client's
// Query call does not turn those into a Go error by itself).
//
// NOTE(review): table and tag are interpolated into the statement verbatim; a
// value containing a quote breaks the query. Pass trusted values only.
func (tsdb *TSDB) QueryTSOneTagTimeRange(database string, table string, tag string, startTimeStamp time.Time, endTimeStamp time.Time) ([]map[string]interface{}, error) {
	if tsdb.connection == nil {
		return nil, fmt.Errorf("tsdb: connection is not open")
	}

	// Bare integer time literals in InfluxQL are nanoseconds since the epoch.
	startNs := startTimeStamp.UnixNano()
	endNs := endTimeStamp.UnixNano()

	whereClause := fmt.Sprintf(" where \"tag\" = '%s' and time >= %d and time <= %d", tag, startNs, endNs)
	queryStr := fmt.Sprintf(" select %s from %s %s", "*", table, whereClause)

	resp, err := tsdb.connection.Query(influxdb.Query{
		Command:   queryStr,
		Database:  database,
		Precision: tsdb.precision,
	})
	if err != nil {
		return nil, err
	}
	// Statement errors (unknown database, syntax error, ...) arrive inside the
	// response body with a nil err, so they have to be checked explicitly.
	if err := resp.Error(); err != nil {
		return nil, err
	}

	var rows []map[string]interface{}
	for _, res := range resp.Results {
		for _, sr := range res.Series {
			for _, val := range sr.Values {
				rec := make(map[string]interface{}, len(sr.Columns))
				for j, col := range sr.Columns {
					if j >= len(val) {
						// Row shorter than the header: keep what is there
						// rather than panicking.
						break
					}
					rec[col] = val[j]
				}
				rows = append(rows, rec)
			}
		}
	}
	return rows, nil
}
// QueryTSOneTagLastValue returns the most recent record stored for one tag
// strictly before endTimeStamp.
//
// It runs a statement of the form
//
//	select * from <table> where "tag" = '<tag>' and time < <end> order by time desc limit 1
//
// against database, with the bound expressed as Unix nanoseconds. The result
// maps column name to the raw value decoded by the client. If the response
// holds several series, the first row of the last non-empty series is
// returned.
//
// The result is nil when no row matches. An error is returned when the handle
// has no client, when the request fails, or when the server reports a
// query-level error inside an otherwise successful response (the client's
// Query call does not turn those into a Go error by itself).
//
// NOTE(review): no Precision is set on this query, unlike
// QueryTSOneTagTimeRange, so the representation of the "time" column may
// differ between the two. Left unchanged because callers may rely on it.
//
// NOTE(review): table and tag are interpolated into the statement verbatim; a
// value containing a quote breaks the query. Pass trusted values only.
func (tsdb *TSDB) QueryTSOneTagLastValue(database string, table string, tag string, endTimeStamp time.Time) (map[string]interface{}, error) {
	if tsdb.connection == nil {
		return nil, fmt.Errorf("tsdb: connection is not open")
	}

	// Bare integer time literals in InfluxQL are nanoseconds since the epoch.
	endNs := endTimeStamp.UnixNano()

	whereClause := fmt.Sprintf(" where \"tag\" = '%s' and time < %d order by time desc limit 1", tag, endNs)
	queryStr := fmt.Sprintf(" select %s from %s %s", "*", table, whereClause)

	resp, err := tsdb.connection.Query(influxdb.Query{
		Command:  queryStr,
		Database: database,
	})
	if err != nil {
		return nil, err
	}
	// Statement errors (unknown database, syntax error, ...) arrive inside the
	// response body with a nil err, so they have to be checked explicitly.
	if err := resp.Error(); err != nil {
		return nil, err
	}

	var last map[string]interface{}
	for _, res := range resp.Results {
		for _, sr := range res.Series {
			if len(sr.Values) == 0 {
				// A series without rows has nothing to report; skip it
				// instead of indexing into an empty slice.
				continue
			}
			row := sr.Values[0]
			last = make(map[string]interface{}, len(sr.Columns))
			for j, col := range sr.Columns {
				if j >= len(row) {
					break
				}
				last[col] = row[j]
			}
		}
	}
	return last, nil
}