Skip to content

Commit

Permalink
Merge pull request #30 from sunesimonsen/ssimonsen/refactor-transacti…
Browse files Browse the repository at this point in the history
…on-for-type-safety

Introduce generic helper functions to make transaction type safe
  • Loading branch information
sunesimonsen committed May 30, 2023
2 parents 896b184 + d098179 commit a0b01cc
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 42 deletions.
8 changes: 3 additions & 5 deletions blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,21 @@ func (blob *Blob) Id() Id {

// Returns the length of the content of the blob.
func (blob *Blob) Len() (uint64, error) {
length, err := blob.db.ReadTransact(func(tr fdb.ReadTransaction) (any, error) {
return readTransact(blob.db, func(tr fdb.ReadTransaction) (uint64, error) {
data, error := tr.Get(blob.dir.Sub("len")).Get()

return decodeUInt64(data), error
})

return length.(uint64), err
}

// Returns the time the blob was created at.
func (blob *Blob) CreatedAt() (time.Time, error) {
data, err := blob.db.ReadTransact(func(tr fdb.ReadTransaction) (any, error) {
data, err := readTransact(blob.db, func(tr fdb.ReadTransaction) ([]byte, error) {
return tr.Get(blob.dir.Sub("createdAt")).Get()

})

return time.Unix(int64(decodeUInt64(data.([]byte))), 0), err
return time.Unix(int64(decodeUInt64(data)), 0), err
}

// Returns the content of the blob as a byte slice.
Expand Down
2 changes: 1 addition & 1 deletion reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (br *reader) Read(buf []byte) (int, error) {
br.dir.Sub("chunkSize")
bytesSpace := br.dir.Sub("bytes")

_, err := br.db.ReadTransact(func(tr fdb.ReadTransaction) (any, error) {
_, err := readTransact(br.db, func(tr fdb.ReadTransaction) (any, error) {
startChunk := br.off
endChunk := br.off + int(math.Ceil(float64(len(buf)-read)/float64(br.chunkSize)))
endChunkCap := int(math.Min(float64(startChunk+br.chunksPerTransaction), float64(endChunk))) + 1
Expand Down
20 changes: 9 additions & 11 deletions removed.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import (
// can still access the removed blob. The removed blobs can be fully deleted
// using the [Store.DeleteRemovedBlobsBefore] method.
func (store *Store) RemoveBlob(id Id) error {
_, err := store.db.Transact(func(tr fdb.Transaction) (any, error) {
return updateTransact(store.db, func(tr fdb.Transaction) error {
blobDir, err := store.openBlobDir(id)
if err != nil {
return nil, err
return err
}

removedPath := append(store.removedDir.GetPath(), string(id))
Expand All @@ -24,43 +24,41 @@ func (store *Store) RemoveBlob(id Id) error {
unixTimestamp := store.systemTime.Now().Unix()
tr.Set(dst.Sub("deletedAt"), encodeUInt64(uint64(unixTimestamp)))

return nil, err
return nil
})

return err
}

// Deletes blobs that was marked as removed before a given date.
//
// This is useful to make a periodical cleaning job.
func (store *Store) DeleteRemovedBlobsBefore(date time.Time) ([]Id, error) {
var deletedIds []Id
_, err := store.db.Transact(func(tr fdb.Transaction) (any, error) {
err := updateTransact(store.db, func(tr fdb.Transaction) error {
ids, err := store.removedDir.List(tr, []string{})

if err != nil {
return nil, err
return err
}

for _, id := range ids {
removedBlobDir, err := store.removedDir.Open(tr, []string{id}, nil)

if err != nil {
return nil, err
return err
}

data, err := tr.Get(removedBlobDir.Sub("deletedAt")).Get()

if err != nil {
return nil, err
return err
}

deletedAt := time.Unix(int64(decodeUInt64(data)), 0)

if deletedAt.Before(date) {
deleted, err := store.removedDir.Remove(tr, []string{id})
if err != nil {
return nil, err
return err
}

if deleted {
Expand All @@ -69,7 +67,7 @@ func (store *Store) DeleteRemovedBlobsBefore(date time.Time) ([]Id, error) {
}
}

return nil, nil
return nil
})

return deletedIds, err
Expand Down
8 changes: 4 additions & 4 deletions store.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,15 @@ func (store *Store) Blob(id Id) (*Blob, error) {
return nil, err
}

data, err := store.db.ReadTransact(func(tr fdb.ReadTransaction) (any, error) {
data, err := readTransact(store.db, func(tr fdb.ReadTransaction) ([]byte, error) {
return tr.Get(blobDir.Sub("chunkSize")).Get()
})

if err != nil {
return nil, err
}

chunkSize := int(decodeUInt64(data.([]byte)))
chunkSize := int(decodeUInt64(data))

blob := &Blob{
db: store.db,
Expand All @@ -112,13 +112,13 @@ func (store *Store) Create(ctx context.Context, r io.Reader) (*Blob, error) {
return nil, err
}

id, err := store.db.Transact(func(tr fdb.Transaction) (any, error) {
id, err := transact(store.db, func(tr fdb.Transaction) (Id, error) {
return store.CommitUpload(tr, token)
})

if err != nil {
return nil, err
}

return store.Blob(id.(Id))
return store.Blob(id)
}
8 changes: 4 additions & 4 deletions store_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@ func ExampleStore_CommitUpload() {
r := strings.NewReader("My blob content")
token, err := store.Upload(ctx, r)

id, err := db.Transact(func(tr fdb.Transaction) (any, error) {
id, err := transact(db, func(tr fdb.Transaction) (Id, error) {
return store.CommitUpload(tr, token)
})

blob, err := store.Blob(id.(Id))
blob, err := store.Blob(id)
if err != nil {
log.Fatal("Could not retrieve blob")
}
Expand Down Expand Up @@ -209,11 +209,11 @@ func ExampleStore_Upload() {
r := strings.NewReader("My blob content")
token, err := store.Upload(ctx, r)

id, err := db.Transact(func(tr fdb.Transaction) (any, error) {
id, err := transact(db, func(tr fdb.Transaction) (Id, error) {
return store.CommitUpload(tr, token)
})

blob, err := store.Blob(id.(Id))
blob, err := store.Blob(id)
if err != nil {
log.Fatal("Could not retrieve blob")
}
Expand Down
27 changes: 27 additions & 0 deletions transact.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package blobs

import "github.com/apple/foundationdb/bindings/go/src/fdb"

func transact[T any](db fdb.Transactor, cb func(tr fdb.Transaction) (T, error)) (T, error) {
result, err := db.Transact(func(tr fdb.Transaction) (any, error) {
return cb(tr)
})

return result.(T), err
}

func readTransact[T any](db fdb.ReadTransactor, cb func(tr fdb.ReadTransaction) (T, error)) (T, error) {
result, err := db.ReadTransact(func(tr fdb.ReadTransaction) (any, error) {
return cb(tr)
})

return result.(T), err
}

func updateTransact(db fdb.Transactor, cb func(tr fdb.Transaction) error) error {
_, err := db.Transact(func(tr fdb.Transaction) (any, error) {
return nil, cb(tr)
})

return err
}
26 changes: 12 additions & 14 deletions uploads.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func (store *Store) write(ctx context.Context, blobDir subspace.Subspace, r io.R
bytesSpace := blobDir.Sub("bytes")

for {
finished, err := store.db.Transact(func(tr fdb.Transaction) (any, error) {
finished, err := transact(store.db, func(tr fdb.Transaction) (bool, error) {
for i := 0; i < store.chunksPerTransaction; i++ {
err := ctx.Err()
if err != nil {
Expand All @@ -44,7 +44,7 @@ func (store *Store) write(ctx context.Context, blobDir subspace.Subspace, r io.R
return false, nil
})

if finished.(bool) {
if finished {
break
}

Expand All @@ -53,13 +53,11 @@ func (store *Store) write(ctx context.Context, blobDir subspace.Subspace, r io.R
}
}

_, err := store.db.Transact(func(tr fdb.Transaction) (any, error) {
return updateTransact(store.db, func(tr fdb.Transaction) error {
tr.Set(blobDir.Sub("len"), encodeUInt64(written))
tr.Set(blobDir.Sub("chunkSize"), encodeUInt64(uint64(store.chunkSize)))
return nil, nil
return nil
})

return err
}

// Uploads the content of the given reader r into a temporary location and
Expand All @@ -75,10 +73,10 @@ func (store *Store) Upload(ctx context.Context, r io.Reader) (UploadToken, error
return token, err
}

_, err = store.db.Transact(func(tr fdb.Transaction) (any, error) {
err = updateTransact(store.db, func(tr fdb.Transaction) error {
unixTimestamp := store.systemTime.Now().Unix()
tr.Set(uploadDir.Sub("uploadStartedAt"), encodeUInt64(uint64(unixTimestamp)))
return nil, nil
return nil
})

if err != nil {
Expand Down Expand Up @@ -119,32 +117,32 @@ func (store *Store) CommitUpload(tr fdb.Transaction, token UploadToken) (Id, err
// This is useful to make a periodical cleaning job.
func (store *Store) DeleteUploadsStartedBefore(date time.Time) ([]Id, error) {
var deletedIds []Id
_, err := store.db.Transact(func(tr fdb.Transaction) (any, error) {
err := updateTransact(store.db, func(tr fdb.Transaction) error {
ids, err := store.uploadsDir.List(tr, []string{})

if err != nil {
return nil, err
return err
}

for _, id := range ids {
uploadDir, err := store.uploadsDir.Open(tr, []string{id}, nil)

if err != nil {
return nil, err
return err
}

data, err := tr.Get(uploadDir.Sub("uploadStartedAt")).Get()

if err != nil {
return nil, err
return err
}

uploadStartedAt := time.Unix(int64(decodeUInt64(data)), 0)

if uploadStartedAt.Before(date) {
deleted, err := store.uploadsDir.Remove(tr, []string{id})
if err != nil {
return nil, err
return err
}

if deleted {
Expand All @@ -153,7 +151,7 @@ func (store *Store) DeleteUploadsStartedBefore(date time.Time) ([]Id, error) {
}
}

return nil, nil
return nil
})

return deletedIds, err
Expand Down
6 changes: 3 additions & 3 deletions uploads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ func TestUploadCommit(t *testing.T) {
token, err := store.Upload(ctx, strings.NewReader(input))
assert.NoError(t, err)

id, err := db.Transact(func(tr fdb.Transaction) (any, error) {
id, err := transact(db, func(tr fdb.Transaction) (Id, error) {
return store.CommitUpload(tr, token)
})
assert.NoError(t, err)

blob, err := store.Blob(id.(Id))
blob, err := store.Blob(id)
assert.NoError(t, err)

content, err := blob.Content(ctx)
Expand All @@ -64,7 +64,7 @@ func TestUploadCommit(t *testing.T) {

t.Run("rejects invalid tokens", func(t *testing.T) {

_, err := db.Transact(func(tr fdb.Transaction) (any, error) {
_, err := transact(db, func(tr fdb.Transaction) (any, error) {
return store.CommitUpload(tr, UploadToken{})
})
assert.EqualError(t, err, "Invalid upload token, tokens needs to be produced by the upload method")
Expand Down

0 comments on commit a0b01cc

Please sign in to comment.