Skip to content

Commit

Permalink
pkg/sorted: add read transactions.
Browse files Browse the repository at this point in the history
This is in relation to #580; we will need something like it to hook
Bleve up to the index. Currently only the SQL backends are implemented;
the rest is still TODO.

This also updates the version of the postgres library that we're using;
the old one didn't support setting the isolation level in TxOptions.

Change-Id: I14fdf74832e088d164b757417bfbb500a752d216
  • Loading branch information
zenhack authored and bradfitz committed Jun 24, 2019
1 parent 39fde2e commit 8e63050
Show file tree
Hide file tree
Showing 32 changed files with 6,235 additions and 635 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/jonas-p/go-shp v0.1.1 // indirect
github.com/kr/fs v0.1.0 // indirect
github.com/kr/pretty v0.1.0 // indirect
github.com/lib/pq v0.0.0-20130607063955-9afcd9aa7931
github.com/lib/pq v1.1.1
github.com/mailgun/mailgun-go v0.0.0-20171127222028-17e8bd11e87c
github.com/mattn/go-mastodon v0.0.5-0.20190517015615-8f6192e26b66
github.com/mattn/go-sqlite3 v1.6.0
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,16 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lib/pq v0.0.0-20130607063955-9afcd9aa7931 h1:iG6qKXF9fh1vkUvi+5NCgWY+4Yms+rYZfXrtZzZt978=
github.com/lib/pq v0.0.0-20130607063955-9afcd9aa7931/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.1.1 h1:sJZmqHoEaY7f+NPP8pgLB/WxulyR3fewgCM2qaSlBb4=
github.com/lib/pq v1.1.1/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/mailgun/mailgun-go v0.0.0-20171127222028-17e8bd11e87c h1:5huPh/MfWW65cx8KWNVD4mCCnwIrNiX4bFJR5OeONg0=
github.com/mailgun/mailgun-go v0.0.0-20171127222028-17e8bd11e87c/go.mod h1:NWTyU+O4aczg/nsGhQnvHL6v2n5Gy6Sv5tNDVvC6FbU=
github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ=
github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-mastodon v0.0.5-0.20190517015615-8f6192e26b66 h1:TbnaLJhq+sFuqZ1wxdfF5Uk7A2J41iOobCCFnLI+RPE=
github.com/mattn/go-mastodon v0.0.5-0.20190517015615-8f6192e26b66/go.mod h1:ZBkemyyYYhNAN5JJ0H/ZSW8HfPCW45rHFHyWNwSfpTA=
github.com/mattn/go-sqlite3 v1.6.0 h1:TDwTWbeII+88Qy55nWlof0DclgAtI4LqGujkYMzmQII=
github.com/mattn/go-sqlite3 v1.6.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/mattn/go-tty v0.0.0-20190424173100-523744f04859/go.mod h1:XPvLUNfbS4fJH25nqRHfWLMa1ONC8Amw+mIA639KxkE=
github.com/miekg/dns v0.0.0-20161003181808-3f1f7c8ec9ea h1:OeKTTfcv1UiDsqNpa0rb8hbcH1WFoh9Lx7rgyiQeQvs=
Expand Down
24 changes: 24 additions & 0 deletions pkg/sorted/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@ type KeyValue interface {
Close() error
}

// TransactionalReader is an optional interface that may be implemented by storage
// implementations. It may be implemented when a storage backend supports multiple
// atomic reads.
type TransactionalReader interface {
KeyValue

// BeginReadTx begins a read-only transaction.
BeginReadTx() ReadTransaction
}

