/
filer_meta_tail.go
138 lines (120 loc) · 4.39 KB
/
filer_meta_tail.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package command
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/pb"
"os"
"path/filepath"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
)
// init attaches the Run handler to cmdFilerMetaTail after package-level
// variables are initialised. runFilerMetaTail reads the flag variables, which
// are themselves registered on cmdFilerMetaTail, so naming the handler inside
// the cmdFilerMetaTail literal would make the variable depend on itself.
func init() {
cmdFilerMetaTail.Run = runFilerMetaTail // break init cycle
}
// cmdFilerMetaTail describes the "filer.meta.tail" command: its usage line,
// one-line summary, and long help text with example invocations. The Run
// handler is intentionally not set in this literal; it is assigned in init.
var cmdFilerMetaTail = &Command{
UsageLine: "filer.meta.tail [-filer=localhost:8888] [-pathPrefix=/]",
Short: "see continuous changes on a filer",
Long: `See continuous changes on a filer.
weed filer.meta.tail -timeAgo=30h | grep truncate
weed filer.meta.tail -timeAgo=30h | jq .
weed filer.meta.tail -timeAgo=30h -untilTimeAgo=20h | jq .
weed filer.meta.tail -timeAgo=30h | jq .eventNotification.newEntry.name
weed filer.meta.tail -timeAgo=30h -es=http://<elasticSearchServerHost>:<port> -es.index=seaweedfs
`,
}
// Command-line flags of filer.meta.tail, registered on cmdFilerMetaTail's flag
// set and read by runFilerMetaTail.
var (
// tailFiler is the address of the filer whose metadata events are followed.
tailFiler = cmdFilerMetaTail.Flag.String("filer", "localhost:8888", "filer hostname:port")
// tailTarget is passed to the subscription as its path prefix.
tailTarget = cmdFilerMetaTail.Flag.String("pathPrefix", "/", "path to a folder or common prefix for the folders or files on filer")
// tailStart is subtracted from the current time to obtain the subscription's
// start timestamp; the default 0 starts from "now".
tailStart = cmdFilerMetaTail.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
// tailStop, when non-zero, is subtracted from the current time to obtain the
// subscription's stop timestamp; when 0 the stop timestamp is left at zero
// (presumably "no stop time" — confirm against pb.FollowMetadata).
tailStop = cmdFilerMetaTail.Flag.Duration("untilTimeAgo", 0, "read until this time ago. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
// tailPattern is an optional filepath.Match pattern. A pattern containing "/"
// is matched against directory + "/" + entry name, otherwise against the
// entry name alone. Empty disables filtering.
tailPattern = cmdFilerMetaTail.Flag.String("pattern", "", "full path or just filename pattern, ex: \"/home/?opher\", \"*.pdf\", see https://golang.org/pkg/path/filepath/#Match ")
// esServers, when non-empty, makes the command hand events to the handler
// built by sendToElasticSearchFunc instead of printing them to stdout.
esServers = cmdFilerMetaTail.Flag.String("es", "", "comma-separated elastic servers http://<host:port>")
// esIndex is passed to sendToElasticSearchFunc together with esServers.
esIndex = cmdFilerMetaTail.Flag.String("es.index", "seaweedfs", "ES index name")
)
// runFilerMetaTail implements the "filer.meta.tail" command. It subscribes to
// the metadata events of the filer named by -filer, starting at now-timeAgo and
// (when -untilTimeAgo is set) stopping at now-untilTimeAgo, and hands every
// event that passes the optional -pattern filter to an output handler.
//
// By default the handler writes the event to stdout via filer.ProtoToText
// followed by a newline; when -es is set the handler returned by
// sendToElasticSearchFunc is used instead. Diagnostics are written to stderr so
// that stdout carries only event data and stays safe to pipe into jq or grep.
//
// cmd and args are unused. The function returns false when start-up fails
// because of invalid input (malformed -pattern, Elasticsearch client could not
// be created). A failure of the subscription itself is reported on stderr and
// still returns true, as before.
func runFilerMetaTail(cmd *Command, args []string) bool {

	util.LoadConfiguration("security", false)
	grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
	clientId := util.RandomInt32()

	var err error

	// filterFunc stays nil when no -pattern was given, meaning "accept all".
	var filterFunc func(dir, fname string) bool
	if *tailPattern != "" {
		// A pattern containing "/" is matched against the full path of the
		// entry, any other pattern against the entry name only.
		matchFullPath := strings.Contains(*tailPattern, "/")
		filterFunc, err = newMetaTailFilter(*tailPattern, matchFullPath)
		if err != nil {
			// Fail once at start-up instead of reporting the same error for
			// every received event.
			fmt.Fprintf(os.Stderr, "invalid pattern %q: %v\n", *tailPattern, err)
			return false
		}
		if matchFullPath {
			println("watch path pattern", *tailPattern)
		} else {
			println("watch file pattern", *tailPattern)
		}
	}

	// shouldPrint reports whether an event carries an entry and, if a filter
	// is active, whether its old or its new entry matches the pattern.
	shouldPrint := func(resp *filer_pb.SubscribeMetadataResponse) bool {
		if filer_pb.IsEmpty(resp) {
			return false
		}
		if filterFunc == nil {
			return true
		}
		event := resp.EventNotification
		if event.OldEntry != nil && filterFunc(resp.Directory, event.OldEntry.Name) {
			return true
		}
		if event.NewEntry != nil {
			newDir := event.NewParentPath
			if newDir == "" {
				// NOTE(review): NewParentPath is presumably always populated
				// when NewEntry is set; fall back to the event directory in
				// case it is not — confirm against the filer's event producer.
				newDir = resp.Directory
			}
			return filterFunc(newDir, event.NewEntry.Name)
		}
		return false
	}

	// Default output: one text-encoded event per line on stdout.
	eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
		filer.ProtoToText(os.Stdout, resp)
		fmt.Fprintln(os.Stdout)
		return nil
	}
	if *esServers != "" {
		eachEntryFunc, err = sendToElasticSearchFunc(*esServers, *esIndex)
		if err != nil {
			fmt.Fprintf(os.Stderr, "create elastic search client to %s: %+v\n", *esServers, err)
			return false
		}
	}

	// Sample the clock once so that the start and stop timestamps are derived
	// from the same instant.
	now := time.Now()
	var untilTsNs int64
	if *tailStop != 0 {
		untilTsNs = now.Add(-*tailStop).UnixNano()
	}

	metadataFollowOption := &pb.MetadataFollowOption{
		ClientName:             "tail",
		ClientId:               clientId,
		ClientEpoch:            0,
		SelfSignature:          0,
		PathPrefix:             *tailTarget,
		AdditionalPathPrefixes: nil,
		DirectoriesToWatch:     nil,
		StartTsNs:              now.Add(-*tailStart).UnixNano(),
		StopTsNs:               untilTsNs,
		EventErrorType:         pb.TrivialOnError,
	}

	tailErr := pb.FollowMetadata(pb.ServerAddress(*tailFiler), grpcDialOption, metadataFollowOption, func(resp *filer_pb.SubscribeMetadataResponse) error {
		if !shouldPrint(resp) {
			return nil
		}
		return eachEntryFunc(resp)
	})
	if tailErr != nil {
		fmt.Fprintf(os.Stderr, "tail %s: %v\n", *tailFiler, tailErr)
	}

	return true
}

// newMetaTailFilter validates pattern (filepath.Match syntax) and returns a
// predicate over (directory, entry name). With matchFullPath the pattern is
// applied to the joined path, otherwise to the entry name alone. It returns the
// error reported by filepath.Match when the pattern is malformed.
func newMetaTailFilter(pattern string, matchFullPath bool) (func(dir, fname string) bool, error) {
	// Pattern syntax errors are reported independently of the name being
	// matched, so matching against the empty name is enough to validate it.
	if _, err := filepath.Match(pattern, ""); err != nil {
		return nil, err
	}
	return func(dir, fname string) bool {
		name := fname
		if matchFullPath {
			// Drop a trailing "/" first so that entries directly under the
			// root directory "/" become "/name" rather than "//name".
			name = strings.TrimSuffix(dir, "/") + "/" + fname
		}
		// The only error filepath.Match can return is a malformed pattern,
		// which was ruled out above.
		matched, _ := filepath.Match(pattern, name)
		return matched
	}, nil
}