/
concurrent_index_page.go
100 lines (83 loc) · 2.7 KB
/
concurrent_index_page.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package main
import (
"flag"
"fmt"
"os"
"runtime"
"github.com/peterwilliams97/pdf-search/doclib"
)
const usage = `Usage: go run concurrent_inde_page.go [OPTIONS] testdata/*.pdf
Runs UniDoc PDF text extraction on PDF files in testdata and writes a Bleve index to
store.concurrent.`
var indexPath = "store.concurrent.page"
// main indexes the text of PDF files into a Bleve index, one indexed document
// per PDF page. Page extraction is performed concurrently by a pool of worker
// goroutines while the main goroutine writes results to the index.
//
// Flags:
//
//	-s  Bleve store name (a directory; default indexPath).
//	-f  Force creation of a new Bleve index.
//	-a  Allow an existing Bleve index to be appended to.
//	-w  Number of worker threads (default NumCPU-1, minimum 1).
func main() {
	flag.StringVar(&indexPath, "s", indexPath, "Bleve store name. This is a directory.")
	var forceCreate, allowAppend bool
	flag.BoolVar(&forceCreate, "f", false, "Force creation of a new Bleve index.")
	flag.BoolVar(&allowAppend, "a", false, "Allow an existing Bleve index to be appended to.")
	numWorkers := -1
	flag.IntVar(&numWorkers, "w", numWorkers, "Number of worker threads.")
	doclib.MakeUsage(usage)
	fmt.Printf("GOMAXPROCS: %d\n", runtime.GOMAXPROCS(-1))
	fmt.Printf("NumCPU: %d\n\n", runtime.NumCPU())
	flag.Parse()
	doclib.SetLogging()
	if len(flag.Args()) < 1 {
		flag.Usage()
		os.Exit(1)
	}

	// Expand the command line patterns into the list of PDF files to process.
	pathList, err := doclib.PatternsToPaths(flag.Args(), true)
	if err != nil {
		fmt.Fprintf(os.Stderr, "PatternsToPaths failed. args=%#q err=%v\n", flag.Args(), err)
		os.Exit(1)
	}
	pathList = doclib.CleanCorpus(pathList)
	if len(pathList) == 0 {
		// Nothing to do; don't create an empty index.
		fmt.Fprintln(os.Stderr, "No PDF files matched the command line arguments.")
		os.Exit(1)
	}
	fmt.Printf("Indexing %d PDF files.\n", len(pathList))

	// Create (or, with -a, reopen) the Bleve index the pages are written to.
	index, err := doclib.CreateBleveIndex(indexPath, forceCreate, allowAppend)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Could not create Bleve index %q.\n", indexPath)
		panic(err)
	}

	// Choose a number of worker threads that won't overload the host computer:
	// one less than the CPU count, but at least one.
	if numWorkers < 0 {
		numWorkers = runtime.NumCPU() - 1
	}
	if numWorkers <= 0 {
		numWorkers = 1
	}
	fmt.Printf("%d workers\n", numWorkers)

	// Create the processing queue and the channel workers report results on.
	queue := doclib.NewExtractPageQueue(numWorkers)
	resultChan := make(chan *doclib.ExtractPageResult)

	// Feed the queue from a separate goroutine so the main goroutine can start
	// draining results immediately (queueing may block while workers are busy).
	go func() {
		// Create processing instructions `w` for each file in pathList and add
		// them to the queue.
		for i, inPath := range pathList {
			w := doclib.NewExtractPageWork(i, inPath, resultChan)
			queue.Queue(w)
		}
	}()

	// completeJob writes one extracted page to the index. A page that cannot
	// be indexed is treated as fatal, so on success this always returns nil.
	completeJob := func(pageResult doclib.ExtractPageResult) error {
		page := pageResult.Page
		if err := index.Index(page.ID, page); err != nil {
			fmt.Fprintf(os.Stderr, "Could not index %s.\n", pageResult.DocID)
			panic(err)
		}
		return nil
	}

	// Wait for extraction results here in the main thread.
	queue.Complete(len(pathList), completeJob)
	// Shut down the processing queue workers.
	queue.Close()

	docCount, err := index.DocCount()
	if err != nil {
		fmt.Fprintf(os.Stderr, "index.DocCount failed. err=%v\n", err)
		return
	}
	fmt.Printf("Total %d pages.\n", docCount)
	fmt.Println("Finished")
}