Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
SOURCE ?= file go_bindata github github_ee bitbucket aws_s3 google_cloud_storage godoc_vfs gitlab
DATABASE ?= postgres mysql redshift cassandra spanner cockroachdb yugabytedb clickhouse mongodb sqlserver firebird neo4j pgx pgx5 rqlite
DATABASE ?= postgres mysql redshift cassandra spanner cockroachdb yugabytedb clickhouse mongodb sqlserver firebird neo4j pgx pgx5 rqlite ydb
DATABASE_TEST ?= $(DATABASE) sqlite sqlite3 sqlcipher
VERSION ?= $(shell git describe --tags 2>/dev/null | cut -c 2-)
TEST_FLAGS ?=
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ Database drivers run migrations. [Add a new database?](database/driver.go)
* [Firebird](database/firebird)
* [MS SQL Server](database/sqlserver)
* [rqlite](database/rqlite)
* [YDB](database/ydb)

### Database URLs

Expand Down
18 changes: 18 additions & 0 deletions database/ydb/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# [YDB](https://ydb.tech/docs/en/)

`ydb://[user:password@]host:port/database?QUERY_PARAMS`

| URL Query | Description |
|------------|-------------|
| `user` | The user to sign in as |
| `password` | The user's password |
| `host` | The host to connect to |
| `port` | The port to bind to |
| `database` | The name of the database to connect to |
| `x-migrations-table`| Name of the migrations table. Default: `schema_migrations` |
| `x-use-grpcs` | Enable GRPCS protocol for connecting to YDB (default GRPC) |

## Warning
- It is not possible to use DDL and DML queries simultaneously within a single migration.
- Beware of race conditions between migrations initiated from different processes (on the same machine or on different machines).
- Beware of partial migrations, because currently in YDB it is not possible to execute DDL SQL statements in a transaction.
1 change: 1 addition & 0 deletions database/ydb/examples/migrations/1_init.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE test;
4 changes: 4 additions & 0 deletions database/ydb/examples/migrations/1_init.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
CREATE TABLE test (
id Int,
PRIMARY KEY(id)
);
337 changes: 337 additions & 0 deletions database/ydb/ydb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,337 @@
package ydb

import (
"context"
"database/sql"
"errors"
"fmt"
"io"
"net/url"
"strings"
"sync/atomic"
"time"

ydb "github.com/ydb-platform/ydb-go-sdk/v3"

"github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/database"
)

const (
migrationsTableQueryParam = "x-migrations-table"
useGRPCSQueryParam = "x-use-grpcs"

defaultMigrationsTable = "schema_migrations"
)

var _ database.Driver = (*YDB)(nil)

func init() {
database.Register("ydb", &YDB{})
}

const (
createVersionTableQueryTemplate = `
CREATE TABLE %s (
version Int32,
dirty Bool,
applied_at Timestamp,
PRIMARY KEY (version)
);
`

deleteVersionsQueryTemplate = `
DELETE FROM %s;`

setVersionQueryTemplate = `
DECLARE $version AS Int32;
DECLARE $dirty AS Bool;
DECLARE $applied_at AS Timestamp;
UPSERT INTO %s (version, dirty, applied_at)
VALUES ($version, $dirty, $applied_at);`

getCurrentVersionQueryTemplate = `
SELECT version, dirty FROM %s
ORDER BY version DESC LIMIT 1;`

dropTablesQueryTemplate = "DROP TABLE `%s`;"

getAllTablesQueryTemplate = "SELECT Path FROM `%s.sys/partition_stats` WHERE Path NOT LIKE '%%.sys%%'"
)

type Config struct {
MigrationsTable string
Path string
}

func WithInstance(db *sql.DB, config Config) (database.Driver, error) {
if err := db.Ping(); err != nil {
return nil, err
}

if config.MigrationsTable == "" {
config.MigrationsTable = defaultMigrationsTable
}

if config.Path != "" && !strings.HasSuffix(config.Path, "/") {
config.Path += "/"
}

ydbDriver := &YDB{
db: db,
config: config,
}

err := ydbDriver.createMigrationsTable(context.Background())
if err != nil {
return nil, database.Error{
OrigErr: err,
Err: "failed to create migrations table",
}
}

return ydbDriver, nil
}

type YDB struct {
db *sql.DB
locked atomic.Bool
config Config
}

func (y *YDB) tableWithPrefix(table string) string {
if y.config.Path == "" {
return table
}

return fmt.Sprintf("`%s%s`", y.config.Path, table)
}

func (y *YDB) Lock() error {
if !y.locked.CompareAndSwap(false, true) {
return database.ErrLocked
}

return nil
}
func (y *YDB) Unlock() error {
if !y.locked.CompareAndSwap(true, false) {
return database.ErrNotLocked
}

return nil
}

