-
Notifications
You must be signed in to change notification settings - Fork 168
/
repository.go
84 lines (73 loc) · 2.3 KB
/
repository.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package log
import (
"context"
"fmt"
"github.com/pydio/cells/v4/common/dao"
log2 "github.com/pydio/cells/v4/common/log"
"github.com/pydio/cells/v4/common/proto/log"
)
// IndexService stores and queries log messages by delegating every
// operation to an underlying dao.IndexDAO.
type IndexService struct {
// dao is the index backend that all methods of the service call into.
dao dao.IndexDAO
}
// NewIndexService wraps the given index DAO in an IndexService and returns it
// as a MessageRepository. The returned error is always nil.
func NewIndexService(dao dao.IndexDAO) (MessageRepository, error) {
	return &IndexService{dao: dao}, nil
}
// PutLog inserts a single log entry into the index through the DAO and
// returns whatever error the insertion reports.
func (s *IndexService) PutLog(ctx context.Context, line *log.Log) error {
	insertErr := s.dao.InsertOne(ctx, line)
	return insertErr
}
// ListLogs runs the query string str against the index DAO and streams each
// hit back wrapped in a log.ListLogResponse.
//
// page and size are forwarded to the DAO as page*size and size (presumably
// the offset and the limit of the requested window; confirm against
// dao.IndexDAO.FindMany). If the DAO fails to start the search, its error is
// returned unchanged together with a nil channel.
//
// The returned channel is closed once the DAO's result channel is exhausted.
// Hits that are not *log.LogMessage values are skipped instead of panicking.
// If ctx is cancelled before the consumer has read everything, forwarding
// stops and the remaining hits are discarded, so the forwarding goroutine
// does not stay blocked on a consumer that has gone away.
func (s *IndexService) ListLogs(ctx context.Context, str string, page int32, size int32) (chan log.ListLogResponse, error) {
	hits, err := s.dao.FindMany(ctx, str, page*size, size, "", false, nil)
	if err != nil {
		return nil, err
	}

	wrapped := make(chan log.ListLogResponse)
	go func() {
		defer close(wrapped)
		for hit := range hits {
			msg, ok := hit.(*log.LogMessage)
			if !ok {
				// An unexpected hit type must not crash the whole process
				// from inside this goroutine; ignore it and keep streaming.
				continue
			}
			select {
			case wrapped <- log.ListLogResponse{LogMessage: msg}:
			case <-ctx.Done():
				// Nobody is listening any more: drain what the DAO still
				// produces so its sender is not left blocked, then stop.
				for range hits {
				}
				return
			}
		}
	}()
	return wrapped, nil
}
// DeleteLogs removes the log entries matching the given search query.
// It returns the count reported by the DAO converted to int64, or 0 and the
// DAO's error if the deletion fails.
func (s *IndexService) DeleteLogs(ctx context.Context, query string) (int64, error) {
	deleted, err := s.dao.DeleteMany(ctx, query)
	if err != nil {
		return 0, err
	}
	return int64(deleted), nil
}
// AggregatedLogs is meant to run a faceted query on the syslog repository.
// It is not implemented: every argument is ignored and the call always
// returns a nil channel and an "unimplemented method" error.
func (s *IndexService) AggregatedLogs(_ context.Context, _ string, _ string, _ int32) (chan log.TimeRangeResponse, error) {
	errUnimplemented := fmt.Errorf("unimplemented method")
	return nil, errUnimplemented
}
// Resync delegates to the DAO's Resync. Each progress message the DAO emits
// is reported through logTaskInfo at "info" level, which writes to logger or
// to standard output when logger is nil.
func (s *IndexService) Resync(ctx context.Context, logger log2.ZapLogger) error {
	report := func(msg string) {
		logTaskInfo(logger, msg, "info")
	}
	return s.dao.Resync(ctx, report)
}
// Truncate delegates to the DAO's Truncate, passing max through unchanged
// (its exact meaning is defined by the DAO). Each progress message the DAO
// emits is reported through logTaskInfo at "info" level, which writes to
// logger or to standard output when logger is nil.
func (s *IndexService) Truncate(ctx context.Context, max int64, logger log2.ZapLogger) error {
	report := func(msg string) {
		logTaskInfo(logger, msg, "info")
	}
	return s.dao.Truncate(ctx, max, report)
}
// Close closes the underlying DAO and returns the error it reports, if any.
func (s *IndexService) Close(ctx context.Context) error {
	closeErr := s.dao.Close(ctx)
	return closeErr
}
// logTaskInfo reports msg through l at the requested level.
//
// When l is nil the message is printed to standard output with a
// "[pydio.grpc.log] " prefix, whatever the level. Otherwise "info" maps to
// l.Info, "error" maps to l.Error, and any other level falls back to l.Debug.
func logTaskInfo(l log2.ZapLogger, msg string, level string) {
	switch {
	case l == nil:
		fmt.Println("[pydio.grpc.log] " + msg)
	case level == "info":
		l.Info(msg)
	case level == "error":
		l.Error(msg)
	default:
		l.Debug(msg)
	}
}