Refactor requeue
If the objective metric value is not reported, the metrics collector records an unavailable value in the DB.
The controller requeues Trial reconciliation while the DB is empty.
andreyvelich committed Sep 18, 2020
1 parent 3257e2b commit 4970cc2
Showing 8 changed files with 89 additions and 45 deletions.
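The idea in brief: the metrics collector now guarantees that the observation DB eventually becomes non-empty (it records the "unavailable" value whenever it cannot parse the objective metric), so the controller can simply requeue until an observation appears instead of keeping per-Trial requeue counters. A minimal standalone sketch of that control flow (the Result type here stands in for controller-runtime's reconcile.Result; it mirrors, but is not, the Katib code below):

package main

import (
	"fmt"
	"time"
)

// Result stands in for controller-runtime's reconcile.Result: a non-zero
// RequeueAfter asks the manager to call Reconcile again after that delay.
type Result struct {
	RequeueAfter time.Duration
}

// reconcileOnce sketches the new flow: when the training job has succeeded
// but no observation exists in the DB yet, requeue quietly instead of
// emitting a warning event. The metrics collector guarantees progress by
// recording "unavailable" when it cannot parse the objective metric, so
// this requeue loop always terminates.
func reconcileOnce(jobSucceeded, observationInDB bool) (Result, error) {
	if jobSucceeded && !observationInDB {
		return Result{RequeueAfter: time.Second}, nil
	}
	return Result{}, nil
}

func main() {
	res, _ := reconcileOnce(true, false)
	fmt.Println(res.RequeueAfter) // 1s: metrics not reported yet, retry shortly
	res, _ = reconcileOnce(true, true)
	fmt.Println(res.RequeueAfter) // 0s: observation present, reconcile completes
}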
1 change: 1 addition & 0 deletions pkg/controller.v1beta1/consts/const.go
@@ -147,6 +147,7 @@ const (
TrialTemplateMetaKeyOfLabels = "Labels"

// UnavailableMetricValue is the value used when a metric is not reported or the metric value can't be converted to float64
// This value is recorded in the DB when the metrics collector can't parse the objective metric from the training logs.
UnavailableMetricValue = "unavailable"
)

36 changes: 12 additions & 24 deletions pkg/controller.v1beta1/trial/trial_controller.go
@@ -58,6 +58,8 @@ const (

var (
log = logf.Log.WithName(ControllerName)
// errMetricsNotReported is returned when the Trial job has succeeded but metrics are not reported yet
errMetricsNotReported = fmt.Errorf("metrics are not reported yet")
)

// Add creates a new Trial Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller
@@ -175,12 +177,6 @@ type ReconcileTrial struct {
collector *trialutil.TrialsCollector
}

// Map which contains number of requeuing for each trial if observation logs are not available
// That is needed if Job is succeeded but metrics are not reported yet
// Key = Trial name, value = requeue count
var trialRequeueCount = make(map[string]int)
var maxRequeueCount = 5

// Reconcile reads that state of the cluster for a Trial object and makes changes based on the state read
// and what is in the Trial.Spec
// +kubebuilder:rbac:groups=trials.kubeflow.org,resources=trials,verbs=get;list;watch;create;update;patch;delete
@@ -220,6 +216,11 @@ func (r *ReconcileTrial) Reconcile(request reconcile.Request) (reconcile.Result,
} else {
err := r.reconcileTrial(instance)
if err != nil {
if err == errMetricsNotReported {
return reconcile.Result{
RequeueAfter: time.Second * 1,
}, nil
}
logger.Error(err, "Reconcile trial error")
r.recorder.Eventf(instance,
corev1.EventTypeWarning, ReconcileFailedReason,
@@ -239,24 +240,6 @@ func (r *ReconcileTrial) Reconcile(request reconcile.Request) (reconcile.Result,
}
}

// Restart Reconcile for maxRequeueCount times
if instance.IsMetricsUnavailable() {

count, ok := trialRequeueCount[instance.GetName()]
if !ok {
trialRequeueCount[instance.GetName()] = 1
logger.Info("Trial metrics are not available, reconcile requeued", "max requeue count", maxRequeueCount)
} else {
trialRequeueCount[instance.GetName()]++
}

if count <= maxRequeueCount {
return reconcile.Result{
RequeueAfter: time.Second * 1,
}, nil
}
}

return reconcile.Result{}, nil
}

@@ -295,6 +278,11 @@ func (r *ReconcileTrial) reconcileTrial(instance *trialsv1beta1.Trial) error {
return err
}
}
// If the observation is empty, the metrics collector hasn't finished yet
if jobStatus.Condition == trialutil.JobSucceeded && instance.Status.Observation == nil {
logger.Info("Trial job is succeeded but metrics are not reported, reconcile requeued")
return errMetricsNotReported
}

// Update Trial job status only
// if job has succeeded and if observation field is available.
16 changes: 10 additions & 6 deletions pkg/controller.v1beta1/trial/trial_controller_util.go
@@ -24,6 +24,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

commonv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/common/v1beta1"
@@ -40,6 +41,7 @@ const (

// UpdateTrialStatusCondition updates Trial status from current deployed Job status
func (r *ReconcileTrial) UpdateTrialStatusCondition(instance *trialsv1beta1.Trial, deployedJobName string, jobStatus *trialutil.TrialJobStatus) {
logger := log.WithValues("Trial", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()})

timeNow := metav1.Now()

@@ -56,6 +58,7 @@ func (r *ReconcileTrial) UpdateTrialStatusCondition(instance *trialsv1beta1.Tria
reason = fmt.Sprintf("%v. Job reason: %v", reason, jobStatus.Reason)
}

logger.Info("Trial status changed to Succeeded")
instance.MarkTrialStatusSucceeded(corev1.ConditionTrue, reason, msg)
instance.Status.CompletionTime = &timeNow

@@ -64,6 +67,7 @@ func (r *ReconcileTrial) UpdateTrialStatusCondition(instance *trialsv1beta1.Tria
r.collector.IncreaseTrialsSucceededCount(instance.Namespace)
} else {
// TODO (andreyvelich): Is it correct to mark succeeded status false when metrics are unavailable?
// Ref issue to add new condition: https://github.com/kubeflow/katib/issues/1343
msg := "Metrics are not available"
reason := TrialMetricsUnavailableReason

@@ -75,12 +79,13 @@ func (r *ReconcileTrial) UpdateTrialStatusCondition(instance *trialsv1beta1.Tria
reason = fmt.Sprintf("%v. Job reason: %v", reason, jobStatus.Reason)
}

logger.Info("Trial status changed to Metrics Unavailable")
instance.MarkTrialStatusSucceeded(corev1.ConditionFalse, reason, msg)

eventMsg := fmt.Sprintf("Metrics are not available for Job %v", deployedJobName)
r.recorder.Eventf(instance, corev1.EventTypeWarning, JobMetricsUnavailableReason, eventMsg)
}
} else if jobStatus.Condition == trialutil.JobFailed {
} else if jobStatus.Condition == trialutil.JobFailed && !instance.IsFailed() {
msg := "Trial has failed"
reason := TrialFailedReason

@@ -102,12 +107,14 @@ func (r *ReconcileTrial) UpdateTrialStatusCondition(instance *trialsv1beta1.Tria

r.recorder.Eventf(instance, corev1.EventTypeNormal, JobFailedReason, eventMsg)
r.collector.IncreaseTrialsFailedCount(instance.Namespace)
} else if jobStatus.Condition == trialutil.JobRunning {
logger.Info("Trial status changed to Failed")
} else if jobStatus.Condition == trialutil.JobRunning && !instance.IsRunning() {
msg := "Trial is running"
instance.MarkTrialStatusRunning(TrialRunningReason, msg)

eventMsg := fmt.Sprintf("Job %v is running", deployedJobName)
r.recorder.Eventf(instance, corev1.EventTypeNormal, JobRunningReason, eventMsg)
logger.Info("Trial status changed to Running")
// TODO(gaocegege): Should we maintain a TrialsRunningCount?
}
// else nothing to do
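
The added !instance.IsFailed() and !instance.IsRunning() guards make each branch run only on the first transition into a state, so status updates, events, and log lines fire once rather than on every reconcile. A minimal sketch of the pattern with a hypothetical trial type (not the Katib API):

package main

import "fmt"

// trial is a hypothetical stand-in for the Trial object and its
// status-condition helpers (IsRunning etc.).
type trial struct{ phase string }

func (t *trial) IsRunning() bool { return t.phase == "Running" }

// markRunning applies the transition guard: update the status and emit the
// event only if the Trial is not already Running.
func markRunning(t *trial) {
	if t.IsRunning() {
		return // already Running: suppress duplicate events on later reconciles
	}
	t.phase = "Running"
	fmt.Println("event: Job is running") // emitted exactly once per transition
}

func main() {
	t := &trial{}
	markRunning(t) // prints the event
	markRunning(t) // no output: guard suppresses the duplicate
}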
@@ -168,7 +175,7 @@ func (r *ReconcileTrial) UpdateTrialStatusObservation(instance *trialsv1beta1.Tr
return err
}
metricStrategies := instance.Spec.Objective.MetricStrategies
if reply.ObservationLog != nil {
if len(reply.ObservationLog.MetricLogs) != 0 {
observation, err := getMetrics(reply.ObservationLog.MetricLogs, metricStrategies)
if err != nil {
log.Error(err, "Get metrics from logs error")
@@ -203,9 +210,6 @@ func (r *ReconcileTrial) updateFinalizers(instance *trialsv1beta1.Trial, finaliz
}

func isTrialObservationAvailable(instance *trialsv1beta1.Trial) bool {
if instance == nil {
return false
}
objectiveMetricName := instance.Spec.Objective.ObjectiveMetricName
if instance.Status.Observation != nil && instance.Status.Observation.Metrics != nil {
for _, metric := range instance.Status.Observation.Metrics {
11 changes: 0 additions & 11 deletions pkg/controller.v1beta1/trial/util/job_util.go
@@ -5,7 +5,6 @@ import (
"fmt"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"

trialsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/trials/v1beta1"
@@ -47,7 +46,6 @@ var (

// GetDeployedJobStatus returns internal representation for deployed Job status.
func GetDeployedJobStatus(trial *trialsv1beta1.Trial, deployedJob *unstructured.Unstructured) (*TrialJobStatus, error) {
logger := log.WithValues("Trial", types.NamespacedName{Name: trial.GetName(), Namespace: trial.GetNamespace()})

trialJobStatus := &TrialJobStatus{}

@@ -75,10 +73,6 @@ func GetDeployedJobStatus(trial *trialsv1beta1.Trial, deployedJob *unstructured.

// Job condition is failed
trialJobStatus.Condition = JobFailed
// Log only for the first status update
if !trial.IsFailed() {
logger.Info("Deployed Job status is failed", "Job", deployedJob.GetName())
}
return trialJobStatus, nil
}

@@ -100,18 +94,13 @@ func GetDeployedJobStatus(trial *trialsv1beta1.Trial, deployedJob *unstructured.

// Job condition is succeeded
trialJobStatus.Condition = JobSucceeded
// Log only for the first status update
if !trial.IsSucceeded() && !trial.IsMetricsUnavailable() {
logger.Info("Deployed Job status is succeeded", "Job", deployedJob.GetName())
}
return trialJobStatus, nil
}

// Set the default Job condition to running when the Job name is generated.
// Check that the Trial is not already running
if !trial.IsRunning() && deployedJob.GetName() != "" {
trialJobStatus.Condition = JobRunning
logger.Info("Deployed Job status is running", "Job", deployedJob.GetName())
return trialJobStatus, nil
}

3 changes: 3 additions & 0 deletions pkg/db/v1beta1/mysql/mysql.go
@@ -92,6 +92,9 @@ func NewDBInterface() (common.KatibDBInterface, error) {
}

func (d *dbConn) RegisterObservationLog(trialName string, observationLog *v1beta1.ObservationLog) error {
fmt.Println("--------------------------")
fmt.Println(trialName)
fmt.Println(observationLog)
sqlQuery := "INSERT INTO observation_logs (trial_name, time, metric_name, value) VALUES "
values := []interface{}{}

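For context, RegisterObservationLog builds one (?, ?, ?, ?) placeholder group per metric log and flattens the arguments in the same order. A generic sketch of that batching with hypothetical local types (not the Katib API), showing the row an unavailable metric produces:

package main

import "fmt"

// metric and metricLog mirror the shape of an observation-log entry
// (hypothetical local types; the real ones come from the Katib API package).
type metric struct{ Name, Value string }
type metricLog struct {
	TimeStamp string
	Metric    metric
}

// buildInsert batches all rows into a single INSERT: one "(?, ?, ?, ?)"
// group per metric log, with the arguments flattened in matching order.
func buildInsert(trialName string, mlogs []metricLog) (string, []interface{}) {
	query := "INSERT INTO observation_logs (trial_name, time, metric_name, value) VALUES "
	args := []interface{}{}
	for i, m := range mlogs {
		if i > 0 {
			query += ", "
		}
		query += "(?, ?, ?, ?)"
		args = append(args, trialName, m.TimeStamp, m.Metric.Name, m.Metric.Value)
	}
	return query, args
}

func main() {
	query, args := buildInsert("trial-abc", []metricLog{{
		TimeStamp: "0001-01-01T00:00:00Z",
		Metric:    metric{Name: "accuracy", Value: "unavailable"},
	}})
	fmt.Println(query) // INSERT INTO observation_logs (...) VALUES (?, ?, ?, ?)
	fmt.Println(args)  // [trial-abc 0001-01-01T00:00:00Z accuracy unavailable]
}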
4 changes: 4 additions & 0 deletions pkg/metricscollector/v1beta1/common/const.py
@@ -8,3 +8,7 @@
DEFAULT_METRICS_FILE_DIR = "/log"
# Job finished marker in $$$$.pid file when main process is completed
TRAINING_COMPLETED = "completed"

# UNAVAILABLE_METRIC_VALUE is the value recorded in the DB
# when the metrics collector can't parse the objective metric from the training logs.
UNAVAILABLE_METRIC_VALUE = "unavailable"
@@ -8,6 +8,7 @@ import (
"time"

v1beta1 "github.com/kubeflow/katib/pkg/apis/manager/v1beta1"
"github.com/kubeflow/katib/pkg/controller.v1beta1/consts"
"github.com/kubeflow/katib/pkg/metricscollector/v1beta1/common"
"k8s.io/klog"
)
@@ -31,14 +32,14 @@ func parseLogs(logs []string, metrics []string, filters []string) (*v1beta1.Obse

for _, logline := range logs {
// Skip lines that don't contain any metric keywords, avoiding unnecessary pattern matching
isObjLine := false
isMetricLine := false
for _, m := range metrics {
if strings.Contains(logline, m) {
isObjLine = true
isMetricLine = true
break
}
}
if !isObjLine {
if !isMetricLine {
continue
}

@@ -78,7 +79,31 @@ func parseLogs(logs []string, metrics []string, filters []string) (*v1beta1.Obse
}
}
}
olog.MetricLogs = mlogs
// Metric logs must contain at least one objective metric value
// The objective metric is located at the first index
isObjectiveMetricReported := false
for _, mLog := range mlogs {
if mLog.Metric.Name == metrics[0] {
isObjectiveMetricReported = true
break
}
}
// If the objective metric was not reported, insert the unavailable value in the DB
if !isObjectiveMetricReported {
olog.MetricLogs = []*v1beta1.MetricLog{
{
TimeStamp: time.Time{}.UTC().Format(time.RFC3339),
Metric: &v1beta1.Metric{
Name: metrics[0],
Value: consts.UnavailableMetricValue,
},
},
}
klog.Infof("Objective metric %v is not found in training logs, %v value is reported", metrics[0], consts.UnavailableMetricValue)
} else {
olog.MetricLogs = mlogs
}

return olog, nil
}

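The new fallback is straightforward to exercise: given logs with no metric keywords, parseLogs should return exactly one MetricLog carrying the unavailable value for the objective metric. A sketch of such a test (assumptions: the test lives in the same package as parseLogs, named filemetricscollector here, and an empty filter list is accepted):

package filemetricscollector

import (
	"testing"

	"github.com/kubeflow/katib/pkg/controller.v1beta1/consts"
)

// TestParseLogsReportsUnavailable: logs without the objective metric must
// yield a single MetricLog carrying the "unavailable" value.
func TestParseLogsReportsUnavailable(t *testing.T) {
	logs := []string{"starting training", "epoch 1 finished"} // no metric keywords
	olog, err := parseLogs(logs, []string{"accuracy"}, []string{})
	if err != nil {
		t.Fatal(err)
	}
	if len(olog.MetricLogs) != 1 {
		t.Fatalf("want 1 metric log, got %d", len(olog.MetricLogs))
	}
	m := olog.MetricLogs[0].Metric
	if m.Name != "accuracy" || m.Value != consts.UnavailableMetricValue {
		t.Errorf("want accuracy=%q, got %s=%q", consts.UnavailableMetricValue, m.Name, m.Value)
	}
}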
@@ -4,6 +4,7 @@
import rfc3339
import api_pb2
from logging import getLogger, StreamHandler, INFO
import const

# TFEventFileParser parses tfevent files and returns an ObservationLog of the specified metrics.
# When the event file is under a directory (e.g. a test dir), please specify "{{dirname}}/{{metrics name}}"
@@ -12,6 +13,14 @@


class TFEventFileParser:
def __init__(self):
self.logger = getLogger(__name__)
handler = StreamHandler()
handler.setLevel(INFO)
self.logger.setLevel(INFO)
self.logger.addHandler(handler)
self.logger.propagate = False

def find_all_files(self, directory):
for root, dirs, files in os.walk(directory):
yield root
@@ -36,6 +45,27 @@ def parse_summary(self, tfefile, metrics):
)
)
metric_logs.append(ml)
# Metric logs must contain at least one objective metric value
# The objective metric is located at the first index
is_objective_metric_reported = False
for ml in metric_logs:
if ml.metric.name == metrics[0]:
is_objective_metric_reported = True
break
# If the objective metric was not reported, insert the unavailable value in the DB
if not is_objective_metric_reported:
metric_logs = [
api_pb2.MetricLog(
time_stamp=rfc3339.rfc3339(datetime.datetime.now()),
metric=api_pb2.Metric(
name=metrics[0],
value=const.UNAVAILABLE_METRIC_VALUE
)
)
]
self.logger.info("Objective metric {} is not found in training logs, {} value is reported".format(
metrics[0], const.UNAVAILABLE_METRIC_VALUE))

return metric_logs



