forked from timescale/tsbs
/
db_creator.go
158 lines (144 loc) · 4.9 KB
/
db_creator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package timestream
import (
"fmt"
"github.com/aws/aws-sdk-go/service/timestreamwrite"
"github.com/pkg/errors"
"github.com/keshav-kk/tsbs/pkg/targets"
"log"
"time"
)
const (
checkTablesMaxAttempts = 10
checkTablesSecondsBetweenChecks = 10
)
type dbCreator struct {
writeSvc *timestreamwrite.TimestreamWrite
ds targets.DataSource
memoryRetentionPeriodInHours int64
magneticStoreRetentionPeriodInDays int64
}
func (d *dbCreator) Init() {
// read headers from data source so PostCreate can create the tables
d.ds.Headers()
}
func (d *dbCreator) DBExists(dbName string) bool {
describeDatabaseInput := ×treamwrite.DescribeDatabaseInput{
DatabaseName: &dbName,
}
_, err := d.writeSvc.DescribeDatabase(describeDatabaseInput)
if err != nil {
// Check if error was "database doesn't exist"
_, ok := err.(*timestreamwrite.ResourceNotFoundException)
if ok {
return false
}
panic("could not execute 'describe database': " + err.Error())
}
return true
}
func (d *dbCreator) CreateDB(dbName string) error {
log.Println("Creating database " + dbName)
createDatabaseInput := ×treamwrite.CreateDatabaseInput{
DatabaseName: &dbName,
}
if _, err := d.writeSvc.CreateDatabase(createDatabaseInput); err != nil {
return errors.Wrap(err, "could not create database "+dbName)
}
return nil
}
func (d *dbCreator) RemoveOldDB(dbName string) error {
log.Println("Removing existing database " + dbName)
listTables := ×treamwrite.ListTablesInput{DatabaseName: &dbName}
tablesOutput, err := d.writeSvc.ListTables(listTables)
if err != nil {
return errors.Wrap(err, "could not check existing tables in "+dbName)
}
for _, table := range tablesOutput.Tables {
deleteTableInput := ×treamwrite.DeleteTableInput{
DatabaseName: &dbName,
TableName: table.TableName,
}
if _, err := d.writeSvc.DeleteTable(deleteTableInput); err != nil {
return errors.Wrap(err, "could not delete table "+*table.TableName+" in db "+dbName)
}
}
deleteDatabaseInput := ×treamwrite.DeleteDatabaseInput{DatabaseName: &dbName}
if _, err := d.writeSvc.DeleteDatabase(deleteDatabaseInput); err != nil {
return errors.Wrap(err, "could not delete database "+dbName)
}
return nil
}
// Timestream doesn't need to create the complete schema, just the tables
func (d *dbCreator) PostCreateDB(dbName string) error {
log.Println("Creating Timestream tables")
headers := d.ds.Headers()
var requiredTables []string
for tableName := range headers.FieldKeys {
requiredTables = append(requiredTables, tableName)
createTableInput := ×treamwrite.CreateTableInput{
DatabaseName: &dbName,
RetentionProperties: ×treamwrite.RetentionProperties{
MagneticStoreRetentionPeriodInDays: &d.magneticStoreRetentionPeriodInDays,
MemoryStoreRetentionPeriodInHours: &d.memoryRetentionPeriodInHours,
},
TableName: &tableName,
}
_, err := d.writeSvc.CreateTable(createTableInput)
if _, ok := err.(*timestreamwrite.ConflictException); !ok {
return errors.Wrap(err, "could not create table '"+tableName+"': ")
} else {
log.Println("Table " + tableName + " exists, skipping create")
}
}
fmt.Println("DB created, checking table status")
if err := d.waitForTables(dbName, requiredTables); err != nil {
return errors.Wrap(err, "could not create timestream tables")
}
return nil
}
func (d *dbCreator) waitForTables(dbName string, requiredTables []string) error {
numAttempts := 0
for {
tablesInDb, err := d.listTableStatus(dbName)
if err != nil {
return errors.Wrap(err, "could not check if all tables were created")
}
if allTablesActive, err := checkTableStatus(tablesInDb, requiredTables); err != nil {
return err
} else if allTablesActive {
break
}
log.Printf("Not all tables are active, waiting %d seconds", checkTablesSecondsBetweenChecks)
numAttempts++
if numAttempts >= checkTablesMaxAttempts {
return fmt.Errorf("tables not created and active in time")
}
time.Sleep(checkTablesSecondsBetweenChecks * time.Second)
}
return nil
}
func (d *dbCreator) listTableStatus(dbName string) (tableStatus map[string]string, err error) {
listTables := ×treamwrite.ListTablesInput{DatabaseName: &dbName}
tablesOutput, err := d.writeSvc.ListTables(listTables)
if err != nil {
return nil, errors.Wrap(err, "could not check existing tables in "+dbName)
}
tableStatus = make(map[string]string, len(tablesOutput.Tables))
for _, table := range tablesOutput.Tables {
tableName := *table.TableName
tableStatus[tableName] = *table.TableStatus
}
return tableStatus, nil
}
func checkTableStatus(tableStatus map[string]string, requiredTables []string) (bool, error) {
for _, table := range requiredTables {
status, ok := tableStatus[table]
if !ok {
return false, fmt.Errorf("required table '%s' not found in db", table)
}
if status != timestreamwrite.TableStatusActive {
return false, nil
}
}
return true, nil
}