-
Notifications
You must be signed in to change notification settings - Fork 39.6k
/
cloud_request_manager.go
135 lines (112 loc) · 4.48 KB
/
cloud_request_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cloudresource
import (
"context"
"fmt"
"sync"
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog/v2"
)
// SyncManager is an interface for making requests to a cloud provider.
type SyncManager interface {
	// NodeAddresses reports the node's addresses as collected from the cloud
	// provider, or the error encountered while collecting them.
	NodeAddresses() ([]v1.NodeAddress, error)

	// Run starts the sync loop that refreshes those results until stop is
	// closed.
	Run(stop <-chan struct{})
}
// Compile-time assertion that *cloudResourceSyncManager satisfies SyncManager.
var _ SyncManager = (*cloudResourceSyncManager)(nil)
// cloudResourceSyncManager is the SyncManager implementation in this file. Its
// sync loop (Run) periodically asks the cloud provider for this node's
// addresses and caches the outcome. NodeAddresses serves that cached outcome.
type cloudResourceSyncManager struct {
	// Cloud provider interface.
	cloud cloudprovider.Interface
	// Sync period: the interval Run passes to wait.Until between syncs.
	syncPeriod time.Duration

	// nodeAddressesMonitor's lock guards nodeAddressesErr and nodeAddresses.
	// The sync loop broadcasts on it after every sync; NodeAddresses waits on it
	// until a result is available.
	nodeAddressesMonitor *sync.Cond
	// nodeAddressesErr is the stored sync error. It is only set while no
	// addresses are cached, and it is cleared by a successful sync.
	nodeAddressesErr error
	// nodeAddresses holds the addresses from the most recent successful sync.
	nodeAddresses []v1.NodeAddress
	// nodeName is the node whose addresses are requested from the provider.
	nodeName types.NodeName
}
// NewSyncManager returns a SyncManager that collects resources for nodeName
// from the given cloud provider. Cloud provider requests can be slow or hang,
// so results are gathered by a background loop (started with Run, repeating
// every syncPeriod) instead of on each caller's request.
func NewSyncManager(cloud cloudprovider.Interface, nodeName types.NodeName, syncPeriod time.Duration) SyncManager {
	// The monitor guards the sync loop's result (nodeAddresses,
	// nodeAddressesErr) and signals when that result changes:
	//
	//   * Readers block on it until the first result has been saved.
	//   * The sync loop, the only writer, broadcasts to every waiter each
	//     time it updates the result.
	monitor := sync.NewCond(new(sync.Mutex))

	mgr := &cloudResourceSyncManager{
		cloud:                cloud,
		nodeName:             nodeName,
		syncPeriod:           syncPeriod,
		nodeAddressesMonitor: monitor,
	}
	return mgr
}
// NodeAddresses waits for the first sync loop to run. If no successful syncs
// have run, it will return the most recent error. If node addresses have been
// synced successfully, it will return the list of node addresses from the most
// recent successful sync.
//
// The returned slice is a copy owned by the caller. The cached slice is shared
// by every caller and by the sync loop. Handing out its backing array would let
// one caller's in-place modification (sorting, editing elements, appending into
// spare capacity) race with other readers and corrupt the cached result.
//
// NOTE: a sync that succeeds with zero addresses does not count as a result.
// Callers keep blocking until a later sync yields addresses or an error.
func (m *cloudResourceSyncManager) NodeAddresses() ([]v1.NodeAddress, error) {
	m.nodeAddressesMonitor.L.Lock()
	defer m.nodeAddressesMonitor.L.Unlock()
	// Wait until the sync loop has stored addresses or an error. Wait releases
	// the lock while blocked and re-acquires it before returning, so the
	// condition is re-checked under the lock after every broadcast.
	for len(m.nodeAddresses) == 0 && m.nodeAddressesErr == nil {
		klog.V(5).InfoS("Waiting for cloud provider to provide node addresses")
		m.nodeAddressesMonitor.Wait()
	}
	// Copy while still holding the lock. append onto a nil slice allocates a
	// fresh backing array (and yields nil when nothing is cached, i.e. on the
	// error path).
	return append([]v1.NodeAddress(nil), m.nodeAddresses...), m.nodeAddressesErr
}
// getNodeAddresses asks the cloud provider's Instances interface for the
// current addresses of m.nodeName. It returns an error without making a
// request when the provider does not expose Instances.
func (m *cloudResourceSyncManager) getNodeAddresses() ([]v1.NodeAddress, error) {
	// TODO(roberthbailey): Can we do this without having credentials to talk to
	// the cloud provider?
	// TODO(justinsb): We can if CurrentNodeName() was actually CurrentNode() and
	// returned an interface.
	// TODO: If IP addresses couldn't be fetched from the cloud provider, should
	// kubelet fallback on the other methods for getting the IP below?
	instances, supported := m.cloud.Instances()
	if supported {
		return instances.NodeAddresses(context.TODO(), m.nodeName)
	}
	return nil, fmt.Errorf("failed to get instances from cloud provider")
}
// syncNodeAddresses performs one sync iteration. It queries the cloud provider
// for this node's addresses and publishes the outcome to NodeAddresses callers.
//
// Publication rules:
//   - On success, the addresses replace the cache and any stored error is
//     cleared.
//   - On failure, previously cached addresses (if any) are kept and the error
//     is only logged. Otherwise the wrapped error is stored so that waiters
//     stop blocking and see it.
//
// Waiters are woken on every call, whichever branch is taken.
func (m *cloudResourceSyncManager) syncNodeAddresses() {
	klog.V(5).InfoS("Requesting node addresses from cloud provider for node", "nodeName", m.nodeName)
	// Query the provider before taking the lock so that a slow call does not
	// hold the monitor's lock.
	addrs, err := m.getNodeAddresses()

	m.nodeAddressesMonitor.L.Lock()
	defer m.nodeAddressesMonitor.L.Unlock()
	// Deferred calls run LIFO: Broadcast fires after the result below is
	// written and before the lock is released.
	defer m.nodeAddressesMonitor.Broadcast()

	if err != nil {
		klog.V(2).InfoS("Node addresses from cloud provider for node not collected", "nodeName", m.nodeName, "err", err)
		if len(m.nodeAddresses) > 0 {
			// in the event that a sync loop fails when a previous sync had
			// succeeded, continue to use the old addresses.
			return
		}
		// %w (rather than %v) keeps the provider's error in the chain so
		// callers can use errors.Is / errors.As. The message text is unchanged.
		m.nodeAddressesErr = fmt.Errorf("failed to get node address from cloud provider: %w", err)
		return
	}

	klog.V(5).InfoS("Node addresses from cloud provider for node collected", "nodeName", m.nodeName)
	m.nodeAddressesErr = nil
	m.nodeAddresses = addrs
}
// Run drives the manager's sync loop. It invokes syncNodeAddresses every
// syncPeriod via wait.Until and does not return until stopCh is closed.
func (m *cloudResourceSyncManager) Run(stopCh <-chan struct{}) {
	syncOnce := m.syncNodeAddresses
	wait.Until(syncOnce, m.syncPeriod, stopCh)
}