Skip to content
Permalink
Branch: master
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
15732 lines (13288 sloc) 566 KB
""" _ _ _ _ ___ ___ ____
___ __ _ | | __ _ | |__ (_) _ __ ___ / | / _ \ / _ \ |___ \
/ __| / _` || | / _` || '_ \ | || '_ ` _ \ | || (_) | | | | | __) |
\__ \| (_| || || (_| || |_) || || | | | | | | | \__, | _ | |_| | _ / __/
|___/ \__,_||_| \__,_||_.__/ |_||_| |_| |_| |_| /_/ (_) \___/ (_)|_____|
Discrete event simulation in Python
see www.salabim.org for more information, the documentation and license information
"""
from __future__ import print_function # compatibility with Python 2.x
from __future__ import division # compatibility with Python 2.x
__version__ = "19.0.2"
import heapq
import random
import time
import math
import array
import collections
import glob
import os
import inspect
import platform
import sys
import itertools
import io
import pickle
import logging
import types
import bisect
import operator
import string
# True when sys.platform reports "ios" (the value checked for the Pythonista app)
Pythonista = sys.platform == "ios"
# True on any platform whose sys.platform starts with "win"
Windows = sys.platform.startswith("win")


class g:
    # empty namespace class used as a holder for module-wide state;
    # e.g. Monitor.__init__ reads g.default_env (presumably assigned elsewhere in this module)
    pass


if Pythonista:
    # these modules are imported only when Pythonista is True
    import scene
    import ui
    import objc_util

inf = float("inf")  # positive infinity, used as open-ended bound
nan = float("nan")  # not-a-number, returned by the statistics methods when there is no data
class ItemFile(object):
    """
    reader for whitespace separated items, to be used with
    read_item, read_item_int, read_item_float and read_item_bool

    Parameters
    ----------
    filename : str
        name of the file to read items from |n|
        or |n|
        the content itself. A string containing at least one linefeed
        is treated as content (usually a triple quoted string) instead of a file name.

    Note
    ----
    It is advised to use ItemFile as a context manager, like ::

        with sim.ItemFile("experiment0.txt") as f:
            run_length = f.read_item_float()
            run_name = f.read_item()

    Alternatively, the file can be opened and closed explicitly, like ::

        f = sim.ItemFile("experiment0.txt")
        run_length = f.read_item_float()
        run_name = f.read_item()
        f.close()

    Items are separated by whitespace (blank or tab) |n|
    If a blank or tab is required in an item, use single or double quotes |n|
    All text following # on a line is ignored |n|
    All text on a line within curly brackets {} is ignored and treated as white space |n|
    Curly brackets cannot span multiple lines and cannot be nested.

    Example ::

        Item1
        "Item 2"
        Item3 Item4 # comment
        Item5 {five} Item6 {six}
        'Double quote" in item'
        "Single quote' in item"
        True
    """

    def __init__(self, filename):
        # the generator is lazy: open_file is only touched on the first read_item call
        self.iter = self._nextread()
        is_inline_content = "\n" in filename
        self.open_file = io.StringIO(filename) if is_inline_content else open(filename, "r")

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.open_file.close()

    def close(self):
        self.open_file.close()

    def read_item_int(self):
        """
        read next item from the ItemFile as int

        if the end of file is reached, EOFError is raised
        """
        text = self.read_item()
        return int(text.replace(",", "."))

    def read_item_float(self):
        """
        read next item from the ItemFile as float (a comma is accepted as decimal separator)

        if the end of file is reached, EOFError is raised
        """
        text = self.read_item()
        return float(text.replace(",", "."))

    def read_item_bool(self):
        """
        read next item from the ItemFile as bool

        A value of False (not case sensitive) will return False |n|
        A value of 0 will return False |n|
        The null string will return False |n|
        Any other value will return True

        if the end of file is reached, EOFError is raised
        """
        text = self.read_item().strip().lower()
        if text in ("false", ""):
            return False
        try:
            return not (float(text) == 0)
        except (ValueError, TypeError):
            # not a number, so any non empty text counts as True
            return True

    def read_item(self):
        """
        read next item from the ItemFile

        if the end of file is reached, EOFError is raised
        """
        try:
            item = next(self.iter)
        except StopIteration:
            raise EOFError
        return item

    def _nextread(self):
        # generator yielding the items of open_file one by one
        # state is "." (plain text), the active quote character, or "{" (inside a comment block)
        plain = "."
        quote_chars = ("'", '"')
        blanks = (" ", "\t")
        for raw_line in self.open_file:
            state = plain
            token = ""
            for ch in raw_line:
                if ch in "\r\n":
                    continue
                if state in quote_chars:
                    if ch == state:
                        state = plain
                        yield token  # a quoted item is yielded even if it is the null string
                        token = ""
                    else:
                        token += ch
                    continue
                if state == "{":
                    if ch == "}":
                        state = plain
                    continue
                if ch == "#":
                    break  # rest of the line is comment
                if ch in quote_chars or ch == "{" or ch in blanks:
                    # all of these terminate a pending unquoted item
                    if token:
                        yield token
                    token = ""
                    if ch not in blanks:
                        state = ch
                    continue
                token += ch
            if token:
                yield token
class Monitor(object):
"""
Monitor object
Parameters
----------
name : str
name of the monitor |n|
if the name ends with a period (.),
auto serializing will be applied |n|
if the name ends with a comma,
auto serializing starting at 1 will be applied |n|
if omitted, the name will be derived from the class
it is defined in (lowercased)
monitor : bool
if True (default), monitoring will be on. |n|
if False, monitoring is disabled |n|
it is possible to control monitoring later,
with the monitor method
level : bool
if False (default), individual values are tallied, optionally with weight |n|
if True, the tallied values are interpreted as levels
initial_tally : any, preferably int, float or translatable into int or float
initial value for a level monitor |n|
it is important to set the value correctly.
default: 0 |n|
not available for non level monitors
type : str
specifies how tallied values are to be stored
- "any" (default) stores values in a list. This allows
non numeric values. In calculations the values are
forced to a numeric value (0 if not possible)
- "bool" (True, False) Actually integer >= 0 <= 255 1 byte
- "int8" integer >= -128 <= 127 1 byte
- "uint8" integer >= 0 <= 255 1 byte
- "int16" integer >= -32768 <= 32767 2 bytes
- "uint16" integer >= 0 <= 65535 2 bytes
- "int32" integer >= -2147483648<= 2147483647 4 bytes
- "uint32" integer >= 0 <= 4294967295 4 bytes
- "int64" integer >= -9223372036854775808 <= 9223372036854775807 8 bytes
- "uint64" integer >= 0 <= 18446744073709551615 8 bytes
- "float" float 8 bytes
weight_legend : str
used in print_statistics and print_histogram to indicate the dimension of weight or duration (for
level monitors), e.g. minutes. Default: weight for non level monitors, duration for level monitors.
env : Environment
environment where the monitor is defined |n|
if omitted, default_env will be used
"""
cached_xweight = {(ex0, force_numeric): (0, 0) for ex0 in (False, True) for force_numeric in (False, True)}
def __init__(
    self,
    name=None,
    monitor=True,
    level=False,
    initial_tally=None,
    type=None,
    weight_legend=None,
    env=None,
    *args,
    **kwargs
):
    """
    initializes the monitor

    See the class docstring for the meaning of the parameters. |n|
    Any extra positional and keyword arguments are forwarded to setup().

    Raises
    ------
    TypeError
        if initial_tally is given for a non level monitor
    ValueError
        if type is not recognized
    """
    self.env = g.default_env if env is None else env
    _set_name(name, self.env._nameserializeMonitor, self)
    self._level = level

    # the legend defaults to "duration" for level monitors and to "weight" otherwise
    if weight_legend is None:
        legend = "duration" if self._level else "weight"
    else:
        legend = weight_legend
    self._weight_legend = legend

    if self._level:
        self.weight_legend = legend
        self._tally = 0 if initial_tally is None else initial_tally
    else:
        if initial_tally is not None:
            raise TypeError("initial_tally not available for non level monitors")
        self.weight_legend = legend

    if type is None:
        type = "any"
    try:
        self.xtypecode, self.off = type_to_typecode_off(type)
    except KeyError:
        raise ValueError("type '" + type + "' not recognized")
    self.xtype = type

    self.reset(monitor)
    self.setup(*args, **kwargs)
def __add__(self, other):
if other == 0: # to be able to use sum
return self
else:
return self.merge(other)
def __radd__(self, other):
if other == 0: # to be able to use sum
return self
else:
return self.merge(other)
def __mul__(self, other):
try:
other = float(other)
except Exception:
return NotImplemented
return self.multiply(other)
def __rmul__(self, other):
    """
    scale * monitor gives the same result as monitor * scale
    """
    return self * other
def __truediv__(self, other):
try:
other = float(other)
except Exception:
return NotImplemented
return self * (1 / other)
def merge(self, *monitors, **kwargs):
    """
    merges this monitor with other monitors

    Parameters
    ----------
    monitors : sequence
        zero or more monitors to be merged with this monitor

    name : str
        name of the merged monitor |n|
        default: name of this monitor + ".merged"

    Returns
    -------
    merged monitor : Monitor

    Raises
    ------
    TypeError
        if an unexpected keyword argument is given, if one of the arguments is not a Monitor,
        if level and non level monitors are mixed or if the types of the monitors differ

    Note
    ----
    Level monitors can only be merged with level monitors |n|
    Non level monitors can only be merged with non level monitors |n|
    Only monitors with the same type can be merged |n|
    If no monitors are specified, a copy is created. |n|
    For level monitors, merging means summing the available x-values |n|
    The merged monitor is switched off and marked as generated.
    """
    # name is fetched from **kwargs, as *monitors takes all positional arguments
    name = kwargs.pop("name", None)
    if kwargs:
        raise TypeError("merge() got an unexpected keyword argument '" + tuple(kwargs)[0] + "'")

    for m in monitors:
        if not isinstance(m, Monitor):
            raise TypeError("not possible to merge monitor with " + object_to_str(m, True) + " type")
        if self._level != m._level:
            raise TypeError("not possible to mix level monitor with non level monitor")
        if self.xtype != m.xtype:
            raise TypeError("not possible to mix type '" + self.xtype + "' with type '" + m.xtype + "'")
    if name is None:
        if self.name().endswith(".merged"):
            # this to avoid multiple .merged (particularly when merging with the + operator)
            name = self.name()
        else:
            name = self.name() + ".merged"

    new = Monitor(name=name, type=self.xtype, level=self._level)

    merge = [self] + list(monitors)

    if new._level:
        if new.xtypecode:
            new._x = array.array(self.xtypecode)
        else:
            new._x = []

        # curx[i] holds the most recent x-value of monitor i; all start as 'off'
        curx = [new.off] * len(merge)
        new._t = array.array("d")
        # walk through all (t, x) records of all monitors in time order;
        # the index in the tuple tells which monitor the record comes from
        for t, index, x in heapq.merge(
            *[zip(merge[index]._t, itertools.repeat(index), merge[index]._x) for index in range(len(merge))]
        ):
            if new.xtypecode:
                curx[index] = x
            else:
                # untyped values are forced to float (0 if not possible)
                try:
                    curx[index] = float(x)
                except (ValueError, TypeError):
                    curx[index] = 0

            # the merged level is the sum of the current levels,
            # unless one of the monitors is off: then the merged level is off as well
            sum = 0
            for xi in curx:
                if xi == new.off:
                    sum = new.off
                    break
                sum += xi

            if new._t and (t == new._t[-1]):
                # several records at the same time: only the last sum is kept
                new._x[-1] = sum
            else:
                new._t.append(t)
                new._x.append(sum)
    else:
        # monitors without stored weights contribute a weight of 1 for each value
        for t, _, x, weight in heapq.merge(
            *[
                zip(
                    merge[index]._t,
                    itertools.repeat(index),
                    merge[index]._x,
                    merge[index]._weight if merge[index]._weight else (1,) * len(merge[index]._x),
                )
                for index in range(len(merge))
            ]
        ):
            if weight == 1:
                if new._weight:
                    new._weight.append(weight)
            else:
                # first non 1 weight: start storing weights, all previous values get weight 1
                if not new._weight:
                    new._weight = array.array("d", (1,) * len(new._x))
                new._weight.append(weight)
            new._t.append(t)
            new._x.append(x)
    new.monitor(False)
    new.isgenerated = True
    return new