// Wiper is an optional interface that may be implemented by storage
// implementations.
type Wiper interface {
Expand Down Expand Up @@ -135,6 +145,20 @@ type Iterator interface {
Close() error
}

// ReadTransaction is a read-only transaction on a KeyValue. It admits the same read
// operations as the KeyValue itself, but writes that occur after the transaction is
// created are not observed.
//
// Users should close the transaction as soon as it as no longer needed, as failing
// to do so can tie up resources.
type ReadTransaction interface {
Get(key string) (string, error)
Find(start, end string) Iterator

// End the transaction.
Close() error
}

type BatchMutation interface {
Set(key, value string)
Delete(key string)
Expand Down
64 changes: 64 additions & 0 deletions pkg/sorted/kvtest/kvtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package kvtest // import "perkeep.org/pkg/sorted/kvtest"
import (
"reflect"
"testing"
"time"

"perkeep.org/pkg/sorted"
"perkeep.org/pkg/test"
Expand Down Expand Up @@ -80,6 +81,10 @@ func TestSorted(t *testing.T, kv sorted.KeyValue) {
// Deleting a non-existent item in a batch should not be an error
testDeleteNotFoundBatch(t, kv)
testDeletePartialNotFoundBatch(t, kv)

if txReader, ok := kv.(sorted.TransactionalReader); ok {
testReadTransaction(t, txReader)
}
}

// Do not ever insert that key, as it used for testing deletion of non existing entries
Expand Down Expand Up @@ -219,3 +224,62 @@ func isEmpty(t *testing.T, kv sorted.KeyValue) bool {
}
return !hasRow
}

func testReadTransaction(t *testing.T, kv sorted.TransactionalReader) {
set := func(k, v string) {
if err := kv.Set(k, v); err != nil {
t.Fatalf("Error setting %q to %q: %v", k, v, err)
}
}
set("raceKey", "orig")
tx := kv.BeginReadTx()

// We want to be sure the transaction is always closed before exiting,
// but we can't just defer tx.Close(), because on implementations that
// implement transactions with simple locks, the last call to set()
// below cannot run until the read transaction is closed. Furthermore,
// we need to make sure set() completes before returning, because if the
// caller closes the database connection before set() runs, it will
// panic.
//
// On the happy path, the sequence of events looks like:
//
// 1. Explicitly close the transaction.
// 2. Wait for set() to complete.
// 3. Return.
//
// ...but we use the boolean and defer statement below to ensure cleanup
// on errors.
txClosed := false
defer func() {
if !txClosed {
tx.Close()
}
}()

get := func(k string) string {
v, err := tx.Get(k)
if err != nil {
t.Fatalf("Error getting %q: %v", k, err)
}
return v
}
if get("raceKey") != "orig" {
t.Fatalf("Read saw the wrong initial value")
}

done := make(chan struct{}, 1)
go func() {
set("raceKey", "new")
done <- struct{}{}
}()

time.Sleep(time.Second / 5)
if get("raceKey") != "orig" {
t.Fatalf("Read transaction saw an update that happened after it started")
}

tx.Close()
txClosed = true
<-done
}
105 changes: 82 additions & 23 deletions pkg/sorted/sqlkv/sqlkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ limitations under the License.
package sqlkv // import "perkeep.org/pkg/sorted/sqlkv"

import (
"context"
"database/sql"
"errors"
"fmt"
Expand Down Expand Up @@ -122,11 +123,39 @@ func (b *batchTx) Delete(key string) {
_, b.err = b.tx.Exec(b.kv.sql("DELETE FROM /*TPRE*/rows WHERE k=?"), key)
}

func (kv *KeyValue) BeginBatch() sorted.BatchMutation {
func (b *batchTx) Find(start, end string) sorted.Iterator {
if b.err != nil {
return &iter{
kv: b.kv,
closeCheck: leak.NewChecker(),
err: b.err,
}
}
return find(b.kv, b.tx, start, end)
}

func (b *batchTx) Get(key string) (value string, err error) {
if b.err != nil {
return "", b.err
}
return get(b.kv, b.tx, key)
}

func (b *batchTx) Close() error {
if b.err != nil {
return b.err
}
if b.kv.Gate != nil {
defer b.kv.Gate.Done()
}
return b.tx.Commit()
}

func (kv *KeyValue) beginTx(txOpts *sql.TxOptions) *batchTx {
if kv.Gate != nil {
kv.Gate.Start()
}
tx, err := kv.DB.Begin()
tx, err := kv.DB.BeginTx(context.TODO(), txOpts)
if err != nil {
log.Printf("SQL BEGIN BATCH: %v", err)
}
Expand All @@ -137,6 +166,10 @@ func (kv *KeyValue) BeginBatch() sorted.BatchMutation {
}
}

func (kv *KeyValue) BeginBatch() sorted.BatchMutation {
return kv.beginTx(nil)
}

func (kv *KeyValue) CommitBatch(b sorted.BatchMutation) error {
if kv.Gate != nil {
defer kv.Gate.Done()
Expand All @@ -154,16 +187,22 @@ func (kv *KeyValue) CommitBatch(b sorted.BatchMutation) error {
return bt.tx.Commit()
}

func (kv *KeyValue) BeginReadTx() sorted.ReadTransaction {
return kv.beginTx(&sql.TxOptions{
ReadOnly: true,
// Needed so that repeated reads of the same data are always
// consistent:
Isolation: sql.LevelSerializable,
})

}

func (kv *KeyValue) Get(key string) (value string, err error) {
if kv.Gate != nil {
kv.Gate.Start()
defer kv.Gate.Done()
}
err = kv.DB.QueryRow(kv.sql("SELECT v FROM /*TPRE*/rows WHERE k=?"), key).Scan(&value)
if err == sql.ErrNoRows {
err = sorted.ErrNotFound
}
return
return get(kv, kv.DB, key)
}

func (kv *KeyValue) Set(key, value string) error {
Expand Down Expand Up @@ -205,33 +244,53 @@ func (kv *KeyValue) Wipe() error {

func (kv *KeyValue) Close() error { return kv.DB.Close() }

func (kv *KeyValue) Find(start, end string) sorted.Iterator {
var releaseGate func() // nil if unused
if kv.Gate != nil {
var once sync.Once
kv.Gate.Start()
releaseGate = func() {
once.Do(kv.Gate.Done)
}
}
// Something we can make queries on. This will either be an *sql.DB or an *sql.Tx.
type queryObject interface {
QueryRow(query string, args ...interface{}) *sql.Row
Query(query string, args ...interface{}) (*sql.Rows, error)
}

// Common logic for KeyValue.Find and batchTx.Find.
func find(kv *KeyValue, qobj queryObject, start, end string) *iter {
var rows *sql.Rows
var err error
if end == "" {
rows, err = kv.DB.Query(kv.sql("SELECT k, v FROM /*TPRE*/rows WHERE k >= ? ORDER BY k "), start)
rows, err = qobj.Query(kv.sql("SELECT k, v FROM /*TPRE*/rows WHERE k >= ? ORDER BY k "), start)
} else {
rows, err = kv.DB.Query(kv.sql("SELECT k, v FROM /*TPRE*/rows WHERE k >= ? AND k < ? ORDER BY k "), start, end)
rows, err = qobj.Query(kv.sql("SELECT k, v FROM /*TPRE*/rows WHERE k >= ? AND k < ? ORDER BY k "), start, end)
}
if err != nil {
log.Printf("unexpected query error: %v", err)
return &iter{err: err}
}

it := &iter{
kv: kv,
rows: rows,
closeCheck: leak.NewChecker(),
releaseGate: releaseGate,
return &iter{
kv: kv,
rows: rows,
closeCheck: leak.NewChecker(),
}
}

// Common logic for KeyValue.Get and batchTx.Get
func get(kv *KeyValue, qobj queryObject, key string) (value string, err error) {
err = qobj.QueryRow(kv.sql("SELECT v FROM /*TPRE*/rows WHERE k=?"), key).Scan(&value)
if err == sql.ErrNoRows {
err = sorted.ErrNotFound
}
return
}

func (kv *KeyValue) Find(start, end string) sorted.Iterator {
var releaseGate func() // nil if unused
if kv.Gate != nil {
var once sync.Once
kv.Gate.Start()
releaseGate = func() {
once.Do(kv.Gate.Done)
}
}
it := find(kv, kv.DB, start, end)
it.releaseGate = releaseGate
return it
}

Expand Down
86 changes: 86 additions & 0 deletions vendor/github.com/lib/pq/.travis.sh

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 8e63050

Please sign in to comment.