Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding error handling and logging #694

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cmd/kar-controllers/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"os"
"strconv"
"strings"

"k8s.io/klog/v2"
)

// ServerOption is the main context object for the controller manager.
Expand Down Expand Up @@ -96,6 +98,8 @@ func (s *ServerOption) loadDefaultsFromEnvVars() {
backoffInt, err := strconv.Atoi(backoffString)
if err == nil {
s.BackoffTime = backoffInt
} else {
klog.Errorf("[loadDefaultsFromEnvVars] unable to parse int, - error: %#v", err)
}
}

Expand All @@ -105,6 +109,8 @@ func (s *ServerOption) loadDefaultsFromEnvVars() {
holInt, err := strconv.Atoi(holString)
if err == nil {
s.HeadOfLineHoldingTime = holInt
} else {
klog.Errorf("[loadDefaultsFromEnvVars] unable to parse int, - error: %#v", err)
}
}

Expand All @@ -126,6 +132,8 @@ func (s *ServerOption) loadDefaultsFromEnvVars() {
to, err := strconv.ParseInt(dispatchResourceReservationTimeoutString, 10, 64)
if err == nil {
s.DispatchResourceReservationTimeout = to
} else {
klog.Errorf("[loadDefaultsFromEnvVars] unable to parse int, - error: %#v", err)
}
}
}
8 changes: 5 additions & 3 deletions cmd/kar-controllers/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package app

