Skip to content

Commit

Permalink
Custom CRD: Wait for all processes before running metrics collector (#1313)
Browse files Browse the repository at this point in the history

* Enable to wait all in metrics collectors

* Rename metricsFilePath

* Fix tfevent

* Fix pns py

* Fix comment
  • Loading branch information
andreyvelich committed Sep 9, 2020
1 parent 7b797e1 commit 6b7142f
Show file tree
Hide file tree
Showing 135 changed files with 20,302 additions and 121 deletions.
34 changes: 34 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Expand Up @@ -123,3 +123,7 @@ required = [
[[constraint]]
name = "github.com/tidwall/gjson"
version = "1.6.0"

[[constraint]]
name = "github.com/shirou/gopsutil"
version = "2.20.7"
30 changes: 15 additions & 15 deletions cmd/metricscollector/v1beta1/file-metricscollector/main.go
Expand Up @@ -54,14 +54,14 @@ import (
)

var (
metricsFileName = flag.String("path", "", "Metrics File Path")
trialName = flag.String("t", "", "Trial Name")
managerService = flag.String("s", "", "Katib Manager service")
metricNames = flag.String("m", "", "Metric names")
filters = flag.String("f", "", "Metric filters")
pollInterval = flag.Duration("p", common.DefaultPollInterval, "Poll interval to check if main process of worker container exit")
timeout = flag.Duration("timeout", common.DefaultTimeout, "Timeout to check if main process of worker container exit")
waitAll = flag.Bool("w", common.DefaultWaitAll, "Whether wait for all other main process of container exiting")
managerServiceAddr = flag.String("s", "", "Katib Manager service")
trialName = flag.String("t", "", "Trial Name")
metricsFilePath = flag.String("path", "", "Metrics File Path")
metricNames = flag.String("m", "", "Metric names")
metricFilters = flag.String("f", "", "Metric filters")
pollInterval = flag.Duration("p", common.DefaultPollInterval, "Poll interval between running processes check")
timeout = flag.Duration("timeout", common.DefaultTimeout, "Timeout before invoke error during running processes check")
waitAll = flag.Bool("w", common.DefaultWaitAll, "Whether wait for all other main process of container exiting")
)

func printMetricsFile(mFile string) {
Expand All @@ -86,18 +86,18 @@ func main() {
flag.Parse()
klog.Infof("Trial Name: %s", *trialName)

go printMetricsFile(*metricsFileName)
go printMetricsFile(*metricsFilePath)
wopts := common.WaitPidsOpts{
PollInterval: *pollInterval,
Timeout: *timeout,
WaitAll: *waitAll,
CompletedMarkedDirPath: filepath.Dir(*metricsFileName),
CompletedMarkedDirPath: filepath.Dir(*metricsFilePath),
}
if err := common.Wait(wopts); err != nil {
if err := common.WaitMainProcesses(wopts); err != nil {
klog.Fatalf("Failed to wait for worker container: %v", err)
}

conn, err := grpc.Dial(*managerService, grpc.WithInsecure())
conn, err := grpc.Dial(*managerServiceAddr, grpc.WithInsecure())
if err != nil {
klog.Fatalf("could not connect: %v", err)
}
Expand All @@ -109,10 +109,10 @@ func main() {
metricList = strings.Split(*metricNames, ";")
}
var filterList []string
if len(*filters) != 0 {
filterList = strings.Split(*filters, ";")
if len(*metricFilters) != 0 {
filterList = strings.Split(*metricFilters, ";")
}
olog, err := filemc.CollectObservationLog(*metricsFileName, metricList, filterList)
olog, err := filemc.CollectObservationLog(*metricsFilePath, metricList, filterList)
if err != nil {
klog.Fatalf("Failed to collect logs: %v", err)
}
Expand Down
21 changes: 15 additions & 6 deletions cmd/metricscollector/v1beta1/tfevent-metricscollector/main.py
@@ -1,7 +1,8 @@
import grpc
import argparse
import api_pb2
from pns import WaitOtherMainProcesses
from pns import WaitMainProcesses
import const
from tfevent_loader import MetricsCollector
from logging import getLogger, StreamHandler, INFO

Expand All @@ -13,12 +14,16 @@ def parse_options():
description='TF-Event MetricsCollector',
add_help=True
)
parser.add_argument("-s", "--manager_server_addr",
type=str, default="katib-db-manager:6789")

parser.add_argument("-s", "--manager_server_addr", type=str, default="")
parser.add_argument("-t", "--trial_name", type=str, default="")
parser.add_argument("-path", "--dir_path", type=str, default="/log")
parser.add_argument("-path", "--metrics_file_dir", type=str, default=const.DEFAULT_METRICS_FILE_DIR)
parser.add_argument("-m", "--metric_names", type=str, default="")
parser.add_argument("-f", "--metric_filters", type=str, default="")
parser.add_argument("-p", "--poll_interval", type=int, default=const.DEFAULT_POLL_INTERVAL)
parser.add_argument("-timeout", "--timeout", type=int, default=const.DEFAULT_TIMEOUT)
parser.add_argument("-w", "--wait_all", type=bool, default=const.DEFAULT_WAIT_ALL)

opt = parser.parse_args()
return opt

Expand All @@ -36,10 +41,14 @@ def parse_options():
raise Exception("Invalid katib manager service address: %s" %
opt.manager_server_addr)

WaitOtherMainProcesses(completed_marked_dir=opt.dir_path)
WaitMainProcesses(
pool_interval=opt.poll_interval,
timout=opt.timeout,
wait_all=opt.wait_all,
completed_marked_dir=opt.metrics_file_dir)

mc = MetricsCollector(opt.metric_names.split(';'))
observation_log = mc.parse_file(opt.dir_path)
observation_log = mc.parse_file(opt.metrics_file_dir)

channel = grpc.beta.implementations.insecure_channel(
manager_server[0], int(manager_server[1]))
Expand Down
21 changes: 15 additions & 6 deletions pkg/metricscollector/v1beta1/common/const.go
Expand Up @@ -21,16 +21,25 @@ import (
)

const (
// DefaultPollInterval is the default interval between successive checks of the running processes.
DefaultPollInterval = time.Second
DefaultTimeout = 0
DefaultWaitAll = false

MetricCollectorContainerName = "metrics-collector"
MetricLoggerCollectorContainerName = "metrics-logger-and-collector"

// DefaultTimeout is the default timeout after which the running-processes check fails with an error.
// Set the value to 0 to run without a timeout.
DefaultTimeout = 0
// DefaultWaitAll is the default for whether to wait for all other main processes of the container to exit.
DefaultWaitAll = true
// TrainingCompleted is the job finished marker in $$$$.pid file when main process is completed
TrainingCompleted = "completed"

// DefaultFilter is the default metrics collector filter to parse the metrics.
// Metrics must be printed this way
// loss=0.3
// accuracy=0.98
DefaultFilter = `([\w|-]+)\s*=\s*((-?\d+)(\.\d+)?)`

// TODO (andreyvelich): Do we need to maintain 2 names? Should we leave only 1?
MetricCollectorContainerName = "metrics-collector"
MetricLoggerCollectorContainerName = "metrics-logger-and-collector"
)

var (
Expand Down
10 changes: 10 additions & 0 deletions pkg/metricscollector/v1beta1/common/const.py
@@ -0,0 +1,10 @@
# Default interval between successive checks of the running processes.
# NOTE(review): presumably expressed in seconds — confirm against the caller.
DEFAULT_POLL_INTERVAL = 1
# Default timeout after which the running-processes check reports an error.
# NOTE(review): 0 presumably means "run without a timeout" — confirm.
DEFAULT_TIMEOUT = 0
# Default for whether to wait for all other main processes of the container to exit.
DEFAULT_WAIT_ALL = True
# Default directory where TF event metrics are reported.
DEFAULT_METRICS_FILE_DIR = "/log"
# Job-finished marker found in the $$$$.pid file once the main process has completed.
TRAINING_COMPLETED = "completed"

0 comments on commit 6b7142f

Please sign in to comment.