Skip to content

Commit

Permalink
feat: popx and dockertestx for database connection (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
sawadashota committed May 11, 2021
1 parent ec5f3ba commit 63e2636
Show file tree
Hide file tree
Showing 11 changed files with 1,036 additions and 5 deletions.
88 changes: 88 additions & 0 deletions dockertestx/dockertest_postgres.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package dockertestx

import (
"database/sql"
"fmt"
"time"

"github.com/gobuffalo/pop/v5"
_ "github.com/lib/pq"
"github.com/ory/dockertest/v3"
"github.com/pkg/errors"

"github.com/tier4/x/runtimex"
)

type PurgeFunc func() error

// NewPostgres is to create PostgreSQL container and to return its connection and close function
func NewPostgres(tag string) (*pop.Connection, PurgeFunc, error) {
pool, err := dockertest.NewPool("")
if err != nil {
return nil, nil, errors.WithMessage(err, "Could not connect to docker")
}

dbUser := "dockertest"
dbPassword := "passw0rd"
dbName := "test"

resource, err := pool.Run(
"postgres",
tag,
[]string{
fmt.Sprintf("POSTGRES_USER=%s", dbUser),
fmt.Sprintf("POSTGRES_PASSWORD=%s", dbPassword),
fmt.Sprintf("POSTGRES_DB=%s", dbName),
})
if err != nil {
return nil, nil, errors.WithMessage(err, "Could not start resource")
}

getDSN := func() string {
return fmt.Sprintf(
"postgres://%s:%s@localhost:%s/%s?sslmode=disable",
dbUser,
dbPassword,
resource.GetPort("5432/tcp"),
dbName,
)
}

if err := pool.Retry(func() error {
var err error
db, err := sql.Open("postgres", getDSN())
if err != nil {
return err
}
return db.Ping()
}); err != nil {
return nil, nil, errors.WithMessage(err, "Could not connect to docker")
}

conn, err := pop.NewConnection(&pop.ConnectionDetails{
URL: getDSN(),
Pool: runtimex.MaxParallelism() * 2,
IdlePool: runtimex.MaxParallelism(),
ConnMaxLifetime: time.Duration(0),
})
if err != nil {
return nil, nil, errors.WithMessage(err, "Could not connect resource")
}
if err := conn.Open(); err != nil {
return nil, nil, errors.WithMessage(err, "Could not open connection")
}

var purgeFunc PurgeFunc = func() error {
if conn.Store != nil {
if err := conn.Close(); err != nil {
return errors.WithMessage(err, "Could not close connection")
}
}
if err := pool.Purge(resource); err != nil {
return errors.WithMessage(err, "Could not purge resource")
}
return nil
}

return conn, purgeFunc, nil
}
27 changes: 27 additions & 0 deletions dockertestx/dockertest_postgres_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package dockertestx_test

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/tier4/x/dockertestx"
)

func TestNewPostgres(t *testing.T) {
t.Parallel()

conn, purge, err := dockertestx.NewPostgres("13.2-alpine")
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, purge())
})

assert.Regexp(t, `postgres:\/\/dockertest:passw0rd@localhost:\d{4,5}/test\?sslmode=disable`, conn.String())

type pinger interface {
Ping() error
}
assert.NoError(t, conn.Store.(pinger).Ping())
}
12 changes: 9 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,14 @@ module github.com/tier4/x
go 1.16

require (
github.com/google/uuid v1.2.0 // indirect
github.com/stretchr/objx v0.3.0 // indirect
github.com/stretchr/testify v1.7.0 // indirect
github.com/gobuffalo/packd v1.0.0
github.com/gobuffalo/pop/v5 v5.3.4
github.com/google/uuid v1.2.0
github.com/jmoiron/sqlx v1.3.3
github.com/lib/pq v1.10.1
github.com/ory/dockertest/v3 v3.6.5
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.7.0
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)
631 changes: 629 additions & 2 deletions go.sum

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions popx/.soda.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
---
development:
url: postgres://a/b
50 changes: 50 additions & 0 deletions popx/migration_box.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package popx

import (
"bytes"
"embed"
"io/fs"
"strings"

"github.com/gobuffalo/packd"
)

type migrationsFS struct {
dir embed.FS
}

func NewMigrationBox(fs embed.FS) packd.Walkable {
return &migrationsFS{dir: fs}
}

func (m *migrationsFS) Walk(wf packd.WalkFunc) error {
return fs.WalkDir(m.dir, ".", func(path string, info fs.DirEntry, err error) error {
if err != nil {
return err
}

if info.IsDir() {
return nil
}

content, err := m.dir.ReadFile(path)
if err != nil {
return err
}
f, err := packd.NewFile(info.Name(), bytes.NewReader(content))
if err != nil {
return err
}

return wf(path, f)
})
}