func (y *YDB) Open(dsn string) (database.Driver, error) {
customUrl, err := url.Parse(dsn)
if err != nil {
return nil, err
}

connUrl := migrate.FilterCustomQuery(customUrl)
if customUrl.Query().Get(useGRPCSQueryParam) != "" {
connUrl.Scheme = "grpcs"
} else {
connUrl.Scheme = "grpc"
}

nativeDriver, err := ydb.Open(context.Background(), connUrl.String())
if err != nil {
return nil, err
}

connector, err := ydb.Connector(nativeDriver)
if err != nil {
return nil, err
}

migrationsTable := customUrl.Query().Get(migrationsTableQueryParam)

if migrationsTable == "" {
migrationsTable = defaultMigrationsTable
}

databaseName := nativeDriver.Name()

if databaseName != "" {
databaseName += "/"
}

ydbDriver := &YDB{
db: sql.OpenDB(connector),
config: Config{
MigrationsTable: migrationsTable,
Path: databaseName,
},
}

err = ydbDriver.createMigrationsTable(context.Background())
if err != nil {
return nil, database.Error{
OrigErr: err,
Err: "failed to create migrations table",
}
}

return ydbDriver, nil
}

func (y *YDB) createMigrationsTable(ctx context.Context) (err error) {
if err = y.Lock(); err != nil {
return err
}

defer func() {
if ierr := y.Unlock(); ierr != nil {
err = errors.Join(err, ierr)
}
}()

ctx = ydb.WithQueryMode(ctx, ydb.SchemeQueryMode)

createTableQuery := fmt.Sprintf(createVersionTableQueryTemplate, y.tableWithPrefix(y.config.MigrationsTable))
if _, err := y.db.ExecContext(ctx, createTableQuery); err != nil {
return database.Error{
OrigErr: err,
Query: []byte(createTableQuery),
}
}

return nil
}

func (y *YDB) Close() error {
return y.db.Close()
}

func (y *YDB) Drop() (err error) {
tablesQuery := fmt.Sprintf(
getAllTablesQueryTemplate,
y.config.Path,
)

rows, err := y.db.QueryContext(ydb.WithQueryMode(context.Background(), ydb.ScanQueryMode), tablesQuery)
if err != nil {
return &database.Error{OrigErr: err, Query: []byte(tablesQuery)}
}
defer func() {
if ierr := rows.Close(); ierr != nil {
err = errors.Join(err, ierr)
}
}()

if !rows.NextResultSet() {
return nil
}

for rows.Next() {
var table string
err = rows.Scan(&table)
if err != nil {
return &database.Error{OrigErr: err, Query: []byte(tablesQuery)}
}

query := fmt.Sprintf(dropTablesQueryTemplate, table)

if _, err = y.db.ExecContext(ydb.WithQueryMode(context.Background(), ydb.SchemeQueryMode), query); err != nil {
return &database.Error{OrigErr: err, Query: []byte(query)}
}
}

if err = rows.Err(); err != nil {
return &database.Error{OrigErr: err, Query: []byte(tablesQuery)}
}

return nil
}

func (y *YDB) Run(migration io.Reader) error {
data, err := io.ReadAll(migration)
if err != nil {
return err
}

_, err = y.db.ExecContext(ydb.WithQueryMode(context.Background(), ydb.ScriptingQueryMode), string(data))
if err != nil {
return database.Error{
OrigErr: err,
Err: "migration failed",
Query: data,
}
}

return nil
}

func (y *YDB) SetVersion(version int, dirty bool) error {
tx, err := y.db.BeginTx(context.Background(), &sql.TxOptions{
ReadOnly: false,
Isolation: sql.LevelDefault,
})
if err != nil {
return err
}

ctx := ydb.WithQueryMode(context.Background(), ydb.DataQueryMode)

deleteVersions := fmt.Sprintf(deleteVersionsQueryTemplate, y.tableWithPrefix(y.config.MigrationsTable))
if _, err = tx.ExecContext(ctx, deleteVersions); err != nil {
if rollbackErr := tx.Rollback(); rollbackErr != nil {
err = errors.Join(err, rollbackErr)
}

return database.Error{
OrigErr: err,
Err: "failed to delete versions",
Query: []byte(deleteVersions),
}
}

versionQuery := fmt.Sprintf(setVersionQueryTemplate, y.tableWithPrefix(y.config.MigrationsTable))
_, err = tx.ExecContext(
ctx,
versionQuery,
sql.Named("version", version),
sql.Named("dirty", dirty),
sql.Named("applied_at", time.Now()),
)
if err != nil {
if rollbackErr := tx.Rollback(); rollbackErr != nil {
err = errors.Join(err, rollbackErr)
}

return database.Error{
OrigErr: err,
Err: "failed to set version",
Query: []byte(versionQuery),
}
}

return tx.Commit()
}

func (y *YDB) Version() (version int, dirty bool, err error) {
versionQuery := fmt.Sprintf(getCurrentVersionQueryTemplate, y.tableWithPrefix(y.config.MigrationsTable))

row, err := y.db.QueryContext(ydb.WithQueryMode(context.Background(), ydb.ScanQueryMode), versionQuery)
if err != nil {
return 0, false, database.Error{
OrigErr: err,
Err: "failed to get version",
Query: []byte(versionQuery),
}
}
if !row.NextResultSet() || !row.Next() {
return database.NilVersion, false, nil
}

if err = row.Scan(&version, &dirty); err != nil {
return 0, false, &database.Error{
OrigErr: err,
Err: "failed to scan version",
Query: []byte(versionQuery),
}
}

return version, dirty, err
}
Loading