def __getitem__(self, key):
if isinstance(key, slice):
return self.slice(key.start, key.stop, key.step)
else:
return self.slice(key)
def slice(self, start=None, stop=None, modulo=None, name=None):
"""
slices this monitor (creates a subset)
Parameters
----------
start : float
if modulo is not given, the start of the slice |n|
if modulo is given, this is indicates the slice period start (modulo modulo)
stop : float
if modulo is not given, the end of the slice |n|
if modulo is given, this is indicates the slice period end (modulo modulo) |n|
note that stop is excluded from the slice (open at right hand side)
modulo : float
specifies the distance between slice periods |n|
if not specified, just one slice subset is used.
name : str
name of the sliced monitor |n|
default: name of this monitor + ".sliced"
Returns
-------
sliced monitor : Monitor
"""
new = Monitor(name="slice", type=self.xtype, level=self._level)
if name is None:
name = self.name() + ".sliced"
new = Monitor(level=self._level, type=self.xtype, name=name)
actions = []
if modulo is None:
if start is None:
start = -inf
else:
start += self.env._offset
start = max(start, self.start)
if stop is None:
stop = inf
else:
stop += self.env._offset
stop = min(stop, self.env.now())
actions.append((start, "a", 0, 0))
actions.append((stop, "b", 0, 0)) # non inclusive
else:
if start is None:
raise TypeError("Modulo specified, but no start specified. ")
if stop is None:
raise TypeError("Module specified, but no stop specified")
if stop <= start:
raise ValueError("stop must be > start")
if stop - start >= modulo:
raise ValueError("stop must be < start + modulo")
start = start % modulo
stop = stop % modulo
start1 = self._t[0] - (self._t[0] % modulo) + start
len1 = (stop - start) % modulo
while start1 < self.env._now:
actions.append((start1, "a", 0, 0))
actions.append((start1 + len1, "b", 0, 0)) # non inclusive
start1 += modulo
if new._level:
if new.xtypecode:
new._x = array.array(self.xtypecode)
else:
new._x = []
new._t = array.array("d")
curx = new.off
new._t.append(self.start)
new._x.append(curx)
enabled = False
for (t, type, x, weight) in heapq.merge(
actions,
zip(
self._t,
itertools.repeat("c"),
self._x,
self._weight if (self._weight and not self._level) else (1,) * len(self._x),
),
):
if new._level:
if type == "a":
enabled = True
if new._t[-1] == t:
new._x[-1] = curx
else:
if new._x[-1] == curx:
new._t[-1] = t
else:
new._t.append(t)
new._x.append(curx)
elif type == "b":
enabled = False
if new._t[-1] == t:
new._x[-1] = self.off
else:
if new._x[-1] == self.off:
new._t[-1] = t
else:
new._t.append(t)
new._x.append(self.off)
else:
if enabled:
if curx != x:
if new._t[-1] == t:
new._x[-1] = x
else:
if x == new._x[-1]:
new._t[-1] = t
else:
new._t.append(t)
new._x.append(x)
curx = x
else:
if type == "a":
enabled = True
elif type == "b":
enabled = False
else:
if enabled:
if weight == 1:
if new._weight:
new._weight.append(weight)
else:
if not new._weight:
new._weight = array.array("d", (1,) * len(new._x))
new._weight.append(weight)
new._t.append(t)
new._x.append(x)
new.monitor(False)
new.isgenerated = True
return new
def setup(self):
"""
called immediately after initialization of a monitor.
by default this is a dummy method, but it can be overridden.
only keyword arguments are passed
"""
pass
def register(self, registry):
"""
registers the monitor in the registry
Parameters
----------
registry : list
list of (to be) registered objects
Returns
-------
monitor (self) : Monitor
Note
----
Use Monitor.deregister if monitor does not longer need to be registered.
"""
if not isinstance(registry, list):
raise TypeError("registry not list")
if self in registry:
raise ValueError(self.name() + " already in registry")
registry.append(self)
return self
def deregister(self, registry):
"""
deregisters the monitor in the registry
Parameters
----------
registry : list
list of registered objects
Returns
-------
monitor (self) : Monitor
"""
if not isinstance(registry, list):
raise TypeError("registry not list")
if self not in registry:
raise ValueError(self.name() + " not in registry")
registry.remove(self)
return self
def __repr__(self):
    """
    representation: object_to_str of the monitor, "[level]" for level monitors and the name in parentheses
    """
    level_tag = "[level]" if self._level else ""
    return object_to_str(self) + level_tag + " (" + self.name() + ")"
def __call__(self, t=None): # direct moneypatching __call__ doesn't work
if not self._level:
raise TypeError("get not available for non level monitors")
if t is None:
return self._tally
if t < self._t[0] or t > self.env._now:
return self.off
if t == self.env._now:
return self._tally # even if monitor is off, the current value is valid
i = bisect.bisect_left(list(zip(self._t, itertools.count())), (t, float("inf")))
return self._x[i - 1]
def get(self, t=None):
"""
Parameters
----------
t : float
time at which the value of the level is to be returned |n|
default: now
Returns
-------
last tallied value : any, usually float
Instead of this method, the level monitor can also be called directly, like |n|
level = sim.Monitor("level", level=True) |n|
... |n|
print(level()) |n|
print(level.get()) # identical |n|
Note
----
If the value is not available, self.off will be returned.
"""
self.__call__(t)
def reset_monitors(self, monitor=None):
    """
    resets monitor

    Parameters
    ----------
    monitor : bool
        if True, monitoring will be on. |n|
        if False, monitoring is disabled |n|
        if omitted, the monitor state remains unchanged

    Note
    ----
    Exactly same functionality as Monitor.reset(), to which this method delegates
    """
    self.reset(monitor)
def reset(self, monitor=None):
"""
resets monitor
Parameters
----------
monitor : bool
if True, monitoring will be on. |n|
if False, monitoring is disabled
if omitted, no change of monitoring state
"""
if monitor is not None:
self._monitor = monitor
if self.xtypecode:
self._x = array.array(self.xtypecode)
else:
self._x = []
self._weight = False
self._t = array.array("d")
if self._level:
self._weight = True # signal for statistics that weights are present (although not stored in _weight)
if self._monitor:
self._x.append(self._tally)
else:
self._x.append(self.off)
self._t.append(self.env._now)
else:
self._weight = False # weights are only stored if there is a non 1 weight
self.start = self.env.now()
self.isgenerated = False
self.monitor(monitor)
Monitor.cached_xweight = {
(ex0, force_numeric): (0, 0) for ex0 in (False, True) for force_numeric in (False, True)
} # invalidate the cache
def monitor(self, value=None):
"""
enables/disables monitor
Parameters
----------
value : bool
if True, monitoring will be on. |n|
if False, monitoring is disabled |n|
if omitted, no change
Returns
-------
True, if monitoring enabled. False, if not : bool
"""
if value is not None:
if value and self.isgenerated:
raise TypeError("merged or sliced monitors cannot not be turned on")
self._monitor = value
if self._level:
if self._monitor:
self.tally(self._tally)
else:
self._tally_off() # can't use tally() here because self._tally should be untouched
return self.monitor
def tally(self, value, weight=1):
"""
Parameters
----------
x : any, preferably int, float or translatable into int or float
value to be tallied
weight: float
weight to be tallied |n|
default : 1 |n|
"""
if self._level:
if weight != 1:
if self._level:
raise ValueError("level monitor supports only weight=1, not: " + str(weight))
if value == self.off:
raise ValueError("not allowed to tally " + str(self.off) + " (off)")
self._tally = value
if self._monitor:
t = self.env._now
if self._t[-1] == t:
self._x[-1] = value
else:
self._x.append(value)
self._t.append(t)
else:
if self._monitor:
if weight == 1:
if self._weight:
self._weight.append(weight)
else:
if not self._weight:
self._weight = array.array("d", (1,) * len(self._x))
self._weight.append(weight)
self._x.append(value)
self._t.append(self.env._now)
def _tally_off(self):
t = self.env._now
if self._t[-1] == t:
self._x[-1] = self.off
else:
self._x.append(self.off)
self._t.append(t)
def to_years(self, name=None):
    """
    makes a monitor with all x-values converted to years

    Parameters
    ----------
    name : str
        name of the converted monitor |n|
        default: name of this monitor

    Returns
    -------
    converted monitor : Monitor

    Note
    ----
    Only non level monitors with type float can be converted. |n|
    It is required that a time_unit is defined for the environment. |n|
    Shorthand for to_time_unit("years")
    """
    self.env._check_time_unit_na()
    return self.to_time_unit("years", name=name)
def to_weeks(self, name=None):
    """
    makes a monitor with all x-values converted to weeks

    Parameters
    ----------
    name : str
        name of the converted monitor |n|
        default: name of this monitor

    Returns
    -------
    converted monitor : Monitor

    Note
    ----
    Only non level monitors with type float can be converted. |n|
    It is required that a time_unit is defined for the environment. |n|
    Shorthand for to_time_unit("weeks")
    """
    self.env._check_time_unit_na()
    return self.to_time_unit("weeks", name=name)
def to_days(self, name=None):
    """
    makes a monitor with all x-values converted to days

    Parameters
    ----------
    name : str
        name of the converted monitor |n|
        default: name of this monitor

    Returns
    -------
    converted monitor : Monitor

    Note
    ----
    Only non level monitors with type float can be converted. |n|
    It is required that a time_unit is defined for the environment. |n|
    Shorthand for to_time_unit("days")
    """
    self.env._check_time_unit_na()
    return self.to_time_unit("days", name=name)
def to_hours(self, name=None):
    """
    makes a monitor with all x-values converted to hours

    Parameters
    ----------
    name : str
        name of the converted monitor |n|
        default: name of this monitor

    Returns
    -------
    converted monitor : Monitor

    Note
    ----
    Only non level monitors with type float can be converted. |n|
    It is required that a time_unit is defined for the environment. |n|
    Shorthand for to_time_unit("hours")
    """
    self.env._check_time_unit_na()
    return self.to_time_unit("hours", name=name)
def to_minutes(self, name=None):
    """
    makes a monitor with all x-values converted to minutes

    Parameters
    ----------
    name : str
        name of the converted monitor |n|
        default: name of this monitor

    Returns
    -------
    converted monitor : Monitor

    Note
    ----
    Only non level monitors with type float can be converted. |n|
    It is required that a time_unit is defined for the environment. |n|
    Shorthand for to_time_unit("minutes")
    """
    self.env._check_time_unit_na()
    return self.to_time_unit("minutes", name=name)
def to_seconds(self, name=None):
    """
    makes a monitor with all x-values converted to seconds

    Parameters
    ----------
    name : str
        name of the converted monitor |n|
        default: name of this monitor

    Returns
    -------
    converted monitor : Monitor

    Note
    ----
    Only non level monitors with type float can be converted. |n|
    It is required that a time_unit is defined for the environment. |n|
    Shorthand for to_time_unit("seconds")
    """
    self.env._check_time_unit_na()
    return self.to_time_unit("seconds", name=name)
def to_milliseconds(self, name=None):
    """
    makes a monitor with all x-values converted to milliseconds

    Parameters
    ----------
    name : str
        name of the converted monitor |n|
        default: name of this monitor

    Returns
    -------
    converted monitor : Monitor

    Note
    ----
    Only non level monitors with type float can be converted. |n|
    It is required that a time_unit is defined for the environment. |n|
    Shorthand for to_time_unit("milliseconds")
    """
    self.env._check_time_unit_na()
    return self.to_time_unit("milliseconds", name=name)
def to_microseconds(self, name=None):
    """
    makes a monitor with all x-values converted to microseconds

    Parameters
    ----------
    name : str
        name of the converted monitor |n|
        default: name of this monitor

    Returns
    -------
    converted monitor : Monitor

    Note
    ----
    Only non level monitors with type float can be converted. |n|
    It is required that a time_unit is defined for the environment. |n|
    Shorthand for to_time_unit("microseconds")
    """
    self.env._check_time_unit_na()
    return self.to_time_unit("microseconds", name=name)
def to_time_unit(self, time_unit, name=None):
    """
    makes a monitor with all x-values converted to the specified time unit

    Parameters
    ----------
    time_unit : str
        Supported time_units: |n|
        "years", "weeks", "days", "hours", "minutes", "seconds", "milliseconds", "microseconds"

    name : str
        name of the converted monitor |n|
        default: name of this monitor

    Returns
    -------
    converted monitor : Monitor

    Note
    ----
    Only non level monitors with type float can be converted. |n|
    It is required that a time_unit is defined for the environment.
    """
    self.env._check_time_unit_na()
    # the conversion is a multiplication with the ratio of the two time unit factors
    conversion_factor = _time_unit_lookup(time_unit) / self.env._time_unit
    return self.multiply(conversion_factor, name=name)
def multiply(self, scale=1, name=None):
"""
makes a monitor with all x-values multiplied with scale
Parameters
----------
scale : float
scale to be applied
name : str
name of the multiplied monitor |n|
default: name of this monitor
Returns
-------
multiplied monitor : Monitor
Note
----
Only non level monitors with type float can be multiplied |n|
"""
if self._level:
raise ValueError("level monitors can't be multiplied")
if self.xtype == "float":
if name is None:
name = self.name()
new = Monitor(name=name, monitor=False, type="float", level=False)
new.isgenerated = True
new._x = [x * scale for x in self._x]
new._t = 1 # self._t[]
return new
else:
raise ValueError("type", self.xtype, " monitors can't be multiplied (only float)")
def name(self, value=None):
"""
Parameters
----------
value : str
new name of the monitor
if omitted, no change
Returns
-------
Name of the monitor : str
Note
----
base_name and sequence_number are not affected if the name is changed
"""
if value is not None:
self._name = value
return self._name
def base_name(self):
    """
    Returns
    -------
    base name of the monitor (the name used at initialization): str
        this is the stored _base_name attribute, which is not changed by name()
    """
    return self._base_name
