-
Notifications
You must be signed in to change notification settings - Fork 9
/
connection.go
109 lines (97 loc) · 3.51 KB
/
connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package pinot
import (
"fmt"
"math/big"
"strings"
"time"
log "github.com/sirupsen/logrus"
)
// Connection is a connection to Pinot, normally created through the
// ConnectionFactory.
type Connection struct {
// transport sends a Request to a broker address and returns its response.
transport clientTransport
// brokerSelector picks the broker address to use for a given table.
brokerSelector brokerSelector
// trace is copied into every Request; toggled by OpenTrace and CloseTrace.
trace bool
// useMultistageEngine is copied into every Request; set by UseMultistageEngine.
useMultistageEngine bool
}
// UseMultistageEngine stores the multistage-engine flag on the connection.
// The stored value is copied into each Request that ExecuteSQL builds, so it
// applies to queries issued after this call.
func (c *Connection) UseMultistageEngine(useMultistageEngine bool) {
c.useMultistageEngine = useMultistageEngine
}
// ExecuteSQL runs the given SQL query against a broker chosen for table.
//
// The broker address is obtained from the connection's broker selector, and
// the query is then sent through the transport together with the connection's
// current trace and multistage-engine settings. A failure at either step is
// logged and returned with a nil response.
func (c *Connection) ExecuteSQL(table string, query string) (*BrokerResponse, error) {
	addr, err := c.brokerSelector.selectBroker(table)
	if err != nil {
		log.Errorf("Unable to find an available broker for table %s, Error: %v\n", table, err)
		return nil, err
	}

	// Snapshot the connection settings into the request that goes on the wire.
	req := &Request{
		queryFormat:         "sql",
		query:               query,
		trace:               c.trace,
		useMultistageEngine: c.useMultistageEngine,
	}

	resp, err := c.transport.execute(addr, req)
	if err != nil {
		log.Errorf("Caught exception to execute SQL query %s, Error: %v\n", query, err)
		return nil, err
	}
	return resp, nil
}
// ExecuteSQLWithParams substitutes params into queryPattern and executes the
// resulting SQL for the given table.
//
// Each '?' in queryPattern is replaced, in order, by the formatted form of the
// corresponding entry in params. If the pattern cannot be formatted, the error
// is logged and returned wrapped, and no query is sent. The wrapped error keeps
// its cause reachable through errors.Is / errors.As.
func (c *Connection) ExecuteSQLWithParams(table string, queryPattern string, params []interface{}) (*BrokerResponse, error) {
	query, err := formatQuery(queryPattern, params)
	if err != nil {
		log.Errorf("Failed to format query: %v\n", err)
		// %w yields the same message as %v but preserves the error chain.
		return nil, fmt.Errorf("failed to format query: %w", err)
	}
	return c.ExecuteSQL(table, query)
}
// formatQuery builds a SQL string from queryPattern by replacing each '?' with
// the formatted form of the corresponding entry in params, in order.
//
// Every '?' character counts as a placeholder, including one that appears
// inside a quoted literal in the pattern. An error is returned when the number
// of placeholders differs from len(params), or when formatArg rejects a
// parameter; in the latter case the cause stays reachable through
// errors.Is / errors.As.
func formatQuery(queryPattern string, params []interface{}) (string, error) {
	numPlaceholders := strings.Count(queryPattern, "?")
	if numPlaceholders != len(params) {
		return "", fmt.Errorf("number of placeholders in queryPattern (%d) does not match number of params (%d)", numPlaceholders, len(params))
	}

	// Splitting on '?' yields len(params)+1 parts: parts[i] is the text that
	// precedes the i-th placeholder, and the final part follows the last one.
	parts := strings.Split(queryPattern, "?")
	var newQuery strings.Builder
	for i, param := range params {
		newQuery.WriteString(parts[i])
		formattedParam, err := formatArg(param)
		if err != nil {
			// Not logged here: ExecuteSQLWithParams logs the wrapped error, so
			// logging at this layer would record the same failure twice.
			return "", fmt.Errorf("failed to format parameter: %w", err)
		}
		newQuery.WriteString(formattedParam)
	}
	newQuery.WriteString(parts[len(params)])
	return newQuery.String(), nil
}
// formatArg renders a single query parameter as a SQL literal.
//
// Supported Go types and how they are rendered:
//   - string: single-quoted, with embedded single quotes doubled
//   - *big.Int, *big.Float: default %v text, single-quoted
//   - []byte: lowercase hex, single-quoted
//   - time.Time: layout "2006-01-02 15:04:05.000" in the value's own
//     location (no time zone conversion is applied), single-quoted
//   - integer, float and bool types: default %v text, unquoted
//
// Any other type yields an "unsupported type" error.
func formatArg(value interface{}) (string, error) {
	switch v := value.(type) {
	case string:
		// Double embedded single quotes so the value cannot end the literal
		// early and change the meaning of the surrounding query.
		return "'" + strings.ReplaceAll(v, "'", "''") + "'", nil
	case *big.Int, *big.Float:
		// Arbitrary-precision numbers are passed as quoted text.
		return fmt.Sprintf("'%v'", v), nil
	case []byte:
		// Raw bytes are passed as a quoted hex string.
		return fmt.Sprintf("'%x'", v), nil
	case time.Time:
		// Timestamps are passed as quoted text with millisecond precision.
		return fmt.Sprintf("'%s'", v.Format("2006-01-02 15:04:05.000")), nil
	case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, float32, float64, bool:
		// Numeric and boolean values are used as-is.
		return fmt.Sprintf("%v", v), nil
	default:
		return "", fmt.Errorf("unsupported type: %T", v)
	}
}
// OpenTrace turns tracing on for this connection by setting its trace flag.
// ExecuteSQL copies the flag into each Request it sends.
func (c *Connection) OpenTrace() {
c.trace = true
}
// CloseTrace turns tracing off for this connection by clearing its trace flag.
// ExecuteSQL copies the flag into each Request it sends.
func (c *Connection) CloseTrace() {
c.trace = false
}