Skip to content

Commit

Permalink
optimize code
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengjiajin committed Dec 11, 2018
1 parent d034961 commit f2d7b82
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 28 deletions.
41 changes: 20 additions & 21 deletions pkg/controller.v2/tensorflow/controller.go
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/kubeflow/tf-operator/cmd/tf-operator.v2/app/options"
tfv1alpha2 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1alpha2"
tfjobclientset "github.com/kubeflow/tf-operator/pkg/client/clientset/versioned"
tfjobscheme "github.com/kubeflow/tf-operator/pkg/client/clientset/versioned/scheme"
tfjobinformers "github.com/kubeflow/tf-operator/pkg/client/informers/externalversions"
tfjobinformersv1alpha2 "github.com/kubeflow/tf-operator/pkg/client/informers/externalversions/kubeflow/v1alpha2"
tfjoblisters "github.com/kubeflow/tf-operator/pkg/client/listers/kubeflow/v1alpha2"
Expand Down Expand Up @@ -104,8 +103,6 @@ func NewTFController(
tfJobInformerFactory tfjobinformers.SharedInformerFactory,
option options.ServerOption) *TFController {

tfjobscheme.AddToScheme(scheme.Scheme)

log.Info("Creating TFJob controller")
// Create new TFController.
tc := &TFController{
Expand Down Expand Up @@ -177,16 +174,10 @@ func (tc *TFController) Run(threadiness int, stopCh <-chan struct{}) error {

// Wait for the caches to be synced before starting workers.
log.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, tc.tfJobInformerSynced); !ok {
return fmt.Errorf("failed to wait for tfjob caches to sync")
}

if ok := cache.WaitForCacheSync(stopCh, tc.PodInformerSynced); !ok {
return fmt.Errorf("failed to wait for pod caches to sync")
}

if ok := cache.WaitForCacheSync(stopCh, tc.ServiceInformerSynced); !ok {
return fmt.Errorf("failed to wait for service caches to sync")
if ok := cache.WaitForCacheSync(stopCh, tc.tfJobInformerSynced,
tc.PodInformerSynced,
tc.ServiceInformerSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}

log.Infof("Starting %v workers", threadiness)
Expand All @@ -213,15 +204,26 @@ func (tc *TFController) runWorker() {
// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (tc *TFController) processNextWorkItem() bool {
key, quit := tc.WorkQueue.Get()
obj, quit := tc.WorkQueue.Get()
if quit {
return false
}
defer tc.WorkQueue.Done(key)
defer tc.WorkQueue.Done(obj)

var key string
var ok bool
if key, ok = obj.(string); !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
tc.WorkQueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return true
}

logger := tflogger.LoggerForKey(key.(string))
logger := tflogger.LoggerForKey(key)

tfJob, err := tc.getTFJobFromKey(key.(string))
tfJob, err := tc.getTFJobFromKey(key)
if err != nil {
if err == errNotExists {
logger.Infof("TFJob has been deleted: %v", key)
Expand All @@ -240,7 +242,7 @@ func (tc *TFController) processNextWorkItem() bool {
}

// Sync TFJob to match the actual state to this desired state.
forget, err := tc.syncHandler(key.(string))
forget, err := tc.syncHandler(key)
if err == nil {
if forget {
tc.WorkQueue.Forget(key)
Expand Down Expand Up @@ -279,9 +281,6 @@ func (tc *TFController) syncTFJob(key string) (bool, error) {
if err != nil {
return false, err
}
if len(namespace) == 0 || len(name) == 0 {
return false, fmt.Errorf("invalid tfjob key %q: either namespace or name is missing", key)
}

sharedTFJob, err := tc.getTFJobFromName(namespace, name)
if err != nil {
Expand Down
8 changes: 2 additions & 6 deletions pkg/controller.v2/tensorflow/informer.go
Expand Up @@ -65,9 +65,9 @@ func (tc *TFController) getTFJobFromName(namespace, name string) (*tfv1alpha2.TF
}

func (tc *TFController) getTFJobFromKey(key string) (*tfv1alpha2.TFJob, error) {
logger := tflogger.LoggerForKey(key)
// Check if the key exists.
obj, exists, err := tc.tfJobInformer.GetIndexer().GetByKey(key)
logger := tflogger.LoggerForKey(key)
if err != nil {
logger.Errorf("Failed to get TFJob '%s' from informer index: %+v", key, err)
return nil, errGetFromKey
Expand All @@ -77,11 +77,7 @@ func (tc *TFController) getTFJobFromKey(key string) (*tfv1alpha2.TFJob, error) {
return nil, errNotExists
}

tfjob, err := tfJobFromUnstructured(obj)
if err != nil {
return nil, err
}
return tfjob, nil
return tfJobFromUnstructured(obj)
}

func tfJobFromUnstructured(obj interface{}) (*tfv1alpha2.TFJob, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller.v2/tensorflow/job.go
Expand Up @@ -17,7 +17,7 @@ import (
)

const (
failedMarshalTFJobReason = "FailedInvalidTFJobSpec"
failedMarshalTFJobReason = "InvalidTFJobSpec"
)

// When a pod is added, set the defaults and enqueue the current tfjob.
Expand Down

0 comments on commit f2d7b82

Please sign in to comment.