def sequence_number(self):
    """
    Returns
    -------
    sequence_number of the monitor : int
        (the sequence number at initialization) |n|
        normally this will be the integer value of a serialized name,
        but also non serialized names (without a dot or a comma at the end)
        will be numbered
    """
    return self._sequence_number
def mean(self, ex0=False):
"""
mean of tallied values
Parameters
----------
ex0 : bool
if False (default), include zeroes. if True, exclude zeroes
Returns
-------
mean : float
Note
----
For weighs are applied , the weighted mean is returned
"""
x, weight = self._xweight(ex0=ex0)
sumweight = sum(weight)
if sumweight:
return sum(vx * vweight for vx, vweight in zip(x, weight)) / sumweight
else:
return nan
def std(self, ex0=False):
"""
standard deviation of tallied values
Parameters
----------
ex0 : bool
if False (default), include zeroes. if True, exclude zeroes
Returns
-------
standard deviation : float
Note
----
For weights are applied, the weighted standard deviation is returned
"""
x, weight = self._xweight(ex0=ex0)
sumweight = sum(weight)
if sumweight:
wmean = self.mean(ex0=ex0)
wvar = sum((vweight * (vx - wmean) ** 2) for vx, vweight in zip(x, weight)) / sumweight
return math.sqrt(wvar)
else:
return nan
def minimum(self, ex0=False):
"""
minimum of tallied values
Parameters
----------
ex0 : bool
if False (default), include zeroes. if True, exclude zeroes
Returns
-------
minimum : float
"""
x = self._xweight(ex0=ex0)[0]
if x:
return min(x)
else:
return nan
def maximum(self, ex0=False):
"""
maximum of tallied values
Parameters
----------
ex0 : bool
if False (default), include zeroes. if True, exclude zeroes
Returns
-------
maximum : float
"""
x = self._xweight(ex0=ex0)[0]
if x:
return max(x)
else:
return nan
def median(self, ex0=False):
    """
    median of tallied values

    Parameters
    ----------
    ex0 : bool
        if False (default), include zeroes. if True, exclude zeroes

    Returns
    -------
    median : float

    Note
    ----
    If weights are applied, the weighted median is returned |n|
    Equivalent to percentile(50)
    """
    return self.percentile(50, ex0=ex0)
def percentile(self, q, ex0=False):
"""
q-th percentile of tallied values
Parameters
----------
q : float
percentage of the distribution |n|
values <0 are treated a 0 |n|
values >100 are treated as 100
ex0 : bool
if False (default), include zeroes. if True, exclude zeroes
Returns
-------
: float
q-th percentile |n|
0 returns the minimum, 50 the median and 100 the maximum
Note
----
If weights are applied, the weighted percentile is returned
"""
# algorithm based on
# https://stats.stackexchange.com/questions/13169/defining-quantiles-over-a-weighted-sample
q = max(0, min(q, 100))
x, weight = self._xweight(ex0=ex0)
if len(x) == 1:
return x[0]
sumweight = sum(weight)
n = len(weight)
if not sumweight:
return nan
xweight = sorted(zip(x, weight), key=lambda v: v[0])
x_sorted, weight_sorted = zip(*xweight)
cum = 0
s = []
for k in range(n):
s.append((k * weight_sorted[k] + cum))
cum += (n - 1) * weight_sorted[k]
for k in range(n - 1):
if s[k + 1] > s[n - 1] * q / 100:
break
return x_sorted[k] + (x_sorted[k + 1] - x_sorted[k]) * (q / 100 * s[n - 1] - s[k]) / (s[k + 1] - s[k])
def bin_number_of_entries(self, lowerbound, upperbound, ex0=False):
"""
count of the number of tallied values in range (lowerbound,upperbound]
Parameters
----------
lowerbound : float
non inclusive lowerbound
upperbound : float
inclusive upperbound
ex0 : bool
if False (default), include zeroes. if True, exclude zeroes
Returns
-------
number of values >lowerbound and <=upperbound : int
Note
----
Not available for level monitors
"""
if self._level:
raise TypeError("bin_number_of_entries not available for level monitors")
x = self._xweight(ex0=ex0)[0]
return sum(1 for vx in x if (vx > lowerbound) and (vx <= upperbound))
def bin_weight(self, lowerbound, upperbound):
"""
total weight of tallied values in range (lowerbound,upperbound]
Parameters
----------
lowerbound : float
non inclusive lowerbound
upperbound : float
inclusive upperbound
ex0 : bool
if False (default), include zeroes. if True, exclude zeroes
Returns
-------
total weight of values >lowerbound and <=upperbound : int
Note
----
Not available for level monitors
"""
if self._level:
raise TypeError("bin_weight not available for level monitors")
return self.sys_bin_weight(lowerbound, upperbound)
def bin_duration(self, lowerbound, upperbound):
"""
total duration of tallied values in range (lowerbound,upperbound]
Parameters
----------
lowerbound : float
non inclusive lowerbound
upperbound : float
inclusive upperbound
ex0 : bool
if False (default), include zeroes. if True, exclude zeroes
Returns
-------
total duration of values >lowerbound and <=upperbound : int
Note
----
Not available for level monitors
"""
if not self._level:
raise TypeError("bin_duration not available for non level monitors")
return self.sys_bin_weight(lowerbound, upperbound)
def sys_bin_weight(self, lowerbound, upperbound):
x, weight = self._xweight()
return sum((vweight for vx, vweight in zip(x, weight) if (vx > lowerbound) and (vx <= upperbound)))
def value_number_of_entries(self, value):
"""
count of the number of tallied values equal to value or in value
Parameters
----------
value : any
if list, tuple or set, check whether the tallied value is in value |n|
otherwise, check whether the tallied value equals the given value
Returns
-------
number of tallied values in value or equal to value : int
Note
----
Not available for level monitors
"""
if self._level:
raise TypeError("value_number_of_entries not available for level monitors")
if isinstance(value, (list, tuple, set)):
value = [str(v) for v in value]
else:
value = [str(value)]
x = self._xweight(force_numeric=False)[0]
return sum(1 for vx in x if (str(vx).strip() in value))
def value_weight(self, value):
"""
total weight of tallied values equal to value or in value
Parameters
----------
value : any
if list, tuple or set, check whether the tallied value is in value |n|
otherwise, check whether the tallied value equals the given value
Returns
-------
total of weights of tallied values in value or equal to value : int
Note
----
Not available for level monitors
"""
if self._level:
raise TypeError("value_weight not supported for level monitors")
return self.sys_value_weight(value)
def value_duration(self, value):
"""
total duration of tallied values equal to value or in value
Parameters
----------
value : any
if list, tuple or set, check whether the tallied value is in value |n|
otherwise, check whether the tallied value equals the given value
Returns
-------
total of duration of tallied values in value or equal to value : int
Note
----
Not available for non level monitors
"""
if not self._level:
raise TypeError("value_weight not available for non level monitors")
return self.sys_value_weight(value)
def sys_value_weight(self, value):
x, weight = self._xweight(force_numeric=False)
if isinstance(value, (list, tuple, set)):
value = [str(v) for v in value]
else:
value = [str(value)]
return sum(vweight for (vx, vweight) in zip(x, weight) if (str(vx).strip() in value))
def number_of_entries(self, ex0=False):
"""
count of the number of entries
Parameters
----------
ex0 : bool
if False (default), include zeroes. if True, exclude zeroes
Returns
-------
number of entries : int
Note
----
Not available for level monitors
"""
if self._level:
raise TypeError("number_of_entries not available for level monitors")
x = self._xweight(ex0=ex0)[0]
return len(x)
def number_of_entries_zero(self):
"""
count of the number of zero entries
Returns
-------
number of zero entries : int
Note
----
Not available for level monitors
"""
if self._level:
raise TypeError("number_of_entries_zero not available for level monitors")
return self.number_of_entries() - self.number_of_entries(ex0=True)
def weight(self, ex0=False):
"""
sum of weights
Parameters
----------
ex0 : bool
if False (default), include zeroes. if True, exclude zeroes
Returns
-------
sum of weights : float
Note
----
Not available for level monitors
"""
if self._level:
raise TypeError("weight not available for level monitors")
return self.sys_weight(ex0)
def duration(self, ex0=False):
"""
total duration
Parameters
----------
ex0 : bool
if False (default), include zeroes. if True, exclude zeroes
Returns
-------
total duration : float
Note
----
Not available for non level monitors
"""
if not self._level:
raise TypeError("duration not available for non level monitors")
return self.sys_weight(ex0)
def sys_weight(self, ex0=False):
x, weight = self._xweight(ex0=ex0)
return sum(weight)
def weight_zero(self):
    """
    total of the weights of the entries that are zero

    Returns
    -------
    sum of weights of zero entries : float

    Note
    ----
    Raises TypeError for level monitors (use duration_zero() there)
    """
    if not self._level:
        return self.sys_weight_zero()
    raise TypeError("weight_zero not available for level monitors")
def duration_zero(self):
    """
    total duration of the zero entries

    Returns
    -------
    total duration of zero entries : float

    Note
    ----
    Raises TypeError for non level monitors (use weight_zero() there)
    """
    if self._level:
        return self.sys_weight_zero()
    raise TypeError("duration_zero not available for non level monitors")
def sys_weight_zero(self):
    """
    weight (or duration) of the zero entries: total weight minus the weight excluding zeroes
    """
    all_weight = self.sys_weight()
    nonzero_weight = self.sys_weight(ex0=True)
    return all_weight - nonzero_weight
def print_statistics(self, show_header=True, show_legend=True, do_indent=False, as_str=False, file=None):
    """
    print monitor statistics

    Parameters
    ----------
    show_header: bool
        primarily for internal use
    show_legend: bool
        primarily for internal use
    do_indent: bool
        primarily for internal use
    as_str: bool
        if False (default), print the statistics
        if True, return a string containing the statistics
    file: file
        if None (default), all output is directed to stdout |n|
        otherwise, the output is directed to the file

    Returns
    -------
    statistics (if as_str is True) : str
    """
    # Column layout, as established by pad("entries", 14) and the dashes line below:
    # a 14 character label column followed by three 13 character value columns.
    # NOTE(review): assumes fn(v, 13, 3) yields a 13 character field -- confirm against fn.
    label_width = 14
    value_width = 13

    result = []
    ll = 45 if do_indent else 0
    indent = pad("", ll)

    def row(label, *cells):
        # pad every label to the label column so all value columns line up
        return indent + label.ljust(label_width) + "".join(cells)

    if show_header:
        result.append(
            indent + "Statistics of {} at {}".format(self.name(), fn(self.env._now - self.env._offset, 13, 3))
        )
    if show_legend:
        result.append(
            row("", "all".rjust(value_width), "excl.zero".rjust(value_width), "zero".rjust(value_width))
        )
        result.append(pad("-" * (ll - 1) + " ", ll) + "-------------- ------------ ------------ ------------")

    if self.sys_weight() == 0:
        result.append(pad(self.name(), ll) + "no data")
        return return_or_print(result, as_str, file)

    # first data row: the monitor name takes the place of the indent
    if self._weight:
        result.append(
            pad(self.name(), ll)
            + pad(self.weight_legend, label_width)
            + "{}{}{}".format(
                fn(self.sys_weight(), 13, 3),
                fn(self.sys_weight(ex0=True), 13, 3),
                fn(self.sys_weight_zero(), 13, 3),
            )
        )
    else:
        result.append(
            pad(self.name(), ll)
            + pad("entries", label_width)
            + "{}{}{}".format(
                fn(self.number_of_entries(), 13, 3),
                fn(self.number_of_entries(ex0=True), 13, 3),
                fn(self.number_of_entries_zero(), 13, 3),
            )
        )

    result.append(row("mean", fn(self.mean(), 13, 3), fn(self.mean(ex0=True), 13, 3)))
    result.append(row("std.deviation", fn(self.std(), 13, 3), fn(self.std(ex0=True), 13, 3)))
    result.append("")
    result.append(row("minimum", fn(self.minimum(), 13, 3), fn(self.minimum(ex0=True), 13, 3)))
    result.append(row("median", fn(self.percentile(50), 13, 3), fn(self.percentile(50, ex0=True), 13, 3)))
    result.append(row("90% percentile", fn(self.percentile(90), 13, 3), fn(self.percentile(90, ex0=True), 13, 3)))
    result.append(row("95% percentile", fn(self.percentile(95), 13, 3), fn(self.percentile(95, ex0=True), 13, 3)))
    result.append(row("maximum", fn(self.maximum(), 13, 3), fn(self.maximum(ex0=True), 13, 3)))
    return return_or_print(result, as_str, file)
