Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd: add hashSUM file support #5352

Merged
merged 16 commits into from Jul 7, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
225 changes: 222 additions & 3 deletions fs/operations/check.go
@@ -1,16 +1,21 @@
package operations

import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"os"
"regexp"
"strings"
"sync"
"sync/atomic"

"github.com/pkg/errors"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/fs/filter"
"github.com/rclone/rclone/fs/fserrors"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/fs/march"
Expand Down Expand Up @@ -56,14 +61,18 @@ type checkMarch struct {

// report outputs the fileName to out if required and to the combined log
func (c *checkMarch) report(o fs.DirEntry, out io.Writer, sigil rune) {
c.reportFilename(o.String(), out, sigil)
}

func (c *checkMarch) reportFilename(filename string, out io.Writer, sigil rune) {
if out != nil {
c.ioMu.Lock()
_, _ = fmt.Fprintf(out, "%v\n", o)
_, _ = fmt.Fprintf(out, "%s\n", filename)
c.ioMu.Unlock()
}
if c.opt.Combined != nil {
c.ioMu.Lock()
_, _ = fmt.Fprintf(c.opt.Combined, "%c %v\n", sigil, o)
_, _ = fmt.Fprintf(c.opt.Combined, "%c %s\n", sigil, filename)
c.ioMu.Unlock()
}
}
Expand Down Expand Up @@ -224,11 +233,19 @@ func CheckFn(ctx context.Context, opt *CheckOpt) error {
err := m.Run(ctx)
c.wg.Wait() // wait for background go-routines

return c.reportResults(ctx, err)
}

func (c *checkMarch) reportResults(ctx context.Context, err error) error {
if c.dstFilesMissing > 0 {
fs.Logf(c.opt.Fdst, "%d files missing", c.dstFilesMissing)
}
if c.srcFilesMissing > 0 {
fs.Logf(c.opt.Fsrc, "%d files missing", c.srcFilesMissing)
entity := "files"
if c.opt.Fsrc == nil {
entity = "hashes"
}
fs.Logf(c.opt.Fsrc, "%d %s missing", c.srcFilesMissing, entity)
}

fs.Logf(c.opt.Fdst, "%d differences found", accounting.Stats(ctx).GetErrors())
Expand Down Expand Up @@ -358,3 +375,205 @@ func CheckDownload(ctx context.Context, opt *CheckOpt) error {
}
return CheckFn(ctx, &optCopy)
}

// CheckSum checks filesystem hashes against a SUM file
func CheckSum(ctx context.Context, fsrc, fsum fs.Fs, sumFile string, hashType hash.Type, opt *CheckOpt, download bool) error {
var options CheckOpt
if opt != nil {
options = *opt
} else {
// default options for hashsum -c
options.Combined = os.Stdout
}
// CheckSum treats Fsrc and Fdst specially:
options.Fsrc = nil // no file system here, corresponds to the sum list
options.Fdst = fsrc // denotes the file system to check
opt = &options // override supplied argument

if hashType == hash.None || !opt.Fdst.Hashes().Contains(hashType) {
return errors.Errorf("%s: hash type is not supported by file system: %s", hashType, opt.Fdst)
}

if sumFile == "" {
return errors.Errorf("not a sum file: %s", fsum)
}
sumObj, err := fsum.NewObject(ctx, sumFile)
if err != nil {
return errors.Wrap(err, "cannot open sum file")
}
hashes, err := ParseSumFile(ctx, sumObj)
if err != nil {
return errors.Wrap(err, "failed to parse sum file")
}

ci := fs.GetConfig(ctx)
c := &checkMarch{
tokens: make(chan struct{}, ci.Checkers),
opt: *opt,
}
listErr := ListFn(ctx, opt.Fdst, func(obj fs.Object) {
c.checkSum(ctx, obj, download, hashes, hashType)
})
c.wg.Wait() // wait for background go-routines

// make census of unhandled sums
fi := filter.GetConfig(ctx)
for filename, hash := range hashes {
if hash == "" { // the sum has been successfully consumed
continue
}
if !fi.IncludeRemote(filename) { // the file was filtered out
continue
}
// filesystem missed the file, sum wasn't consumed
err := errors.Errorf("File not in %v", opt.Fdst)
fs.Errorf(filename, "%v", err)
_ = fs.CountError(err)
atomic.AddInt32(&c.differences, 1)
atomic.AddInt32(&c.dstFilesMissing, 1)
c.reportFilename(filename, opt.MissingOnDst, '+')
}

return c.reportResults(ctx, listErr)
}

// checkSum checks single object against golden hashes
func (c *checkMarch) checkSum(ctx context.Context, obj fs.Object, download bool, hashes HashSums, hashType hash.Type) {
remote := obj.Remote()
c.ioMu.Lock()
sumHash, sumFound := hashes[remote]
hashes[remote] = "" // mark sum as consumed
ivandeex marked this conversation as resolved.
Show resolved Hide resolved
c.ioMu.Unlock()

if !sumFound && c.opt.OneWay {
return
}
if !sumFound {
err := errors.New("sum not found")
_ = fs.CountError(err)
fs.Errorf(obj, "%v", err)
atomic.AddInt32(&c.differences, 1)
atomic.AddInt32(&c.srcFilesMissing, 1)
c.report(obj, c.opt.MissingOnSrc, '-')
return
}

if !download {
objHash, err := obj.Hash(ctx, hashType)
c.matchSum(ctx, sumHash, objHash, obj, err, hashType)
return
}

c.wg.Add(1)
c.tokens <- struct{}{} // put a token to limit concurrency
go func() {
ivandeex marked this conversation as resolved.
Show resolved Hide resolved
var (
objHash string
err error
in io.ReadCloser
)
defer func() {
c.matchSum(ctx, sumHash, objHash, obj, err, hashType)
<-c.tokens // get the token back to free up a slot
c.wg.Done()
}()
if in, err = obj.Open(ctx); err != nil {
return
}
tr := accounting.Stats(ctx).NewTransfer(obj)
in = tr.Account(ctx, in).WithBuffer() // account and buffer the transfer
defer func() {
tr.Done(ctx, nil) // will close the stream
}()
hashVals, err2 := hash.StreamTypes(in, hash.NewHashSet(hashType))
if err2 != nil {
err = err2 // pass to matchSum
return
}
objHash = hashVals[hashType]
}()
}

// matchSum sums up the results of hashsum matching for an object
func (c *checkMarch) matchSum(ctx context.Context, sumHash, objHash string, obj fs.Object, err error, hashType hash.Type) {
switch {
ivandeex marked this conversation as resolved.
Show resolved Hide resolved
case err != nil:
_ = fs.CountError(err)
fs.Errorf(obj, "Failed to calculate hash: %v", err)
c.report(obj, c.opt.Error, '!')
case sumHash == "":
err = errors.New("duplicate file")
_ = fs.CountError(err)
fs.Errorf(obj, "%v", err)
c.report(obj, c.opt.Error, '!')
case objHash == "":
fs.Debugf(nil, "%v = %s (sum)", hashType, sumHash)
fs.Debugf(obj, "%v - could not check hash (%v)", hashType, c.opt.Fdst)
atomic.AddInt32(&c.noHashes, 1)
atomic.AddInt32(&c.matches, 1)
c.report(obj, c.opt.Match, '=')
case objHash == sumHash:
atomic.AddInt32(&c.matches, 1)
fs.Debugf(obj, "%v = %s OK", hashType, sumHash)
atomic.AddInt32(&c.matches, 1)
c.report(obj, c.opt.Match, '=')
default:
err = errors.New("files differ")
_ = fs.CountError(err)
fs.Debugf(nil, "%v = %s (sum)", hashType, sumHash)
fs.Debugf(obj, "%v = %s (%v)", hashType, objHash, c.opt.Fdst)
fs.Errorf(obj, "%v", err)
atomic.AddInt32(&c.differences, 1)
c.report(obj, c.opt.Differ, '*')
}
}

// HashSums represents a parsed SUM file
type HashSums map[string]string

// ParseSumFile parses a hash SUM file and returns hashes as a map
func ParseSumFile(ctx context.Context, sumFile fs.Object) (HashSums, error) {
rd, err := sumFile.Open(ctx)
if err != nil {
return nil, err
}
parser := bufio.NewReader(rd)

const maxWarn = 4
numWarn := 0

re := regexp.MustCompile(`^([^ ]+) [ *](.+)$`)
hashes := HashSums{}
for lineNo := 0; true; lineNo++ {
lineBytes, _, err := parser.ReadLine()
if err == io.EOF {
break
}
if err != nil {
return nil, err
}

line := strings.TrimSpace(string(lineBytes))
if line == "" {
continue
}
//line = strings.TrimPrefix(line, "\\")

if fields := re.FindStringSubmatch(line); fields != nil {
hashes[fields[2]] = fields[1]
continue
}

numWarn++
if numWarn < maxWarn {
fs.Logf(sumFile, "improperly formatted checksum line %d", lineNo)
} else if numWarn == maxWarn {
fs.Logf(sumFile, "more warnings suppressed...")
}
}

if err = rd.Close(); err != nil {
return nil, err
}
return hashes, nil
}