/
controller.go
128 lines (109 loc) · 4.75 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package v1
import (
kubebatchclient "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
log "github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeinformers "k8s.io/client-go/informers"
kubeclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/controller"
)
// JobControllerConfiguration holds the tunable settings of the job operator.
type JobControllerConfiguration struct {
	// ReconcilerSyncLoopPeriod is how long the reconciler's state-sync loop
	// waits between two consecutive syncs.
	// It is documented as 15 seconds by default; this type does not apply a
	// default itself.
	//
	// TODO(cph): consider growing the period by a multiple on each idle pass
	// (e.g. 15s, 30s, 60s, 120s, ...) up to a 5-minute cap to reduce idle
	// loops.
	ReconcilerSyncLoopPeriod metav1.Duration

	// EnableGangScheduling turns on gang scheduling via kube-batch.
	EnableGangScheduling bool
}
// JobController is the shared building block other operators use to manage
// the lifecycle of Jobs.
//
// Usage: first implement ControllerInterface (call that value objectA), then
// initialize a JobController (objectB) with objectA as its Controller. After
// that, call objectB.ReconcileJobs, which is the entry point that triggers the
// job controller's reconcile logic:
//
//	ReconcileJobs(
//		job interface{},
//		replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec,
//		jobStatus apiv1.JobStatus,
//		runPolicy *apiv1.RunPolicy) error
type JobController struct {
	// Controller is the operator-specific implementation this controller
	// delegates to.
	Controller ControllerInterface

	// Config carries the controller's settings.
	Config JobControllerConfiguration

	// KubeClientSet is a standard kubernetes clientset.
	KubeClientSet kubeclientset.Interface

	// KubeBatchClientSet is a standard kube-batch clientset.
	KubeBatchClientSet kubebatchclient.Interface

	// PodLister lists/gets pods from the shared informer's store.
	PodLister corelisters.PodLister

	// ServiceLister lists/gets services from the shared informer's store.
	ServiceLister corelisters.ServiceLister

	// PodInformerSynced reports true once the pod store has been synced at
	// least once.
	PodInformerSynced cache.InformerSynced

	// ServiceInformerSynced reports true once the service store has been
	// synced at least once.
	ServiceInformerSynced cache.InformerSynced

	// Expectations is a TTL cache of the pod/service creations and deletions
	// each job expects to observe. The expectation key is built from the Job
	// namespace/name + ReplicaType + "pods" or "services".
	//
	// For example, given a TFJob in namespace "tf-operator" named "tfjob-abc"
	// with these replica specs:
	//
	//	{
	//	    "PS": {
	//	        "Replicas": 2,
	//	    },
	//	    "Worker": {
	//	        "Replicas": 4,
	//	    }
	//	}
	//
	// four expectations are created:
	//   - "tf-operator/tfjob-abc/ps/services", expects 2 adds.
	//   - "tf-operator/tfjob-abc/ps/pods", expects 2 adds.
	//   - "tf-operator/tfjob-abc/worker/services", expects 4 adds.
	//   - "tf-operator/tfjob-abc/worker/pods", expects 4 adds.
	Expectations controller.ControllerExpectationsInterface

	// WorkQueue is a rate-limited work queue. Changes are queued here and
	// processed later rather than handled the moment they happen, so that only
	// a fixed amount of resources is processed at a time and the same item is
	// never processed simultaneously by two different workers.
	WorkQueue workqueue.RateLimitingInterface

	// Recorder is the event recorder used to publish Event resources to the
	// Kubernetes API.
	Recorder record.EventRecorder
}
// NewJobController assembles a JobController around controllerImpl.
//
// It creates an event broadcaster that logs events through log.Infof and
// records them to the Events client obtained from kubeClientSet for the empty
// namespace, then derives a recorder whose event source component is
// controllerImpl.ControllerName(). The returned controller also gets a fresh
// set of controller expectations and a named rate-limiting work queue
// (workQueueName) using the default controller rate limiter.
//
// reconcilerSyncPeriod and enableGangScheduling are stored in the Config field.
// kubeInformerFactory is accepted but not used in this function; PodLister,
// ServiceLister, PodInformerSynced and ServiceInformerSynced are left at their
// zero values.
func NewJobController(
	controllerImpl ControllerInterface,
	reconcilerSyncPeriod metav1.Duration,
	enableGangScheduling bool,
	kubeClientSet kubeclientset.Interface,
	kubeBatchClientSet kubebatchclient.Interface,
	kubeInformerFactory kubeinformers.SharedInformerFactory,
	workQueueName string) JobController {

	log.Debug("Creating event broadcaster")

	// Wire the broadcaster to both the logger and the API server event sink.
	broadcaster := record.NewBroadcaster()
	broadcaster.StartLogging(log.Infof)
	sink := &typedcorev1.EventSinkImpl{Interface: kubeClientSet.CoreV1().Events("")}
	broadcaster.StartRecordingToSink(sink)

	// Events emitted by this controller are attributed to the implementation's name.
	source := v1.EventSource{Component: controllerImpl.ControllerName()}
	eventRecorder := broadcaster.NewRecorder(scheme.Scheme, source)

	return JobController{
		Controller: controllerImpl,
		Config: JobControllerConfiguration{
			ReconcilerSyncLoopPeriod: reconcilerSyncPeriod,
			EnableGangScheduling:     enableGangScheduling,
		},
		KubeClientSet:      kubeClientSet,
		KubeBatchClientSet: kubeBatchClientSet,
		Expectations:       controller.NewControllerExpectations(),
		WorkQueue:          workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), workQueueName),
		Recorder:           eventRecorder,
	}
}