-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathfaassim.py
149 lines (99 loc) · 4.67 KB
/
faassim.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import logging
import time
from skippy.core.scheduler import Scheduler
from sim.benchmark import Benchmark
from sim.core import Environment, timeout_listener
from sim.docker import ContainerRegistry, pull as docker_pull
from sim.faas import FunctionReplica, FunctionRequest, FunctionSimulator, SimulatorFactory, FunctionContainer
from sim.faas.system import DefaultFaasSystem
from sim.metrics import Metrics, RuntimeLogger
from sim.resource import MetricsServer, ResourceState, ResourceMonitor
from sim.skippy import SimulationClusterContext
from sim.topology import Topology
logger = logging.getLogger(__name__)
class BadPlacementException(Exception):
    """Error type signalling a bad placement.

    Derives from ``Exception`` (not ``BaseException``) so that it behaves like
    an ordinary application error: ``BaseException`` is meant for
    interpreter-level signals such as ``KeyboardInterrupt`` and ``SystemExit``.
    Code that catches ``BadPlacementException`` explicitly is unaffected.

    NOTE(review): nothing in this file raises or catches this exception, so the
    exact condition it reports is defined by its users elsewhere.
    """
class Simulation:
    """Set up an environment for a topology/benchmark pair and run the benchmark.

    The ``create_*`` methods are factory hooks: ``init_environment`` calls them
    only for components the environment does not already provide, so subclasses
    can override them to swap in different implementations.
    """

    def __init__(
        self,
        topology: Topology,
        benchmark: Benchmark,
        env: Environment = None,
        timeout=None,
        name=None,
    ):
        """Store the simulation inputs.

        :param topology: topology to simulate; ``run`` reads ``topology.nodes``.
        :param benchmark: benchmark whose ``setup(env)`` and ``run(env)`` are
            invoked by ``run``.
        :param env: environment to use; a new ``Environment()`` is created when
            this is falsy.
        :param timeout: when truthy, ``run`` starts a ``timeout_listener``
            process with this value.
        :param name: stored on the instance; not used by this class itself.
        """
        self.env = env or Environment()
        self.topology = topology
        self.benchmark = benchmark
        self.timeout = timeout
        self.name = name

    def run(self):
        """Initialize the environment, start its processes, and run the benchmark.

        Attaches the benchmark and topology to the environment, fills in missing
        components via ``init_environment``, starts the optional timeout
        listener and the resource monitor, sets up the benchmark, starts the
        faas system, and then runs the environment until the benchmark process
        completes.
        """
        logger.info('initializing simulation, benchmark: %s, topology nodes: %d',
                    type(self.benchmark).__name__, len(self.topology.nodes))

        env = self.env
        env.benchmark = self.benchmark
        env.topology = self.topology
        self.init_environment(env)

        # Wall-clock start; handed to the timeout listener and used for the
        # final "wall" duration in the log line below.
        then = time.time()

        if self.timeout:
            logger.info('starting timeout listener with timeout %d', self.timeout)
            env.process(timeout_listener(env, then, self.timeout))

        logger.info('starting resource monitor')
        env.process(env.resource_monitor.run())

        logger.info('setting up benchmark')
        self.benchmark.setup(env)

        logger.info('starting faas system')
        env.faas.start()

        logger.info('starting benchmark process')
        p = env.process(self.benchmark.run(env))

        logger.info('executing simulation')
        env.run(until=p)

        logger.info('simulation ran %.2fs sim, %.2fs wall', env.now, (time.time() - then))

    def init_environment(self, env):
        """Populate every component of ``env`` that is currently falsy.

        Components already set on ``env`` are left untouched. The order of the
        checks is kept as-is: ``create_scheduler`` reads ``env.cluster``, so
        the cluster context must be in place before the scheduler is created.

        :param env: the environment to complete in place.
        """
        if not env.simulator_factory:
            env.simulator_factory = self.create_simulator_factory()

        if not env.container_registry:
            env.container_registry = self.create_container_registry()

        if not env.faas:
            env.faas = self.create_faas_system(env)

        if not env.metrics:
            env.metrics = Metrics(env, RuntimeLogger())

        if not env.cluster:
            env.cluster = SimulationClusterContext(env)

        if not env.scheduler:
            env.scheduler = self.create_scheduler(env)

        if not env.metrics_server:
            env.metrics_server = MetricsServer()

        if not env.resource_state:
            env.resource_state = ResourceState()

        if not env.resource_monitor:
            env.resource_monitor = ResourceMonitor(env, 1)

    def create_container_registry(self):
        """Return a new ``ContainerRegistry``."""
        return ContainerRegistry()

    def create_simulator_factory(self):
        """Return a new ``SimpleSimulatorFactory``."""
        return SimpleSimulatorFactory()

    def create_faas_system(self, env):
        """Return a ``DefaultFaasSystem`` bound to ``env``."""
        return DefaultFaasSystem(env)

    def create_scheduler(self, env):
        """Return a ``Scheduler`` built on ``env.cluster``."""
        return Scheduler(env.cluster)
