Skip to content
This repository has been archived by the owner on Aug 15, 2022. It is now read-only.

Commit

Permalink
klient/machine: use workers to add index entries
Browse files Browse the repository at this point in the history
  • Loading branch information
Pawel Knap committed Jan 28, 2017
1 parent 0f98765 commit 5cc12a0
Showing 1 changed file with 47 additions and 22 deletions.
69 changes: 47 additions & 22 deletions go/src/koding/klient/machine/index/index.go
Expand Up @@ -115,8 +115,6 @@ func (cs ChangeSlice) Less(i, j int) bool { return cs[i].Name < cs[j].Name }
// Index stores a virtual working tree state. It recursively records objects in
// a given root path and allows to efficiently detect changes on it.
type Index struct {
limitC chan struct{}

mu sync.RWMutex
entries map[string]*Entry
}
Expand All @@ -129,18 +127,33 @@ var (
// NewIndex creates the empty index object.
func NewIndex() *Index {
return &Index{
limitC: make(chan struct{}, 2*runtime.NumCPU()),
entries: make(map[string]*Entry, 0),
}
}

// fileDesc describes a single file queued for indexing. Values are sent over
// a channel to addEntryWorker, which passes path and info to NewEntryFile.
type fileDesc struct {
path string // relative path to the file.
info os.FileInfo // file LStat result.
}

// NewIndexFiles walks the given file tree rooted at root and records file
// states in the resulting Index object.
func NewIndexFiles(root string) (*Index, error) {
idx := NewIndex()

// In order to get as many entries as we can, we ignore errors.
// Start worker pool.
var wg sync.WaitGroup
fC := make(chan *fileDesc)
for i := 0; i < 2*runtime.NumCPU(); i++ {
wg.Add(1)
go idx.addEntryWorker(root, &wg, fC)
}
defer func() {
close(fC)
wg.Wait()
}()

// In order to get as many entries as we can, we ignore errors.
walkFn := func(path string, info os.FileInfo, err error) error {
if err != nil {
return nil
Expand All @@ -151,39 +164,31 @@ func NewIndexFiles(root string) (*Index, error) {
return nil
}

wg.Add(1)
idx.addEntry(&wg, root, path, info)
fC <- &fileDesc{path: path, info: info}
return nil
}

if err := filepath.Walk(root, walkFn); err != nil {
return nil, err
}

wg.Wait()
return idx, nil
}

// addEntry asynchronously adds new entry to index. This function has limits
// to a number of started go-routines. Errors are ignored.
func (idx *Index) addEntry(wg *sync.WaitGroup, root, path string, info os.FileInfo) {
idx.limitC <- struct{}{}
// addEntryWorker asynchronously adds new entries to the index. Errors are ignored.
func (idx *Index) addEntryWorker(root string, wg *sync.WaitGroup, fC <-chan *fileDesc) {
defer wg.Done()

go func() {
defer func() {
<-idx.limitC
wg.Done()
}()

entry, err := NewEntryFile(root, path, info)
for f := range fC {
entry, err := NewEntryFile(root, f.path, f.info)
if err != nil {
return
continue
}

idx.mu.Lock()
idx.entries[entry.Name] = entry
idx.mu.Unlock()
}()
}
}

// Count returns the number of entries stored in index. Only items which size is
Expand Down Expand Up @@ -332,7 +337,13 @@ func ctime(fi os.FileInfo) int64 {
// guarantee that changes from Compare function applied to the index will
// result in actual directory state.
func (idx *Index) Apply(root string, cs ChangeSlice) {
// Start worker pool.
var wg sync.WaitGroup
fC := make(chan *fileDesc)
for i := 0; i < naturalMin(2*runtime.NumCPU(), len(cs)); i++ {
wg.Add(1)
go idx.addEntryWorker(root, &wg, fC)
}

for i := range cs {
switch {
Expand Down Expand Up @@ -361,14 +372,28 @@ func (idx *Index) Apply(root string, cs ChangeSlice) {
continue
}

wg.Add(1)
idx.addEntry(&wg, root, path, info)
fC <- &fileDesc{path: path, info: info}
}
}

close(fC)
wg.Wait()
}

// naturalMin picks the smaller of a and b and clamps the result from below
// at one, so the returned value is always a positive count.
func naturalMin(a, b int) (n int) {
	// Select the smaller operand; ties resolve to b, which equals a.
	switch {
	case a < b:
		n = a
	default:
		n = b
	}

	// Anything that reaches one or more is returned untouched.
	if n >= 1 {
		return n
	}

	// Zero and negative values are raised to the lower bound.
	return 1
}

// MarshalJSON satisfies json.Marshaler interface. It safely marshals index
// private data to JSON format.
func (idx *Index) MarshalJSON() ([]byte, error) {
Expand Down

0 comments on commit 5cc12a0

Please sign in to comment.