Merge pull request #3713 from pachyderm/fileset-serialization-mvp
Fileset Serialization MVP
brycemcanally committed May 9, 2019
2 parents b82ca81 + 8739ea8 commit add47ac
Showing 11 changed files with 306 additions and 64 deletions.
1 change: 1 addition & 0 deletions Makefile
@@ -475,6 +475,7 @@ test-pfs-cmds:
 test-pfs-storage:
 	go test ./src/server/pkg/storage/chunk -count 1 -timeout $(TIMEOUT)
 	go test ./src/server/pkg/storage/fileset/index -count 1 -timeout $(TIMEOUT)
+	go test ./src/server/pkg/storage/fileset -count 1 -timeout $(TIMEOUT)
 
 test-deploy-cmds:
 	go test ./src/server/pkg/deploy/cmds -count 1 -timeout $(TIMEOUT)
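The new package is exercised through the existing test-pfs-storage target, so the fileset tests (together with the chunk and index tests) run locally with:

    make test-pfs-storage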
16 changes: 2 additions & 14 deletions src/server/pkg/storage/chunk/chunk_test.go
@@ -4,23 +4,11 @@ import (
 	"bytes"
 	"context"
 	"io"
-	"math/rand"
 	"testing"
 
 	"github.com/pachyderm/pachyderm/src/client/pkg/require"
 )
 
-var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
-
-// generates random sequence of data (n is number of bytes)
-func randSeq(n int) []byte {
-	b := make([]rune, n)
-	for i := range b {
-		b[i] = letters[rand.Intn(len(letters))]
-	}
-	return []byte(string(b))
-}
-
 func TestWriteThenRead(t *testing.T) {
 	objC, chunks := LocalStorage(t)
 	defer func() {
@@ -35,7 +23,7 @@ func TestWriteThenRead(t *testing.T) {
 		finalDataRefs = append(finalDataRefs, dataRefs...)
 		return nil
 	}
-	seq = randSeq(100 * MB)
+	seq = RandSeq(100 * MB)
 	for i := 0; i < 100; i++ {
 		w.StartRange(cb)
 		_, err := w.Write(seq[i*MB : (i+1)*MB])
@@ -71,7 +59,7 @@ func BenchmarkWriter(b *testing.B) {
 		require.NoError(b, chunks.DeleteAll(context.Background()))
 		require.NoError(b, objC.Delete(context.Background(), Prefix))
 	}()
-	seq := randSeq(100 * MB)
+	seq := RandSeq(100 * MB)
 	b.SetBytes(100 * MB)
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
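For context, TestWriteThenRead drives the range API visible above: StartRange registers a callback that is handed the DataRefs describing a finished range. Below is a minimal sketch of that flow, not part of this commit; the chunks.NewWriter constructor and the Close call are assumptions, since the writer's construction and teardown are elided from this diff.

package chunk

import "context"

// rangeWriteSketch shows the StartRange/Write flow from the test above.
// NewWriter on *Storage and Close are assumed; this diff elides both.
func rangeWriteSketch(chunks *Storage) error {
	var finalDataRefs []*DataRef
	cb := func(dataRefs []*DataRef) error {
		// Invoked once per finished range with the refs that describe it.
		finalDataRefs = append(finalDataRefs, dataRefs...)
		return nil
	}
	w := chunks.NewWriter(context.Background()) // assumed constructor
	w.StartRange(cb)
	if _, err := w.Write(RandSeq(MB)); err != nil {
		return err
	}
	// Closing flushes the last chunk and fires any pending callbacks.
	return w.Close()
}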
12 changes: 12 additions & 0 deletions src/server/pkg/storage/chunk/util.go
@@ -1,6 +1,7 @@
 package chunk
 
 import (
+	"math/rand"
 	"os"
 	"testing"
 
@@ -22,3 +23,14 @@ func LocalStorage(tb testing.TB) (obj.Client, *Storage) {
 	require.NoError(tb, err)
 	return objC, NewStorage(objC, Prefix)
 }
+
+var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
+
+// RandSeq generates a random sequence of data (n is number of bytes)
+func RandSeq(n int) []byte {
+	b := make([]rune, n)
+	for i := range b {
+		b[i] = letters[rand.Intn(len(letters))]
+	}
+	return []byte(string(b))
+}
7 changes: 6 additions & 1 deletion src/server/pkg/storage/chunk/writer.go
@@ -21,6 +21,8 @@ const (
 	WindowSize = 64
 )
 
+var initialWindow = make([]byte, WindowSize)
+
 // Writer splits a byte stream into content defined chunks that are hashed and deduplicated/uploaded to object storage.
 // Chunk split points are determined by a bit pattern in a rolling hash function (buzhash64 at https://github.com/chmduquesne/rollinghash).
 // (bryce) The chunking/hashing/uploading could be made concurrent by reading ahead a certain amount and splitting the data among chunking/hashing/uploading workers
@@ -47,7 +49,7 @@ type Writer struct {
 func newWriter(ctx context.Context, objC obj.Client, prefix string) *Writer {
 	// Initialize buzhash64 with WindowSize window.
 	hash := buzhash64.New()
-	hash.Write(make([]byte, WindowSize))
+	hash.Write(initialWindow)
 	return &Writer{
 		ctx:  ctx,
 		objC: objC,
@@ -80,6 +82,9 @@ func (w *Writer) finishRange() {
 	data := w.buf.Bytes()[lastDataRef.OffsetBytes:w.buf.Len()]
 	lastDataRef.Hash = hash.EncodeHash(hash.Sum(data))
 	w.done = append(w.done, w.dataRefs)
+	// Reset hash between ranges.
+	w.hash.Reset()
+	w.hash.Write(initialWindow)
 }
 
 // RangeSize returns the size of the current range.
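To make the Writer doc comment above concrete: the rolling hash is primed with a WindowSize window of zeros (the new initialWindow), then advanced one byte at a time, and a chunk boundary is declared whenever the hash's low bits match a fixed pattern. Here is a standalone sketch, not part of this commit, using the same buzhash64 library; the 13-bit mask (roughly 8 KB average chunks) is an illustrative assumption, not the writer's actual split condition.

package main

import (
	"fmt"
	"math/rand"

	"github.com/chmduquesne/rollinghash/buzhash64"
)

const (
	windowSize = 64
	// Assumed boundary pattern: split when the low 13 bits are zero,
	// giving ~8 KB average chunks. The real writer's pattern may differ.
	splitMask = (1 << 13) - 1
)

func main() {
	data := make([]byte, 1<<20) // 1 MB of random input
	rand.Read(data)

	h := buzhash64.New()
	// Seed the rolling window, as newWriter does with initialWindow.
	h.Write(make([]byte, windowSize))

	start := 0
	for i, b := range data {
		h.Roll(b) // slide the window forward by one byte
		if h.Sum64()&splitMask == 0 {
			fmt.Printf("chunk [%d, %d) size %d\n", start, i+1, i+1-start)
			start = i + 1
		}
	}
	if start < len(data) {
		fmt.Printf("final chunk [%d, %d)\n", start, len(data))
	}
}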
105 changes: 105 additions & 0 deletions src/server/pkg/storage/fileset/fileset_test.go
@@ -0,0 +1,105 @@
+package fileset
+
+import (
+	"archive/tar"
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+	"math/rand"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/pachyderm/pachyderm/src/client/pkg/require"
+	"github.com/pachyderm/pachyderm/src/server/pkg/storage/chunk"
+	"github.com/pachyderm/pachyderm/src/server/pkg/storage/fileset/index"
+)
+
+const (
+	max = 20 * chunk.MB
+)
+
+type file struct {
+	hashes []string
+	data   []byte
+}
+
+func dataRefsToHashes(dataRefs []*chunk.DataRef) []string {
+	var hashes []string
+	for _, dataRef := range dataRefs {
+		if dataRef.Hash == "" {
+			hashes = append(hashes, dataRef.Chunk.Hash)
+			continue
+		}
+		hashes = append(hashes, dataRef.Hash)
+	}
+	return hashes
+}
+
+func seedStr(seed int64) string {
+	return fmt.Sprint("seed: ", strconv.FormatInt(seed, 10))
+}
+
+func TestWriteThenRead(t *testing.T) {
+	objC, chunks := chunk.LocalStorage(t)
+	defer func() {
+		chunks.DeleteAll(context.Background())
+		objC.Delete(context.Background(), chunk.Prefix)
+	}()
+	fileNames := index.Generate("abc")
+	files := make(map[string]*file)
+	seed := time.Now().UTC().UnixNano()
+	rand.Seed(seed)
+	for _, fileName := range fileNames {
+		files[fileName] = &file{
+			data: chunk.RandSeq(rand.Intn(max)),
+		}
+	}
+	// Write out ten filesets where each subsequent fileset has the content of one random file changed.
+	// Confirm that all of the content and hashes other than the changed file remain the same.
+	// (bryce) we are going to want a dedupe test somewhere, not sure if it makes sense here or in the chunk
+	// storage layer (probably in the chunk storage layer).
+	for i := 0; i < 10; i++ {
+		// Write files to file set.
+		w := NewWriter(context.Background(), chunks)
+		for _, fileName := range fileNames {
+			hdr := &index.Header{
+				Hdr: &tar.Header{
+					Name: fileName,
+					Size: int64(len(files[fileName].data)),
+				},
+			}
+			require.NoError(t, w.WriteHeader(hdr), seedStr(seed))
+			_, err := w.Write(files[fileName].data)
+			require.NoError(t, err, seedStr(seed))
+		}
+		idx, err := w.Close()
+		require.NoError(t, err, seedStr(seed))
+		// Read files from file set, checking against recorded data and hashes.
+		r := NewReader(context.Background(), chunks, idx, "")
+		for _, fileName := range fileNames {
+			hdr, err := r.Next()
+			require.NoError(t, err, seedStr(seed))
+			actualHashes := dataRefsToHashes(hdr.Idx.DataOp.DataRefs)
+			// If no hashes are recorded (first iteration or changed file),
+			// then set them based on what was read.
+			if len(files[fileName].hashes) == 0 {
+				files[fileName].hashes = actualHashes
+			}
+			require.Equal(t, files[fileName].hashes, actualHashes, seedStr(seed))
+			actualData := &bytes.Buffer{}
+			_, err = io.Copy(actualData, r)
+			require.NoError(t, err, seedStr(seed))
+			require.Equal(t, files[fileName].data, actualData.Bytes(), seedStr(seed))
+		}
+		// Change one random file
+		for fileName := range files {
+			files[fileName] = &file{
+				data: chunk.RandSeq(rand.Intn(max)),
+			}
+			break
+		}
+		require.NoError(t, chunks.DeleteAll(context.Background()), seedStr(seed))
+	}
+}
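Boiled down, the round trip this test exercises is: write a tar header plus its content, Close the writer to obtain the serialized index, then hand that index to a reader and stream the file back. A condensed sketch, not part of this commit, using only calls visible in the test above (the type of idx is left inferred, since Close's signature isn't shown in this diff):

package fileset

import (
	"archive/tar"
	"bytes"
	"context"
	"io"

	"github.com/pachyderm/pachyderm/src/server/pkg/storage/chunk"
	"github.com/pachyderm/pachyderm/src/server/pkg/storage/fileset/index"
)

// roundTrip is a sketch of the write-then-read cycle in TestWriteThenRead.
func roundTrip(chunks *chunk.Storage, name string, data []byte) ([]byte, error) {
	w := NewWriter(context.Background(), chunks)
	hdr := &index.Header{
		Hdr: &tar.Header{Name: name, Size: int64(len(data))},
	}
	if err := w.WriteHeader(hdr); err != nil {
		return nil, err
	}
	if _, err := w.Write(data); err != nil {
		return nil, err
	}
	// Close serializes the fileset's index and returns it.
	idx, err := w.Close()
	if err != nil {
		return nil, err
	}
	// Feed the index back to a reader to stream the file out.
	r := NewReader(context.Background(), chunks, idx, "")
	if _, err := r.Next(); err != nil {
		return nil, err
	}
	buf := &bytes.Buffer{}
	if _, err := io.Copy(buf, r); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}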
33 changes: 2 additions & 31 deletions src/server/pkg/storage/fileset/index/index_test.go
@@ -5,47 +5,18 @@ import (
 	"bytes"
 	"context"
 	"io"
-	"sort"
 	"strings"
 	"testing"
 
 	"github.com/pachyderm/pachyderm/src/client/pkg/require"
 	"github.com/pachyderm/pachyderm/src/server/pkg/storage/chunk"
 )
 
-// Perm calls f with each permutation of a.
-func Perm(a []rune, f func([]rune)) {
-	perm(a, f, 0)
-}
-
-// Permute the values at index i to len(a)-1.
-func perm(a []rune, f func([]rune), i int) {
-	if i > len(a) {
-		f(a)
-		return
-	}
-	perm(a, f, i+1)
-	for j := i + 1; j < len(a); j++ {
-		a[i], a[j] = a[j], a[i]
-		perm(a, f, i+1)
-		a[i], a[j] = a[j], a[i]
-	}
-}
-
-func Generate(s string) []string {
-	fileNames := []string{}
-	Perm([]rune(s), func(fileName []rune) {
-		fileNames = append(fileNames, string(fileName))
-	})
-	sort.Strings(fileNames)
-	return fileNames
-}
-
 func Write(tb testing.TB, chunks *chunk.Storage, rangeSize int64, fileNames []string) io.Reader {
 	iw := NewWriter(context.Background(), chunks, rangeSize)
 	for _, fileName := range fileNames {
 		hdr := &Header{
-			hdr: &tar.Header{Name: fileName},
+			Hdr: &tar.Header{Name: fileName},
 		}
 		require.NoError(tb, iw.WriteHeader(hdr))
 	}
@@ -63,7 +34,7 @@ func Actual(tb testing.TB, chunks *chunk.Storage, r io.Reader, prefix string) []
 			return result
 		}
 		require.NoError(tb, err)
-		result = append(result, hdr.hdr.Name)
+		result = append(result, hdr.Hdr.Name)
 	}
 }
 
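With Generate and Perm moving to util.go (below), these helpers compose into prefix-filtered reads. A hypothetical sketch, not part of this commit, assuming Actual returns exactly the entries under the given prefix, which is what reader.go's skip logic suggests:

package index

import (
	"context"
	"testing"

	"github.com/pachyderm/pachyderm/src/client/pkg/require"
	"github.com/pachyderm/pachyderm/src/server/pkg/storage/chunk"
)

// TestPrefixSketch is hypothetical, not part of this commit.
func TestPrefixSketch(t *testing.T) {
	objC, chunks := chunk.LocalStorage(t)
	defer func() {
		chunks.DeleteAll(context.Background())
		objC.Delete(context.Background(), chunk.Prefix)
	}()
	// Generate("abc") yields the six sorted permutations of "abc".
	fileNames := Generate("abc")
	r := Write(t, chunks, chunk.MB, fileNames)
	// Expect only the permutations under prefix "a" to come back.
	require.Equal(t, []string{"abc", "acb"}, Actual(t, chunks, r, "a"))
}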
8 changes: 4 additions & 4 deletions src/server/pkg/storage/fileset/index/reader.go
@@ -69,7 +69,7 @@ func (r *Reader) Next() (*Header, error) {
 			return nil, err
 		}
 		// Skip to the starting header.
-		if strings.Compare(fullHdr.idx.Range.LastPath, r.prefix) < 0 {
+		if strings.Compare(fullHdr.Idx.Range.LastPath, r.prefix) < 0 {
 			continue
 		}
 		// If a header with the prefix cannot show up after the current header,
@@ -90,7 +90,7 @@ func (r *Reader) Next() (*Header, error) {
 			})
 		}
 		// Set the next range.
-		r.levels[r.currLevel].cr.NextRange(fullHdr.idx.DataOp.DataRefs)
+		r.levels[r.currLevel].cr.NextRange(fullHdr.Idx.DataOp.DataRefs)
 		r.levels[r.currLevel].tr = tar.NewReader(r.levels[r.currLevel].cr)
 	}
 }
@@ -119,7 +119,7 @@ func deserialize(tr *tar.Reader, hdr *tar.Header) (*Header, error) {
 		return nil, err
 	}
 	return &Header{
-		hdr: hdr,
-		idx: idx,
+		Hdr: hdr,
+		Idx: idx,
 	}, nil
 }
32 changes: 32 additions & 0 deletions src/server/pkg/storage/fileset/index/util.go
@@ -0,0 +1,32 @@
+package index
+
+import "sort"
+
+// Perm calls f with each permutation of a.
+func Perm(a []rune, f func([]rune)) {
+	perm(a, f, 0)
+}
+
+// Permute the values at index i to len(a)-1.
+func perm(a []rune, f func([]rune), i int) {
+	if i > len(a) {
+		f(a)
+		return
+	}
+	perm(a, f, i+1)
+	for j := i + 1; j < len(a); j++ {
+		a[i], a[j] = a[j], a[i]
+		perm(a, f, i+1)
+		a[i], a[j] = a[j], a[i]
+	}
+}
+
+// Generate generates the permutations of the passed in string and returns them sorted.
+func Generate(s string) []string {
+	fileNames := []string{}
+	Perm([]rune(s), func(fileName []rune) {
+		fileNames = append(fileNames, string(fileName))
+	})
+	sort.Strings(fileNames)
+	return fileNames
+}
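As a quick sanity check of the exported helper: a three-character string produces 3! = 6 permutations, returned in sorted order (hypothetical usage, not part of this commit):

package main

import (
	"fmt"

	"github.com/pachyderm/pachyderm/src/server/pkg/storage/fileset/index"
)

func main() {
	// Prints: [abc acb bac bca cab cba]
	fmt.Println(index.Generate("abc"))
}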
