Skip to content

Commit

Permalink
Merge pull request #14 from fabriziomello/issue/13
Browse files Browse the repository at this point in the history
[WIP] Use UNLOGGED TABLES on PostgreSQL
  • Loading branch information
knadh committed Apr 27, 2021
2 parents 4944bd6 + d5e51f8 commit c02d185
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 24 deletions.
48 changes: 32 additions & 16 deletions backends/sqldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@ const (
dbTypeMysql = "mysql"
)

// Opt represents SQL DB backend's options.
type Opt struct {
DBType string
ResultsTable string
UnloggedTables bool
}

// sqlDB represents the sqlDB backend.
type sqlDB struct {
db *sql.DB
dbType string
resultsTable string
logger *log.Logger
db *sql.DB
opt Opt
logger *log.Logger

// The result schemas (CREATE TABLE ...) are dynamically
// generated everytime queries are executed based on their result columns.
Expand Down Expand Up @@ -53,22 +59,22 @@ type insertSchema struct {

// NewSQLBackend returns a new sqlDB result backend instance.
// It accepts an *sql.DB connection
func NewSQLBackend(db *sql.DB, dbType string, resTable string, l *log.Logger) (ResultBackend, error) {
func NewSQLBackend(db *sql.DB, opt Opt, l *log.Logger) (ResultBackend, error) {
var (
r = sqlDB{
db: db,
dbType: dbType,
opt: opt,
resTableSchemas: make(map[string]insertSchema),
schemaMutex: sync.RWMutex{},
logger: l,
}
)

// Config.
if resTable != "" {
r.resultsTable = resTable
if opt.ResultsTable != "" {
r.opt.ResultsTable = opt.ResultsTable
} else {
r.resultsTable = "results_%s"
r.opt.ResultsTable = "results_%s"
}

return &r, nil
Expand All @@ -87,7 +93,7 @@ func (s *sqlDB) NewResultSet(jobID, taskName string, ttl time.Duration) (ResultS
jobID: jobID,
taskName: taskName,
backend: s,
tbl: fmt.Sprintf(s.resultsTable, jobID),
tbl: fmt.Sprintf(s.opt.ResultsTable, jobID),
tx: tx,
}, nil
}
Expand Down Expand Up @@ -115,7 +121,7 @@ func (w *sqlDBWriter) RegisterColTypes(cols []string, colTypes []*sql.ColumnType
colNameHolder[i] = fmt.Sprintf(`"%s"`, w.cols[i])

// This will be filled by the driver.
if w.backend.dbType == dbTypePostgres {
if w.backend.opt.DBType == dbTypePostgres {
// Postgres placeholders are $1, $2 ...
colValHolder[i] = fmt.Sprintf("$%d", i+1)
} else {
Expand Down Expand Up @@ -223,11 +229,12 @@ func (s *sqlDB) createTableSchema(cols []string, colTypes []*sql.ColumnType) ins
colNameHolder = make([]string, len(cols))
colValHolder = make([]string, len(cols))
)

for i := range cols {
colNameHolder[i] = fmt.Sprintf(`"%s"`, cols[i])

// This will be filled by the driver.
if s.dbType == dbTypePostgres {
if s.opt.DBType == dbTypePostgres {
// Postgres placeholders are $1, $2 ...
colValHolder[i] = fmt.Sprintf("$%d", i+1)
} else {
Expand All @@ -236,9 +243,11 @@ func (s *sqlDB) createTableSchema(cols []string, colTypes []*sql.ColumnType) ins
}

var (
fields = make([]string, len(cols))
typ = ""
fields = make([]string, len(cols))
typ = ""
unlogged = ""
)

for i := 0; i < len(cols); i++ {
typ = colTypes[i].DatabaseTypeName()
switch colTypes[i].DatabaseTypeName() {
Expand All @@ -256,7 +265,7 @@ func (s *sqlDB) createTableSchema(cols []string, colTypes []*sql.ColumnType) ins
case "BOOLEAN": // Postgres, MySQL
typ = "BOOLEAN"
case "JSON", "JSONB": // Postgres
if s.dbType != dbTypePostgres {
if s.opt.DBType != dbTypePostgres {
typ = "TEXT"
}
// _INT4, _INT8, _TEXT represent array types in Postgres
Expand All @@ -277,9 +286,16 @@ func (s *sqlDB) createTableSchema(cols []string, colTypes []*sql.ColumnType) ins
fields[i] = fmt.Sprintf(`"%s" %s`, cols[i], typ)
}

// If the DB is Postgres, optionally create an "unlogged" table that disables
// WAL, improving performance of throw-away cache tables.
// https://www.postgresql.org/docs/9.1/sql-createtable.html
if s.opt.DBType == dbTypePostgres && s.opt.UnloggedTables {
unlogged = "UNLOGGED"
}

return insertSchema{
dropTable: `DROP TABLE IF EXISTS "%s";`,
createTable: fmt.Sprintf(`CREATE TABLE IF NOT EXISTS "%%s" (%s);`, strings.Join(fields, ",")),
createTable: fmt.Sprintf(`CREATE %s TABLE IF NOT EXISTS "%%s" (%s);`, unlogged, strings.Join(fields, ",")),
insertRow: fmt.Sprintf(`INSERT INTO "%%s" (%s) VALUES (%s)`, strings.Join(colNameHolder, ","),
strings.Join(colValHolder, ",")),
}
Expand Down
13 changes: 9 additions & 4 deletions cmd/jobber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,16 @@ func setup() {
// retain result db to perform queries on this db
testResultDB = conn

var (
opt = backends.Opt{
DBType: cfg.Type,
ResultsTable: ko.String(fmt.Sprintf("results.%s.results_table", dbName)),
UnloggedTables: cfg.Unlogged,
}
)

// Create a new backend instance.
backend, err := backends.NewSQLBackend(conn,
cfg.Type,
ko.String(fmt.Sprintf("results.%s.results_table", dbName)),
sysLog)
backend, err := backends.NewSQLBackend(conn, opt, sysLog)
if err != nil {
sysLog.Fatalf("error initializing result backend: %v", err)
}
Expand Down
12 changes: 8 additions & 4 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Jobber struct {
type DBConfig struct {
Type string `mapstructure:"type"`
DSN string `mapstructure:"dsn"`
Unlogged bool `mapstructure:"unlogged"`
MaxIdleConns int `mapstructure:"max_idle"`
MaxActiveConns int `mapstructure:"max_active"`
ConnectTimeout time.Duration `mapstructure:"connect_timeout"`
Expand Down Expand Up @@ -163,11 +164,14 @@ func main() {
log.Fatal(err)
}

opt := backends.Opt{
DBType: cfg.Type,
ResultsTable: ko.String(fmt.Sprintf("results.%s.results_table", dbName)),
UnloggedTables: cfg.Unlogged,
}

// Create a new backend instance.
backend, err := backends.NewSQLBackend(conn,
cfg.Type,
ko.String(fmt.Sprintf("results.%s.results_table", dbName)),
sysLog)
backend, err := backends.NewSQLBackend(conn, opt, sysLog)
if err != nil {
log.Fatalf("error initializing result backend: %v", err)
}
Expand Down
1 change: 1 addition & 0 deletions config.toml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ state_address = "redis://127.0.0.1:6379/1"
# Each job can specify where its results should be stored.
# If there are multiple backends and jobs don't specify
# a particular backend, the results will be saved to a *random* one.
# The optional `unlogged = true` (for postgres) creates faster, unlogged (WAL) tables.
#
# type = "postgres" dsn = "postgres://user:password@host:5432/dbname?sslmode=disable"
# type = "mysql" dsn = "user:password@tcp(host:3306)/dbname"
Expand Down

0 comments on commit c02d185

Please sign in to comment.