/
indexer.go
130 lines (101 loc) · 3.06 KB
/
indexer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package index
import (
"container/list"
"context"
"fmt"
"os"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"github.com/ythosa/bendy/internal/config"
"github.com/ythosa/bendy/internal/decoding"
"github.com/ythosa/bendy/internal/normalizing"
"github.com/ythosa/bendy/pkg/fcheck"
)
// Indexer builds an inverted index over a set of files.
type Indexer struct {
	// decoder produces, for each opened file, a stream of raw tokens that
	// indexFile pulls with DecodeNext.
	decoder decoding.Decoder
	// normalizer supplies, per raw token, the normalizer that turns it into
	// an index term; tokens that normalize to "" are dropped by indexFile.
	normalizer normalizing.Normalizer
	// config provides MaxOpenFilesCount (the concurrency limit used by
	// IndexFiles) and TempFilesStoragePath (the prefix for per-document
	// temporary files, see getFilenameFromDocID).
	config *config.Index
}
// NewIndexer returns an Indexer that reads documents through decoder,
// turns their tokens into terms through normalizer, and takes its
// concurrency limit and temporary-file location from config.
func NewIndexer(decoder decoding.Decoder, normalizer normalizing.Normalizer, config *config.Index) *Indexer {
	idx := new(Indexer)
	idx.decoder = decoder
	idx.normalizer = normalizer
	idx.config = config

	return idx
}
// IndexFiles builds an inverted index over filePaths. Each file is assigned
// the DocID equal to its position in filePaths.
//
// Files are indexed concurrently, with at most config.MaxOpenFilesCount in
// flight at once. Each worker writes the file's distinct terms to a temporary
// file under config.TempFilesStoragePath. Once every worker has finished, the
// temporary files are merged into the returned index.
//
// An error is returned if the paths fail validation, if
// config.MaxOpenFilesCount is not positive, or if any file fails to index.
// After the first indexing failure no further files are started. The call
// still waits for in-flight workers, and then removes the temporary files
// written so far on a best-effort basis.
func (i *Indexer) IndexFiles(filePaths []string) (InvertIndex, error) {
	if err := fcheck.CheckIsFilePathsValid(filePaths); err != nil {
		return nil, fmt.Errorf("error while checking files: %w", err)
	}

	// A semaphore whose size is below 1 can never grant Acquire(ctx, 1), so
	// the scheduling loop below would block forever.
	limit := i.config.MaxOpenFilesCount
	if limit < 1 {
		return nil, fmt.Errorf("invalid max open files count %d: must be positive", limit)
	}

	// discardTempFiles removes any per-document temp files already written.
	// Missing files and removal errors are ignored: this only runs on a path
	// that is already returning an error.
	discardTempFiles := func() {
		for docID := range filePaths {
			_ = os.Remove(i.getFilenameFromDocID(DocID(docID)))
		}
	}

	sem := semaphore.NewWeighted(limit)
	// The group's context is cancelled as soon as one worker fails. That
	// unblocks sem.Acquire below and stops scheduling further files.
	errs, ctx := errgroup.WithContext(context.Background())

	var scheduleErr error
	for docID, filePath := range filePaths {
		filePath := filePath
		docID := DocID(docID)

		if err := sem.Acquire(ctx, 1); err != nil {
			scheduleErr = fmt.Errorf("failed to acquire semaphore: %w", err)
			break
		}

		errs.Go(func() error {
			defer sem.Release(1)
			return i.indexFile(filePath, docID)
		})
	}

	// Always wait, even after a scheduling failure, so that no worker
	// outlives this call.
	if err := errs.Wait(); err != nil {
		discardTempFiles()
		return nil, fmt.Errorf("error while indexing file: %w", err)
	}
	if scheduleErr != nil {
		discardTempFiles()
		return nil, scheduleErr
	}

	return i.mergeIndexingResults(len(filePaths))
}
// indexFile decodes the file at filePath into raw tokens and normalizes each
// one. It then writes the distinct non-empty terms, in order of first
// appearance, to the temporary file reserved for docID.
func (i *Indexer) indexFile(filePath string, docID DocID) error {
	f, err := os.Open(filePath)
	if err != nil {
		return fmt.Errorf("error while opening file: %w", err)
	}
	defer f.Close()

	tokenStream, err := i.decoder.GetDecoder(f)
	if err != nil {
		return fmt.Errorf("error while getting decoder for file: %w", err)
	}

	// seen prevents writing the same term twice; terms keeps first-appearance
	// order. terms is created non-nil so an empty document is still handed to
	// encodeDictionaryToFile as an empty (not nil) slice.
	seen := make(map[string]struct{})
	terms := make([]string, 0)

	for {
		token, more := tokenStream.DecodeNext()
		if !more {
			break
		}

		norm, err := i.normalizer.GetNormalizer(token)
		if err != nil {
			return fmt.Errorf("error while getting normalizer for %s: %w", token, err)
		}

		term := norm.Normalize(token)
		if term == "" {
			continue
		}
		if _, dup := seen[term]; dup {
			continue
		}

		seen[term] = struct{}{}
		terms = append(terms, term)
	}

	return encodeDictionaryToFile(terms, i.getFilenameFromDocID(docID))
}
// mergeIndexingResults reads back the per-document term lists written by
// indexFile for DocIDs 0..resultsCount-1. It deletes each temporary file once
// it has been read, and folds the terms into a single InvertIndex mapping
// every term to the DocIDs that contain it.
//
// On failure it returns a nil index. It also removes, on a best-effort basis,
// the temporary files not yet consumed, so they are not left behind on disk.
func (i *Indexer) mergeIndexingResults(resultsCount int) (InvertIndex, error) {
	invertIndex := make(InvertIndex)

	// discardFrom removes the temp files for DocIDs first..resultsCount-1.
	// Errors are ignored because this only runs while already failing.
	discardFrom := func(first DocID) {
		for docID := first; int(docID) < resultsCount; docID++ {
			_ = os.Remove(i.getFilenameFromDocID(docID))
		}
	}

	for docID := DocID(0); int(docID) < resultsCount; docID++ {
		filename := i.getFilenameFromDocID(docID)

		terms, err := decodeDictionaryFromFile(filename)
		if err != nil {
			discardFrom(docID)
			return nil, fmt.Errorf("error decoding dictionary from %q: %w", filename, err)
		}

		if err := os.Remove(filename); err != nil {
			discardFrom(docID + 1)
			return nil, fmt.Errorf("error removing file: %w", err)
		}

		for _, t := range terms {
			if index, ok := invertIndex[t]; ok {
				index.Insert(docID)
				continue
			}
			invertIndex[t] = NewIndex(list.New())
			invertIndex[t].DocIDs.PushBack(docID)
		}
	}

	return invertIndex, nil
}
// getFilenameFromDocID returns the temporary-file path used for docID: the
// configured TempFilesStoragePath with the decimal DocID appended directly.
// No separator is inserted. NOTE(review): presumably the configured path ends
// with a separator or is meant as a filename prefix — confirm against config.
func (i *Indexer) getFilenameFromDocID(docID DocID) string {
	storage := i.config.TempFilesStoragePath
	suffix := fmt.Sprintf("%d", docID)

	return fmt.Sprintf("%s%s", storage, suffix)
}