Skip to content

Commit

Permalink
Expose translation mvcc (FeatureBaseDB#2381)
Browse files Browse the repository at this point in the history
* expose Transaction on TranslateStore for DAX Snapshotting

* try to fix ramdisk nonsense

Apparently, tests were running in either a shell environment or a Docker
environment at random, so they could sometimes pass and sometimes fail:
the shell environment had the ramdisk set up and the Docker one didn't.

Now we force to run in docker always and set up ramdisk explicitly.

* ramdisk mount should be defined on gitlab runner config now

* debug ramdisk issue?

* fix tests... and a buncha other stuff

Took retry out of CI config because I think it's doing more harm than
good at this point.

The executor test I modified failed when I changed DefaultPartitionN
to 8, but only because results came back in a different order, so I made
the test more robust to ordering.

I edited some data gen stuff to make shorter lines because it was
making grep results unusable.

the actual fix is in translate_boltdb_test.go

* clean up, fix code review feedback
  • Loading branch information
jaffee committed Dec 20, 2022
1 parent 0212199 commit 3693b99
Show file tree
Hide file tree
Showing 9 changed files with 3,076 additions and 72 deletions.
6 changes: 2 additions & 4 deletions .gitlab/.gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -251,14 +251,13 @@ run go tests race:
extends: .go-cache
rules:
- if: '$CI_PIPELINE_SOURCE == "push" || $CI_PIPELINE_SOURCE == "schedule" || $CI_PIPELINE_SOURCE == "web"'
retry: 1
needs: ["smoke build"] # we do block on smoke build though bc it's pretty dumb to test stuff if it doesn't build
script:
- echo "Running featurebase race tests..."
- PKG_LIST=$(go list ./... | grep -Ev 'internal/clustertests|simulacraData|batch|idk|v3/dax/test/dax' | paste -s -d, -)
- RAMDISK=/mnt/ramdisk go test -race -v -timeout=10m ${PKG_LIST//,/ }
tags:
- aws
- docker

# We run our base tests against $GOVERSION (a reasonably current version that we trust)
# and use shardwidth22 for them. This gives us a canary for things breaking for
Expand All @@ -269,7 +268,6 @@ run go tests:
extends: .go-cache
rules:
- if: '$CI_PIPELINE_SOURCE == "push" || $CI_PIPELINE_SOURCE == "schedule" || $CI_PIPELINE_SOURCE == "web"'
retry: 1
script:
- echo "Running featurebase unit tests..."
- PKG_LIST=$(go list ./... | grep -Ev 'internal/clustertests|simulacraData|batch|idk|v3/dax/test/dax' | paste -s -d, -)
Expand All @@ -278,7 +276,7 @@ run go tests:
paths:
- coverage.out
tags:
- aws
- docker

run go tests dax/test/dax:
stage: test
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ testvsub:
echo; echo "999 done testing subpkg $$pkg"; \
done

# make a 2GB RAMDisk. Speed up tests by running them with RAMDISK=/mnt/ramfs
# make a 2GB RAMDisk. Speed up tests by running them with RAMDISK=/mnt/ramdisk
ramdisk-linux:
mount -o size=2G -t tmpfs none /mnt/ramfs
mount -o size=2G -t tmpfs none /mnt/ramdisk

# make a 2GB RAMDisk. Speed up tests by running them with RAMDISK=/Volumes/RAMDisk
ramdisk-osx:
Expand Down
29 changes: 19 additions & 10 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -968,7 +968,7 @@ func (r RedirectError) Error() string {
}

// TranslateData returns all translation data in the specified partition.
func (api *API) TranslateData(ctx context.Context, indexName string, partition int) (io.WriterTo, error) {
func (api *API) TranslateData(ctx context.Context, indexName string, partition int) (TranslateStore, error) {
span, _ := tracing.StartSpanFromContext(ctx, "API.TranslateData")
defer span.Finish()

Expand Down Expand Up @@ -1023,7 +1023,7 @@ func (api *API) TranslateData(ctx context.Context, indexName string, partition i
}

// FieldTranslateData returns all translation data in the specified field.
func (api *API) FieldTranslateData(ctx context.Context, indexName, fieldName string) (io.WriterTo, error) {
func (api *API) FieldTranslateData(ctx context.Context, indexName, fieldName string) (TranslateStore, error) {
span, _ := tracing.StartSpanFromContext(ctx, "API.FieldTranslateData")
defer span.Finish()
if err := api.validate(apiFieldTranslateData); err != nil {
Expand Down Expand Up @@ -3137,18 +3137,22 @@ func (api *API) SnapshotTableKeys(ctx context.Context, req *dax.SnapshotTableKey
qtid := req.TableKey.QualifiedTableID()

// Create the snapshot for the current version.
wrTo, err := api.TranslateData(ctx, string(req.TableKey), int(req.PartitionNum))
trans, err := api.TranslateData(ctx, string(req.TableKey), int(req.PartitionNum))
if err != nil {
return errors.Wrapf(err, "getting index/partition writeto: %s/%d", req.TableKey, req.PartitionNum)
return errors.Wrapf(err, "getting index/partition translate store: %s/%d", req.TableKey, req.PartitionNum)
}
// get a write tx to ensure no other writes while incrementing WL version.
wrTo, err := trans.Begin(true)
if err != nil {
return errors.Wrap(err, "beginning table translate write tx")
}
defer wrTo.Rollback()

// TODO(jaffee) need to ensure writes to translation data can't
// occur while this is happening.
resource := api.serverlessStorage.GetTableKeyResource(qtid, req.PartitionNum)
if err := resource.IncrementWLVersion(); err != nil {
return errors.Wrap(err, "incrementing write log version")
}
// TODO(jaffee) downgrade (currently non-existent) lock to read-only
// TODO(jaffee) downgrade write tx to read-only
err = resource.SnapshotTo(wrTo)
return errors.Wrap(err, "snapshotting table keys")
}
Expand All @@ -3159,17 +3163,22 @@ func (api *API) SnapshotFieldKeys(ctx context.Context, req *dax.SnapshotFieldKey
qtid := req.TableKey.QualifiedTableID()

// Create the snapshot for the current version.
// TODO(jaffee) change this to get write lock
wrTo, err := api.FieldTranslateData(ctx, string(req.TableKey), string(req.Field))
trans, err := api.FieldTranslateData(ctx, string(req.TableKey), string(req.Field))
if err != nil {
return errors.Wrap(err, "getting index/field writeto")
}
// get a write tx to ensure no other writes while incrementing WL version.
wrTo, err := trans.Begin(true)
if err != nil {
return errors.Wrap(err, "beginning field translate write tx")
}
defer wrTo.Rollback()

resource := api.serverlessStorage.GetFieldKeyResource(qtid, req.Field)
if err := resource.IncrementWLVersion(); err != nil {
return errors.Wrap(err, "incrementing writelog version")
}
// TODO(jaffee) downgrade to read lock
// TODO(jaffee) downgrade to read tx
err = resource.SnapshotTo(wrTo)
return errors.Wrap(err, "snapshotTo in FieldKeys")
}
Expand Down
64 changes: 34 additions & 30 deletions executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5122,47 +5122,50 @@ func TestExecutor_Execute_Extract_Keyed(t *testing.T) {
`)

resp := c.Query(t, c.Idx(), `Extract(All(), Rows(set))`)
expect := []interface{}{
pilosa.ExtractedTable{
Fields: []pilosa.ExtractedTableField{
{
Name: "set",
Type: "[]uint64",
},
expect := pilosa.ExtractedTable{
Fields: []pilosa.ExtractedTableField{
{
Name: "set",
Type: "[]uint64",
},
// The order of these probably shouldn't matter, but currently depends indirectly on the
// index.
Columns: []pilosa.ExtractedTableColumn{
{
Column: pilosa.KeyOrID{Keyed: true, Key: "h"},
Rows: []interface{}{
[]uint64{
1,
2,
},
},
// The order of these probably shouldn't matter, but currently depends indirectly on the
// index.
Columns: []pilosa.ExtractedTableColumn{
{
Column: pilosa.KeyOrID{Keyed: true, Key: "h"},
Rows: []interface{}{
[]uint64{
1,
2,
},
},
{
Column: pilosa.KeyOrID{Keyed: true, Key: "xyzzy"},
Rows: []interface{}{
[]uint64{
2,
},
},
{
Column: pilosa.KeyOrID{Keyed: true, Key: "xyzzy"},
Rows: []interface{}{
[]uint64{
2,
},
},
{
Column: pilosa.KeyOrID{Keyed: true, Key: "plugh"},
Rows: []interface{}{
[]uint64{},
},
},
{
Column: pilosa.KeyOrID{Keyed: true, Key: "plugh"},
Rows: []interface{}{
[]uint64{},
},
},
},
}

if !reflect.DeepEqual(expect, resp.Results) {
t.Errorf("expected %v but got %v", expect, resp.Results)
if len(resp.Results) != 1 {
t.Fail()
}
res := resp.Results[0].(pilosa.ExtractedTable)
if !reflect.DeepEqual(expect.Fields, res.Fields) {
t.Errorf("expected:\n%v\nbut got:\n%v", expect, resp.Results)
}
assert.ElementsMatch(t, expect.Columns, res.Columns)
}

func TestExecutor_Execute_MaxMemory(t *testing.T) {
Expand Down Expand Up @@ -7419,6 +7422,7 @@ func backupCluster(t *testing.T, c *test.Cluster, index string) (backupDir strin

buf := &bytes.Buffer{}
backupLog := logger.NewStandardLogger(buf)

backupCommand := ctl.NewBackupCommand(backupLog)
backupCommand.Host = c.Nodes[len(c.Nodes)-1].URL() // don't pick node 0 so we don't always get primary (better code coverage)
backupCommand.Index = index
Expand Down
17 changes: 15 additions & 2 deletions http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2789,8 +2789,15 @@ func (h *Handler) handleGetTranslateData(w http.ResponseWriter, r *http.Request)
http.Error(w, err.Error(), http.StatusNotFound)
return
}
tx, err := p.Begin(false)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer tx.Rollback()

// Stream translate data to response body.
if _, err := p.WriteTo(w); err != nil {
if _, err := tx.WriteTo(w); err != nil {
h.logger.Errorf("error streaming translation data: %s", err)
}
return
Expand All @@ -2815,8 +2822,14 @@ func (h *Handler) handleGetTranslateData(w http.ResponseWriter, r *http.Request)
http.Error(w, err.Error(), http.StatusNotFound)
return
}
tx, err := p.Begin(false)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer tx.Rollback()
// Stream translate partition to response body.
if _, err := p.WriteTo(w); err != nil {
if _, err := tx.WriteTo(w); err != nil {
h.logger.Errorf("error streaming translation data: %s", err)
}
}
Expand Down
2,983 changes: 2,976 additions & 7 deletions idk/datagen/dell.data.go

Large diffs are not rendered by default.

14 changes: 11 additions & 3 deletions translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,7 @@ type TranslateStore interface { // TODO: refactor this interface; readonly shoul
// Returns a reader from the given ID offset.
EntryReader(ctx context.Context, offset uint64) (TranslateEntryReader, error)

// WriteTo ensures that the TranslateStore implements io.WriterTo.
// It should write the contents of the store to the writer.
WriteTo(io.Writer) (int64, error)
Begin(write bool) (TranslatorTx, error)

// ReadFrom ensures that the TranslateStore implements io.ReaderFrom.
// It should read from the reader and replace the data store with
Expand All @@ -87,6 +85,16 @@ type TranslateStore interface { // TODO: refactor this interface; readonly shoul
Delete(records *roaring.Bitmap) (Commitor, error)
}

// TranslatorTx reproduces a subset of the methods on the BoltDB Tx
// object. Others may be needed in the future and we should just add
// them here. The idea is not to scatter direct references to bolt
// stuff throughout the whole codebase.
type TranslatorTx interface {
	// WriteTo writes a snapshot of the store as seen by this
	// transaction to w, returning the number of bytes written.
	WriteTo(io.Writer) (int64, error)
	// Rollback ends the transaction, discarding any pending changes.
	Rollback() error
	// e.g. Commit() error
}

// OpenTranslateStoreFunc represents a function for instantiating and opening a TranslateStore.
type OpenTranslateStoreFunc func(path, index, field string, partitionID, partitionN int, fsyncEnabled bool) (TranslateStore, error)

Expand Down
11 changes: 3 additions & 8 deletions translate_boltdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,14 +394,9 @@ func (s *BoltTranslateStore) MaxID() (max uint64, err error) {
return max, nil
}

// WriteTo writes the contents of the store to the writer.
func (s *BoltTranslateStore) WriteTo(w io.Writer) (int64, error) {
tx, err := s.db.Begin(false)
if err != nil {
return 0, err
}
defer func() { _ = tx.Rollback() }()
return tx.WriteTo(w)
// Begin starts and returns a transaction on the underlying store.
// If write is true, the transaction may modify the store and blocks
// other writers until it ends; callers must Rollback when done
// (see TranslatorTx).
func (s *BoltTranslateStore) Begin(write bool) (TranslatorTx, error) {
	return s.db.Begin(write)
}

// ReadFrom reads the content and overwrites the existing store.
Expand Down
20 changes: 14 additions & 6 deletions translate_boltdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/molecula/featurebase/v3/disco"
"github.com/molecula/featurebase/v3/roaring"
"github.com/molecula/featurebase/v3/testhook"
"github.com/stretchr/testify/require"
)

//var vv = pilosa.VV
Expand Down Expand Up @@ -456,12 +457,19 @@ func TestTranslateStore_ReadWrite(t *testing.T) {
buf := bytes.NewBuffer(nil)
expN := s.Size()

// After this, the buffer should contain batch0.
if n, err := s.WriteTo(buf); err != nil {
t.Fatalf("writing to buffer: %s", err)
} else if n != expN {
t.Fatalf("expected buffer size: %d, but got: %d", expN, n)
}
// wrap in a func so we can defer rollback. Need rollback to
// happen before the end of the test. I'm not entirely sure
// why, but it hangs if you don't.
func() {
tx, err := s.Begin(false)
require.NoError(t, err)
defer tx.Rollback()

// After this, the buffer should contain batch0.
n, err := tx.WriteTo(buf)
require.NoError(t, err)
require.Equal(t, expN, n)
}()

// Populate the store with the keys in batch1.
batch1IDs, err := s.CreateKeys(batch1...)
Expand Down

0 comments on commit 3693b99

Please sign in to comment.