Skip to content

Commit

Permalink
Merge pull request #174 from uwedeportivo/populate_bloom
Browse files Browse the repository at this point in the history
populate bloom filter command
  • Loading branch information
uwedeportivo committed Jun 23, 2020
2 parents 5e1ae40 + cf58c8e commit 2ef4913
Show file tree
Hide file tree
Showing 12 changed files with 460 additions and 149 deletions.
26 changes: 13 additions & 13 deletions archive/archive.go
Expand Up @@ -69,10 +69,10 @@ type archiveWorker struct {
hh *Hashes
md5crcBuffer []byte
index int
pm *archiveMaster
pm *archiveGru
}

type archiveMaster struct {
type archiveGru struct {
depot *Depot
resumePath string
numWorkers int
Expand Down Expand Up @@ -189,7 +189,7 @@ func (depot *Depot) Archive(paths []string, resumePath string, includezips int,

glog.Infof("resuming with path %s", resumePoint)

pm := new(archiveMaster)
pm := new(archiveGru)
pm.depot = depot
pm.resumePath = resumePoint
pm.pt = pt
Expand All @@ -210,14 +210,14 @@ func (depot *Depot) Archive(paths []string, resumePath string, includezips int,
return worker.Work("archive roms", paths, pm)
}

func (pm *archiveMaster) Accept(path string) bool {
func (pm *archiveGru) Accept(path string) bool {
if pm.resumePath != "" {
return path > pm.resumePath
}
return true
}

func (pm *archiveMaster) NewWorker(workerIndex int) worker.Worker {
func (pm *archiveGru) NewWorker(workerIndex int) worker.Worker {
return &archiveWorker{
depot: pm.depot,
hh: newHashes(),
Expand All @@ -227,19 +227,19 @@ func (pm *archiveMaster) NewWorker(workerIndex int) worker.Worker {
}
}

func (pm *archiveMaster) CalculateWork() bool {
func (pm *archiveGru) CalculateWork() bool {
return !pm.skipInitialScan
}

func (pm *archiveMaster) NumWorkers() int {
func (pm *archiveGru) NumWorkers() int {
return pm.numWorkers
}

func (pm *archiveMaster) ProgressTracker() worker.ProgressTracker {
func (pm *archiveGru) ProgressTracker() worker.ProgressTracker {
return pm.pt
}

func (pm *archiveMaster) FinishUp() error {
func (pm *archiveGru) FinishUp() error {
pm.soFar <- &completed{
workerIndex: -1,
}
Expand All @@ -250,11 +250,11 @@ func (pm *archiveMaster) FinishUp() error {
return pm.resumeLogFile.Close()
}

func (pm *archiveMaster) Start() error {
func (pm *archiveGru) Start() error {
return nil
}

func (pm *archiveMaster) Scanned(numFiles int, numBytes int64, commonRootPath string) {}
func (pm *archiveGru) Scanned(numFiles int, numBytes int64, commonRootPath string) {}

func (depot *Depot) reserveRoot(size int64) (int, error) {
depot.lock.Lock()
Expand All @@ -278,7 +278,7 @@ func (depot *Depot) reserveRoot(size int64) (int, error) {

glog.Error("Depot with the following roots ran out of disk space")
for _, dr := range depot.roots {
glog.Errorf("root = %s, maxSize = %s, size = %s", dr.name,
glog.Errorf("root = %s, maxSize = %s, size = %s", dr.path,
humanize.IBytes(uint64(dr.maxSize)), humanize.IBytes(uint64(dr.size)))
}

Expand Down Expand Up @@ -391,7 +391,7 @@ func (w *archiveWorker) archive(ro readerOpener, name, path string, size int64,
return 0, err
}

outpath := pathFromSha1HexEncoding(w.depot.roots[root].name, sha1Hex, gzipSuffix)
outpath := pathFromSha1HexEncoding(w.depot.roots[root].path, sha1Hex, gzipSuffix)

w.depot.cache.Set(sha1Hex, &cacheValue{
hh: hh,
Expand Down
9 changes: 4 additions & 5 deletions archive/build.go
Expand Up @@ -188,15 +188,15 @@ endLoop2:
if err != nil {
return false, err
}
defer func(){
defer func() {
err := fixFile.Close()
if err != nil {
glog.Errorf("error, failed to close %s: %v", fixDatPath, err)
}
}()

fixWriter := bufio.NewWriter(fixFile)
defer func(){
defer func() {
err := fixWriter.Flush()
if err != nil {
glog.Errorf("error, failed to flush %s: %v", fixDatPath, err)
Expand All @@ -218,14 +218,13 @@ type nopWriterCloser struct {

func (nopWriterCloser) Close() error { return nil }


func cpGZUncompressed(srcName, dstName string) error {
file, err := os.Open(srcName)
if err != nil {
return err
}

defer func(){
defer func() {
err := file.Close()
if err != nil {
glog.Errorf("error closing %s: %v", srcName, err)
Expand Down Expand Up @@ -277,7 +276,7 @@ func (depot *Depot) buildGame(game *types.Game, gamePath string,
} else {
gameDir := filepath.Dir(game.Name)
if gameDir != "." {
// name has dirs in it
// path has dirs in it
err := os.MkdirAll(filepath.Dir(gamePath), 0777)
if err != nil {
glog.Errorf("error mkdir %s: %v", filepath.Dir(gamePath), err)
Expand Down
117 changes: 111 additions & 6 deletions archive/depot.go
Expand Up @@ -37,11 +37,14 @@ import (
"hash/crc32"
"io"
"os"
"path/filepath"
"strings"
"sync"

"github.com/dustin/go-humanize"
"github.com/golang/glog"
"github.com/klauspost/compress/gzip"
"github.com/uwedeportivo/romba/worker"

"github.com/dgraph-io/ristretto"
"github.com/uwedeportivo/romba/db"
Expand Down Expand Up @@ -94,6 +97,7 @@ func NewDepot(roots []string, maxSize []int64, romDB db.RomDB) (*Depot, error) {
return nil, err
}
depot.roots[k] = &depotRoot{
path: root,
size: size,
maxSize: maxSize[k],
bf: bf,
Expand All @@ -104,7 +108,7 @@ func NewDepot(roots []string, maxSize []int64, romDB db.RomDB) (*Depot, error) {
glog.Info("Initializing Depot with the following roots")

for _, dr := range depot.roots {
glog.Infof("root = %s, maxSize = %s, size = %s", dr.name,
glog.Infof("root = %s, maxSize = %s, size = %s", dr.path,
humanize.IBytes(uint64(dr.maxSize)), humanize.IBytes(uint64(dr.size)))
}

Expand All @@ -118,15 +122,18 @@ func (depot *Depot) RomInDepot(sha1Hex string) (bool, string, error) {
v, hit := depot.cache.Get(sha1Hex)
if hit {
cv := v.(*cacheValue)
return true, pathFromSha1HexEncoding(depot.roots[cv.rootIndex].name,
return true, pathFromSha1HexEncoding(depot.roots[cv.rootIndex].path,
hex.EncodeToString(cv.hh.Sha1), gzipSuffix), nil
}
for _, dr := range depot.roots {
dr.Lock()
if dr.bloomReady && !dr.bf.Test([]byte(sha1Hex)) {
dr.Unlock()
return false, "", nil
}
dr.Unlock()

rompath := pathFromSha1HexEncoding(dr.name, sha1Hex, gzipSuffix)
rompath := pathFromSha1HexEncoding(dr.path, sha1Hex, gzipSuffix)
exists, err := PathExists(rompath)
if err != nil {
return false, "", err
Expand All @@ -143,15 +150,18 @@ func (depot *Depot) SHA1InDepot(sha1Hex string) (bool, *Hashes, string, int64, e
v, hit := depot.cache.Get(sha1Hex)
if hit {
cv := v.(*cacheValue)
return true, cv.hh, pathFromSha1HexEncoding(depot.roots[cv.rootIndex].name,
return true, cv.hh, pathFromSha1HexEncoding(depot.roots[cv.rootIndex].path,
hex.EncodeToString(cv.hh.Sha1), gzipSuffix), cv.hh.Size, nil
}
for idx, dr := range depot.roots {
dr.Lock()
if dr.bloomReady && !dr.bf.Test([]byte(sha1Hex)) {
dr.Unlock()
return false, nil, "", 0, nil
}
dr.Unlock()

rompath := pathFromSha1HexEncoding(dr.name, sha1Hex, gzipSuffix)
rompath := pathFromSha1HexEncoding(dr.path, sha1Hex, gzipSuffix)
exists, err := PathExists(rompath)
if err != nil {
return false, nil, "", 0, err
Expand Down Expand Up @@ -224,7 +234,7 @@ func (depot *Depot) OpenRomGZ(rom *types.Rom) (io.ReadCloser, error) {
sha1Hex := hex.EncodeToString(rom.Sha1)

for _, root := range depot.roots {
rompath := pathFromSha1HexEncoding(root.name, sha1Hex, gzipSuffix)
rompath := pathFromSha1HexEncoding(root.path, sha1Hex, gzipSuffix)
exists, err := PathExists(rompath)
if err != nil {
return nil, err
Expand All @@ -236,3 +246,98 @@ func (depot *Depot) OpenRomGZ(rom *types.Rom) (io.ReadCloser, error) {
}
return nil, nil
}

// Paths returns the filesystem path of every configured depot root,
// in the same order the roots were configured.
func (depot *Depot) Paths() []string {
	result := make([]string, len(depot.roots))
	for i, dr := range depot.roots {
		result[i] = dr.path
	}
	return result
}

// PopulateBloom records the sha1 named by path in the bloom filter of the
// depot root containing that path. The path is expected to have the shape
// produced by pathFromSha1HexEncoding: the depot root followed by four
// fan-out directories and a <sha1hex>.gz filename. Malformed paths are
// logged and ignored.
func (depot *Depot) PopulateBloom(path string) {
	parts := strings.Split(path, string(filepath.Separator))

	// the last 5 elements are the four fan-out dirs plus the filename;
	// everything before them is the depot root path
	if len(parts) < 5 {
		glog.Errorf("failed to populate bloom filter for path %s: not enough dir parts", path)
		return
	}
	n := len(parts) - 5
	depotPath := string(filepath.Separator) + filepath.Join(parts[:n]...)

	for _, dr := range depot.roots {
		if depotPath == dr.path {
			fn := parts[len(parts)-1]
			// use the same gzipSuffix constant that built the path,
			// instead of a duplicated ".gz" literal
			sha1Hex := strings.TrimSuffix(fn, gzipSuffix)
			if len(sha1Hex) != 40 {
				// bug fix: the previous message was copy-pasted from the
				// dir-parts check and wrongly reported "not enough dir parts"
				glog.Errorf("failed to populate bloom filter for path %s: filename is not a sha1 hex string", path)
				return
			}
			dr.Lock()
			dr.bf.Add([]byte(sha1Hex))
			dr.Unlock()
		}
	}
}

// ClearBloomFilters marks every depot root's bloom filter as not ready and
// deletes any bloom filter file persisted in the root directory. It returns
// the first error encountered while probing or removing a file.
func (depot *Depot) ClearBloomFilters() error {
	depot.lock.Lock()
	defer depot.lock.Unlock()

	for _, dr := range depot.roots {
		dr.bloomReady = false

		bfPath := filepath.Join(dr.path, bloomFilterFilename)
		exists, err := PathExists(bfPath)
		if err != nil {
			return err
		}
		if !exists {
			continue
		}
		if err := os.Remove(bfPath); err != nil {
			return err
		}
	}
	return nil
}

// ResumePopBloomPaths inspects each depot root for a "resumebloom-<sha1>"
// marker file and builds the list of resume points for the populate-bloom
// operation. A root without a marker file resumes from the beginning; a root
// with more than one marker, or a marker whose name does not carry a 40-char
// sha1 hex suffix, is an error.
func (depot *Depot) ResumePopBloomPaths() ([]worker.ResumePath, error) {
	depot.lock.Lock()
	defer depot.lock.Unlock()

	resumePaths := make([]worker.ResumePath, 0, len(depot.roots))

	for _, dr := range depot.roots {
		matches, err := filepath.Glob(filepath.Join(dr.path, "resumebloom-*"))
		if err != nil {
			return nil, err
		}

		switch {
		case len(matches) > 1:
			return nil, fmt.Errorf("more than one resumebloom files found in %s", dr.path)
		case len(matches) == 0:
			resumePaths = append(resumePaths, worker.ResumePath{Path: dr.path})
			continue
		}

		_, fn := filepath.Split(matches[0])
		pieces := strings.Split(fn, "-")
		if len(pieces) != 2 || len(pieces[1]) != 40 {
			return nil, fmt.Errorf("resumebloom file with unexpected name %s", matches[0])
		}

		// TODO(uwe): actually read in the contents of the bloom filter
		resumeLine := pathFromSha1HexEncoding(dr.path, pieces[1], gzipSuffix)
		resumePaths = append(resumePaths, worker.ResumePath{Path: dr.path, ResumeLine: resumeLine})
	}

	return resumePaths, nil
}
10 changes: 5 additions & 5 deletions archive/depot_root.go
Expand Up @@ -12,7 +12,7 @@ import (
type depotRoot struct {
sync.Mutex

name string
path string
bloomReady bool
bf *bloom.BloomFilter
touched bool
Expand Down Expand Up @@ -76,18 +76,18 @@ func (depot *Depot) writeSizes() {
for _, dr := range depot.roots {
dr.Lock()
if dr.touched {
err := writeSizeFile(dr.name, dr.size)
err := writeSizeFile(dr.path, dr.size)
if err != nil {
glog.Errorf("failed to write size file into %s: %v\n", dr.name, err)
glog.Errorf("failed to write size file into %s: %v\n", dr.path, err)
} else {
dr.touched = false
}

if dr.bloomReady {
err = writeBloomFilter(dr.name, dr.bf)
err = writeBloomFilter(dr.path, dr.bf)
if err != nil {
dr.touched = true
glog.Errorf("failed to write bloomfilter into %s: %v\n", dr.name, err)
glog.Errorf("failed to write bloomfilter into %s: %v\n", dr.path, err)
}
}
}
Expand Down

0 comments on commit 2ef4913

Please sign in to comment.