-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathskippy.py
134 lines (100 loc) · 4.28 KB
/
skippy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
"""
Module that glues simulation concepts to skippy concepts.
"""
import copy
import random
from collections import defaultdict
from typing import List, Dict
from ether.core import Node as EtherNode
from skippy.core.clustercontext import ClusterContext
from skippy.core.model import Node as SkippyNode, Capacity as SkippyCapacity, ImageState, Pod, PodSpec, Container, \
ResourceRequirements
from skippy.core.storage import StorageIndex
from skippy.core.utils import counter
from sim import docker
from sim.core import Environment
from sim.faas import FunctionContainer, FunctionDeployment
from sim.topology import LazyBandwidthGraph, DockerRegistry
class SimulationClusterContext(ClusterContext):
def __init__(self, env: Environment):
self.env = env
self.topology = env.topology
self.container_registry: docker.ContainerRegistry = env.container_registry
self.bw_graph = None
self.nodes = None
super().__init__()
self.storage_index = env.storage_index or StorageIndex()
self._storage_nodes = None
def get_init_image_states(self) -> Dict[str, ImageState]:
# FIXME: fix this image state business in skippy
return defaultdict(lambda: None)
def retrieve_image_state(self, image_name: str) -> ImageState:
# FIXME: hacky workaround
images = self.container_registry.find(image_name)
if not images:
raise ValueError('No container image "%s"' % image_name)
if len(images) == 1 and images[0].arch is None:
sizes = {
'x86': images[0].size,
'arm': images[0].size,
'arm32v7': images[0].size,
'aarch64': images[0].size,
'arm64': images[0].size,
'amd64': images[0].size
}
else:
sizes = {image.arch: image.size for image in images if image.arch is not None}
return ImageState(sizes)
def get_bandwidth_graph(self):
if self.bw_graph is None:
self.bw_graph = LazyBandwidthGraph(self.topology)
return self.bw_graph
def list_nodes(self) -> List[SkippyNode]:
if self.nodes is None:
self.nodes = [to_skippy_node(node) for node in self.topology.get_nodes() if node != DockerRegistry]
return self.nodes
def get_next_storage_node(self, node: SkippyNode) -> str:
if self.is_storage_node(node):
return node.name
if not self.storage_nodes:
return None
bw = self.get_bandwidth_graph()[node.name]
storage_nodes = list(self.storage_nodes.values())
random.shuffle(storage_nodes) # make sure you get a random one if bandwidth is the same
storage_node = max(storage_nodes, key=lambda n: bw[n.name])
return storage_node.name
@property
def storage_nodes(self) -> Dict[str, SkippyNode]:
if self._storage_nodes is None:
self._storage_nodes = {node.name: node for node in self.list_nodes() if self.is_storage_node(node)}
return self._storage_nodes
def is_storage_node(self, node: SkippyNode):
return 'data.skippy.io/storage' in node.labels
def to_skippy_node(node: EtherNode) -> SkippyNode:
"""
Converts an ether Node into a skippy model Node.
:param node: the node to convert
:return: the skippy node
"""
capacity = SkippyCapacity(node.capacity.cpu_millis, node.capacity.memory)
allocatable = copy.copy(capacity)
labels = dict(node.labels)
labels['beta.kubernetes.io/arch'] = node.arch
return SkippyNode(node.name, capacity=capacity, allocatable=allocatable, labels=labels)
pod_counters = defaultdict(counter)
def create_function_pod(fd: 'FunctionDeployment', fn: 'FunctionContainer') -> Pod:
"""
Creates a new Pod that hosts the given function.
:param fd: the function deployment to get the deployed function name
:param fn: the function container to package
:return: the Pod
"""
requests = fn.resource_config.get_resource_requirements()
resource_requirements = ResourceRequirements(requests)
spec = PodSpec()
spec.containers = [Container(fn.image, resource_requirements)]
spec.labels = fn.labels
cnt = next(pod_counters[fd.name])
pod = Pod(f'pod-{fd.name}-{cnt}', 'faas-sim')
pod.spec = spec
return pod