-
Notifications
You must be signed in to change notification settings - Fork 769
/
db.go
129 lines (112 loc) · 3.64 KB
/
db.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package postgresql
import (
	"context"
	"errors"
	"fmt"

	"github.com/jmoiron/sqlx"
	"github.com/lib/pq"

	"go.temporal.io/server/common/persistence/schema"
	"go.temporal.io/server/common/persistence/sql/sqlplugin"
	postgresqlschema "go.temporal.io/server/schema/postgresql"
)
// ErrDupEntryCode is the PostgreSQL error code (SQLSTATE 23505) that indicates
// a duplicate primary key, i.e. the row already exists. IsDupEntryError
// compares it against the Code of a *pq.Error. For the full list of codes see
// http://www.postgresql.org/docs/9.3/static/errcodes-appendix.html
const ErrDupEntryCode = pq.ErrorCode("23505")
// IsDupEntryError reports whether err is, or wraps, a *pq.Error whose Code is
// ErrDupEntryCode. It returns false for a nil error and for any error chain
// that does not contain a *pq.Error.
func (pdb *db) IsDupEntryError(err error) bool {
	// errors.As walks the Unwrap chain, so a duplicate-key error is still
	// recognised after a caller has wrapped it with additional context; a
	// bare type assertion would only match an unwrapped *pq.Error.
	var pqErr *pq.Error
	return errors.As(err, &pqErr) && pqErr.Code == ErrDupEntryCode
}
// db represents a logical connection to a postgresql database. The same type
// is used for plain connections and for transactions (see newDB and BeginTx).
type db struct {
	// dbKind selects which schema version ExpectedVersion reports.
	dbKind sqlplugin.DbKind
	// dbName is the database name handed to schema.VerifyCompatibleVersion.
	dbName string
	// db is the underlying sqlx handle; BeginTx and Close operate on it.
	db *sqlx.DB
	// tx is the transaction passed to newDB, or nil if none was given.
	// Commit and Rollback operate on it.
	tx *sqlx.Tx
	// conn is tx when a transaction was supplied to newDB, otherwise db.
	conn sqlplugin.Conn
	// converter is set to &converter{} by newDB.
	converter DataConverter
}
// Compile-time checks that *db satisfies both the sqlplugin.DB and the
// sqlplugin.Tx interfaces.
var (
	_ sqlplugin.DB = (*db)(nil)
	_ sqlplugin.Tx = (*db)(nil)
)
// newDB builds a db handle for the named postgresql database. The handle's
// conn is set to tx when tx is non-nil and to xdb otherwise; the converter is
// always a fresh &converter{}.
func newDB(
	dbKind sqlplugin.DbKind,
	dbName string,
	xdb *sqlx.DB,
	tx *sqlx.Tx,
) *db {
	// Default to the plain connection; a supplied transaction takes precedence.
	var conn sqlplugin.Conn = xdb
	if tx != nil {
		conn = tx
	}
	return &db{
		dbKind:    dbKind,
		dbName:    dbName,
		db:        xdb,
		tx:        tx,
		conn:      conn,
		converter: &converter{},
	}
}
// BeginTx opens a new transaction on the underlying *sqlx.DB (passing nil
// transaction options) and wraps it in a new db handle that shares this
// handle's kind, name and *sqlx.DB. If the transaction cannot be started the
// error is returned as-is together with a nil Tx.
func (pdb *db) BeginTx(ctx context.Context) (sqlplugin.Tx, error) {
	sqlTx, beginErr := pdb.db.BeginTxx(ctx, nil)
	if beginErr != nil {
		return nil, beginErr
	}
	txHandle := newDB(pdb.dbKind, pdb.dbName, pdb.db, sqlTx)
	return txHandle, nil
}
// Commit commits the transaction held in pdb.tx and returns the result of
// that call.
// NOTE(review): pdb.tx is only non-nil when newDB was given a transaction
// (as BeginTx does); calling Commit on a handle without one presumably
// panics — confirm callers only invoke it on values returned by BeginTx.
func (pdb *db) Commit() error {
	return pdb.tx.Commit()
}
// Rollback rolls back the transaction held in pdb.tx and returns the result
// of that call.
// NOTE(review): pdb.tx is only non-nil when newDB was given a transaction
// (as BeginTx does); calling Rollback on a handle without one presumably
// panics — confirm callers only invoke it on values returned by BeginTx.
func (pdb *db) Rollback() error {
	return pdb.tx.Rollback()
}
// Close closes the underlying *sqlx.DB handle (pdb.db) of the postgresql
// connection and returns any error from that call.
// NOTE(review): handles returned by BeginTx are built with the same pdb.db,
// so calling Close on one of them closes the parent's handle as well.
func (pdb *db) Close() error {
	return pdb.db.Close()
}
// PluginName returns the name of the postgresql plugin, i.e. the
// package-level PluginName value.
func (pdb *db) PluginName() string {
	return PluginName
}
// ExpectedVersion reports the schema version this build expects for the
// handle's database kind: postgresqlschema.Version for the main database and
// postgresqlschema.VisibilityVersion for the visibility database. Any other
// kind panics.
func (pdb *db) ExpectedVersion() string {
	kind := pdb.dbKind
	if kind == sqlplugin.DbKindMain {
		return postgresqlschema.Version
	}
	if kind == sqlplugin.DbKindVisibility {
		return postgresqlschema.VisibilityVersion
	}
	// No schema version is defined for any other kind.
	panic(fmt.Sprintf("unknown db kind %v", kind))
}
// VerifyVersion checks the schema of the database named pdb.dbName against
// the version returned by ExpectedVersion, delegating the comparison to
// schema.VerifyCompatibleVersion and returning its error unchanged.
func (pdb *db) VerifyVersion() error {
	return schema.VerifyCompatibleVersion(pdb, pdb.dbName, pdb.ExpectedVersion())
}