Commit 224506f: Applies PR #7180 (#7208)
brendoncarroll committed Jan 4, 2022
1 parent 0a3c12e
Showing 11 changed files with 917 additions and 198 deletions.
3 changes: 3 additions & 0 deletions src/client/transaction.go
@@ -419,6 +419,9 @@ func (c *pfsBuilderClient) RunLoadTest(ctx context.Context, req *pfs.RunLoadTest
func (c *pfsBuilderClient) RunLoadTestDefault(ctx context.Context, req *types.Empty, opts ...grpc.CallOption) (*pfs.RunLoadTestResponse, error) {
return nil, unsupportedError("RunLoadTestDefault")
}
func (c *pfsBuilderClient) CheckStorage(ctx context.Context, req *pfs.CheckStorageRequest, opts ...grpc.CallOption) (*pfs.CheckStorageResponse, error) {
return nil, unsupportedError("CheckStorage")
}

func (c *ppsBuilderClient) InspectJobSet(ctx context.Context, req *pps.InspectJobSetRequest, opts ...grpc.CallOption) (pps.API_InspectJobSetClient, error) {
return nil, unsupportedError("InspectJobSet")
1 change: 1 addition & 0 deletions src/internal/middleware/auth/interceptor.go
@@ -160,6 +160,7 @@ var authHandlers = map[string]authHandler{
"/pfs_v2.API/ComposeFileSet": authDisabledOr(authenticated),
"/pfs_v2.API/RunLoadTest": authDisabledOr(authenticated),
"/pfs_v2.API/RunLoadTestDefault": authDisabledOr(authenticated),
"/pfs_v2.API/CheckStorage": authDisabledOr(authenticated),

//
// PPS API
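Each entry in this map pairs a fully qualified gRPC method with an access rule; authDisabledOr(authenticated) permits the call when auth has not been activated and otherwise requires an authenticated caller. A hedged sketch of the combinator shape follows (the authHandler signature and the authIsActive stand-in are assumptions, not this file's definitions):

// Hypothetical sketch of the combinator pattern behind authDisabledOr;
// authHandler's signature and authIsActive are assumed stand-ins.
type authHandler func(ctx context.Context, fullMethod string) error

func authIsActive(ctx context.Context) bool { return true } // stand-in for the real activation check

func authDisabledOr(h authHandler) authHandler {
    return func(ctx context.Context, fullMethod string) error {
        if !authIsActive(ctx) {
            return nil // auth disabled: allow the call
        }
        return h(ctx, fullMethod) // auth enabled: defer to the wrapped rule
    }
}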
25 changes: 25 additions & 0 deletions src/internal/storage/chunk/chunk_test.go
@@ -98,6 +98,31 @@ func TestCopy(t *testing.T) {
}
}

func TestCheck(t *testing.T) {
ctx := context.Background()
objC, chunks := newTestStorage(t)
writeRandom(t, chunks)
n, err := chunks.Check(ctx, nil, nil, false)
require.NoError(t, err)
require.True(t, n > 0)
deleteOne(t, objC)
_, err = chunks.Check(ctx, nil, nil, false)
require.YesError(t, err)
}

func deleteOne(t testing.TB, objC obj.Client) {
ctx := context.Background()
done := false
err := objC.Walk(ctx, "", func(p string) error {
if !done {
require.NoError(t, objC.Delete(ctx, p))
done = true
}
return nil
})
require.NoError(t, err)
}

func BenchmarkWriter(b *testing.B) {
_, chunks := newTestStorage(b)
seed := time.Now().UTC().UnixNano()
145 changes: 126 additions & 19 deletions src/internal/storage/chunk/client.go
@@ -1,6 +1,7 @@
package chunk

import (
"bytes"
"context"
"database/sql"
fmt "fmt"
@@ -10,6 +11,7 @@ import (
"github.com/jmoiron/sqlx"
"github.com/pachyderm/pachyderm/v2/src/internal/dbutil"
"github.com/pachyderm/pachyderm/v2/src/internal/errors"
"github.com/pachyderm/pachyderm/v2/src/internal/pacherr"
"github.com/pachyderm/pachyderm/v2/src/internal/storage/kv"
"github.com/pachyderm/pachyderm/v2/src/internal/storage/track"
)
@@ -50,14 +52,35 @@ func (c *trackedClient) Create(ctx context.Context, md Metadata, chunkData []byt
panic("client must have a renewer to create chunks")
}
chunkID := Hash(chunkData)
needUpload, gen, err := c.beforeUpload(ctx, chunkID, md)
if err != nil {
return nil, err
}
if err := c.renewer.Add(ctx, chunkID); err != nil {
return nil, err
}
if !needUpload {
return chunkID, nil
}
key := chunkKey(chunkID, gen)
if err := c.store.Put(ctx, key, chunkData); err != nil {
return nil, err
}
if err := c.afterUpload(ctx, chunkID, gen); err != nil {
return nil, err
}
return chunkID, nil
}

// beforeUpload checks the table in postgres to see if a chunk with chunkID already exists.
func (c *trackedClient) beforeUpload(ctx context.Context, chunkID ID, md Metadata) (needUpload bool, gen uint64, _ error) {
var pointsTo []string
for _, cid := range md.PointsTo {
pointsTo = append(pointsTo, cid.TrackerID())
}
chunkTID := chunkID.TrackerID()
if err := dbutil.WithTx(ctx, c.db, func(tx *sqlx.Tx) (retErr error) {
needUpload, gen = false, 0
if err := c.tracker.CreateTx(tx, chunkTID, pointsTo, c.ttl); err != nil {
return err
}
@@ -82,31 +105,36 @@ func (c *trackedClient) Create(ctx context.Context, md Metadata, chunkData []byt
needUpload = true
return nil
}); err != nil {
return false, 0, err
}
return needUpload, gen, nil
}

// afterUpload marks the (chunkID, gen) pair as a successfully uploaded object in postgres.
func (c *trackedClient) afterUpload(ctx context.Context, chunkID ID, gen uint64) error {
res, err := c.db.ExecContext(ctx, `
UPDATE storage.chunk_objects
SET uploaded = TRUE
WHERE chunk_id = $1 AND gen = $2
`, chunkID, gen)
if err != nil {
return err
}
affected, err := res.RowsAffected()
if err != nil {
return err
}
if affected < 1 {
return errors.Errorf("no chunk entry for object post upload: chunk=%v gen=%v", chunkID, gen)
}
if affected > 1 {
panic("(chunk_id, gen) is not unique")
}
return nil
}

// Get passes the data for the chunk with ID chunkID to cb.
func (c *trackedClient) Get(ctx context.Context, chunkID ID, cb kv.ValueCallback) error {
var gen uint64
err := c.db.Get(&gen, `
SELECT gen
@@ -132,6 +160,81 @@ func (c *trackedClient) Close() error {
return nil
}

// Check verifies that entries exist for the chunk and that their objects were successfully uploaded.
func (c *trackedClient) Check(ctx context.Context, id ID, readChunk bool) error {
_, last, err := c.CheckEntries(ctx, id, 1, readChunk)
if err != nil {
return err
}
if last == nil {
return errors.Errorf("no entries for chunk %v", id)
}
return nil
}

// CheckEntries runs an integrity check on the objects in object storage.
// It iterates over chunk entries with IDs >= first, in lexicographic order.
func (c *trackedClient) CheckEntries(ctx context.Context, first []byte, limit int, readChunks bool) (n int, last ID, _ error) {
if first == nil {
first = []byte{} // in SQL: nothing is comparable to nil
}
var ents []Entry
if err := c.db.SelectContext(ctx, &ents,
`SELECT chunk_id, gen, uploaded, tombstone FROM storage.chunk_objects
WHERE chunk_id >= $1 AND uploaded = true AND tombstone = false
ORDER BY chunk_id
LIMIT $2
`, first, limit); err != nil {
return 0, nil, err
}
for _, ent := range ents {
if readChunks {
if err := c.store.Get(ctx, chunkKey(ent.ChunkID, ent.Gen), func(data []byte) error {
return verifyData(ent.ChunkID, data)
}); err != nil {
if pacherr.IsNotExist(err) {
if exists, err := c.entryExists(ctx, ent.ChunkID, ent.Gen); err != nil {
return n, nil, err
} else if exists {
return n, nil, newErrMissingObject(ent)
}
} else {
return n, nil, err
}
}
} else {
exists, err := c.store.Exists(ctx, chunkKey(ent.ChunkID, ent.Gen))
if err != nil {
return n, nil, err
}
if !exists {
if exists2, err := c.entryExists(ctx, ent.ChunkID, ent.Gen); err != nil {
return n, nil, err
} else if exists2 {
return n, nil, newErrMissingObject(ent)
}
}
}
last = ent.ChunkID
n++
}
if n == limit && bytes.Equal(first, last) {
return n, nil, errors.Errorf("limit too small to check all chunk entries limit=%d", limit)
}
return n, last, nil
}

func (c *trackedClient) entryExists(ctx context.Context, chunkID ID, gen uint64) (bool, error) {
var x int
err := c.db.GetContext(ctx, &x, `SELECT 1 FROM storage.chunk_objects WHERE chunk_id = $1 AND gen = $2`, chunkID, gen)
switch err {
case sql.ErrNoRows:
return false, nil
case nil:
return true, nil
default:
return false, err
}
}

func chunkPath(chunkID ID, gen uint64) string {
if len(chunkID) == 0 {
panic("chunkID cannot be empty")
Expand All @@ -143,14 +246,18 @@ func chunkKey(chunkID ID, gen uint64) []byte {
return []byte(chunkPath(chunkID, gen))
}

func newErrMissingObject(ent Entry) error {
return errors.Errorf("missing object for chunk entry: chunkID=%v gen=%v uploaded=%v tombstone=%v", ent.ChunkID, ent.Gen, ent.Uploaded, ent.Tombstone)
}

var _ track.Deleter = &deleter{}

type deleter struct{}

func (d *deleter) DeleteTx(tx *sqlx.Tx, id string) error {
chunkID, err := ParseTrackerID(id)
if err != nil {
return errors.Wrapf(err, "deleting chunk")
}
_, err = tx.Exec(`
UPDATE storage.chunk_objects
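The new check comes in two flavors: Check verifies a single chunk, while CheckEntries scans a range. A hedged usage sketch of the single-chunk form (a hypothetical helper, not part of this commit; trackedClient, ID, and Check are as defined in this file):

// checkOneChunk is a hypothetical wrapper showing the readChunk knob:
// true fetches the object and verifies its bytes against the chunk ID,
// false only confirms the object exists in the store.
func checkOneChunk(ctx context.Context, c *trackedClient, id ID) error {
    return c.Check(ctx, id, true)
}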
5 changes: 3 additions & 2 deletions src/internal/storage/chunk/gc_test.go
@@ -21,7 +21,7 @@ func TestGC(t *testing.T) {
tracker := track.NewTestTracker(t, db)
oc, s := NewTestStorage(t, db, tracker)

writeRandom(t, s)
count, err := countObjects(ctx, oc)
require.NoError(t, err)
require.True(t, count > 0)
@@ -64,7 +64,8 @@ func countObjects(ctx context.Context, client obj.Client) (int, error) {
return count, nil
}

func writeRandom(t testing.TB, s *Storage) {
ctx := context.Background()
const seed = 10
const size = 1e8
rng := rand.New(rand.NewSource(seed))
33 changes: 33 additions & 0 deletions src/internal/storage/chunk/storage.go
@@ -1,6 +1,7 @@
package chunk

import (
"bytes"
"context"
"time"

@@ -81,3 +82,35 @@ func (s *Storage) List(ctx context.Context, cb func(id ID) error) error {
func (s *Storage) NewDeleter() track.Deleter {
return &deleter{}
}

// Check runs an integrity check on the objects in object storage.
// It checks objects for chunks with IDs in the range [begin, end).
// As a special case: if len(end) == 0, the end bound is ignored.
func (s *Storage) Check(ctx context.Context, begin, end []byte, readChunks bool) (int, error) {
c := NewClient(s.store, s.db, s.tracker, nil).(*trackedClient)
first := append([]byte{}, begin...)
var count int
for {
n, last, err := c.CheckEntries(ctx, first, 100, readChunks)
count += n
if err != nil {
return count, err
}
if last == nil {
break
}
if len(end) > 0 && bytes.Compare(last, end) > 0 {
break
}
first = keyAfter(last)
}
return count, nil
}

// keyAfter returns the byte slice ordered immediately after x, lexicographically.
// The motivating use case is iteration.
func keyAfter(x []byte) []byte {
y := append([]byte{}, x...)
y = append(y, 0x00)
return y
}
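Why appending a zero byte gives the immediate successor: in lexicographic order, x followed by 0x00 is the smallest key strictly greater than x, so a scan that resumes at keyAfter(last) neither revisits the last chunk nor skips one. A standalone demonstration (hypothetical, not part of the commit):

package main

import (
    "bytes"
    "fmt"
)

// keyAfter, as in the commit: copy x and append a zero byte.
func keyAfter(x []byte) []byte {
    y := append([]byte{}, x...)
    return append(y, 0x00)
}

func main() {
    k := []byte("abc")
    next := keyAfter(k) // "abc\x00"
    fmt.Println(bytes.Compare(k, next) < 0)              // true: next sorts strictly after k
    fmt.Println(bytes.Compare(next, []byte("abcd")) < 0) // true: other keys above k sort after next too
}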
10 changes: 10 additions & 0 deletions src/internal/testpachd/mock_pachd.go
@@ -442,6 +442,7 @@ type renewFileSetFunc func(context.Context, *pfs.RenewFileSetRequest) (*types.Em
type composeFileSetFunc func(context.Context, *pfs.ComposeFileSetRequest) (*pfs.CreateFileSetResponse, error)
type runLoadTestFunc func(context.Context, *pfs.RunLoadTestRequest) (*pfs.RunLoadTestResponse, error)
type runLoadTestDefaultFunc func(context.Context, *types.Empty) (*pfs.RunLoadTestResponse, error)
type checkStorageFunc func(context.Context, *pfs.CheckStorageRequest) (*pfs.CheckStorageResponse, error)

type mockActivateAuthPFS struct{ handler activateAuthPFSFunc }
type mockCreateRepo struct{ handler createRepoFunc }
@@ -479,6 +480,7 @@ type mockRenewFileSet struct{ handler renewFileSetFunc }
type mockComposeFileSet struct{ handler composeFileSetFunc }
type mockRunLoadTest struct{ handler runLoadTestFunc }
type mockRunLoadTestDefault struct{ handler runLoadTestDefaultFunc }
type mockCheckStorage struct{ handler checkStorageFunc }

func (mock *mockActivateAuthPFS) Use(cb activateAuthPFSFunc) { mock.handler = cb }
func (mock *mockCreateRepo) Use(cb createRepoFunc) { mock.handler = cb }
@@ -516,6 +518,7 @@ func (mock *mockRenewFileSet) Use(cb renewFileSetFunc) { mock.handle
func (mock *mockComposeFileSet) Use(cb composeFileSetFunc) { mock.handler = cb }
func (mock *mockRunLoadTest) Use(cb runLoadTestFunc) { mock.handler = cb }
func (mock *mockRunLoadTestDefault) Use(cb runLoadTestDefaultFunc) { mock.handler = cb }
func (mock *mockCheckStorage) Use(cb checkStorageFunc) { mock.handler = cb }

type pfsServerAPI struct {
mock *mockPFSServer
@@ -559,6 +562,7 @@ type mockPFSServer struct {
ComposeFileSet mockComposeFileSet
RunLoadTest mockRunLoadTest
RunLoadTestDefault mockRunLoadTestDefault
CheckStorage mockCheckStorage
}

func (api *pfsServerAPI) ActivateAuth(ctx context.Context, req *pfs.ActivateAuthRequest) (*pfs.ActivateAuthResponse, error) {
@@ -777,6 +781,12 @@ func (api *pfsServerAPI) RunLoadTestDefault(ctx context.Context, req *types.Empt
}
return nil, errors.Errorf("unhandled pachd mock pfs.RunLoadTestDefault")
}
func (api *pfsServerAPI) CheckStorage(ctx context.Context, req *pfs.CheckStorageRequest) (*pfs.CheckStorageResponse, error) {
if api.mock.CheckStorage.handler != nil {
return api.mock.CheckStorage.handler(ctx, req)
}
return nil, errors.Errorf("unhandled pachd mock pfs.CheckStorage")
}
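A hedged sketch of stubbing the new RPC in a test, assuming the usual testpachd pattern where an already-constructed MockPachd value (here mockPachd, an assumption) exposes these PFS mocks:

// Hypothetical test snippet, not part of this commit.
mockPachd.PFS.CheckStorage.Use(func(ctx context.Context, req *pfs.CheckStorageRequest) (*pfs.CheckStorageResponse, error) {
    // Report healthy storage without touching real object storage.
    return &pfs.CheckStorageResponse{}, nil
})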

/* PPS Server Mocks */