def histogram_autoscale(self, ex0=False):
    """
    determines the binning used by print_histogram when it autoscales |n|
    may be overridden.

    Parameters
    ----------
    ex0 : bool
        if False (default), include zeroes. if True, exclude zeroes

    Returns
    -------
    bin_width, lowerbound, number_of_bins : tuple

    Note
    ----
    Bin widths 1, 2, 5, 10, 20, 50, ... (up to 5 * 10 ** 9) are tried in increasing order;
    the first one that needs at most 30 bins is used. If none qualifies, the
    result for the largest width is returned.
    """
    xmax = self.maximum(ex0=ex0)
    xmin = self.minimum(ex0=ex0)
    for power in range(10):
        decade = 10 ** power
        for bin_width in (decade, decade * 2, decade * 5):
            # align the first bin on a multiple of the bin width
            lowerbound = math.floor(xmin / bin_width) * bin_width
            number_of_bins = int(math.ceil((xmax - lowerbound) / bin_width))
            if number_of_bins <= 30:
                return bin_width, lowerbound, number_of_bins
    return bin_width, lowerbound, number_of_bins
def print_histograms(
    self, number_of_bins=None, lowerbound=None, bin_width=None, values=False, ex0=False, as_str=False, file=None
):
    """
    print monitor statistics and histogram

    Parameters
    ----------
    number_of_bins : int
        number of bins |n|
        default: 30 |n|
        if <0, also the header of the histogram will be suppressed
    lowerbound: float
        first bin |n|
        default: 0
    bin_width : float
        width of the bins |n|
        default: 1
    values : bool
        if False (default), bins will be used |n|
        if True, the individual values will be shown (sorted on the value).
        in that case, no cumulative values will be given |n|
    ex0 : bool
        if False (default), include zeroes. if True, exclude zeroes
    as_str: bool
        if False (default), print the histogram
        if True, return a string containing the histogram
    file: file
        if None (default), all output is directed to stdout |n|
        otherwise, the output is directed to the file

    Returns
    -------
    histogram (if as_str is True) : str

    Note
    ----
    This is an alias: all arguments are handed over unchanged to print_histogram()
    """
    return self.print_histogram(
        number_of_bins=number_of_bins,
        lowerbound=lowerbound,
        bin_width=bin_width,
        values=values,
        ex0=ex0,
        as_str=as_str,
        file=file,
    )
def print_histogram(
    self, number_of_bins=None, lowerbound=None, bin_width=None, values=False, ex0=False, as_str=False, file=None
):
    """
    print monitor statistics and histogram

    Parameters
    ----------
    number_of_bins : int
        number of bins |n|
        default: 30 |n|
        if <0, also the header of the histogram will be suppressed
    lowerbound: float
        first bin |n|
        default: 0
    bin_width : float
        width of the bins |n|
        default: 1
    values : bool
        if False (default), bins will be used |n|
        if True, the individual values will be shown (sorted on the value).
        in that case, no cumulative values will be given |n|
    ex0 : bool
        if False (default), include zeroes. if True, exclude zeroes
    as_str: bool
        if False (default), print the histogram
        if True, return a string containing the histogram
    file: file
        if None (default), all output is directed to stdout |n|
        otherwise, the output is directed to the file

    Returns
    -------
    histogram (if as_str is True) : str

    Note
    ----
    If number_of_bins, lowerbound and bin_width are all omitted, the histogram will be
    autoscaled (see histogram_autoscale), with a maximum of 30 classes. |n|
    If at least one of them is given, the given values are used and the omitted
    ones get the defaults mentioned above.
    """
    scale = 80  # width of the bar of asterisks

    result = []
    result.append("Histogram of " + self.name() + ("[ex0]" if ex0 else ""))
    x, weight = self._xweight(ex0=ex0, force_numeric=not values)
    weight_total = sum(weight)

    if weight_total == 0:
        result.append("")
        result.append("no data")

    elif values:
        nentries = len(x)
        if self._weight:
            result.append(pad(self.weight_legend, 13) + "{}".format(fn(weight_total, 13, 3)))
        if not self._level:
            result.append(pad("entries", 13) + "{}".format(fn(nentries, 13, 3)))
        result.append("")

        # Header widths follow the widths used for the data rows below (20 / 14 / 6 / 8 / 6).
        # NOTE(review): assumes pad left-justifies, rpad right-justifies and fn returns a
        # field of exactly the requested width -- confirm against those helpers.
        if self._level:
            result.append("value".ljust(20) + " " + rpad(self.weight_legend, 13) + "%".rjust(6))
        elif self._weight:
            result.append(
                "value".ljust(20)
                + " "
                + rpad(self.weight_legend, 13)
                + "%".rjust(6)
                + "entries".rjust(8)
                + "%".rjust(6)
            )
        else:
            result.append("value".ljust(20) + "entries".rjust(7) + "%".rjust(6))

        # distinct tallied values by their stripped string form, numeric ones first (see key)
        distinct = sorted({str(v).strip() for v in set(x)}, key=self.key)
        for value in distinct:
            if self._level:
                count = self.value_duration(value)
            elif self._weight:
                count = self.value_weight(value)
                count_entries = self.value_number_of_entries(value)
            else:
                count = self.value_number_of_entries(value)

            perc = count / weight_total
            n = int(perc * scale)
            s = ("*" * n) + (" " * (scale - n))

            if self._level:
                result.append(pad(str(value), 20) + fn(count, 14, 3) + fn(perc * 100, 6, 1) + " " + s)
            elif self._weight:
                result.append(
                    pad(str(value), 20)
                    + fn(count, 14, 3)
                    + fn(perc * 100, 6, 1)
                    + rpad(str(count_entries), 8)
                    + fn(count_entries * 100 / nentries, 6, 1)
                    + " "
                    + s
                )
            else:
                result.append(pad(str(value), 20) + rpad(str(count), 7) + fn(perc * 100, 6, 1) + " " + s)

    else:
        if number_of_bins is None and lowerbound is None and bin_width is None:
            bin_width, lowerbound, number_of_bins = self.histogram_autoscale()
        else:
            # the caller specified (part of) the binning: honour it and only fill in
            # the documented defaults for what was left out
            if number_of_bins is None:
                number_of_bins = 30
            if lowerbound is None:
                lowerbound = 0
            if bin_width is None:
                bin_width = 1

        result.append(self.print_statistics(show_header=False, show_legend=True, do_indent=False, as_str=True))

        if number_of_bins >= 0:
            result.append("")
            # header columns mirror the row format "{13} {13}{6}{6}" used below
            if self._weight:
                result.append(
                    "<=".rjust(13) + " " + rpad(self.weight_legend, 13) + "%".rjust(6) + "cum%".rjust(6)
                )
            else:
                result.append("<=".rjust(13) + " " + "entries".rjust(13) + "%".rjust(6) + "cum%".rjust(6))

            cumperc = 0
            # bin -1 collects everything below lowerbound, bin number_of_bins everything above
            for i in range(-1, number_of_bins + 1):
                if i == -1:
                    lb = -inf
                else:
                    lb = lowerbound + i * bin_width
                if i == number_of_bins:
                    ub = inf
                else:
                    ub = lowerbound + (i + 1) * bin_width

                if self._weight:
                    count = self.sys_bin_weight(lb, ub)
                else:
                    count = self.bin_number_of_entries(lb, ub)

                perc = count / weight_total
                if weight_total == inf:
                    s = ""
                else:
                    cumperc += perc
                    n = int(perc * scale)
                    ncum = int(cumperc * scale) + 1
                    s = ("*" * n) + (" " * (scale - n))
                    # mark the cumulative percentage in the bar
                    s = s[: ncum - 1] + "|" + s[ncum + 1 :]

                result.append(
                    "{} {}{}{} {}".format(
                        fn(ub, 13, 3), fn(count, 13, 3), fn(perc * 100, 6, 1), fn(cumperc * 100, 6, 1), s
                    )
                )

    result.append("")
    return return_or_print(result, as_str=as_str, file=file)
def key(self, x):
    """
    sort key for the (string) values shown by print_histogram(values=True)

    Parameters
    ----------
    x : str
        string representation of a tallied value

    Returns
    -------
    (numeric value, "") if x can be interpreted as a number, otherwise (inf, x) : tuple
        so numeric values sort numerically and come before all other values,
        which are sorted alphabetically

    Note
    ----
    A value that parses as nan is treated as non numeric, as nan can't be ordered.
    """
    infinity = float("inf")  # math.inf is not available on Python 2 / Python < 3.5
    try:
        x1 = float(x)
    except ValueError:
        return (infinity, x)
    if x1 != x1:  # nan: would make the sort order undefined
        return (infinity, x)
    return (x1, "")
def animate(self, *args, **kwargs):
    """
    animates the monitor in a panel

    Parameters
    ----------
    linecolor : colorspec
        color of the line or points (default foreground color)
    linewidth : int
        width of the line or points (default 1 for line, 3 for points)
    fillcolor : colorspec
        color of the panel (default transparent)
    bordercolor : colorspec
        color of the border (default foreground color)
    borderlinewidth : int
        width of the line around the panel (default 1)
    nowcolor : colorspec
        color of the line indicating now (default red)
    titlecolor : colorspec
        color of the title (default foreground color)
    titlefont : font
        font of the title (default null string)
    titlefontsize : int
        size of the font of the title (default 15)
    title : str
        title to be shown above panel |n|
        default: name of the monitor
    x : int
        x-coordinate of panel, relative to xy_anchor, default 0
    y : int
        y-coordinate of panel, relative to xy_anchor. default 0
    xy_anchor : str
        specifies where x and y are relative to |n|
        possible values are (default: sw): |n|
        ``nw    n    ne`` |n|
        ``w     c     e`` |n|
        ``sw    s    se``
    vertical_offset : float
        the vertical position of x within the panel is
        vertical_offset + x * vertical_scale (default 2)
    vertical_scale : float
        the vertical position of x within the panel is
        vertical_offset + x * vertical_scale (default 5)
    horizontal_scale : float
        the relative horizontal position of time t within the panel is on
        t * horizontal_scale, possibly shifted (default 1)|n|
    width : int
        width of the panel (default 200)
    height : int
        height of the panel (default 75)
    layer : int
        layer (default 0)

    Returns
    -------
    reference to AnimateMonitor object : AnimateMonitor

    Note
    ----
    It is recommended to use sim.AnimateMonitor instead |n|
    All arguments are handed over to AnimateMonitor, with this monitor as its first argument |n|
    All measures are in screen coordinates |n|
    """
    # Pass self positionally: with monitor=self as a keyword, any positional argument
    # would also bind to 'monitor' and raise a TypeError (multiple values for 'monitor').
    return AnimateMonitor(self, *args, **kwargs)
def x(self, ex0=False, force_numeric=True):
    """
    array/list of tallied values

    Parameters
    ----------
    ex0 : bool
        if False (default), include zeroes. if True, exclude zeroes
    force_numeric : bool
        if True (default), convert non numeric tallied values numeric if possible, otherwise assume 0 |n|
        if False, do not interpret x-values, return as list if type is any (list)

    Returns
    -------
    all tallied values : array/list

    Note
    ----
    Raises TypeError for level monitors. Use xduration(), xt() or tx() instead.
    """
    if self._level:
        raise TypeError("x not available for level monitors")
    tallied, _ = self._xweight(ex0=ex0, force_numeric=force_numeric)
    return tallied
def xweight(self, ex0=False, force_numeric=True):
    """
    tuple of array/list of tallied values and array with the weights

    Parameters
    ----------
    ex0 : bool
        if False (default), include zeroes. if True, exclude zeroes
    force_numeric : bool
        if True (default), convert non numeric tallied values numeric if possible, otherwise assume 0 |n|
        if False, do not interpret x-values, return as list if type is list

    Returns
    -------
    all tallied values and the corresponding weights : tuple

    Note
    ----
    Raises TypeError for level monitors
    """
    if not self._level:
        return self._xweight(ex0, force_numeric)
    raise TypeError("xweight not available for level monitors")
def xduration(self, ex0=False, force_numeric=True):
    """
    tuple of array/list of tallied values and array with the durations

    Parameters
    ----------
    ex0 : bool
        if False (default), include zeroes. if True, exclude zeroes
    force_numeric : bool
        if True (default), convert non numeric tallied values numeric if possible, otherwise assume 0 |n|
        if False, do not interpret x-values, return as list if type is list

    Returns
    -------
    all tallied values and the corresponding durations : tuple

    Note
    ----
    Raises TypeError for non level monitors
    """
    if self._level:
        return self._xweight(ex0, force_numeric)
    raise TypeError("xduration not available for non level monitors")
