In [1]:
from nodes.log import LoggerMixIn, LogMixIn

# Set the log level to INFO; the call is made on the LoggerMixIn class itself,
# so it presumably applies to every node using the mix-in — confirm.
LoggerMixIn.setlevel(LogMixIn.INFO)

Create a simple CDN setup consisting of two clients, three caches (two first-level caches feeding a shared third cache) and a single origin.

```
[client1] -┐┌-> [cache1] -┐
           ├┤             ├-> [cache3] --> [origin]
[client2] -┘└-> [cache2] -┘
```

- the clients request content following Zipf distributions with different parameters,
- with content sizes drawn from binomial distributions,
- and with request arrivals following different Poisson processes,
- the caches are LFU caches, meaning they evict the least frequently used objects,
- and the origin is a simple origin.

We are interested in the ingress throughput at the origin.

In [2]:
# the poisson arrival process implementation
from cdnsim.arrival import Arrival
from numpy.random import default_rng


class Poisson(Arrival):
    """Arrival process whose per-tick request count is Poisson distributed.

    Every iteration step first delegates to ``Arrival.__next__`` and then
    samples one value from a Poisson distribution with mean ``lam`` using a
    private NumPy random generator.
    """

    def __init__(self, lam: float, **kwargs):
        """Create the arrival process.

        :param lam: mean number of arrivals per tick; must be at least 1.
        :param kwargs: forwarded unchanged to ``Arrival.__init__``
            (e.g. ``ticks``).
        :raises ValueError: if ``lam`` is smaller than 1.
        """
        super().__init__(**kwargs)
        # Reject only values below 1 so the check agrees with the error
        # message; the former ``lam <= 1`` also refused the allowed value 1.
        if lam < 1:
            raise ValueError(f"lambda for poisson should be greater or equal to 1, got: {lam}")
        self.__lam = lam
        # Unseeded generator: the drawn arrivals differ from run to run.
        self.__rng = default_rng()

    def __next__(self) -> int:
        """Return the number of requests arriving in the next tick.

        :return: one Poisson-distributed sample as a plain ``int``.
        """
        # The base class step runs first; presumably it advances the tick and
        # raises StopIteration once the configured ticks are used up — confirm.
        super().__next__()
        # A scalar draw replaces the former one-element array plus indexing.
        return int(self.__rng.poisson(self.__lam))

In [3]:
# Demo: iterate a five-tick Poisson arrival process and print each draw
# together with the tick reported by the arrival object.
arrivals = Poisson(lam=100, ticks=5)
for count in arrivals:
    print(f"{count} requests at {arrivals.tick} tick")

79 requests at 0 tick
94 requests at 1 tick
100 requests at 2 tick
108 requests at 3 tick
84 requests at 4 tick


In [4]:
# extend the BaseRequests to track tick and content sizes
from typing import Dict, Hashable, List, Optional

import pandas as pd

from cdnsim.requests import BaseRequests


class ThroughputRequests(BaseRequests):
    """Requests whose index carries a 'tick' and a 'size' level.

    The extra levels make it possible to aggregate the stored frequencies per
    tick, either as a plain request count (``rpt``) or weighted by size
    (``bpt``).
    """

    def __init__(self,
                 freqs: Optional[List[int]] = None,
                 index: Optional[Dict[str, List[Hashable]]] = None):
        """Create the request container.

        :param freqs: request frequencies; defaults to an empty list.
        :param index: mapping of level name to level values; must contain the
            keys 'tick' and 'size'. Defaults to empty 'tick', 'content' and
            'size' levels.
        :raises SyntaxError: if 'tick' or 'size' is missing from ``index``.
        """
        # Build the defaults per call: a mutable default argument is created
        # once and would otherwise be shared by every instance.
        if freqs is None:
            freqs = []
        if index is None:
            index = {'tick': [], 'content': [], 'size': []}
        if 'tick' not in index or 'size' not in index:
            # SyntaxError is kept (instead of ValueError) so that existing
            # callers catching it keep working.
            raise SyntaxError(f"'tick', 'size' must be part of the level names, got: {index.keys()}")
        super().__init__(freqs=freqs, index=index)

    @property
    def rpt(self) -> pd.Series:
        """Sum of the stored frequencies grouped by the 'tick' level."""
        return self.groupby('tick').sum()

    @property
    def bpt(self) -> pd.Series:
        """Size-weighted sum of the stored frequencies grouped by 'tick'.

        The 'size' level is moved into a column, each row is reduced to the
        product of its columns (size times frequency), and the products are
        summed per tick.
        """
        return self.reset_index('size').prod(axis=1).groupby('tick').sum()

In [5]:
# the zipf client implementation
from scipy.stats import zipfian, binom

from typing import Iterator, List, Tuple

import numpy as np

from cdnsim import Client
from cdnsim.arrival import Arrival


