Skip to content

Commit

Permalink
Support JSON format logs in file-metrics-collector (#1765)
Browse files Browse the repository at this point in the history
* support JSON format logs in file-metrics-collector

* review: convert fileFormat to type FileSystemFileFormat

* Update cmd/metricscollector/v1beta1/file-metricscollector/main.go

Co-authored-by: Andrey Velichkevich <andrey.velichkevich@gmail.com>

* review: remove func (f FileSystemFileFormat) String()

* review: get metricRegList only when the format is TEXT

* review: change var name in a script for e2e

* review: explict specify the cloudml-hypyertune in the Dockerfile

* review: use reflect.DeepEqual instead of go-cmp.Diff

* review: stop using 'JSON' directly in error statements

* review: install specific version cloudml-hypertune

* review: get objType in the updateStopRules function

* review: save optimalObjValue across multiple stopRules

* review: add warning messages to parseTimestamp func

* review: generate test files with go test command

* review: change api for new feature

Co-authored-by: Andrey Velichkevich <andrey.velichkevich@gmail.com>
  • Loading branch information
tenzen-y and andreyvelich committed Apr 5, 2022
1 parent 36d0a57 commit d443ed3
Show file tree
Hide file tree
Showing 22 changed files with 824 additions and 123 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ bin
*.dll
*.so
*.dylib
pkg/metricscollector/v1beta1/file-metricscollector/testdata

## Test binary, build with `go test -c`
*.test
Expand Down
199 changes: 127 additions & 72 deletions cmd/metricscollector/v1beta1/file-metricscollector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@ package main

import (
"context"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -102,6 +104,7 @@ var (
earlyStopServiceAddr = flag.String("s-earlystop", "", "Katib Early Stopping service endpoint")
trialName = flag.String("t", "", "Trial Name")
metricsFilePath = flag.String("path", "", "Metrics File Path")
metricsFileFormat = flag.String("format", "", "Metrics File Format")
metricNames = flag.String("m", "", "Metric names")
objectiveType = flag.String("o-type", "", "Objective type")
metricFilters = flag.String("f", "", "Metric filters")
Expand Down Expand Up @@ -137,7 +140,7 @@ func printMetricsFile(mFile string) {
}
}

func watchMetricsFile(mFile string, stopRules stopRulesFlag, filters []string) {
func watchMetricsFile(mFile string, stopRules stopRulesFlag, filters []string, fileFormat commonv1beta1.FileFormat) {

// metricStartStep is the dict where key = metric name, value = start step.
// We should apply early stopping rule only if metric is reported at least "start_step" times.
Expand All @@ -148,9 +151,6 @@ func watchMetricsFile(mFile string, stopRules stopRulesFlag, filters []string) {
}
}

// First metric is objective in metricNames array.
objMetric := strings.Split(*metricNames, ";")[0]
objType := commonv1beta1.ObjectiveType(*objectiveType)
// For objective metric we calculate best optimal value from the recorded metrics.
// This is workaround for Median Stop algorithm.
// TODO (andreyvelich): Think about it, maybe define latest, max or min strategy type in stop-rule as well ?
Expand All @@ -169,88 +169,89 @@ func watchMetricsFile(mFile string, stopRules stopRulesFlag, filters []string) {
klog.Fatalf("Failed to create new Process from pid %v, error: %v", mainProcPid, err)
}

// Get list of regural expressions from filters.
metricRegList := filemc.GetFilterRegexpList(filters)

// Start watch log lines.
t, _ := tail.TailFile(mFile, tail.Config{Follow: true})
for line := range t.Lines {
logText := line.Text
// Print log line
klog.Info(logText)

// Check if log line contains metric from stop rules.
isRuleLine := false
for _, rule := range stopRules {
if strings.Contains(logText, rule.Name) {
isRuleLine = true
break
}
}
// If log line doesn't contain appropriate metric, continue track file.
if !isRuleLine {
continue
}

// If log line contains appropriate metric, find all submatches from metric filters.
for _, metricReg := range metricRegList {
matchStrings := metricReg.FindAllStringSubmatch(logText, -1)
for _, subMatchList := range matchStrings {
if len(subMatchList) < 3 {
continue
}
// Submatch must have metric name and float value
metricName := strings.TrimSpace(subMatchList[1])
metricValue, err := strconv.ParseFloat(strings.TrimSpace(subMatchList[2]), 64)
if err != nil {
klog.Fatalf("Unable to parse value %v to float for metric %v", metricValue, metricName)
switch fileFormat {
case commonv1beta1.TextFormat:
// Get list of regural expressions from filters.
var metricRegList []*regexp.Regexp
metricRegList = filemc.GetFilterRegexpList(filters)

// Check if log line contains metric from stop rules.
isRuleLine := false
for _, rule := range stopRules {
if strings.Contains(logText, rule.Name) {
isRuleLine = true
break
}
}
// If log line doesn't contain appropriate metric, continue track file.
if !isRuleLine {
continue
}

// stopRules contains array of EarlyStoppingRules that has not been reached yet.
// After rule is reached we delete appropriate element from the array.
for idx, rule := range stopRules {
if metricName != rule.Name {
// If log line contains appropriate metric, find all submatches from metric filters.
for _, metricReg := range metricRegList {
matchStrings := metricReg.FindAllStringSubmatch(logText, -1)
for _, subMatchList := range matchStrings {
if len(subMatchList) < 3 {
continue
}

// Calculate optimalObjValue.
if metricName == objMetric {
if optimalObjValue == nil {
optimalObjValue = &metricValue
} else if objType == commonv1beta1.ObjectiveTypeMaximize && metricValue > *optimalObjValue {
optimalObjValue = &metricValue
} else if objType == commonv1beta1.ObjectiveTypeMinimize && metricValue < *optimalObjValue {
optimalObjValue = &metricValue
}
// Assign best optimal value to metric value.
metricValue = *optimalObjValue
// Submatch must have metric name and float value
metricName := strings.TrimSpace(subMatchList[1])
metricValue, err := strconv.ParseFloat(strings.TrimSpace(subMatchList[2]), 64)
if err != nil {
klog.Fatalf("Unable to parse value %v to float for metric %v", metricValue, metricName)
}

// Reduce steps if appropriate metric is reported.
// Once rest steps are empty we apply early stopping rule.
if _, ok := metricStartStep[metricName]; ok {
metricStartStep[metricName]--
if metricStartStep[metricName] != 0 {
// stopRules contains array of EarlyStoppingRules that has not been reached yet.
// After rule is reached we delete appropriate element from the array.
for idx, rule := range stopRules {
if metricName != rule.Name {
continue
}
stopRules, optimalObjValue = updateStopRules(stopRules, optimalObjValue, metricValue, metricStartStep, rule, idx)
}
}
}
case commonv1beta1.JsonFormat:
var logJsonObj map[string]interface{}
if err = json.Unmarshal([]byte(logText), &logJsonObj); err != nil {
klog.Fatalf("Failed to unmarshal logs in %v format, log: %s, error: %v", commonv1beta1.JsonFormat, logText, err)
}
// Check if log line contains metric from stop rules.
isRuleLine := false
for _, rule := range stopRules {
if _, exist := logJsonObj[rule.Name]; exist {
isRuleLine = true
break
}
}
// If log line doesn't contain appropriate metric, continue track file.
if !isRuleLine {
continue
}

ruleValue, err := strconv.ParseFloat(rule.Value, 64)
if err != nil {
klog.Fatalf("Unable to parse value %v to float for rule metric %v", rule.Value, rule.Name)
}

// Metric value can be equal, less or greater than stop rule.
// Deleting suitable stop rule from the array.
if rule.Comparison == commonv1beta1.ComparisonTypeEqual && metricValue == ruleValue {
stopRules = deleteStopRule(stopRules, idx)
} else if rule.Comparison == commonv1beta1.ComparisonTypeLess && metricValue < ruleValue {
stopRules = deleteStopRule(stopRules, idx)
} else if rule.Comparison == commonv1beta1.ComparisonTypeGreater && metricValue > ruleValue {
stopRules = deleteStopRule(stopRules, idx)
}
// stopRules contains array of EarlyStoppingRules that has not been reached yet.
// After rule is reached we delete appropriate element from the array.
for idx, rule := range stopRules {
value, exist := logJsonObj[rule.Name].(string)
if !exist {
continue
}
metricValue, err := strconv.ParseFloat(strings.TrimSpace(value), 64)
if err != nil {
klog.Fatalf("Unable to parse value %v to float for metric %v", metricValue, rule.Name)
}
stopRules, optimalObjValue = updateStopRules(stopRules, optimalObjValue, metricValue, metricStartStep, rule, idx)
}
default:
klog.Fatalf("Format must be set to %v or %v", commonv1beta1.TextFormat, commonv1beta1.JsonFormat)
}

// If stopRules array is empty, Trial is early stopped.
Expand Down Expand Up @@ -289,7 +290,7 @@ func watchMetricsFile(mFile string, stopRules stopRulesFlag, filters []string) {
}

// Report metrics to DB.
reportMetrics(filters)
reportMetrics(filters, fileFormat)

// Wait until main process is completed.
timeout := 60 * time.Second
Expand Down Expand Up @@ -326,6 +327,58 @@ func watchMetricsFile(mFile string, stopRules stopRulesFlag, filters []string) {
}
}

func updateStopRules(
stopRules []commonv1beta1.EarlyStoppingRule,
optimalObjValue *float64,
metricValue float64,
metricStartStep map[string]int,
rule commonv1beta1.EarlyStoppingRule,
ruleIdx int,
) ([]commonv1beta1.EarlyStoppingRule, *float64) {

// First metric is objective in metricNames array.
objMetric := strings.Split(*metricNames, ";")[0]
objType := commonv1beta1.ObjectiveType(*objectiveType)

// Calculate optimalObjValue.
if rule.Name == objMetric {
if optimalObjValue == nil {
optimalObjValue = &metricValue
} else if objType == commonv1beta1.ObjectiveTypeMaximize && metricValue > *optimalObjValue {
optimalObjValue = &metricValue
} else if objType == commonv1beta1.ObjectiveTypeMinimize && metricValue < *optimalObjValue {
optimalObjValue = &metricValue
}
// Assign best optimal value to metric value.
metricValue = *optimalObjValue
}

// Reduce steps if appropriate metric is reported.
// Once rest steps are empty we apply early stopping rule.
if _, ok := metricStartStep[rule.Name]; ok {
metricStartStep[rule.Name]--
if metricStartStep[rule.Name] != 0 {
return stopRules, optimalObjValue
}
}

ruleValue, err := strconv.ParseFloat(rule.Value, 64)
if err != nil {
klog.Fatalf("Unable to parse value %v to float for rule metric %v", rule.Value, rule.Name)
}

// Metric value can be equal, less or greater than stop rule.
// Deleting suitable stop rule from the array.
if rule.Comparison == commonv1beta1.ComparisonTypeEqual && metricValue == ruleValue {
return deleteStopRule(stopRules, ruleIdx), optimalObjValue
} else if rule.Comparison == commonv1beta1.ComparisonTypeLess && metricValue < ruleValue {
return deleteStopRule(stopRules, ruleIdx), optimalObjValue
} else if rule.Comparison == commonv1beta1.ComparisonTypeGreater && metricValue > ruleValue {
return deleteStopRule(stopRules, ruleIdx), optimalObjValue
}
return stopRules, optimalObjValue
}

func deleteStopRule(stopRules []commonv1beta1.EarlyStoppingRule, idx int) []commonv1beta1.EarlyStoppingRule {
if idx >= len(stopRules) {
klog.Fatalf("Index %v out of range stopRules: %v", idx, stopRules)
Expand All @@ -345,9 +398,11 @@ func main() {
filters = strings.Split(*metricFilters, ";")
}

fileFormat := commonv1beta1.FileFormat(*metricsFileFormat)

// If stop rule is set we need to parse metrics during run.
if len(stopRules) != 0 {
go watchMetricsFile(*metricsFilePath, stopRules, filters)
go watchMetricsFile(*metricsFilePath, stopRules, filters, fileFormat)
} else {
go printMetricsFile(*metricsFilePath)
}
Expand All @@ -366,11 +421,11 @@ func main() {

// If training was not early stopped, report the metrics.
if !isEarlyStopped {
reportMetrics(filters)
reportMetrics(filters, fileFormat)
}
}

func reportMetrics(filters []string) {
func reportMetrics(filters []string, fileFormat commonv1beta1.FileFormat) {

conn, err := grpc.Dial(*dbManagerServiceAddr, grpc.WithInsecure())
if err != nil {
Expand All @@ -383,7 +438,7 @@ func reportMetrics(filters []string) {
if len(*metricNames) != 0 {
metricList = strings.Split(*metricNames, ";")
}
olog, err := filemc.CollectObservationLog(*metricsFilePath, metricList, filters)
olog, err := filemc.CollectObservationLog(*metricsFilePath, metricList, filters, fileFormat)
if err != nil {
klog.Fatalf("Failed to collect logs: %v", err)
}
Expand Down
72 changes: 72 additions & 0 deletions examples/v1beta1/early-stopping/median-stop-with-json-format.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# This is example with median stopping early stopping rule with logs in JSON format.
# It has bad feasible space for learning rate to show more early stopped Trials.
apiVersion: kubeflow.org/v1beta1
kind: Experiment
metadata:
namespace: kubeflow
name: median-stop-with-json-format
spec:
objective:
type: maximize
goal: 0.99
objectiveMetricName: accuracy
additionalMetricNames:
- loss
metricsCollectorSpec:
source:
fileSystemPath:
path: "/katib/mnist.json"
kind: File
format: JSON
collector:
kind: File
algorithm:
algorithmName: random
earlyStopping:
algorithmName: medianstop
algorithmSettings:
- name: min_trials_required
value: "1"
- name: start_step
value: "2"
parallelTrialCount: 2
maxTrialCount: 15
maxFailedTrialCount: 3
parameters:
- name: lr
parameterType: double
feasibleSpace:
min: "0.01"
max: "0.5"
- name: num-epochs
parameterType: int
feasibleSpace:
min: "3"
max: "4"
trialTemplate:
retain: true
primaryContainerName: training-container
trialParameters:
- name: learningRate
description: Learning rate for the training model
reference: lr
- name: numberEpochs
description: Number of epochs to train the model
reference: num-epochs
trialSpec:
apiVersion: batch/v1
kind: Job
spec:
template:
spec:
containers:
- name: training-container
image: docker.io/kubeflowkatib/pytorch-mnist:latest
command:
- "python3"
- "/opt/pytorch-mnist/mnist.py"
- "--epochs=${trialParameters.numberEpochs}"
- "--log-path=/katib/mnist.json"
- "--lr=${trialParameters.learningRate}"
- "--logger=hypertune"
restartPolicy: Never
Loading

0 comments on commit d443ed3

Please sign in to comment.