Skip to content

Commit cabce89

Browse files
committed
Updated indexer code
1 parent 955dce5 commit cabce89

File tree

7 files changed

+316
-100
lines changed

7 files changed

+316
-100
lines changed

cmd/indexer/main.go

Lines changed: 33 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package main
22

33
import (
44
"context"
5-
"errors"
65
"flag"
76
"fmt"
87
"os"
@@ -14,20 +13,18 @@ import (
1413
"unicode"
1514

1615
// Packages
16+
"github.com/mutablelogic/go-sqlite/pkg/config"
1717
"github.com/mutablelogic/go-sqlite/pkg/indexer"
1818
"github.com/mutablelogic/go-sqlite/pkg/sqlite3"
19-
20-
// Namespace imports
21-
. "github.com/mutablelogic/go-sqlite"
22-
. "github.com/mutablelogic/go-sqlite/pkg/lang"
2319
)
2420

2521
var (
2622
flagName = flag.String("name", "index", "Index name")
2723
flagInclude = flag.String("include", "", "Paths, names and extensions to include")
2824
flagExclude = flag.String("exclude", "", "Paths, names and extensions to exclude")
29-
flagWorkers = flag.Uint("workers", 10, "Number of indexing workers")
25+
flagWorkers = flag.Uint("workers", 0, "Number of indexing workers")
3026
flagDatabase = flag.String("db", ":memory:", "Path to sqlite database")
27+
flagVersion = flag.Bool("version", false, "Display version")
3128
)
3229

3330
func main() {
@@ -36,6 +33,10 @@ func main() {
3633

3734
// Parse flags
3835
flag.Parse()
36+
if *flagVersion {
37+
config.PrintVersion(flag.CommandLine.Output())
38+
os.Exit(0)
39+
}
3940
if flag.NArg() != 1 {
4041
fmt.Fprintln(os.Stderr, "missing path argument")
4142
os.Exit(-1)
@@ -50,23 +51,23 @@ func main() {
5051
}
5152

5253
// Open indexer to path
53-
indexer, err := indexer.NewIndexer(*flagName, path)
54+
idx, err := indexer.NewIndexer(*flagName, path, indexer.NewQueue())
5455
if err != nil {
5556
fmt.Fprintln(os.Stderr, err)
5657
os.Exit(-1)
5758
}
5859

5960
// Indexer inclusions
6061
for _, include := range strings.FieldsFunc(*flagInclude, sep) {
61-
if err := indexer.Include(include); err != nil {
62+
if err := idx.Include(include); err != nil {
6263
fmt.Fprintln(os.Stderr, err)
6364
os.Exit(-1)
6465
}
6566
}
6667

6768
// Indexer exclusions
6869
for _, exclude := range strings.FieldsFunc(*flagExclude, sep) {
69-
if err := indexer.Exclude(exclude); err != nil {
70+
if err := idx.Exclude(exclude); err != nil {
7071
fmt.Fprintln(os.Stderr, err)
7172
os.Exit(-1)
7273
}
@@ -87,38 +88,46 @@ func main() {
8788
}
8889
defer pool.Close()
8990

90-
// Create schema
91-
if err := CreateSchema(ctx, pool); err != nil {
92-
fmt.Fprintln(os.Stderr, err)
91+
// Create store
92+
store := indexer.NewStore(pool, "main", idx.Queue(), *flagWorkers)
93+
if store == nil {
94+
fmt.Fprintln(os.Stderr, "failed to create store")
9395
os.Exit(-1)
9496
}
9597

98+
// Error routine persists until error channel is closed
99+
go func() {
100+
for err := range errs {
101+
fmt.Fprintln(os.Stderr, err)
102+
}
103+
}()
104+
96105
wg.Add(1)
97106
go func() {
98107
defer wg.Done()
99-
for {
100-
select {
101-
case err := <-errs:
102-
fmt.Fprintln(os.Stderr, err)
103-
case <-ctx.Done():
104-
return
105-
}
108+
errs <- fmt.Errorf("starting indexer")
109+
if err := idx.Run(ctx, errs); err != nil {
110+
errs <- err
106111
}
112+
errs <- fmt.Errorf("ending indexer")
107113
}()
108114

109115
wg.Add(1)
110116
go func() {
111117
defer wg.Done()
112-
if err := indexer.Run(ctx, errs); err != nil {
118+
errs <- fmt.Errorf("starting store")
119+
if err := store.Run(ctx, errs); err != nil {
113120
errs <- err
114121
}
122+
errs <- fmt.Errorf("ending store")
115123
}()
116124

117125
wg.Add(1)
118126
go func() {
119127
defer wg.Done()
120128
<-time.After(time.Second)
121-
err := indexer.Walk(ctx, func(err error) {
129+
errs <- fmt.Errorf("starting reindexer")
130+
err := idx.Walk(ctx, func(err error) {
122131
if err != nil {
123132
errs <- fmt.Errorf("reindexing completed with errors: %w", err)
124133
} else {
@@ -130,33 +139,11 @@ func main() {
130139
}
131140
}()
132141

133-
for i := uint(0); i < *flagWorkers; i++ {
134-
wg.Add(1)
135-
go func(i uint) {
136-
defer wg.Done()
137-
conn := pool.Get()
138-
if conn == nil {
139-
return
140-
}
141-
defer pool.Put(conn)
142-
for {
143-
select {
144-
case <-ctx.Done():
145-
return
146-
default:
147-
if evt := indexer.Next(); evt != nil {
148-
if err := Process(ctx, conn, evt); err != nil {
149-
errs <- err
150-
}
151-
}
152-
}
153-
}
154-
}(i)
155-
}
156-
157142
// Wait for all goroutines to finish
158143
wg.Wait()
159-
os.Exit(0)
144+
145+
// Close error channel
146+
close(errs)
160147
}
161148

162149
func HandleSignal() context.Context {
@@ -174,43 +161,3 @@ func HandleSignal() context.Context {
174161
func sep(r rune) bool {
175162
return r == ',' || unicode.IsSpace(r)
176163
}
177-
178-
func CreateSchema(ctx context.Context, pool SQPool) error {
179-
conn := pool.Get()
180-
if conn == nil {
181-
return errors.New("Unable to get a connection from pool")
182-
}
183-
defer pool.Put(conn)
184-
185-
// Create table
186-
return conn.Do(ctx, 0, func(txn SQTransaction) error {
187-
if _, err := txn.Query(Q(`CREATE TABLE IF NOT EXISTS files (
188-
name TEXT NOT NULL,
189-
path TEXT NOT NULL,
190-
PRIMARY KEY (name, path)
191-
)`)); err != nil {
192-
return err
193-
}
194-
return nil
195-
})
196-
}
197-
198-
func Process(ctx context.Context, conn SQConnection, evt *indexer.QueueEvent) error {
199-
return conn.Do(ctx, 0, func(txn SQTransaction) error {
200-
switch evt.Event {
201-
case indexer.EventAdd:
202-
if result, err := txn.Query(Q(`REPLACE INTO files (name, path) VALUES (?, ?)`), evt.Name, evt.Path); err != nil {
203-
return err
204-
} else if result.LastInsertId() > 0 {
205-
//fmt.Println("ADDED:", evt)
206-
}
207-
case indexer.EventRemove:
208-
if result, err := txn.Query(Q(`DELETE FROM files WHERE name=? AND path=?`), evt.Name, evt.Path); err != nil {
209-
return err
210-
} else if result.RowsAffected() == 1 {
211-
fmt.Println("REMOVED:", evt)
212-
}
213-
}
214-
return nil
215-
})
216-
}

pkg/config/version.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,5 +31,6 @@ func PrintVersion(w io.Writer) {
3131
fmt.Fprintf(w, " Build Time: %v\n", GoBuildTime)
3232
}
3333
fmt.Fprintf(w, " go: %v (%v/%v)\n", runtime.Version(), runtime.GOOS, runtime.GOARCH)
34-
fmt.Fprintf(w, " sqlite3: %v\n", sqlite3.Version())
34+
v, _, _ := sqlite3.Version()
35+
fmt.Fprintf(w, " sqlite3: %v\n", v)
3536
}

pkg/indexer/indexer.go

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ import (
2323

2424
type Indexer struct {
2525
*walkfs.WalkFS
26-
*Queue
27-
name string
28-
path string
29-
walk chan WalkFunc
26+
queue *Queue
27+
name string
28+
path string
29+
walk chan WalkFunc
3030
}
3131

3232
// WalkFunc is called after a reindexing with any walk errors
@@ -48,10 +48,9 @@ var (
4848
// LIFECYCLE
4949

5050
// Create a new indexer with an identifier, path to the root of the indexer
51-
// and a channel to receive any errors
52-
func NewIndexer(name, path string) (*Indexer, error) {
51+
// and a queue
52+
func NewIndexer(name, path string, queue *Queue) (*Indexer, error) {
5353
this := new(Indexer)
54-
this.Queue = NewQueueWithCapacity(defaultCapacity)
5554
this.WalkFS = walkfs.New(this.visit)
5655

5756
// Check path argument
@@ -68,14 +67,21 @@ func NewIndexer(name, path string) (*Indexer, error) {
6867
this.path = abspath
6968
}
7069

70+
// Check queue argument
71+
if queue == nil {
72+
this.queue = NewQueue()
73+
} else {
74+
this.queue = queue
75+
}
76+
7177
// Channel to indicate we want to walk the index
7278
this.walk = make(chan WalkFunc)
7379

7480
// Return success
7581
return this, nil
7682
}
7783

78-
// run indexer
84+
// run indexer, provider channel to receive errors
7985
func (i *Indexer) Run(ctx context.Context, errs chan<- error) error {
8086
var walking sync.Mutex
8187

@@ -141,6 +147,11 @@ func (i *Indexer) Path() string {
141147
return i.path
142148
}
143149

150+
// Return the queue
151+
func (i *Indexer) Queue() *Queue {
152+
return i.queue
153+
}
154+
144155
///////////////////////////////////////////////////////////////////////////////
145156
// PUBLIC METHODS
146157

@@ -173,15 +184,15 @@ func (i *Indexer) event(ctx context.Context, evt notify.EventInfo) error {
173184
case notify.Create, notify.Write:
174185
info, err := os.Stat(evt.Path())
175186
if err == nil && info.Mode().IsRegular() && i.ShouldVisit(relpath, info) {
176-
i.Queue.Add(i.name, relpath)
187+
i.queue.Add(i.name, relpath)
177188
}
178189
case notify.Remove, notify.Rename:
179190
info, err := os.Stat(evt.Path())
180191
if err == nil && info.Mode().IsRegular() && i.ShouldVisit(relpath, info) {
181-
i.Queue.Add(i.name, relpath)
192+
i.queue.Add(i.name, relpath)
182193
} else {
183194
// Always attempt removal from index
184-
i.Queue.Remove(i.name, relpath)
195+
i.queue.Remove(i.name, relpath)
185196
}
186197
}
187198
// Return success
@@ -191,7 +202,7 @@ func (i *Indexer) event(ctx context.Context, evt notify.EventInfo) error {
191202
// visit is used to index a file from the indexer
192203
func (i *Indexer) visit(ctx context.Context, abspath, relpath string, info fs.FileInfo) error {
193204
if info.Mode().IsRegular() {
194-
i.Queue.Add(i.name, relpath)
205+
i.queue.Add(i.name, relpath)
195206
}
196207
return nil
197208
}

pkg/indexer/queue.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,13 @@ const (
4242
///////////////////////////////////////////////////////////////////////////////
4343
// LIFECYCLE
4444

45+
// Create a new queue with default capacity
46+
func NewQueue() *Queue {
47+
return NewQueueWithCapacity(0)
48+
}
49+
4550
// Create a new queue which acts as a buffer between the file indexing
46-
// and the rendering which can be slower than the file indexing
51+
// and the processng/rendering which can be slower than the file indexing
4752
func NewQueueWithCapacity(cap int) *Queue {
4853
q := new(Queue)
4954

pkg/indexer/schema.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package indexer
2+
3+
import (
4+
"context"
5+
"errors"
6+
7+
// Namespace imports
8+
. "github.com/mutablelogic/go-sqlite"
9+
. "github.com/mutablelogic/go-sqlite/pkg/lang"
10+
)
11+
12+
///////////////////////////////////////////////////////////////////////////////
13+
// GLOBALS
14+
15+
const (
16+
filesTableName = "files"
17+
)
18+
19+
///////////////////////////////////////////////////////////////////////////////
20+
// PUBLIC METHODS
21+
22+
func CreateSchema(ctx context.Context, pool SQPool, schema string) error {
23+
conn := pool.Get()
24+
if conn == nil {
25+
return errors.New("unable to get a connection from pool")
26+
}
27+
defer pool.Put(conn)
28+
29+
// Create table
30+
return conn.Do(ctx, 0, func(txn SQTransaction) error {
31+
if _, err := txn.Query(N(filesTableName).WithSchema(schema).CreateTable(
32+
C("name").WithPrimary(),
33+
C("path").WithPrimary(),
34+
).IfNotExists()); err != nil {
35+
return err
36+
}
37+
return nil
38+
})
39+
}
40+
41+
func Replace(schema string, evt *QueueEvent) (SQStatement, []interface{}) {
42+
return N(filesTableName).WithSchema(schema).Replace("name", "path"),
43+
[]interface{}{evt.Name, evt.Path}
44+
}
45+
46+
func Delete(schema string, evt *QueueEvent) (SQStatement, []interface{}) {
47+
return N(filesTableName).WithSchema(schema).Delete(Q("name=?"), Q("path=?")),
48+
[]interface{}{evt.Name, evt.Path}
49+
}

0 commit comments

Comments
 (0)