def xt(self, ex0=False, exoff=False, force_numeric=True, add_now=True):
    """
    tuple of array/list with x-values and array with timestamp

    Parameters
    ----------
    ex0 : bool
        if False (default), include zeroes. if True, exclude zeroes
    exoff : bool
        if False (default), include self.off. if True, exclude self.off's |n|
        non level monitors will return all values, regardless of exoff
    force_numeric : bool
        if True (default), convert non numeric tallied values numeric if possible, otherwise assume 0 |n|
        if False, do not interpret x-values, return as list if type is list
    add_now : bool
        if True (default), the last tallied x-value and the current time is added to the result |n|
        if False, the result ends with the last tallied value and the time that was tallied |n|
        non level monitors will never add now |n|
        nothing is added when no value has been tallied at all

    Returns
    -------
    array/list with x-values and array with timestamps : tuple

    Note
    ----
    The value self.off is stored when monitoring is turned off |n|
    The timestamps are not corrected for any reset_now() adjustment.
    """
    if not self._level:
        exoff = False
        add_now = False

    if self.xtypecode or (not force_numeric):
        x = self._x
        typecode = self.xtypecode
        off = self.off
    else:
        x = list_to_array(self._x)
        typecode = x.typecode
        off = -inf  # float

    xx = array.array(typecode) if typecode else []
    t = array.array("d")

    # repeat the last tallied value at the current time; guard against an empty
    # monitor, where x[-1] would raise an IndexError
    if add_now and len(x) > 0:
        addx = [x[-1]]
        addt = [self.env._now]
    else:
        addx = []
        addt = []

    for vx, vt in zip(itertools.chain(x, addx), itertools.chain(self._t, addt)):
        if ex0 and vx == 0:
            continue
        if exoff and vx == off:
            continue
        xx.append(vx)
        t.append(vt)
    return xx, t
def tx(self, ex0=False, exoff=False, force_numeric=False, add_now=True):
    """
    tuple of array with timestamps and array/list with x-values

    Parameters
    ----------
    ex0 : bool
        if False (default), include zeroes. if True, exclude zeroes
    exoff : bool
        if False (default), include self.off. if True, exclude self.off's |n|
        non level monitors will return all values, regardless of exoff
    force_numeric : bool
        if True, convert non numeric tallied values numeric if possible, otherwise assume 0 |n|
        if False (default), do not interpret x-values, return as list if type is list
    add_now : bool
        if True (default), the last tallied x-value and the current time is added to the result |n|
        if False, the result ends with the last tallied value and the time that was tallied |n|
        non level monitors will never add now

    Returns
    -------
    array with timestamps and array/list with x-values : tuple

    Note
    ----
    Same as xt(), but with the two elements of the result swapped |n|
    The value self.off is stored when monitoring is turned off |n|
    The timestamps are not corrected for any reset_now() adjustment.
    """
    xvalues, timestamps = self.xt(ex0=ex0, exoff=exoff, force_numeric=force_numeric, add_now=add_now)
    return (timestamps, xvalues)
def _xweight(self, ex0=False, force_numeric=True):
    """
    tallied values together with their weights (non level) or durations (level)

    Parameters
    ----------
    ex0 : bool
        if False (default), include zeroes. if True, exclude zeroes
    force_numeric : bool
        if True (default), the tallied values are converted with list_to_array
        unless the monitor already has a typecode |n|
        if False, the values are used as stored

    Returns
    -------
    (values, weights) : tuple
        values is an array (if a typecode applies) or a list, weights is an array of doubles |n|
        for level monitors, entries equal to self.off are always left out

    Note
    ----
    The result is cached in Monitor.cached_xweight per (ex0, force_numeric) combination and
    is reused as long as the hash of (monitor, number of tallies[, time]) is unchanged.
    The returned sequences may be the monitor's own storage; do not modify them.
    """
    if self._level:
        # durations depend on the current time, so that is part of the cache stamp
        thishash = hash((self, len(self._x), max(self.env.t, self.env._now)))
    else:
        thishash = hash((self, len(self._x)))

    cache_key = (ex0, force_numeric)
    cached = Monitor.cached_xweight[cache_key]
    if cached[0] == thishash:
        return cached[1]

    if self.xtypecode or (not force_numeric):
        x = self._x
        typecode = self.xtypecode
    else:
        x = list_to_array(self._x)
        typecode = x.typecode

    if self._level:
        # duration of each tallied value: time until the next tally, the last one until now
        weightall = array.array("d")
        lastt = None
        for t in self._t:
            if lastt is not None:
                weightall.append(t - lastt)
            lastt = t
        weightall.append(self.env._now - lastt)

        weight = array.array("d")
        xx = array.array(typecode) if typecode else []
        for vx, vweight in zip(x, weightall):
            if vx != self.off:
                if vx != 0 or not ex0:  # ex0 has to be honoured for level monitors as well
                    xx.append(vx)
                    weight.append(vweight)
        xweight = (xx, weight)

    elif ex0:
        # Select values and weights in one pass over the unfiltered data, so each
        # weight stays paired with its own value.
        stored_weights = self._weight if self._weight else None
        kept_x = array.array(typecode) if typecode else []
        kept_weight = array.array("d")
        for index, vx in enumerate(x):
            if vx != 0:
                kept_x.append(vx)
                kept_weight.append(1 if stored_weights is None else stored_weights[index])
        xweight = (kept_x, kept_weight)

    else:
        if self._weight:
            xweight = (x, self._weight)
        else:
            # no explicit weights tallied: every entry counts as 1
            xweight = (x, array.array("d", (1,) * len(x)))

    Monitor.cached_xweight[cache_key] = (thishash, xweight)
    return xweight
class AnimateMonitor(object):
    """
    animates a monitor in a panel

    Parameters
    ----------
    monitor : Monitor
        monitor to be animated
    linecolor : colorspec
        color of the line or points (default foreground color)
    linewidth : int
        width of the line or points (default 1 for line, 3 for points)
    fillcolor : colorspec
        color of the panel (default transparent)
    bordercolor : colorspec
        color of the border (default foreground color)
    borderlinewidth : int
        width of the line around the panel (default 1)
    nowcolor : colorspec
        color of the line indicating now (default red)
    titlecolor : colorspec
        color of the title (default foreground color)
    titlefont : font
        font of the title (default null string)
    titlefontsize : int
        size of the font of the title (default 15)
    title : str
        title to be shown above panel |n|
        default: name of the monitor
    x : int
        x-coordinate of panel, relative to xy_anchor, default 0
    y : int
        y-coordinate of panel, relative to xy_anchor. default 0
    xy_anchor : str
        specifies where x and y are relative to |n|
        possible values are (default: sw): |n|
        ``nw    n    ne`` |n|
        ``w     c     e`` |n|
        ``sw    s    se``
    vertical_offset : float
        the vertical position of x within the panel is
        vertical_offset + x * vertical_scale (default 2)
    vertical_scale : float
        the vertical position of x within the panel is
        vertical_offset + x * vertical_scale (default 5)
    horizontal_scale : float
        the relative horizontal position of time t within the panel is on
        t * horizontal_scale, possibly shifted (default 1)|n|
    width : int
        width of the panel (default 200)
    height : int
        height of the panel (default 75)
    layer : int
        layer (default 0)
    parent : Component
        component where this animation object belongs to (default None) |n|
        it is stored as self.parent; any automatic removal upon termination
        of the parent is not done by this class itself

    Note
    ----
    All measures are in screen coordinates |n|
    The panel is made up of four animation objects (border, title, now-line and
    value line/points), which are kept in self.aos.
    """

    def __init__(
        self,
        monitor,
        linecolor="fg",
        linewidth=None,
        fillcolor="",
        bordercolor="fg",
        borderlinewidth=1,
        titlecolor="fg",
        nowcolor="red",
        titlefont="",
        titlefontsize=15,
        title=None,
        x=0,
        y=0,
        vertical_offset=2,
        parent=None,
        vertical_scale=5,
        horizontal_scale=None,
        width=200,
        height=75,
        xy_anchor="sw",
        layer=0,
    ):
        _checkismonitor(monitor)

        # resolve the defaults that depend on the monitor
        if title is None:
            title = monitor.name()
        if linewidth is None:
            if monitor._level:
                linewidth = 1
            else:
                linewidth = 3
        if horizontal_scale is None:
            horizontal_scale = 1

        # lower left corner of the panel in screen coordinates
        xll = x + monitor.env.xy_anchor_to_x(xy_anchor, screen_coordinates=True)
        yll = y + monitor.env.xy_anchor_to_y(xy_anchor, screen_coordinates=True)

        self.aos = []
        self.parent = parent
        self.env = monitor.env
        add = self.aos.append

        # border / background of the panel
        add(
            AnimateRectangle(
                spec=(0, 0, width, height),
                offsetx=xll,
                offsety=yll,
                fillcolor=fillcolor,
                linewidth=borderlinewidth,
                linecolor=bordercolor,
                screen_coordinates=True,
                layer=layer,
            )
        )
        # title, just above the panel
        add(
            AnimateText(
                text=title,
                textcolor=titlecolor,
                offsetx=xll,
                offsety=yll + height + titlefontsize * 0.15,
                text_anchor="sw",
                screen_coordinates=True,
                fontsize=titlefontsize,
                font=titlefont,
                layer=layer,
            )
        )
        # line indicating now
        add(
            _Animate_t_Line(
                line0=(),
                linecolor0=nowcolor,
                monitor=monitor,
                width=width,
                height=height,
                t_scale=horizontal_scale,
                layer=layer,
                offsetx0=xll,
                offsety0=yll,
                screen_coordinates=True,
            )
        )
        # the tallied values: a line for level monitors, points otherwise
        add(
            _Animate_t_x_Line(
                monitor=monitor,
                linecolor0=linecolor,
                line0=(),
                linewidth0=linewidth,
                as_points=not monitor._level,
                screen_coordinates=True,
                offsetx0=xll,
                offsety0=yll,
                width=width,
                height=height,
                value_offsety=vertical_offset,
                value_scale=vertical_scale,
                linewidth=linewidth,
                t_scale=horizontal_scale,
                layer=layer,
            )
        )
        self.env.sys_objects.append(self)

    def update(self, t):
        """
        does nothing for a monitor panel (the animation objects in self.aos do the work)
        """
        return None

    def remove(self):
        """
        removes the animate object and thus closes this animation
        """
        for animation_object in self.aos:
            animation_object.remove()
        self.env.sys_objects.remove(self)
