-
Notifications
You must be signed in to change notification settings - Fork 10
/
db_helper.go
135 lines (120 loc) · 3.76 KB
/
db_helper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package testutil
import (
"context"
"fmt"
"math"
"os"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/sugar"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
"github.com/ydb-platform/jaeger-ydb-store/schema"
)
// Package-level state shared by every caller of the helpers in this file.
var (
	// db holds the lazily initialised YDB table client and its setup status.
	db struct {
		// once makes sure the setup closure in initDb is executed a single time.
		once sync.Once
		// pool is the table client returned by YdbSessionPool.
		pool table.Client
		// done is set to true only as the very last step of the setup in
		// initDb, i.e. after every preceding step has succeeded.
		done bool
	}
	// defaultTXC is the transaction control passed to every session.Execute
	// call in this file. It is built from BeginTx with the serializable
	// read-write setting combined with CommitTx.
	defaultTXC = table.TxControl(
		table.BeginTx(table.WithSerializableReadWrite()),
		table.CommitTx(),
	)
)
func initDb(tb testing.TB) {
db.once.Do(func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
defer cancel()
dbPath := schema.DbPath{Path: os.Getenv("YDB_PATH"), Folder: os.Getenv("YDB_FOLDER")}
require.NotEmpty(tb, os.Getenv("YDB_ADDRESS"))
conn, err := ydb.Open(ctx,
sugar.DSN(os.Getenv("YDB_ADDRESS"), dbPath.Path, os.Getenv("YDB_SECURE") == "1"),
ydb.WithSessionPoolSizeLimit(10),
ydb.WithAccessTokenCredentials(os.Getenv("YDB_TOKEN")),
)
if err != nil {
tb.Fatalf("ydb connect failed: %v", err)
}
db.pool = conn.Table()
err = conn.Table().Do(ctx, func(ctx context.Context, session table.Session) error {
return CreateTables(ctx, dbPath, session)
})
if err != nil {
tb.Fatalf("failed to create static tables: %v", err)
}
err = conn.Table().Do(ctx, func(ctx context.Context, session table.Session) error {
_, _, err := session.Execute(ctx, defaultTXC, schema.BuildQuery(dbPath, schema.DeleteAllParts), nil)
return err
})
if err != nil {
tb.Fatalf("failed to clean part table: %v", err)
}
err = CreatePartitionTables(ctx, conn.Table(), partRange(time.Now(), time.Now().Add(time.Hour*2))...)
if err != nil {
tb.Fatalf("failed to create partition tables: %v", err)
}
db.done = true
})
if !db.done {
tb.Fatal("initDb failed")
}
}
// YdbSessionPool returns the shared YDB table client stored in db.pool.
//
// It calls initDb first, so the database setup is triggered on first use and
// tb is failed if that setup did not complete.
func YdbSessionPool(tb testing.TB) table.Client {
	initDb(tb)

	client := db.pool
	return client
}
// partRange returns the partition keys obtained by sampling the interval
// [start, stop] in one-hour steps, beginning at start.
//
// The number of samples is floor(hours between start and stop) + 1, so the
// result always contains at least the partition key for start. Keys are
// produced by schema.PartitionFromTime and are not de-duplicated here.
//
// It panics with "stop < start" when stop precedes start.
func partRange(start, stop time.Time) []schema.PartitionKey {
	diff := stop.Sub(start)
	if diff < 0 {
		panic("stop < start")
	}
	numParts := int(math.Floor(diff.Hours())) + 1
	// The final length is known up front, so allocate the backing array once.
	result := make([]schema.PartitionKey, 0, numParts)
	for i := 0; i < numParts; i++ {
		result = append(result, schema.PartitionFromTime(start.Add(time.Hour*time.Duration(i))))
	}
	return result
}
// CreateTables creates every table listed in schema.Tables using the given
// session. Each table path is resolved with dbPath.FullTable and the table
// options come from calling the entry's definition function.
//
// It stops at the first failing CreateTable call and returns that error
// unchanged; it returns nil when all tables were created.
func CreateTables(ctx context.Context, dbPath schema.DbPath, session table.Session) error {
	for tableName, buildOptions := range schema.Tables {
		err := session.CreateTable(ctx, dbPath.FullTable(tableName), buildOptions()...)
		if err != nil {
			return err
		}
	}
	return nil
}
// CreatePartitionTables registers each partition in parts and prepares its
// tables. The database location is read from the YDB_PATH and YDB_FOLDER
// environment variables.
//
// For every partition it:
//  1. executes the schema.InsertPart query with the partition's parameters;
//  2. for every entry of schema.PartitionTables, creates the table at
//     part.BuildFullTableName(dbPath.String(), name) and then deletes all rows
//     from the table named "<dbPath.Table(name)>_<part.Suffix()>".
//
// Every step goes through tc.Do. Processing stops at the first failure; the
// returned error describes the failed step and wraps the underlying error, so
// callers can inspect it with errors.Is / errors.As. It returns nil when all
// partitions were processed.
func CreatePartitionTables(ctx context.Context, tc table.Client, parts ...schema.PartitionKey) error {
	dbPath := schema.DbPath{Path: os.Getenv("YDB_PATH"), Folder: os.Getenv("YDB_FOLDER")}
	for _, part := range parts {
		// Errors inside the callbacks are kept local to the callback instead
		// of being written to a variable shared with the enclosing function.
		err := tc.Do(ctx, func(ctx context.Context, session table.Session) error {
			_, _, execErr := session.Execute(ctx, defaultTXC, schema.BuildQuery(dbPath, schema.InsertPart), part.QueryParams())
			return execErr
		})
		if err != nil {
			return fmt.Errorf("failed to insert part '%+v': %w", part, err)
		}
		for name, tableOptions := range schema.PartitionTables {
			fullPath := part.BuildFullTableName(dbPath.String(), name)
			// Build the cleanup statement once; it does not change between
			// attempts of the callback below.
			deleteQuery := fmt.Sprintf("DELETE FROM `%s_%s`", dbPath.Table(name), part.Suffix())

			err = tc.Do(ctx, func(ctx context.Context, session table.Session) error {
				return session.CreateTable(ctx, fullPath, tableOptions(3)...)
			})
			if err != nil {
				return fmt.Errorf("failed to create table '%s': %w", fullPath, err)
			}

			err = tc.Do(ctx, func(ctx context.Context, session table.Session) error {
				_, _, execErr := session.Execute(ctx, defaultTXC, deleteQuery, table.NewQueryParameters())
				return execErr
			})
			if err != nil {
				return fmt.Errorf("failed to clean table '%s': %w", fullPath, err)
			}
		}
	}
	return nil
}