# Temporal Simulation

The other examples assumed stationary processes and obtained average QoS metrics over time by
simulating for a very large `max_time`. Here, our goal is to find temporal characteristics by
calculating sample averages over many simulations, each with a limited execution time.
To do so, we need to provide the function instances that exist in the current state.

In [1]:
from pacssim.SimProcess import ExpSimProcess
from pacssim.FunctionInstance import FunctionInstance
from pacssim.ServerlessTemporalSimulator import ServerlessTemporalSimulator

from tqdm import tqdm
import numpy as np

# --- workload -----------------------------------------------------------
# request arrival rate handed to the simulator
arrival_rate = 0.9
# length of each simulated window; the report prints it with an "s" suffix
max_time = 60 * 60

# --- function behaviour -------------------------------------------------
# service rates, written as reciprocals (presumably of mean service times)
warm_service_rate = 1 / 2.016
cold_service_rate = 1 / 2.163
# expiration threshold given to every instance and to the simulator
expiration_threshold = 600

# --- initial state ------------------------------------------------------
# how many instances start out idle / running in each simulated trace
idle_function_count = 5
running_function_count = 3

# --- sampling -----------------------------------------------------------
# number of simulations samples produced
num_sim = 1000

# Process objects built from the cold and warm service rates; generate_trace()
# passes both of them to every FunctionInstance it creates.
# NOTE(review): presumably exponential service-time samplers, judging by the
# class name -- confirm against pacssim.SimProcess.
cold_service_process = ExpSimProcess(rate=cold_service_rate)
warm_service_process = ExpSimProcess(rate=warm_service_rate)

def generate_trace():
    """Run one temporal simulation from a fixed initial state.

    The initial state holds ``idle_function_count`` idle instances and
    ``running_function_count`` instances that were moved into running mode
    with ``arrival_transition(0)``; none of them is marked cold. Every other
    setting is read from the module-level parameters.

    Returns:
        The value reported by
        ``ServerlessTemporalSimulator.get_cold_start_prob()`` after the trace
        has been generated.
    """

    def new_warm_instance():
        # Common starting point: a non-cold instance in the 'IDLE' state.
        instance = FunctionInstance(
            0,
            cold_service_process,
            warm_service_process,
            expiration_threshold,
        )
        instance.state = 'IDLE'
        instance.is_cold = False
        return instance

    def new_idle_instance():
        instance = new_warm_instance()
        # when will it be destroyed if no requests
        instance.next_termination = 300
        # so that they would be less likely to be chosen by scheduler
        instance.creation_time = 0.01
        return instance

    def new_running_instance():
        instance = new_warm_instance()
        # transition it into running mode
        instance.arrival_transition(0)
        return instance

    # The idle pool is built completely before the running pool.
    idle_functions = [new_idle_instance() for _ in range(idle_function_count)]
    running_functions = [
        new_running_instance() for _ in range(running_function_count)
    ]

    sim = ServerlessTemporalSimulator(
        running_functions,
        idle_functions,
        arrival_rate=arrival_rate,
        warm_service_rate=warm_service_rate,
        cold_service_rate=cold_service_rate,
        expiration_threshold=expiration_threshold,
        max_time=max_time,
    )
    sim.generate_trace(debug_print=False, progress=False)
    return sim.get_cold_start_prob()

In [2]:
# Collect one cold-start probability per independent simulation run.
traces = []
for _ in tqdm(range(num_sim)):
    traces.append(generate_trace())

# The estimate is the sample average over all runs.
p_cold = np.mean(traces)
print(f"The probability of cold start request in the next {max_time}s is: {p_cold:.8f}")

100%|██████████| 1000/1000 [02:44<00:00,  6.07it/s]The probability of cold start request in the next 3600s is: 0.00116955



## Distribute Workload using ZeroMQ

In this section, we use ZeroMQ to distribute the workload of generating temporal traces
among distributed workers. Doing so gives us the ability to achieve a much higher throughput of
simulations.

In [3]:
import sys
import time
import random
from threading import Thread
import struct

import zmq

# TCP port the master socket is bound to
port = "5556"
socket_addr = f"tcp://127.0.0.1:{port}"

# Master side: a DEALER socket with a fixed identity, bound on localhost.
context = zmq.Context()
socket = context.socket(zmq.DEALER)
socket.setsockopt(zmq.IDENTITY, b'master')
socket.bind(socket_addr)

# Poller that watches the master socket for incoming messages.
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)

In [12]:
# number of worker threads that are started below
worker_count = 32

