/
fetcher.go
159 lines (132 loc) · 3.76 KB
/
fetcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package main
import (
"database/sql"
"encoding/json"
"fmt"
"strings"
"github.com/golang/groupcache"
"github.com/LeKovr/dbrpc/workman"
"github.com/LeKovr/go-base/logger"
)
// -----------------------------------------------------------------------------
// Processor gets value from cache and converts it into Result struct
func cacheFetcher(log *logger.Log, cacheGroup *groupcache.Group) workman.WorkerFunc {
// https://github.com/capotej/groupcache-db-experiment
return func(payload string) workman.Result {
var data []byte
log.Printf("asked for %s from groupcache", payload)
err := cacheGroup.Get(nil, payload,
groupcache.AllocatingByteSliceSink(&data))
var res workman.Result
if err != nil {
res = workman.Result{Success: false, Error: err.Error()}
} else {
d := data[1:]
if data[0] == 1 { // First byte stores success state (1: true, 0: false)
raw := json.RawMessage(d)
res = workman.Result{Success: true, Result: &raw}
} else {
res = workman.Result{Success: false, Error: string(d)}
}
}
return res
}
}
// -----------------------------------------------------------------------------
func dbFetcher(cfg *AplFlags, log *logger.Log, db *sql.DB) groupcache.GetterFunc {
return func(ctx groupcache.Context, key string, dest groupcache.Sink) error {
log.Printf("asking for %s from dbserver", key)
var args []string
var data []byte
//err := json.Unmarshal(key, &args)
json.Unmarshal([]byte(key), &args)
if args[0] == cfg.ArgDefFunc {
q := fmt.Sprintf("select * from %s.%s($1)", cfg.Schema, args[0])
rows, err := db.Query(q, args[1])
if err != nil {
return err
}
defer rows.Close()
var res []ArgDef
for rows.Next() {
var a ArgDef
err = rows.Scan(&a.ID, &a.Name, &a.Type, &a.Default, &a.AllowNull)
if err != nil {
return err
}
res = append(res, a)
}
data, err = json.Marshal(res)
if err != nil {
return err
}
} else {
q, vals := PrepareFuncSQL(cfg, args)
log.Printf("Query: %s (%+v)", q, vals)
rows, err := db.Query(q, vals...)
if err != nil {
return err
}
defer rows.Close()
data, err = FetchSQLResult(rows)
if err != nil {
return err
}
}
result := []byte{1} // status: success
result = append(result, data...)
dd := result[1:]
log.Printf("Save data: %s", dd)
dest.SetBytes([]byte(result))
return nil
}
}
// -----------------------------------------------------------------------------
// PrepareFuncSQL prepares sql query with args placeholders
func PrepareFuncSQL(cfg *AplFlags, args []string) (string, []interface{}) {
mtd := args[0]
argVals := args[1:]
argValPrep := make([]interface{}, len(argVals))
argIDs := make([]string, len(argVals))
for i, v := range argVals {
argIDs[i] = fmt.Sprintf("$%d", i+1)
argValPrep[i] = v
}
argIDStr := strings.Join(argIDs, ",")
q := fmt.Sprintf("select * from %s.%s(%s)", cfg.Schema, mtd, argIDStr)
return q, argValPrep
}
// -----------------------------------------------------------------------------
// FetchSQLResult fetches sql result and marshalls it into json
func FetchSQLResult(rows *sql.Rows) (data []byte, err error) {
// http://stackoverflow.com/a/29164115
columns, err := rows.Columns()
if err != nil {
return
}
count := len(columns)
var tableData []map[string]interface{}
values := make([]interface{}, count)
valuePtrs := make([]interface{}, count)
for rows.Next() {
for i := 0; i < count; i++ {
valuePtrs[i] = &values[i]
}
rows.Scan(valuePtrs...)
entry := make(map[string]interface{})
for i, col := range columns {
var v interface{}
val := values[i]
b, ok := val.([]byte)
if ok {
v = string(b)
} else {
v = val
}
entry[col] = v
}
tableData = append(tableData, entry)
}
data, err = json.Marshal(tableData)
return
}