Implement streaming chunker using io.Reader
fd0 committed Feb 8, 2015
1 parent a5c33d8 commit bda33e6
Showing 6 changed files with 297 additions and 144 deletions.
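The heart of the change is the new chunker API: chunker.New takes an io.Reader, an internal buffer size, and a hash constructor, and computes each chunk's SHA-256 while scanning, so Next() returns Chunk values that already carry Length and Digest; Chunk.Reader() re-reads the chunk's bytes from the source instead of handing back a []byte. A minimal consumption sketch under those assumptions (the import path and input file here are illustrative, not taken from this commit):

package main

import (
    "crypto/sha256"
    "fmt"
    "io"
    "io/ioutil"
    "os"

    "github.com/restic/restic/chunker" // import path assumed
)

func main() {
    file, err := os.Open("some/large/file") // hypothetical input
    if err != nil {
        panic(err)
    }
    defer file.Close()

    // The chunker hashes while it scans, so each Chunk arrives with
    // its plaintext SHA-256 digest already computed.
    chnker := chunker.New(file, 512*chunker.KiB, sha256.New)

    for {
        chunk, err := chnker.Next()
        if err == io.EOF {
            break
        }
        if err != nil {
            panic(err)
        }

        // Stream the chunk's bytes back out of the file; no buffer of
        // chunk size is ever materialized here.
        n, err := io.Copy(ioutil.Discard, chunk.Reader(file))
        if err != nil {
            panic(err)
        }
        fmt.Printf("chunk: %d bytes, digest %x\n", n, chunk.Digest)
    }
}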
120 changes: 46 additions & 74 deletions archiver.go
@@ -1,6 +1,7 @@
 package restic
 
 import (
+    "crypto/sha256"
     "encoding/json"
     "errors"
     "fmt"
@@ -15,8 +16,9 @@ import (
 )
 
 const (
-    maxConcurrentFiles = 8
-    maxConcurrentBlobs = 8
+    maxConcurrentFiles = 16
+    maxConcurrentBlobs = 16
+    chunkerBufSize     = 512 * chunker.KiB
 )
 
 type Archiver struct {
@@ -61,10 +63,7 @@ func NewArchiver(s Server, p *Progress) (*Archiver, error) {
     return arch, nil
 }
 
-func (arch *Archiver) Save(t backend.Type, data []byte) (Blob, error) {
-    // compute plaintext hash
-    id := backend.Hash(data)
-
+func (arch *Archiver) Save(t backend.Type, id backend.ID, length uint, rd io.Reader) (Blob, error) {
     debug.Log("Archiver.Save", "Save(%v, %v)\n", t, id.Str())
 
     // test if this blob is already known
@@ -76,7 +75,7 @@ func (arch *Archiver) Save(t backend.Type, data []byte) (Blob, error) {
     }
 
     // else encrypt and save data
-    blob, err = arch.s.Save(t, data, id)
+    blob, err = arch.s.SaveFrom(t, id, length, rd)
 
     // store blob in storage map
     smapblob := arch.m.Insert(blob)
@@ -161,94 +160,67 @@ func (arch *Archiver) SaveFile(node *Node) (Blobs, error) {
 
     var blobs Blobs
 
-    // if the file is small enough, store it directly
-    if node.Size < chunker.MinSize {
-        // acquire token
-        token := <-arch.blobToken
-        defer func() {
-            arch.blobToken <- token
-        }()
-
-        buf := GetChunkBuf("blob single file")
-        defer FreeChunkBuf("blob single file", buf)
-        n, err := io.ReadFull(file, buf)
-        if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
-            return nil, arrar.Annotate(err, "SaveFile() read small file")
-        }
-
-        if err == io.EOF {
-            // use empty blob list for empty files
-            blobs = Blobs{}
-        } else {
-            blob, err := arch.Save(backend.Data, buf[:n])
-            if err != nil {
-                return nil, arrar.Annotate(err, "SaveFile() save chunk")
-            }
-
-            arch.p.Report(Stat{Bytes: blob.Size})
-
-            blobs = Blobs{blob}
-        }
-    } else {
-        // else store all chunks
-        chnker := chunker.New(file)
-        chans := [](<-chan Blob){}
-        defer chnker.Free()
-
-        chunks := 0
-
-        for {
-            buf := GetChunkBuf("blob chunker")
-            chunk, err := chnker.Next(buf)
-            if err == io.EOF {
-                FreeChunkBuf("blob chunker", buf)
-                break
-            }
-
-            if err != nil {
-                FreeChunkBuf("blob chunker", buf)
-                return nil, arrar.Annotate(err, "SaveFile() chunker.Next()")
-            }
-
-            chunks++
-
-            // acquire token, start goroutine to save chunk
-            token := <-arch.blobToken
-            resCh := make(chan Blob, 1)
-
-            go func(ch chan<- Blob) {
-                blob, err := arch.Save(backend.Data, chunk.Data)
-                // TODO handle error
-                if err != nil {
-                    panic(err)
-                }
-
-                FreeChunkBuf("blob chunker", buf)
-
-                arch.p.Report(Stat{Bytes: blob.Size})
-                arch.blobToken <- token
-                ch <- blob
-            }(resCh)
-
-            chans = append(chans, resCh)
-        }
-
-        blobs = []Blob{}
-        for _, ch := range chans {
-            blobs = append(blobs, <-ch)
-        }
-
-        if len(blobs) != chunks {
-            return nil, fmt.Errorf("chunker returned %v chunks, but only %v blobs saved", chunks, len(blobs))
-        }
-    }
+    // store all chunks
+    chnker := chunker.New(file, chunkerBufSize, sha256.New)
+    chans := [](<-chan Blob){}
+    //defer chnker.Free()
+
+    chunks := 0
+
+    for {
+        buf := GetChunkBuf("blob chunker")
+        chunk, err := chnker.Next()
+        if err == io.EOF {
+            FreeChunkBuf("blob chunker", buf)
+            break
+        }
+
+        if err != nil {
+            FreeChunkBuf("blob chunker", buf)
+            return nil, arrar.Annotate(err, "SaveFile() chunker.Next()")
+        }
+
+        chunks++
+
+        // acquire token, start goroutine to save chunk
+        token := <-arch.blobToken
+        resCh := make(chan Blob, 1)
+
+        go func(ch chan<- Blob) {
+            blob, err := arch.Save(backend.Data, chunk.Digest, chunk.Length, chunk.Reader(file))
+            // TODO handle error
+            if err != nil {
+                panic(err)
+            }
+
+            FreeChunkBuf("blob chunker", buf)
+
+            arch.p.Report(Stat{Bytes: blob.Size})
+            arch.blobToken <- token
+            ch <- blob
+        }(resCh)
+
+        chans = append(chans, resCh)
+    }
+
+    blobs = []Blob{}
+    for _, ch := range chans {
+        blobs = append(blobs, <-ch)
+    }
+
+    if len(blobs) != chunks {
+        return nil, fmt.Errorf("chunker returned %v chunks, but only %v blobs saved", chunks, len(blobs))
+    }
 
     var bytes uint64
 
     node.Content = make([]backend.ID, len(blobs))
     debug.Log("Archiver.Save", "checking size for file %s", node.path)
     for i, blob := range blobs {
         node.Content[i] = blob.ID
         bytes += blob.Size
 
         debug.Log("Archiver.Save", " adding blob %s", blob)
     }
 
     if bytes != node.Size {
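Note how SaveFile caps upload concurrency: arch.blobToken is a buffered channel pre-filled with maxConcurrentBlobs tokens, one taken before each save goroutine starts and put back when it finishes, while a per-chunk result channel keeps blobs in chunk order. A standalone sketch of the same token pattern, with an illustrative save function standing in for arch.Save:

package main

import "fmt"

const maxConcurrentBlobs = 16 // mirrors the constant raised above

func saveAll(chunks []string, save func(string) string) []string {
    // pre-fill the token channel; holding a token admits one goroutine
    blobToken := make(chan struct{}, maxConcurrentBlobs)
    for i := 0; i < maxConcurrentBlobs; i++ {
        blobToken <- struct{}{}
    }

    chans := [](<-chan string){}

    for _, c := range chunks {
        token := <-blobToken // blocks while 16 saves are in flight
        resCh := make(chan string, 1)

        go func(c string, ch chan<- string) {
            res := save(c)
            blobToken <- token // hand the token to the next waiter
            ch <- res
        }(c, resCh)

        chans = append(chans, resCh)
    }

    // one buffered result channel per chunk preserves submission order
    results := make([]string, 0, len(chunks))
    for _, ch := range chans {
        results = append(results, <-ch)
    }
    return results
}

func main() {
    fmt.Println(saveAll([]string{"a", "b", "c"}, func(s string) string {
        return "saved:" + s
    }))
}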
20 changes: 15 additions & 5 deletions archiver_test.go
@@ -2,6 +2,7 @@ package restic_test
 
 import (
     "bytes"
+    "crypto/sha256"
     "io"
     "math/rand"
     "testing"
@@ -25,31 +26,40 @@ func get_random(seed, count int) []byte {
     return buf
 }
 
+const bufSize = chunker.MiB
+
 func BenchmarkChunkEncrypt(b *testing.B) {
     data := get_random(23, 10<<20) // 10MiB
+    rd := bytes.NewReader(data)
 
     be := setupBackend(b)
     defer teardownBackend(b, be)
     key := setupKey(b, be, "geheim")
-    chunkBuf := make([]byte, chunker.MaxSize)
+    chunkBuf := make([]byte, restic.CiphertextExtension+chunker.MaxSize)
 
     b.ResetTimer()
     b.SetBytes(int64(len(data)))
 
     for i := 0; i < b.N; i++ {
-        ch := chunker.New(bytes.NewReader(data))
+        rd.Seek(0, 0)
+        ch := chunker.New(rd, bufSize, sha256.New)
 
         for {
-            chunk_data, err := ch.Next(chunkBuf)
+            chunk, err := ch.Next()
 
             if err == io.EOF {
                 break
             }
 
             ok(b, err)
 
-            buf := make([]byte, restic.CiphertextExtension+chunker.MaxSize)
-            _, err = key.Encrypt(buf, chunk_data.Data)
+            // reduce length of chunkBuf
+            chunkBuf = chunkBuf[:chunk.Length]
+            n, err := io.ReadFull(chunk.Reader(rd), chunkBuf)
+            ok(b, err)
+            assert(b, uint(n) == chunk.Length, "invalid length: got %d, expected %d", n, chunk.Length)
+
+            _, err = key.Encrypt(chunkBuf, chunkBuf)
             ok(b, err)
         }
     }
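The benchmark's other change is allocation discipline: chunkBuf is allocated once at the maximum possible size, re-sliced to chunk.Length per chunk, filled via io.ReadFull from chunk.Reader(rd), and encrypted in place with key.Encrypt(chunkBuf, chunkBuf), so the timed loop measures chunking and encryption rather than garbage collection. A small standalone sketch of the re-slice idiom (sizes and the data source are made up):

package main

import (
    "bytes"
    "fmt"
    "io"
)

func main() {
    src := bytes.NewReader(make([]byte, 1<<20))
    buf := make([]byte, 64*1024) // one allocation at the maximum size

    for _, n := range []int{4096, 512, 32768} {
        buf = buf[:n] // same backing array, no new allocation
        if _, err := io.ReadFull(src, buf); err != nil {
            panic(err)
        }
        fmt.Printf("read %d bytes, cap still %d\n", len(buf), cap(buf))
    }
}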
6 changes: 3 additions & 3 deletions blob.go
@@ -35,7 +35,7 @@ func (b Blob) Valid() bool {
 }
 
 func (b Blob) String() string {
-    return fmt.Sprintf("Blob<%s -> %s>",
-        b.ID.Str(),
-        b.Storage.Str())
+    return fmt.Sprintf("Blob<%s (%d) -> %s (%d)>",
+        b.ID.Str(), b.Size,
+        b.Storage.Str(), b.StorageSize)
 }
(diffs for the remaining 3 changed files not loaded in this view)