if Pythonista:

    class AnimationScene(scene.Scene):
        # Pythonista (iOS) front end of the animation: a scene.Scene whose draw() advances
        # the simulation, renders all animation objects into one image and draws the
        # ui objects (buttons and sliders) on top of it.
        # NOTE(review): the nesting below was reconstructed from a source without
        # indentation -- verify against the original file before relying on it.

        def __init__(self, env, *args, **kwargs):
            # env is accepted but not used here; the environment is read from g.animation_env
            scene.Scene.__init__(self, *args, **kwargs)

        def setup(self):
            pass

        def touch_ended(self, touch):
            # Dispatch a finished touch to the first button or slider it falls in.
            env = g.animation_env
            if env is not None:
                for uio in env.ui_objects:
                    ux = uio.x + env.xy_anchor_to_x(uio.xy_anchor, screen_coordinates=True)
                    uy = uio.y + env.xy_anchor_to_y(uio.xy_anchor, screen_coordinates=True)
                    if uio.type == "button":
                        if touch.location in scene.Rect(ux - 2, uy - 2, uio.width + 2, uio.height + 2):
                            uio.action()
                            break  # new buttons might have been installed
                    if uio.type == "slider":
                        if touch.location in scene.Rect(ux - 2, uy - 2, uio.width + 4, uio.height + 4):
                            xsel = touch.location[0] - ux
                            # translate the horizontal position into a value on the resolution grid,
                            # clamped to [vmin, vmax]
                            uio._v = uio.vmin + round(-0.5 + xsel / uio.xdelta) * uio.resolution
                            uio._v = max(min(uio._v, uio.vmax), uio.vmin)
                            if uio.action is not None:
                                uio.action(uio._v)
                                break  # new items might have been installed

        def draw(self):
            # Called by the scene module for every frame.
            env = g.animation_env
            # NOTE(review): g.in_draw is not reset if anything below raises (no try/finally).
            g.in_draw = True
            if (env is not None) and env._animate and env.running:
                scene.background(env.pythonistacolor("bg"))
                # --- determine the animation time env.t and advance the simulation up to it
                if env._synced or env._video:  # video forces synced
                    if env._video:
                        env.t = env.video_t
                    else:
                        if env.paused:
                            env.t = env.start_animation_time
                        else:
                            env.t = env.start_animation_time + (
                                (time.time() - env.start_animation_clocktime) * env._speed
                            )
                    while (env.peek() < env.t) and env.running and env._animate:
                        env.step()
                else:
                    # not synced: one simulation event per frame
                    if (env._step_pressed or (not env.paused)) and env._animate:
                        env.step()
                        if not env._current_component._suppress_pause_at_step:
                            env._step_pressed = False
                        env.t = env._now
                if not env.paused:
                    env.frametimes.append(time.time())
                # --- render all animation objects into one image
                env.an_objects.sort(key=lambda obj: (-obj.layer(env.t), obj.sequence))
                touchvalues = self.touches.values()
                # NOTE(review): Image is not imported in the visible part of the file;
                # presumably PIL's Image, imported elsewhere -- confirm.
                capture_image = Image.new("RGB", (env._width, env._height), env.colorspec_to_tuple("bg"))
                env.animation_pre_tick(env.t)
                env.animation_pre_tick_sys(env.t)
                for ao in env.an_objects:
                    ao.make_pil_image(env.t)
                    if ao._image_visible:
                        # the image y-axis runs downwards, the animation y-axis upwards
                        capture_image.paste(
                            ao._image, (int(ao._image_x), int(env._height - ao._image_y - ao._image.size[1])), ao._image
                        )
                env.animation_post_tick(env.t)
                ims = scene.load_pil_image(capture_image)
                scene.image(ims, 0, 0, *capture_image.size)
                scene.unload_image(ims)
                # --- store the frame when a video is being made
                if env._video and (not env.paused):
                    if env._video_out == "gif":
                        env._images.append(capture_image.convert("RGB"))
                    elif env._video_out == "snapshots":
                        capture_image.save(env._video_name)
                        env._video_name = incstr(env._video_name)
                    else:
                        pass  # this should never occur
                # --- draw the ui objects on top of the frame
                for uio in env.ui_objects:
                    ux = uio.x + env.xy_anchor_to_x(uio.xy_anchor, screen_coordinates=True)
                    uy = uio.y + env.xy_anchor_to_y(uio.xy_anchor, screen_coordinates=True)
                    if uio.type == "entry":
                        raise NotImplementedError("AnimateEntry not supported on Pythonista")
                    if uio.type == "button":
                        linewidth = uio.linewidth
                        scene.push_matrix()
                        scene.fill(env.pythonistacolor(uio.fillcolor))
                        scene.stroke(env.pythonistacolor(uio.linecolor))
                        scene.stroke_weight(linewidth)
                        scene.rect(ux - 4, uy + 2, uio.width + 8, uio.height - 4)
                        scene.tint(env.pythonistacolor(uio.color))
                        scene.translate(ux + uio.width / 2, uy + uio.height / 2)
                        scene.text(uio.text(), uio.font, uio.fontsize, alignment=5)
                        scene.tint(1, 1, 1, 1)
                        # required for proper loading of images
                        scene.pop_matrix()
                    elif uio.type == "slider":
                        scene.push_matrix()
                        scene.tint(env.pythonistacolor(uio.labelcolor))
                        v = uio.vmin
                        x = ux + uio.xdelta / 2
                        y = uy
                        # find the grid value closest to the current slider value
                        mindist = inf
                        v = uio.vmin
                        while v <= uio.vmax:
                            if abs(v - uio._v) < mindist:
                                mindist = abs(v - uio._v)
                                vsel = v
                            v += uio.resolution
                        thisv = uio._v
                        # a touch that is still active previews the value under the finger
                        # NOTE(review): unlike touch_ended, uio.vmin is not added and the value
                        # is not clamped here -- confirm whether that is intended.
                        for touch in touchvalues:
                            if touch.location in scene.Rect(ux, uy, uio.width, uio.height):
                                xsel = touch.location[0] - ux
                                vsel = round(-0.5 + xsel / uio.xdelta) * uio.resolution
                                thisv = vsel
                        scene.stroke(env.pythonistacolor(uio.linecolor))
                        # one tick per grid value, the selected one drawn thicker
                        v = uio.vmin
                        xfirst = -1
                        while v <= uio.vmax:
                            if xfirst == -1:
                                xfirst = x
                            if v == vsel:
                                scene.stroke_weight(3)
                            else:
                                scene.stroke_weight(1)
                            scene.line(x, y, x, y + uio.height)
                            v += uio.resolution
                            x += uio.xdelta
                        # label at the left, current value at the right, both above the ticks
                        scene.push_matrix()
                        scene.translate(xfirst, uy + uio.height + 2)
                        if uio.label:
                            scene.text(uio.label, uio.font, uio.fontsize, alignment=9)
                        scene.pop_matrix()
                        scene.translate(ux + uio.width, uy + uio.height + 2)
                        scene.text(str(thisv) + " ", uio.font, uio.fontsize, alignment=7)
                        scene.tint(1, 1, 1, 1)
                        # required for proper loading of images later
                        scene.pop_matrix()
            else:
                # no running animation: show a message in the middle of the screen
                width, height = ui.get_screen_size()
                # NOTE(review): pop_matrix is called here without a preceding push_matrix in
                # this branch; push_matrix may have been intended -- confirm.
                scene.pop_matrix()
                scene.tint(1, 1, 1, 1)
                scene.translate(width / 2, height / 2)
                scene.text("salabim animation paused/stopped")
                scene.pop_matrix()
                scene.tint(1, 1, 1, 1)
            if env is not None:
                if env._video:
                    if not env.paused:
                        # in video mode the animation time advances a fixed amount per frame
                        env.video_t += env._speed / env._fps
            g.in_draw = False
class Qmember:
    """
    node of the doubly linked list a queue is made of; links a component to a queue
    """

    def __init__(self):
        pass

    def insert_in_front_of(self, m2, c, q, priority):
        """
        links this member into queue q, directly in front of member m2

        Parameters
        ----------
        m2 : Qmember
            member that will become the successor of this member
        c : Component
            component this member represents
        q : Queue
            queue the component enters
        priority : any
            priority to be stored with this member

        Note
        ----
        Also registers the entry: enter time, queue length (tallied), number of arrivals,
        the component's membership administration and the touched flag of all active iterators.
        """
        before = m2.predecessor
        before.successor = self
        m2.predecessor = self
        self.predecessor = before
        self.successor = m2

        self.priority = priority
        self.component = c
        self.queue = q
        self.enter_time = c.env._now

        q._length += 1
        # signal all active iterations over q that the queue has changed
        for active_iteration in q._iter_touched:
            q._iter_touched[active_iteration] = True
        c._qmembers[q] = self

        if q.env._trace and not q._isinternal:
            q.env.print_trace("", "", c.name(), "enter " + q.name())
        q.length.tally(q._length)
        q.number_of_arrivals += 1
class Queue(object):
"""
Queue object
Parameters
----------
fill : Queue, list or tuple
fill the queue with the components in fill |n|
if omitted, the queue will be empty at initialization
name : str
name of the queue |n|
if the name ends with a period (.),
auto serializing will be applied |n|
if the name ends with a comma,
auto serializing starting at 1 will be applied |n|
if omitted, the name will be derived from the class
it is defined in (lowercased)
monitor : bool
if True (default) , both length and length_of_stay are monitored |n|
if False, monitoring is disabled.
env : Environment
environment where the queue is defined |n|
if omitted, default_env will be used
"""
def __init__(self, name=None, monitor=True, fill=None, env=None, *args, **kwargs):
    """
    initializes the queue: an empty doubly linked list between a head and a tail
    sentinel, the length and length_of_stay monitors and the arrival/departure registration

    Parameters
    ----------
    name : str
        name of the queue (handed to _set_name together with the environment's queue name serializer)
    monitor : bool
        if True (default), both length and length_of_stay are monitored |n|
        if False, monitoring is disabled.
    fill : iterable of components
        components that enter the queue immediately (not traced) |n|
        if omitted, the queue will be empty at initialization
    env : Environment
        environment where the queue is defined |n|
        if omitted, default_env will be used

    Note
    ----
    All other arguments are passed to setup()
    """
    self.env = g.default_env if env is None else env
    _set_name(name, self.env._nameserializeQueue, self)

    # sentinels: real members always sit between _head and _tail
    self._head = Qmember()
    self._tail = Qmember()
    self._head.successor = self._tail
    self._head.predecessor = None
    self._tail.successor = None
    self._tail.predecessor = self._head
    self._head.component = None
    self._tail.component = None
    self._head.priority = 0
    self._tail.priority = 0

    self._length = 0
    self._iter_sequence = 0
    self._iter_touched = {}
    self._isinternal = False
    self.arrival_rate(reset=True)
    self.departure_rate(reset=True)

    self.length = Monitor(
        "Length of " + self.name(), level=True, initial_tally=0, monitor=monitor, type="uint32", env=self.env
    )
    # env has to be passed here as well, otherwise this monitor would not be tied to the
    # environment of the queue when an explicit env is given
    self.length_of_stay = Monitor(
        "Length of stay in " + self.name(), monitor=monitor, type="float", env=self.env
    )

    if fill is not None:
        # the initial fill is not traced; make sure tracing is restored even if entering fails
        savetrace = self.env._trace
        self.env._trace = False
        try:
            for c in fill:
                c.enter(self)
        finally:
            self.env._trace = savetrace

    if self.env._trace:
        self.env.print_trace("", "", self.name() + " create")
    self.setup(*args, **kwargs)
def setup(self):
"""
called immediately after initialization of a queue.
by default this is a dummy method, but it can be overridden.
only keyword arguments are passed
"""
pass
def animate(self, *args, **kwargs):
    """
    Animates the components in the queue.

    Parameters
    ----------
    x : float
        x-position of the first component in the queue |n|
        default: 50
    y : float
        y-position of the first component in the queue |n|
        default: 50
    direction : str
        if "w", waiting line runs westwards (i.e. from right to left) |n|
        if "n", waiting line runs northwards (i.e. from bottom to top) |n|
        if "e", waiting line runs eastwards (i.e. from left to right) (default) |n|
        if "s", waiting line runs southwards (i.e. from top to bottom)
    reverse : bool
        if False (default), display in normal order. If True, reversed.
    max_length : int
        maximum number of components to be displayed
    xy_anchor : str
        specifies where x and y are relative to |n|
        possible values are (default: sw): |n|
        ``nw    n    ne`` |n|
        ``w     c     e`` |n|
        ``sw    s    se``
    id : any
        the animation works by calling the animation_objects method of each component, optionally
        with id. By default, this is self, but can be overridden, particularly with the queue
    arg : any
        this is used when a parameter is a function with two parameters, as the first argument or
        if a parameter is a method as the instance |n|
        default: self (instance itself)

    Returns
    -------
    reference to AnimateQueue object : AnimateQueue

    Note
    ----
    It is recommended to use sim.AnimateQueue instead |n|
    All arguments are handed over to AnimateQueue, with this queue as its first argument |n|
    All measures are in screen coordinates |n|
    All parameters, apart from queue and arg can be specified as: |n|
    - a scalar, like 10 |n|
    - a function with zero arguments, like lambda: title |n|
    - a function with one argument, being the time t, like lambda t: t + 10 |n|
    - a function with two parameters, being arg (as given) and the time, like lambda comp, t: comp.state |n|
    - a method instance arg for time t, like self.state, actually leading to arg.state(t) to be called
    """
    animation = AnimateQueue(self, *args, **kwargs)
    return animation
def reset_monitors(self, monitor=None):
    """
    resets the monitors length and length_of_stay of the queue

    Parameters
    ----------
    monitor : bool
        if True, monitoring will be on. |n|
        if False, monitoring is disabled |n|
        if omitted, no change of monitoring state

    Note
    ----
    the monitors can also be reset individually with length.reset() and length_of_stay.reset()
    """
    for queue_monitor in (self.length, self.length_of_stay):
        queue_monitor.reset(monitor=monitor)
def arrival_rate(self, reset=False):
    """
    returns the arrival rate |n|
    When the queue is created, the registration is reset.

    Parameters
    ----------
    reset : bool
        if True, the number of arrivals is set to 0 and the time of the last reset to now |n|
        default: False ==> no reset

    Returns
    -------
    arrival rate :  float
        number of arrivals since last reset / duration since last reset |n|
        nan if duration is zero (so always nan when reset is True)
    """
    now = self.env._now
    if reset:
        self.number_of_arrivals = 0
        self.number_of_arrivals_t0 = now
    elapsed = now - self.number_of_arrivals_t0
    return nan if elapsed == 0 else self.number_of_arrivals / elapsed
def departure_rate(self, reset=False):
    """
    returns the departure rate |n|
    When the queue is created, the registration is reset.

    Parameters
    ----------
    reset : bool
        if True, the number of departures is set to 0 and the time of the last reset to now |n|
        default: False ==> no reset

    Returns
    -------
    departure rate :  float
        number of departures since last reset / duration since last reset |n|
        nan if duration is zero (so always nan when reset is True)
    """
    now = self.env._now
    if reset:
        self.number_of_departures = 0
        self.number_of_departures_t0 = now
    elapsed = now - self.number_of_departures_t0
    return nan if elapsed == 0 else self.number_of_departures / elapsed
def monitor(self, value):
    """
    switches the monitoring of both length and length_of_stay on or off

    Parameters
    ----------
    value : bool
        if True, monitoring will be on. |n|
        if False, monitoring is disabled |n|

    Note
    ----
    monitoring can also be controlled individually with length.monitor() and length_of_stay.monitor()
    """
    for queue_monitor in (self.length, self.length_of_stay):
        queue_monitor.monitor(value=value)
