Skip to content

Commit

Permalink
Use pgx/stdlib to access pgx Conns directly from the DB
Browse files Browse the repository at this point in the history
  • Loading branch information
arielshaqed committed Oct 4, 2020
1 parent 6076288 commit 589b4f0
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 11 deletions.
4 changes: 2 additions & 2 deletions parade/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ var TaskDataColumnNames = []string{
var tasksTable = pgx.Identifier{"tasks"}

// InsertTasks adds multiple tasks efficiently.
func InsertTasks(ctx context.Context, pgConn *pgx.Conn, source pgx.CopyFromSource) error {
_, err := pgConn.CopyFrom(ctx, tasksTable, TaskDataColumnNames, source)
func InsertTasks(ctx context.Context, conn *pgx.Conn, source pgx.CopyFromSource) error {
_, err := conn.CopyFrom(ctx, tasksTable, TaskDataColumnNames, source)
return err
}

Expand Down
28 changes: 19 additions & 9 deletions parade/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"time"

"github.com/go-test/deep"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/stdlib"
_ "github.com/jackc/pgx/v4/stdlib"
"github.com/treeverse/lakefs/parade"

Expand Down Expand Up @@ -156,11 +156,17 @@ func (w wrapper) stripActor(actor parade.TaskID) parade.ActorID {
func (w wrapper) insertTasks(tasks []parade.TaskData) func() {
w.t.Helper()
ctx := context.Background()
conn, err := pgx.Connect(ctx, databaseURI)
sqlConn, err := w.db.Conn(ctx)
if err != nil {
w.t.Fatalf("pgx.Connect: %s", err)
w.t.Fatalf("sqlx.DB.Conn: %s", err)
}
defer conn.Close(ctx)
defer sqlConn.Close()

conn, err := stdlib.AcquireConn(w.db.DB)
if err != nil {
w.t.Fatalf("stdlib.AcquireConn: %s", err)
}
defer stdlib.ReleaseConn(w.db.DB, conn)

prefixedTasks := make([]parade.TaskData, len(tasks))
for i := 0; i < len(tasks); i++ {
Expand All @@ -185,9 +191,11 @@ func (w wrapper) insertTasks(tasks []parade.TaskData) func() {

// Create cleanup callback. Compute the ids now, tasks may change later.
ids := make([]parade.TaskID, 0, len(tasks))

for _, task := range tasks {
ids = append(ids, task.ID)
}

return func() { w.deleteTasks(ids) }
}

Expand All @@ -197,10 +205,12 @@ func (w wrapper) deleteTasks(ids []parade.TaskID) error {
prefixedIDs[i] = w.prefixTask(ids[i])
}
ctx := context.Background()
conn, err := pgx.Connect(ctx, databaseURI)
conn, err := stdlib.AcquireConn(w.db.DB)
if err != nil {
return fmt.Errorf("connect to DB: %w", err)
w.t.Fatalf("stdlib.AcquireConn: %s", err)
}
defer stdlib.ReleaseConn(w.db.DB, conn)

tx, err := conn.Begin(ctx)
if err != nil {
return fmt.Errorf("BEGIN: %w", err)
Expand Down Expand Up @@ -692,11 +702,11 @@ func TestNotification(t *testing.T) {
t.Fatalf("expected to own single task but got %+v", tasks)
}

conn, err := pgx.Connect(ctx, databaseURI)
conn, err := stdlib.AcquireConn(w.db.DB)
if err != nil {
t.Fatalf("pgx.Connect: %s", err)
w.t.Fatalf("stdlib.AcquireConn: %s", err)
}
defer conn.Close(ctx)
defer stdlib.ReleaseConn(w.db.DB, conn)

type result struct {
status string
Expand Down

0 comments on commit 589b4f0

Please sign in to comment.