forked from kubeedge/kubeedge
/
status_manager.go
103 lines (94 loc) · 3.23 KB
/
status_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package status
import (
edgeapi "github.com/kubeedge/kubeedge/common/types"
"time"
"github.com/kubeedge/beehive/pkg/common/log"
"github.com/kubeedge/kubeedge/edge/pkg/edged/podmanager"
"github.com/kubeedge/kubeedge/edge/pkg/metamanager/client"
"k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/kubelet/status"
)
// manager is the edge-side status manager. It embeds a
// k8s.io/kubernetes/pkg/kubelet/status.Manager so that the embedded methods
// are promoted as-is, and it provides its own Start, which periodically
// reports pod statuses through metaClient.
// TODO: consider whether the fields below need a lock.
type manager struct {
	status.Manager

	// metaClient is the client used to send pod status updates.
	metaClient client.CoreInterface
	// podManager supplies the pods whose status is synced.
	podManager podmanager.Manager
	// apiStatusVersions holds, per pod UID, the last status recorded by
	// updatePodStatus; it is used to skip pods whose status has not changed.
	apiStatusVersions map[types.UID]*v1.PodStatus
}
// NewManager builds a status manager that wraps the kubelet status manager
// created from kubeClient, podManager and podDeletionSafety, and that keeps
// podManager and metaClient for its own periodic status sync. The cache of
// recorded statuses starts out empty.
func NewManager(kubeClient clientset.Interface, podManager podmanager.Manager, podDeletionSafety status.PodDeletionSafetyProvider, metaClient client.CoreInterface) status.Manager {
	mgr := &manager{
		Manager:           status.NewManager(kubeClient, podManager, podDeletionSafety),
		podManager:        podManager,
		metaClient:        metaClient,
		apiStatusVersions: map[types.UID]*v1.PodStatus{},
	}
	return mgr
}
// syncPeriod is the interval of the ticker that drives pod status syncing.
const syncPeriod time.Duration = 10 * time.Second
// Start launches a background goroutine that calls updatePodStatus each time
// the syncPeriod ticker fires, and then returns immediately. The goroutine is
// never stopped. This method shadows the Start promoted from the embedded
// status.Manager, which is not called here.
func (m *manager) Start() {
	log.LOGGER.Info("Starting to sync pod status with apiserver")
	syncTicker := time.Tick(syncPeriod)
	go wait.Forever(func() {
		// wait.Forever is given a zero period, so it re-invokes this
		// function right away; the blocking receive on the ticker is what
		// actually paces the loop.
		<-syncTicker
		m.updatePodStatus()
	}, 0)
}
// updatePodStatus sends the current status of every pod returned by
// podManager through metaClient. Pods for which GetPodStatus reports no
// status, and pods whose status is semantically equal to the one stored in
// apiStatusVersions, are skipped.
//
// The status that is sent is an adjusted deep copy: every container reported
// as Running with a StartedAt whose Unix() value is 0 is rewritten to a
// Waiting state with reason "CrashLoopBackOff", and when at least one
// container was rewritten the pod's Ready condition is set to False with
// reason "ContainersNotReady" (the condition is appended if it is missing).
//
// The unmodified status is stored in apiStatusVersions only after the update
// succeeded, so a failed update is attempted again on the next pass.
//
// NOTE(review): nothing in this function removes apiStatusVersions entries for
// pods that are no longer returned by podManager — confirm whether cleanup
// happens elsewhere.
func (m *manager) updatePodStatus() {
	for _, pod := range m.podManager.GetPods() {
		uid := pod.UID
		podStatus, ok := m.GetPodStatus(uid)
		if !ok {
			continue
		}
		if latest, cached := m.apiStatusVersions[uid]; cached && apiequality.Semantic.DeepEqual(latest, &podStatus) {
			// Nothing changed since the last recorded status.
			continue
		}

		// Work on a copy so the status compared and cached below stays
		// exactly what GetPodStatus returned.
		s := *podStatus.DeepCopy()

		containersNotReady := false
		for idx := range s.ContainerStatuses {
			running := s.ContainerStatuses[idx].State.Running
			// NOTE(review): this matches a StartedAt at the Unix epoch, not
			// a zero time.Time value — confirm that is what the container
			// runtime reports for a restarting container.
			if running == nil || running.StartedAt.Unix() != 0 {
				continue
			}
			s.ContainerStatuses[idx].State = v1.ContainerState{Waiting: &v1.ContainerStateWaiting{
				Reason:  "CrashLoopBackOff",
				Message: "Container restarting in container runtime",
			}}
			containersNotReady = true
		}

		if containersNotReady {
			readyFound := false
			for i := range s.Conditions {
				if s.Conditions[i].Type != v1.PodReady {
					continue
				}
				s.Conditions[i].Status = v1.ConditionFalse
				s.Conditions[i].Reason = "ContainersNotReady"
				readyFound = true
				break
			}
			if !readyFound {
				// Also covers a nil Conditions slice: append allocates it.
				s.Conditions = append(s.Conditions, v1.PodCondition{
					Type:   v1.PodReady,
					Status: v1.ConditionFalse,
					Reason: "ContainersNotReady",
				})
			}
		}

		err := m.metaClient.PodStatus(pod.Namespace).Update(pod.Name, edgeapi.PodStatusRequest{UID: uid, Name: pod.Name, Status: s})
		if err != nil {
			log.LOGGER.Errorf("Update pod status failed err :%v", err)
			// Do not record the status as sent; leaving the cache untouched
			// makes the next pass retry this pod.
			continue
		}
		log.LOGGER.Infof("Status for pod %s updated successfully: %+v", pod.Name, podStatus)
		m.apiStatusVersions[uid] = podStatus.DeepCopy()
	}
}