/
txdbexecuter.go
107 lines (98 loc) · 2.87 KB
/
txdbexecuter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package mysql
import (
"context"
"database/sql"
"strings"
"sync"
"time"
opentracing "github.com/opentracing/opentracing-go"
tags "github.com/opentracing/opentracing-go/ext"
"github.com/prometheus/client_golang/prometheus"
)
// txDBExecuter runs SQL statements on a single *sql.Tx and, when configured,
// records Prometheus metrics and OpenTracing spans for each statement. It also
// collects invalidation callbacks that runInvalidateFuncs executes later.
type txDBExecuter struct {
// tx is the transaction that Query, Exec and Prepare are issued on.
tx *sql.Tx
// counter, when non-nil, is incremented once per Query/Exec call, labelled
// with the cleaned statement and an operation name.
counter *prometheus.CounterVec
// histogram, when non-nil, observes the duration of each Query/Exec call in
// seconds under the same labels as counter.
histogram *prometheus.HistogramVec
// statementMap caches raw statement -> cleaned statement so cleanStatement
// only normalizes each distinct statement once. It must be non-nil whenever
// counter or histogram is set, because cleanStatement dereferences it.
statementMap *sync.Map
// invalidateFunc holds the callbacks registered through Invalidate, in
// registration order.
invalidateFunc []InvalidateFunc
}
// runInvalidateFuncs runs every callback registered through Invalidate, each
// in its own goroutine, and blocks until all of them have returned.
//
// Errors returned by the callbacks are intentionally discarded: invalidation
// is best-effort and a failure is not reported to the caller.
func (s *txDBExecuter) runInvalidateFuncs() {
	var pending sync.WaitGroup
	pending.Add(len(s.invalidateFunc))
	for i := range s.invalidateFunc {
		// A per-iteration local keeps each goroutine bound to its own
		// callback regardless of the Go version's loop-variable semantics.
		callback := s.invalidateFunc[i]
		go func() {
			defer pending.Done()
			// XXX: invalidation errors are silent.
			_ = callback()
		}()
	}
	pending.Wait()
}
// Invalidate registers f so that it is executed the next time
// runInvalidateFuncs is called. Callbacks are kept in registration order.
//
// A nil f is ignored: runInvalidateFuncs calls every stored callback inside
// its own goroutine, where calling a nil func would panic with no way for the
// caller to recover, bringing down the whole process.
//
// It is not thread-safe and must not be called from concurrent goroutines.
// The returned error is always nil.
func (s *txDBExecuter) Invalidate(f InvalidateFunc) error {
	if f == nil {
		return nil
	}
	s.invalidateFunc = append(s.invalidateFunc, f)
	return nil
}
// cleanStatement returns a normalized form of statement, which Query and Exec
// use as a metric label value.
//
// Normalization replaces every newline and tab with a single space and trims
// surrounding whitespace. If the result starts with "SELECT" (case-sensitive)
// and the text before the first "FROM" is longer than maxSelectLen bytes, that
// leading part is cut to maxSelectLen bytes and followed by "... FROM" and the
// remainder of the statement.
//
// Results are cached in statementMap keyed by the raw statement, so each
// distinct statement is normalized only once.
func (s *txDBExecuter) cleanStatement(statement string) string {
	if cached, hit := s.statementMap.Load(statement); hit {
		return cached.(string)
	}

	flattened := strings.NewReplacer("\n", " ", "\t", " ").Replace(statement)
	cleaned := strings.TrimSpace(flattened)

	if strings.HasPrefix(cleaned, "SELECT") {
		// at is the length of the column list preceding the first "FROM".
		at := strings.Index(cleaned, "FROM")
		if at >= 0 && at > maxSelectLen {
			cleaned = cleaned[:maxSelectLen] + "... FROM" + cleaned[at+len("FROM"):]
		}
	}

	s.statementMap.Store(statement, cleaned)
	return cleaned
}
// Query runs the unprepared statement with args on the transaction and
// returns the resulting rows.
//
// When counter is set the call is counted, and when histogram is set its
// duration in seconds is observed, both under the labels
// (cleaned statement, "QUERY"). The call is wrapped in an OpenTracing client
// span named "SQL TX QUERY" that carries the raw statement in the
// "db.statement" tag and is marked with the error tag when the query fails.
//
// The span covers only the QueryContext call; iterating and closing the
// returned *sql.Rows is done by the caller after the span has finished.
func (s *txDBExecuter) Query(ctx context.Context, unprepared string, args ...interface{}) (*sql.Rows, error) {
	if s.counter != nil {
		s.counter.WithLabelValues(s.cleanStatement(unprepared), "QUERY").Inc()
	}
	if s.histogram != nil {
		startTime := time.Now()
		defer func() {
			s.histogram.WithLabelValues(s.cleanStatement(unprepared), "QUERY").Observe(
				time.Since(startTime).Seconds())
		}()
	}

	span, ctx := opentracing.StartSpanFromContext(ctx, "SQL TX QUERY")
	// A span is only completed and handed to the tracer once Finish is
	// called; without it the span would never be reported.
	defer span.Finish()
	tags.SpanKindRPCClient.Set(span)
	tags.PeerService.Set(span, "mysql")
	span.SetTag("db.statement", unprepared)

	r, err := s.tx.QueryContext(ctx, unprepared, args...)
	if err != nil {
		tags.Error.Set(span, true)
	}
	return r, err
}
// Exec runs the unprepared statement with args on the transaction and returns
// the driver's result.
//
// When counter is set the call is counted, and when histogram is set its
// duration in seconds is observed. The call is wrapped in an OpenTracing
// client span named "SQL TX EXEC" that carries the raw statement in the
// "db.statement" tag and is marked with the error tag when execution fails.
func (s *txDBExecuter) Exec(ctx context.Context, unprepared string, args ...interface{}) (sql.Result, error) {
	// NOTE(review): the metric label below is "QUERY", the same value Query
	// uses, so Exec and Query calls are indistinguishable in the metrics. This
	// looks like a copy-paste slip (the span is named "SQL TX EXEC"), but the
	// label is kept as-is because changing it would alter the emitted series —
	// confirm with dashboard/alert owners before renaming.
	if s.counter != nil {
		s.counter.WithLabelValues(s.cleanStatement(unprepared), "QUERY").Inc()
	}
	if s.histogram != nil {
		startTime := time.Now()
		defer func() {
			s.histogram.WithLabelValues(s.cleanStatement(unprepared), "QUERY").Observe(
				time.Since(startTime).Seconds())
		}()
	}

	span, ctx := opentracing.StartSpanFromContext(ctx, "SQL TX EXEC")
	// A span is only completed and handed to the tracer once Finish is
	// called; without it the span would never be reported.
	defer span.Finish()
	tags.SpanKindRPCClient.Set(span)
	tags.PeerService.Set(span, "mysql")
	span.SetTag("db.statement", unprepared)

	r, err := s.tx.ExecContext(ctx, unprepared, args...)
	if err != nil {
		tags.Error.Set(span, true)
	}
	return r, err
}
// Prepare creates a prepared statement for query that is bound to the
// transaction. It delegates directly to (*sql.Tx).PrepareContext; no metrics
// or tracing spans are recorded for this call.
func (s *txDBExecuter) Prepare(ctx context.Context, query string) (*sql.Stmt, error) {
	stmt, err := s.tx.PrepareContext(ctx, query)
	return stmt, err
}