# flag read by every worker loop; once it is True the loops stop iterating
stop_signal = False
def worker(context=None, name="worker"):
    """Serve simulation requests until ``stop_signal`` becomes true.

    Connects a ROUTER socket to ``socket_addr``. For every request received
    it runs ``generate_trace()`` once and sends the result back to the
    requesting peer as one packed C double (``struct`` format ``"d"``).
    The socket is closed when the loop ends or an error escapes.

    Args:
        context: ZeroMQ context used to create the socket. Defaults to the
            shared ``zmq.Context.instance()``.
        name: Label used only in the start-up message.
    """
    context = context or zmq.Context.instance()
    # Called `sock` so the local does not shadow this function's own name.
    sock = context.socket(zmq.ROUTER)
    try:
        sock.connect(socket_addr)

        print(f"Starting thread: {name}")

        poller = zmq.Poller()
        poller.register(sock, zmq.POLLIN)
        while not stop_signal:
            # Poll with a timeout so the stop flag is re-checked regularly.
            socks = dict(poller.poll(timeout=1000))
            if socks.get(sock) != zmq.POLLIN:
                continue

            # First frame identifies the requesting peer; the payload is
            # only a trigger and is not inspected.
            ident, _request = sock.recv_multipart()

            # calculate trace
            msg = struct.pack("d", generate_trace())

            # Address the reply to the peer the request came from.
            sock.send_multipart([ident, msg])
    finally:
        # Release the socket even if generate_trace() raised, so stopped or
        # crashed workers do not leak sockets.
        sock.close()

worker_names = [f"worker-{idx}" for idx in range(worker_count)]
# Each name is bound as a default argument so every callable keeps its own.
worker_funcs = [
    lambda context=None, name=thread_name: worker(context=context, name=name)
    for thread_name in worker_names
]
worker_threads = [Thread(target=func) for func in worker_funcs]
for thread in worker_threads:
    thread.start()

# wait for threads to stabilize
time.sleep(5)

Starting thread: worker-0
Starting thread: worker-1
Starting thread: worker-2
Starting thread: worker-3
Starting thread: worker-4
Starting thread: worker-5
Starting thread: worker-6
Starting thread: worker-7
Starting thread: worker-8
Starting thread: worker-9
Starting thread: worker-10
Starting thread: worker-11
Starting thread: worker-12
Starting thread: worker-13
Starting thread: worker-14
Starting thread: worker-15
Starting thread: worker-16
Starting thread: worker-17
Starting thread: worker-18
Starting thread: worker-19
Starting thread: worker-20
Starting thread: worker-21
Starting thread: worker-22
Starting thread: worker-23
Starting thread: worker-24
Starting thread: worker-25
Starting thread: worker-26
Starting thread: worker-27
Starting thread: worker-28
Starting thread: worker-29
Starting thread: worker-30
Starting thread: worker-31


In [13]:
# number of simulation requests handed out to the workers
total_sims = 1000


def sender(num):
    """Send ``num`` request messages on the master socket.

    The payload is a fixed placeholder; each message asks a worker for one
    more trace.
    """
    # send the tasks
    request = b"HI"
    for _ in range(num):
        socket.send(request)


# NOTE(review): `socket` is also polled and read by the receive loop on the
# main thread. ZeroMQ sockets are not thread-safe, so confirm that sending
# and receiving never overlap.
st = Thread(target=sender, args=(total_sims,))
st.start()

In [14]:
received_sims = 0
results = []
# receive the results
# The context manager closes the progress bar even if the loop raises.
with tqdm(total=int(total_sims)) as pbar:
    while received_sims < total_sims:
        # Stop waiting if nothing arrives within 30 seconds.
        socks = dict(poller.poll(timeout=30000))
        if not socks:
            print("Timeout!")
            break

        if socks.get(socket) == zmq.POLLIN:
            # struct.unpack always returns a tuple; unpack the single double
            # so that `results` holds plain floats rather than 1-tuples.
            (p_cold_sample,) = struct.unpack("d", socket.recv())
            results.append(p_cold_sample)
            received_sims += 1
            pbar.update(1)

100%|██████████| 1000/1000 [04:30<00:00,  3.69it/s]


In [7]:
# np.mean of an empty sequence is nan (plus a RuntimeWarning), so the case
# where no result was collected is reported explicitly instead.
if len(results) == 0:
    p_cold = np.nan
    print("No simulation results were received; the cold start probability cannot be estimated.")
else:
    p_cold = np.mean(results)
    print(f"The probability of cold start request in the next {max_time}s is: {p_cold:.8f}")

The probability of cold start request in the next 3600s is: nan


In [8]:
# Ask every worker loop to stop, then wait for each thread to finish.
stop_signal = True
for thread in worker_threads:
    thread.join()

# Cell result: whether the sender thread is still running.
st.is_alive()

False