/
batch_log.go
209 lines (179 loc) · 5.27 KB
/
batch_log.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
package cmd
import (
"fmt"
"github.com/fluent/fluent-logger-golang/fluent"
"github.com/odahu/odahu-flow/packages/operator/pkg/apis/predict_v2"
"github.com/odahu/odahu-flow/packages/operator/pkg/utils/feedback"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.uber.org/zap"
"net/url"
feedback_utils "odahu-commons/feedback"
"os"
"strconv"
)
// Model log commands: defaults and delivery limits.
const (
	// Default values of the --tag flag for the "input" and "output" commands.
	defaultRequestTag  = "request_response"
	defaultResponseTag = "response_body"

	// Fluentd endpoint used when no base URL is configured (the port is also
	// used when the configured URL carries no explicit port).
	defaultFluentdHost = "localhost"
	defaultFluentdPort = 24224

	// Retry limits handed to the fluent client configuration
	// (MaxRetry and MaxRetryWait respectively).
	// NOTE(review): the unit of maxRetryWait is defined by the fluent client
	// library, presumably milliseconds — confirm.
	maxRetryToDeliver = 100
	maxRetryWait      = 1000
)
// Destinations for the command-line flags registered in init.
var (
	// model and version hold the values of the --model and --version flags.
	model, version string

	// requestID holds the value of the --request-id flag.
	requestID string

	// requestTag and responseTag hold the --tag flag of the "input" and
	// "output" commands respectively.
	requestTag, responseTag string
)
// init wires the "log" command tree under batchCommand and registers its flags.
//
// --model, --version and --request-id are persistent flags on logCommand, so
// they are inherited by the "input" and "output" sub-commands, and all three
// are marked as required. --fluentd is bound to the viper key
// "feedback.fluentd.baseurl". Each sub-command gets its own --tag flag with a
// command-specific default.
func init() {
	batchCommand.AddCommand(logCommand)
	logCommand.AddCommand(logModelInputCommand)
	logCommand.AddCommand(logModelOutputCommand)

	// The Mark*Required and BindPFlag calls below only fail when the named
	// flag does not exist; every flag is registered right before it is
	// referenced, so the returned errors are deliberately discarded.
	logCommand.PersistentFlags().StringVarP(
		&model, "model", "m", "", "ML Model name",
	)
	_ = logCommand.MarkPersistentFlagRequired("model")

	logCommand.PersistentFlags().StringVar(
		&version, "version", "", "ML Model version",
	)
	_ = logCommand.MarkPersistentFlagRequired("version")

	logCommand.PersistentFlags().StringVarP(
		&requestID, "request-id", "r", "",
		"request id for which this request/response data is logged",
	)
	// request-id is a persistent flag, so it has to be marked through the
	// persistent flag set. MarkFlagRequired looks the name up in the command's
	// Flags() set, where a persistent flag is not present yet at init time;
	// that lookup fails and the flag would silently stay optional.
	_ = logCommand.MarkPersistentFlagRequired("request-id")

	// NOTE(review): the flag value is stored in apiURL, which is declared
	// elsewhere in this package — confirm sharing that variable is intended.
	logCommand.PersistentFlags().StringVar(
		&apiURL, "fluentd", "", "fluentd base URL (schema://host:port)",
	)
	_ = viper.BindPFlag("feedback.fluentd.baseurl", logCommand.PersistentFlags().Lookup("fluentd"))

	logModelInputCommand.Flags().StringVar(&requestTag, "tag", defaultRequestTag, "tag model request")
	logModelOutputCommand.Flags().StringVar(&responseTag, "tag", defaultResponseTag, "tag model response")
}
// ModelOutputLogger is implemented by types that can log a model inference
// response together with the ID of the request it belongs to.
type ModelOutputLogger interface {
	// Log takes the request ID and the inference response to be logged and
	// returns an error when logging fails.
	Log(requestID string, response predict_v2.InferenceResponse) error
}
// logCommand is the parent of the "input" and "output" sub-commands. Invoked
// on its own without positional arguments it prints its help text and exits
// with status 0; with arguments it does nothing.
var logCommand = &cobra.Command{
	Use:   "log",
	Short: "Catch model input or output from json files to fluentd service",
	Run: func(cmd *cobra.Command, args []string) {
		if len(args) != 0 {
			return
		}
		// Nothing to do without a sub-command: show usage and stop.
		_ = cmd.Help()
		os.Exit(0)
	},
}
// initFluentd creates an asynchronous fluentd client for the endpoint
// configured in cfg.Feedback.Fluentd.BaseURL.
//
// When the base URL is empty, defaultFluentdHost and defaultFluentdPort are
// used. When the URL has no explicit port, defaultFluentdPort is used.
//
// It returns an error when the base URL cannot be parsed, when its port is not
// an integer, or when the fluent client cannot be created. Parse failures wrap
// the underlying error so callers can inspect the cause.
func initFluentd() (*fluent.Fluent, error) {
	host, port := defaultFluentdHost, defaultFluentdPort

	if rawBaseURL := cfg.Feedback.Fluentd.BaseURL; rawBaseURL != "" {
		baseURL, err := url.Parse(rawBaseURL)
		if err != nil {
			return nil, fmt.Errorf("unable to parse fluentd base url %q: %w", rawBaseURL, err)
		}

		// NOTE(review): a value without a scheme (e.g. "host:24224") parses
		// successfully but yields an empty hostname; the empty host is passed
		// on unchanged and is presumably resolved by the fluent client's own
		// default — confirm.
		host = baseURL.Hostname()

		if portString := baseURL.Port(); portString != "" {
			port, err = strconv.Atoi(portString)
			if err != nil {
				return nil, fmt.Errorf("fluentd port must be integer %q: %w", portString, err)
			}
		}
	}

	zap.S().Infof("Connecting to fluentd using host %s and port %d", host, port)

	return fluent.New(fluent.Config{
		FluentPort:   port,
		FluentHost:   host,
		MaxRetry:     maxRetryToDeliver,
		Async:        true,
		MaxRetryWait: maxRetryWait,
	})
}
// getRequestWrapper returns a function that wraps raw request content into a
// feedback_utils.RequestResponse carrying the given model name and version.
// The request ID is read from the package-level requestID variable each time
// the returned function is called, not when the wrapper is created.
func getRequestWrapper(modelName string, modelVersion string) func(content string) interface{} {
	wrap := func(content string) interface{} {
		var record feedback_utils.RequestResponse
		record.RequestID = requestID
		record.RequestContent = content
		record.ModelVersion = modelVersion
		record.ModelName = modelName
		return record
	}
	return wrap
}
// getResponseWrapper returns a function that wraps raw response content into a
// feedback_utils.ResponseBody carrying the given model name and version.
// The request ID is read from the package-level requestID variable each time
// the returned function is called, not when the wrapper is created.
func getResponseWrapper(modelName string, modelVersion string) func(content string) interface{} {
	wrap := func(content string) interface{} {
		var record feedback_utils.ResponseBody
		record.RequestID = requestID
		record.ModelVersion = modelVersion
		record.ModelName = modelName
		record.ResponseContent = content
		return record
	}
	return wrap
}
// logModelInputCommand implements "log input". It takes exactly one positional
// argument, a directory, and passes it to the feedback logger's LogDir under
// the --tag value, with each item wrapped by getRequestWrapper using the
// --model and --version values. The fluentd client is closed when the command
// returns.
var logModelInputCommand = &cobra.Command{
	Use:     "input",
	Short:   "log model input to feedback storage",
	Example: "odahu-tools batch log input <path-to-folder-with-json-files>",
	Args:    cobra.ExactArgs(1),
	RunE: func(cmd *cobra.Command, args []string) error {
		fluentClient, err := initFluentd()
		if err != nil {
			return err
		}
		// Close the client on the way out and report how that went.
		defer func() {
			closeErr := fluentClient.Close()
			if closeErr == nil {
				zap.S().Info("Fluentd logs are flushed")
				return
			}
			zap.S().Errorw("Error closing fluentd", zap.Error(closeErr))
		}()

		inputLogger := feedback.NewLogger(fluentClient)
		wrapRequest := getRequestWrapper(model, version)

		for i := range args {
			dir := args[i]
			zap.S().Infof("Handle %s directory", dir)
			logErr := inputLogger.LogDir(dir, requestTag, wrapRequest)
			if logErr != nil {
				zap.S().Errorw("Error during logging model input", zap.Error(logErr))
				return logErr
			}
		}
		return nil
	},
}
// logModelOutputCommand implements "log output". It takes exactly one
// positional argument, a directory, and passes it to the feedback logger's
// LogDir under the --tag value, with each item wrapped by getResponseWrapper
// using the --model and --version values. The fluentd client is closed when
// the command returns.
var logModelOutputCommand = &cobra.Command{
	Use:   "output",
	Short: "log model output to feedback storage",
	Args:  cobra.ExactArgs(1),
	// Usage example added for parity with the sibling "input" command.
	Example: "odahu-tools batch log output <path-to-folder-with-json-files>",
	RunE: func(cmd *cobra.Command, args []string) error {
		logEngine, err := initFluentd()
		if err != nil {
			return err
		}
		// Close the client on the way out and report how that went.
		defer func() {
			if err := logEngine.Close(); err != nil {
				zap.S().Errorw("Error closing fluentd", zap.Error(err))
			} else {
				zap.S().Info("Fluentd logs are flushed")
			}
		}()

		dataLogger := feedback.NewLogger(logEngine)
		wrap := getResponseWrapper(model, version)

		for _, source := range args {
			zap.S().Infof("Handle %s directory", source)
			if err := dataLogger.LogDir(source, responseTag, wrap); err != nil {
				zap.S().Errorw("Error during logging model output", zap.Error(err))
				return err
			}
		}
		return nil
	},
}