Skip to content
This repository has been archived by the owner on Mar 5, 2024. It is now read-only.

Commit

Permalink
Merge pull request #7 from stripe/manifests
Browse files Browse the repository at this point in the history
Add the ability to save and read manifests
  • Loading branch information
colinmarc committed Mar 12, 2015
2 parents 4d0648a + 813bcf8 commit 9cf5aff
Show file tree
Hide file tree
Showing 7 changed files with 216 additions and 33 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -1,2 +1,4 @@
sequins
testdata
.index
.manifest
2 changes: 1 addition & 1 deletion hdfs_sequins_test.go
Expand Up @@ -3,8 +3,8 @@ package main
import (
"encoding/json"
"github.com/colinmarc/hdfs"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stripe/sequins/backend"
"io/ioutil"
"net/http"
Expand Down
127 changes: 109 additions & 18 deletions index/index.go
Expand Up @@ -43,16 +43,96 @@ func New(path string) *Index {
return &index
}

// BuildIndex reads each of the files and adds a key -> (file, offset) pair
// to the master index for every key in every file.
func (index *Index) BuildIndex() error {
// Load reads each of the files and adds a key -> (file, offset) pair to the
// master index for every key in every file. If a manifest file and index
// already exist in the directory, it'll use that.
func (index *Index) Load() error {
err := index.buildFileList()
if err != nil {
return err
}

// Try loading and checking the manifest first.
manifestPath := filepath.Join(index.Path, ".manifest")
manifest, err := readManifest(manifestPath)
if err == nil {
log.Println("Loading index from existing manifest at", manifestPath)
err = index.loadIndexFromManifest(manifest)
if err != nil {
log.Println("Failed to load existing manifest with error:", err)
} else {
index.Ready = true
return nil
}
}

err = index.buildNewIndex()
if err != nil {
return err
}

index.Ready = true
return nil
}

func (index *Index) buildFileList() error {
infos, err := ioutil.ReadDir(index.Path)
if err != nil {
return err
}

index.files = make([]indexFile, 0, len(infos))
index.readLocks = make([]sync.Mutex, len(infos))

for _, info := range infos {
if !info.IsDir() && !strings.HasPrefix(info.Name(), "_") && !strings.HasPrefix(info.Name(), ".") {
err := index.addFile(info.Name())
if err != nil {
return err
}
}
}

return nil
}

func (index *Index) loadIndexFromManifest(m manifest) error {
for i, entry := range m.Files {
indexFile := index.files[i]
baseName := filepath.Base(indexFile.file.Name())
if baseName != filepath.Base(entry.Name) {
return fmt.Errorf("unmatched file: %s", entry.Name)
}

crc, err := fileCrc(indexFile.file.Name())
if err != nil {
return err
}

if crc != entry.CRC {
return fmt.Errorf("local file %s has an invalid CRC, according to the manifest", baseName)
}
}

indexPath := filepath.Join(index.Path, ".index")
err = os.RemoveAll(indexPath)
info, err := os.Stat(indexPath)
if err != nil || !info.IsDir() {
return fmt.Errorf("missing or invalid ldb index at %s", indexPath)
}

ldb, err := leveldb.OpenFile(indexPath, nil)
if err != nil {
return err
}

index.ldb = ldb
index.count = m.Count
return nil
}

func (index *Index) buildNewIndex() error {
indexPath := filepath.Join(index.Path, ".index")
err := os.RemoveAll(indexPath)
if err != nil {
return err
}
Expand All @@ -76,29 +156,40 @@ func (index *Index) BuildIndex() error {
log.Println("Finished indexing", path)
}

index.Ready = true
manifest, err := index.buildManifest()
if err != nil {
return fmt.Errorf("error building manifest: %s", err)
}

manifestPath := filepath.Join(index.Path, ".manifest")
log.Println("Writing manifest file to", manifestPath)
err = writeManifest(manifestPath, manifest)
if err != nil {
return fmt.Errorf("error writing manifest: %s", err)
}

return nil
}

func (index *Index) buildFileList() error {
infos, err := ioutil.ReadDir(index.Path)
if err != nil {
return err
func (index *Index) buildManifest() (manifest, error) {
m := manifest{
Files: make([]manifestEntry, len(index.files)),
Count: index.count,
}

index.files = make([]indexFile, 0, len(infos))
index.readLocks = make([]sync.Mutex, len(infos))
for i, f := range index.files {
crc, err := fileCrc(f.file.Name())
if err != nil {
return m, err
}

for _, info := range infos {
if !info.IsDir() && !strings.HasPrefix(info.Name(), "_") {
err := index.addFile(info.Name())
if err != nil {
return err
}
m.Files[i] = manifestEntry{
Name: filepath.Base(f.file.Name()),
CRC: crc,
}
}

return nil
return m, nil
}

func (index *Index) addFile(subPath string) error {
Expand Down
35 changes: 27 additions & 8 deletions index/index_test.go
Expand Up @@ -3,30 +3,49 @@ package index
import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"os"
"testing"
)

func TestDB(t *testing.T) {
func TestIndex(t *testing.T) {
os.Remove("../test_data/0/.manifest")
index := New("../test_data/0")
err := index.BuildIndex()
require.Nil(t, err)
if err != nil {
t.FailNow()
}
err := index.Load()
require.NoError(t, err)

assert.Equal(t, index.Path, "../test_data/0")
assert.Equal(t, len(index.files), 2)
assert.Equal(t, index.files[0].file.Name(), "../test_data/0/part-00000")
assert.Equal(t, index.files[1].file.Name(), "../test_data/0/part-00001")

val, err := index.Get("Alice")
require.Nil(t, err)
require.NoError(t, err)
assert.Equal(t, string(val), "Practice")

val, err = index.Get("foo")
assert.Equal(t, ErrNotFound, err)

count, err := index.Count()
require.Nil(t, err)
require.NoError(t, err)
assert.Equal(t, 3, count)
}

func TestIndexManifest(t *testing.T) {
os.Remove("../test_data/0/.manifest")
index := New("../test_data/0")
err := index.Load()
require.NoError(t, err)

count, err := index.Count()
require.NoError(t, err)
assert.Equal(t, 3, count)

index.Close()
index = New("../test_data/0")
err = index.Load()
require.NoError(t, err)

newCount, err := index.Count()
require.NoError(t, err)
assert.Equal(t, count, newCount)
}
68 changes: 68 additions & 0 deletions index/manifest.go
@@ -0,0 +1,68 @@
package index

import (
"encoding/json"
"hash/crc32"
"io"
"io/ioutil"
"os"
)

type manifest struct {
Files []manifestEntry `json:"files"`
Count int `json:"count"`
}

type manifestEntry struct {
Name string `json:"name"`
CRC uint32 `json:"crc"`
}

func readManifest(path string) (manifest, error) {
m := manifest{}

reader, err := os.Open(path)
if err != nil {
return m, err
}

defer reader.Close()
bytes, err := ioutil.ReadAll(reader)
if err != nil {
return m, err
}

err = json.Unmarshal(bytes, &m)
return m, err
}

func writeManifest(path string, m manifest) error {
bytes, err := json.Marshal(m)
if err != nil {
return err
}

writer, err := os.Create(path)
if err != nil {
return err
}

defer writer.Close()
_, err = writer.Write(bytes)
return err
}

func fileCrc(path string) (uint32, error) {
file, err := os.Open(path)
if err != nil {
return 0, err
}

hash := crc32.NewIEEE()
_, err = io.Copy(hash, file)
if err != nil {
return 0, err
}

return hash.Sum32(), nil
}
10 changes: 5 additions & 5 deletions sequins.go
Expand Up @@ -91,23 +91,23 @@ func (s *sequins) refresh() error {
}

if os.IsExist(err) {
log.Printf("Version %s is already downloaded.", version)
log.Printf("Version %s is already downloaded", version)
} else {
log.Printf("Downloading version %s from %s.", version, s.backend.DisplayPath(version))
log.Printf("Downloading version %s from %s", version, s.backend.DisplayPath(version))
err = s.backend.Download(version, path)
if err != nil {
return err
}
}

log.Printf("Building index over version %s at %s.", version, path)
log.Printf("Preparing version %s at %s", version, path)
index := index.New(path)
err = index.BuildIndex()
err = index.Load()
if err != nil {
return fmt.Errorf("Error while indexing: %s", err)
}

log.Println("Switching to new version!")
log.Printf("Switching to version %s!", version)
oldIndex := s.index
s.currentVersion = version
s.index = index
Expand Down
5 changes: 4 additions & 1 deletion sequins_test.go
Expand Up @@ -7,11 +7,14 @@ import (
"github.com/stripe/sequins/backend"
"net/http"
"net/http/httptest"
"os"
"testing"
"time"
)

func getSequins(t *testing.T, opts sequinsOptions) *sequins {
os.RemoveAll("test_data/0/.manifest")
os.RemoveAll("test_data/1/.manifest")
backend := backend.NewLocalBackend("test_data")
s := newSequins(backend, opts)

Expand Down Expand Up @@ -51,7 +54,7 @@ func TestSequins(t *testing.T) {
status := &status{}
err := json.Unmarshal(w.Body.Bytes(), status)
require.NoError(t, err)
assert.Equal(t, 200, w.Code, 200)
assert.Equal(t, 200, w.Code)
assert.Equal(t, "test_data/1", status.Path)
assert.True(t, status.Started >= now)
assert.Equal(t, 3, status.Count)
Expand Down

0 comments on commit 9cf5aff

Please sign in to comment.