def register(self, registry):
    """
    registers the queue in the registry

    Parameters
    ----------
    registry : list
        list of (to be) registered objects

    Returns
    -------
    queue (self) : Queue

    Note
    ----
    Raises TypeError if registry is not a list and ValueError if the queue is already registered. |n|
    Use Queue.deregister if queue does not longer need to be registered.
    """
    if isinstance(registry, list):
        if self in registry:
            raise ValueError(self.name() + " already in registry")
        registry.append(self)
        return self
    raise TypeError("registry not list")
def deregister(self, registry):
    """
    deregisters the queue in the registry

    Parameters
    ----------
    registry : list
        list of registered queues

    Returns
    -------
    queue (self) : Queue

    Note
    ----
    Raises TypeError if registry is not a list and ValueError if the queue is not registered.
    """
    if isinstance(registry, list):
        if self in registry:
            registry.remove(self)
            return self
        raise ValueError(self.name() + " not in registry")
    raise TypeError("registry not list")
def __repr__(self):
    """
    representation of the queue: the object description followed by the name between parentheses
    """
    description = object_to_str(self)
    return description + " (" + self.name() + ")"
def print_info(self, as_str=False, file=None):
    """
    prints information about the queue: its identity, its name and,
    per member, the component name, enter time and priority

    Parameters
    ----------
    as_str: bool
        if False (default), print the info
        if True, return a string containing the info
    file: file
        if None(default), all output is directed to stdout |n|
        otherwise, the output is directed to the file

    Returns
    -------
    info (if as_str is True) : str
    """
    lines = [object_to_str(self) + " " + hex(id(self)), " name=" + self.name()]
    if not self._length:
        lines.append(" no components")
    else:
        lines.append(" component(s):")
        member = self._head.successor
        while member != self._tail:
            pieces = [
                " ",
                pad(member.component.name(), 20),
                " enter_time",
                # enter_time is shown relative to the environment offset
                self.env.time_to_str(member.enter_time - self.env._offset),
                " priority=",
                str(member.priority),
            ]
            lines.append("".join(pieces))
            member = member.successor
    return return_or_print(lines, as_str, file)
def print_statistics(self, as_str=False, file=None):
    """
    prints a statistics summary of the queue: a header line followed by
    the statistics of the length and the length_of_stay monitors

    Parameters
    ----------
    as_str: bool
        if False (default), print the statistics
        if True, return a string containing the statistics
    file: file
        if None(default), all output is directed to stdout |n|
        otherwise, the output is directed to the file

    Returns
    -------
    statistics (if as_str is True) : str
    """
    queue_name = self.name()
    timestamp = fn(self.env._now - self.env._offset, 13, 3)
    lines = [
        "Statistics of {} at {}".format(queue_name, timestamp),
        # only the first monitor shows the legend
        self.length.print_statistics(show_header=False, show_legend=True, do_indent=True, as_str=True),
        "",
        self.length_of_stay.print_statistics(show_header=False, show_legend=False, do_indent=True, as_str=True),
    ]
    return return_or_print(lines, as_str, file)
def print_histograms(self, exclude=(), as_str=False, file=None):
    """
    prints the histograms of the length and length_of_stay monitor of the queue

    Parameters
    ----------
    exclude : tuple or list
        specifies which monitors to exclude |n|
        default: () |n|
    as_str: bool
        if False (default), print the histograms
        if True, return a string containing the histograms
    file: file
        if None(default), all output is directed to stdout |n|
        otherwise, the output is directed to the file

    Returns
    -------
    histograms (if as_str is True) : str
    """
    candidates = (self.length, self.length_of_stay)
    lines = [mon.print_histogram(as_str=True) for mon in candidates if mon not in exclude]
    return return_or_print(lines, as_str, file)
def name(self, value=None):
    """
    gets and optionally sets the name of the queue

    Parameters
    ----------
    value : str
        new name of the queue
        if omitted, no change

    Returns
    -------
    Name of the queue : str

    Note
    ----
    base_name and sequence_number are not affected if the name is changed |n|
    The names of the length and length_of_stay monitors are updated as well.
    """
    if value is None:
        return self._name
    self._name = value
    # keep the derived monitor names in sync with the new queue name
    self.length.name("Length of " + self.name())
    self.length_of_stay.name("Length of stay of " + self.name())
    return self._name
def base_name(self):
    """
    Returns
    -------
    base name of the queue (the name used at initialization): str
    """
    stored = self._base_name
    return stored
def sequence_number(self):
    """
    Returns
    -------
    sequence_number of the queue : int
        (the sequence number at initialization) |n|
        normally this will be the integer value of a serialized name,
        but also non serialized names (without a dot or a comma at the end)
        will be numbered)
    """
    stored = self._sequence_number
    return stored
def add(self, component):
    """
    adds a component to the tail of a queue, by calling component.enter

    Parameters
    ----------
    component : Component
        component to be added to the tail of the queue |n|
        may not be member of the queue yet

    Returns
    -------
    queue (self) : Queue

    Note
    ----
    the priority will be set to
    the priority of the tail of the queue, if any
    or 0 if queue is empty |n|
    This method is equivalent to append()
    """
    enter = component.enter
    enter(self)
    return self
def append(self, component):
    """
    appends a component to the tail of a queue, by calling component.enter

    Parameters
    ----------
    component : Component
        component to be appended to the tail of the queue |n|
        may not be member of the queue yet

    Returns
    -------
    queue (self) : Queue

    Note
    ----
    the priority will be set to
    the priority of the tail of the queue, if any
    or 0 if queue is empty |n|
    This method is equivalent to add()
    """
    enter = component.enter
    enter(self)
    return self
def add_at_head(self, component):
    """
    adds a component to the head of a queue, by calling component.enter_at_head

    Parameters
    ----------
    component : Component
        component to be added to the head of the queue |n|
        may not be member of the queue yet

    Returns
    -------
    queue (self) : Queue

    Note
    ----
    the priority will be set to
    the priority of the head of the queue, if any
    or 0 if queue is empty
    """
    enter_at_head = component.enter_at_head
    enter_at_head(self)
    return self
def add_in_front_of(self, component, poscomponent):
    """
    adds a component to a queue, just in front of another component,
    by calling component.enter_in_front_of

    Parameters
    ----------
    component : Component
        component to be added to the queue |n|
        may not be member of the queue yet
    poscomponent : Component
        component in front of which component will be inserted |n|
        must be member of the queue

    Returns
    -------
    queue (self) : Queue

    Note
    ----
    the priority of component will be set to the priority of poscomponent
    """
    enter_in_front_of = component.enter_in_front_of
    enter_in_front_of(self, poscomponent)
    return self
def insert(self, index, component):
    """
    Insert component before index-th element of the queue

    Parameters
    ----------
    index : int
        component to be added just before index'th element |n|
        should be >=0 and <=len(self)
    component : Component
        component to be added to the queue

    Returns
    -------
    queue (self) : Queue

    Raises
    ------
    IndexError
        if index < 0 or index > length of the queue

    Note
    ----
    the priority of component is taken from the queue member it is placed
    in front of; that is the tail marker when index equals the length of the queue
    """
    if index < 0:
        raise IndexError("index < 0")
    if index > self._length:
        raise IndexError("index > lengh of queue")
    component._checknotinqueue(self)
    # walk to the index-th member, or stop at the tail marker
    target = self._head.successor
    position = 0
    while target != self._tail and position != index:
        position += 1
        target = target.successor
    Qmember().insert_in_front_of(target, component, self, target.priority)
    return self
def add_behind(self, component, poscomponent):
    """
    adds a component to a queue, just behind another component,
    by calling component.enter_behind

    Parameters
    ----------
    component : Component
        component to be added to the queue |n|
        may not be member of the queue yet
    poscomponent : Component
        component behind which component will be inserted |n|
        must be member of the queue

    Returns
    -------
    queue (self) : Queue

    Note
    ----
    the priority of component will be set to the priority of poscomponent
    """
    enter_behind = component.enter_behind
    enter_behind(self, poscomponent)
    return self
def add_sorted(self, component, priority):
    """
    adds a component to a queue, according to the priority,
    by calling component.enter_sorted

    Parameters
    ----------
    component : Component
        component to be added to the queue |n|
        may not be member of the queue yet
    priority: type that can be compared with other priorities in the queue
        priority in the queue

    Returns
    -------
    queue (self) : Queue

    Note
    ----
    The component is placed just before the first component with a priority > given priority
    """
    enter_sorted = component.enter_sorted
    enter_sorted(self, priority)
    return self
def remove(self, component=None):
    """
    removes component from the queue

    Parameters
    ----------
    component : Component
        component to be removed |n|
        if omitted (None), all components will be removed via clear().

    Returns
    -------
    queue (self) : Queue

    Note
    ----
    component must be member of the queue
    """
    if component is not None:
        component.leave(self)
        return self
    self.clear()
    return self
def head(self):
    """
    Returns
    -------
    the head component of the queue, if any. None otherwise : Component

    Note
    ----
    q[0] is a more Pythonic way to access the head of the queue
    """
    first_member = self._head.successor
    return first_member.component
def tail(self):
    """
    Returns
    -------
    the tail component of the queue, if any. None otherwise : Component

    Note
    -----
    q[-1] is a more Pythonic way to access the tail of the queue
    """
    last_member = self._tail.predecessor
    return last_member.component
def pop(self, index=None):
    """
    removes a component by its position (or head) and returns it

    Parameters
    ----------
    index : int
        index-th element to remove, if any |n|
        if omitted, the head of the queue is removed, if any

    Returns
    -------
    The i-th component or head : Component
        None if not existing
    """
    popped = self._head.successor.component if index is None else self[index]
    if popped is None:
        return None
    popped.leave(self)
    return popped
def successor(self, component):
    """
    successor in queue, as given by component.successor

    Parameters
    ----------
    component : Component
        component whose successor to return |n|
        must be member of the queue

    Returns
    -------
    successor of component, if any : Component
        None otherwise
    """
    following = component.successor(self)
    return following
def predecessor(self, component):
    """
    predecessor in queue, as given by component.predecessor

    Parameters
    ----------
    component : Component
        component whose predecessor to return |n|
        must be member of the queue

    Returns
    -------
    predecessor of component, if any : Component |n|
        None otherwise.
    """
    preceding = component.predecessor(self)
    return preceding
def __contains__(self, component):
    """
    membership test: True if component._member(self) yields a member, False otherwise
    """
    membership = component._member(self)
    return not (membership is None)
def __getitem__(self, key):
    """
    positional access to the components of the queue

    Parameters
    ----------
    key : int or slice
        int: position in the queue (0 is the head, negative values count from the tail) |n|
        slice: standard Python slice, also with a negative step

    Returns
    -------
    for an int key: the component at that position, None if out of range : Component |n|
    for a slice key: list of the selected components : list

    Raises
    ------
    TypeError
        if key is neither an int nor a slice
    """
    if isinstance(key, slice):
        start, stop, step = key.indices(self._length)
        picked = []
        wanted = start  # position of the next component to collect
        if step > 0:
            # walk head -> tail
            member = self._head.successor
            position = 0
            while member != self._tail and wanted < stop:
                if position == wanted:
                    picked.append(member.component)
                    wanted += step
                position += 1
                member = member.successor
        else:
            # walk tail -> head
            member = self._tail.predecessor
            position = self._length - 1
            while member != self._head and wanted > stop:
                if position == wanted:
                    picked.append(member.component)
                    wanted += step  # step is negative here
                position -= 1
                member = member.predecessor
        return picked

    if isinstance(key, int):
        if key < 0:
            key += self._length  # negative index counts from the tail
        if not (0 <= key < self._length):
            return None
        member = self._head.successor
        position = 0
        while member != self._tail:
            if position == key:
                return member.component
            position += 1
            member = member.successor
        return None  # just for safety

    raise TypeError("Invalid argument type: " + object_to_str(key))
def __delitem__(self, key):
    """
    removes component(s) from the queue by position

    Parameters
    ----------
    key : int or slice
        int: position of the component to remove |n|
        slice: all components selected by the slice are removed

    Raises
    ------
    IndexError
        if key is an int that does not refer to a component in the queue
    TypeError
        if key is neither an int nor a slice
    """
    if isinstance(key, slice):
        for c in self[key]:
            self.remove(c)
    elif isinstance(key, int):
        c = self[key]
        if c is None:
            # self[key] yields None for an out of range position; handing that
            # to remove() would silently empty the complete queue
            raise IndexError("queue index out of range")
        self.remove(c)
    else:
        raise TypeError("Invalid argument type:" + object_to_str(key))
def __len__(self):
    """
    Returns
    -------
    the stored length (self._length) of the queue : int
    """
    size = self._length
    return size