class ZipfClient(Client):
    """Client that requests content ids drawn from a bounded Zipf distribution.

    The content base holds ``cbase`` items whose sizes are drawn once, at
    construction, from a binomial distribution. For every step of the arrival
    process, that many content ids are sampled and sent out as
    ``ThroughputRequests``.
    """

    def __init__(self, cbase: int, n: int, p: float, a: float, arrival: Arrival):
        """Create the client.

        :param cbase: number of distinct content items.
        :param n: number of trials of the binomial size distribution.
        :param p: success probability of the binomial size distribution.
        :param a: shape parameter passed to ``zipfian``.
        :param arrival: iterable yielding the number of requests per tick.
        """
        self._cbase = cbase
        # One size per content item; position i holds the size of content i + 1.
        self._csize = binom.rvs(n, p, size=cbase)
        self._a = a
        self._arrival = arrival
        super().__init__()

    def _generate(self) -> Iterator[List[Tuple[str, ThroughputRequests]]]:
        """Yield, per arrival step, the (remote, requests) pairs to send.

        :return: iterator over lists pairing each remote with its share of
            the requests generated in that step.
        """
        for k in self._arrival:
            # Distinct requested content ids and how often each was drawn.
            content, freqs = np.unique(zipfian.rvs(self._a, self._cbase, size=k), return_counts=True)
            # The level must be named 'tick': ThroughputRequests rejects an
            # index without it, so the former 'time' key always raised.
            # The tick and size arrays are now aligned with the requested
            # content ids (zipfian samples lie in 1..cbase, hence the -1);
            # before, both had length cbase regardless of what was drawn.
            # NOTE(review): assumes BaseRequests expects index arrays aligned
            # element-wise with freqs — confirm.
            requests = ThroughputRequests(
                freqs=list(freqs),
                index={
                    'tick': np.full(len(content), self.tick),
                    'size': self._csize[content - 1],
                    'content': list(content),
                })
            # Floor division by the number of remotes presumably yields one
            # share per remote — confirm against BaseRequests.
            yield [(remote, request)
                   for remote, request in zip(self.remotes, requests // len(self.remotes))]

In [6]:
# Create two clients that differ in content base size, size distribution,
# Zipf shape parameter and arrival rate; both run for ten ticks.
client1 = ZipfClient(
    cbase=20, n=50, p=0.5, a=1.6,
    arrival=Poisson(lam=50, ticks=10),
)
client2 = ZipfClient(
    cbase=100, n=200, p=0.3, a=1.1,
    arrival=Poisson(lam=80, ticks=10),
)

In [7]:
# PLFU cache implementation
from cdnsim.cache.cache import Cache


class PLFUCache(Cache):
    """Cache node that forwards the requests it treats as misses to its remotes.

    NOTE(review): the hit/miss logic is still a placeholder; ``_size`` is
    stored but not used by ``_work`` yet.
    """

    def __init__(self, size: int, **kwargs):
        """Create the cache.

        :param size: size of the cache (stored in ``_size``).
        :param kwargs: forwarded unchanged to ``Cache.__init__``.
        """
        super().__init__(**kwargs)
        self._size = size

    def _work(self) -> None:
        """Receive requests until none arrive, forward misses, then terminate."""
        # Imported locally so the method does not depend on a later notebook
        # cell having imported ``cast`` into the global namespace.
        from typing import cast

        while recv := self._receive():
            requests = cast(ThroughputRequests, sum(recv))  # <-- TODO: this can be abstracted in a higher layer

            # TODO: placeholder miss selection. Sorts by frequency, weights
            # each row by its size and takes the index of the running total;
            # as written every row is kept, so everything counts as a miss.
            misses = requests.sort_values(ascending=False).reset_index('size').prod(axis=1).cumsum().index

            # Select from the aggregated ``requests``; the former code read the
            # loop variable ``request`` before it was bound, which raised
            # UnboundLocalError.
            for remote, request in zip(self.remotes, requests[misses] // len(self.remotes)):
                self._send(remote, request)
        self._terminate()


In [8]:
# Create the three caches: cache1 and cache2 with size 10, cache3 with size 20.
cache1, cache2, cache3 = (PLFUCache(size=cache_size) for cache_size in (10, 10, 20))

In [9]:
# origin implementation
from typing import cast

from cdnsim.requests import BaseRequests
from nodes.log import LoggerMixIn
from nodes.node import LNode


class Origin(LoggerMixIn, LNode):
    """Node that logs the total number of requests in every batch it receives."""

    def __init__(self, **kwargs):
        """Forward ``kwargs`` to the base classes and set up an empty request store."""
        super().__init__(**kwargs)
        # Empty container with 'tick' and 'content' levels; it is not read
        # anywhere else in this class.
        self._requests = BaseRequests([], {'tick': [], 'content': []})

    def _work(self) -> None:
        """Log the summed request count of each received batch until none arrive."""
        while True:
            recv = self._receive()
            if not recv:
                # A falsy result from _receive() ends the loop.
                break
            assert isinstance(recv, list) and len(recv) > 0, recv
            total = cast(BaseRequests, sum(recv)).sum()
            self._log(f"received {total} requests")


In [10]:
# create the single origin node; no constructor arguments are passed
origin = Origin()

In [11]:
# Establish connections: each client connects to both cache1 and cache2,
# those two connect to cache3, and cache3 connects to the origin.
links = [
    (client1, cache1),
    (client1, cache2),
    (client2, cache1),
    (client2, cache2),
    (cache1, cache3),
    (cache2, cache3),
    (cache3, origin),
]
for node, remote in links:
    node.connect_to(remote)

In [12]:
# run simulation
# NOTE(review): start_all()/join_all() are called on client1 only; presumably
# they start, and then wait for, every connected node — confirm.
client1.start_all()
client1.join_all()