func (m *migrationsFS) WalkPrefix(prefix string, wf packd.WalkFunc) error {
return m.Walk(func(path string, file packd.File) error {
if strings.HasPrefix(path, prefix) {
return wf(path, file)
}
return nil
})
}
48 changes: 48 additions & 0 deletions popx/popx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package popx

import (
"context"
"io"

"github.com/gobuffalo/pop/v5"
"github.com/pkg/errors"
)

type Client struct {
c *pop.Connection
mb *pop.MigrationBox
}

func New(conn *pop.Connection, box *pop.MigrationBox) (*Client, error) {
return &Client{
c: conn,
mb: box,
}, nil
}

// MigrationStatus returns migration status
func (c *Client) MigrationStatus(_ context.Context, w io.Writer) error {
return c.mb.Status(w)
}

// MigrateDown rollbacks given steps
func (c *Client) MigrateDown(_ context.Context, steps int) error {
return c.mb.Down(steps)
}

// MigrateUp migrates all of un-executed
func (c *Client) MigrateUp(_ context.Context) error {
return c.mb.Up()
}

func (c *Client) Close(ctx context.Context) error {
return errors.WithStack(c.GetConnection(ctx).Close())
}

func (c *Client) Ping() error {
type pinger interface {
Ping() error
}
// This can not be contextualized because of some gobuffalo/pop limitations.
return errors.WithStack(c.c.Store.(pinger).Ping())
}
94 changes: 94 additions & 0 deletions popx/postgres_transaction_helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package popx

import (
"context"

"github.com/gobuffalo/pop/v5"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
)

var (
ErrDataLockTaken = errors.Errorf("data lock taken")
)

type transactionContextKey int

const transactionKey transactionContextKey = 0

func WithTransaction(ctx context.Context, tx *pop.Connection) context.Context {
return context.WithValue(ctx, transactionKey, tx)
}

func (c *Client) Transaction(ctx context.Context, callback func(ctx context.Context, connection *pop.Connection) error) error {
txCtx := ctx.Value(transactionKey)
if c != nil {
if conn, ok := txCtx.(*pop.Connection); ok {
return callback(ctx, conn.WithContext(ctx))
}
}

return c.c.WithContext(ctx).Transaction(func(tx *pop.Connection) error {
return callback(WithTransaction(ctx, tx), tx)
})
}

// TransactionWithTryAdvisoryLock is Transaction with pg_try_advisory_xact_lock
// if cannot take lock, returns error immediately
func (c *Client) TransactionWithTryAdvisoryLock(ctx context.Context, key string, callback func(ctx context.Context, connection *pop.Connection) error) error {
txCtx := ctx.Value(transactionKey)
if c != nil {
if conn, ok := txCtx.(*pop.Connection); ok {
return callback(ctx, conn)
}
}

return c.c.Transaction(func(tx *pop.Connection) error {
if err := tryTakeAdvisoryLock(tx, key); err != nil {
return err
}
return callback(WithTransaction(ctx, tx), tx)
})
}

func tryTakeAdvisoryLock(tx *pop.Connection, key string) error {
rows, err := tx.Store.(sqlx.QueryerContext).
QueryxContext(tx.Context(), `select pg_try_advisory_xact_lock(hashtext($1))`, key)
if err != nil {
return err
}
if !rows.Next() {
return errors.New("unexpected error: try to take advisory lock but no rows returned")
}

var result bool
defer rows.Close()
if err := rows.Scan(&result); err != nil {
return err
}
if !result {
return errors.WithMessagef(ErrDataLockTaken, "data lock taken at the key %s", key)
}
return nil
}

func (c *Client) GetConnection(ctx context.Context) *pop.Connection {
txCtx := ctx.Value(transactionKey)
if c != nil {
if conn, ok := txCtx.(*pop.Connection); ok {
return conn.WithContext(ctx)
}
}
return c.c.WithContext(ctx)
}

// GetSqlxQueryer returns sqlx.QueryerContext wrapped by pop
// This is useful for join query
func (c *Client) GetSqlxQueryer(ctx context.Context) sqlx.QueryerContext {
return c.GetConnection(ctx).Store.(sqlx.QueryerContext)
}

// GetSqlxExecer returns sqlx.ExecerContext wrapped by pop
func (c *Client) GetSqlxExecer(ctx context.Context) sqlx.ExecerContext {
return c.GetConnection(ctx).Store.(sqlx.ExecerContext)
}
Loading

0 comments on commit 63e2636

Please sign in to comment.