Skip to content
This repository has been archived by the owner on Feb 21, 2024. It is now read-only.

Commit

Permalink
Translation store refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
benbjohnson committed Oct 9, 2019
1 parent f9e4787 commit e844e1a
Show file tree
Hide file tree
Showing 28 changed files with 1,901 additions and 2,825 deletions.
71 changes: 34 additions & 37 deletions api.go
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"io"
"io/ioutil"
"net/url"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -540,15 +541,15 @@ func (api *API) ExportCSV(ctx context.Context, indexName string, fieldName strin
var err error

if field.keys() {
if rowStr, err = api.holder.translateFile.TranslateRowToString(index.Name(), field.Name(), rowID); err != nil {
if rowStr, err = field.translateStore.TranslateID(rowID); err != nil {
return errors.Wrap(err, "translating row")
}
} else {
rowStr = strconv.FormatUint(rowID, 10)
}

if index.Keys() {
if colStr, err = api.holder.translateFile.TranslateColumnToString(index.Name(), columnID); err != nil {
if colStr, err = index.translateStore.TranslateID(columnID); err != nil {
return errors.Wrap(err, "translating column")
}
} else {
Expand Down Expand Up @@ -944,7 +945,7 @@ func (api *API) Import(ctx context.Context, req *ImportRequest, opts ...ImportOp
if len(req.RowIDs) != 0 {
return errors.New("row ids cannot be used because field uses string keys")
}
if req.RowIDs, err = api.holder.translateFile.TranslateRowsToUint64(index.Name(), field.Name(), req.RowKeys); err != nil {
if req.RowIDs, err = field.translateStore.TranslateKeys(req.RowKeys); err != nil {
return errors.Wrap(err, "translating rows")
}
}
Expand All @@ -954,7 +955,7 @@ func (api *API) Import(ctx context.Context, req *ImportRequest, opts ...ImportOp
if len(req.ColumnIDs) != 0 {
return errors.New("column ids cannot be used because index uses string keys")
}
if req.ColumnIDs, err = api.holder.translateFile.TranslateColumnsToUint64(index.Name(), req.ColumnKeys); err != nil {
if req.ColumnIDs, err = index.translateStore.TranslateKeys(req.ColumnKeys); err != nil {
return errors.Wrap(err, "translating columns")
}
}
Expand Down Expand Up @@ -1055,7 +1056,7 @@ func (api *API) ImportValue(ctx context.Context, req *ImportValueRequest, opts .
if len(req.ColumnIDs) != 0 {
return errors.New("column ids cannot be used because index uses string keys")
}
if req.ColumnIDs, err = api.holder.translateFile.TranslateColumnsToUint64(index.Name(), req.ColumnKeys); err != nil {
if req.ColumnIDs, err = index.translateStore.TranslateKeys(req.ColumnKeys); err != nil {
return errors.Wrap(err, "translating columns")
}

Expand Down Expand Up @@ -1255,22 +1256,6 @@ func (api *API) ResizeAbort() error {
return errors.Wrap(err, "complete current job")
}

// GetTranslateData provides a reader for key translation logs starting at offset.
func (api *API) GetTranslateData(ctx context.Context, offset int64) (io.ReadCloser, error) {
span, ctx := tracing.StartSpanFromContext(ctx, "API.GetTranslateData")
defer span.Finish()

rc, err := api.holder.translateFile.Reader(ctx, offset)
if err != nil {
return nil, errors.Wrap(err, "read from translate store")
}

// Ensure reader is closed when the client disconnects.
go func() { <-ctx.Done(); rc.Close() }()

return rc, nil
}

// State returns the cluster state which is usually "NORMAL", but could be
// "STARTING", "RESIZING", or potentially others. See cluster.go for more
// details.
Expand Down Expand Up @@ -1300,37 +1285,49 @@ func (api *API) Info() serverInfo {
}
}

// GetTranslateEntryReader provides an entry reader for key translation logs starting at offset.
func (api *API) GetTranslateEntryReader(ctx context.Context, offsets TranslateOffsetMap) (TranslateEntryReader, error) {
span, ctx := tracing.StartSpanFromContext(ctx, "API.GetTranslateEntryReader")
defer span.Finish()
return api.holder.TranslateEntryReader(ctx, offsets)
}

// TranslateKeys handles a TranslateKeyRequest.
func (api *API) TranslateKeys(body io.Reader) ([]byte, error) {
reqBytes, err := ioutil.ReadAll(body)
if err != nil {
return nil, NewBadRequestError(errors.Wrap(err, "read body error"))
}
func (api *API) TranslateKeys(r io.Reader) ([]byte, error) {
var req TranslateKeysRequest
if err := api.Serializer.Unmarshal(reqBytes, &req); err != nil {
return nil, NewBadRequestError(errors.Wrap(err, "unmarshal body error"))
if buf, err := ioutil.ReadAll(r); err != nil {
return nil, NewBadRequestError(errors.Wrap(err, "read translate keys request error"))
} else if err := api.Serializer.Unmarshal(buf, &req); err != nil {
return nil, NewBadRequestError(errors.Wrap(err, "unmarshal translate keys request error"))
}
var ids []uint64
if req.Field == "" {
ids, err = api.holder.translateFile.TranslateColumnsToUint64(req.Index, req.Keys)
} else {
ids, err = api.holder.translateFile.TranslateRowsToUint64(req.Index, req.Field, req.Keys)

// Lookup store for either index or field and translate keys.
store, err := api.holder.TranslateStore(req.Index, req.Field)
if err != nil {
return nil, err
}
ids, err := store.TranslateKeys(req.Keys)
if err != nil {
return nil, err
}

resp := TranslateKeysResponse{
IDs: ids,
}
// Encode response.
buf, err := api.Serializer.Marshal(&resp)
buf, err := api.Serializer.Marshal(&TranslateKeysResponse{IDs: ids})
if err != nil {
return nil, errors.Wrap(err, "translate keys response encoding error")
}
return buf, nil
}

// PrimaryReplicaNodeURL returns the URL of the cluster's primary replica.
func (api *API) PrimaryReplicaNodeURL() url.URL {
node := api.cluster.PrimaryReplicaNode()
if node == nil {
return url.URL{}
}
return node.URI.URL()
}

type serverInfo struct {
ShardWidth uint64 `json:"shardWidth"`
Memory uint64 `json:"memory"`
Expand Down
34 changes: 26 additions & 8 deletions api_test.go
Expand Up @@ -21,8 +21,11 @@ import (
"reflect"
"strings"
"testing"
"time"

"github.com/pilosa/pilosa/v2"
"github.com/pilosa/pilosa/v2/boltdb"
"github.com/pilosa/pilosa/v2/http"
"github.com/pilosa/pilosa/v2/server"
"github.com/pilosa/pilosa/v2/test"
)
Expand All @@ -33,11 +36,15 @@ func TestAPI_Import(t *testing.T) {
server.OptCommandServerOptions(
pilosa.OptServerNodeID("node0"),
pilosa.OptServerClusterHasher(&offsetModHasher{}),
pilosa.OptServerOpenTranslateStore(boltdb.OpenTranslateStore),
pilosa.OptServerOpenTranslateReader(http.OpenTranslateReader),
)},
[]server.CommandOption{
server.OptCommandServerOptions(
pilosa.OptServerNodeID("node1"),
pilosa.OptServerClusterHasher(&offsetModHasher{}),
pilosa.OptServerOpenTranslateStore(boltdb.OpenTranslateStore),
pilosa.OptServerOpenTranslateReader(http.OpenTranslateReader),
)},
)
defer c.Close()
Expand Down Expand Up @@ -92,16 +99,20 @@ func TestAPI_Import(t *testing.T) {
if res, err := m0.API.Query(ctx, &pilosa.QueryRequest{Index: index, Query: pql}); err != nil {
t.Fatal(err)
} else if keys := res.Results[0].(*pilosa.Row).Keys; !reflect.DeepEqual(keys, colKeys) {
t.Fatalf("unexpected column keys: %+v", keys)
t.Fatalf("unexpected column keys: %#v", keys)
}

// Query node1.
if res, err := m1.API.Query(ctx, &pilosa.QueryRequest{Index: index, Query: pql}); err != nil {
if err := test.RetryUntil(5*time.Second, func() error {
if res, err := m1.API.Query(ctx, &pilosa.QueryRequest{Index: index, Query: pql}); err != nil {
return err
} else if keys := res.Results[0].(*pilosa.Row).Keys; !reflect.DeepEqual(keys, colKeys) {
return fmt.Errorf("unexpected column keys: %#v", keys)
}
return nil
}); err != nil {
t.Fatal(err)
} else if keys := res.Results[0].(*pilosa.Row).Keys; !reflect.DeepEqual(keys, colKeys) {
t.Fatalf("unexpected column keys: %+v", keys)
}

})

// Relies on the previous test creating an index with TrackExistence and
Expand Down Expand Up @@ -178,11 +189,13 @@ func TestAPI_ImportValue(t *testing.T) {
server.OptCommandServerOptions(
pilosa.OptServerNodeID("node0"),
pilosa.OptServerClusterHasher(&offsetModHasher{}),
pilosa.OptServerOpenTranslateReader(http.OpenTranslateReader),
)},
[]server.CommandOption{
server.OptCommandServerOptions(
pilosa.OptServerNodeID("node1"),
pilosa.OptServerClusterHasher(&offsetModHasher{}),
pilosa.OptServerOpenTranslateReader(http.OpenTranslateReader),
)},
)
defer c.Close()
Expand Down Expand Up @@ -234,10 +247,15 @@ func TestAPI_ImportValue(t *testing.T) {
}

// Query node1.
if res, err := m1.API.Query(ctx, &pilosa.QueryRequest{Index: index, Query: pql}); err != nil {
if err := test.RetryUntil(5*time.Second, func() error {
if res, err := m1.API.Query(ctx, &pilosa.QueryRequest{Index: index, Query: pql}); err != nil {
return err
} else if keys := res.Results[0].(*pilosa.Row).Keys; !reflect.DeepEqual(keys, colKeys) {
return fmt.Errorf("unexpected column keys: %+v", keys)
}
return nil
}); err != nil {
t.Fatal(err)
} else if keys := res.Results[0].(*pilosa.Row).Keys; !reflect.DeepEqual(keys, colKeys) {
t.Fatalf("unexpected column keys: %+v", keys)
}
})
}
Expand Down

0 comments on commit e844e1a

Please sign in to comment.