def __reversed__(self):
    """
    iterates over the components of the queue from tail to head

    The iteration works on a snapshot of the queue members. Each running
    iteration has its own flag in self._iter_touched; when that flag is found
    set, the not yet delivered part of the snapshot is rebuilt from the
    current queue, so that the iteration survives changes to the queue.

    Yields
    ------
    components, tail first : Component
    """
    self._iter_sequence += 1
    iter_sequence = self._iter_sequence
    self._iter_touched[iter_sequence] = False
    iter_list = []
    mx = self._tail.predecessor
    while mx != self._head:
        iter_list.append(mx)
        mx = mx.predecessor
    iter_index = 0
    while len(iter_list) > iter_index:
        if self._iter_touched[iter_sequence]:
            # keep the members already handled and add all others, tail first
            iter_list = iter_list[:iter_index]
            mx = self._tail.predecessor
            while mx != self._head:
                if mx not in iter_list:
                    iter_list.append(mx)
                mx = mx.predecessor  # was misspelled as 'precessor' (AttributeError)
            self._iter_touched[iter_sequence] = False
        else:
            c = iter_list[iter_index].component
            if c is not None:  # skip deleted components
                yield c
            iter_index += 1
    del self._iter_touched[iter_sequence]
def __add__(self, q):
    """self + q: same as self.union(q)"""
    combined = self.union(q)
    return combined
def __or__(self, q):
    """self | q: same as self.union(q)"""
    combined = self.union(q)
    return combined
def __sub__(self, q):
    """self - q: same as self.difference(q)"""
    remaining = self.difference(q)
    return remaining
def __and__(self, q):
    """self & q: same as self.intersection(q)"""
    common = self.intersection(q)
    return common
def __xor__(self, q):
    """self ^ q: same as self.symmetric_difference(q)"""
    exclusive = self.symmetric_difference(q)
    return exclusive
def count(self, component):
    """
    component count, as given by component.count

    Parameters
    ---------
    component : Component
        component to count

    Returns
    -------
    number of occurences of component in the queue

    Note
    ----
    The result can only be 0 or 1
    """
    occurrences = component.count(self)
    return occurrences
def index(self, component):
    """
    get the index of a component in the queue, as given by component.index

    Parameters
    ----------
    component : Component
        component to be queried |n|
        does not need to be in the queue

    Returns
    -------
    index of component in the queue : int
        0 denotes the head, |n|
        returns -1 if component is not in the queue
    """
    position = component.index(self)
    return position
def component_with_name(self, txt):
    """
    returns a component in the queue according to its name

    Parameters
    ----------
    txt : str
        name of component to be retrieved

    Returns
    -------
    the first component in the queue with name txt : Component |n|
        returns None if not found
    """
    member = self._head.successor
    while member != self._tail:
        candidate = member.component
        if candidate.name() == txt:
            return candidate
        member = member.successor
    return None
def __iter__(self):
    """
    iterates over the components of the queue from head to tail

    The iteration works on a snapshot of the queue members. Each running
    iteration has its own flag in self._iter_touched; when that flag is found
    set, the not yet delivered part of the snapshot is rebuilt from the
    current queue, so that the iteration survives changes to the queue.

    Yields
    ------
    components, head first : Component
    """
    self._iter_sequence += 1
    token = self._iter_sequence
    self._iter_touched[token] = False

    members = []
    member = self._head.successor
    while member != self._tail:
        members.append(member)
        member = member.successor

    position = 0
    while position < len(members):
        if self._iter_touched[token]:
            # keep the members already handled and add all others, head first
            members = members[:position]
            member = self._head.successor
            while member != self._tail:
                if member not in members:
                    members.append(member)
                member = member.successor
            self._iter_touched[token] = False
            continue
        component = members[position].component
        if component is not None:  # skip deleted components
            yield component
        position += 1
    del self._iter_touched[token]
def extend(self, q):
    """
    extends the queue with components of q that are not already in self

    Parameters
    ----------
    q : queue, list or tuple

    Returns
    -------
    queue (self) : Queue

    Note
    ----
    The components added to the queue will get the priority of the tail of self. |n|
    Tracing is switched off while the components enter and restored afterwards.
    """
    env = self.env
    previous_trace = env._trace
    env._trace = False
    for component in q:
        if component in self:
            continue
        component.enter(self)
    env._trace = previous_trace
    return self
def as_set(self):
    """
    Returns
    -------
    set of all components obtained by iterating over the queue : set
    """
    return set(self)
def as_list(self):
    """
    Returns
    -------
    list of all components obtained by iterating over the queue, in that order : list
    """
    return list(self)
def union(self, q, name=None, monitor=False):
    """
    returns the union of two queues

    Parameters
    ----------
    q : Queue
        queue to be unioned with self
    name : str
        name of the new queue |n|
        if omitted, self.name() + " | " + q.name()
    monitor : bool
        if True, monitor the queue |n|
        if False (default), do not monitor the queue

    Returns
    -------
    queue containing all elements of self and q : Queue

    Note
    ----
    the priority will be set to 0 for all components in the
    resulting queue |n|
    the order of the resulting queue is as follows: |n|
    first all components of self, in that order,
    followed by all components in q that are not in self,
    in that order. |n|
    Alternatively, the more pythonic | operator is also supported, e.g. q1 | q2
    """
    env = self.env
    save_trace = env._trace
    env._trace = False  # building the new queue is not traced
    if name is None:
        name = " | ".join((self.name(), q.name()))
    result = type(self)(name=name, monitor=monitor, env=env)

    def place_at_tail(component):
        Qmember().insert_in_front_of(result._tail, component, result, 0)

    own_components = self.as_set()
    member = self._head.successor
    while member != self._tail:
        place_at_tail(member.component)
        member = member.successor
    member = q._head.successor
    while member != q._tail:
        if member.component not in own_components:
            place_at_tail(member.component)
        member = member.successor
    env._trace = save_trace
    return result
def intersection(self, q, name=None, monitor=False):
    """
    returns the intersect of two queues

    Parameters
    ----------
    q : Queue
        queue to be intersected with self
    name : str
        name of the new queue |n|
        if omitted, self.name() + " & " + q.name()
    monitor : bool
        if True, monitor the queue |n|
        if False (default), do not monitor the queue

    Returns
    -------
    queue with all elements that are in self and q : Queue

    Note
    ----
    the priority will be set to 0 for all components in the
    resulting queue |n|
    the order of the resulting queue is as follows: |n|
    in the same order as in self. |n|
    Alternatively, the more pythonic & operator is also supported, e.g. q1 & q2
    """
    env = self.env
    save_trace = env._trace
    env._trace = False  # building the new queue is not traced
    if name is None:
        name = " & ".join((self.name(), q.name()))
    result = type(self)(name=name, monitor=monitor, env=env)
    other_components = q.as_set()
    member = self._head.successor
    while member != self._tail:
        component = member.component
        if component in other_components:
            Qmember().insert_in_front_of(result._tail, component, result, 0)
        member = member.successor
    env._trace = save_trace
    return result
def difference(self, q, name=None, monitor=False):
    """
    returns the difference of two queues

    Parameters
    ----------
    q : Queue
        queue to be 'subtracted' from self
    name : str
        name of the new queue |n|
        if omitted, self.name() + " - " + q.name()
    monitor : bool
        if True, monitor the queue |n|
        if False (default), do not monitor the queue

    Returns
    -------
    queue containing all elements of self that are not in q

    Note
    ----
    the priority will be copied from the original queue.
    Also, the order will be maintained. |n|
    Alternatively, the more pythonic - operator is also supported, e.g. q1 - q2
    """
    # NOTE: the default used to be written as 'monitor=monitor', which bound the
    # (truthy) monitor method of the class instead of the documented False
    if name is None:
        name = self.name() + " - " + q.name()
    save_trace = self.env._trace
    self.env._trace = False  # building the new queue is not traced
    try:
        q1 = type(self)(name=name, monitor=monitor, env=self.env)
        q_set = q.as_set()
        mx = self._head.successor
        while mx != self._tail:
            if mx.component not in q_set:
                Qmember().insert_in_front_of(q1._tail, mx.component, q1, mx.priority)
            mx = mx.successor
    finally:
        # restore tracing, also when building the queue fails
        self.env._trace = save_trace
    return q1
def symmetric_difference(self, q, name=None, monitor=False):
    """
    returns the symmetric difference of two queues

    Parameters
    ----------
    q : Queue
        queue to be combined with self
    name : str
        name of the new queue |n|
        if omitted, self.name() + " ^ " + q.name()
    monitor : bool
        if True, monitor the queue |n|
        if False (default), do not monitor the queue

    Returns
    -------
    queue containing all elements that are either in self or q, but not in both

    Note
    ----
    the priority will be set to 0 for all components in the new queue. |n|
    Order: First, elements in self (in that order), then elements in q (in that order) |n|
    Alternatively, the more pythonic ^ operator is also supported, e.g. q1 ^ q2
    """
    # NOTE: the default used to be written as 'monitor=monitor', which bound the
    # (truthy) monitor method of the class instead of the documented False
    if name is None:
        name = self.name() + " ^ " + q.name()
    save_trace = self.env._trace
    self.env._trace = False  # building the new queue is not traced
    try:
        q1 = type(self)(name=name, monitor=monitor, env=self.env)
        intersection_set = self.as_set() & q.as_set()
        for source in (self, q):
            mx = source._head.successor
            while mx != source._tail:
                if mx.component not in intersection_set:
                    Qmember().insert_in_front_of(q1._tail, mx.component, q1, 0)
                mx = mx.successor
    finally:
        # restore tracing, also when building the queue fails
        self.env._trace = save_trace
    return q1
def copy(self, name=None, monitor=False):
    """
    returns a copy of a queue

    Parameters
    ----------
    name : str
        name of the new queue |n|
        if omitted, "copy of " + self.name()
    monitor : bool
        if True, monitor the queue |n|
        if False (default), do not monitor the queue

    Returns
    -------
    queue with all elements of self : Queue

    Note
    ----
    The priority will be copied from original queue.
    Also, the order will be maintained.
    """
    # NOTE: the default used to be written as 'monitor=monitor' (binding the
    # monitor method of the class) and the argument was never handed to the
    # new queue; both now follow the documented behaviour
    save_trace = self.env._trace
    self.env._trace = False  # building the new queue is not traced
    try:
        if name is None:
            name = "copy of " + self.name()
        q1 = type(self)(name=name, monitor=monitor, env=self.env)
        mx = self._head.successor
        while mx != self._tail:
            Qmember().insert_in_front_of(q1._tail, mx.component, q1, mx.priority)
            mx = mx.successor
    finally:
        # restore tracing, also when building the queue fails
        self.env._trace = save_trace
    return q1
def move(self, name=None, monitor=False):
    """
    makes a copy of a queue and empties the original

    Parameters
    ----------
    name : str
        name of the new queue
    monitor : bool
        if True, monitor the queue |n|
        if False (default), do not monitor the queue

    Returns
    -------
    queue containing all elements of self: Queue

    Note
    ----
    Priorities will be kept |n|
    self will be emptied
    """
    # NOTE: the default used to be written as 'monitor=monitor', which bound the
    # (truthy) monitor method of the class instead of the documented False
    q1 = self.copy(name, monitor=monitor)
    self.clear()
    return q1
def clear(self):
    """
    empties a queue

    lets all components leave the queue, with tracing switched off;
    afterwards tracing is restored and, if it is on, one trace line
    (name of the queue + " clear") is printed
    """
    env = self.env
    previous_trace = env._trace
    env._trace = False
    member = self._head.successor
    while member != self._tail:
        leaving = member.component
        # step to the next member before leave() is called on this one
        member = member.successor
        leaving.leave(self)
    env._trace = previous_trace
    if env._trace:
        env.print_trace("", "", self.name() + " clear")
class Environment(object):
"""
environment object
Parameters
----------
trace : bool
defines whether to trace or not |n|
if omitted, False
random_seed : hashable object, usually int
the seed for random, equivalent to random.seed() |n|
if "*", a purely random value (based on the current time) will be used
(not reproducable) |n|
if the null string, no action on random is taken |n|
if None (the default), 1234567 will be used.
time_unit : str
Supported time_units: |n|
"years", "weeks", "days", "hours", "minutes", "seconds", "milliseconds", "microseconds", "n/a"
name : str
name of the environment |n|
if the name ends with a period (.),
auto serializing will be applied |n|
if the name end with a comma,
auto serializing starting at 1 will be applied |n|
if omitted, the name will be derived from the class (lowercased)
or "default environment" if isdefault_env is True.
print_trace_header : bool
if True (default) print a (two line) header line as a legend |n|
if False, do not print a header |n|
note that the header is only printed if trace=True
isdefault_env : bool
if True (default), this environment becomes the default environment |n|
if False, this environment will not be the default environment |n|
if omitted, this environment becomes the default environment |n|
Note
----
The trace may be switched on/off later with trace |n|
The seed may be later set with random_seed() |n|
Initially, the random stream will be seeded with the value 1234567.
If required to be purely, not not reproducable, values, use
random_seed="*".
"""
_nameserialize = {}
cached_modelname_width = [None, None]
def __init__(
self,
trace=False,
random_seed=None,
time_unit="n/a",
name=