import (
"fmt"
"net/http"
"strings"

Expand All @@ -42,7 +43,7 @@ func buildConfig(master, kubeconfig string) (*rest.Config, error) {
func Run(opt *options.ServerOption) error {
restConfig, err := buildConfig(opt.Master, opt.Kubeconfig)
if err != nil {
return err
return fmt.Errorf("[Run] unable to build server config, - error: %#v", err)
Fiona-Waters marked this conversation as resolved.
Show resolved Hide resolved
}

neverStop := make(chan struct{})
Expand Down Expand Up @@ -71,7 +72,8 @@ func Run(opt *options.ServerOption) error {
// This call is blocking (unless an error occurs) which equates to <-neverStop
err = listenHealthProbe(opt)
if err != nil {
return err
return fmt.Errorf("[Run] unable to start health probe listener, - error: %#v", err)

}

return nil
Expand All @@ -83,7 +85,7 @@ func listenHealthProbe(opt *options.ServerOption) error {
handler.Handle("/healthz", &health.Handler{})
err := http.ListenAndServe(opt.HealthProbeListenAddr, handler)
if err != nil {
return err
return fmt.Errorf("[listenHealthProbe] unable to listen and serve, - error: %#v", err)
}

return nil
Expand Down
4 changes: 3 additions & 1 deletion pkg/apis/controller/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
)

func GetController(obj interface{}) types.UID {
accessor, err := meta.Accessor(obj)
if err != nil {
klog.Errorf("[GetController] unable to return object as minimum required fields are missing, - error: %#v", err)
return ""
}

Expand All @@ -37,10 +39,10 @@ func GetController(obj interface{}) types.UID {
return ""
}


func GetJobID(pod *v1.Pod) types.UID {
accessor, err := meta.Accessor(pod)
if err != nil {
klog.Errorf("[GetJobID] unable to return object as minimum required fields are missing, - error: %#v", err)
return ""
}

Expand Down
189 changes: 116 additions & 73 deletions pkg/controller/queuejob/queuejob_controller_ex.go

Large diffs are not rendered by default.

47 changes: 37 additions & 10 deletions pkg/controller/queuejob/scheduling_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,10 @@ func (p *PriorityQueue) Length() int {
func (p *PriorityQueue) IfExist(qj *qjobv1.AppWrapper) bool {
p.lock.Lock()
defer p.lock.Unlock()
_, exists, _ := p.activeQ.Get(qj)
_, exists, err := p.activeQ.Get(qj)
if err != nil {
klog.Errorf("[IfExist] unable to check if app wrapper exists, - error:%#v", err)
}
if p.unschedulableQ.Get(qj) != nil || exists {
return true
}
Expand All @@ -140,7 +143,10 @@ func (p *PriorityQueue) IfExist(qj *qjobv1.AppWrapper) bool {
// IfExistActiveQ reports whether qj is present in the active queue.
//
// The queue lock is held for the duration of the lookup. If the lookup
// itself fails, the error is logged and the exists value returned by the
// lookup is passed through unchanged.
func (p *PriorityQueue) IfExistActiveQ(qj *qjobv1.AppWrapper) bool {
	p.lock.Lock()
	defer p.lock.Unlock()

	// Query the active queue exactly once, keeping the error so it can be
	// reported instead of silently dropped.
	_, exists, err := p.activeQ.Get(qj)
	if err != nil {
		// %v renders the error's message; %#v would print a Go-syntax dump
		// of the underlying error value, which is hard to read in logs.
		klog.Errorf("[IfExistActiveQ] unable to check if app wrapper exists, - error: %v", err)
	}
	return exists
}

Expand Down Expand Up @@ -196,12 +202,15 @@ func (p *PriorityQueue) AddIfNotPresent(qj *qjobv1.AppWrapper) error {
if p.unschedulableQ.Get(qj) != nil {
return nil
}
if _, exists, _ := p.activeQ.Get(qj); exists {
if _, exists, err := p.activeQ.Get(qj); exists {
if err != nil {
klog.Errorf("[AddIfNotPresent] unable to check if pod exists, - error:%#v", err)
}
return nil
}
err := p.activeQ.Add(qj)
if err != nil {
klog.Errorf("Error adding pod %s/%s to the scheduling queue: %v", qj.Namespace, qj.Name, err)
klog.Errorf("[AddIfNotPresent] Error adding pod %s/%s to the scheduling queue, - error:%#v", qj.Namespace, qj.Name, err)
} else {
p.cond.Broadcast()
}
Expand All @@ -218,7 +227,10 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(qj *qjobv1.AppWrapper) erro
if p.unschedulableQ.Get(qj) != nil {
return fmt.Errorf("pod is already present in unschedulableQ")
}
if _, exists, _ := p.activeQ.Get(qj); exists {
if _, exists, err := p.activeQ.Get(qj); exists {
if err != nil {
klog.Errorf("[AddUnschedulableIfNotPresent] unable to check if pod exists, - error:%#v", err)
}
return fmt.Errorf("pod is already present in the activeQ")
}
// if !p.receivedMoveRequest && isPodUnschedulable(qj) {
Expand All @@ -227,7 +239,9 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(qj *qjobv1.AppWrapper) erro
return nil
}
err := p.activeQ.Add(qj)
if err == nil {
if err != nil {
klog.Errorf("[AddUnschedulableIfNotPresent] Error adding QJ %s/%s to the scheduling queue: %v", qj.Namespace, qj.Name, err)
} else {
p.cond.Broadcast()
}
return err
Expand Down Expand Up @@ -271,16 +285,24 @@ func (p *PriorityQueue) Update(oldQJ, newQJ *qjobv1.AppWrapper) error {
p.lock.Lock()
defer p.lock.Unlock()
// If the pod is already in the active queue, just update it there.
if _, exists, _ := p.activeQ.Get(newQJ); exists {
if _, exists, errp := p.activeQ.Get(newQJ); exists {
if errp != nil {
klog.Errorf("[Update] unable to check if pod exists, - error:%#v", errp)
}
err := p.activeQ.Update(newQJ)
if err != nil {
klog.Errorf("[Update] unable to update pod, - error: %#v", err)
}
return err
}
// If the pod is in the unschedulable queue, updating it may make it schedulable.
if usQJ := p.unschedulableQ.Get(newQJ); usQJ != nil {
if p.isQJUpdated(oldQJ, newQJ) {
p.unschedulableQ.Delete(usQJ)
err := p.activeQ.Add(newQJ)
if err == nil {
if err != nil {
klog.Errorf("Error adding QJ %s/%s to the scheduling queue: %v", newQJ.Namespace, newQJ.Name, err)
} else {
p.cond.Broadcast()
}
return err
Expand All @@ -290,7 +312,9 @@ func (p *PriorityQueue) Update(oldQJ, newQJ *qjobv1.AppWrapper) error {
}
// If pod is not in any of the two queue, we put it in the active queue.
err := p.activeQ.Add(newQJ)
if err == nil {
if err != nil {
klog.Errorf("Error adding QJ %s/%s to the scheduling queue: %v", newQJ.Namespace, newQJ.Name, err)
} else {
p.cond.Broadcast()
}
return err
Expand All @@ -303,7 +327,10 @@ func (p *PriorityQueue) Delete(qj *qjobv1.AppWrapper) error {
p.lock.Lock()
defer p.lock.Unlock()
p.unschedulableQ.Delete(qj)
if _, exists, _ := p.activeQ.Get(qj); exists {
if _, exists, err := p.activeQ.Get(qj); exists {
if err != nil {
klog.Errorf("[Delete] unable to check if pod exists - error: %#v", err)
}
return p.activeQ.Delete(qj)
}
// p.unschedulableQ.Delete(qj)
Expand Down
20 changes: 16 additions & 4 deletions pkg/controller/queuejobdispatch/queuejobagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,10 @@ func NewJobClusterAgent(config string, agentEventQueue *cache.FIFO) *JobClusterA

qa.jobSynced = qa.jobInformer.Informer().HasSynced

qa.UpdateAggrResources(context.Background())
err = qa.UpdateAggrResources(context.Background())
if err != nil {
klog.Errorf("[NewJobClusterAgent] Unable to update aggr resources - error: %#v", err)
}

return qa
}
Expand Down Expand Up @@ -161,7 +164,10 @@ func (qa *JobClusterAgent) Run(stopCh <-chan struct{}) {
// DeleteJob issues a Delete for the AppWrapper identified by cqj's namespace
// and name through the agent's queuejobclients client.
//
// The request is announced at log verbosity 2 before it is sent. The method
// has no return value, so a failed delete is reported through the error log
// only.
func (qa *JobClusterAgent) DeleteJob(ctx context.Context, cqj *arbv1.AppWrapper) {
	target := cqj.DeepCopy()
	klog.V(2).Infof("[Dispatcher: Agent] Request deletion of XQJ %s/%s to Agent %s\n", target.Namespace, target.Name, qa.AgentId)

	// Send the delete exactly once and check its result; the log line names
	// the object so a failure can be traced back to a specific AppWrapper.
	err := qa.queuejobclients.WorkloadV1beta1().AppWrappers(target.Namespace).Delete(ctx, target.Name, metav1.DeleteOptions{})
	if err != nil {
		// %v renders the error's message rather than a Go-syntax dump (%#v).
		klog.Errorf("[DeleteJob] Unable to delete app wrapper %s/%s, - error: %v", target.Namespace, target.Name, err)
	}
}

func (qa *JobClusterAgent) CreateJob(ctx context.Context, cqj *arbv1.AppWrapper) {
Expand All @@ -183,7 +189,10 @@ func (qa *JobClusterAgent) CreateJob(ctx context.Context, cqj *arbv1.AppWrapper)
agent_qj.Labels["IsDispatched"] = "true"

klog.V(2).Infof("[Dispatcher: Agent] Create XQJ: %s/%s (Status: %+v) in Agent %s\n", agent_qj.Namespace, agent_qj.Name, agent_qj.Status, qa.AgentId)
qa.queuejobclients.WorkloadV1beta1().AppWrappers(agent_qj.Namespace).Create(ctx, agent_qj, metav1.CreateOptions{})
_, err := qa.queuejobclients.WorkloadV1beta1().AppWrappers(agent_qj.Namespace).Create(ctx, agent_qj, metav1.CreateOptions{})
if err != nil {
klog.Errorf("[CreateJob] Unable to create app wrapper, - error: %#v", err)
}
}

type ClusterMetricsList struct {
Expand Down Expand Up @@ -228,7 +237,10 @@ func (qa *JobClusterAgent) UpdateAggrResources(ctx context.Context) error {
clusterMetricType := res.Items[i].MetricLabels["cluster"]

if strings.Compare(clusterMetricType, "cpu") == 0 || strings.Compare(clusterMetricType, "memory") == 0 {
val, units, _ := getFloatString(res.Items[i].Value)
val, units, err := getFloatString(res.Items[i].Value)
if err != nil {
klog.Errorf("[Dispatcher: UpdateAggrResources] Possible issue getting float string - error: %#v", err)
}
num, err := strconv.ParseFloat(val, 64)
if err != nil {
klog.Warningf("[Dispatcher: UpdateAggrResources] Possible issue converting %s string value of %s due to error: %v\n",
Expand Down
Loading
Loading