class DummySimulator(FunctionSimulator):
    """Function simulator in which every lifecycle phase takes zero time.

    Each method is a generator that yields exactly one ``env.timeout(0)``
    event and ignores its other arguments.
    """

    def deploy(self, env: Environment, replica: FunctionReplica):
        """Yield a single zero-delay timeout for the deploy phase."""
        no_delay = env.timeout(0)
        yield no_delay

    def startup(self, env: Environment, replica: FunctionReplica):
        """Yield a single zero-delay timeout for the startup phase."""
        no_delay = env.timeout(0)
        yield no_delay

    def setup(self, env: Environment, replica: FunctionReplica):
        """Yield a single zero-delay timeout for the setup phase."""
        no_delay = env.timeout(0)
        yield no_delay

    def invoke(self, env: Environment, replica: FunctionReplica, request: FunctionRequest):
        """Yield a single zero-delay timeout for an invocation."""
        no_delay = env.timeout(0)
        yield no_delay

    def teardown(self, env: Environment, replica: FunctionReplica):
        """Yield a single zero-delay timeout for the teardown phase."""
        no_delay = env.timeout(0)
        yield no_delay
class DockerDeploySimMixin:
    """Mixin whose ``deploy`` delegates to the docker pull process."""

    def deploy(self, env: Environment, replica: FunctionReplica):
        """Delegate to ``docker_pull`` for the replica's image and node.

        Passes ``replica.image`` and ``replica.node.ether_node`` to
        ``docker_pull`` and yields everything that generator yields.
        """
        image = replica.image
        target_node = replica.node.ether_node
        yield from docker_pull(env, image, target_node)
class ModeledExecutionSimMixin:
    """Mixin whose ``invoke`` logs the call and yields a fixed timeout."""

    def invoke(self, env: Environment, replica: FunctionReplica, request: FunctionRequest):
        """Log the invocation, then yield ``env.timeout(1)``.

        The intended execution model, as outlined by the original author, is:

        1) get parameters of base distribution (ideal case)
        2) check the utilization of the node the replica is running on
        3) transform distribution parameters with degradation function
           depending on utilization
        4) sample from that distribution

        The current body does not perform these steps; it always yields a
        timeout of 1.
        """
        function_name = request.name
        node = replica.node
        logger.info('invoking %s on %s (%d in parallel)', function_name, node.name,
                    len(node.current_requests))

        yield env.timeout(1)
class SimpleFunctionSimulator(ModeledExecutionSimMixin, DockerDeploySimMixin, DummySimulator):
    """Simulator composed purely from its base classes.

    By method resolution order, ``invoke`` comes from
    ``ModeledExecutionSimMixin``, ``deploy`` from ``DockerDeploySimMixin``, and
    the remaining lifecycle methods from ``DummySimulator``.
    """
class SimpleSimulatorFactory(SimulatorFactory):
    """Factory that produces a ``SimpleFunctionSimulator`` on every call."""

    def create(self, env: Environment, fn: FunctionContainer) -> FunctionSimulator:
        """Return a newly constructed ``SimpleFunctionSimulator``.

        Neither ``env`` nor ``fn`` influences the result.
        """
        simulator = SimpleFunctionSimulator()
        return simulator